From 66e571e687eeddca0aafd5df0e3ab5f7cecbdcfa Mon Sep 17 00:00:00 2001 From: Francis Lavoie Date: Fri, 31 Mar 2023 15:46:29 -0400 Subject: reverseproxy: Add mention of which half a copyBuffer err comes from (#5472) Co-authored-by: Matt Holt --- modules/caddyhttp/reverseproxy/streaming.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) (limited to 'modules/caddyhttp/reverseproxy/streaming.go') diff --git a/modules/caddyhttp/reverseproxy/streaming.go b/modules/caddyhttp/reverseproxy/streaming.go index 1db107a..1f5387e 100644 --- a/modules/caddyhttp/reverseproxy/streaming.go +++ b/modules/caddyhttp/reverseproxy/streaming.go @@ -20,6 +20,7 @@ package reverseproxy import ( "context" + "fmt" "io" weakrand "math/rand" "mime" @@ -215,7 +216,7 @@ func (h Handler) copyBuffer(dst io.Writer, src io.Reader, buf []byte) (int64, er written += int64(nw) } if werr != nil { - return written, werr + return written, fmt.Errorf("writing: %w", werr) } if nr != nw { return written, io.ErrShortWrite @@ -223,9 +224,9 @@ func (h Handler) copyBuffer(dst io.Writer, src io.Reader, buf []byte) (int64, er } if rerr != nil { if rerr == io.EOF { - rerr = nil + return written, nil } - return written, rerr + return written, fmt.Errorf("reading: %w", rerr) } } } -- cgit v1.2.3 From 424ae0f420f478e1b38189fd6632d29e13df7eee Mon Sep 17 00:00:00 2001 From: mmm444 Date: Mon, 19 Jun 2023 23:54:43 +0200 Subject: reverseproxy: Experimental streaming timeouts (#5567) * reverseproxy: WIP streaming timeouts * More verbose logging by using the child logger * reverseproxy: Implement streaming timeouts * reverseproxy: Refactor cleanup * reverseproxy: Avoid **time.Timer --------- Co-authored-by: Francis Lavoie --- modules/caddyhttp/reverseproxy/streaming.go | 89 ++++++++++++++++++++++++++--- 1 file changed, 81 insertions(+), 8 deletions(-) (limited to 'modules/caddyhttp/reverseproxy/streaming.go') diff --git a/modules/caddyhttp/reverseproxy/streaming.go b/modules/caddyhttp/reverseproxy/streaming.go index 1f5387e..6c1e44c 100644 --- a/modules/caddyhttp/reverseproxy/streaming.go +++ b/modules/caddyhttp/reverseproxy/streaming.go @@ -33,19 +33,19 @@ import ( "golang.org/x/net/http/httpguts" ) -func (h Handler) handleUpgradeResponse(logger *zap.Logger, rw http.ResponseWriter, req *http.Request, res *http.Response) { +func (h *Handler) handleUpgradeResponse(logger *zap.Logger, rw http.ResponseWriter, req *http.Request, res *http.Response) { reqUpType := upgradeType(req.Header) resUpType := upgradeType(res.Header) // Taken from https://github.com/golang/go/commit/5c489514bc5e61ad9b5b07bd7d8ec65d66a0512a // We know reqUpType is ASCII, it's checked by the caller. if !asciiIsPrint(resUpType) { - h.logger.Debug("backend tried to switch to invalid protocol", + logger.Debug("backend tried to switch to invalid protocol", zap.String("backend_upgrade", resUpType)) return } if !asciiEqualFold(reqUpType, resUpType) { - h.logger.Debug("backend tried to switch to unexpected protocol via Upgrade header", + logger.Debug("backend tried to switch to unexpected protocol via Upgrade header", zap.String("backend_upgrade", resUpType), zap.String("requested_upgrade", reqUpType)) return @@ -53,12 +53,12 @@ func (h Handler) handleUpgradeResponse(logger *zap.Logger, rw http.ResponseWrite hj, ok := rw.(http.Hijacker) if !ok { - h.logger.Sugar().Errorf("can't switch protocols using non-Hijacker ResponseWriter type %T", rw) + logger.Error("can't switch protocols using non-Hijacker ResponseWriter", zap.String("type", fmt.Sprintf("%T", rw))) return } backConn, ok := res.Body.(io.ReadWriteCloser) if !ok { - h.logger.Error("internal error: 101 switching protocols response with non-writable body") + logger.Error("internal error: 101 switching protocols response with non-writable body") return } @@ -83,7 +83,7 @@ func (h Handler) handleUpgradeResponse(logger *zap.Logger, rw http.ResponseWrite logger.Debug("upgrading connection") conn, brw, err := hj.Hijack() if err != nil { - h.logger.Error("hijack failed on protocol switch", zap.Error(err)) + logger.Error("hijack failed on protocol switch", zap.Error(err)) return } @@ -94,7 +94,7 @@ func (h Handler) handleUpgradeResponse(logger *zap.Logger, rw http.ResponseWrite }() if err := brw.Flush(); err != nil { - h.logger.Debug("response flush", zap.Error(err)) + logger.Debug("response flush", zap.Error(err)) return } @@ -120,10 +120,23 @@ func (h Handler) handleUpgradeResponse(logger *zap.Logger, rw http.ResponseWrite spc := switchProtocolCopier{user: conn, backend: backConn} + // setup the timeout if requested + var timeoutc <-chan time.Time + if h.StreamTimeout > 0 { + timer := time.NewTimer(time.Duration(h.StreamTimeout)) + defer timer.Stop() + timeoutc = timer.C + } + errc := make(chan error, 1) go spc.copyToBackend(errc) go spc.copyFromBackend(errc) - <-errc + select { + case err := <-errc: + logger.Debug("streaming error", zap.Error(err)) + case time := <-timeoutc: + logger.Debug("stream timed out", zap.Time("timeout", time)) + } } // flushInterval returns the p.FlushInterval value, conditionally @@ -243,10 +256,70 @@ func (h *Handler) registerConnection(conn io.ReadWriteCloser, gracefulClose func return func() { h.connectionsMu.Lock() delete(h.connections, conn) + // if there is no connection left before the connections close timer fires + if len(h.connections) == 0 && h.connectionsCloseTimer != nil { + // we release the timer that holds the reference to Handler + if (*h.connectionsCloseTimer).Stop() { + h.logger.Debug("stopped streaming connections close timer - all connections are already closed") + } + h.connectionsCloseTimer = nil + } h.connectionsMu.Unlock() } } +// closeConnections immediately closes all hijacked connections (both to client and backend). +func (h *Handler) closeConnections() error { + var err error + h.connectionsMu.Lock() + defer h.connectionsMu.Unlock() + + for _, oc := range h.connections { + if oc.gracefulClose != nil { + // this is potentially blocking while we have the lock on the connections + // map, but that should be OK since the server has in theory shut down + // and we are no longer using the connections map + gracefulErr := oc.gracefulClose() + if gracefulErr != nil && err == nil { + err = gracefulErr + } + } + closeErr := oc.conn.Close() + if closeErr != nil && err == nil { + err = closeErr + } + } + return err +} + +// cleanupConnections closes hijacked connections. +// Depending on the value of StreamCloseDelay it does that either immediately +// or sets up a timer that will do that later. +func (h *Handler) cleanupConnections() error { + if h.StreamCloseDelay == 0 { + return h.closeConnections() + } + + h.connectionsMu.Lock() + defer h.connectionsMu.Unlock() + // the handler is shut down, no new connection can appear, + // so we can skip setting up the timer when there are no connections + if len(h.connections) > 0 { + delay := time.Duration(h.StreamCloseDelay) + h.connectionsCloseTimer = time.AfterFunc(delay, func() { + h.logger.Debug("closing streaming connections after delay", + zap.Duration("delay", delay)) + err := h.closeConnections() + if err != nil { + h.logger.Error("failed to closed connections after delay", + zap.Error(err), + zap.Duration("delay", delay)) + } + }) + } + return nil +} + // writeCloseControl sends a best-effort Close control message to the given // WebSocket connection. Thanks to @pascaldekloe who provided inspiration // from his simple implementation of this I was able to learn from at: -- cgit v1.2.3 From cd486c25d168caf58f4b6fe5d3252df9432901ec Mon Sep 17 00:00:00 2001 From: Francis Lavoie Date: Wed, 2 Aug 2023 16:03:26 -0400 Subject: caddyhttp: Make use of `http.ResponseController` (#5654) * caddyhttp: Make use of http.ResponseController Also syncs the reverseproxy implementation with stdlib's which now uses ResponseController as well https://github.com/golang/go/commit/2449bbb5e614954ce9e99c8a481ea2ee73d72d61 * Enable full-duplex for HTTP/1.1 * Appease linter * Add warning for builds with Go 1.20, so it's less surprising to users * Improved godoc for EnableFullDuplex, copied text from stdlib * Only wrap in encode if not already wrapped --- modules/caddyhttp/reverseproxy/streaming.go | 60 +++++++++++++++-------------- 1 file changed, 31 insertions(+), 29 deletions(-) (limited to 'modules/caddyhttp/reverseproxy/streaming.go') diff --git a/modules/caddyhttp/reverseproxy/streaming.go b/modules/caddyhttp/reverseproxy/streaming.go index 6c1e44c..3f2489d 100644 --- a/modules/caddyhttp/reverseproxy/streaming.go +++ b/modules/caddyhttp/reverseproxy/streaming.go @@ -20,6 +20,7 @@ package reverseproxy import ( "context" + "errors" "fmt" "io" weakrand "math/rand" @@ -51,17 +52,19 @@ func (h *Handler) handleUpgradeResponse(logger *zap.Logger, rw http.ResponseWrit return } - hj, ok := rw.(http.Hijacker) - if !ok { - logger.Error("can't switch protocols using non-Hijacker ResponseWriter", zap.String("type", fmt.Sprintf("%T", rw))) - return - } backConn, ok := res.Body.(io.ReadWriteCloser) if !ok { logger.Error("internal error: 101 switching protocols response with non-writable body") return } + //nolint:bodyclose + conn, brw, hijackErr := http.NewResponseController(rw).Hijack() + if errors.Is(hijackErr, http.ErrNotSupported) { + h.logger.Sugar().Errorf("can't switch protocols using non-Hijacker ResponseWriter type %T", rw) + return + } + // adopted from https://github.com/golang/go/commit/8bcf2834afdf6a1f7937390903a41518715ef6f5 backConnCloseCh := make(chan struct{}) go func() { @@ -81,9 +84,8 @@ func (h *Handler) handleUpgradeResponse(logger *zap.Logger, rw http.ResponseWrit rw.WriteHeader(res.StatusCode) logger.Debug("upgrading connection") - conn, brw, err := hj.Hijack() - if err != nil { - logger.Error("hijack failed on protocol switch", zap.Error(err)) + if hijackErr != nil { + h.logger.Error("hijack failed on protocol switch", zap.Error(hijackErr)) return } @@ -181,26 +183,28 @@ func (h Handler) isBidirectionalStream(req *http.Request, res *http.Response) bo (ae == "identity" || ae == "") } -func (h Handler) copyResponse(dst io.Writer, src io.Reader, flushInterval time.Duration) error { +func (h Handler) copyResponse(dst http.ResponseWriter, src io.Reader, flushInterval time.Duration) error { + var w io.Writer = dst + if flushInterval != 0 { - if wf, ok := dst.(writeFlusher); ok { - mlw := &maxLatencyWriter{ - dst: wf, - latency: flushInterval, - } - defer mlw.stop() + mlw := &maxLatencyWriter{ + dst: dst, + //nolint:bodyclose + flush: http.NewResponseController(dst).Flush, + latency: flushInterval, + } + defer mlw.stop() - // set up initial timer so headers get flushed even if body writes are delayed - mlw.flushPending = true - mlw.t = time.AfterFunc(flushInterval, mlw.delayedFlush) + // set up initial timer so headers get flushed even if body writes are delayed + mlw.flushPending = true + mlw.t = time.AfterFunc(flushInterval, mlw.delayedFlush) - dst = mlw - } + w = mlw } buf := streamingBufPool.Get().(*[]byte) defer streamingBufPool.Put(buf) - _, err := h.copyBuffer(dst, src, *buf) + _, err := h.copyBuffer(w, src, *buf) return err } @@ -439,13 +443,9 @@ type openConnection struct { gracefulClose func() error } -type writeFlusher interface { - io.Writer - http.Flusher -} - type maxLatencyWriter struct { - dst writeFlusher + dst io.Writer + flush func() error latency time.Duration // non-zero; negative means to flush immediately mu sync.Mutex // protects t, flushPending, and dst.Flush @@ -458,7 +458,8 @@ func (m *maxLatencyWriter) Write(p []byte) (n int, err error) { defer m.mu.Unlock() n, err = m.dst.Write(p) if m.latency < 0 { - m.dst.Flush() + //nolint:errcheck + m.flush() return } if m.flushPending { @@ -479,7 +480,8 @@ func (m *maxLatencyWriter) delayedFlush() { if !m.flushPending { // if stop was called but AfterFunc already started this goroutine return } - m.dst.Flush() + //nolint:errcheck + m.flush() m.flushPending = false } -- cgit v1.2.3 From e2fc08bd34cc17b8cbb6ac364fa1ec41c4c643b9 Mon Sep 17 00:00:00 2001 From: WeidiDeng Date: Thu, 3 Aug 2023 12:08:12 +0800 Subject: reverseproxy: Fix hijack ordering which broke websockets (#5679) --- modules/caddyhttp/reverseproxy/streaming.go | 23 ++++++++++++----------- 1 file changed, 12 insertions(+), 11 deletions(-) (limited to 'modules/caddyhttp/reverseproxy/streaming.go') diff --git a/modules/caddyhttp/reverseproxy/streaming.go b/modules/caddyhttp/reverseproxy/streaming.go index 3f2489d..71b9386 100644 --- a/modules/caddyhttp/reverseproxy/streaming.go +++ b/modules/caddyhttp/reverseproxy/streaming.go @@ -58,6 +58,13 @@ func (h *Handler) handleUpgradeResponse(logger *zap.Logger, rw http.ResponseWrit return } + // write header first, response headers should not be counted in size + // like the rest of handler chain. + copyHeader(rw.Header(), res.Header) + rw.WriteHeader(res.StatusCode) + + logger.Debug("upgrading connection") + //nolint:bodyclose conn, brw, hijackErr := http.NewResponseController(rw).Hijack() if errors.Is(hijackErr, http.ErrNotSupported) { @@ -65,6 +72,11 @@ func (h *Handler) handleUpgradeResponse(logger *zap.Logger, rw http.ResponseWrit return } + if hijackErr != nil { + h.logger.Error("hijack failed on protocol switch", zap.Error(hijackErr)) + return + } + // adopted from https://github.com/golang/go/commit/8bcf2834afdf6a1f7937390903a41518715ef6f5 backConnCloseCh := make(chan struct{}) go func() { @@ -78,17 +90,6 @@ func (h *Handler) handleUpgradeResponse(logger *zap.Logger, rw http.ResponseWrit }() defer close(backConnCloseCh) - // write header first, response headers should not be counted in size - // like the rest of handler chain. - copyHeader(rw.Header(), res.Header) - rw.WriteHeader(res.StatusCode) - - logger.Debug("upgrading connection") - if hijackErr != nil { - h.logger.Error("hijack failed on protocol switch", zap.Error(hijackErr)) - return - } - start := time.Now() defer func() { conn.Close() -- cgit v1.2.3 From b32f265ecad60404c3818cc9d42e367a8e4eb7d4 Mon Sep 17 00:00:00 2001 From: Jacob Gadikian Date: Tue, 8 Aug 2023 03:40:31 +0800 Subject: ci: Use gofumpt to format code (#5707) --- modules/caddyhttp/reverseproxy/streaming.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) (limited to 'modules/caddyhttp/reverseproxy/streaming.go') diff --git a/modules/caddyhttp/reverseproxy/streaming.go b/modules/caddyhttp/reverseproxy/streaming.go index 71b9386..c5369d8 100644 --- a/modules/caddyhttp/reverseproxy/streaming.go +++ b/modules/caddyhttp/reverseproxy/streaming.go @@ -522,5 +522,7 @@ var streamingBufPool = sync.Pool{ }, } -const defaultBufferSize = 32 * 1024 -const wordSize = int(unsafe.Sizeof(uintptr(0))) +const ( + defaultBufferSize = 32 * 1024 + wordSize = int(unsafe.Sizeof(uintptr(0))) +) -- cgit v1.2.3 From 3a3182fba3eb20c45dce3efd9eb2856288a5ae04 Mon Sep 17 00:00:00 2001 From: Matt Holt Date: Wed, 11 Oct 2023 13:36:20 -0600 Subject: reverseproxy: Add more debug logs (#5793) * reverseproxy: Add more debug logs This makes debug logging very noisy when reverse proxying, but I guess that's the point. This has shown to be useful in troubleshooting infrastructure issues. * Update modules/caddyhttp/reverseproxy/streaming.go Co-authored-by: Francis Lavoie * Update modules/caddyhttp/reverseproxy/streaming.go Co-authored-by: Francis Lavoie * Add opt-in `trace_logs` option * Rename to VerboseLogs --------- Co-authored-by: Francis Lavoie --- modules/caddyhttp/reverseproxy/streaming.go | 36 ++++++++++++++++++++++++++--- 1 file changed, 33 insertions(+), 3 deletions(-) (limited to 'modules/caddyhttp/reverseproxy/streaming.go') diff --git a/modules/caddyhttp/reverseproxy/streaming.go b/modules/caddyhttp/reverseproxy/streaming.go index c5369d8..155a1df 100644 --- a/modules/caddyhttp/reverseproxy/streaming.go +++ b/modules/caddyhttp/reverseproxy/streaming.go @@ -184,15 +184,22 @@ func (h Handler) isBidirectionalStream(req *http.Request, res *http.Response) bo (ae == "identity" || ae == "") } -func (h Handler) copyResponse(dst http.ResponseWriter, src io.Reader, flushInterval time.Duration) error { +func (h Handler) copyResponse(dst http.ResponseWriter, src io.Reader, flushInterval time.Duration, logger *zap.Logger) error { var w io.Writer = dst if flushInterval != 0 { + var mlwLogger *zap.Logger + if h.VerboseLogs { + mlwLogger = logger.Named("max_latency_writer") + } else { + mlwLogger = zap.NewNop() + } mlw := &maxLatencyWriter{ dst: dst, //nolint:bodyclose flush: http.NewResponseController(dst).Flush, latency: flushInterval, + logger: mlwLogger, } defer mlw.stop() @@ -205,19 +212,30 @@ func (h Handler) copyResponse(dst http.ResponseWriter, src io.Reader, flushInter buf := streamingBufPool.Get().(*[]byte) defer streamingBufPool.Put(buf) - _, err := h.copyBuffer(w, src, *buf) + + var copyLogger *zap.Logger + if h.VerboseLogs { + copyLogger = logger + } else { + copyLogger = zap.NewNop() + } + + _, err := h.copyBuffer(w, src, *buf, copyLogger) return err } // copyBuffer returns any write errors or non-EOF read errors, and the amount // of bytes written. -func (h Handler) copyBuffer(dst io.Writer, src io.Reader, buf []byte) (int64, error) { +func (h Handler) copyBuffer(dst io.Writer, src io.Reader, buf []byte, logger *zap.Logger) (int64, error) { if len(buf) == 0 { buf = make([]byte, defaultBufferSize) } var written int64 for { + logger.Debug("waiting to read from upstream") nr, rerr := src.Read(buf) + logger := logger.With(zap.Int("read", nr)) + logger.Debug("read from upstream", zap.Error(rerr)) if rerr != nil && rerr != io.EOF && rerr != context.Canceled { // TODO: this could be useful to know (indeed, it revealed an error in our // fastcgi PoC earlier; but it's this single error report here that necessitates @@ -229,10 +247,15 @@ func (h Handler) copyBuffer(dst io.Writer, src io.Reader, buf []byte) (int64, er h.logger.Error("reading from backend", zap.Error(rerr)) } if nr > 0 { + logger.Debug("writing to downstream") nw, werr := dst.Write(buf[:nr]) if nw > 0 { written += int64(nw) } + logger.Debug("wrote to downstream", + zap.Int("written", nw), + zap.Int64("written_total", written), + zap.Error(werr)) if werr != nil { return written, fmt.Errorf("writing: %w", werr) } @@ -452,18 +475,22 @@ type maxLatencyWriter struct { mu sync.Mutex // protects t, flushPending, and dst.Flush t *time.Timer flushPending bool + logger *zap.Logger } func (m *maxLatencyWriter) Write(p []byte) (n int, err error) { m.mu.Lock() defer m.mu.Unlock() n, err = m.dst.Write(p) + m.logger.Debug("wrote bytes", zap.Int("n", n), zap.Error(err)) if m.latency < 0 { + m.logger.Debug("flushing immediately") //nolint:errcheck m.flush() return } if m.flushPending { + m.logger.Debug("delayed flush already pending") return } if m.t == nil { @@ -471,6 +498,7 @@ func (m *maxLatencyWriter) Write(p []byte) (n int, err error) { } else { m.t.Reset(m.latency) } + m.logger.Debug("timer set for delayed flush", zap.Duration("duration", m.latency)) m.flushPending = true return } @@ -479,8 +507,10 @@ func (m *maxLatencyWriter) delayedFlush() { m.mu.Lock() defer m.mu.Unlock() if !m.flushPending { // if stop was called but AfterFunc already started this goroutine + m.logger.Debug("delayed flush is not pending") return } + m.logger.Debug("delayed flush") //nolint:errcheck m.flush() m.flushPending = false -- cgit v1.2.3