From ab0455922ae01bde1a7a5b3bf58eb993efc02db7 Mon Sep 17 00:00:00 2001
From: Matt Holt
Date: Sun, 6 Mar 2022 17:43:39 -0700
Subject: reverseproxy: Dynamic upstreams (with SRV and A/AAAA support) (#4470)

* reverseproxy: Begin refactor to enable dynamic upstreams

Streamed here: https://www.youtube.com/watch?v=hj7yzXb11jU

* Implement SRV and A/AAAA upstream sources

Also get upstreams at every retry loop iteration instead of just once
before the loop. See #4442.

* Minor tweaks from review

* Limit size of upstreams caches

* Add doc notes deprecating LookupSRV

* Provision dynamic upstreams

Still WIP, preparing to preserve health checker functionality

* Rejigger health checks

Move active health check results into handler-specific Upstreams.
Improve documentation regarding health checks and upstreams.

* Deprecation notice

* Add Caddyfile support, use `caddy.Duration`

* Interface guards

* Implement custom resolvers, add resolvers to http transport Caddyfile

* SRV: fix Caddyfile `name` inline arg, remove proto condition

* Use pointer receiver

* Add debug logs

Co-authored-by: Francis Lavoie
---
 modules/caddyhttp/reverseproxy/reverseproxy.go | 274 ++++++++++++++++---------
 1 file changed, 177 insertions(+), 97 deletions(-)

diff --git a/modules/caddyhttp/reverseproxy/reverseproxy.go b/modules/caddyhttp/reverseproxy/reverseproxy.go
index a5bdc31..3355f0b 100644
--- a/modules/caddyhttp/reverseproxy/reverseproxy.go
+++ b/modules/caddyhttp/reverseproxy/reverseproxy.go
@@ -78,9 +78,20 @@ type Handler struct {
 	// up or down. Down backends will not be proxied to.
 	HealthChecks *HealthChecks `json:"health_checks,omitempty"`
 
-	// Upstreams is the list of backends to proxy to.
+	// Upstreams is the static list of backends to proxy to.
 	Upstreams UpstreamPool `json:"upstreams,omitempty"`
 
+	// A module for retrieving the list of upstreams dynamically. Dynamic
+	// upstreams are retrieved at every iteration of the proxy loop for
+	// each request (i.e. before every proxy attempt within every request).
+	// Active health checks do not work on dynamic upstreams, and passive
+	// health checks are only effective on dynamic upstreams if the proxy
+	// server is busy enough that concurrent requests to the same backends
+	// are continuous. Instead of health checks for dynamic upstreams, it
+	// is recommended that the dynamic upstream module only return available
+	// backends in the first place.
+	DynamicUpstreamsRaw json.RawMessage `json:"dynamic_upstreams,omitempty" caddy:"namespace=http.reverse_proxy.upstreams inline_key=source"`
+
 	// Adjusts how often to flush the response buffer. By default,
 	// no periodic flushing is done. A negative value disables
 	// response buffering, and flushes immediately after each
@@ -137,8 +148,9 @@ type Handler struct {
 	// - `{http.reverse_proxy.header.*}` The headers from the response
 	HandleResponse []caddyhttp.ResponseHandler `json:"handle_response,omitempty"`
 
-	Transport http.RoundTripper `json:"-"`
-	CB        CircuitBreaker    `json:"-"`
+	Transport        http.RoundTripper `json:"-"`
+	CB               CircuitBreaker    `json:"-"`
+	DynamicUpstreams UpstreamSource    `json:"-"`
 
 	// Holds the parsed CIDR ranges from TrustedProxies
 	trustedProxies []*net.IPNet
@@ -166,7 +178,7 @@ func (h *Handler) Provision(ctx caddy.Context) error {
 	h.ctx = ctx
 	h.logger = ctx.Logger(h)
 
-	// verify SRV compatibility
+	// verify SRV compatibility - TODO: LookupSRV deprecated; will be removed
 	for i, v := range h.Upstreams {
 		if v.LookupSRV == "" {
 			continue
@@ -201,6 +213,13 @@ func (h *Handler) Provision(ctx caddy.Context) error {
 		}
 		h.CB = mod.(CircuitBreaker)
 	}
+	if h.DynamicUpstreamsRaw != nil {
+		mod, err := ctx.LoadModule(h, "DynamicUpstreamsRaw")
+		if err != nil {
+			return fmt.Errorf("loading upstream source module: %v", err)
+		}
+		h.DynamicUpstreams = mod.(UpstreamSource)
+	}
 
 	// parse trusted proxy CIDRs ahead of time
 	for _, str := range h.TrustedProxies {
@@ -270,38 +289,8 @@ func (h *Handler) Provision(ctx caddy.Context) error {
 	}
 
 	// set up upstreams
-	for _, upstream := range h.Upstreams {
-		// create or get the host representation for this upstream
-		var host Host = new(upstreamHost)
-		existingHost, loaded := hosts.LoadOrStore(upstream.String(), host)
-		if loaded {
-			host = existingHost.(Host)
-		}
-		upstream.Host = host
-
-		// give it the circuit breaker, if any
-		upstream.cb = h.CB
-
-		// if the passive health checker has a non-zero UnhealthyRequestCount
-		// but the upstream has no MaxRequests set (they are the same thing,
-		// but the passive health checker is a default value for for upstreams
-		// without MaxRequests), copy the value into this upstream, since the
-		// value in the upstream (MaxRequests) is what is used during
-		// availability checks
-		if h.HealthChecks != nil && h.HealthChecks.Passive != nil {
-			h.HealthChecks.Passive.logger = h.logger.Named("health_checker.passive")
-			if h.HealthChecks.Passive.UnhealthyRequestCount > 0 &&
-				upstream.MaxRequests == 0 {
-				upstream.MaxRequests = h.HealthChecks.Passive.UnhealthyRequestCount
-			}
-		}
-
-		// upstreams need independent access to the passive
-		// health check policy because passive health checks
-		// run without access to h.
-		if h.HealthChecks != nil {
-			upstream.healthCheckPolicy = h.HealthChecks.Passive
-		}
+	for _, u := range h.Upstreams {
+		h.provisionUpstream(u)
 	}
 
 	if h.HealthChecks != nil {
@@ -413,79 +402,127 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request, next caddyht
 		repl.Set("http.reverse_proxy.duration", time.Since(start))
 	}()
 
+	// in the proxy loop, each iteration is an attempt to proxy the request,
+	// and because we may retry some number of times, carry over the error
+	// from previous tries because of the nuances of load balancing & retries
 	var proxyErr error
 	for {
-		// choose an available upstream
-		upstream := h.LoadBalancing.SelectionPolicy.Select(h.Upstreams, clonedReq, w)
-		if upstream == nil {
-			if proxyErr == nil {
-				proxyErr = fmt.Errorf("no upstreams available")
-			}
-			if !h.LoadBalancing.tryAgain(h.ctx, start, proxyErr, clonedReq) {
-				break
-			}
-			continue
+		var done bool
+		done, proxyErr = h.proxyLoopIteration(clonedReq, w, proxyErr, start, repl, reqHeader, reqHost, next)
+		if done {
+			break
 		}
+	}
+
+	if proxyErr != nil {
+		return statusError(proxyErr)
+	}
 
-		// the dial address may vary per-request if placeholders are
-		// used, so perform those replacements here; the resulting
-		// DialInfo struct should have valid network address syntax
-		dialInfo, err := upstream.fillDialInfo(clonedReq)
+	return nil
+}
+
+// proxyLoopIteration implements an iteration of the proxy loop. Despite the enormous amount of local state
+// that has to be passed in, we brought this into its own method so that we could run defer more easily.
+// It returns true when the loop is done and should break; false otherwise. The error value returned should
+// be assigned to the proxyErr value for the next iteration of the loop (or the error handled after break).
+func (h *Handler) proxyLoopIteration(r *http.Request, w http.ResponseWriter, proxyErr error, start time.Time,
+	repl *caddy.Replacer, reqHeader http.Header, reqHost string, next caddyhttp.Handler) (bool, error) {
+	// get the updated list of upstreams
+	upstreams := h.Upstreams
+	if h.DynamicUpstreams != nil {
+		dUpstreams, err := h.DynamicUpstreams.GetUpstreams(r)
 		if err != nil {
-			return statusError(fmt.Errorf("making dial info: %v", err))
+			h.logger.Error("failed getting dynamic upstreams; falling back to static upstreams", zap.Error(err))
+		} else {
+			upstreams = dUpstreams
+			for _, dUp := range dUpstreams {
+				h.provisionUpstream(dUp)
+			}
+			h.logger.Debug("provisioned dynamic upstreams", zap.Int("count", len(dUpstreams)))
+			defer func() {
+				// these upstreams are dynamic, so they are only used for this iteration
+				// of the proxy loop; be sure to let them go away when we're done with them
+				for _, upstream := range dUpstreams {
+					_, _ = hosts.Delete(upstream.String())
+				}
+			}()
 		}
+	}
 
-		// attach to the request information about how to dial the upstream;
-		// this is necessary because the information cannot be sufficiently
-		// or satisfactorily represented in a URL
-		caddyhttp.SetVar(r.Context(), dialInfoVarKey, dialInfo)
-
-		// set placeholders with information about this upstream
-		repl.Set("http.reverse_proxy.upstream.address", dialInfo.String())
-		repl.Set("http.reverse_proxy.upstream.hostport", dialInfo.Address)
-		repl.Set("http.reverse_proxy.upstream.host", dialInfo.Host)
-		repl.Set("http.reverse_proxy.upstream.port", dialInfo.Port)
-		repl.Set("http.reverse_proxy.upstream.requests", upstream.Host.NumRequests())
-		repl.Set("http.reverse_proxy.upstream.max_requests", upstream.MaxRequests)
-		repl.Set("http.reverse_proxy.upstream.fails", upstream.Host.Fails())
-
-		// mutate request headers according to this upstream;
-		// because we're in a retry loop, we have to copy
-		// headers (and the Host value) from the original
-		// so that each retry is identical to the first
-		if h.Headers != nil && h.Headers.Request != nil {
-			clonedReq.Header = make(http.Header)
-			copyHeader(clonedReq.Header, reqHeader)
-			clonedReq.Host = reqHost
-			h.Headers.Request.ApplyToRequest(clonedReq)
+	// choose an available upstream
+	upstream := h.LoadBalancing.SelectionPolicy.Select(upstreams, r, w)
+	if upstream == nil {
+		if proxyErr == nil {
+			proxyErr = fmt.Errorf("no upstreams available")
 		}
-
-		// proxy the request to that upstream
-		proxyErr = h.reverseProxy(w, clonedReq, repl, dialInfo, next)
-		if proxyErr == nil || proxyErr == context.Canceled {
-			// context.Canceled happens when the downstream client
-			// cancels the request, which is not our failure
-			return nil
+		if !h.LoadBalancing.tryAgain(h.ctx, start, proxyErr, r) {
+			return true, proxyErr
 		}
+		return false, proxyErr
+	}
 
-		// if the roundtrip was successful, don't retry the request or
-		// ding the health status of the upstream (an error can still
-		// occur after the roundtrip if, for example, a response handler
-		// after the roundtrip returns an error)
-		if succ, ok := proxyErr.(roundtripSucceeded); ok {
-			return succ.error
-		}
+	// the dial address may vary per-request if placeholders are
+	// used, so perform those replacements here; the resulting
+	// DialInfo struct should have valid network address syntax
+	dialInfo, err := upstream.fillDialInfo(r)
+	if err != nil {
+		return true, fmt.Errorf("making dial info: %v", err)
+	}
 
-		// remember this failure (if enabled)
-		h.countFailure(upstream)
+	h.logger.Debug("selected upstream",
+		zap.String("dial", dialInfo.Address),
+		zap.Int("total_upstreams", len(upstreams)))
 
-		// if we've tried long enough, break
-		if !h.LoadBalancing.tryAgain(h.ctx, start, proxyErr, clonedReq) {
-			break
-		}
+	// attach to the request information about how to dial the upstream;
+	// this is necessary because the information cannot be sufficiently
+	// or satisfactorily represented in a URL
+	caddyhttp.SetVar(r.Context(), dialInfoVarKey, dialInfo)
+
+	// set placeholders with information about this upstream
+	repl.Set("http.reverse_proxy.upstream.address", dialInfo.String())
+	repl.Set("http.reverse_proxy.upstream.hostport", dialInfo.Address)
+	repl.Set("http.reverse_proxy.upstream.host", dialInfo.Host)
+	repl.Set("http.reverse_proxy.upstream.port", dialInfo.Port)
+	repl.Set("http.reverse_proxy.upstream.requests", upstream.Host.NumRequests())
+	repl.Set("http.reverse_proxy.upstream.max_requests", upstream.MaxRequests)
+	repl.Set("http.reverse_proxy.upstream.fails", upstream.Host.Fails())
+
+	// mutate request headers according to this upstream;
+	// because we're in a retry loop, we have to copy
+	// headers (and the r.Host value) from the original
+	// so that each retry is identical to the first
+	if h.Headers != nil && h.Headers.Request != nil {
+		r.Header = make(http.Header)
+		copyHeader(r.Header, reqHeader)
+		r.Host = reqHost
+		h.Headers.Request.ApplyToRequest(r)
 	}
 
-	return statusError(proxyErr)
+	// proxy the request to that upstream
+	proxyErr = h.reverseProxy(w, r, repl, dialInfo, next)
+	if proxyErr == nil || proxyErr == context.Canceled {
+		// context.Canceled happens when the downstream client
+		// cancels the request, which is not our failure
+		return true, nil
+	}
+
+	// if the roundtrip was successful, don't retry the request or
+	// ding the health status of the upstream (an error can still
+	// occur after the roundtrip if, for example, a response handler
+	// after the roundtrip returns an error)
+	if succ, ok := proxyErr.(roundtripSucceeded); ok {
+		return true, succ.error
+	}
+
+	// remember this failure (if enabled)
+	h.countFailure(upstream)
+
+	// if we've tried long enough, break
+	if !h.LoadBalancing.tryAgain(h.ctx, start, proxyErr, r) {
+		return true, proxyErr
+	}
+
+	return false, proxyErr
 }
 
 // prepareRequest clones req so that it can be safely modified without
@@ -651,9 +688,9 @@ func (h Handler) addForwardedHeaders(req *http.Request) error {
 // (This method is mostly the beginning of what was borrowed from the net/http/httputil package in the
 // Go standard library which was used as the foundation.)
 func (h *Handler) reverseProxy(rw http.ResponseWriter, req *http.Request, repl *caddy.Replacer, di DialInfo, next caddyhttp.Handler) error {
-	_ = di.Upstream.Host.CountRequest(1)
+	_ = di.Upstream.Host.countRequest(1)
 	//nolint:errcheck
-	defer di.Upstream.Host.CountRequest(-1)
+	defer di.Upstream.Host.countRequest(-1)
 
 	// point the request to this upstream
 	h.directRequest(req, di)
 
@@ -905,6 +942,35 @@ func (Handler) directRequest(req *http.Request, di DialInfo) {
 	req.URL.Host = reqHost
 }
 
+func (h Handler) provisionUpstream(upstream *Upstream) {
+	// create or get the host representation for this upstream
+	upstream.fillHost()
+
+	// give it the circuit breaker, if any
+	upstream.cb = h.CB
+
+	// if the passive health checker has a non-zero UnhealthyRequestCount
+	// but the upstream has no MaxRequests set (they are the same thing,
+	// but the passive health checker is a default value for upstreams
+	// without MaxRequests), copy the value into this upstream, since the
+	// value in the upstream (MaxRequests) is what is used during
+	// availability checks
+	if h.HealthChecks != nil && h.HealthChecks.Passive != nil {
+		h.HealthChecks.Passive.logger = h.logger.Named("health_checker.passive")
+		if h.HealthChecks.Passive.UnhealthyRequestCount > 0 &&
+			upstream.MaxRequests == 0 {
+			upstream.MaxRequests = h.HealthChecks.Passive.UnhealthyRequestCount
+		}
+	}
+
+	// upstreams need independent access to the passive
+	// health check policy because passive health checks
+	// run without access to h.
+	if h.HealthChecks != nil {
+		upstream.healthCheckPolicy = h.HealthChecks.Passive
+	}
+}
+
 // bufferedBody reads originalBody into a buffer, then returns a reader for the buffer.
 // Always close the return value when done with it, just like if it was the original body!
 func (h Handler) bufferedBody(originalBody io.ReadCloser) io.ReadCloser {
@@ -1085,6 +1151,20 @@ type Selector interface {
 	Select(UpstreamPool, *http.Request, http.ResponseWriter) *Upstream
 }
 
+// UpstreamSource gets the list of upstreams that can be used when
+// proxying a request. Returned upstreams will be load balanced and
+// health-checked. This should be a very fast function -- instant
+// if possible -- and the return value must be as stable as possible.
+// In other words, the list of upstreams should ideally not change much
+// across successive calls. If the list of upstreams changes or the
+// ordering is not stable, load balancing will suffer. This function
+// may be called during each retry, multiple times per request, and as
+// such, needs to be instantaneous. The returned slice will not be
+// modified.
+type UpstreamSource interface {
+	GetUpstreams(*http.Request) ([]*Upstream, error)
+}
+
 // Hop-by-hop headers. These are removed when sent to the backend.
 // As of RFC 7230, hop-by-hop headers are required to appear in the
 // Connection header field. These are the headers defined by the
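
The actual upstream sources are guest modules in the new http.reverse_proxy.upstreams
namespace declared by the DynamicUpstreamsRaw struct tag above. For orientation, here is
a minimal sketch of such a module; the type name, module ID suffix, and fixed backend
address are hypothetical and not part of this commit (the real SRV and A/AAAA sources
added by this PR live in a separate file not shown in this diff):

package reverseproxy

import (
	"net/http"

	"github.com/caddyserver/caddy/v2"
)

func init() {
	caddy.RegisterModule(StaticSketchUpstreams{})
}

// StaticSketchUpstreams is a hypothetical upstream source that always
// returns the same single backend. A real source would perform a lookup
// here (e.g. DNS), cache the result, and ideally return only backends
// believed to be available, per the UpstreamSource docs above.
type StaticSketchUpstreams struct{}

// CaddyModule returns the Caddy module information.
func (StaticSketchUpstreams) CaddyModule() caddy.ModuleInfo {
	return caddy.ModuleInfo{
		ID:  "http.reverse_proxy.upstreams.static_sketch",
		New: func() caddy.Module { return new(StaticSketchUpstreams) },
	}
}

// GetUpstreams is consulted at every iteration of the proxy loop
// (see proxyLoopIteration above), so it must be fast and its output
// should be stable across calls.
func (StaticSketchUpstreams) GetUpstreams(_ *http.Request) ([]*Upstream, error) {
	return []*Upstream{{Dial: "localhost:9000"}}, nil
}

// Interface guard
var _ UpstreamSource = (*StaticSketchUpstreams)(nil)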
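
Because the DynamicUpstreamsRaw struct tag declares inline_key=source, the module is
selected by a "source" key inside the dynamic_upstreams object of the handler's JSON
config. Assuming the srv source added elsewhere in this PR, a config might look roughly
like the following (the srv module's field names are defined outside this file, so
treat them as indicative only):

{
	"handler": "reverse_proxy",
	"dynamic_upstreams": {
		"source": "srv",
		"name": "_api._tcp.example.internal",
		"refresh": "30s"
	}
}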