From ab0455922ae01bde1a7a5b3bf58eb993efc02db7 Mon Sep 17 00:00:00 2001 From: Matt Holt Date: Sun, 6 Mar 2022 17:43:39 -0700 Subject: reverseproxy: Dynamic upstreams (with SRV and A/AAAA support) (#4470) * reverseproxy: Begin refactor to enable dynamic upstreams Streamed here: https://www.youtube.com/watch?v=hj7yzXb11jU * Implement SRV and A/AAA upstream sources Also get upstreams at every retry loop iteration instead of just once before the loop. See #4442. * Minor tweaks from review * Limit size of upstreams caches * Add doc notes deprecating LookupSRV * Provision dynamic upstreams Still WIP, preparing to preserve health checker functionality * Rejigger health checks Move active health check results into handler-specific Upstreams. Improve documentation regarding health checks and upstreams. * Deprecation notice * Add Caddyfile support, use `caddy.Duration` * Interface guards * Implement custom resolvers, add resolvers to http transport Caddyfile * SRV: fix Caddyfile `name` inline arg, remove proto condition * Use pointer receiver * Add debug logs Co-authored-by: Francis Lavoie --- modules/caddyhttp/reverseproxy/hosts.go | 99 ++++++++++++++------------------- 1 file changed, 41 insertions(+), 58 deletions(-) (limited to 'modules/caddyhttp/reverseproxy/hosts.go') diff --git a/modules/caddyhttp/reverseproxy/hosts.go b/modules/caddyhttp/reverseproxy/hosts.go index b9817d2..a973ecb 100644 --- a/modules/caddyhttp/reverseproxy/hosts.go +++ b/modules/caddyhttp/reverseproxy/hosts.go @@ -26,44 +26,14 @@ import ( "github.com/caddyserver/caddy/v2/modules/caddyhttp" ) -// Host represents a remote host which can be proxied to. -// Its methods must be safe for concurrent use. -type Host interface { - // NumRequests returns the number of requests - // currently in process with the host. - NumRequests() int - - // Fails returns the count of recent failures. - Fails() int - - // Unhealthy returns true if the backend is unhealthy. - Unhealthy() bool - - // CountRequest atomically counts the given number of - // requests as currently in process with the host. The - // count should not go below 0. - CountRequest(int) error - - // CountFail atomically counts the given number of - // failures with the host. The count should not go - // below 0. - CountFail(int) error - - // SetHealthy atomically marks the host as either - // healthy (true) or unhealthy (false). If the given - // status is the same, this should be a no-op and - // return false. It returns true if the status was - // changed; i.e. if it is now different from before. - SetHealthy(bool) (bool, error) -} - // UpstreamPool is a collection of upstreams. type UpstreamPool []*Upstream // Upstream bridges this proxy's configuration to the // state of the backend host it is correlated with. +// Upstream values must not be copied. type Upstream struct { - Host `json:"-"` + *Host `json:"-"` // The [network address](/docs/conventions#network-addresses) // to dial to connect to the upstream. Must represent precisely @@ -77,6 +47,10 @@ type Upstream struct { // backends is down. Also be aware of open proxy vulnerabilities. Dial string `json:"dial,omitempty"` + // DEPRECATED: Use the SRVUpstreams module instead + // (http.reverse_proxy.upstreams.srv). This field will be + // removed in a future version of Caddy. TODO: Remove this field. + // // If DNS SRV records are used for service discovery with this // upstream, specify the DNS name for which to look up SRV // records here, instead of specifying a dial address. @@ -95,6 +69,7 @@ type Upstream struct { activeHealthCheckPort int healthCheckPolicy *PassiveHealthChecks cb CircuitBreaker + unhealthy int32 // accessed atomically; status from active health checker } func (u Upstream) String() string { @@ -117,7 +92,7 @@ func (u *Upstream) Available() bool { // is currently known to be healthy or "up". // It consults the circuit breaker, if any. func (u *Upstream) Healthy() bool { - healthy := !u.Host.Unhealthy() + healthy := u.healthy() if healthy && u.healthCheckPolicy != nil { healthy = u.Host.Fails() < u.healthCheckPolicy.MaxFails } @@ -142,7 +117,7 @@ func (u *Upstream) fillDialInfo(r *http.Request) (DialInfo, error) { var addr caddy.NetworkAddress if u.LookupSRV != "" { - // perform DNS lookup for SRV records and choose one + // perform DNS lookup for SRV records and choose one - TODO: deprecated srvName := repl.ReplaceAll(u.LookupSRV, "") _, records, err := net.DefaultResolver.LookupSRV(r.Context(), "", "", srvName) if err != nil { @@ -174,59 +149,67 @@ func (u *Upstream) fillDialInfo(r *http.Request) (DialInfo, error) { }, nil } -// upstreamHost is the basic, in-memory representation -// of the state of a remote host. It implements the -// Host interface. -type upstreamHost struct { +func (u *Upstream) fillHost() { + host := new(Host) + existingHost, loaded := hosts.LoadOrStore(u.String(), host) + if loaded { + host = existingHost.(*Host) + } + u.Host = host +} + +// Host is the basic, in-memory representation of the state of a remote host. +// Its fields are accessed atomically and Host values must not be copied. +type Host struct { numRequests int64 // must be 64-bit aligned on 32-bit systems (see https://golang.org/pkg/sync/atomic/#pkg-note-BUG) fails int64 - unhealthy int32 } // NumRequests returns the number of active requests to the upstream. -func (uh *upstreamHost) NumRequests() int { - return int(atomic.LoadInt64(&uh.numRequests)) +func (h *Host) NumRequests() int { + return int(atomic.LoadInt64(&h.numRequests)) } // Fails returns the number of recent failures with the upstream. -func (uh *upstreamHost) Fails() int { - return int(atomic.LoadInt64(&uh.fails)) -} - -// Unhealthy returns whether the upstream is healthy. -func (uh *upstreamHost) Unhealthy() bool { - return atomic.LoadInt32(&uh.unhealthy) == 1 +func (h *Host) Fails() int { + return int(atomic.LoadInt64(&h.fails)) } -// CountRequest mutates the active request count by +// countRequest mutates the active request count by // delta. It returns an error if the adjustment fails. -func (uh *upstreamHost) CountRequest(delta int) error { - result := atomic.AddInt64(&uh.numRequests, int64(delta)) +func (h *Host) countRequest(delta int) error { + result := atomic.AddInt64(&h.numRequests, int64(delta)) if result < 0 { return fmt.Errorf("count below 0: %d", result) } return nil } -// CountFail mutates the recent failures count by +// countFail mutates the recent failures count by // delta. It returns an error if the adjustment fails. -func (uh *upstreamHost) CountFail(delta int) error { - result := atomic.AddInt64(&uh.fails, int64(delta)) +func (h *Host) countFail(delta int) error { + result := atomic.AddInt64(&h.fails, int64(delta)) if result < 0 { return fmt.Errorf("count below 0: %d", result) } return nil } +// healthy returns true if the upstream is not actively marked as unhealthy. +// (This returns the status only from the "active" health checks.) +func (u *Upstream) healthy() bool { + return atomic.LoadInt32(&u.unhealthy) == 0 +} + // SetHealthy sets the upstream has healthy or unhealthy -// and returns true if the new value is different. -func (uh *upstreamHost) SetHealthy(healthy bool) (bool, error) { +// and returns true if the new value is different. This +// sets the status only for the "active" health checks. +func (u *Upstream) setHealthy(healthy bool) bool { var unhealthy, compare int32 = 1, 0 if healthy { unhealthy, compare = 0, 1 } - swapped := atomic.CompareAndSwapInt32(&uh.unhealthy, compare, unhealthy) - return swapped, nil + return atomic.CompareAndSwapInt32(&u.unhealthy, compare, unhealthy) } // DialInfo contains information needed to dial a -- cgit v1.2.3