From 0830fbad0347ead1dbea60e664556b263e44653f Mon Sep 17 00:00:00 2001 From: Matthew Holt Date: Thu, 5 Sep 2019 13:14:39 -0600 Subject: Reconcile upstream dial addresses and request host/URL information My goodness that was complicated Blessed be request.Context Sort of --- modules/caddyhttp/reverseproxy/reverseproxy.go | 130 ++++++++++++++++--------- 1 file changed, 82 insertions(+), 48 deletions(-) (limited to 'modules/caddyhttp/reverseproxy/reverseproxy.go') diff --git a/modules/caddyhttp/reverseproxy/reverseproxy.go b/modules/caddyhttp/reverseproxy/reverseproxy.go index 7bf9a2f..5a37613 100644 --- a/modules/caddyhttp/reverseproxy/reverseproxy.go +++ b/modules/caddyhttp/reverseproxy/reverseproxy.go @@ -20,7 +20,6 @@ import ( "fmt" "net" "net/http" - "net/url" "regexp" "strings" "time" @@ -86,7 +85,18 @@ func (h *Handler) Provision(ctx caddy.Context) error { } if h.Transport == nil { - h.Transport = defaultTransport + t := &HTTPTransport{ + KeepAlive: &KeepAlive{ + ProbeInterval: caddy.Duration(30 * time.Second), + IdleConnTimeout: caddy.Duration(2 * time.Minute), + }, + DialTimeout: caddy.Duration(10 * time.Second), + } + err := t.Provision(ctx) + if err != nil { + return fmt.Errorf("provisioning default transport: %v", err) + } + h.Transport = t } if h.LoadBalancing == nil { @@ -133,51 +143,65 @@ func (h *Handler) Provision(ctx caddy.Context) error { go h.activeHealthChecker() } + var allUpstreams []*Upstream for _, upstream := range h.Upstreams { - upstream.cb = h.CB - - // url parser requires a scheme - if !strings.Contains(upstream.Address, "://") { - upstream.Address = "http://" + upstream.Address - } - u, err := url.Parse(upstream.Address) + // upstreams are allowed to map to only a single host, + // but an upstream's address may semantically represent + // multiple addresses, so make sure to handle each + // one in turn based on this one upstream config + network, addresses, err := caddy.ParseNetworkAddress(upstream.Dial) if err != nil { - return fmt.Errorf("invalid upstream address %s: %v", upstream.Address, err) - } - upstream.hostURL = u - - // if host already exists from a current config, - // use that instead; otherwise, add it - // TODO: make hosts modular, so that their state can be distributed in enterprise for example - // TODO: If distributed, the pool should be stored in storage... - var host Host = new(upstreamHost) - activeHost, loaded := hosts.LoadOrStore(u.String(), host) - if loaded { - host = activeHost.(Host) - } - upstream.Host = host - - // if the passive health checker has a non-zero "unhealthy - // request count" but the upstream has no MaxRequests set - // (they are the same thing, but one is a default value for - // for upstreams with a zero MaxRequests), copy the default - // value into this upstream, since the value in the upstream - // is what is used during availability checks - if h.HealthChecks != nil && - h.HealthChecks.Passive != nil && - h.HealthChecks.Passive.UnhealthyRequestCount > 0 && - upstream.MaxRequests == 0 { - upstream.MaxRequests = h.HealthChecks.Passive.UnhealthyRequestCount + return fmt.Errorf("parsing dial address: %v", err) } - if h.HealthChecks != nil { + for _, addr := range addresses { + // make a new upstream based on the original + // that has a singular dial address + upstreamCopy := *upstream + upstreamCopy.dialInfo = DialInfo{network, addr} + upstreamCopy.Dial = upstreamCopy.dialInfo.String() + upstreamCopy.cb = h.CB + + // if host already exists from a current config, + // use that instead; otherwise, add it + // TODO: make hosts modular, so that their state can be distributed in enterprise for example + // TODO: If distributed, the pool should be stored in storage... + var host Host = new(upstreamHost) + activeHost, loaded := hosts.LoadOrStore(upstreamCopy.Dial, host) + if loaded { + host = activeHost.(Host) + } + upstreamCopy.Host = host + + // if the passive health checker has a non-zero "unhealthy + // request count" but the upstream has no MaxRequests set + // (they are the same thing, but one is a default value for + // for upstreams with a zero MaxRequests), copy the default + // value into this upstream, since the value in the upstream + // (MaxRequests) is what is used during availability checks + if h.HealthChecks != nil && + h.HealthChecks.Passive != nil && + h.HealthChecks.Passive.UnhealthyRequestCount > 0 && + upstreamCopy.MaxRequests == 0 { + upstreamCopy.MaxRequests = h.HealthChecks.Passive.UnhealthyRequestCount + } + // upstreams need independent access to the passive - // health check policy so they can, you know, passively - // do health checks - upstream.healthCheckPolicy = h.HealthChecks.Passive + // health check policy because they run outside of the + // scope of a request handler + if h.HealthChecks != nil { + upstreamCopy.healthCheckPolicy = h.HealthChecks.Passive + } + + allUpstreams = append(allUpstreams, &upstreamCopy) } } + // replace the unmarshaled upstreams (possible 1:many + // address mapping) with our list, which is mapped 1:1, + // thus may have expanded the original list + h.Upstreams = allUpstreams + return nil } @@ -192,7 +216,7 @@ func (h *Handler) Cleanup() error { // remove hosts from our config from the pool for _, upstream := range h.Upstreams { - hosts.Delete(upstream.hostURL.String()) + hosts.Delete(upstream.dialInfo.String()) } return nil @@ -222,6 +246,12 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request, next caddyht continue } + // attach to the request information about how to dial the upstream; + // this is necessary because the information cannot be sufficiently + // or satisfactorily represented in a URL + ctx := context.WithValue(r.Context(), DialInfoCtxKey, upstream.dialInfo) + r = r.WithContext(ctx) + // proxy the request to that upstream proxyErr = h.reverseProxy(w, r, upstream) if proxyErr == nil || proxyErr == context.Canceled { @@ -249,6 +279,16 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request, next caddyht // This assumes that no mutations of the request are performed // by h during or after proxying. func (h Handler) prepareRequest(req *http.Request) error { + // as a special (but very common) case, if the transport + // is HTTP, then ensure the request has the proper scheme + // because incoming requests by default are lacking it + if req.URL.Scheme == "" { + req.URL.Scheme = "http" + if ht, ok := h.Transport.(*HTTPTransport); ok && ht.TLS != nil { + req.URL.Scheme = "https" + } + } + if req.ContentLength == 0 { req.Body = nil // Issue golang/go#16036: nil Body for http.Transport retries } @@ -433,14 +473,8 @@ func (h Handler) tryAgain(start time.Time, proxyErr error) bool { // directRequest modifies only req.URL so that it points to the // given upstream host. It must modify ONLY the request URL. func (h Handler) directRequest(req *http.Request, upstream *Upstream) { - target := upstream.hostURL - req.URL.Scheme = target.Scheme - req.URL.Host = target.Host - req.URL.Path = singleJoiningSlash(target.Path, req.URL.Path) // TODO: This might be a bug (if any part of the path was augmented from a previously-tried upstream; need to start from clean original path of request, same for query string!) - if target.RawQuery == "" || req.URL.RawQuery == "" { - req.URL.RawQuery = target.RawQuery + req.URL.RawQuery - } else { - req.URL.RawQuery = target.RawQuery + "&" + req.URL.RawQuery + if req.URL.Host == "" { + req.URL.Host = upstream.dialInfo.Address } } -- cgit v1.2.3