summaryrefslogtreecommitdiff
path: root/modules/caddyhttp/reverseproxy/streaming.go
diff options
context:
space:
mode:
authorMatt Holt <mholt@users.noreply.github.com>2022-09-02 17:01:55 -0600
committerGitHub <noreply@github.com>2022-09-02 17:01:55 -0600
commit66476d8c8f6010f19fb65bac7758c6fd2824e231 (patch)
tree951aee5aa7e5d8d950aedd234bfaf3d39e3f04f9 /modules/caddyhttp/reverseproxy/streaming.go
parentd3c3fa10bd72029720969cff0c29e2b79a1b2cdf (diff)
reverseproxy: Close hijacked conns on reload/quit (#4895)
* reverseproxy: Close hijacked conns on reload/quit We also send a Close control message to both ends of WebSocket connections. I have tested this many times in my dev environment with consistent success, although the variety of scenarios was limited. * Oops... actually call Close() this time * CloseMessage --> closeMessage Co-authored-by: Francis Lavoie <lavofr@gmail.com> * Use httpguts, duh * Use map instead of sync.Map Co-authored-by: Francis Lavoie <lavofr@gmail.com>
Diffstat (limited to 'modules/caddyhttp/reverseproxy/streaming.go')
-rw-r--r--modules/caddyhttp/reverseproxy/streaming.go78
1 files changed, 76 insertions, 2 deletions
diff --git a/modules/caddyhttp/reverseproxy/streaming.go b/modules/caddyhttp/reverseproxy/streaming.go
index 298ab58..01d865d 100644
--- a/modules/caddyhttp/reverseproxy/streaming.go
+++ b/modules/caddyhttp/reverseproxy/streaming.go
@@ -20,6 +20,7 @@ package reverseproxy
import (
"context"
+ "encoding/binary"
"io"
"mime"
"net/http"
@@ -27,6 +28,7 @@ import (
"time"
"go.uber.org/zap"
+ "golang.org/x/net/http/httpguts"
)
func (h Handler) handleUpgradeResponse(logger *zap.Logger, rw http.ResponseWriter, req *http.Request, res *http.Response) {
@@ -97,8 +99,26 @@ func (h Handler) handleUpgradeResponse(logger *zap.Logger, rw http.ResponseWrite
return
}
- errc := make(chan error, 1)
+ // Ensure the hijacked client connection, and the new connection established
+ // with the backend, are both closed in the event of a server shutdown. This
+ // is done by registering them. We also try to gracefully close connections
+ // we recognize as websockets.
+ gracefulClose := func(conn io.ReadWriteCloser) func() error {
+ if isWebsocket(req) {
+ return func() error {
+ return writeCloseControl(conn)
+ }
+ }
+ return nil
+ }
+ deleteFrontConn := h.registerConnection(conn, gracefulClose(conn))
+ deleteBackConn := h.registerConnection(backConn, gracefulClose(backConn))
+ defer deleteFrontConn()
+ defer deleteBackConn()
+
spc := switchProtocolCopier{user: conn, backend: backConn}
+
+ errc := make(chan error, 1)
go spc.copyToBackend(errc)
go spc.copyFromBackend(errc)
<-errc
@@ -209,6 +229,60 @@ func (h Handler) copyBuffer(dst io.Writer, src io.Reader, buf []byte) (int64, er
}
}
+// registerConnection holds onto conn so it can be closed in the event
+// of a server shutdown. This is useful because hijacked connections or
+// connections dialed to backends don't close when server is shut down.
+// The caller should call the returned delete() function when the
+// connection is done to remove it from memory.
+func (h *Handler) registerConnection(conn io.ReadWriteCloser, gracefulClose func() error) (del func()) {
+ h.connectionsMu.Lock()
+ h.connections[conn] = openConnection{conn, gracefulClose}
+ h.connectionsMu.Unlock()
+ return func() {
+ h.connectionsMu.Lock()
+ delete(h.connections, conn)
+ h.connectionsMu.Unlock()
+ }
+}
+
+// writeCloseControl sends a best-effort Close control message to the given
+// WebSocket connection. Thanks to @pascaldekloe who provided inspiration
+// from his simple implementation of this I was able to learn from at:
+// github.com/pascaldekloe/websocket.
+func writeCloseControl(conn io.Writer) error {
+ // https://github.com/pascaldekloe/websocket/blob/32050af67a5d/websocket.go#L119
+
+ var reason string // max 123 bytes (control frame payload limit is 125; status code takes 2)
+ const goingAway uint16 = 1001
+
+ // TODO: we might need to ensure we are the exclusive writer by this point (io.Copy is stopped)?
+ var writeBuf [127]byte
+ const closeMessage = 8
+ const finalBit = 1 << 7
+ writeBuf[0] = closeMessage | finalBit
+ writeBuf[1] = byte(len(reason) + 2)
+ binary.BigEndian.PutUint16(writeBuf[2:4], goingAway)
+ copy(writeBuf[4:], reason)
+
+ // simply best-effort, but return error for logging purposes
+ _, err := conn.Write(writeBuf[:4+len(reason)])
+ return err
+}
+
+// isWebsocket returns true if r looks to be an upgrade request for WebSockets.
+// It is a fairly naive check.
+func isWebsocket(r *http.Request) bool {
+ return httpguts.HeaderValuesContainsToken(r.Header["Connection"], "upgrade") &&
+ httpguts.HeaderValuesContainsToken(r.Header["Upgrade"], "websocket")
+}
+
+// openConnection maps an open connection to
+// an optional function for graceful close.
+type openConnection struct {
+ conn io.ReadWriteCloser
+ gracefulClose func() error
+}
+
type writeFlusher interface {
io.Writer
http.Flusher
@@ -265,7 +339,7 @@ func (m *maxLatencyWriter) stop() {
// switchProtocolCopier exists so goroutines proxying data back and
// forth have nice names in stacks.
type switchProtocolCopier struct {
- user, backend io.ReadWriter
+ user, backend io.ReadWriteCloser
}
func (c switchProtocolCopier) copyFromBackend(errc chan<- error) {