author    Matt Holt <mholt@users.noreply.github.com>  2022-03-06 17:43:39 -0700
committer GitHub <noreply@github.com>                 2022-03-06 17:43:39 -0700
commit    ab0455922ae01bde1a7a5b3bf58eb993efc02db7 (patch)
tree      6ecfccc2d29d601fab557092545fddb51ba1ebea /modules/caddyhttp/reverseproxy/reverseproxy.go
parent    c50094fc9d34099efd705700e6d2efa2fa065412 (diff)
reverseproxy: Dynamic upstreams (with SRV and A/AAAA support) (#4470)
* reverseproxy: Begin refactor to enable dynamic upstreams

  Streamed here: https://www.youtube.com/watch?v=hj7yzXb11jU

* Implement SRV and A/AAAA upstream sources

  Also get upstreams at every retry loop iteration instead of just once
  before the loop. See #4442.

* Minor tweaks from review
* Limit size of upstreams caches
* Add doc notes deprecating LookupSRV
* Provision dynamic upstreams

  Still WIP, preparing to preserve health checker functionality

* Rejigger health checks

  Move active health check results into handler-specific Upstreams.
  Improve documentation regarding health checks and upstreams.

* Deprecation notice
* Add Caddyfile support, use `caddy.Duration`
* Interface guards
* Implement custom resolvers, add resolvers to http transport Caddyfile
* SRV: fix Caddyfile `name` inline arg, remove proto condition
* Use pointer receiver
* Add debug logs

Co-authored-by: Francis Lavoie <lavofr@gmail.com>
Diffstat (limited to 'modules/caddyhttp/reverseproxy/reverseproxy.go')
-rw-r--r--  modules/caddyhttp/reverseproxy/reverseproxy.go  274
1 file changed, 177 insertions(+), 97 deletions(-)
diff --git a/modules/caddyhttp/reverseproxy/reverseproxy.go b/modules/caddyhttp/reverseproxy/reverseproxy.go
index a5bdc31..3355f0b 100644
--- a/modules/caddyhttp/reverseproxy/reverseproxy.go
+++ b/modules/caddyhttp/reverseproxy/reverseproxy.go
@@ -78,9 +78,20 @@ type Handler struct {
// up or down. Down backends will not be proxied to.
HealthChecks *HealthChecks `json:"health_checks,omitempty"`
- // Upstreams is the list of backends to proxy to.
+ // Upstreams is the static list of backends to proxy to.
Upstreams UpstreamPool `json:"upstreams,omitempty"`
+ // A module for retrieving the list of upstreams dynamically. Dynamic
+ // upstreams are retrieved at every iteration of the proxy loop for
+ // each request (i.e. before every proxy attempt within every request).
+ // Active health checks do not work on dynamic upstreams, and passive
+ // health checks are only effective on dynamic upstreams if the proxy
+ // server is busy enough that concurrent requests to the same backends
+ // are continuous. Instead of health checks for dynamic upstreams, it
+ // is recommended that the dynamic upstream module only return available
+ // backends in the first place.
+ DynamicUpstreamsRaw json.RawMessage `json:"dynamic_upstreams,omitempty" caddy:"namespace=http.reverse_proxy.upstreams inline_key=source"`
+
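
For orientation, here is a minimal sketch of what this field accepts on the wire. The `srv` source name and its `name`/`refresh` parameters are assumptions drawn from the commit message, not confirmed by this diff; only the `dynamic_upstreams`/`source` shape comes from the struct tag above.

```go
package main

import (
	"encoding/json"
	"fmt"
)

func main() {
	// Hypothetical reverse_proxy handler config using dynamic upstreams.
	// Field names under "dynamic_upstreams" (other than "source") are
	// illustrative assumptions.
	handlerJSON := []byte(`{
		"handler": "reverse_proxy",
		"dynamic_upstreams": {
			"source": "srv",
			"name": "_api._tcp.example.internal",
			"refresh": "30s"
		}
	}`)

	var handler struct {
		DynamicUpstreams json.RawMessage `json:"dynamic_upstreams"`
	}
	if err := json.Unmarshal(handlerJSON, &handler); err != nil {
		panic(err)
	}

	// The inline_key=source struct tag means Caddy reads "source" from this
	// raw payload to pick a module in http.reverse_proxy.upstreams.
	var probe struct {
		Source string `json:"source"`
	}
	if err := json.Unmarshal(handler.DynamicUpstreams, &probe); err != nil {
		panic(err)
	}
	fmt.Println("upstream source module:", "http.reverse_proxy.upstreams."+probe.Source)
}
```
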
// Adjusts how often to flush the response buffer. By default,
// no periodic flushing is done. A negative value disables
// response buffering, and flushes immediately after each
@@ -137,8 +148,9 @@ type Handler struct {
// - `{http.reverse_proxy.header.*}` The headers from the response
HandleResponse []caddyhttp.ResponseHandler `json:"handle_response,omitempty"`
- Transport http.RoundTripper `json:"-"`
- CB CircuitBreaker `json:"-"`
+ Transport http.RoundTripper `json:"-"`
+ CB CircuitBreaker `json:"-"`
+ DynamicUpstreams UpstreamSource `json:"-"`
// Holds the parsed CIDR ranges from TrustedProxies
trustedProxies []*net.IPNet
@@ -166,7 +178,7 @@ func (h *Handler) Provision(ctx caddy.Context) error {
h.ctx = ctx
h.logger = ctx.Logger(h)
- // verify SRV compatibility
+ // verify SRV compatibility - TODO: LookupSRV deprecated; will be removed
for i, v := range h.Upstreams {
if v.LookupSRV == "" {
continue
@@ -201,6 +213,13 @@ func (h *Handler) Provision(ctx caddy.Context) error {
}
h.CB = mod.(CircuitBreaker)
}
+ if h.DynamicUpstreamsRaw != nil {
+ mod, err := ctx.LoadModule(h, "DynamicUpstreamsRaw")
+ if err != nil {
+ return fmt.Errorf("loading upstream source module: %v", err)
+ }
+ h.DynamicUpstreams = mod.(UpstreamSource)
+ }
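
As context for what this `LoadModule` call resolves, here is a hedged sketch of a third-party module that would plug into the `http.reverse_proxy.upstreams` namespace and satisfy the `UpstreamSource` interface introduced later in this file. The module registration pattern is standard Caddy; the `Upstream.Dial` field is an assumption about the pool type's shape.

```go
package myupstreams

import (
	"net/http"

	"github.com/caddyserver/caddy/v2"
	"github.com/caddyserver/caddy/v2/modules/caddyhttp/reverseproxy"
)

func init() {
	caddy.RegisterModule(FixedUpstreams{})
}

// FixedUpstreams returns a fixed list here, but GetUpstreams could just as
// well query a service registry per request.
type FixedUpstreams struct {
	Addresses []string `json:"addresses,omitempty"`
}

// CaddyModule returns the Caddy module information.
func (FixedUpstreams) CaddyModule() caddy.ModuleInfo {
	return caddy.ModuleInfo{
		ID:  "http.reverse_proxy.upstreams.fixed",
		New: func() caddy.Module { return new(FixedUpstreams) },
	}
}

// GetUpstreams implements reverseproxy.UpstreamSource.
func (f FixedUpstreams) GetUpstreams(_ *http.Request) ([]*reverseproxy.Upstream, error) {
	ups := make([]*reverseproxy.Upstream, 0, len(f.Addresses))
	for _, addr := range f.Addresses {
		ups = append(ups, &reverseproxy.Upstream{Dial: addr})
	}
	return ups, nil
}

// Interface guard, mirroring the commit's own use of interface guards.
var _ reverseproxy.UpstreamSource = (*FixedUpstreams)(nil)
```
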
// parse trusted proxy CIDRs ahead of time
for _, str := range h.TrustedProxies {
@@ -270,38 +289,8 @@ func (h *Handler) Provision(ctx caddy.Context) error {
}
// set up upstreams
- for _, upstream := range h.Upstreams {
- // create or get the host representation for this upstream
- var host Host = new(upstreamHost)
- existingHost, loaded := hosts.LoadOrStore(upstream.String(), host)
- if loaded {
- host = existingHost.(Host)
- }
- upstream.Host = host
-
- // give it the circuit breaker, if any
- upstream.cb = h.CB
-
- // if the passive health checker has a non-zero UnhealthyRequestCount
- // but the upstream has no MaxRequests set (they are the same thing,
- // but the passive health checker is a default value for upstreams
- // without MaxRequests), copy the value into this upstream, since the
- // value in the upstream (MaxRequests) is what is used during
- // availability checks
- if h.HealthChecks != nil && h.HealthChecks.Passive != nil {
- h.HealthChecks.Passive.logger = h.logger.Named("health_checker.passive")
- if h.HealthChecks.Passive.UnhealthyRequestCount > 0 &&
- upstream.MaxRequests == 0 {
- upstream.MaxRequests = h.HealthChecks.Passive.UnhealthyRequestCount
- }
- }
-
- // upstreams need independent access to the passive
- // health check policy because passive health checks
- // run without access to h.
- if h.HealthChecks != nil {
- upstream.healthCheckPolicy = h.HealthChecks.Passive
- }
+ for _, u := range h.Upstreams {
+ h.provisionUpstream(u)
}
if h.HealthChecks != nil {
@@ -413,79 +402,127 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request, next caddyht
repl.Set("http.reverse_proxy.duration", time.Since(start))
}()
+ // in the proxy loop, each iteration is an attempt to proxy the request,
+ // and because we may retry some number of times, carry over the error
+ // from previous tries because of the nuances of load balancing & retries
var proxyErr error
for {
- // choose an available upstream
- upstream := h.LoadBalancing.SelectionPolicy.Select(h.Upstreams, clonedReq, w)
- if upstream == nil {
- if proxyErr == nil {
- proxyErr = fmt.Errorf("no upstreams available")
- }
- if !h.LoadBalancing.tryAgain(h.ctx, start, proxyErr, clonedReq) {
- break
- }
- continue
+ var done bool
+ done, proxyErr = h.proxyLoopIteration(clonedReq, w, proxyErr, start, repl, reqHeader, reqHost, next)
+ if done {
+ break
}
+ }
+
+ if proxyErr != nil {
+ return statusError(proxyErr)
+ }
- // the dial address may vary per-request if placeholders are
- // used, so perform those replacements here; the resulting
- // DialInfo struct should have valid network address syntax
- dialInfo, err := upstream.fillDialInfo(clonedReq)
+ return nil
+}
+
+// proxyLoopIteration implements an iteration of the proxy loop. Despite the enormous amount of local state
+// that has to be passed in, we brought this into its own method so that we could run defer more easily.
+// It returns true when the loop is done and should break; false otherwise. The error value returned should
+// be assigned to the proxyErr value for the next iteration of the loop (or the error handled after break).
+func (h *Handler) proxyLoopIteration(r *http.Request, w http.ResponseWriter, proxyErr error, start time.Time,
+ repl *caddy.Replacer, reqHeader http.Header, reqHost string, next caddyhttp.Handler) (bool, error) {
+ // get the updated list of upstreams
+ upstreams := h.Upstreams
+ if h.DynamicUpstreams != nil {
+ dUpstreams, err := h.DynamicUpstreams.GetUpstreams(r)
if err != nil {
- return statusError(fmt.Errorf("making dial info: %v", err))
+ h.logger.Error("failed getting dynamic upstreams; falling back to static upstreams", zap.Error(err))
+ } else {
+ upstreams = dUpstreams
+ for _, dUp := range dUpstreams {
+ h.provisionUpstream(dUp)
+ }
+ h.logger.Debug("provisioned dynamic upstreams", zap.Int("count", len(dUpstreams)))
+ defer func() {
+ // these upstreams are dynamic, so they are only used for this iteration
+ // of the proxy loop; be sure to let them go away when we're done with them
+ for _, upstream := range dUpstreams {
+ _, _ = hosts.Delete(upstream.String())
+ }
+ }()
}
+ }
- // attach to the request information about how to dial the upstream;
- // this is necessary because the information cannot be sufficiently
- // or satisfactorily represented in a URL
- caddyhttp.SetVar(r.Context(), dialInfoVarKey, dialInfo)
-
- // set placeholders with information about this upstream
- repl.Set("http.reverse_proxy.upstream.address", dialInfo.String())
- repl.Set("http.reverse_proxy.upstream.hostport", dialInfo.Address)
- repl.Set("http.reverse_proxy.upstream.host", dialInfo.Host)
- repl.Set("http.reverse_proxy.upstream.port", dialInfo.Port)
- repl.Set("http.reverse_proxy.upstream.requests", upstream.Host.NumRequests())
- repl.Set("http.reverse_proxy.upstream.max_requests", upstream.MaxRequests)
- repl.Set("http.reverse_proxy.upstream.fails", upstream.Host.Fails())
-
- // mutate request headers according to this upstream;
- // because we're in a retry loop, we have to copy
- // headers (and the Host value) from the original
- // so that each retry is identical to the first
- if h.Headers != nil && h.Headers.Request != nil {
- clonedReq.Header = make(http.Header)
- copyHeader(clonedReq.Header, reqHeader)
- clonedReq.Host = reqHost
- h.Headers.Request.ApplyToRequest(clonedReq)
+ // choose an available upstream
+ upstream := h.LoadBalancing.SelectionPolicy.Select(upstreams, r, w)
+ if upstream == nil {
+ if proxyErr == nil {
+ proxyErr = fmt.Errorf("no upstreams available")
}
-
- // proxy the request to that upstream
- proxyErr = h.reverseProxy(w, clonedReq, repl, dialInfo, next)
- if proxyErr == nil || proxyErr == context.Canceled {
- // context.Canceled happens when the downstream client
- // cancels the request, which is not our failure
- return nil
+ if !h.LoadBalancing.tryAgain(h.ctx, start, proxyErr, r) {
+ return true, proxyErr
}
+ return false, proxyErr
+ }
- // if the roundtrip was successful, don't retry the request or
- // ding the health status of the upstream (an error can still
- // occur after the roundtrip if, for example, a response handler
- // after the roundtrip returns an error)
- if succ, ok := proxyErr.(roundtripSucceeded); ok {
- return succ.error
- }
+ // the dial address may vary per-request if placeholders are
+ // used, so perform those replacements here; the resulting
+ // DialInfo struct should have valid network address syntax
+ dialInfo, err := upstream.fillDialInfo(r)
+ if err != nil {
+ return true, fmt.Errorf("making dial info: %v", err)
+ }
- // remember this failure (if enabled)
- h.countFailure(upstream)
+ h.logger.Debug("selected upstream",
+ zap.String("dial", dialInfo.Address),
+ zap.Int("total_upstreams", len(upstreams)))
- // if we've tried long enough, break
- if !h.LoadBalancing.tryAgain(h.ctx, start, proxyErr, clonedReq) {
- break
- }
+ // attach to the request information about how to dial the upstream;
+ // this is necessary because the information cannot be sufficiently
+ // or satisfactorily represented in a URL
+ caddyhttp.SetVar(r.Context(), dialInfoVarKey, dialInfo)
+
+ // set placeholders with information about this upstream
+ repl.Set("http.reverse_proxy.upstream.address", dialInfo.String())
+ repl.Set("http.reverse_proxy.upstream.hostport", dialInfo.Address)
+ repl.Set("http.reverse_proxy.upstream.host", dialInfo.Host)
+ repl.Set("http.reverse_proxy.upstream.port", dialInfo.Port)
+ repl.Set("http.reverse_proxy.upstream.requests", upstream.Host.NumRequests())
+ repl.Set("http.reverse_proxy.upstream.max_requests", upstream.MaxRequests)
+ repl.Set("http.reverse_proxy.upstream.fails", upstream.Host.Fails())
+
+ // mutate request headers according to this upstream;
+ // because we're in a retry loop, we have to copy
+ // headers (and the r.Host value) from the original
+ // so that each retry is identical to the first
+ if h.Headers != nil && h.Headers.Request != nil {
+ r.Header = make(http.Header)
+ copyHeader(r.Header, reqHeader)
+ r.Host = reqHost
+ h.Headers.Request.ApplyToRequest(r)
}
- return statusError(proxyErr)
+ // proxy the request to that upstream
+ proxyErr = h.reverseProxy(w, r, repl, dialInfo, next)
+ if proxyErr == nil || proxyErr == context.Canceled {
+ // context.Canceled happens when the downstream client
+ // cancels the request, which is not our failure
+ return true, nil
+ }
+
+ // if the roundtrip was successful, don't retry the request or
+ // ding the health status of the upstream (an error can still
+ // occur after the roundtrip if, for example, a response handler
+ // after the roundtrip returns an error)
+ if succ, ok := proxyErr.(roundtripSucceeded); ok {
+ return true, succ.error
+ }
+
+ // remember this failure (if enabled)
+ h.countFailure(upstream)
+
+ // if we've tried long enough, break
+ if !h.LoadBalancing.tryAgain(h.ctx, start, proxyErr, r) {
+ return true, proxyErr
+ }
+
+ return false, proxyErr
}
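
The comment above proxyLoopIteration gives the reason for the extraction: defer runs at function return, so hoisting the loop body into its own method turns per-iteration cleanup (like deleting dynamic upstream hosts above) into a simple defer. A reduced standalone sketch of that shape:

```go
package main

import (
	"errors"
	"fmt"
)

// iteration mimics proxyLoopIteration's contract: it reports whether the loop
// should stop, and defer guarantees per-iteration cleanup no matter which
// return path is taken.
func iteration(attempt int) (done bool, err error) {
	defer fmt.Printf("cleanup after attempt %d\n", attempt)
	if attempt < 2 {
		return false, errors.New("upstream unavailable")
	}
	return true, nil
}

func main() {
	// the error carries over between iterations, like proxyErr above
	var loopErr error
	for attempt := 0; ; attempt++ {
		var done bool
		done, loopErr = iteration(attempt)
		if done {
			break
		}
	}
	fmt.Println("final error:", loopErr) // nil here: the last attempt succeeded
}
```
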
// prepareRequest clones req so that it can be safely modified without
@@ -651,9 +688,9 @@ func (h Handler) addForwardedHeaders(req *http.Request) error {
// (This method is mostly the beginning of what was borrowed from the net/http/httputil package in the
// Go standard library which was used as the foundation.)
func (h *Handler) reverseProxy(rw http.ResponseWriter, req *http.Request, repl *caddy.Replacer, di DialInfo, next caddyhttp.Handler) error {
- _ = di.Upstream.Host.CountRequest(1)
+ _ = di.Upstream.Host.countRequest(1)
//nolint:errcheck
- defer di.Upstream.Host.CountRequest(-1)
+ defer di.Upstream.Host.countRequest(-1)
// point the request to this upstream
h.directRequest(req, di)
@@ -905,6 +942,35 @@ func (Handler) directRequest(req *http.Request, di DialInfo) {
req.URL.Host = reqHost
}
+func (h Handler) provisionUpstream(upstream *Upstream) {
+ // create or get the host representation for this upstream
+ upstream.fillHost()
+
+ // give it the circuit breaker, if any
+ upstream.cb = h.CB
+
+ // if the passive health checker has a non-zero UnhealthyRequestCount
+ // but the upstream has no MaxRequests set (they are the same thing,
+ // but the passive health checker is a default value for upstreams
+ // without MaxRequests), copy the value into this upstream, since the
+ // value in the upstream (MaxRequests) is what is used during
+ // availability checks
+ if h.HealthChecks != nil && h.HealthChecks.Passive != nil {
+ h.HealthChecks.Passive.logger = h.logger.Named("health_checker.passive")
+ if h.HealthChecks.Passive.UnhealthyRequestCount > 0 &&
+ upstream.MaxRequests == 0 {
+ upstream.MaxRequests = h.HealthChecks.Passive.UnhealthyRequestCount
+ }
+ }
+
+ // upstreams need independent access to the passive
+ // health check policy because passive health checks
+ // run without access to h.
+ if h.HealthChecks != nil {
+ upstream.healthCheckPolicy = h.HealthChecks.Passive
+ }
+}
+
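
The defaulting rule in the middle of this function is easy to miss, so here it is restated as a reduced sketch (types simplified): the passive health checker's UnhealthyRequestCount only fills in MaxRequests when the upstream didn't set one, because MaxRequests is what availability checks actually consult.

```go
package main

import "fmt"

type upstream struct {
	MaxRequests int // what availability checks consult
}

// applyPassiveDefault copies the passive health checker's
// UnhealthyRequestCount into upstreams that don't set MaxRequests themselves.
func applyPassiveDefault(u *upstream, unhealthyRequestCount int) {
	if unhealthyRequestCount > 0 && u.MaxRequests == 0 {
		u.MaxRequests = unhealthyRequestCount
	}
}

func main() {
	explicit := &upstream{MaxRequests: 10}
	defaulted := &upstream{}
	applyPassiveDefault(explicit, 50)
	applyPassiveDefault(defaulted, 50)
	fmt.Println(explicit.MaxRequests, defaulted.MaxRequests) // 10 50
}
```
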
// bufferedBody reads originalBody into a buffer, then returns a reader for the buffer.
// Always close the return value when done with it, just like if it was the original body!
func (h Handler) bufferedBody(originalBody io.ReadCloser) io.ReadCloser {
@@ -1085,6 +1151,20 @@ type Selector interface {
Select(UpstreamPool, *http.Request, http.ResponseWriter) *Upstream
}
+// UpstreamSource gets the list of upstreams that can be used when
+// proxying a request. Returned upstreams will be load balanced and
+// health-checked. This should be a very fast function -- instant
+// if possible -- and the return value must be as stable as possible.
+// In other words, the list of upstreams should ideally not change much
+// across successive calls. If the list of upstreams changes or the
+// ordering is not stable, load balancing will suffer. This function
+// may be called during each retry, multiple times per request, and as
+// such, needs to be instantaneous. The returned slice will not be
+// modified.
+type UpstreamSource interface {
+ GetUpstreams(*http.Request) ([]*Upstream, error)
+}
+
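
Since this doc comment asks for instantaneous calls and a stable return value, and the commit message mentions limiting the size of upstreams caches, a natural implementation pattern is a TTL cache in front of the real lookup. A hedged sketch (not part of this commit) written as if it lived in this package, relying on the guarantee above that callers won't modify the returned slice:

```go
package reverseproxy

import (
	"net/http"
	"sync"
	"time"
)

// cachedSource wraps another UpstreamSource and serves a cached list for a
// fixed TTL, so repeated calls within a request (and across retries) are
// instant and return a stable, identically ordered slice.
type cachedSource struct {
	inner UpstreamSource
	ttl   time.Duration

	mu      sync.Mutex
	cached  []*Upstream
	expires time.Time
}

func (c *cachedSource) GetUpstreams(r *http.Request) ([]*Upstream, error) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.cached != nil && time.Now().Before(c.expires) {
		return c.cached, nil // safe to share: the proxy won't modify it
	}
	ups, err := c.inner.GetUpstreams(r)
	if err != nil {
		return nil, err
	}
	c.cached, c.expires = ups, time.Now().Add(c.ttl)
	return c.cached, nil
}
```
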
// Hop-by-hop headers. These are removed when sent to the backend.
// As of RFC 7230, hop-by-hop headers are required to appear in the
// Connection header field. These are the headers defined by the