summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatt Holt <mholt@users.noreply.github.com>2022-08-03 11:04:51 -0600
committerGitHub <noreply@github.com>2022-08-03 11:04:51 -0600
commit1960a0dc117dd30fb507b390ddf93b2ef371b9ad (patch)
tree42837b24d53dd449ac153d7b1d252ee8422e4729
parent63c7720e84176184698730fed0ca7c402c53481a (diff)
httpserver: Configurable shutdown delay (#4906)
-rw-r--r--caddyconfig/httpcaddyfile/httptype.go9
-rw-r--r--caddyconfig/httpcaddyfile/options.go1
-rw-r--r--caddytest/integration/caddyfile_adapt/global_options.txt2
-rw-r--r--listeners.go30
-rw-r--r--listeners_test.go82
-rw-r--r--modules/caddyhttp/app.go126
-rw-r--r--modules/caddyhttp/replacer.go16
-rw-r--r--modules/caddyhttp/server.go8
-rw-r--r--usagepool.go15
9 files changed, 244 insertions, 45 deletions
diff --git a/caddyconfig/httpcaddyfile/httptype.go b/caddyconfig/httpcaddyfile/httptype.go
index 7153b8c..cca864d 100644
--- a/caddyconfig/httpcaddyfile/httptype.go
+++ b/caddyconfig/httpcaddyfile/httptype.go
@@ -190,10 +190,11 @@ func (st ServerType) Setup(inputServerBlocks []caddyfile.ServerBlock,
// now that each server is configured, make the HTTP app
httpApp := caddyhttp.App{
- HTTPPort: tryInt(options["http_port"], &warnings),
- HTTPSPort: tryInt(options["https_port"], &warnings),
- GracePeriod: tryDuration(options["grace_period"], &warnings),
- Servers: servers,
+ HTTPPort: tryInt(options["http_port"], &warnings),
+ HTTPSPort: tryInt(options["https_port"], &warnings),
+ GracePeriod: tryDuration(options["grace_period"], &warnings),
+ ShutdownDelay: tryDuration(options["shutdown_delay"], &warnings),
+ Servers: servers,
}
// then make the TLS app
diff --git a/caddyconfig/httpcaddyfile/options.go b/caddyconfig/httpcaddyfile/options.go
index d9af04e..36f8f4b 100644
--- a/caddyconfig/httpcaddyfile/options.go
+++ b/caddyconfig/httpcaddyfile/options.go
@@ -31,6 +31,7 @@ func init() {
RegisterGlobalOption("https_port", parseOptHTTPSPort)
RegisterGlobalOption("default_bind", parseOptStringList)
RegisterGlobalOption("grace_period", parseOptDuration)
+ RegisterGlobalOption("shutdown_delay", parseOptDuration)
RegisterGlobalOption("default_sni", parseOptSingleString)
RegisterGlobalOption("order", parseOptOrder)
RegisterGlobalOption("storage", parseOptStorage)
diff --git a/caddytest/integration/caddyfile_adapt/global_options.txt b/caddytest/integration/caddyfile_adapt/global_options.txt
index 69e2d9d..57831a4 100644
--- a/caddytest/integration/caddyfile_adapt/global_options.txt
+++ b/caddytest/integration/caddyfile_adapt/global_options.txt
@@ -3,6 +3,7 @@
http_port 8080
https_port 8443
grace_period 5s
+ shutdown_delay 10s
default_sni localhost
order root first
storage file_system {
@@ -45,6 +46,7 @@
"http_port": 8080,
"https_port": 8443,
"grace_period": 5000000000,
+ "shutdown_delay": 10000000000,
"servers": {
"srv0": {
"listen": [
diff --git a/listeners.go b/listeners.go
index 4c86e82..d9cfbf2 100644
--- a/listeners.go
+++ b/listeners.go
@@ -41,7 +41,7 @@ import (
// the socket have been finished. Always be sure to close listeners
// when you are done with them, just like normal listeners.
func Listen(network, addr string) (net.Listener, error) {
- lnKey := network + "/" + addr
+ lnKey := listenerKey(network, addr)
sharedLn, _, err := listenerPool.LoadOrNew(lnKey, func() (Destructor, error) {
ln, err := net.Listen(network, addr)
@@ -65,7 +65,7 @@ func Listen(network, addr string) (net.Listener, error) {
// It is like Listen except for PacketConns.
// Always be sure to close the PacketConn when you are done.
func ListenPacket(network, addr string) (net.PacketConn, error) {
- lnKey := network + "/" + addr
+ lnKey := listenerKey(network, addr)
sharedPc, _, err := listenerPool.LoadOrNew(lnKey, func() (Destructor, error) {
pc, err := net.ListenPacket(network, addr)
@@ -89,7 +89,7 @@ func ListenPacket(network, addr string) (net.PacketConn, error) {
// Note that the context passed to Accept is currently ignored, so using
// a context other than context.Background is meaningless.
func ListenQUIC(addr string, tlsConf *tls.Config) (quic.EarlyListener, error) {
- lnKey := "quic/" + addr
+ lnKey := listenerKey("udp", addr)
sharedEl, _, err := listenerPool.LoadOrNew(lnKey, func() (Destructor, error) {
el, err := quic.ListenAddrEarly(addr, http3.ConfigureTLSConfig(tlsConf), &quic.Config{})
@@ -106,6 +106,16 @@ func ListenQUIC(addr string, tlsConf *tls.Config) (quic.EarlyListener, error) {
}, err
}
+func listenerKey(network, addr string) string {
+ return network + "/" + addr
+}
+
+// ListenerUsage returns the current usage count of the given listener address.
+func ListenerUsage(network, addr string) int {
+ count, _ := listenerPool.References(listenerKey(network, addr))
+ return count
+}
+
// fakeCloseListener is a private wrapper over a listener that
// is shared. The state of fakeCloseListener is not shared.
// This allows one user of a socket to "close" the listener
@@ -353,11 +363,25 @@ func (na NetworkAddress) JoinHostPort(offset uint) string {
return net.JoinHostPort(na.Host, strconv.Itoa(int(na.StartPort+offset)))
}
+func (na NetworkAddress) Expand() []NetworkAddress {
+ size := na.PortRangeSize()
+ addrs := make([]NetworkAddress, size)
+ for portOffset := uint(0); portOffset < size; portOffset++ {
+ na2 := na
+ na2.StartPort, na2.EndPort = na.StartPort+portOffset, na.StartPort+portOffset
+ addrs[portOffset] = na2
+ }
+ return addrs
+}
+
// PortRangeSize returns how many ports are in
// pa's port range. Port ranges are inclusive,
// so the size is the difference of start and
// end ports plus one.
func (na NetworkAddress) PortRangeSize() uint {
+ if na.EndPort < na.StartPort {
+ return 0
+ }
return (na.EndPort - na.StartPort) + 1
}
diff --git a/listeners_test.go b/listeners_test.go
index 6b0f440..c5aa527 100644
--- a/listeners_test.go
+++ b/listeners_test.go
@@ -331,3 +331,85 @@ func TestJoinHostPort(t *testing.T) {
}
}
}
+
+func TestExpand(t *testing.T) {
+ for i, tc := range []struct {
+ input NetworkAddress
+ expect []NetworkAddress
+ }{
+ {
+ input: NetworkAddress{
+ Network: "tcp",
+ Host: "localhost",
+ StartPort: 2000,
+ EndPort: 2000,
+ },
+ expect: []NetworkAddress{
+ {
+ Network: "tcp",
+ Host: "localhost",
+ StartPort: 2000,
+ EndPort: 2000,
+ },
+ },
+ },
+ {
+ input: NetworkAddress{
+ Network: "tcp",
+ Host: "localhost",
+ StartPort: 2000,
+ EndPort: 2002,
+ },
+ expect: []NetworkAddress{
+ {
+ Network: "tcp",
+ Host: "localhost",
+ StartPort: 2000,
+ EndPort: 2000,
+ },
+ {
+ Network: "tcp",
+ Host: "localhost",
+ StartPort: 2001,
+ EndPort: 2001,
+ },
+ {
+ Network: "tcp",
+ Host: "localhost",
+ StartPort: 2002,
+ EndPort: 2002,
+ },
+ },
+ },
+ {
+ input: NetworkAddress{
+ Network: "tcp",
+ Host: "localhost",
+ StartPort: 2000,
+ EndPort: 1999,
+ },
+ expect: []NetworkAddress{},
+ },
+ {
+ input: NetworkAddress{
+ Network: "unix",
+ Host: "/foo/bar",
+ StartPort: 0,
+ EndPort: 0,
+ },
+ expect: []NetworkAddress{
+ {
+ Network: "unix",
+ Host: "/foo/bar",
+ StartPort: 0,
+ EndPort: 0,
+ },
+ },
+ },
+ } {
+ actual := tc.input.Expand()
+ if !reflect.DeepEqual(actual, tc.expect) {
+ t.Errorf("Test %d: Expected %+v but got %+v", i, tc.expect, actual)
+ }
+ }
+}
diff --git a/modules/caddyhttp/app.go b/modules/caddyhttp/app.go
index 24069eb..c28e09d 100644
--- a/modules/caddyhttp/app.go
+++ b/modules/caddyhttp/app.go
@@ -20,6 +20,7 @@ import (
"fmt"
"net/http"
"strconv"
+ "sync"
"time"
"github.com/caddyserver/caddy/v2"
@@ -95,6 +96,8 @@ func init() {
// `{http.request.uri}` | The full request URI
// `{http.response.header.*}` | Specific response header field
// `{http.vars.*}` | Custom variables in the HTTP handler chain
+// `{http.shutting_down}` | True if the HTTP app is shutting down
+// `{http.time_until_shutdown}` | Time until HTTP server shutdown, if scheduled
type App struct {
// HTTPPort specifies the port to use for HTTP (as opposed to HTTPS),
// which is used when setting up HTTP->HTTPS redirects or ACME HTTP
@@ -107,18 +110,31 @@ type App struct {
HTTPSPort int `json:"https_port,omitempty"`
// GracePeriod is how long to wait for active connections when shutting
- // down the server. Once the grace period is over, connections will
- // be forcefully closed.
+ // down the servers. During the grace period, no new connections are
+ // accepted, idle connections are closed, and active connections will
+ // be given the full length of time to become idle and close.
+ // Once the grace period is over, connections will be forcefully closed.
+ // If zero, the grace period is eternal. Default: 0.
GracePeriod caddy.Duration `json:"grace_period,omitempty"`
+ // ShutdownDelay is how long to wait before initiating the grace
+ // period. When this app is stopping (e.g. during a config reload or
+ // process exit), all servers will be shut down. Normally this immediately
+ // initiates the grace period. However, if this delay is configured, servers
+ // will not be shut down until the delay is over. During this time, servers
+ // continue to function normally and allow new connections. At the end, the
+ // grace period will begin. This can be useful to allow downstream load
+ // balancers time to move this instance out of the rotation without hiccups.
+ //
+ // When shutdown has been scheduled, placeholders {http.shutting_down} (bool)
+ // and {http.time_until_shutdown} (duration) may be useful for health checks.
+ ShutdownDelay caddy.Duration `json:"shutdown_delay,omitempty"`
+
// Servers is the list of servers, keyed by arbitrary names chosen
// at your discretion for your own convenience; the keys do not
// affect functionality.
Servers map[string]*Server `json:"servers,omitempty"`
- servers []*http.Server
- h3servers []*http3.Server
-
ctx caddy.Context
logger *zap.Logger
tlsApp *caddytls.TLS
@@ -162,6 +178,7 @@ func (app *App) Provision(ctx caddy.Context) error {
srv.tlsApp = app.tlsApp
srv.logger = app.logger.Named("log")
srv.errorLogger = app.logger.Named("log.error")
+ srv.shutdownAtMu = new(sync.RWMutex)
// only enable access logs if configured
if srv.Logs != nil {
@@ -298,7 +315,7 @@ func (app *App) Start() error {
}
for srvName, srv := range app.Servers {
- s := &http.Server{
+ srv.server = &http.Server{
ReadTimeout: time.Duration(srv.ReadTimeout),
ReadHeaderTimeout: time.Duration(srv.ReadHeaderTimeout),
WriteTimeout: time.Duration(srv.WriteTimeout),
@@ -307,13 +324,14 @@ func (app *App) Start() error {
Handler: srv,
ErrorLog: serverLogger,
}
+ tlsCfg := srv.TLSConnPolicies.TLSConfig(app.ctx)
// enable h2c if configured
if srv.AllowH2C {
h2server := &http2.Server{
IdleTimeout: time.Duration(srv.IdleTimeout),
}
- s.Handler = h2c.NewHandler(srv, h2server)
+ srv.server.Handler = h2c.NewHandler(srv, h2server)
}
for _, lnAddr := range srv.Listen {
@@ -321,6 +339,8 @@ func (app *App) Start() error {
if err != nil {
return fmt.Errorf("%s: parsing listen address '%s': %v", srvName, lnAddr, err)
}
+ srv.addresses = append(srv.addresses, listenAddr)
+
for portOffset := uint(0); portOffset < listenAddr.PortRangeSize(); portOffset++ {
// create the listener for this socket
hostport := listenAddr.JoinHostPort(portOffset)
@@ -343,31 +363,27 @@ func (app *App) Start() error {
useTLS := len(srv.TLSConnPolicies) > 0 && int(listenAddr.StartPort+portOffset) != app.httpPort()
if useTLS {
// create TLS listener
- tlsCfg := srv.TLSConnPolicies.TLSConfig(app.ctx)
ln = tls.NewListener(ln, tlsCfg)
- /////////
// TODO: HTTP/3 support is experimental for now
if srv.ExperimentalHTTP3 {
- app.logger.Info("enabling experimental HTTP/3 listener",
- zap.String("addr", hostport),
- )
+ if srv.h3server == nil {
+ srv.h3server = &http3.Server{
+ Handler: srv,
+ TLSConfig: tlsCfg,
+ MaxHeaderBytes: srv.MaxHeaderBytes,
+ }
+ }
+
+ app.logger.Info("enabling experimental HTTP/3 listener", zap.String("addr", hostport))
h3ln, err := caddy.ListenQUIC(hostport, tlsCfg)
if err != nil {
return fmt.Errorf("getting HTTP/3 QUIC listener: %v", err)
}
- h3srv := &http3.Server{
- Addr: hostport,
- Handler: srv,
- TLSConfig: tlsCfg,
- MaxHeaderBytes: srv.MaxHeaderBytes,
- }
+
//nolint:errcheck
- go h3srv.ServeListener(h3ln)
- app.h3servers = append(app.h3servers, h3srv)
- srv.h3server = h3srv
+ go srv.h3server.ServeListener(h3ln)
}
- /////////
}
// finish wrapping listener where we left off before TLS
@@ -390,11 +406,10 @@ func (app *App) Start() error {
zap.Bool("tls", useTLS),
)
- //nolint:errcheck
- go s.Serve(ln)
-
srv.listeners = append(srv.listeners, ln)
- app.servers = append(app.servers, s)
+
+ //nolint:errcheck
+ go srv.server.Serve(ln)
}
}
}
@@ -412,28 +427,65 @@ func (app *App) Start() error {
// Stop gracefully shuts down the HTTP server.
func (app *App) Stop() error {
ctx := context.Background()
+
+ // see if any listeners in our config will be closing or if they are continuing
+ // hrough a reload; because if any are closing, we will enforce shutdown delay
+ var delay bool
+ scheduledTime := time.Now().Add(time.Duration(app.ShutdownDelay))
+ if app.ShutdownDelay > 0 {
+ for _, server := range app.Servers {
+ for _, na := range server.addresses {
+ for _, addr := range na.Expand() {
+ if caddy.ListenerUsage(addr.Network, addr.JoinHostPort(0)) < 2 {
+ app.logger.Debug("listener closing and shutdown delay is configured", zap.String("address", addr.String()))
+ server.shutdownAtMu.Lock()
+ server.shutdownAt = scheduledTime
+ server.shutdownAtMu.Unlock()
+ delay = true
+ } else {
+ app.logger.Debug("shutdown delay configured but listener will remain open", zap.String("address", addr.String()))
+ }
+ }
+ }
+ }
+ }
+
+ // honor scheduled/delayed shutdown time
+ if delay {
+ app.logger.Debug("shutdown scheduled",
+ zap.Duration("delay_duration", time.Duration(app.ShutdownDelay)),
+ zap.Time("time", scheduledTime))
+ time.Sleep(time.Duration(app.ShutdownDelay))
+ }
+
+ // enforce grace period if configured
if app.GracePeriod > 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, time.Duration(app.GracePeriod))
defer cancel()
+ app.logger.Debug("servers shutting down; grace period initiated", zap.Duration("duration", time.Duration(app.GracePeriod)))
+ } else {
+ app.logger.Debug("servers shutting down with eternal grace period")
}
- for i, s := range app.servers {
- if err := s.Shutdown(ctx); err != nil {
+
+ // shut down servers
+ for _, server := range app.Servers {
+ if err := server.server.Shutdown(ctx); err != nil {
app.logger.Error("server shutdown",
zap.Error(err),
- zap.Int("index", i))
+ zap.Strings("addresses", server.Listen))
}
- }
- for i, s := range app.h3servers {
- // TODO: CloseGracefully, once implemented upstream
- // (see https://github.com/lucas-clemente/quic-go/issues/2103)
- if err := s.Close(); err != nil {
- app.logger.Error("HTTP/3 server shutdown",
- zap.Error(err),
- zap.Int("index", i))
+ if server.h3server != nil {
+ // TODO: CloseGracefully, once implemented upstream (see https://github.com/lucas-clemente/quic-go/issues/2103)
+ if err := server.h3server.Close(); err != nil {
+ app.logger.Error("HTTP/3 server shutdown",
+ zap.Error(err),
+ zap.Strings("addresses", server.Listen))
+ }
}
}
+
return nil
}
diff --git a/modules/caddyhttp/replacer.go b/modules/caddyhttp/replacer.go
index bde58b7..76c1c87 100644
--- a/modules/caddyhttp/replacer.go
+++ b/modules/caddyhttp/replacer.go
@@ -252,6 +252,22 @@ func addHTTPVarsToReplacer(repl *caddy.Replacer, req *http.Request, w http.Respo
}
}
+ switch {
+ case key == "http.shutting_down":
+ server := req.Context().Value(ServerCtxKey).(*Server)
+ server.shutdownAtMu.RLock()
+ defer server.shutdownAtMu.RUnlock()
+ return !server.shutdownAt.IsZero(), true
+ case key == "http.time_until_shutdown":
+ server := req.Context().Value(ServerCtxKey).(*Server)
+ server.shutdownAtMu.RLock()
+ defer server.shutdownAtMu.RUnlock()
+ if server.shutdownAt.IsZero() {
+ return nil, true
+ }
+ return time.Until(server.shutdownAt), true
+ }
+
return nil, false
}
diff --git a/modules/caddyhttp/server.go b/modules/caddyhttp/server.go
index c665e29..62c0fd9 100644
--- a/modules/caddyhttp/server.go
+++ b/modules/caddyhttp/server.go
@@ -24,6 +24,7 @@ import (
"net/url"
"runtime"
"strings"
+ "sync"
"time"
"github.com/caddyserver/caddy/v2"
@@ -139,7 +140,12 @@ type Server struct {
accessLogger *zap.Logger
errorLogger *zap.Logger
- h3server *http3.Server
+ server *http.Server
+ h3server *http3.Server
+ addresses []caddy.NetworkAddress
+
+ shutdownAt time.Time
+ shutdownAtMu *sync.RWMutex
}
// ServeHTTP is the entry point for all HTTP requests.
diff --git a/usagepool.go b/usagepool.go
index 9234021..c344415 100644
--- a/usagepool.go
+++ b/usagepool.go
@@ -194,6 +194,21 @@ func (up *UsagePool) Delete(key any) (deleted bool, err error) {
return
}
+// References returns the number of references (count of usages) to a
+// key in the pool, and true if the key exists, or false otherwise.
+func (up *UsagePool) References(key interface{}) (int, bool) {
+ up.RLock()
+ upv, loaded := up.pool[key]
+ up.RUnlock()
+ if loaded {
+ // I wonder if it'd be safer to read this value during
+ // our lock on the UsagePool... guess we'll see...
+ refs := atomic.LoadInt32(&upv.refs)
+ return int(refs), true
+ }
+ return 0, false
+}
+
// Constructor is a function that returns a new value
// that can destruct itself when it is no longer needed.
type Constructor func() (Destructor, error)