diff options
author | Tom Barrett <tom@tombarrett.xyz> | 2023-11-01 17:57:48 +0100 |
---|---|---|
committer | Tom Barrett <tom@tombarrett.xyz> | 2023-11-01 18:11:33 +0100 |
commit | 240c3d1338415e5d82ef7ca0e52c4284be6441bd (patch) | |
tree | 4b0ee5d208c2cdffa78d65f1b0abe0ec85f15652 /modules/caddyhttp/reverseproxy/reverseproxy.go | |
parent | 73e78ab226f21e6c6c68961af88c4ab9c746f4f4 (diff) | |
parent | 0e204b730aa2b1fa0835336b1117eff8c420f713 (diff) |
Diffstat (limited to 'modules/caddyhttp/reverseproxy/reverseproxy.go')
-rw-r--r-- | modules/caddyhttp/reverseproxy/reverseproxy.go | 275 |
1 files changed, 128 insertions, 147 deletions
diff --git a/modules/caddyhttp/reverseproxy/reverseproxy.go b/modules/caddyhttp/reverseproxy/reverseproxy.go index 1449785..08be40d 100644 --- a/modules/caddyhttp/reverseproxy/reverseproxy.go +++ b/modules/caddyhttp/reverseproxy/reverseproxy.go @@ -27,30 +27,23 @@ import ( "net/netip" "net/textproto" "net/url" - "regexp" - "runtime" "strconv" "strings" "sync" "time" + "go.uber.org/zap" + "golang.org/x/net/http/httpguts" + "github.com/caddyserver/caddy/v2" "github.com/caddyserver/caddy/v2/caddyconfig/caddyfile" "github.com/caddyserver/caddy/v2/modules/caddyevents" "github.com/caddyserver/caddy/v2/modules/caddyhttp" "github.com/caddyserver/caddy/v2/modules/caddyhttp/headers" "github.com/caddyserver/caddy/v2/modules/caddyhttp/rewrite" - "go.uber.org/zap" - "golang.org/x/net/http/httpguts" ) -var supports1xx bool - func init() { - // Caddy requires at least Go 1.18, but Early Hints requires Go 1.19; thus we can simply check for 1.18 in version string - // TODO: remove this once our minimum Go version is 1.19 - supports1xx = !strings.Contains(runtime.Version(), "go1.18") - caddy.RegisterModule(Handler{}) } @@ -158,6 +151,19 @@ type Handler struct { // could be useful if the backend has tighter memory constraints. ResponseBuffers int64 `json:"response_buffers,omitempty"` + // If nonzero, streaming requests such as WebSockets will be + // forcibly closed at the end of the timeout. Default: no timeout. + StreamTimeout caddy.Duration `json:"stream_timeout,omitempty"` + + // If nonzero, streaming requests such as WebSockets will not be + // closed when the proxy config is unloaded, and instead the stream + // will remain open until the delay is complete. In other words, + // enabling this prevents streams from closing when Caddy's config + // is reloaded. Enabling this may be a good idea to avoid a thundering + // herd of reconnecting clients which had their connections closed + // by the previous config closing. Default: no delay. + StreamCloseDelay caddy.Duration `json:"stream_close_delay,omitempty"` + // If configured, rewrites the copy of the upstream request. // Allows changing the request method and URI (path and query). // Since the rewrite is applied to the copy, it does not persist @@ -185,6 +191,13 @@ type Handler struct { // - `{http.reverse_proxy.header.*}` The headers from the response HandleResponse []caddyhttp.ResponseHandler `json:"handle_response,omitempty"` + // If set, the proxy will write very detailed logs about its + // inner workings. Enable this only when debugging, as it + // will produce a lot of output. + // + // EXPERIMENTAL: This feature is subject to change or removal. + VerboseLogs bool `json:"verbose_logs,omitempty"` + Transport http.RoundTripper `json:"-"` CB CircuitBreaker `json:"-"` DynamicUpstreams UpstreamSource `json:"-"` @@ -199,8 +212,9 @@ type Handler struct { handleResponseSegments []*caddyfile.Dispenser // Stores upgraded requests (hijacked connections) for proper cleanup - connections map[io.ReadWriteCloser]openConnection - connectionsMu *sync.Mutex + connections map[io.ReadWriteCloser]openConnection + connectionsCloseTimer *time.Timer + connectionsMu *sync.Mutex ctx caddy.Context logger *zap.Logger @@ -243,20 +257,6 @@ func (h *Handler) Provision(ctx caddy.Context) error { h.logger.Warn("UNLIMITED BUFFERING: buffering is enabled without any cap on buffer size, which can result in OOM crashes") } - // verify SRV compatibility - TODO: LookupSRV deprecated; will be removed - for i, v := range h.Upstreams { - if v.LookupSRV == "" { - continue - } - h.logger.Warn("DEPRECATED: lookup_srv: will be removed in a near-future version of Caddy; use the http.reverse_proxy.upstreams.srv module instead") - if h.HealthChecks != nil && h.HealthChecks.Active != nil { - return fmt.Errorf(`upstream: lookup_srv is incompatible with active health checks: %d: {"dial": %q, "lookup_srv": %q}`, i, v.Dial, v.LookupSRV) - } - if v.Dial != "" { - return fmt.Errorf(`upstream: specifying dial address is incompatible with lookup_srv: %d: {"dial": %q, "lookup_srv": %q}`, i, v.Dial, v.LookupSRV) - } - } - // start by loading modules if h.TransportRaw != nil { mod, err := ctx.LoadModule(h, "TransportRaw") @@ -363,62 +363,22 @@ func (h *Handler) Provision(ctx caddy.Context) error { if h.HealthChecks != nil { // set defaults on passive health checks, if necessary if h.HealthChecks.Passive != nil { - if h.HealthChecks.Passive.FailDuration > 0 && h.HealthChecks.Passive.MaxFails == 0 { + h.HealthChecks.Passive.logger = h.logger.Named("health_checker.passive") + if h.HealthChecks.Passive.MaxFails == 0 { h.HealthChecks.Passive.MaxFails = 1 } } // if active health checks are enabled, configure them and start a worker - if h.HealthChecks.Active != nil && (h.HealthChecks.Active.Path != "" || - h.HealthChecks.Active.URI != "" || - h.HealthChecks.Active.Port != 0) { - - h.HealthChecks.Active.logger = h.logger.Named("health_checker.active") - - timeout := time.Duration(h.HealthChecks.Active.Timeout) - if timeout == 0 { - timeout = 5 * time.Second - } - - if h.HealthChecks.Active.Path != "" { - h.HealthChecks.Active.logger.Warn("the 'path' option is deprecated, please use 'uri' instead!") - } - - // parse the URI string (supports path and query) - if h.HealthChecks.Active.URI != "" { - parsedURI, err := url.Parse(h.HealthChecks.Active.URI) - if err != nil { - return err - } - h.HealthChecks.Active.uri = parsedURI - } - - h.HealthChecks.Active.httpClient = &http.Client{ - Timeout: timeout, - Transport: h.Transport, - } - - for _, upstream := range h.Upstreams { - // if there's an alternative port for health-check provided in the config, - // then use it, otherwise use the port of upstream. - if h.HealthChecks.Active.Port != 0 { - upstream.activeHealthCheckPort = h.HealthChecks.Active.Port - } + if h.HealthChecks.Active != nil { + err := h.HealthChecks.Active.Provision(ctx, h) + if err != nil { + return err } - if h.HealthChecks.Active.Interval == 0 { - h.HealthChecks.Active.Interval = caddy.Duration(30 * time.Second) + if h.HealthChecks.Active.IsEnabled() { + go h.activeHealthChecker() } - - if h.HealthChecks.Active.ExpectBody != "" { - var err error - h.HealthChecks.Active.bodyRegexp, err = regexp.Compile(h.HealthChecks.Active.ExpectBody) - if err != nil { - return fmt.Errorf("expect_body: compiling regular expression: %v", err) - } - } - - go h.activeHealthChecker() } } @@ -438,25 +398,7 @@ func (h *Handler) Provision(ctx caddy.Context) error { // Cleanup cleans up the resources made by h. func (h *Handler) Cleanup() error { - // close hijacked connections (both to client and backend) - var err error - h.connectionsMu.Lock() - for _, oc := range h.connections { - if oc.gracefulClose != nil { - // this is potentially blocking while we have the lock on the connections - // map, but that should be OK since the server has in theory shut down - // and we are no longer using the connections map - gracefulErr := oc.gracefulClose() - if gracefulErr != nil && err == nil { - err = gracefulErr - } - } - closeErr := oc.conn.Close() - if closeErr != nil && err == nil { - err = closeErr - } - } - h.connectionsMu.Unlock() + err := h.cleanupConnections() // remove hosts from our config from the pool for _, upstream := range h.Upstreams { @@ -517,7 +459,8 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request, next caddyht // It returns true when the loop is done and should break; false otherwise. The error value returned should // be assigned to the proxyErr value for the next iteration of the loop (or the error handled after break). func (h *Handler) proxyLoopIteration(r *http.Request, origReq *http.Request, w http.ResponseWriter, proxyErr error, start time.Time, retries int, - repl *caddy.Replacer, reqHeader http.Header, reqHost string, next caddyhttp.Handler) (bool, error) { + repl *caddy.Replacer, reqHeader http.Header, reqHost string, next caddyhttp.Handler, +) (bool, error) { // get the updated list of upstreams upstreams := h.Upstreams if h.DynamicUpstreams != nil { @@ -544,7 +487,7 @@ func (h *Handler) proxyLoopIteration(r *http.Request, origReq *http.Request, w h upstream := h.LoadBalancing.SelectionPolicy.Select(upstreams, r, w) if upstream == nil { if proxyErr == nil { - proxyErr = caddyhttp.Error(http.StatusServiceUnavailable, fmt.Errorf("no upstreams available")) + proxyErr = caddyhttp.Error(http.StatusServiceUnavailable, noUpstreamsAvailable) } if !h.LoadBalancing.tryAgain(h.ctx, start, retries, proxyErr, r) { return true, proxyErr @@ -646,7 +589,8 @@ func (h Handler) prepareRequest(req *http.Request, repl *caddy.Replacer) (*http. // feature if absolutely required, if read timeouts are // set, and if body size is limited if h.RequestBuffers != 0 && req.Body != nil { - req.Body, _ = h.bufferedBody(req.Body, h.RequestBuffers) + req.Body, req.ContentLength = h.bufferedBody(req.Body, h.RequestBuffers) + req.Header.Set("Content-Length", strconv.FormatInt(req.ContentLength, 10)) } if req.ContentLength == 0 { @@ -687,8 +631,24 @@ func (h Handler) prepareRequest(req *http.Request, repl *caddy.Replacer) (*http. req.Header.Set("Upgrade", reqUpType) } + // Set up the PROXY protocol info + address := caddyhttp.GetVar(req.Context(), caddyhttp.ClientIPVarKey).(string) + addrPort, err := netip.ParseAddrPort(address) + if err != nil { + // OK; probably didn't have a port + addr, err := netip.ParseAddr(address) + if err != nil { + // Doesn't seem like a valid ip address at all + } else { + // Ok, only the port was missing + addrPort = netip.AddrPortFrom(addr, 0) + } + } + proxyProtocolInfo := ProxyProtocolInfo{AddrPort: addrPort} + caddyhttp.SetVar(req.Context(), proxyProtocolInfoVarKey, proxyProtocolInfo) + // Add the supported X-Forwarded-* headers - err := h.addForwardedHeaders(req) + err = h.addForwardedHeaders(req) if err != nil { return nil, err } @@ -795,25 +755,23 @@ func (h *Handler) reverseProxy(rw http.ResponseWriter, req *http.Request, origRe server := req.Context().Value(caddyhttp.ServerCtxKey).(*caddyhttp.Server) shouldLogCredentials := server.Logs != nil && server.Logs.ShouldLogCredentials - if supports1xx { - // Forward 1xx status codes, backported from https://github.com/golang/go/pull/53164 - trace := &httptrace.ClientTrace{ - Got1xxResponse: func(code int, header textproto.MIMEHeader) error { - h := rw.Header() - copyHeader(h, http.Header(header)) - rw.WriteHeader(code) - - // Clear headers coming from the backend - // (it's not automatically done by ResponseWriter.WriteHeader() for 1xx responses) - for k := range header { - delete(h, k) - } + // Forward 1xx status codes, backported from https://github.com/golang/go/pull/53164 + trace := &httptrace.ClientTrace{ + Got1xxResponse: func(code int, header textproto.MIMEHeader) error { + h := rw.Header() + copyHeader(h, http.Header(header)) + rw.WriteHeader(code) + + // Clear headers coming from the backend + // (it's not automatically done by ResponseWriter.WriteHeader() for 1xx responses) + for k := range header { + delete(h, k) + } - return nil - }, - } - req = req.WithContext(httptrace.WithClientTrace(req.Context(), trace)) + return nil + }, } + req = req.WithContext(httptrace.WithClientTrace(req.Context(), trace)) // if FlushInterval is explicitly configured to -1 (i.e. flush continuously to achieve // low-latency streaming), don't let the transport cancel the request if the client @@ -821,7 +779,7 @@ func (h *Handler) reverseProxy(rw http.ResponseWriter, req *http.Request, origRe // regardless, and we should expect client disconnection in low-latency streaming // scenarios (see issue #4922) if h.FlushInterval == -1 { - req = req.WithContext(ignoreClientGoneContext{req.Context(), h.ctx.Done()}) + req = req.WithContext(ignoreClientGoneContext{req.Context()}) } // do the round-trip; emit debug log with values we know are @@ -897,12 +855,6 @@ func (h *Handler) reverseProxy(rw http.ResponseWriter, req *http.Request, origRe break } - // otherwise, if there are any routes configured, execute those as the - // actual response instead of what we got from the proxy backend - if len(rh.Routes) == 0 { - continue - } - // set up the replacer so that parts of the original response can be // used for routing decisions for field, value := range res.Header { @@ -911,7 +863,7 @@ func (h *Handler) reverseProxy(rw http.ResponseWriter, req *http.Request, origRe repl.Set("http.reverse_proxy.status_code", res.StatusCode) repl.Set("http.reverse_proxy.status_text", res.Status) - h.logger.Debug("handling response", zap.Int("handler", i)) + logger.Debug("handling response", zap.Int("handler", i)) // we make some data available via request context to child routes // so that they may inherit some options and functions from the @@ -956,7 +908,7 @@ func (h *Handler) reverseProxy(rw http.ResponseWriter, req *http.Request, origRe } // finalizeResponse prepares and copies the response. -func (h Handler) finalizeResponse( +func (h *Handler) finalizeResponse( rw http.ResponseWriter, req *http.Request, res *http.Response, @@ -998,15 +950,21 @@ func (h Handler) finalizeResponse( } rw.WriteHeader(res.StatusCode) + if h.VerboseLogs { + logger.Debug("wrote header") + } - err := h.copyResponse(rw, res.Body, h.flushInterval(req, res)) - res.Body.Close() // close now, instead of defer, to populate res.Trailer + err := h.copyResponse(rw, res.Body, h.flushInterval(req, res), logger) + errClose := res.Body.Close() // close now, instead of defer, to populate res.Trailer + if h.VerboseLogs || errClose != nil { + logger.Debug("closed response body from upstream", zap.Error(errClose)) + } if err != nil { // we're streaming the response and we've already written headers, so // there's nothing an error handler can do to recover at this point; // the standard lib's proxy panics at this point, but we'll just log // the error and abort the stream here - h.logger.Error("aborting with incomplete response", zap.Error(err)) + logger.Error("aborting with incomplete response", zap.Error(err)) return nil } @@ -1014,9 +972,8 @@ func (h Handler) finalizeResponse( // Force chunking if we saw a response trailer. // This prevents net/http from calculating the length for short // bodies and adding a Content-Length. - if fl, ok := rw.(http.Flusher); ok { - fl.Flush() - } + //nolint:bodyclose + http.NewResponseController(rw).Flush() } // total duration spent proxying, including writing response body @@ -1035,6 +992,10 @@ func (h Handler) finalizeResponse( } } + if h.VerboseLogs { + logger.Debug("response finalized") + } + return nil } @@ -1066,17 +1027,23 @@ func (lb LoadBalancing) tryAgain(ctx caddy.Context, start time.Time, retries int // should be safe to retry, since without a connection, no // HTTP request can be transmitted; but if the error is not // specifically a dialer error, we need to be careful - if _, ok := proxyErr.(DialError); proxyErr != nil && !ok { + if proxyErr != nil { + _, isDialError := proxyErr.(DialError) + herr, isHandlerError := proxyErr.(caddyhttp.HandlerError) + // if the error occurred after a connection was established, // we have to assume the upstream received the request, and // retries need to be carefully decided, because some requests // are not idempotent - if lb.RetryMatch == nil && req.Method != "GET" { - // by default, don't retry requests if they aren't GET - return false - } - if !lb.RetryMatch.AnyMatch(req) { - return false + if !isDialError && !(isHandlerError && errors.Is(herr, noUpstreamsAvailable)) { + if lb.RetryMatch == nil && req.Method != "GET" { + // by default, don't retry requests if they aren't GET + return false + } + + if !lb.RetryMatch.AnyMatch(req) { + return false + } } } @@ -1128,12 +1095,11 @@ func (h Handler) provisionUpstream(upstream *Upstream) { // without MaxRequests), copy the value into this upstream, since the // value in the upstream (MaxRequests) is what is used during // availability checks - if h.HealthChecks != nil && h.HealthChecks.Passive != nil { - h.HealthChecks.Passive.logger = h.logger.Named("health_checker.passive") - if h.HealthChecks.Passive.UnhealthyRequestCount > 0 && - upstream.MaxRequests == 0 { - upstream.MaxRequests = h.HealthChecks.Passive.UnhealthyRequestCount - } + if h.HealthChecks != nil && + h.HealthChecks.Passive != nil && + h.HealthChecks.Passive.UnhealthyRequestCount > 0 && + upstream.MaxRequests == 0 { + upstream.MaxRequests = h.HealthChecks.Passive.UnhealthyRequestCount } // upstreams need independent access to the passive @@ -1450,21 +1416,36 @@ type handleResponseContext struct { // ignoreClientGoneContext is a special context.Context type // intended for use when doing a RoundTrip where you don't // want a client disconnection to cancel the request during -// the roundtrip. Set its done field to a Done() channel -// of a context that doesn't get canceled when the client -// disconnects, such as caddy.Context.Done() instead. +// the roundtrip. +// This context clears cancellation, error, and deadline methods, +// but still allows values to pass through from its embedded +// context. +// +// TODO: This can be replaced with context.WithoutCancel once +// the minimum required version of Go is 1.21. type ignoreClientGoneContext struct { context.Context - done <-chan struct{} } -func (c ignoreClientGoneContext) Done() <-chan struct{} { return c.done } +func (c ignoreClientGoneContext) Deadline() (deadline time.Time, ok bool) { + return +} + +func (c ignoreClientGoneContext) Done() <-chan struct{} { + return nil +} + +func (c ignoreClientGoneContext) Err() error { + return nil +} // proxyHandleResponseContextCtxKey is the context key for the active proxy handler // so that handle_response routes can inherit some config options // from the proxy handler. const proxyHandleResponseContextCtxKey caddy.CtxKey = "reverse_proxy_handle_response_context" +var noUpstreamsAvailable = fmt.Errorf("no upstreams available") + // Interface guards var ( _ caddy.Provisioner = (*Handler)(nil) |