From ab0455922ae01bde1a7a5b3bf58eb993efc02db7 Mon Sep 17 00:00:00 2001
From: Matt Holt
Date: Sun, 6 Mar 2022 17:43:39 -0700
Subject: reverseproxy: Dynamic upstreams (with SRV and A/AAAA support) (#4470)

* reverseproxy: Begin refactor to enable dynamic upstreams

Streamed here: https://www.youtube.com/watch?v=hj7yzXb11jU

* Implement SRV and A/AAAA upstream sources

Also get upstreams at every retry loop iteration instead of just once
before the loop. See #4442.

* Minor tweaks from review

* Limit size of upstreams caches

* Add doc notes deprecating LookupSRV

* Provision dynamic upstreams

Still WIP, preparing to preserve health checker functionality

* Rejigger health checks

Move active health check results into handler-specific Upstreams.
Improve documentation regarding health checks and upstreams.

* Deprecation notice

* Add Caddyfile support, use `caddy.Duration`

* Interface guards

* Implement custom resolvers, add resolvers to http transport Caddyfile

* SRV: fix Caddyfile `name` inline arg, remove proto condition

* Use pointer receiver

* Add debug logs

Co-authored-by: Francis Lavoie
---
 modules/caddyhttp/reverseproxy/healthchecks.go | 96 ++++++++++++++------
 1 file changed, 53 insertions(+), 43 deletions(-)

(limited to 'modules/caddyhttp/reverseproxy/healthchecks.go')

diff --git a/modules/caddyhttp/reverseproxy/healthchecks.go b/modules/caddyhttp/reverseproxy/healthchecks.go
index 230bf3a..317b283 100644
--- a/modules/caddyhttp/reverseproxy/healthchecks.go
+++ b/modules/caddyhttp/reverseproxy/healthchecks.go
@@ -18,7 +18,6 @@ import (
 	"context"
 	"fmt"
 	"io"
-	"log"
 	"net"
 	"net/http"
 	"net/url"
@@ -37,12 +36,32 @@ import (
 type HealthChecks struct {
 	// Active health checks run in the background on a timer. To
 	// minimally enable active health checks, set either path or
-	// port (or both).
+	// port (or both). Note that active health check status
+	// (healthy/unhealthy) is stored per-proxy-handler, not
+	// globally; this allows different handlers to use different
+	// criteria to decide what defines a healthy backend.
+	//
+	// Active health checks do not run for dynamic upstreams.
 	Active *ActiveHealthChecks `json:"active,omitempty"`
 
 	// Passive health checks monitor proxied requests for errors or timeouts.
 	// To minimally enable passive health checks, specify at least an empty
-	// config object.
+	// config object. Passive health check state is shared (stored globally),
+	// so a failure from one handler will be counted by all handlers; but
+	// the tolerances or standards for what defines healthy/unhealthy backends
+	// are configured per-proxy-handler.
+	//
+	// Passive health checks technically do operate on dynamic upstreams,
+	// but are only effective for very busy proxies where the list of
+	// upstreams is mostly stable. This is because the shared/global
+	// state of upstreams is cleaned up when the upstreams are no longer
+	// used. Since dynamic upstreams are allocated dynamically at each
+	// request (specifically, each iteration of the proxy loop per request),
+	// they are also cleaned up after every request. Thus, if there is a
+	// moment when no requests are actively referring to a particular
+	// upstream host, the passive health check state will be reset because
+	// it will be garbage-collected. It is usually better for the dynamic
+	// upstream module to only return healthy, available backends instead.
 	Passive *PassiveHealthChecks `json:"passive,omitempty"`
 }
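
The doc comment above closes with practical advice: a dynamic upstream module should itself return only healthy, available backends, rather than lean on passive health state that may be garbage-collected between requests. Below is a minimal, self-contained sketch of that idea in the spirit of this commit's SRV source. The upstream type and getUpstreams method here are illustrative stand-ins, not the actual types and interface the commit adds.

    package main

    import (
        "context"
        "fmt"
        "net"
        "time"
    )

    // upstream is a hypothetical stand-in for the handler's *Upstream type.
    type upstream struct{ Dial string }

    // srvSource resolves SRV records on demand, roughly the way a dynamic
    // SRV source would look up upstreams at each iteration of the proxy loop.
    type srvSource struct {
        service, proto, name string
        resolver             *net.Resolver
    }

    // getUpstreams returns one upstream per SRV record currently advertised
    // by DNS; backends that drop out of DNS simply stop being returned, so
    // the source itself acts as the health filter.
    func (s srvSource) getUpstreams(ctx context.Context) ([]upstream, error) {
        ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
        defer cancel()
        _, records, err := s.resolver.LookupSRV(ctx, s.service, s.proto, s.name)
        if err != nil {
            return nil, err
        }
        ups := make([]upstream, 0, len(records))
        for _, rec := range records {
            ups = append(ups, upstream{Dial: fmt.Sprintf("%s:%d", rec.Target, rec.Port)})
        }
        return ups, nil
    }

    func main() {
        // example lookup of _api._tcp.example.internal (hypothetical names)
        src := srvSource{service: "api", proto: "tcp", name: "example.internal", resolver: net.DefaultResolver}
        ups, err := src.getUpstreams(context.Background())
        fmt.Println(ups, err)
    }
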
@@ -50,8 +69,7 @@ type HealthChecks struct {
 // health checks (that is, health checks which occur in a
 // background goroutine independently).
 type ActiveHealthChecks struct {
-	// The path to use for health checks.
-	// DEPRECATED: Use 'uri' instead.
+	// DEPRECATED: Use 'uri' instead. This field will be removed. TODO: remove this field
 	Path string `json:"path,omitempty"`
 
 	// The URI (path and query) to use for health checks
@@ -132,7 +150,9 @@ type CircuitBreaker interface {
 func (h *Handler) activeHealthChecker() {
 	defer func() {
 		if err := recover(); err != nil {
-			log.Printf("[PANIC] active health checks: %v\n%s", err, debug.Stack())
+			h.HealthChecks.Active.logger.Error("active health checker panicked",
+				zap.Any("error", err),
+				zap.ByteString("stack", debug.Stack()))
 		}
 	}()
 	ticker := time.NewTicker(time.Duration(h.HealthChecks.Active.Interval))
@@ -155,7 +175,9 @@ func (h *Handler) doActiveHealthCheckForAllHosts() {
 		go func(upstream *Upstream) {
 			defer func() {
 				if err := recover(); err != nil {
-					log.Printf("[PANIC] active health check: %v\n%s", err, debug.Stack())
+					h.HealthChecks.Active.logger.Error("active health check panicked",
+						zap.Any("error", err),
+						zap.ByteString("stack", debug.Stack()))
 				}
 			}()
@@ -195,7 +217,7 @@ func (h *Handler) doActiveHealthCheckForAllHosts() {
 				// so use a fake Host value instead; unix sockets are usually local
 				hostAddr = "localhost"
 			}
-			err = h.doActiveHealthCheck(DialInfo{Network: addr.Network, Address: dialAddr}, hostAddr, upstream.Host)
+			err = h.doActiveHealthCheck(DialInfo{Network: addr.Network, Address: dialAddr}, hostAddr, upstream)
 			if err != nil {
 				h.HealthChecks.Active.logger.Error("active health check failed",
 					zap.String("address", hostAddr),
@@ -206,14 +228,14 @@ func (h *Handler) doActiveHealthCheckForAllHosts() {
 	}
 }
 
-// doActiveHealthCheck performs a health check to host which
+// doActiveHealthCheck performs a health check to upstream which
 // can be reached at address hostAddr. The actual address for
 // the request will be built according to active health checker
 // config. The health status of the host will be updated
 // according to whether it passes the health check. An error is
 // returned only if the health check fails to occur or if marking
 // the host's health status fails.
-func (h *Handler) doActiveHealthCheck(dialInfo DialInfo, hostAddr string, host Host) error {
+func (h *Handler) doActiveHealthCheck(dialInfo DialInfo, hostAddr string, upstream *Upstream) error {
 	// create the URL for the request that acts as a health check
 	scheme := "http"
 	if ht, ok := h.Transport.(TLSTransport); ok && ht.TLSEnabled() {
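
To make the active-check flow above concrete: the handler issues a request to the health URI built from dialInfo and hostAddr, treats any transport error as unhealthy, gates on the expected status code (or, when none is configured, on any status outside 200-399), and optionally matches a regexp against the response body. The following is a standalone sketch of just that decision logic; probe and its parameters are hypothetical names for illustration, not Caddy's API.

    package main

    import (
        "fmt"
        "io"
        "net/http"
        "regexp"
        "time"
    )

    // probe mirrors the shape of the checks described above: request the
    // health URI, gate on status code, optionally match the body.
    func probe(base, uri string, expectStatus int, expectBody *regexp.Regexp) bool {
        client := &http.Client{Timeout: 5 * time.Second}
        resp, err := client.Get(base + uri)
        if err != nil {
            return false // unreachable backend counts as unhealthy
        }
        defer resp.Body.Close()
        if expectStatus != 0 {
            if resp.StatusCode != expectStatus {
                return false
            }
        } else if resp.StatusCode < 200 || resp.StatusCode >= 400 {
            return false
        }
        if expectBody != nil {
            // cap the read; a real checker should bound body size
            body, err := io.ReadAll(io.LimitReader(resp.Body, 1<<20))
            if err != nil || !expectBody.Match(body) {
                return false
            }
        }
        return true
    }

    func main() {
        // accept any 2xx/3xx from a hypothetical local backend
        fmt.Println(probe("http://localhost:8080", "/healthz", 0, nil))
    }
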
@@ -269,10 +291,7 @@ func (h *Handler) doActiveHealthCheck(dialInfo DialInfo, hostAddr string, host H
 			zap.String("host", hostAddr),
 			zap.Error(err),
 		)
-		_, err2 := host.SetHealthy(false)
-		if err2 != nil {
-			return fmt.Errorf("marking unhealthy: %v", err2)
-		}
+		upstream.setHealthy(false)
 		return nil
 	}
 	var body io.Reader = resp.Body
@@ -292,10 +311,7 @@ func (h *Handler) doActiveHealthCheck(dialInfo DialInfo, hostAddr string, host H
 				zap.Int("status_code", resp.StatusCode),
 				zap.String("host", hostAddr),
 			)
-			_, err := host.SetHealthy(false)
-			if err != nil {
-				return fmt.Errorf("marking unhealthy: %v", err)
-			}
+			upstream.setHealthy(false)
 			return nil
 		}
 	} else if resp.StatusCode < 200 || resp.StatusCode >= 400 {
@@ -303,10 +319,7 @@ func (h *Handler) doActiveHealthCheck(dialInfo DialInfo, hostAddr string, host H
 			zap.Int("status_code", resp.StatusCode),
 			zap.String("host", hostAddr),
 		)
-		_, err := host.SetHealthy(false)
-		if err != nil {
-			return fmt.Errorf("marking unhealthy: %v", err)
-		}
+		upstream.setHealthy(false)
 		return nil
 	}
 
@@ -318,33 +331,21 @@ func (h *Handler) doActiveHealthCheck(dialInfo DialInfo, hostAddr string, host H
 				zap.String("host", hostAddr),
 				zap.Error(err),
 			)
-			_, err := host.SetHealthy(false)
-			if err != nil {
-				return fmt.Errorf("marking unhealthy: %v", err)
-			}
+			upstream.setHealthy(false)
 			return nil
 		}
 		if !h.HealthChecks.Active.bodyRegexp.Match(bodyBytes) {
 			h.HealthChecks.Active.logger.Info("response body failed expectations",
 				zap.String("host", hostAddr),
 			)
-			_, err := host.SetHealthy(false)
-			if err != nil {
-				return fmt.Errorf("marking unhealthy: %v", err)
-			}
+			upstream.setHealthy(false)
 			return nil
 		}
 	}
 
 	// passed health check parameters, so mark as healthy
-	swapped, err := host.SetHealthy(true)
-	if swapped {
-		h.HealthChecks.Active.logger.Info("host is up",
-			zap.String("host", hostAddr),
-		)
-	}
-	if err != nil {
-		return fmt.Errorf("marking healthy: %v", err)
+	if upstream.setHealthy(true) {
+		h.HealthChecks.Active.logger.Info("host is up", zap.String("host", hostAddr))
 	}
 
 	return nil
@@ -366,7 +367,7 @@ func (h *Handler) countFailure(upstream *Upstream) {
 	}
 
 	// count failure immediately
-	err := upstream.Host.CountFail(1)
+	err := upstream.Host.countFail(1)
 	if err != nil {
 		h.HealthChecks.Passive.logger.Error("could not count failure",
 			zap.String("host", upstream.Dial),
@@ -375,14 +376,23 @@ func (h *Handler) countFailure(upstream *Upstream) {
 	}
 
 	// forget it later
-	go func(host Host, failDuration time.Duration) {
+	go func(host *Host, failDuration time.Duration) {
 		defer func() {
 			if err := recover(); err != nil {
-				log.Printf("[PANIC] health check failure forgetter: %v\n%s", err, debug.Stack())
+				h.HealthChecks.Passive.logger.Error("passive health check failure forgetter panicked",
+					zap.Any("error", err),
+					zap.ByteString("stack", debug.Stack()))
 			}
 		}()
-		time.Sleep(failDuration)
-		err := host.CountFail(-1)
+		timer := time.NewTimer(failDuration)
+		select {
+		case <-h.ctx.Done():
+			if !timer.Stop() {
+				<-timer.C
+			}
+		case <-timer.C:
+		}
+		err := host.countFail(-1)
 		if err != nil {
 			h.HealthChecks.Passive.logger.Error("could not forget failure",
 				zap.String("host", upstream.Dial),
-- 
cgit v1.2.3
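
The final hunk replaces an uninterruptible time.Sleep with a timer that also watches the handler's context, so the goroutine that forgets a counted failure can exit promptly at shutdown instead of sleeping out the full fail duration. Here is the same pattern in isolation; the names are illustrative, and an atomic counter stands in for the host's countFail bookkeeping.

    package main

    import (
        "context"
        "fmt"
        "sync/atomic"
        "time"
    )

    // rememberFailure increments a failure counter immediately and schedules
    // the decrement (the "forgetting") after failDuration. The select lets a
    // shutdown context wake the goroutine early, matching the patch's intent.
    func rememberFailure(ctx context.Context, fails *int64, failDuration time.Duration) {
        atomic.AddInt64(fails, 1)
        go func() {
            timer := time.NewTimer(failDuration)
            select {
            case <-ctx.Done():
                if !timer.Stop() {
                    <-timer.C // drain a timer that already fired
                }
            case <-timer.C:
            }
            // forget the failure whether the wait completed or was cut short
            atomic.AddInt64(fails, -1)
        }()
    }

    func main() {
        var fails int64
        rememberFailure(context.Background(), &fails, 50*time.Millisecond)
        fmt.Println("fails now:", atomic.LoadInt64(&fails))
        time.Sleep(100 * time.Millisecond)
        fmt.Println("fails later:", atomic.LoadInt64(&fails))
    }
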