From 7cca291d62c910c0544f0c0169a8f0c81627e5d3 Mon Sep 17 00:00:00 2001 From: Matthew Holt Date: Sun, 23 Feb 2020 14:30:52 -0700 Subject: reverse_proxy: Health checks: Don't cross the streams Fixes https://caddy.community/t/v2-health-checks-are-going-to-the-wrong-upstream/7084?u=matt ... I think --- modules/caddyhttp/reverseproxy/healthchecks.go | 31 ++++++------ modules/caddyhttp/reverseproxy/reverseproxy.go | 65 +++++++++++++------------- 2 files changed, 47 insertions(+), 49 deletions(-) diff --git a/modules/caddyhttp/reverseproxy/healthchecks.go b/modules/caddyhttp/reverseproxy/healthchecks.go index a3b57e1..85274c6 100644 --- a/modules/caddyhttp/reverseproxy/healthchecks.go +++ b/modules/caddyhttp/reverseproxy/healthchecks.go @@ -124,26 +124,25 @@ type CircuitBreaker interface { // h.HealthChecks.Active.stopChan is closed. func (h *Handler) activeHealthChecker() { ticker := time.NewTicker(time.Duration(h.HealthChecks.Active.Interval)) - h.doActiveHealthChecksForAllHosts() + h.doActiveHealthCheckForAllHosts() for { select { case <-ticker.C: - h.doActiveHealthChecksForAllHosts() + h.doActiveHealthCheckForAllHosts() case <-h.HealthChecks.Active.stopChan: + // TODO: consider using a Context for cancellation instead ticker.Stop() return } } } -// doActiveHealthChecksForAllHosts immediately performs a -// health checks for all hosts in the global repository. -func (h *Handler) doActiveHealthChecksForAllHosts() { - hosts.Range(func(key, value interface{}) bool { - networkAddr := key.(string) - host := value.(Host) - - go func(networkAddr string, host Host) { +// doActiveHealthCheckForAllHosts immediately performs a +// health checks for all upstream hosts configured by h. +func (h *Handler) doActiveHealthCheckForAllHosts() { + for _, upstream := range h.Upstreams { + go func(upstream *Upstream) { + networkAddr := upstream.Dial addr, err := caddy.ParseNetworkAddress(networkAddr) if err != nil { h.HealthChecks.Active.logger.Error("bad network address", @@ -165,18 +164,15 @@ func (h *Handler) doActiveHealthChecksForAllHosts() { // so use a fake Host value instead; unix sockets are usually local hostAddr = "localhost" } - err = h.doActiveHealthCheck(DialInfo{Network: addr.Network, Address: hostAddr}, hostAddr, host) + err = h.doActiveHealthCheck(DialInfo{Network: addr.Network, Address: hostAddr}, hostAddr, upstream.Host) if err != nil { h.HealthChecks.Active.logger.Error("active health check failed", zap.String("address", networkAddr), zap.Error(err), ) } - }(networkAddr, host) - - // continue to iterate all hosts - return true - }) + }(upstream) + } } // doActiveHealthCheck performs a health check to host which @@ -209,7 +205,8 @@ func (h *Handler) doActiveHealthCheck(dialInfo DialInfo, hostAddr string, host H u.Host = net.JoinHostPort(host, portStr) } - // attach dialing information to this request + // attach dialing information to this request - TODO: use caddy.Context's context + // so it can be canceled on config reload ctx := context.Background() ctx = context.WithValue(ctx, caddy.ReplacerCtxKey, caddy.NewReplacer()) ctx = context.WithValue(ctx, caddyhttp.VarsCtxKey, map[string]interface{}{ diff --git a/modules/caddyhttp/reverseproxy/reverseproxy.go b/modules/caddyhttp/reverseproxy/reverseproxy.go index acb5213..3601212 100644 --- a/modules/caddyhttp/reverseproxy/reverseproxy.go +++ b/modules/caddyhttp/reverseproxy/reverseproxy.go @@ -168,38 +168,6 @@ func (h *Handler) Provision(ctx caddy.Context) error { return err } - // if active health checks are enabled, configure them and start a worker - if h.HealthChecks != nil && - h.HealthChecks.Active != nil && - (h.HealthChecks.Active.Path != "" || h.HealthChecks.Active.Port != 0) { - h.HealthChecks.Active.logger = h.logger.Named("health_checker.active") - - timeout := time.Duration(h.HealthChecks.Active.Timeout) - if timeout == 0 { - timeout = 5 * time.Second - } - - h.HealthChecks.Active.stopChan = make(chan struct{}) - h.HealthChecks.Active.httpClient = &http.Client{ - Timeout: timeout, - Transport: h.Transport, - } - - if h.HealthChecks.Active.Interval == 0 { - h.HealthChecks.Active.Interval = caddy.Duration(30 * time.Second) - } - - if h.HealthChecks.Active.ExpectBody != "" { - var err error - h.HealthChecks.Active.bodyRegexp, err = regexp.Compile(h.HealthChecks.Active.ExpectBody) - if err != nil { - return fmt.Errorf("expect_body: compiling regular expression: %v", err) - } - } - - go h.activeHealthChecker() - } - // set up upstreams for _, upstream := range h.Upstreams { // create or get the host representation for this upstream @@ -235,6 +203,38 @@ func (h *Handler) Provision(ctx caddy.Context) error { } } + // if active health checks are enabled, configure them and start a worker + if h.HealthChecks != nil && + h.HealthChecks.Active != nil && + (h.HealthChecks.Active.Path != "" || h.HealthChecks.Active.Port != 0) { + h.HealthChecks.Active.logger = h.logger.Named("health_checker.active") + + timeout := time.Duration(h.HealthChecks.Active.Timeout) + if timeout == 0 { + timeout = 5 * time.Second + } + + h.HealthChecks.Active.stopChan = make(chan struct{}) + h.HealthChecks.Active.httpClient = &http.Client{ + Timeout: timeout, + Transport: h.Transport, + } + + if h.HealthChecks.Active.Interval == 0 { + h.HealthChecks.Active.Interval = caddy.Duration(30 * time.Second) + } + + if h.HealthChecks.Active.ExpectBody != "" { + var err error + h.HealthChecks.Active.bodyRegexp, err = regexp.Compile(h.HealthChecks.Active.ExpectBody) + if err != nil { + return fmt.Errorf("expect_body: compiling regular expression: %v", err) + } + } + + go h.activeHealthChecker() + } + return nil } @@ -244,6 +244,7 @@ func (h *Handler) Cleanup() error { if h.HealthChecks != nil && h.HealthChecks.Active != nil && h.HealthChecks.Active.stopChan != nil { + // TODO: consider using context cancellation, could be much simpler close(h.HealthChecks.Active.stopChan) } -- cgit v1.2.3