From d3c3fa10bd72029720969cff0c29e2b79a1b2cdf Mon Sep 17 00:00:00 2001 From: Matt Holt Date: Fri, 2 Sep 2022 16:59:11 -0600 Subject: core: Refactor listeners; use SO_REUSEPORT on Unix (#4705) * core: Refactor listeners; use SO_REUSEPORT on Unix Just an experiment for now * Fix lint by logging error * TCP Keepalive configuration (#4865) * initial attempt at TCP Keepalive configuration * core: implement tcp-keepalive for linux * move canSetKeepAlive interface * Godoc for keepalive server parameter * handle return values * log keepalive errors * Clean up after bad merge * Merge in pluggable network types From 1edc1a45e3aee1f7d86b68c3ddaf2fd16ba8ab73 * Slight refactor, fix from recent merge conflict Co-authored-by: Karmanyaah Malhotra --- caddyconfig/httpcaddyfile/serveroptions.go | 11 ++ listen.go | 172 +++++++++++++++++++++++++++++ listen_linux.go | 34 ++++++ listeners.go | 148 ++----------------------- modules/caddyhttp/app.go | 2 +- modules/caddyhttp/server.go | 5 + 6 files changed, 234 insertions(+), 138 deletions(-) create mode 100644 listen.go create mode 100644 listen_linux.go diff --git a/caddyconfig/httpcaddyfile/serveroptions.go b/caddyconfig/httpcaddyfile/serveroptions.go index 6764f1a..f3e3d73 100644 --- a/caddyconfig/httpcaddyfile/serveroptions.go +++ b/caddyconfig/httpcaddyfile/serveroptions.go @@ -38,6 +38,7 @@ type serverOptions struct { ReadHeaderTimeout caddy.Duration WriteTimeout caddy.Duration IdleTimeout caddy.Duration + KeepAliveInterval caddy.Duration MaxHeaderBytes int Protocols []string StrictSNIHost *bool @@ -122,6 +123,15 @@ func unmarshalCaddyfileServerOptions(d *caddyfile.Dispenser) (any, error) { return nil, d.Errf("unrecognized timeouts option '%s'", d.Val()) } } + case "keepalive_interval": + if !d.NextArg() { + return nil, d.ArgErr() + } + dur, err := caddy.ParseDuration(d.Val()) + if err != nil { + return nil, d.Errf("parsing keepalive interval duration: %v", err) + } + serverOpts.KeepAliveInterval = caddy.Duration(dur) case "max_header_size": var sizeStr string @@ -245,6 +255,7 @@ func applyServerOptions( server.ReadHeaderTimeout = opts.ReadHeaderTimeout server.WriteTimeout = opts.WriteTimeout server.IdleTimeout = opts.IdleTimeout + server.KeepAliveInterval = opts.KeepAliveInterval server.MaxHeaderBytes = opts.MaxHeaderBytes server.Protocols = opts.Protocols server.StrictSNIHost = opts.StrictSNIHost diff --git a/listen.go b/listen.go new file mode 100644 index 0000000..2c4a0b2 --- /dev/null +++ b/listen.go @@ -0,0 +1,172 @@ +//go:build !linux + +package caddy + +import ( + "fmt" + "net" + "sync" + "sync/atomic" + "time" + + "go.uber.org/zap" +) + +func ListenTimeout(network, addr string, keepAlivePeriod time.Duration) (net.Listener, error) { + // check to see if plugin provides listener + if ln, err := getListenerFromPlugin(network, addr); err != nil || ln != nil { + return ln, err + } + + lnKey := listenerKey(network, addr) + + sharedLn, _, err := listenerPool.LoadOrNew(lnKey, func() (Destructor, error) { + ln, err := net.Listen(network, addr) + if err != nil { + // https://github.com/caddyserver/caddy/pull/4534 + if isUnixNetwork(network) && isListenBindAddressAlreadyInUseError(err) { + return nil, fmt.Errorf("%w: this can happen if Caddy was forcefully killed", err) + } + return nil, err + } + return &sharedListener{Listener: ln, key: lnKey}, nil + }) + if err != nil { + return nil, err + } + + return &fakeCloseListener{sharedListener: sharedLn.(*sharedListener), keepAlivePeriod: keepAlivePeriod}, nil +} + +// fakeCloseListener is a private wrapper over a listener that +// is shared. The state of fakeCloseListener is not shared. +// This allows one user of a socket to "close" the listener +// while in reality the socket stays open for other users of +// the listener. In this way, servers become hot-swappable +// while the listener remains running. Listeners should be +// re-wrapped in a new fakeCloseListener each time the listener +// is reused. This type is atomic and values must not be copied. +type fakeCloseListener struct { + closed int32 // accessed atomically; belongs to this struct only + *sharedListener // embedded, so we also become a net.Listener + keepAlivePeriod time.Duration +} + +type canSetKeepAlive interface { + SetKeepAlivePeriod(d time.Duration) error + SetKeepAlive(bool) error +} + +func (fcl *fakeCloseListener) Accept() (net.Conn, error) { + // if the listener is already "closed", return error + if atomic.LoadInt32(&fcl.closed) == 1 { + return nil, fakeClosedErr(fcl) + } + + // call underlying accept + conn, err := fcl.sharedListener.Accept() + if err == nil { + // if 0, do nothing, Go's default is already set + // and if the connection allows setting KeepAlive, set it + if tconn, ok := conn.(canSetKeepAlive); ok && fcl.keepAlivePeriod != 0 { + if fcl.keepAlivePeriod > 0 { + err = tconn.SetKeepAlivePeriod(fcl.keepAlivePeriod) + } else { // negative + err = tconn.SetKeepAlive(false) + } + if err != nil { + Log().With(zap.String("server", fcl.sharedListener.key)).Warn("unable to set keepalive for new connection:", zap.Error(err)) + } + } + return conn, nil + } + + // since Accept() returned an error, it may be because our reference to + // the listener (this fakeCloseListener) may have been closed, i.e. the + // server is shutting down; in that case, we need to clear the deadline + // that we set when Close() was called, and return a non-temporary and + // non-timeout error value to the caller, masking the "true" error, so + // that server loops / goroutines won't retry, linger, and leak + if atomic.LoadInt32(&fcl.closed) == 1 { + // we dereference the sharedListener explicitly even though it's embedded + // so that it's clear in the code that side-effects are shared with other + // users of this listener, not just our own reference to it; we also don't + // do anything with the error because all we could do is log it, but we + // expliclty assign it to nothing so we don't forget it's there if needed + _ = fcl.sharedListener.clearDeadline() + + if netErr, ok := err.(net.Error); ok && netErr.Timeout() { + return nil, fakeClosedErr(fcl) + } + } + + return nil, err +} + +// Close stops accepting new connections without closing the +// underlying listener. The underlying listener is only closed +// if the caller is the last known user of the socket. +func (fcl *fakeCloseListener) Close() error { + if atomic.CompareAndSwapInt32(&fcl.closed, 0, 1) { + // There are two ways I know of to get an Accept() + // function to return to the server loop that called + // it: close the listener, or set a deadline in the + // past. Obviously, we can't close the socket yet + // since others may be using it (hence this whole + // file). But we can set the deadline in the past, + // and this is kind of cheating, but it works, and + // it apparently even works on Windows. + _ = fcl.sharedListener.setDeadline() + _, _ = listenerPool.Delete(fcl.sharedListener.key) + } + return nil +} + +// sharedListener is a wrapper over an underlying listener. The listener +// and the other fields on the struct are shared state that is synchronized, +// so sharedListener structs must never be copied (always use a pointer). +type sharedListener struct { + net.Listener + key string // uniquely identifies this listener + deadline bool // whether a deadline is currently set + deadlineMu sync.Mutex +} + +func (sl *sharedListener) clearDeadline() error { + var err error + sl.deadlineMu.Lock() + if sl.deadline { + switch ln := sl.Listener.(type) { + case *net.TCPListener: + err = ln.SetDeadline(time.Time{}) + case *net.UnixListener: + err = ln.SetDeadline(time.Time{}) + } + sl.deadline = false + } + sl.deadlineMu.Unlock() + return err +} + +func (sl *sharedListener) setDeadline() error { + timeInPast := time.Now().Add(-1 * time.Minute) + var err error + sl.deadlineMu.Lock() + if !sl.deadline { + switch ln := sl.Listener.(type) { + case *net.TCPListener: + err = ln.SetDeadline(timeInPast) + case *net.UnixListener: + err = ln.SetDeadline(timeInPast) + } + sl.deadline = true + } + sl.deadlineMu.Unlock() + return err +} + +// Destruct is called by the UsagePool when the listener is +// finally not being used anymore. It closes the socket. +func (sl *sharedListener) Destruct() error { + return sl.Listener.Close() +} diff --git a/listen_linux.go b/listen_linux.go new file mode 100644 index 0000000..b1220ce --- /dev/null +++ b/listen_linux.go @@ -0,0 +1,34 @@ +package caddy + +import ( + "context" + "net" + "syscall" + "time" + + "go.uber.org/zap" + "golang.org/x/sys/unix" +) + +// ListenTimeout is the same as Listen, but with a configurable keep-alive timeout duration. +func ListenTimeout(network, addr string, keepalivePeriod time.Duration) (net.Listener, error) { + // check to see if plugin provides listener + if ln, err := getListenerFromPlugin(network, addr); err != nil || ln != nil { + return ln, err + } + + config := &net.ListenConfig{Control: reusePort, KeepAlive: keepalivePeriod} + return config.Listen(context.Background(), network, addr) +} + +func reusePort(network, address string, conn syscall.RawConn) error { + return conn.Control(func(descriptor uintptr) { + if err := unix.SetsockoptInt(int(descriptor), unix.SOL_SOCKET, unix.SO_REUSEPORT, 1); err != nil { + Log().Error("setting SO_REUSEPORT", + zap.String("network", network), + zap.String("address", address), + zap.Uintptr("descriptor", descriptor), + zap.Error(err)) + } + }) +} diff --git a/listeners.go b/listeners.go index a95c98c..5914443 100644 --- a/listeners.go +++ b/listeners.go @@ -24,10 +24,8 @@ import ( "os" "strconv" "strings" - "sync" "sync/atomic" "syscall" - "time" "github.com/lucas-clemente/quic-go" "github.com/lucas-clemente/quic-go/http3" @@ -43,6 +41,14 @@ import ( // the socket have been finished. Always be sure to close listeners // when you are done with them, just like normal listeners. func Listen(network, addr string) (net.Listener, error) { + // a 0 timeout means Go uses its default + return ListenTimeout(network, addr, 0) +} + +// getListenerFromPlugin returns a listener on the given network and address +// if a plugin has registered the network name. It may return (nil, nil) if +// no plugin can provide a listener. +func getListenerFromPlugin(network, addr string) (net.Listener, error) { network = strings.TrimSpace(strings.ToLower(network)) // get listener from plugin if network type is registered @@ -51,24 +57,7 @@ func Listen(network, addr string) (net.Listener, error) { return getListener(network, addr) } - lnKey := listenerKey(network, addr) - - sharedLn, _, err := listenerPool.LoadOrNew(lnKey, func() (Destructor, error) { - ln, err := net.Listen(network, addr) - if err != nil { - // https://github.com/caddyserver/caddy/pull/4534 - if isUnixNetwork(network) && isListenBindAddressAlreadyInUseError(err) { - return nil, fmt.Errorf("%w: this can happen if Caddy was forcefully killed", err) - } - return nil, err - } - return &sharedListener{Listener: ln, key: lnKey}, nil - }) - if err != nil { - return nil, err - } - - return &fakeCloseListener{sharedListener: sharedLn.(*sharedListener)}, nil + return nil, nil } // ListenPacket returns a net.PacketConn suitable for use in a Caddy module. @@ -124,80 +113,14 @@ func ListenQUIC(addr string, tlsConf *tls.Config, activeRequests *int64) (quic.E }, err } -func listenerKey(network, addr string) string { - return network + "/" + addr -} - // ListenerUsage returns the current usage count of the given listener address. func ListenerUsage(network, addr string) int { count, _ := listenerPool.References(listenerKey(network, addr)) return count } -// fakeCloseListener is a private wrapper over a listener that -// is shared. The state of fakeCloseListener is not shared. -// This allows one user of a socket to "close" the listener -// while in reality the socket stays open for other users of -// the listener. In this way, servers become hot-swappable -// while the listener remains running. Listeners should be -// re-wrapped in a new fakeCloseListener each time the listener -// is reused. This type is atomic and values must not be copied. -type fakeCloseListener struct { - closed int32 // accessed atomically; belongs to this struct only - *sharedListener // embedded, so we also become a net.Listener -} - -func (fcl *fakeCloseListener) Accept() (net.Conn, error) { - // if the listener is already "closed", return error - if atomic.LoadInt32(&fcl.closed) == 1 { - return nil, fakeClosedErr(fcl) - } - - // call underlying accept - conn, err := fcl.sharedListener.Accept() - if err == nil { - return conn, nil - } - - // since Accept() returned an error, it may be because our reference to - // the listener (this fakeCloseListener) may have been closed, i.e. the - // server is shutting down; in that case, we need to clear the deadline - // that we set when Close() was called, and return a non-temporary and - // non-timeout error value to the caller, masking the "true" error, so - // that server loops / goroutines won't retry, linger, and leak - if atomic.LoadInt32(&fcl.closed) == 1 { - // we dereference the sharedListener explicitly even though it's embedded - // so that it's clear in the code that side-effects are shared with other - // users of this listener, not just our own reference to it; we also don't - // do anything with the error because all we could do is log it, but we - // expliclty assign it to nothing so we don't forget it's there if needed - _ = fcl.sharedListener.clearDeadline() - - if netErr, ok := err.(net.Error); ok && netErr.Timeout() { - return nil, fakeClosedErr(fcl) - } - } - - return nil, err -} - -// Close stops accepting new connections without closing the -// underlying listener. The underlying listener is only closed -// if the caller is the last known user of the socket. -func (fcl *fakeCloseListener) Close() error { - if atomic.CompareAndSwapInt32(&fcl.closed, 0, 1) { - // There are two ways I know of to get an Accept() - // function to return to the server loop that called - // it: close the listener, or set a deadline in the - // past. Obviously, we can't close the socket yet - // since others may be using it (hence this whole - // file). But we can set the deadline in the past, - // and this is kind of cheating, but it works, and - // it apparently even works on Windows. - _ = fcl.sharedListener.setDeadline() - _, _ = listenerPool.Delete(fcl.sharedListener.key) - } - return nil +func listenerKey(network, addr string) string { + return network + "/" + addr } type fakeCloseQuicListener struct { @@ -283,55 +206,6 @@ func (fcpc fakeClosePacketConn) SyscallConn() (syscall.RawConn, error) { return nil, fmt.Errorf("SyscallConn() not implemented for %T", fcpc.PacketConn) } -// sharedListener is a wrapper over an underlying listener. The listener -// and the other fields on the struct are shared state that is synchronized, -// so sharedListener structs must never be copied (always use a pointer). -type sharedListener struct { - net.Listener - key string // uniquely identifies this listener - deadline bool // whether a deadline is currently set - deadlineMu sync.Mutex -} - -func (sl *sharedListener) clearDeadline() error { - var err error - sl.deadlineMu.Lock() - if sl.deadline { - switch ln := sl.Listener.(type) { - case *net.TCPListener: - err = ln.SetDeadline(time.Time{}) - case *net.UnixListener: - err = ln.SetDeadline(time.Time{}) - } - sl.deadline = false - } - sl.deadlineMu.Unlock() - return err -} - -func (sl *sharedListener) setDeadline() error { - timeInPast := time.Now().Add(-1 * time.Minute) - var err error - sl.deadlineMu.Lock() - if !sl.deadline { - switch ln := sl.Listener.(type) { - case *net.TCPListener: - err = ln.SetDeadline(timeInPast) - case *net.UnixListener: - err = ln.SetDeadline(timeInPast) - } - sl.deadline = true - } - sl.deadlineMu.Unlock() - return err -} - -// Destruct is called by the UsagePool when the listener is -// finally not being used anymore. It closes the socket. -func (sl *sharedListener) Destruct() error { - return sl.Listener.Close() -} - // sharedQuicListener is like sharedListener, but for quic.EarlyListeners. type sharedQuicListener struct { quic.EarlyListener diff --git a/modules/caddyhttp/app.go b/modules/caddyhttp/app.go index 3db87b1..e48f828 100644 --- a/modules/caddyhttp/app.go +++ b/modules/caddyhttp/app.go @@ -384,7 +384,7 @@ func (app *App) Start() error { for portOffset := uint(0); portOffset < listenAddr.PortRangeSize(); portOffset++ { // create the listener for this socket hostport := listenAddr.JoinHostPort(portOffset) - ln, err := caddy.Listen(listenAddr.Network, hostport) + ln, err := caddy.ListenTimeout(listenAddr.Network, hostport, time.Duration(srv.KeepAliveInterval)) if err != nil { return fmt.Errorf("%s: listening on %s: %v", listenAddr.Network, hostport, err) } diff --git a/modules/caddyhttp/server.go b/modules/caddyhttp/server.go index 1bba34c..e2da531 100644 --- a/modules/caddyhttp/server.go +++ b/modules/caddyhttp/server.go @@ -72,6 +72,11 @@ type Server struct { // 5m is applied to help avoid resource exhaustion. IdleTimeout caddy.Duration `json:"idle_timeout,omitempty"` + // KeepAliveInterval is the interval at which TCP keepalive packets + // are sent to keep the connection alive at the TCP layer when no other + // data is being transmitted. The default is 15s. + KeepAliveInterval caddy.Duration `json:"keepalive_interval,omitempty"` + // MaxHeaderBytes is the maximum size to parse from a client's // HTTP request headers. MaxHeaderBytes int `json:"max_header_bytes,omitempty"` -- cgit v1.2.3