From 1960a0dc117dd30fb507b390ddf93b2ef371b9ad Mon Sep 17 00:00:00 2001 From: Matt Holt Date: Wed, 3 Aug 2022 11:04:51 -0600 Subject: httpserver: Configurable shutdown delay (#4906) --- caddyconfig/httpcaddyfile/httptype.go | 9 +- caddyconfig/httpcaddyfile/options.go | 1 + .../integration/caddyfile_adapt/global_options.txt | 2 + listeners.go | 30 ++++- listeners_test.go | 82 ++++++++++++++ modules/caddyhttp/app.go | 126 +++++++++++++++------ modules/caddyhttp/replacer.go | 16 +++ modules/caddyhttp/server.go | 8 +- usagepool.go | 15 +++ 9 files changed, 244 insertions(+), 45 deletions(-) diff --git a/caddyconfig/httpcaddyfile/httptype.go b/caddyconfig/httpcaddyfile/httptype.go index 7153b8c..cca864d 100644 --- a/caddyconfig/httpcaddyfile/httptype.go +++ b/caddyconfig/httpcaddyfile/httptype.go @@ -190,10 +190,11 @@ func (st ServerType) Setup(inputServerBlocks []caddyfile.ServerBlock, // now that each server is configured, make the HTTP app httpApp := caddyhttp.App{ - HTTPPort: tryInt(options["http_port"], &warnings), - HTTPSPort: tryInt(options["https_port"], &warnings), - GracePeriod: tryDuration(options["grace_period"], &warnings), - Servers: servers, + HTTPPort: tryInt(options["http_port"], &warnings), + HTTPSPort: tryInt(options["https_port"], &warnings), + GracePeriod: tryDuration(options["grace_period"], &warnings), + ShutdownDelay: tryDuration(options["shutdown_delay"], &warnings), + Servers: servers, } // then make the TLS app diff --git a/caddyconfig/httpcaddyfile/options.go b/caddyconfig/httpcaddyfile/options.go index d9af04e..36f8f4b 100644 --- a/caddyconfig/httpcaddyfile/options.go +++ b/caddyconfig/httpcaddyfile/options.go @@ -31,6 +31,7 @@ func init() { RegisterGlobalOption("https_port", parseOptHTTPSPort) RegisterGlobalOption("default_bind", parseOptStringList) RegisterGlobalOption("grace_period", parseOptDuration) + RegisterGlobalOption("shutdown_delay", parseOptDuration) RegisterGlobalOption("default_sni", parseOptSingleString) RegisterGlobalOption("order", parseOptOrder) RegisterGlobalOption("storage", parseOptStorage) diff --git a/caddytest/integration/caddyfile_adapt/global_options.txt b/caddytest/integration/caddyfile_adapt/global_options.txt index 69e2d9d..57831a4 100644 --- a/caddytest/integration/caddyfile_adapt/global_options.txt +++ b/caddytest/integration/caddyfile_adapt/global_options.txt @@ -3,6 +3,7 @@ http_port 8080 https_port 8443 grace_period 5s + shutdown_delay 10s default_sni localhost order root first storage file_system { @@ -45,6 +46,7 @@ "http_port": 8080, "https_port": 8443, "grace_period": 5000000000, + "shutdown_delay": 10000000000, "servers": { "srv0": { "listen": [ diff --git a/listeners.go b/listeners.go index 4c86e82..d9cfbf2 100644 --- a/listeners.go +++ b/listeners.go @@ -41,7 +41,7 @@ import ( // the socket have been finished. Always be sure to close listeners // when you are done with them, just like normal listeners. func Listen(network, addr string) (net.Listener, error) { - lnKey := network + "/" + addr + lnKey := listenerKey(network, addr) sharedLn, _, err := listenerPool.LoadOrNew(lnKey, func() (Destructor, error) { ln, err := net.Listen(network, addr) @@ -65,7 +65,7 @@ func Listen(network, addr string) (net.Listener, error) { // It is like Listen except for PacketConns. // Always be sure to close the PacketConn when you are done. func ListenPacket(network, addr string) (net.PacketConn, error) { - lnKey := network + "/" + addr + lnKey := listenerKey(network, addr) sharedPc, _, err := listenerPool.LoadOrNew(lnKey, func() (Destructor, error) { pc, err := net.ListenPacket(network, addr) @@ -89,7 +89,7 @@ func ListenPacket(network, addr string) (net.PacketConn, error) { // Note that the context passed to Accept is currently ignored, so using // a context other than context.Background is meaningless. func ListenQUIC(addr string, tlsConf *tls.Config) (quic.EarlyListener, error) { - lnKey := "quic/" + addr + lnKey := listenerKey("udp", addr) sharedEl, _, err := listenerPool.LoadOrNew(lnKey, func() (Destructor, error) { el, err := quic.ListenAddrEarly(addr, http3.ConfigureTLSConfig(tlsConf), &quic.Config{}) @@ -106,6 +106,16 @@ func ListenQUIC(addr string, tlsConf *tls.Config) (quic.EarlyListener, error) { }, err } +func listenerKey(network, addr string) string { + return network + "/" + addr +} + +// ListenerUsage returns the current usage count of the given listener address. +func ListenerUsage(network, addr string) int { + count, _ := listenerPool.References(listenerKey(network, addr)) + return count +} + // fakeCloseListener is a private wrapper over a listener that // is shared. The state of fakeCloseListener is not shared. // This allows one user of a socket to "close" the listener @@ -353,11 +363,25 @@ func (na NetworkAddress) JoinHostPort(offset uint) string { return net.JoinHostPort(na.Host, strconv.Itoa(int(na.StartPort+offset))) } +func (na NetworkAddress) Expand() []NetworkAddress { + size := na.PortRangeSize() + addrs := make([]NetworkAddress, size) + for portOffset := uint(0); portOffset < size; portOffset++ { + na2 := na + na2.StartPort, na2.EndPort = na.StartPort+portOffset, na.StartPort+portOffset + addrs[portOffset] = na2 + } + return addrs +} + // PortRangeSize returns how many ports are in // pa's port range. Port ranges are inclusive, // so the size is the difference of start and // end ports plus one. func (na NetworkAddress) PortRangeSize() uint { + if na.EndPort < na.StartPort { + return 0 + } return (na.EndPort - na.StartPort) + 1 } diff --git a/listeners_test.go b/listeners_test.go index 6b0f440..c5aa527 100644 --- a/listeners_test.go +++ b/listeners_test.go @@ -331,3 +331,85 @@ func TestJoinHostPort(t *testing.T) { } } } + +func TestExpand(t *testing.T) { + for i, tc := range []struct { + input NetworkAddress + expect []NetworkAddress + }{ + { + input: NetworkAddress{ + Network: "tcp", + Host: "localhost", + StartPort: 2000, + EndPort: 2000, + }, + expect: []NetworkAddress{ + { + Network: "tcp", + Host: "localhost", + StartPort: 2000, + EndPort: 2000, + }, + }, + }, + { + input: NetworkAddress{ + Network: "tcp", + Host: "localhost", + StartPort: 2000, + EndPort: 2002, + }, + expect: []NetworkAddress{ + { + Network: "tcp", + Host: "localhost", + StartPort: 2000, + EndPort: 2000, + }, + { + Network: "tcp", + Host: "localhost", + StartPort: 2001, + EndPort: 2001, + }, + { + Network: "tcp", + Host: "localhost", + StartPort: 2002, + EndPort: 2002, + }, + }, + }, + { + input: NetworkAddress{ + Network: "tcp", + Host: "localhost", + StartPort: 2000, + EndPort: 1999, + }, + expect: []NetworkAddress{}, + }, + { + input: NetworkAddress{ + Network: "unix", + Host: "/foo/bar", + StartPort: 0, + EndPort: 0, + }, + expect: []NetworkAddress{ + { + Network: "unix", + Host: "/foo/bar", + StartPort: 0, + EndPort: 0, + }, + }, + }, + } { + actual := tc.input.Expand() + if !reflect.DeepEqual(actual, tc.expect) { + t.Errorf("Test %d: Expected %+v but got %+v", i, tc.expect, actual) + } + } +} diff --git a/modules/caddyhttp/app.go b/modules/caddyhttp/app.go index 24069eb..c28e09d 100644 --- a/modules/caddyhttp/app.go +++ b/modules/caddyhttp/app.go @@ -20,6 +20,7 @@ import ( "fmt" "net/http" "strconv" + "sync" "time" "github.com/caddyserver/caddy/v2" @@ -95,6 +96,8 @@ func init() { // `{http.request.uri}` | The full request URI // `{http.response.header.*}` | Specific response header field // `{http.vars.*}` | Custom variables in the HTTP handler chain +// `{http.shutting_down}` | True if the HTTP app is shutting down +// `{http.time_until_shutdown}` | Time until HTTP server shutdown, if scheduled type App struct { // HTTPPort specifies the port to use for HTTP (as opposed to HTTPS), // which is used when setting up HTTP->HTTPS redirects or ACME HTTP @@ -107,18 +110,31 @@ type App struct { HTTPSPort int `json:"https_port,omitempty"` // GracePeriod is how long to wait for active connections when shutting - // down the server. Once the grace period is over, connections will - // be forcefully closed. + // down the servers. During the grace period, no new connections are + // accepted, idle connections are closed, and active connections will + // be given the full length of time to become idle and close. + // Once the grace period is over, connections will be forcefully closed. + // If zero, the grace period is eternal. Default: 0. GracePeriod caddy.Duration `json:"grace_period,omitempty"` + // ShutdownDelay is how long to wait before initiating the grace + // period. When this app is stopping (e.g. during a config reload or + // process exit), all servers will be shut down. Normally this immediately + // initiates the grace period. However, if this delay is configured, servers + // will not be shut down until the delay is over. During this time, servers + // continue to function normally and allow new connections. At the end, the + // grace period will begin. This can be useful to allow downstream load + // balancers time to move this instance out of the rotation without hiccups. + // + // When shutdown has been scheduled, placeholders {http.shutting_down} (bool) + // and {http.time_until_shutdown} (duration) may be useful for health checks. + ShutdownDelay caddy.Duration `json:"shutdown_delay,omitempty"` + // Servers is the list of servers, keyed by arbitrary names chosen // at your discretion for your own convenience; the keys do not // affect functionality. Servers map[string]*Server `json:"servers,omitempty"` - servers []*http.Server - h3servers []*http3.Server - ctx caddy.Context logger *zap.Logger tlsApp *caddytls.TLS @@ -162,6 +178,7 @@ func (app *App) Provision(ctx caddy.Context) error { srv.tlsApp = app.tlsApp srv.logger = app.logger.Named("log") srv.errorLogger = app.logger.Named("log.error") + srv.shutdownAtMu = new(sync.RWMutex) // only enable access logs if configured if srv.Logs != nil { @@ -298,7 +315,7 @@ func (app *App) Start() error { } for srvName, srv := range app.Servers { - s := &http.Server{ + srv.server = &http.Server{ ReadTimeout: time.Duration(srv.ReadTimeout), ReadHeaderTimeout: time.Duration(srv.ReadHeaderTimeout), WriteTimeout: time.Duration(srv.WriteTimeout), @@ -307,13 +324,14 @@ func (app *App) Start() error { Handler: srv, ErrorLog: serverLogger, } + tlsCfg := srv.TLSConnPolicies.TLSConfig(app.ctx) // enable h2c if configured if srv.AllowH2C { h2server := &http2.Server{ IdleTimeout: time.Duration(srv.IdleTimeout), } - s.Handler = h2c.NewHandler(srv, h2server) + srv.server.Handler = h2c.NewHandler(srv, h2server) } for _, lnAddr := range srv.Listen { @@ -321,6 +339,8 @@ func (app *App) Start() error { if err != nil { return fmt.Errorf("%s: parsing listen address '%s': %v", srvName, lnAddr, err) } + srv.addresses = append(srv.addresses, listenAddr) + for portOffset := uint(0); portOffset < listenAddr.PortRangeSize(); portOffset++ { // create the listener for this socket hostport := listenAddr.JoinHostPort(portOffset) @@ -343,31 +363,27 @@ func (app *App) Start() error { useTLS := len(srv.TLSConnPolicies) > 0 && int(listenAddr.StartPort+portOffset) != app.httpPort() if useTLS { // create TLS listener - tlsCfg := srv.TLSConnPolicies.TLSConfig(app.ctx) ln = tls.NewListener(ln, tlsCfg) - ///////// // TODO: HTTP/3 support is experimental for now if srv.ExperimentalHTTP3 { - app.logger.Info("enabling experimental HTTP/3 listener", - zap.String("addr", hostport), - ) + if srv.h3server == nil { + srv.h3server = &http3.Server{ + Handler: srv, + TLSConfig: tlsCfg, + MaxHeaderBytes: srv.MaxHeaderBytes, + } + } + + app.logger.Info("enabling experimental HTTP/3 listener", zap.String("addr", hostport)) h3ln, err := caddy.ListenQUIC(hostport, tlsCfg) if err != nil { return fmt.Errorf("getting HTTP/3 QUIC listener: %v", err) } - h3srv := &http3.Server{ - Addr: hostport, - Handler: srv, - TLSConfig: tlsCfg, - MaxHeaderBytes: srv.MaxHeaderBytes, - } + //nolint:errcheck - go h3srv.ServeListener(h3ln) - app.h3servers = append(app.h3servers, h3srv) - srv.h3server = h3srv + go srv.h3server.ServeListener(h3ln) } - ///////// } // finish wrapping listener where we left off before TLS @@ -390,11 +406,10 @@ func (app *App) Start() error { zap.Bool("tls", useTLS), ) - //nolint:errcheck - go s.Serve(ln) - srv.listeners = append(srv.listeners, ln) - app.servers = append(app.servers, s) + + //nolint:errcheck + go srv.server.Serve(ln) } } } @@ -412,28 +427,65 @@ func (app *App) Start() error { // Stop gracefully shuts down the HTTP server. func (app *App) Stop() error { ctx := context.Background() + + // see if any listeners in our config will be closing or if they are continuing + // hrough a reload; because if any are closing, we will enforce shutdown delay + var delay bool + scheduledTime := time.Now().Add(time.Duration(app.ShutdownDelay)) + if app.ShutdownDelay > 0 { + for _, server := range app.Servers { + for _, na := range server.addresses { + for _, addr := range na.Expand() { + if caddy.ListenerUsage(addr.Network, addr.JoinHostPort(0)) < 2 { + app.logger.Debug("listener closing and shutdown delay is configured", zap.String("address", addr.String())) + server.shutdownAtMu.Lock() + server.shutdownAt = scheduledTime + server.shutdownAtMu.Unlock() + delay = true + } else { + app.logger.Debug("shutdown delay configured but listener will remain open", zap.String("address", addr.String())) + } + } + } + } + } + + // honor scheduled/delayed shutdown time + if delay { + app.logger.Debug("shutdown scheduled", + zap.Duration("delay_duration", time.Duration(app.ShutdownDelay)), + zap.Time("time", scheduledTime)) + time.Sleep(time.Duration(app.ShutdownDelay)) + } + + // enforce grace period if configured if app.GracePeriod > 0 { var cancel context.CancelFunc ctx, cancel = context.WithTimeout(ctx, time.Duration(app.GracePeriod)) defer cancel() + app.logger.Debug("servers shutting down; grace period initiated", zap.Duration("duration", time.Duration(app.GracePeriod))) + } else { + app.logger.Debug("servers shutting down with eternal grace period") } - for i, s := range app.servers { - if err := s.Shutdown(ctx); err != nil { + + // shut down servers + for _, server := range app.Servers { + if err := server.server.Shutdown(ctx); err != nil { app.logger.Error("server shutdown", zap.Error(err), - zap.Int("index", i)) + zap.Strings("addresses", server.Listen)) } - } - for i, s := range app.h3servers { - // TODO: CloseGracefully, once implemented upstream - // (see https://github.com/lucas-clemente/quic-go/issues/2103) - if err := s.Close(); err != nil { - app.logger.Error("HTTP/3 server shutdown", - zap.Error(err), - zap.Int("index", i)) + if server.h3server != nil { + // TODO: CloseGracefully, once implemented upstream (see https://github.com/lucas-clemente/quic-go/issues/2103) + if err := server.h3server.Close(); err != nil { + app.logger.Error("HTTP/3 server shutdown", + zap.Error(err), + zap.Strings("addresses", server.Listen)) + } } } + return nil } diff --git a/modules/caddyhttp/replacer.go b/modules/caddyhttp/replacer.go index bde58b7..76c1c87 100644 --- a/modules/caddyhttp/replacer.go +++ b/modules/caddyhttp/replacer.go @@ -252,6 +252,22 @@ func addHTTPVarsToReplacer(repl *caddy.Replacer, req *http.Request, w http.Respo } } + switch { + case key == "http.shutting_down": + server := req.Context().Value(ServerCtxKey).(*Server) + server.shutdownAtMu.RLock() + defer server.shutdownAtMu.RUnlock() + return !server.shutdownAt.IsZero(), true + case key == "http.time_until_shutdown": + server := req.Context().Value(ServerCtxKey).(*Server) + server.shutdownAtMu.RLock() + defer server.shutdownAtMu.RUnlock() + if server.shutdownAt.IsZero() { + return nil, true + } + return time.Until(server.shutdownAt), true + } + return nil, false } diff --git a/modules/caddyhttp/server.go b/modules/caddyhttp/server.go index c665e29..62c0fd9 100644 --- a/modules/caddyhttp/server.go +++ b/modules/caddyhttp/server.go @@ -24,6 +24,7 @@ import ( "net/url" "runtime" "strings" + "sync" "time" "github.com/caddyserver/caddy/v2" @@ -139,7 +140,12 @@ type Server struct { accessLogger *zap.Logger errorLogger *zap.Logger - h3server *http3.Server + server *http.Server + h3server *http3.Server + addresses []caddy.NetworkAddress + + shutdownAt time.Time + shutdownAtMu *sync.RWMutex } // ServeHTTP is the entry point for all HTTP requests. diff --git a/usagepool.go b/usagepool.go index 9234021..c344415 100644 --- a/usagepool.go +++ b/usagepool.go @@ -194,6 +194,21 @@ func (up *UsagePool) Delete(key any) (deleted bool, err error) { return } +// References returns the number of references (count of usages) to a +// key in the pool, and true if the key exists, or false otherwise. +func (up *UsagePool) References(key interface{}) (int, bool) { + up.RLock() + upv, loaded := up.pool[key] + up.RUnlock() + if loaded { + // I wonder if it'd be safer to read this value during + // our lock on the UsagePool... guess we'll see... + refs := atomic.LoadInt32(&upv.refs) + return int(refs), true + } + return 0, false +} + // Constructor is a function that returns a new value // that can destruct itself when it is no longer needed. type Constructor func() (Destructor, error) -- cgit v1.2.3