summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Holt <mholt@users.noreply.github.com>2019-10-11 14:25:39 -0600
committerMatthew Holt <mholt@users.noreply.github.com>2019-10-11 14:25:39 -0600
commit1e31be8de0d1d5587348619225456a793cb30f7f (patch)
treefa3fca930b8ae02649471698950a60cce8afc539
parent4aa3af4b78addcf65ce6b254be10f006bae1c9ac (diff)
reverse_proxy: Allow dynamic backends (closes #990 and #1539)
This PR enables the use of placeholders in an upstream's Dial address. A Dial address must represent precisely one socket after replacements. See also #998 and #1639.
-rw-r--r--listeners.go7
-rw-r--r--listeners_test.go4
-rw-r--r--modules/caddyhttp/reverseproxy/caddyfile.go10
-rw-r--r--modules/caddyhttp/reverseproxy/healthchecks.go6
-rw-r--r--modules/caddyhttp/reverseproxy/hosts.go62
-rw-r--r--modules/caddyhttp/reverseproxy/reverseproxy.go157
6 files changed, 112 insertions, 134 deletions
diff --git a/listeners.go b/listeners.go
index 04ec788..8c2792c 100644
--- a/listeners.go
+++ b/listeners.go
@@ -286,9 +286,10 @@ func JoinNetworkAddress(network, host, port string) string {
if network != "" {
a = network + "/"
}
- a += host
- if port != "" {
- a += ":" + port
+ if host != "" && port == "" {
+ a += host
+ } else if port != "" {
+ a += net.JoinHostPort(host, port)
}
return a
}
diff --git a/listeners_test.go b/listeners_test.go
index 11d3980..bdddf32 100644
--- a/listeners_test.go
+++ b/listeners_test.go
@@ -138,6 +138,10 @@ func TestJoinNetworkAddress(t *testing.T) {
network: "unix", host: "/foo/bar", port: "",
expect: "unix//foo/bar",
},
+ {
+ network: "", host: "::1", port: "1234",
+ expect: "[::1]:1234",
+ },
} {
actual := JoinNetworkAddress(tc.network, tc.host, tc.port)
if actual != tc.expect {
diff --git a/modules/caddyhttp/reverseproxy/caddyfile.go b/modules/caddyhttp/reverseproxy/caddyfile.go
index 83298d8..6b1ec69 100644
--- a/modules/caddyhttp/reverseproxy/caddyfile.go
+++ b/modules/caddyhttp/reverseproxy/caddyfile.go
@@ -81,9 +81,7 @@ func parseCaddyfile(h httpcaddyfile.Helper) (caddyhttp.MiddlewareHandler, error)
func (h *Handler) UnmarshalCaddyfile(d *caddyfile.Dispenser) error {
for d.Next() {
for _, up := range d.RemainingArgs() {
- h.Upstreams = append(h.Upstreams, &Upstream{
- Dial: up,
- })
+ h.Upstreams = append(h.Upstreams, &Upstream{Dial: up})
}
for d.NextBlock(0) {
@@ -94,9 +92,7 @@ func (h *Handler) UnmarshalCaddyfile(d *caddyfile.Dispenser) error {
return d.ArgErr()
}
for _, up := range args {
- h.Upstreams = append(h.Upstreams, &Upstream{
- Dial: up,
- })
+ h.Upstreams = append(h.Upstreams, &Upstream{Dial: up})
}
case "lb_policy":
@@ -502,6 +498,7 @@ func (h *HTTPTransport) UnmarshalCaddyfile(d *caddyfile.Dispenser) error {
if d.Val() == "off" {
var disable bool
h.KeepAlive.Enabled = &disable
+ break
}
dur, err := time.ParseDuration(d.Val())
if err != nil {
@@ -521,6 +518,7 @@ func (h *HTTPTransport) UnmarshalCaddyfile(d *caddyfile.Dispenser) error {
h.KeepAlive = new(KeepAlive)
}
h.KeepAlive.MaxIdleConns = num
+ h.KeepAlive.MaxIdleConnsPerHost = num
default:
return d.Errf("unrecognized subdirective %s", d.Val())
diff --git a/modules/caddyhttp/reverseproxy/healthchecks.go b/modules/caddyhttp/reverseproxy/healthchecks.go
index be532ea..a64c21d 100644
--- a/modules/caddyhttp/reverseproxy/healthchecks.go
+++ b/modules/caddyhttp/reverseproxy/healthchecks.go
@@ -115,7 +115,7 @@ func (h *Handler) doActiveHealthChecksForAllHosts() {
// so use a fake Host value instead; unix sockets are usually local
hostAddr = "localhost"
}
- err = h.doActiveHealthCheck(NewDialInfo(network, addrs[0]), hostAddr, host)
+ err = h.doActiveHealthCheck(DialInfo{Network: network, Address: addrs[0]}, hostAddr, host)
if err != nil {
log.Printf("[ERROR] reverse_proxy: active health check for host %s: %v", networkAddr, err)
}
@@ -259,7 +259,7 @@ func (h *Handler) countFailure(upstream *Upstream) {
err := upstream.Host.CountFail(1)
if err != nil {
log.Printf("[ERROR] proxy: upstream %s: counting failure: %v",
- upstream.dialInfo, err)
+ upstream.Dial, err)
}
// forget it later
@@ -268,7 +268,7 @@ func (h *Handler) countFailure(upstream *Upstream) {
err := host.CountFail(-1)
if err != nil {
log.Printf("[ERROR] proxy: upstream %s: expiring failure: %v",
- upstream.dialInfo, err)
+ upstream.Dial, err)
}
}(upstream.Host, failDuration)
}
diff --git a/modules/caddyhttp/reverseproxy/hosts.go b/modules/caddyhttp/reverseproxy/hosts.go
index a8349bd..a16bed0 100644
--- a/modules/caddyhttp/reverseproxy/hosts.go
+++ b/modules/caddyhttp/reverseproxy/hosts.go
@@ -72,7 +72,6 @@ type Upstream struct {
healthCheckPolicy *PassiveHealthChecks
cb CircuitBreaker
- dialInfo DialInfo
}
// Available returns true if the remote host
@@ -149,8 +148,7 @@ func (uh *upstreamHost) CountFail(delta int) error {
}
// SetHealthy sets the upstream has healthy or unhealthy
-// and returns true if the value was different from before,
-// or an error if the adjustment failed.
+// and returns true if the new value is different.
func (uh *upstreamHost) SetHealthy(healthy bool) (bool, error) {
var unhealthy, compare int32 = 1, 0
if healthy {
@@ -167,8 +165,12 @@ func (uh *upstreamHost) SetHealthy(healthy bool) (bool, error) {
// a host that can be represented in a URL, but
// they certainly have a network name and address).
type DialInfo struct {
- // The network to use. This should be one of the
- // values that is accepted by net.Dial:
+ // Upstream is the Upstream associated with
+ // this DialInfo. It may be nil.
+ Upstream *Upstream
+
+ // The network to use. This should be one of
+ // the values that is accepted by net.Dial:
// https://golang.org/pkg/net/#Dial
Network string
@@ -176,33 +178,43 @@ type DialInfo struct {
// semantics and rules as net.Dial.
Address string
- // Host and Port are components of Address,
- // pre-split for convenience.
+ // Host and Port are components of Address.
Host, Port string
}
-// NewDialInfo creates and populates a DialInfo
-// for the given network and address. It splits
-// the address into host and port values if the
-// network type supports them, or uses the whole
-// address as the port if splitting fails.
-func NewDialInfo(network, address string) DialInfo {
- var addrHost, addrPort string
- if !strings.Contains(network, "unix") {
- var err error
- addrHost, addrPort, err = net.SplitHostPort(address)
- if err != nil {
- addrHost = address // assume there was no port
- }
- }
- return DialInfo{network, address, addrHost, addrPort}
-}
-
// String returns the Caddy network address form
// by joining the network and address with a
// forward slash.
func (di DialInfo) String() string {
- return di.Network + "/" + di.Address
+ return caddy.JoinNetworkAddress(di.Network, di.Host, di.Port)
+}
+
+// fillDialInfo returns a filled DialInfo for the given upstream, using
+// the given Replacer. Note that the returned value is not a pointer.
+func fillDialInfo(upstream *Upstream, repl caddy.Replacer) (DialInfo, error) {
+ dial := repl.ReplaceAll(upstream.Dial, "")
+ netw, addrs, err := caddy.ParseNetworkAddress(dial)
+ if err != nil {
+ return DialInfo{}, fmt.Errorf("upstream %s: invalid dial address %s: %v", upstream.Dial, dial, err)
+ }
+ if len(addrs) != 1 {
+ return DialInfo{}, fmt.Errorf("upstream %s: dial address must represent precisely one socket: %s represents %d",
+ upstream.Dial, dial, len(addrs))
+ }
+ var dialHost, dialPort string
+ if !strings.Contains(netw, "unix") {
+ dialHost, dialPort, err = net.SplitHostPort(addrs[0])
+ if err != nil {
+ dialHost = addrs[0] // assume there was no port
+ }
+ }
+ return DialInfo{
+ Upstream: upstream,
+ Network: netw,
+ Address: addrs[0],
+ Host: dialHost,
+ Port: dialPort,
+ }, nil
}
// DialInfoCtxKey is used to store a DialInfo
diff --git a/modules/caddyhttp/reverseproxy/reverseproxy.go b/modules/caddyhttp/reverseproxy/reverseproxy.go
index 45c0690..266e5c3 100644
--- a/modules/caddyhttp/reverseproxy/reverseproxy.go
+++ b/modules/caddyhttp/reverseproxy/reverseproxy.go
@@ -87,6 +87,7 @@ func (h *Handler) Provision(ctx caddy.Context) error {
h.CBRaw = nil // allow GC to deallocate
}
+ // set up transport
if h.Transport == nil {
t := &HTTPTransport{
KeepAlive: &KeepAlive{
@@ -102,6 +103,7 @@ func (h *Handler) Provision(ctx caddy.Context) error {
h.Transport = t
}
+ // set up load balancing
if h.LoadBalancing == nil {
h.LoadBalancing = new(LoadBalancing)
}
@@ -152,85 +154,40 @@ func (h *Handler) Provision(ctx caddy.Context) error {
go h.activeHealthChecker()
}
- var allUpstreams []*Upstream
+ // set up upstreams
for _, upstream := range h.Upstreams {
- // if a port was not specified (and the network type uses
- // ports), then maybe we can figure out the default port
- netw, host, port, err := caddy.SplitNetworkAddress(upstream.Dial)
- if err != nil && port == "" && !strings.Contains(netw, "unix") {
- if host == "" {
- // assume all that was given was the host, no port
- host = upstream.Dial
- }
- // a port was not specified, but we may be able to
- // infer it if we know the standard ports on which
- // the transport protocol operates
- if ht, ok := h.Transport.(*HTTPTransport); ok {
- defaultPort := "80"
- if ht.TLS != nil {
- defaultPort = "443"
- }
- upstream.Dial = caddy.JoinNetworkAddress(netw, host, defaultPort)
- }
+ // create or get the host representation for this upstream
+ var host Host = new(upstreamHost)
+ existingHost, loaded := hosts.LoadOrStore(upstream.Dial, host)
+ if loaded {
+ host = existingHost.(Host)
}
+ upstream.Host = host
- // upstreams are allowed to map to only a single host,
- // but an upstream's address may semantically represent
- // multiple addresses, so make sure to handle each
- // one in turn based on this one upstream config
- network, addresses, err := caddy.ParseNetworkAddress(upstream.Dial)
- if err != nil {
- return fmt.Errorf("parsing dial address: %v", err)
- }
-
- for _, addr := range addresses {
- // make a new upstream based on the original
- // that has a singular dial address
- upstreamCopy := *upstream
- upstreamCopy.dialInfo = NewDialInfo(network, addr)
- upstreamCopy.Dial = upstreamCopy.dialInfo.String()
- upstreamCopy.cb = h.CB
-
- // if host already exists from a current config,
- // use that instead; otherwise, add it
- // TODO: make hosts modular, so that their state can be distributed in enterprise for example
- // TODO: If distributed, the pool should be stored in storage...
- var host Host = new(upstreamHost)
- activeHost, loaded := hosts.LoadOrStore(upstreamCopy.dialInfo.String(), host)
- if loaded {
- host = activeHost.(Host)
- }
- upstreamCopy.Host = host
-
- // if the passive health checker has a non-zero "unhealthy
- // request count" but the upstream has no MaxRequests set
- // (they are the same thing, but one is a default value for
- // for upstreams with a zero MaxRequests), copy the default
- // value into this upstream, since the value in the upstream
- // (MaxRequests) is what is used during availability checks
- if h.HealthChecks != nil &&
- h.HealthChecks.Passive != nil &&
- h.HealthChecks.Passive.UnhealthyRequestCount > 0 &&
- upstreamCopy.MaxRequests == 0 {
- upstreamCopy.MaxRequests = h.HealthChecks.Passive.UnhealthyRequestCount
- }
+ // give it the circuit breaker, if any
+ upstream.cb = h.CB
- // upstreams need independent access to the passive
- // health check policy because they run outside of the
- // scope of a request handler
- if h.HealthChecks != nil {
- upstreamCopy.healthCheckPolicy = h.HealthChecks.Passive
- }
+ // if the passive health checker has a non-zero UnhealthyRequestCount
+ // but the upstream has no MaxRequests set (they are the same thing,
+ // but the passive health checker is a default value for for upstreams
+ // without MaxRequests), copy the value into this upstream, since the
+ // value in the upstream (MaxRequests) is what is used during
+ // availability checks
+ if h.HealthChecks != nil &&
+ h.HealthChecks.Passive != nil &&
+ h.HealthChecks.Passive.UnhealthyRequestCount > 0 &&
+ upstream.MaxRequests == 0 {
+ upstream.MaxRequests = h.HealthChecks.Passive.UnhealthyRequestCount
+ }
- allUpstreams = append(allUpstreams, &upstreamCopy)
+ // upstreams need independent access to the passive
+ // health check policy because passive health checks
+ // run without access to h.
+ if h.HealthChecks != nil {
+ upstream.healthCheckPolicy = h.HealthChecks.Passive
}
}
- // replace the unmarshaled upstreams (possible 1:many
- // address mapping) with our list, which is mapped 1:1,
- // thus may have expanded the original list
- h.Upstreams = allUpstreams
-
return nil
}
@@ -247,7 +204,7 @@ func (h *Handler) Cleanup() error {
// remove hosts from our config from the pool
for _, upstream := range h.Upstreams {
- hosts.Delete(upstream.dialInfo.String())
+ hosts.Delete(upstream.Dial)
}
return nil
@@ -284,17 +241,25 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request, next caddyht
continue
}
+ // the dial address may vary per-request if placeholders are
+ // used, so perform those replacements here; the resulting
+ // DialInfo struct should have valid network address syntax
+ dialInfo, err := fillDialInfo(upstream, repl)
+ if err != nil {
+ return fmt.Errorf("making dial info: %v", err)
+ }
+
// attach to the request information about how to dial the upstream;
// this is necessary because the information cannot be sufficiently
// or satisfactorily represented in a URL
- ctx := context.WithValue(r.Context(), DialInfoCtxKey, upstream.dialInfo)
+ ctx := context.WithValue(r.Context(), DialInfoCtxKey, dialInfo)
r = r.WithContext(ctx)
// set placeholders with information about this upstream
- repl.Set("http.handlers.reverse_proxy.upstream.address", upstream.dialInfo.String())
- repl.Set("http.handlers.reverse_proxy.upstream.hostport", upstream.dialInfo.Address)
- repl.Set("http.handlers.reverse_proxy.upstream.host", upstream.dialInfo.Host)
- repl.Set("http.handlers.reverse_proxy.upstream.port", upstream.dialInfo.Port)
+ repl.Set("http.handlers.reverse_proxy.upstream.address", dialInfo.String())
+ repl.Set("http.handlers.reverse_proxy.upstream.hostport", dialInfo.Address)
+ repl.Set("http.handlers.reverse_proxy.upstream.host", dialInfo.Host)
+ repl.Set("http.handlers.reverse_proxy.upstream.port", dialInfo.Port)
repl.Set("http.handlers.reverse_proxy.upstream.requests", strconv.Itoa(upstream.Host.NumRequests()))
repl.Set("http.handlers.reverse_proxy.upstream.max_requests", strconv.Itoa(upstream.MaxRequests))
repl.Set("http.handlers.reverse_proxy.upstream.fails", strconv.Itoa(upstream.Host.Fails()))
@@ -311,7 +276,7 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request, next caddyht
}
// proxy the request to that upstream
- proxyErr = h.reverseProxy(w, r, upstream)
+ proxyErr = h.reverseProxy(w, r, dialInfo)
if proxyErr == nil || proxyErr == context.Canceled {
// context.Canceled happens when the downstream client
// cancels the request, which is not our failure
@@ -405,12 +370,12 @@ func (h Handler) prepareRequest(req *http.Request) error {
// reverseProxy performs a round-trip to the given backend and processes the response with the client.
// (This method is mostly the beginning of what was borrowed from the net/http/httputil package in the
// Go standard library which was used as the foundation.)
-func (h *Handler) reverseProxy(rw http.ResponseWriter, req *http.Request, upstream *Upstream) error {
- upstream.Host.CountRequest(1)
- defer upstream.Host.CountRequest(-1)
+func (h *Handler) reverseProxy(rw http.ResponseWriter, req *http.Request, di DialInfo) error {
+ di.Upstream.Host.CountRequest(1)
+ defer di.Upstream.Host.CountRequest(-1)
// point the request to this upstream
- h.directRequest(req, upstream)
+ h.directRequest(req, di)
// do the round-trip
start := time.Now()
@@ -421,8 +386,8 @@ func (h *Handler) reverseProxy(rw http.ResponseWriter, req *http.Request, upstre
}
// update circuit breaker on current conditions
- if upstream.cb != nil {
- upstream.cb.RecordMetric(res.StatusCode, latency)
+ if di.Upstream.cb != nil {
+ di.Upstream.cb.RecordMetric(res.StatusCode, latency)
}
// perform passive health checks (if enabled)
@@ -430,14 +395,14 @@ func (h *Handler) reverseProxy(rw http.ResponseWriter, req *http.Request, upstre
// strike if the status code matches one that is "bad"
for _, badStatus := range h.HealthChecks.Passive.UnhealthyStatus {
if caddyhttp.StatusCodeMatches(res.StatusCode, badStatus) {
- h.countFailure(upstream)
+ h.countFailure(di.Upstream)
}
}
// strike if the roundtrip took too long
if h.HealthChecks.Passive.UnhealthyLatency > 0 &&
latency >= time.Duration(h.HealthChecks.Passive.UnhealthyLatency) {
- h.countFailure(upstream)
+ h.countFailure(di.Upstream)
}
}
@@ -554,23 +519,21 @@ func (lb LoadBalancing) tryAgain(start time.Time, proxyErr error, req *http.Requ
return true
}
-// directRequest modifies only req.URL so that it points to the
-// given upstream host. It must modify ONLY the request URL.
-func (h Handler) directRequest(req *http.Request, upstream *Upstream) {
+// directRequest modifies only req.URL so that it points to the upstream
+// in the given DialInfo. It must modify ONLY the request URL.
+func (h Handler) directRequest(req *http.Request, di DialInfo) {
if req.URL.Host == "" {
// we need a host, so set the upstream's host address
- fullHost := upstream.dialInfo.Address
+ reqHost := di.Address
- // but if the port matches the scheme, strip the port because
+ // if the port equates to the scheme, strip the port because
// it's weird to make a request like http://example.com:80/.
- host, port, err := net.SplitHostPort(fullHost)
- if err == nil &&
- (req.URL.Scheme == "http" && port == "80") ||
- (req.URL.Scheme == "https" && port == "443") {
- fullHost = host
+ if (req.URL.Scheme == "http" && di.Port == "80") ||
+ (req.URL.Scheme == "https" && di.Port == "443") {
+ reqHost = di.Host
}
- req.URL.Host = fullHost
+ req.URL.Host = reqHost
}
}