From d4d8bbcfc64d1194079cae35697709f6d267d02f Mon Sep 17 00:00:00 2001 From: Francis Lavoie Date: Wed, 31 Aug 2022 17:01:30 -0400 Subject: events: Implement event system (#4912) Co-authored-by: Matt Holt --- modules/caddyevents/app.go | 367 ++++++++++++++++++++++++++ modules/caddyevents/eventsconfig/caddyfile.go | 88 ++++++ 2 files changed, 455 insertions(+) create mode 100644 modules/caddyevents/app.go create mode 100644 modules/caddyevents/eventsconfig/caddyfile.go (limited to 'modules/caddyevents') diff --git a/modules/caddyevents/app.go b/modules/caddyevents/app.go new file mode 100644 index 0000000..0c05fe5 --- /dev/null +++ b/modules/caddyevents/app.go @@ -0,0 +1,367 @@ +// Copyright 2015 Matthew Holt and The Caddy Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package caddyevents + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "strings" + "time" + + "github.com/caddyserver/caddy/v2" + "github.com/google/uuid" + "go.uber.org/zap" +) + +func init() { + caddy.RegisterModule(App{}) +} + +// App implements a global eventing system within Caddy. +// Modules can emit and subscribe to events, providing +// hooks into deep parts of the code base that aren't +// otherwise accessible. Events provide information about +// what and when things are happening, and this facility +// allows handlers to take action when events occur, +// add information to the event's metadata, and even +// control program flow in some cases. +// +// Events are propagated in a DOM-like fashion. An event +// emitted from module `a.b.c` (the "origin") will first +// invoke handlers listening to `a.b.c`, then `a.b`, +// then `a`, then those listening regardless of origin. +// If a handler returns the special error Aborted, then +// propagation immediately stops and the event is marked +// as aborted. Emitters may optionally choose to adjust +// program flow based on an abort. +// +// Modules can subscribe to events by origin and/or name. +// A handler is invoked only if it is subscribed to the +// event by name and origin. Subscriptions should be +// registered during the provisioning phase, before apps +// are started. +// +// Event handlers are fired synchronously as part of the +// regular flow of the program. This allows event handlers +// to control the flow of the program if the origin permits +// it and also allows handlers to convey new information +// back into the origin module before it continues. +// In essence, event handlers are similar to HTTP +// middleware handlers. +// +// Event bindings/subscribers are unordered; i.e. +// event handlers are invoked in an arbitrary order. +// Event handlers should not rely on the logic of other +// handlers to succeed. +// +// The entirety of this app module is EXPERIMENTAL and +// subject to change. Pay attention to release notes. +type App struct { + // Subscriptions bind handlers to one or more events + // either globally or scoped to specific modules or module + // namespaces. + Subscriptions []*Subscription `json:"subscriptions,omitempty"` + + // Map of event name to map of module ID/namespace to handlers + subscriptions map[string]map[caddy.ModuleID][]Handler + + logger *zap.Logger + started bool +} + +// Subscription represents binding of one or more handlers to +// one or more events. +type Subscription struct { + // The name(s) of the event(s) to bind to. Default: all events. + Events []string `json:"events,omitempty"` + + // The ID or namespace of the module(s) from which events + // originate to listen to for events. Default: all modules. + // + // Events propagate up, so events emitted by module "a.b.c" + // will also trigger the event for "a.b" and "a". Thus, to + // receive all events from "a.b.c" and "a.b.d", for example, + // one can subscribe to either "a.b" or all of "a" entirely. + Modules []caddy.ModuleID `json:"modules,omitempty"` + + // The event handler modules. These implement the actual + // behavior to invoke when an event occurs. At least one + // handler is required. + HandlersRaw []json.RawMessage `json:"handlers,omitempty" caddy:"namespace=events.handlers inline_key=handler"` + + // The decoded handlers; Go code that is subscribing to + // an event should set this field directly; HandlersRaw + // is meant for JSON configuration to fill out this field. + Handlers []Handler `json:"-"` +} + +// CaddyModule returns the Caddy module information. +func (App) CaddyModule() caddy.ModuleInfo { + return caddy.ModuleInfo{ + ID: "events", + New: func() caddy.Module { return new(App) }, + } +} + +// Provision sets up the app. +func (app *App) Provision(ctx caddy.Context) error { + app.logger = ctx.Logger(app) + app.subscriptions = make(map[string]map[caddy.ModuleID][]Handler) + + for _, sub := range app.Subscriptions { + if sub.HandlersRaw != nil { + handlersIface, err := ctx.LoadModule(sub, "HandlersRaw") + if err != nil { + return fmt.Errorf("loading event subscriber modules: %v", err) + } + for _, h := range handlersIface.([]any) { + sub.Handlers = append(sub.Handlers, h.(Handler)) + } + if len(sub.Handlers) == 0 { + // pointless to bind without any handlers + return fmt.Errorf("no handlers defined") + } + } + } + + return nil +} + +// Start runs the app. +func (app *App) Start() error { + for _, sub := range app.Subscriptions { + if err := app.Subscribe(sub); err != nil { + return err + } + } + + app.started = true + + return nil +} + +// Stop gracefully shuts down the app. +func (app *App) Stop() error { + return nil +} + +// Subscribe binds one or more event handlers to one or more events +// according to the subscription s. For now, subscriptions can only +// be created during the provision phase; new bindings cannot be +// created after the events app has started. +func (app *App) Subscribe(s *Subscription) error { + if app.started { + return fmt.Errorf("events already started; new subscriptions closed") + } + + // handle special case of catch-alls (omission of event name or module space implies all) + if len(s.Events) == 0 { + s.Events = []string{""} + } + if len(s.Modules) == 0 { + s.Modules = []caddy.ModuleID{""} + } + + for _, eventName := range s.Events { + if app.subscriptions[eventName] == nil { + app.subscriptions[eventName] = make(map[caddy.ModuleID][]Handler) + } + for _, originModule := range s.Modules { + app.subscriptions[eventName][originModule] = append(app.subscriptions[eventName][originModule], s.Handlers...) + } + } + + return nil +} + +// On is syntactic sugar for Subscribe() that binds a single handler +// to a single event from any module. If the eventName is empty string, +// it counts for all events. +func (app *App) On(eventName string, handler Handler) error { + return app.Subscribe(&Subscription{ + Events: []string{eventName}, + Handlers: []Handler{handler}, + }) +} + +// Emit creates and dispatches an event named eventName to all relevant handlers with +// the metadata data. Events are emitted and propagated synchronously. The returned Event +// value will have any additional information from the invoked handlers. +func (app *App) Emit(ctx caddy.Context, eventName string, data map[string]any) Event { + id, err := uuid.NewRandom() + if err != nil { + app.logger.Error("failed generating new event ID", + zap.Error(err), + zap.String("event", eventName)) + } + + eventName = strings.ToLower(eventName) + + e := Event{ + id: id, + ts: time.Now(), + name: eventName, + origin: ctx.Module(), + data: data, + } + + // add event info to replacer, make sure it's in the context + repl, ok := ctx.Context.Value(caddy.ReplacerCtxKey).(*caddy.Replacer) + if !ok { + repl = caddy.NewReplacer() + ctx.Context = context.WithValue(ctx.Context, caddy.ReplacerCtxKey, repl) + } + repl.Map(func(key string) (any, bool) { + switch key { + case "event": + return e, true + case "event.id": + return e.id, true + case "event.name": + return e.name, true + case "event.time": + return e.ts, true + case "event.time_unix": + return e.ts.UnixMilli(), true + case "event.module": + return e.origin.CaddyModule().ID, true + case "event.data": + return e.data, true + } + + if strings.HasPrefix(key, "event.data.") { + key = strings.TrimPrefix(key, "event.data.") + if val, ok := data[key]; ok { + return val, true + } + } + + return nil, false + }) + + app.logger.Debug("event", + zap.String("name", e.name), + zap.String("id", e.id.String()), + zap.String("origin", e.origin.CaddyModule().String()), + zap.Any("data", e.data), + ) + + // invoke handlers bound to the event by name and also all events; this for loop + // iterates twice at most: once for the event name, once for "" (all events) + for { + moduleID := e.origin.CaddyModule().ID + + // implement propagation up the module tree (i.e. start with "a.b.c" then "a.b" then "a" then "") + for { + if app.subscriptions[eventName] == nil { + break // shortcut if event not bound at all + } + + for _, handler := range app.subscriptions[eventName][moduleID] { + if err := handler.Handle(ctx, e); err != nil { + aborted := errors.Is(err, ErrAborted) + + app.logger.Error("handler error", + zap.Error(err), + zap.Bool("aborted", aborted)) + + if aborted { + e.Aborted = err + return e + } + } + } + + if moduleID == "" { + break + } + lastDot := strings.LastIndex(string(moduleID), ".") + if lastDot < 0 { + moduleID = "" // include handlers bound to events regardless of module + } else { + moduleID = moduleID[:lastDot] + } + } + + // include handlers listening to all events + if eventName == "" { + break + } + eventName = "" + } + + return e +} + +// Event represents something that has happened or is happening. +type Event struct { + id uuid.UUID + ts time.Time + name string + origin caddy.Module + data map[string]any + + // If non-nil, the event has been aborted, meaning + // propagation has stopped to other handlers and + // the code should stop what it was doing. Emitters + // may choose to use this as a signal to adjust their + // code path appropriately. + Aborted error +} + +// CloudEvent exports event e as a structure that, when +// serialized as JSON, is compatible with the +// CloudEvents spec. +func (e Event) CloudEvent() CloudEvent { + dataJSON, _ := json.Marshal(e.data) + return CloudEvent{ + ID: e.id.String(), + Source: e.origin.CaddyModule().String(), + SpecVersion: "1.0", + Type: e.name, + Time: e.ts, + DataContentType: "application/json", + Data: dataJSON, + } +} + +// CloudEvent is a JSON-serializable structure that +// is compatible with the CloudEvents specification. +// See https://cloudevents.io. +type CloudEvent struct { + ID string `json:"id"` + Source string `json:"source"` + SpecVersion string `json:"specversion"` + Type string `json:"type"` + Time time.Time `json:"time"` + DataContentType string `json:"datacontenttype,omitempty"` + Data json.RawMessage `json:"data,omitempty"` +} + +// ErrAborted cancels an event. +var ErrAborted = errors.New("event aborted") + +// Handler is a type that can handle events. +type Handler interface { + Handle(context.Context, Event) error +} + +// Interface guards +var ( + _ caddy.App = (*App)(nil) + _ caddy.Provisioner = (*App)(nil) +) diff --git a/modules/caddyevents/eventsconfig/caddyfile.go b/modules/caddyevents/eventsconfig/caddyfile.go new file mode 100644 index 0000000..9c3fae7 --- /dev/null +++ b/modules/caddyevents/eventsconfig/caddyfile.go @@ -0,0 +1,88 @@ +// Copyright 2015 Matthew Holt and The Caddy Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package eventsconfig is for configuring caddyevents.App with the +// Caddyfile. This code can't be in the caddyevents package because +// the httpcaddyfile package imports caddyhttp, which imports +// caddyevents: hence, it creates an import cycle. +package eventsconfig + +import ( + "encoding/json" + + "github.com/caddyserver/caddy/v2/caddyconfig" + "github.com/caddyserver/caddy/v2/caddyconfig/caddyfile" + "github.com/caddyserver/caddy/v2/caddyconfig/httpcaddyfile" + "github.com/caddyserver/caddy/v2/modules/caddyevents" +) + +func init() { + httpcaddyfile.RegisterGlobalOption("events", parseApp) +} + +// parseApp configures the "events" global option from Caddyfile to set up the events app. +// Syntax: +// +// events { +// on +// } +// +// If is *, then it will bind to all events. +func parseApp(d *caddyfile.Dispenser, _ any) (any, error) { + app := new(caddyevents.App) + + // consume the option name + if !d.Next() { + return nil, d.ArgErr() + } + + // handle the block + for d.NextBlock(0) { + switch d.Val() { + case "on": + if !d.NextArg() { + return nil, d.ArgErr() + } + eventName := d.Val() + if eventName == "*" { + eventName = "" + } + + if !d.NextArg() { + return nil, d.ArgErr() + } + handlerName := d.Val() + modID := "events.handlers." + handlerName + unm, err := caddyfile.UnmarshalModule(d, modID) + if err != nil { + return nil, err + } + + app.Subscriptions = append(app.Subscriptions, &caddyevents.Subscription{ + Events: []string{eventName}, + HandlersRaw: []json.RawMessage{ + caddyconfig.JSONModuleObject(unm, "handler", handlerName, nil), + }, + }) + + default: + return nil, d.ArgErr() + } + } + + return httpcaddyfile.App{ + Name: "events", + Value: caddyconfig.JSON(app, nil), + }, nil +} -- cgit v1.2.3