From e73b117332a5606a986ee7e03e6f4b0cea7871c9 Mon Sep 17 00:00:00 2001 From: Matthew Holt Date: Sat, 14 Sep 2019 13:25:26 -0600 Subject: reverse_proxy: Ability to mutate headers; set upstream placeholders --- modules/caddyhttp/reverseproxy/healthchecks.go | 6 +-- modules/caddyhttp/reverseproxy/hosts.go | 23 +++++++++++ modules/caddyhttp/reverseproxy/reverseproxy.go | 57 ++++++++++++++++++++++---- 3 files changed, 74 insertions(+), 12 deletions(-) (limited to 'modules/caddyhttp') diff --git a/modules/caddyhttp/reverseproxy/healthchecks.go b/modules/caddyhttp/reverseproxy/healthchecks.go index abe0f9c..084a693 100644 --- a/modules/caddyhttp/reverseproxy/healthchecks.go +++ b/modules/caddyhttp/reverseproxy/healthchecks.go @@ -111,10 +111,10 @@ func (h *Handler) doActiveHealthChecksForAllHosts() { if network == "unix" || network == "unixgram" || network == "unixpacket" { // this will be used as the Host portion of a http.Request URL, and // paths to socket files would produce an error when creating URL, - // so use a fake Host value instead - hostAddr = network + // so use a fake Host value instead; unix sockets are usually local + hostAddr = "localhost" } - err = h.doActiveHealthCheck(DialInfo{network, addrs[0]}, hostAddr, host) + err = h.doActiveHealthCheck(NewDialInfo(network, addrs[0]), hostAddr, host) if err != nil { log.Printf("[ERROR] reverse_proxy: active health check for host %s: %v", networkAddr, err) } diff --git a/modules/caddyhttp/reverseproxy/hosts.go b/modules/caddyhttp/reverseproxy/hosts.go index 1c0fae3..a8349bd 100644 --- a/modules/caddyhttp/reverseproxy/hosts.go +++ b/modules/caddyhttp/reverseproxy/hosts.go @@ -16,6 +16,8 @@ package reverseproxy import ( "fmt" + "net" + "strings" "sync/atomic" "github.com/caddyserver/caddy/v2" @@ -173,6 +175,27 @@ type DialInfo struct { // The address to dial. Follows the same // semantics and rules as net.Dial. Address string + + // Host and Port are components of Address, + // pre-split for convenience. + Host, Port string +} + +// NewDialInfo creates and populates a DialInfo +// for the given network and address. It splits +// the address into host and port values if the +// network type supports them, or uses the whole +// address as the port if splitting fails. +func NewDialInfo(network, address string) DialInfo { + var addrHost, addrPort string + if !strings.Contains(network, "unix") { + var err error + addrHost, addrPort, err = net.SplitHostPort(address) + if err != nil { + addrHost = address // assume there was no port + } + } + return DialInfo{network, address, addrHost, addrPort} } // String returns the Caddy network address form diff --git a/modules/caddyhttp/reverseproxy/reverseproxy.go b/modules/caddyhttp/reverseproxy/reverseproxy.go index a82c4e0..6a7ec70 100644 --- a/modules/caddyhttp/reverseproxy/reverseproxy.go +++ b/modules/caddyhttp/reverseproxy/reverseproxy.go @@ -21,11 +21,13 @@ import ( "net" "net/http" "regexp" + "strconv" "strings" "time" "github.com/caddyserver/caddy/v2" "github.com/caddyserver/caddy/v2/modules/caddyhttp" + "github.com/caddyserver/caddy/v2/modules/caddyhttp/headers" "golang.org/x/net/http/httpguts" ) @@ -35,12 +37,13 @@ func init() { // Handler implements a highly configurable and production-ready reverse proxy. type Handler struct { - TransportRaw json.RawMessage `json:"transport,omitempty"` - CBRaw json.RawMessage `json:"circuit_breaker,omitempty"` - LoadBalancing *LoadBalancing `json:"load_balancing,omitempty"` - HealthChecks *HealthChecks `json:"health_checks,omitempty"` - Upstreams UpstreamPool `json:"upstreams,omitempty"` - FlushInterval caddy.Duration `json:"flush_interval,omitempty"` + TransportRaw json.RawMessage `json:"transport,omitempty"` + CBRaw json.RawMessage `json:"circuit_breaker,omitempty"` + LoadBalancing *LoadBalancing `json:"load_balancing,omitempty"` + HealthChecks *HealthChecks `json:"health_checks,omitempty"` + Upstreams UpstreamPool `json:"upstreams,omitempty"` + FlushInterval caddy.Duration `json:"flush_interval,omitempty"` + Headers *headers.Handler `json:"headers,omitempty"` Transport http.RoundTripper `json:"-"` CB CircuitBreaker `json:"-"` @@ -178,7 +181,7 @@ func (h *Handler) Provision(ctx caddy.Context) error { // make a new upstream based on the original // that has a singular dial address upstreamCopy := *upstream - upstreamCopy.dialInfo = DialInfo{network, addr} + upstreamCopy.dialInfo = NewDialInfo(network, addr) upstreamCopy.Dial = upstreamCopy.dialInfo.String() upstreamCopy.cb = h.CB @@ -187,7 +190,7 @@ func (h *Handler) Provision(ctx caddy.Context) error { // TODO: make hosts modular, so that their state can be distributed in enterprise for example // TODO: If distributed, the pool should be stored in storage... var host Host = new(upstreamHost) - activeHost, loaded := hosts.LoadOrStore(upstreamCopy.Dial, host) + activeHost, loaded := hosts.LoadOrStore(upstreamCopy.dialInfo.String(), host) if loaded { host = activeHost.(Host) } @@ -243,6 +246,8 @@ func (h *Handler) Cleanup() error { } func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request, next caddyhttp.Handler) error { + repl := r.Context().Value(caddy.ReplacerCtxKey).(caddy.Replacer) + // prepare the request for proxying; this is needed only once err := h.prepareRequest(r) if err != nil { @@ -250,6 +255,11 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request, next caddyht fmt.Errorf("preparing request for upstream round-trip: %v", err)) } + // we will need the original headers and Host + // value if header operations are configured + reqHeader := r.Header + reqHost := r.Host + start := time.Now() var proxyErr error @@ -258,7 +268,7 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request, next caddyht upstream := h.LoadBalancing.SelectionPolicy.Select(h.Upstreams, r) if upstream == nil { if proxyErr == nil { - proxyErr = fmt.Errorf("no available upstreams") + proxyErr = fmt.Errorf("no upstreams available") } if !h.tryAgain(start, proxyErr) { break @@ -272,6 +282,26 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request, next caddyht ctx := context.WithValue(r.Context(), DialInfoCtxKey, upstream.dialInfo) r = r.WithContext(ctx) + // set placeholders with information about this upstream + repl.Set("http.handlers.reverse_proxy.upstream.address", upstream.dialInfo.String()) + repl.Set("http.handlers.reverse_proxy.upstream.hostport", upstream.dialInfo.Address) + repl.Set("http.handlers.reverse_proxy.upstream.host", upstream.dialInfo.Host) + repl.Set("http.handlers.reverse_proxy.upstream.port", upstream.dialInfo.Port) + repl.Set("http.handlers.reverse_proxy.upstream.requests", strconv.Itoa(upstream.Host.NumRequests())) + repl.Set("http.handlers.reverse_proxy.upstream.max_requests", strconv.Itoa(upstream.MaxRequests)) + repl.Set("http.handlers.reverse_proxy.upstream.fails", strconv.Itoa(upstream.Host.Fails())) + + // mutate request headers according to this upstream; + // because we're in a retry loop, we have to copy + // headers (and the r.Host value) from the original + // so that each retry is identical to the first + if h.Headers != nil && h.Headers.Request != nil { + r.Header = make(http.Header) + copyHeader(r.Header, reqHeader) + r.Host = reqHost + h.Headers.Request.ApplyToRequest(r) + } + // proxy the request to that upstream proxyErr = h.reverseProxy(w, r, upstream) if proxyErr == nil || proxyErr == context.Canceled { @@ -428,6 +458,15 @@ func (h *Handler) reverseProxy(rw http.ResponseWriter, req *http.Request, upstre rw.Header().Add("Trailer", strings.Join(trailerKeys, ", ")) } + // apply any response header operations + if h.Headers != nil && h.Headers.Response != nil { + if h.Headers.Response.Require == nil || + h.Headers.Response.Require.Match(res.StatusCode, rw.Header()) { + repl := req.Context().Value(caddy.ReplacerCtxKey).(caddy.Replacer) + h.Headers.Response.ApplyTo(rw.Header(), repl) + } + } + rw.WriteHeader(res.StatusCode) err = h.copyResponse(rw, res.Body, h.flushInterval(req, res)) -- cgit v1.2.3