From d4d8bbcfc64d1194079cae35697709f6d267d02f Mon Sep 17 00:00:00 2001 From: Francis Lavoie Date: Wed, 31 Aug 2022 17:01:30 -0400 Subject: events: Implement event system (#4912) Co-authored-by: Matt Holt --- modules/caddyhttp/app.go | 8 ++++++++ modules/caddyhttp/reverseproxy/healthchecks.go | 18 +++++++++++++----- modules/caddyhttp/reverseproxy/reverseproxy.go | 7 +++++++ modules/caddyhttp/server.go | 3 +++ 4 files changed, 31 insertions(+), 5 deletions(-) (limited to 'modules/caddyhttp') diff --git a/modules/caddyhttp/app.go b/modules/caddyhttp/app.go index a3d0f7e..3db87b1 100644 --- a/modules/caddyhttp/app.go +++ b/modules/caddyhttp/app.go @@ -24,6 +24,7 @@ import ( "time" "github.com/caddyserver/caddy/v2" + "github.com/caddyserver/caddy/v2/modules/caddyevents" "github.com/caddyserver/caddy/v2/modules/caddytls" "go.uber.org/zap" "golang.org/x/net/http2" @@ -161,6 +162,11 @@ func (app *App) Provision(ctx caddy.Context) error { app.ctx = ctx app.logger = ctx.Logger(app) + eventsAppIface, err := ctx.App("events") + if err != nil { + return fmt.Errorf("getting events app: %v", err) + } + repl := caddy.NewReplacer() // this provisions the matchers for each route, @@ -175,6 +181,8 @@ func (app *App) Provision(ctx caddy.Context) error { for srvName, srv := range app.Servers { srv.name = srvName srv.tlsApp = app.tlsApp + srv.events = eventsAppIface.(*caddyevents.App) + srv.ctx = ctx srv.logger = app.logger.Named("log") srv.errorLogger = app.logger.Named("log.error") srv.shutdownAtMu = new(sync.RWMutex) diff --git a/modules/caddyhttp/reverseproxy/healthchecks.go b/modules/caddyhttp/reverseproxy/healthchecks.go index eb98638..cf22d26 100644 --- a/modules/caddyhttp/reverseproxy/healthchecks.go +++ b/modules/caddyhttp/reverseproxy/healthchecks.go @@ -284,6 +284,13 @@ func (h *Handler) doActiveHealthCheck(dialInfo DialInfo, hostAddr string, upstre } } + markUnhealthy := func() { + // dispatch an event that the host newly became unhealthy + if upstream.setHealthy(false) { + h.events.Emit(h.ctx, "unhealthy", map[string]any{"host": hostAddr}) + } + } + // do the request, being careful to tame the response body resp, err := h.HealthChecks.Active.httpClient.Do(req) if err != nil { @@ -291,7 +298,7 @@ func (h *Handler) doActiveHealthCheck(dialInfo DialInfo, hostAddr string, upstre zap.String("host", hostAddr), zap.Error(err), ) - upstream.setHealthy(false) + markUnhealthy() return nil } var body io.Reader = resp.Body @@ -311,7 +318,7 @@ func (h *Handler) doActiveHealthCheck(dialInfo DialInfo, hostAddr string, upstre zap.Int("status_code", resp.StatusCode), zap.String("host", hostAddr), ) - upstream.setHealthy(false) + markUnhealthy() return nil } } else if resp.StatusCode < 200 || resp.StatusCode >= 400 { @@ -319,7 +326,7 @@ func (h *Handler) doActiveHealthCheck(dialInfo DialInfo, hostAddr string, upstre zap.Int("status_code", resp.StatusCode), zap.String("host", hostAddr), ) - upstream.setHealthy(false) + markUnhealthy() return nil } @@ -331,14 +338,14 @@ func (h *Handler) doActiveHealthCheck(dialInfo DialInfo, hostAddr string, upstre zap.String("host", hostAddr), zap.Error(err), ) - upstream.setHealthy(false) + markUnhealthy() return nil } if !h.HealthChecks.Active.bodyRegexp.Match(bodyBytes) { h.HealthChecks.Active.logger.Info("response body failed expectations", zap.String("host", hostAddr), ) - upstream.setHealthy(false) + markUnhealthy() return nil } } @@ -346,6 +353,7 @@ func (h *Handler) doActiveHealthCheck(dialInfo DialInfo, hostAddr string, upstre // passed health check parameters, so mark as healthy if upstream.setHealthy(true) { h.HealthChecks.Active.logger.Info("host is up", zap.String("host", hostAddr)) + h.events.Emit(h.ctx, "healthy", map[string]any{"host": hostAddr}) } return nil diff --git a/modules/caddyhttp/reverseproxy/reverseproxy.go b/modules/caddyhttp/reverseproxy/reverseproxy.go index b806dda..895682f 100644 --- a/modules/caddyhttp/reverseproxy/reverseproxy.go +++ b/modules/caddyhttp/reverseproxy/reverseproxy.go @@ -36,6 +36,7 @@ import ( "github.com/caddyserver/caddy/v2" "github.com/caddyserver/caddy/v2/caddyconfig/caddyfile" + "github.com/caddyserver/caddy/v2/modules/caddyevents" "github.com/caddyserver/caddy/v2/modules/caddyhttp" "github.com/caddyserver/caddy/v2/modules/caddyhttp/headers" "github.com/caddyserver/caddy/v2/modules/caddyhttp/rewrite" @@ -193,6 +194,7 @@ type Handler struct { ctx caddy.Context logger *zap.Logger + events *caddyevents.App } // CaddyModule returns the Caddy module information. @@ -205,6 +207,11 @@ func (Handler) CaddyModule() caddy.ModuleInfo { // Provision ensures that h is set up properly before use. func (h *Handler) Provision(ctx caddy.Context) error { + eventAppIface, err := ctx.App("events") + if err != nil { + return fmt.Errorf("getting events app: %v", err) + } + h.events = eventAppIface.(*caddyevents.App) h.ctx = ctx h.logger = ctx.Logger(h) diff --git a/modules/caddyhttp/server.go b/modules/caddyhttp/server.go index be59184..eec4d1b 100644 --- a/modules/caddyhttp/server.go +++ b/modules/caddyhttp/server.go @@ -30,6 +30,7 @@ import ( "time" "github.com/caddyserver/caddy/v2" + "github.com/caddyserver/caddy/v2/modules/caddyevents" "github.com/caddyserver/caddy/v2/modules/caddytls" "github.com/caddyserver/certmagic" "github.com/lucas-clemente/quic-go/http3" @@ -154,9 +155,11 @@ type Server struct { listeners []net.Listener tlsApp *caddytls.TLS + events *caddyevents.App logger *zap.Logger accessLogger *zap.Logger errorLogger *zap.Logger + ctx caddy.Context server *http.Server h3server *http3.Server -- cgit v1.2.3