summaryrefslogtreecommitdiff
path: root/modules/caddyhttp/reverseproxy/streaming.go
diff options
context:
space:
mode:
Diffstat (limited to 'modules/caddyhttp/reverseproxy/streaming.go')
-rw-r--r--modules/caddyhttp/reverseproxy/streaming.go36
1 files changed, 33 insertions, 3 deletions
diff --git a/modules/caddyhttp/reverseproxy/streaming.go b/modules/caddyhttp/reverseproxy/streaming.go
index c5369d8..155a1df 100644
--- a/modules/caddyhttp/reverseproxy/streaming.go
+++ b/modules/caddyhttp/reverseproxy/streaming.go
@@ -184,15 +184,22 @@ func (h Handler) isBidirectionalStream(req *http.Request, res *http.Response) bo
(ae == "identity" || ae == "")
}
-func (h Handler) copyResponse(dst http.ResponseWriter, src io.Reader, flushInterval time.Duration) error {
+func (h Handler) copyResponse(dst http.ResponseWriter, src io.Reader, flushInterval time.Duration, logger *zap.Logger) error {
var w io.Writer = dst
if flushInterval != 0 {
+ var mlwLogger *zap.Logger
+ if h.VerboseLogs {
+ mlwLogger = logger.Named("max_latency_writer")
+ } else {
+ mlwLogger = zap.NewNop()
+ }
mlw := &maxLatencyWriter{
dst: dst,
//nolint:bodyclose
flush: http.NewResponseController(dst).Flush,
latency: flushInterval,
+ logger: mlwLogger,
}
defer mlw.stop()
@@ -205,19 +212,30 @@ func (h Handler) copyResponse(dst http.ResponseWriter, src io.Reader, flushInter
buf := streamingBufPool.Get().(*[]byte)
defer streamingBufPool.Put(buf)
- _, err := h.copyBuffer(w, src, *buf)
+
+ var copyLogger *zap.Logger
+ if h.VerboseLogs {
+ copyLogger = logger
+ } else {
+ copyLogger = zap.NewNop()
+ }
+
+ _, err := h.copyBuffer(w, src, *buf, copyLogger)
return err
}
// copyBuffer returns any write errors or non-EOF read errors, and the amount
// of bytes written.
-func (h Handler) copyBuffer(dst io.Writer, src io.Reader, buf []byte) (int64, error) {
+func (h Handler) copyBuffer(dst io.Writer, src io.Reader, buf []byte, logger *zap.Logger) (int64, error) {
if len(buf) == 0 {
buf = make([]byte, defaultBufferSize)
}
var written int64
for {
+ logger.Debug("waiting to read from upstream")
nr, rerr := src.Read(buf)
+ logger := logger.With(zap.Int("read", nr))
+ logger.Debug("read from upstream", zap.Error(rerr))
if rerr != nil && rerr != io.EOF && rerr != context.Canceled {
// TODO: this could be useful to know (indeed, it revealed an error in our
// fastcgi PoC earlier; but it's this single error report here that necessitates
@@ -229,10 +247,15 @@ func (h Handler) copyBuffer(dst io.Writer, src io.Reader, buf []byte) (int64, er
h.logger.Error("reading from backend", zap.Error(rerr))
}
if nr > 0 {
+ logger.Debug("writing to downstream")
nw, werr := dst.Write(buf[:nr])
if nw > 0 {
written += int64(nw)
}
+ logger.Debug("wrote to downstream",
+ zap.Int("written", nw),
+ zap.Int64("written_total", written),
+ zap.Error(werr))
if werr != nil {
return written, fmt.Errorf("writing: %w", werr)
}
@@ -452,18 +475,22 @@ type maxLatencyWriter struct {
mu sync.Mutex // protects t, flushPending, and dst.Flush
t *time.Timer
flushPending bool
+ logger *zap.Logger
}
func (m *maxLatencyWriter) Write(p []byte) (n int, err error) {
m.mu.Lock()
defer m.mu.Unlock()
n, err = m.dst.Write(p)
+ m.logger.Debug("wrote bytes", zap.Int("n", n), zap.Error(err))
if m.latency < 0 {
+ m.logger.Debug("flushing immediately")
//nolint:errcheck
m.flush()
return
}
if m.flushPending {
+ m.logger.Debug("delayed flush already pending")
return
}
if m.t == nil {
@@ -471,6 +498,7 @@ func (m *maxLatencyWriter) Write(p []byte) (n int, err error) {
} else {
m.t.Reset(m.latency)
}
+ m.logger.Debug("timer set for delayed flush", zap.Duration("duration", m.latency))
m.flushPending = true
return
}
@@ -479,8 +507,10 @@ func (m *maxLatencyWriter) delayedFlush() {
m.mu.Lock()
defer m.mu.Unlock()
if !m.flushPending { // if stop was called but AfterFunc already started this goroutine
+ m.logger.Debug("delayed flush is not pending")
return
}
+ m.logger.Debug("delayed flush")
//nolint:errcheck
m.flush()
m.flushPending = false