summaryrefslogtreecommitdiff
path: root/modules/caddyhttp/reverseproxy/streaming.go
diff options
context:
space:
mode:
Diffstat (limited to 'modules/caddyhttp/reverseproxy/streaming.go')
-rw-r--r--modules/caddyhttp/reverseproxy/streaming.go60
1 files changed, 31 insertions, 29 deletions
diff --git a/modules/caddyhttp/reverseproxy/streaming.go b/modules/caddyhttp/reverseproxy/streaming.go
index 6c1e44c..3f2489d 100644
--- a/modules/caddyhttp/reverseproxy/streaming.go
+++ b/modules/caddyhttp/reverseproxy/streaming.go
@@ -20,6 +20,7 @@ package reverseproxy
import (
"context"
+ "errors"
"fmt"
"io"
weakrand "math/rand"
@@ -51,17 +52,19 @@ func (h *Handler) handleUpgradeResponse(logger *zap.Logger, rw http.ResponseWrit
return
}
- hj, ok := rw.(http.Hijacker)
- if !ok {
- logger.Error("can't switch protocols using non-Hijacker ResponseWriter", zap.String("type", fmt.Sprintf("%T", rw)))
- return
- }
backConn, ok := res.Body.(io.ReadWriteCloser)
if !ok {
logger.Error("internal error: 101 switching protocols response with non-writable body")
return
}
+ //nolint:bodyclose
+ conn, brw, hijackErr := http.NewResponseController(rw).Hijack()
+ if errors.Is(hijackErr, http.ErrNotSupported) {
+ h.logger.Sugar().Errorf("can't switch protocols using non-Hijacker ResponseWriter type %T", rw)
+ return
+ }
+
// adopted from https://github.com/golang/go/commit/8bcf2834afdf6a1f7937390903a41518715ef6f5
backConnCloseCh := make(chan struct{})
go func() {
@@ -81,9 +84,8 @@ func (h *Handler) handleUpgradeResponse(logger *zap.Logger, rw http.ResponseWrit
rw.WriteHeader(res.StatusCode)
logger.Debug("upgrading connection")
- conn, brw, err := hj.Hijack()
- if err != nil {
- logger.Error("hijack failed on protocol switch", zap.Error(err))
+ if hijackErr != nil {
+ h.logger.Error("hijack failed on protocol switch", zap.Error(hijackErr))
return
}
@@ -181,26 +183,28 @@ func (h Handler) isBidirectionalStream(req *http.Request, res *http.Response) bo
(ae == "identity" || ae == "")
}
-func (h Handler) copyResponse(dst io.Writer, src io.Reader, flushInterval time.Duration) error {
+func (h Handler) copyResponse(dst http.ResponseWriter, src io.Reader, flushInterval time.Duration) error {
+ var w io.Writer = dst
+
if flushInterval != 0 {
- if wf, ok := dst.(writeFlusher); ok {
- mlw := &maxLatencyWriter{
- dst: wf,
- latency: flushInterval,
- }
- defer mlw.stop()
+ mlw := &maxLatencyWriter{
+ dst: dst,
+ //nolint:bodyclose
+ flush: http.NewResponseController(dst).Flush,
+ latency: flushInterval,
+ }
+ defer mlw.stop()
- // set up initial timer so headers get flushed even if body writes are delayed
- mlw.flushPending = true
- mlw.t = time.AfterFunc(flushInterval, mlw.delayedFlush)
+ // set up initial timer so headers get flushed even if body writes are delayed
+ mlw.flushPending = true
+ mlw.t = time.AfterFunc(flushInterval, mlw.delayedFlush)
- dst = mlw
- }
+ w = mlw
}
buf := streamingBufPool.Get().(*[]byte)
defer streamingBufPool.Put(buf)
- _, err := h.copyBuffer(dst, src, *buf)
+ _, err := h.copyBuffer(w, src, *buf)
return err
}
@@ -439,13 +443,9 @@ type openConnection struct {
gracefulClose func() error
}
-type writeFlusher interface {
- io.Writer
- http.Flusher
-}
-
type maxLatencyWriter struct {
- dst writeFlusher
+ dst io.Writer
+ flush func() error
latency time.Duration // non-zero; negative means to flush immediately
mu sync.Mutex // protects t, flushPending, and dst.Flush
@@ -458,7 +458,8 @@ func (m *maxLatencyWriter) Write(p []byte) (n int, err error) {
defer m.mu.Unlock()
n, err = m.dst.Write(p)
if m.latency < 0 {
- m.dst.Flush()
+ //nolint:errcheck
+ m.flush()
return
}
if m.flushPending {
@@ -479,7 +480,8 @@ func (m *maxLatencyWriter) delayedFlush() {
if !m.flushPending { // if stop was called but AfterFunc already started this goroutine
return
}
- m.dst.Flush()
+ //nolint:errcheck
+ m.flush()
m.flushPending = false
}