From 1e31be8de0d1d5587348619225456a793cb30f7f Mon Sep 17 00:00:00 2001 From: Matthew Holt Date: Fri, 11 Oct 2019 14:25:39 -0600 Subject: reverse_proxy: Allow dynamic backends (closes #990 and #1539) This PR enables the use of placeholders in an upstream's Dial address. A Dial address must represent precisely one socket after replacements. See also #998 and #1639. --- listeners.go | 7 +- listeners_test.go | 4 + modules/caddyhttp/reverseproxy/caddyfile.go | 10 +- modules/caddyhttp/reverseproxy/healthchecks.go | 6 +- modules/caddyhttp/reverseproxy/hosts.go | 62 ++++++---- modules/caddyhttp/reverseproxy/reverseproxy.go | 157 ++++++++++--------------- 6 files changed, 112 insertions(+), 134 deletions(-) diff --git a/listeners.go b/listeners.go index 04ec788..8c2792c 100644 --- a/listeners.go +++ b/listeners.go @@ -286,9 +286,10 @@ func JoinNetworkAddress(network, host, port string) string { if network != "" { a = network + "/" } - a += host - if port != "" { - a += ":" + port + if host != "" && port == "" { + a += host + } else if port != "" { + a += net.JoinHostPort(host, port) } return a } diff --git a/listeners_test.go b/listeners_test.go index 11d3980..bdddf32 100644 --- a/listeners_test.go +++ b/listeners_test.go @@ -138,6 +138,10 @@ func TestJoinNetworkAddress(t *testing.T) { network: "unix", host: "/foo/bar", port: "", expect: "unix//foo/bar", }, + { + network: "", host: "::1", port: "1234", + expect: "[::1]:1234", + }, } { actual := JoinNetworkAddress(tc.network, tc.host, tc.port) if actual != tc.expect { diff --git a/modules/caddyhttp/reverseproxy/caddyfile.go b/modules/caddyhttp/reverseproxy/caddyfile.go index 83298d8..6b1ec69 100644 --- a/modules/caddyhttp/reverseproxy/caddyfile.go +++ b/modules/caddyhttp/reverseproxy/caddyfile.go @@ -81,9 +81,7 @@ func parseCaddyfile(h httpcaddyfile.Helper) (caddyhttp.MiddlewareHandler, error) func (h *Handler) UnmarshalCaddyfile(d *caddyfile.Dispenser) error { for d.Next() { for _, up := range d.RemainingArgs() { - h.Upstreams = append(h.Upstreams, &Upstream{ - Dial: up, - }) + h.Upstreams = append(h.Upstreams, &Upstream{Dial: up}) } for d.NextBlock(0) { @@ -94,9 +92,7 @@ func (h *Handler) UnmarshalCaddyfile(d *caddyfile.Dispenser) error { return d.ArgErr() } for _, up := range args { - h.Upstreams = append(h.Upstreams, &Upstream{ - Dial: up, - }) + h.Upstreams = append(h.Upstreams, &Upstream{Dial: up}) } case "lb_policy": @@ -502,6 +498,7 @@ func (h *HTTPTransport) UnmarshalCaddyfile(d *caddyfile.Dispenser) error { if d.Val() == "off" { var disable bool h.KeepAlive.Enabled = &disable + break } dur, err := time.ParseDuration(d.Val()) if err != nil { @@ -521,6 +518,7 @@ func (h *HTTPTransport) UnmarshalCaddyfile(d *caddyfile.Dispenser) error { h.KeepAlive = new(KeepAlive) } h.KeepAlive.MaxIdleConns = num + h.KeepAlive.MaxIdleConnsPerHost = num default: return d.Errf("unrecognized subdirective %s", d.Val()) diff --git a/modules/caddyhttp/reverseproxy/healthchecks.go b/modules/caddyhttp/reverseproxy/healthchecks.go index be532ea..a64c21d 100644 --- a/modules/caddyhttp/reverseproxy/healthchecks.go +++ b/modules/caddyhttp/reverseproxy/healthchecks.go @@ -115,7 +115,7 @@ func (h *Handler) doActiveHealthChecksForAllHosts() { // so use a fake Host value instead; unix sockets are usually local hostAddr = "localhost" } - err = h.doActiveHealthCheck(NewDialInfo(network, addrs[0]), hostAddr, host) + err = h.doActiveHealthCheck(DialInfo{Network: network, Address: addrs[0]}, hostAddr, host) if err != nil { log.Printf("[ERROR] reverse_proxy: active health check for host %s: %v", networkAddr, err) } @@ -259,7 +259,7 @@ func (h *Handler) countFailure(upstream *Upstream) { err := upstream.Host.CountFail(1) if err != nil { log.Printf("[ERROR] proxy: upstream %s: counting failure: %v", - upstream.dialInfo, err) + upstream.Dial, err) } // forget it later @@ -268,7 +268,7 @@ func (h *Handler) countFailure(upstream *Upstream) { err := host.CountFail(-1) if err != nil { log.Printf("[ERROR] proxy: upstream %s: expiring failure: %v", - upstream.dialInfo, err) + upstream.Dial, err) } }(upstream.Host, failDuration) } diff --git a/modules/caddyhttp/reverseproxy/hosts.go b/modules/caddyhttp/reverseproxy/hosts.go index a8349bd..a16bed0 100644 --- a/modules/caddyhttp/reverseproxy/hosts.go +++ b/modules/caddyhttp/reverseproxy/hosts.go @@ -72,7 +72,6 @@ type Upstream struct { healthCheckPolicy *PassiveHealthChecks cb CircuitBreaker - dialInfo DialInfo } // Available returns true if the remote host @@ -149,8 +148,7 @@ func (uh *upstreamHost) CountFail(delta int) error { } // SetHealthy sets the upstream has healthy or unhealthy -// and returns true if the value was different from before, -// or an error if the adjustment failed. +// and returns true if the new value is different. func (uh *upstreamHost) SetHealthy(healthy bool) (bool, error) { var unhealthy, compare int32 = 1, 0 if healthy { @@ -167,8 +165,12 @@ func (uh *upstreamHost) SetHealthy(healthy bool) (bool, error) { // a host that can be represented in a URL, but // they certainly have a network name and address). type DialInfo struct { - // The network to use. This should be one of the - // values that is accepted by net.Dial: + // Upstream is the Upstream associated with + // this DialInfo. It may be nil. + Upstream *Upstream + + // The network to use. This should be one of + // the values that is accepted by net.Dial: // https://golang.org/pkg/net/#Dial Network string @@ -176,33 +178,43 @@ type DialInfo struct { // semantics and rules as net.Dial. Address string - // Host and Port are components of Address, - // pre-split for convenience. + // Host and Port are components of Address. Host, Port string } -// NewDialInfo creates and populates a DialInfo -// for the given network and address. It splits -// the address into host and port values if the -// network type supports them, or uses the whole -// address as the port if splitting fails. -func NewDialInfo(network, address string) DialInfo { - var addrHost, addrPort string - if !strings.Contains(network, "unix") { - var err error - addrHost, addrPort, err = net.SplitHostPort(address) - if err != nil { - addrHost = address // assume there was no port - } - } - return DialInfo{network, address, addrHost, addrPort} -} - // String returns the Caddy network address form // by joining the network and address with a // forward slash. func (di DialInfo) String() string { - return di.Network + "/" + di.Address + return caddy.JoinNetworkAddress(di.Network, di.Host, di.Port) +} + +// fillDialInfo returns a filled DialInfo for the given upstream, using +// the given Replacer. Note that the returned value is not a pointer. +func fillDialInfo(upstream *Upstream, repl caddy.Replacer) (DialInfo, error) { + dial := repl.ReplaceAll(upstream.Dial, "") + netw, addrs, err := caddy.ParseNetworkAddress(dial) + if err != nil { + return DialInfo{}, fmt.Errorf("upstream %s: invalid dial address %s: %v", upstream.Dial, dial, err) + } + if len(addrs) != 1 { + return DialInfo{}, fmt.Errorf("upstream %s: dial address must represent precisely one socket: %s represents %d", + upstream.Dial, dial, len(addrs)) + } + var dialHost, dialPort string + if !strings.Contains(netw, "unix") { + dialHost, dialPort, err = net.SplitHostPort(addrs[0]) + if err != nil { + dialHost = addrs[0] // assume there was no port + } + } + return DialInfo{ + Upstream: upstream, + Network: netw, + Address: addrs[0], + Host: dialHost, + Port: dialPort, + }, nil } // DialInfoCtxKey is used to store a DialInfo diff --git a/modules/caddyhttp/reverseproxy/reverseproxy.go b/modules/caddyhttp/reverseproxy/reverseproxy.go index 45c0690..266e5c3 100644 --- a/modules/caddyhttp/reverseproxy/reverseproxy.go +++ b/modules/caddyhttp/reverseproxy/reverseproxy.go @@ -87,6 +87,7 @@ func (h *Handler) Provision(ctx caddy.Context) error { h.CBRaw = nil // allow GC to deallocate } + // set up transport if h.Transport == nil { t := &HTTPTransport{ KeepAlive: &KeepAlive{ @@ -102,6 +103,7 @@ func (h *Handler) Provision(ctx caddy.Context) error { h.Transport = t } + // set up load balancing if h.LoadBalancing == nil { h.LoadBalancing = new(LoadBalancing) } @@ -152,85 +154,40 @@ func (h *Handler) Provision(ctx caddy.Context) error { go h.activeHealthChecker() } - var allUpstreams []*Upstream + // set up upstreams for _, upstream := range h.Upstreams { - // if a port was not specified (and the network type uses - // ports), then maybe we can figure out the default port - netw, host, port, err := caddy.SplitNetworkAddress(upstream.Dial) - if err != nil && port == "" && !strings.Contains(netw, "unix") { - if host == "" { - // assume all that was given was the host, no port - host = upstream.Dial - } - // a port was not specified, but we may be able to - // infer it if we know the standard ports on which - // the transport protocol operates - if ht, ok := h.Transport.(*HTTPTransport); ok { - defaultPort := "80" - if ht.TLS != nil { - defaultPort = "443" - } - upstream.Dial = caddy.JoinNetworkAddress(netw, host, defaultPort) - } + // create or get the host representation for this upstream + var host Host = new(upstreamHost) + existingHost, loaded := hosts.LoadOrStore(upstream.Dial, host) + if loaded { + host = existingHost.(Host) } + upstream.Host = host - // upstreams are allowed to map to only a single host, - // but an upstream's address may semantically represent - // multiple addresses, so make sure to handle each - // one in turn based on this one upstream config - network, addresses, err := caddy.ParseNetworkAddress(upstream.Dial) - if err != nil { - return fmt.Errorf("parsing dial address: %v", err) - } - - for _, addr := range addresses { - // make a new upstream based on the original - // that has a singular dial address - upstreamCopy := *upstream - upstreamCopy.dialInfo = NewDialInfo(network, addr) - upstreamCopy.Dial = upstreamCopy.dialInfo.String() - upstreamCopy.cb = h.CB - - // if host already exists from a current config, - // use that instead; otherwise, add it - // TODO: make hosts modular, so that their state can be distributed in enterprise for example - // TODO: If distributed, the pool should be stored in storage... - var host Host = new(upstreamHost) - activeHost, loaded := hosts.LoadOrStore(upstreamCopy.dialInfo.String(), host) - if loaded { - host = activeHost.(Host) - } - upstreamCopy.Host = host - - // if the passive health checker has a non-zero "unhealthy - // request count" but the upstream has no MaxRequests set - // (they are the same thing, but one is a default value for - // for upstreams with a zero MaxRequests), copy the default - // value into this upstream, since the value in the upstream - // (MaxRequests) is what is used during availability checks - if h.HealthChecks != nil && - h.HealthChecks.Passive != nil && - h.HealthChecks.Passive.UnhealthyRequestCount > 0 && - upstreamCopy.MaxRequests == 0 { - upstreamCopy.MaxRequests = h.HealthChecks.Passive.UnhealthyRequestCount - } + // give it the circuit breaker, if any + upstream.cb = h.CB - // upstreams need independent access to the passive - // health check policy because they run outside of the - // scope of a request handler - if h.HealthChecks != nil { - upstreamCopy.healthCheckPolicy = h.HealthChecks.Passive - } + // if the passive health checker has a non-zero UnhealthyRequestCount + // but the upstream has no MaxRequests set (they are the same thing, + // but the passive health checker is a default value for for upstreams + // without MaxRequests), copy the value into this upstream, since the + // value in the upstream (MaxRequests) is what is used during + // availability checks + if h.HealthChecks != nil && + h.HealthChecks.Passive != nil && + h.HealthChecks.Passive.UnhealthyRequestCount > 0 && + upstream.MaxRequests == 0 { + upstream.MaxRequests = h.HealthChecks.Passive.UnhealthyRequestCount + } - allUpstreams = append(allUpstreams, &upstreamCopy) + // upstreams need independent access to the passive + // health check policy because passive health checks + // run without access to h. + if h.HealthChecks != nil { + upstream.healthCheckPolicy = h.HealthChecks.Passive } } - // replace the unmarshaled upstreams (possible 1:many - // address mapping) with our list, which is mapped 1:1, - // thus may have expanded the original list - h.Upstreams = allUpstreams - return nil } @@ -247,7 +204,7 @@ func (h *Handler) Cleanup() error { // remove hosts from our config from the pool for _, upstream := range h.Upstreams { - hosts.Delete(upstream.dialInfo.String()) + hosts.Delete(upstream.Dial) } return nil @@ -284,17 +241,25 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request, next caddyht continue } + // the dial address may vary per-request if placeholders are + // used, so perform those replacements here; the resulting + // DialInfo struct should have valid network address syntax + dialInfo, err := fillDialInfo(upstream, repl) + if err != nil { + return fmt.Errorf("making dial info: %v", err) + } + // attach to the request information about how to dial the upstream; // this is necessary because the information cannot be sufficiently // or satisfactorily represented in a URL - ctx := context.WithValue(r.Context(), DialInfoCtxKey, upstream.dialInfo) + ctx := context.WithValue(r.Context(), DialInfoCtxKey, dialInfo) r = r.WithContext(ctx) // set placeholders with information about this upstream - repl.Set("http.handlers.reverse_proxy.upstream.address", upstream.dialInfo.String()) - repl.Set("http.handlers.reverse_proxy.upstream.hostport", upstream.dialInfo.Address) - repl.Set("http.handlers.reverse_proxy.upstream.host", upstream.dialInfo.Host) - repl.Set("http.handlers.reverse_proxy.upstream.port", upstream.dialInfo.Port) + repl.Set("http.handlers.reverse_proxy.upstream.address", dialInfo.String()) + repl.Set("http.handlers.reverse_proxy.upstream.hostport", dialInfo.Address) + repl.Set("http.handlers.reverse_proxy.upstream.host", dialInfo.Host) + repl.Set("http.handlers.reverse_proxy.upstream.port", dialInfo.Port) repl.Set("http.handlers.reverse_proxy.upstream.requests", strconv.Itoa(upstream.Host.NumRequests())) repl.Set("http.handlers.reverse_proxy.upstream.max_requests", strconv.Itoa(upstream.MaxRequests)) repl.Set("http.handlers.reverse_proxy.upstream.fails", strconv.Itoa(upstream.Host.Fails())) @@ -311,7 +276,7 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request, next caddyht } // proxy the request to that upstream - proxyErr = h.reverseProxy(w, r, upstream) + proxyErr = h.reverseProxy(w, r, dialInfo) if proxyErr == nil || proxyErr == context.Canceled { // context.Canceled happens when the downstream client // cancels the request, which is not our failure @@ -405,12 +370,12 @@ func (h Handler) prepareRequest(req *http.Request) error { // reverseProxy performs a round-trip to the given backend and processes the response with the client. // (This method is mostly the beginning of what was borrowed from the net/http/httputil package in the // Go standard library which was used as the foundation.) -func (h *Handler) reverseProxy(rw http.ResponseWriter, req *http.Request, upstream *Upstream) error { - upstream.Host.CountRequest(1) - defer upstream.Host.CountRequest(-1) +func (h *Handler) reverseProxy(rw http.ResponseWriter, req *http.Request, di DialInfo) error { + di.Upstream.Host.CountRequest(1) + defer di.Upstream.Host.CountRequest(-1) // point the request to this upstream - h.directRequest(req, upstream) + h.directRequest(req, di) // do the round-trip start := time.Now() @@ -421,8 +386,8 @@ func (h *Handler) reverseProxy(rw http.ResponseWriter, req *http.Request, upstre } // update circuit breaker on current conditions - if upstream.cb != nil { - upstream.cb.RecordMetric(res.StatusCode, latency) + if di.Upstream.cb != nil { + di.Upstream.cb.RecordMetric(res.StatusCode, latency) } // perform passive health checks (if enabled) @@ -430,14 +395,14 @@ func (h *Handler) reverseProxy(rw http.ResponseWriter, req *http.Request, upstre // strike if the status code matches one that is "bad" for _, badStatus := range h.HealthChecks.Passive.UnhealthyStatus { if caddyhttp.StatusCodeMatches(res.StatusCode, badStatus) { - h.countFailure(upstream) + h.countFailure(di.Upstream) } } // strike if the roundtrip took too long if h.HealthChecks.Passive.UnhealthyLatency > 0 && latency >= time.Duration(h.HealthChecks.Passive.UnhealthyLatency) { - h.countFailure(upstream) + h.countFailure(di.Upstream) } } @@ -554,23 +519,21 @@ func (lb LoadBalancing) tryAgain(start time.Time, proxyErr error, req *http.Requ return true } -// directRequest modifies only req.URL so that it points to the -// given upstream host. It must modify ONLY the request URL. -func (h Handler) directRequest(req *http.Request, upstream *Upstream) { +// directRequest modifies only req.URL so that it points to the upstream +// in the given DialInfo. It must modify ONLY the request URL. +func (h Handler) directRequest(req *http.Request, di DialInfo) { if req.URL.Host == "" { // we need a host, so set the upstream's host address - fullHost := upstream.dialInfo.Address + reqHost := di.Address - // but if the port matches the scheme, strip the port because + // if the port equates to the scheme, strip the port because // it's weird to make a request like http://example.com:80/. - host, port, err := net.SplitHostPort(fullHost) - if err == nil && - (req.URL.Scheme == "http" && port == "80") || - (req.URL.Scheme == "https" && port == "443") { - fullHost = host + if (req.URL.Scheme == "http" && di.Port == "80") || + (req.URL.Scheme == "https" && di.Port == "443") { + reqHost = di.Host } - req.URL.Host = fullHost + req.URL.Host = reqHost } } -- cgit v1.2.3