From 424ae0f420f478e1b38189fd6632d29e13df7eee Mon Sep 17 00:00:00 2001 From: mmm444 Date: Mon, 19 Jun 2023 23:54:43 +0200 Subject: reverseproxy: Experimental streaming timeouts (#5567) * reverseproxy: WIP streaming timeouts * More verbose logging by using the child logger * reverseproxy: Implement streaming timeouts * reverseproxy: Refactor cleanup * reverseproxy: Avoid **time.Timer --------- Co-authored-by: Francis Lavoie --- modules/caddyhttp/reverseproxy/caddyfile.go | 34 +++++++++- modules/caddyhttp/reverseproxy/reverseproxy.go | 44 ++++++------- modules/caddyhttp/reverseproxy/streaming.go | 89 +++++++++++++++++++++++--- 3 files changed, 133 insertions(+), 34 deletions(-) (limited to 'modules/caddyhttp') diff --git a/modules/caddyhttp/reverseproxy/caddyfile.go b/modules/caddyhttp/reverseproxy/caddyfile.go index a79bd09..26dd55c 100644 --- a/modules/caddyhttp/reverseproxy/caddyfile.go +++ b/modules/caddyhttp/reverseproxy/caddyfile.go @@ -83,10 +83,12 @@ func parseCaddyfile(h httpcaddyfile.Helper) (caddyhttp.MiddlewareHandler, error) // unhealthy_request_count // // # streaming -// flush_interval +// flush_interval // buffer_requests // buffer_responses -// max_buffer_size +// max_buffer_size +// stream_timeout +// stream_close_delay // // # request manipulation // trusted_proxies [private_ranges] @@ -571,6 +573,34 @@ func (h *Handler) UnmarshalCaddyfile(d *caddyfile.Dispenser) error { caddy.Log().Named("config.adapter.caddyfile").Warn("DEPRECATED: max_buffer_size: use request_buffers and/or response_buffers instead (with maximum buffer sizes)") h.DeprecatedMaxBufferSize = int64(size) + case "stream_timeout": + if !d.NextArg() { + return d.ArgErr() + } + if fi, err := strconv.Atoi(d.Val()); err == nil { + h.StreamTimeout = caddy.Duration(fi) + } else { + dur, err := caddy.ParseDuration(d.Val()) + if err != nil { + return d.Errf("bad duration value '%s': %v", d.Val(), err) + } + h.StreamTimeout = caddy.Duration(dur) + } + + case "stream_close_delay": + if !d.NextArg() { + return d.ArgErr() + } + if fi, err := strconv.Atoi(d.Val()); err == nil { + h.StreamCloseDelay = caddy.Duration(fi) + } else { + dur, err := caddy.ParseDuration(d.Val()) + if err != nil { + return d.Errf("bad duration value '%s': %v", d.Val(), err) + } + h.StreamCloseDelay = caddy.Duration(dur) + } + case "trusted_proxies": for d.NextArg() { if d.Val() == "private_ranges" { diff --git a/modules/caddyhttp/reverseproxy/reverseproxy.go b/modules/caddyhttp/reverseproxy/reverseproxy.go index 839c0cc..2fd0aae 100644 --- a/modules/caddyhttp/reverseproxy/reverseproxy.go +++ b/modules/caddyhttp/reverseproxy/reverseproxy.go @@ -157,6 +157,19 @@ type Handler struct { // could be useful if the backend has tighter memory constraints. ResponseBuffers int64 `json:"response_buffers,omitempty"` + // If nonzero, streaming requests such as WebSockets will be + // forcibly closed at the end of the timeout. Default: no timeout. + StreamTimeout caddy.Duration `json:"stream_timeout,omitempty"` + + // If nonzero, streaming requests such as WebSockets will not be + // closed when the proxy config is unloaded, and instead the stream + // will remain open until the delay is complete. In other words, + // enabling this prevents streams from closing when Caddy's config + // is reloaded. Enabling this may be a good idea to avoid a thundering + // herd of reconnecting clients which had their connections closed + // by the previous config closing. Default: no delay. + StreamCloseDelay caddy.Duration `json:"stream_close_delay,omitempty"` + // If configured, rewrites the copy of the upstream request. // Allows changing the request method and URI (path and query). // Since the rewrite is applied to the copy, it does not persist @@ -198,8 +211,9 @@ type Handler struct { handleResponseSegments []*caddyfile.Dispenser // Stores upgraded requests (hijacked connections) for proper cleanup - connections map[io.ReadWriteCloser]openConnection - connectionsMu *sync.Mutex + connections map[io.ReadWriteCloser]openConnection + connectionsCloseTimer *time.Timer + connectionsMu *sync.Mutex ctx caddy.Context logger *zap.Logger @@ -382,25 +396,7 @@ func (h *Handler) Provision(ctx caddy.Context) error { // Cleanup cleans up the resources made by h. func (h *Handler) Cleanup() error { - // close hijacked connections (both to client and backend) - var err error - h.connectionsMu.Lock() - for _, oc := range h.connections { - if oc.gracefulClose != nil { - // this is potentially blocking while we have the lock on the connections - // map, but that should be OK since the server has in theory shut down - // and we are no longer using the connections map - gracefulErr := oc.gracefulClose() - if gracefulErr != nil && err == nil { - err = gracefulErr - } - } - closeErr := oc.conn.Close() - if closeErr != nil && err == nil { - err = closeErr - } - } - h.connectionsMu.Unlock() + err := h.cleanupConnections() // remove hosts from our config from the pool for _, upstream := range h.Upstreams { @@ -872,7 +868,7 @@ func (h *Handler) reverseProxy(rw http.ResponseWriter, req *http.Request, origRe repl.Set("http.reverse_proxy.status_code", res.StatusCode) repl.Set("http.reverse_proxy.status_text", res.Status) - h.logger.Debug("handling response", zap.Int("handler", i)) + logger.Debug("handling response", zap.Int("handler", i)) // we make some data available via request context to child routes // so that they may inherit some options and functions from the @@ -917,7 +913,7 @@ func (h *Handler) reverseProxy(rw http.ResponseWriter, req *http.Request, origRe } // finalizeResponse prepares and copies the response. -func (h Handler) finalizeResponse( +func (h *Handler) finalizeResponse( rw http.ResponseWriter, req *http.Request, res *http.Response, @@ -967,7 +963,7 @@ func (h Handler) finalizeResponse( // there's nothing an error handler can do to recover at this point; // the standard lib's proxy panics at this point, but we'll just log // the error and abort the stream here - h.logger.Error("aborting with incomplete response", zap.Error(err)) + logger.Error("aborting with incomplete response", zap.Error(err)) return nil } diff --git a/modules/caddyhttp/reverseproxy/streaming.go b/modules/caddyhttp/reverseproxy/streaming.go index 1f5387e..6c1e44c 100644 --- a/modules/caddyhttp/reverseproxy/streaming.go +++ b/modules/caddyhttp/reverseproxy/streaming.go @@ -33,19 +33,19 @@ import ( "golang.org/x/net/http/httpguts" ) -func (h Handler) handleUpgradeResponse(logger *zap.Logger, rw http.ResponseWriter, req *http.Request, res *http.Response) { +func (h *Handler) handleUpgradeResponse(logger *zap.Logger, rw http.ResponseWriter, req *http.Request, res *http.Response) { reqUpType := upgradeType(req.Header) resUpType := upgradeType(res.Header) // Taken from https://github.com/golang/go/commit/5c489514bc5e61ad9b5b07bd7d8ec65d66a0512a // We know reqUpType is ASCII, it's checked by the caller. if !asciiIsPrint(resUpType) { - h.logger.Debug("backend tried to switch to invalid protocol", + logger.Debug("backend tried to switch to invalid protocol", zap.String("backend_upgrade", resUpType)) return } if !asciiEqualFold(reqUpType, resUpType) { - h.logger.Debug("backend tried to switch to unexpected protocol via Upgrade header", + logger.Debug("backend tried to switch to unexpected protocol via Upgrade header", zap.String("backend_upgrade", resUpType), zap.String("requested_upgrade", reqUpType)) return @@ -53,12 +53,12 @@ func (h Handler) handleUpgradeResponse(logger *zap.Logger, rw http.ResponseWrite hj, ok := rw.(http.Hijacker) if !ok { - h.logger.Sugar().Errorf("can't switch protocols using non-Hijacker ResponseWriter type %T", rw) + logger.Error("can't switch protocols using non-Hijacker ResponseWriter", zap.String("type", fmt.Sprintf("%T", rw))) return } backConn, ok := res.Body.(io.ReadWriteCloser) if !ok { - h.logger.Error("internal error: 101 switching protocols response with non-writable body") + logger.Error("internal error: 101 switching protocols response with non-writable body") return } @@ -83,7 +83,7 @@ func (h Handler) handleUpgradeResponse(logger *zap.Logger, rw http.ResponseWrite logger.Debug("upgrading connection") conn, brw, err := hj.Hijack() if err != nil { - h.logger.Error("hijack failed on protocol switch", zap.Error(err)) + logger.Error("hijack failed on protocol switch", zap.Error(err)) return } @@ -94,7 +94,7 @@ func (h Handler) handleUpgradeResponse(logger *zap.Logger, rw http.ResponseWrite }() if err := brw.Flush(); err != nil { - h.logger.Debug("response flush", zap.Error(err)) + logger.Debug("response flush", zap.Error(err)) return } @@ -120,10 +120,23 @@ func (h Handler) handleUpgradeResponse(logger *zap.Logger, rw http.ResponseWrite spc := switchProtocolCopier{user: conn, backend: backConn} + // setup the timeout if requested + var timeoutc <-chan time.Time + if h.StreamTimeout > 0 { + timer := time.NewTimer(time.Duration(h.StreamTimeout)) + defer timer.Stop() + timeoutc = timer.C + } + errc := make(chan error, 1) go spc.copyToBackend(errc) go spc.copyFromBackend(errc) - <-errc + select { + case err := <-errc: + logger.Debug("streaming error", zap.Error(err)) + case time := <-timeoutc: + logger.Debug("stream timed out", zap.Time("timeout", time)) + } } // flushInterval returns the p.FlushInterval value, conditionally @@ -243,10 +256,70 @@ func (h *Handler) registerConnection(conn io.ReadWriteCloser, gracefulClose func return func() { h.connectionsMu.Lock() delete(h.connections, conn) + // if there is no connection left before the connections close timer fires + if len(h.connections) == 0 && h.connectionsCloseTimer != nil { + // we release the timer that holds the reference to Handler + if (*h.connectionsCloseTimer).Stop() { + h.logger.Debug("stopped streaming connections close timer - all connections are already closed") + } + h.connectionsCloseTimer = nil + } h.connectionsMu.Unlock() } } +// closeConnections immediately closes all hijacked connections (both to client and backend). +func (h *Handler) closeConnections() error { + var err error + h.connectionsMu.Lock() + defer h.connectionsMu.Unlock() + + for _, oc := range h.connections { + if oc.gracefulClose != nil { + // this is potentially blocking while we have the lock on the connections + // map, but that should be OK since the server has in theory shut down + // and we are no longer using the connections map + gracefulErr := oc.gracefulClose() + if gracefulErr != nil && err == nil { + err = gracefulErr + } + } + closeErr := oc.conn.Close() + if closeErr != nil && err == nil { + err = closeErr + } + } + return err +} + +// cleanupConnections closes hijacked connections. +// Depending on the value of StreamCloseDelay it does that either immediately +// or sets up a timer that will do that later. +func (h *Handler) cleanupConnections() error { + if h.StreamCloseDelay == 0 { + return h.closeConnections() + } + + h.connectionsMu.Lock() + defer h.connectionsMu.Unlock() + // the handler is shut down, no new connection can appear, + // so we can skip setting up the timer when there are no connections + if len(h.connections) > 0 { + delay := time.Duration(h.StreamCloseDelay) + h.connectionsCloseTimer = time.AfterFunc(delay, func() { + h.logger.Debug("closing streaming connections after delay", + zap.Duration("delay", delay)) + err := h.closeConnections() + if err != nil { + h.logger.Error("failed to closed connections after delay", + zap.Error(err), + zap.Duration("delay", delay)) + } + }) + } + return nil +} + // writeCloseControl sends a best-effort Close control message to the given // WebSocket connection. Thanks to @pascaldekloe who provided inspiration // from his simple implementation of this I was able to learn from at: -- cgit v1.2.3