summaryrefslogtreecommitdiff
path: root/modules/caddyevents
diff options
context:
space:
mode:
authorFrancis Lavoie <lavofr@gmail.com>2022-08-31 17:01:30 -0400
committerGitHub <noreply@github.com>2022-08-31 15:01:30 -0600
commitd4d8bbcfc64d1194079cae35697709f6d267d02f (patch)
tree8617401e3560f1eb5986878747180862822d4d4c /modules/caddyevents
parent68d8ac9802c5e7bf5bf55d1a3c1db634edc93999 (diff)
events: Implement event system (#4912)
Co-authored-by: Matt Holt <mholt@users.noreply.github.com>
Diffstat (limited to 'modules/caddyevents')
-rw-r--r--modules/caddyevents/app.go367
-rw-r--r--modules/caddyevents/eventsconfig/caddyfile.go88
2 files changed, 455 insertions, 0 deletions
diff --git a/modules/caddyevents/app.go b/modules/caddyevents/app.go
new file mode 100644
index 0000000..0c05fe5
--- /dev/null
+++ b/modules/caddyevents/app.go
@@ -0,0 +1,367 @@
+// Copyright 2015 Matthew Holt and The Caddy Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package caddyevents
+
+import (
+ "context"
+ "encoding/json"
+ "errors"
+ "fmt"
+ "strings"
+ "time"
+
+ "github.com/caddyserver/caddy/v2"
+ "github.com/google/uuid"
+ "go.uber.org/zap"
+)
+
+func init() {
+ caddy.RegisterModule(App{})
+}
+
+// App implements a global eventing system within Caddy.
+// Modules can emit and subscribe to events, providing
+// hooks into deep parts of the code base that aren't
+// otherwise accessible. Events provide information about
+// what and when things are happening, and this facility
+// allows handlers to take action when events occur,
+// add information to the event's metadata, and even
+// control program flow in some cases.
+//
+// Events are propagated in a DOM-like fashion. An event
+// emitted from module `a.b.c` (the "origin") will first
+// invoke handlers listening to `a.b.c`, then `a.b`,
+// then `a`, then those listening regardless of origin.
+// If a handler returns the special error Aborted, then
+// propagation immediately stops and the event is marked
+// as aborted. Emitters may optionally choose to adjust
+// program flow based on an abort.
+//
+// Modules can subscribe to events by origin and/or name.
+// A handler is invoked only if it is subscribed to the
+// event by name and origin. Subscriptions should be
+// registered during the provisioning phase, before apps
+// are started.
+//
+// Event handlers are fired synchronously as part of the
+// regular flow of the program. This allows event handlers
+// to control the flow of the program if the origin permits
+// it and also allows handlers to convey new information
+// back into the origin module before it continues.
+// In essence, event handlers are similar to HTTP
+// middleware handlers.
+//
+// Event bindings/subscribers are unordered; i.e.
+// event handlers are invoked in an arbitrary order.
+// Event handlers should not rely on the logic of other
+// handlers to succeed.
+//
+// The entirety of this app module is EXPERIMENTAL and
+// subject to change. Pay attention to release notes.
+type App struct {
+ // Subscriptions bind handlers to one or more events
+ // either globally or scoped to specific modules or module
+ // namespaces.
+ Subscriptions []*Subscription `json:"subscriptions,omitempty"`
+
+ // Map of event name to map of module ID/namespace to handlers
+ subscriptions map[string]map[caddy.ModuleID][]Handler
+
+ logger *zap.Logger
+ started bool
+}
+
+// Subscription represents binding of one or more handlers to
+// one or more events.
+type Subscription struct {
+ // The name(s) of the event(s) to bind to. Default: all events.
+ Events []string `json:"events,omitempty"`
+
+ // The ID or namespace of the module(s) from which events
+ // originate to listen to for events. Default: all modules.
+ //
+ // Events propagate up, so events emitted by module "a.b.c"
+ // will also trigger the event for "a.b" and "a". Thus, to
+ // receive all events from "a.b.c" and "a.b.d", for example,
+ // one can subscribe to either "a.b" or all of "a" entirely.
+ Modules []caddy.ModuleID `json:"modules,omitempty"`
+
+ // The event handler modules. These implement the actual
+ // behavior to invoke when an event occurs. At least one
+ // handler is required.
+ HandlersRaw []json.RawMessage `json:"handlers,omitempty" caddy:"namespace=events.handlers inline_key=handler"`
+
+ // The decoded handlers; Go code that is subscribing to
+ // an event should set this field directly; HandlersRaw
+ // is meant for JSON configuration to fill out this field.
+ Handlers []Handler `json:"-"`
+}
+
+// CaddyModule returns the Caddy module information.
+func (App) CaddyModule() caddy.ModuleInfo {
+ return caddy.ModuleInfo{
+ ID: "events",
+ New: func() caddy.Module { return new(App) },
+ }
+}
+
+// Provision sets up the app.
+func (app *App) Provision(ctx caddy.Context) error {
+ app.logger = ctx.Logger(app)
+ app.subscriptions = make(map[string]map[caddy.ModuleID][]Handler)
+
+ for _, sub := range app.Subscriptions {
+ if sub.HandlersRaw != nil {
+ handlersIface, err := ctx.LoadModule(sub, "HandlersRaw")
+ if err != nil {
+ return fmt.Errorf("loading event subscriber modules: %v", err)
+ }
+ for _, h := range handlersIface.([]any) {
+ sub.Handlers = append(sub.Handlers, h.(Handler))
+ }
+ if len(sub.Handlers) == 0 {
+ // pointless to bind without any handlers
+ return fmt.Errorf("no handlers defined")
+ }
+ }
+ }
+
+ return nil
+}
+
+// Start runs the app.
+func (app *App) Start() error {
+ for _, sub := range app.Subscriptions {
+ if err := app.Subscribe(sub); err != nil {
+ return err
+ }
+ }
+
+ app.started = true
+
+ return nil
+}
+
+// Stop gracefully shuts down the app.
+func (app *App) Stop() error {
+ return nil
+}
+
+// Subscribe binds one or more event handlers to one or more events
+// according to the subscription s. For now, subscriptions can only
+// be created during the provision phase; new bindings cannot be
+// created after the events app has started.
+func (app *App) Subscribe(s *Subscription) error {
+ if app.started {
+ return fmt.Errorf("events already started; new subscriptions closed")
+ }
+
+ // handle special case of catch-alls (omission of event name or module space implies all)
+ if len(s.Events) == 0 {
+ s.Events = []string{""}
+ }
+ if len(s.Modules) == 0 {
+ s.Modules = []caddy.ModuleID{""}
+ }
+
+ for _, eventName := range s.Events {
+ if app.subscriptions[eventName] == nil {
+ app.subscriptions[eventName] = make(map[caddy.ModuleID][]Handler)
+ }
+ for _, originModule := range s.Modules {
+ app.subscriptions[eventName][originModule] = append(app.subscriptions[eventName][originModule], s.Handlers...)
+ }
+ }
+
+ return nil
+}
+
+// On is syntactic sugar for Subscribe() that binds a single handler
+// to a single event from any module. If the eventName is empty string,
+// it counts for all events.
+func (app *App) On(eventName string, handler Handler) error {
+ return app.Subscribe(&Subscription{
+ Events: []string{eventName},
+ Handlers: []Handler{handler},
+ })
+}
+
+// Emit creates and dispatches an event named eventName to all relevant handlers with
+// the metadata data. Events are emitted and propagated synchronously. The returned Event
+// value will have any additional information from the invoked handlers.
+func (app *App) Emit(ctx caddy.Context, eventName string, data map[string]any) Event {
+ id, err := uuid.NewRandom()
+ if err != nil {
+ app.logger.Error("failed generating new event ID",
+ zap.Error(err),
+ zap.String("event", eventName))
+ }
+
+ eventName = strings.ToLower(eventName)
+
+ e := Event{
+ id: id,
+ ts: time.Now(),
+ name: eventName,
+ origin: ctx.Module(),
+ data: data,
+ }
+
+ // add event info to replacer, make sure it's in the context
+ repl, ok := ctx.Context.Value(caddy.ReplacerCtxKey).(*caddy.Replacer)
+ if !ok {
+ repl = caddy.NewReplacer()
+ ctx.Context = context.WithValue(ctx.Context, caddy.ReplacerCtxKey, repl)
+ }
+ repl.Map(func(key string) (any, bool) {
+ switch key {
+ case "event":
+ return e, true
+ case "event.id":
+ return e.id, true
+ case "event.name":
+ return e.name, true
+ case "event.time":
+ return e.ts, true
+ case "event.time_unix":
+ return e.ts.UnixMilli(), true
+ case "event.module":
+ return e.origin.CaddyModule().ID, true
+ case "event.data":
+ return e.data, true
+ }
+
+ if strings.HasPrefix(key, "event.data.") {
+ key = strings.TrimPrefix(key, "event.data.")
+ if val, ok := data[key]; ok {
+ return val, true
+ }
+ }
+
+ return nil, false
+ })
+
+ app.logger.Debug("event",
+ zap.String("name", e.name),
+ zap.String("id", e.id.String()),
+ zap.String("origin", e.origin.CaddyModule().String()),
+ zap.Any("data", e.data),
+ )
+
+ // invoke handlers bound to the event by name and also all events; this for loop
+ // iterates twice at most: once for the event name, once for "" (all events)
+ for {
+ moduleID := e.origin.CaddyModule().ID
+
+ // implement propagation up the module tree (i.e. start with "a.b.c" then "a.b" then "a" then "")
+ for {
+ if app.subscriptions[eventName] == nil {
+ break // shortcut if event not bound at all
+ }
+
+ for _, handler := range app.subscriptions[eventName][moduleID] {
+ if err := handler.Handle(ctx, e); err != nil {
+ aborted := errors.Is(err, ErrAborted)
+
+ app.logger.Error("handler error",
+ zap.Error(err),
+ zap.Bool("aborted", aborted))
+
+ if aborted {
+ e.Aborted = err
+ return e
+ }
+ }
+ }
+
+ if moduleID == "" {
+ break
+ }
+ lastDot := strings.LastIndex(string(moduleID), ".")
+ if lastDot < 0 {
+ moduleID = "" // include handlers bound to events regardless of module
+ } else {
+ moduleID = moduleID[:lastDot]
+ }
+ }
+
+ // include handlers listening to all events
+ if eventName == "" {
+ break
+ }
+ eventName = ""
+ }
+
+ return e
+}
+
+// Event represents something that has happened or is happening.
+type Event struct {
+ id uuid.UUID
+ ts time.Time
+ name string
+ origin caddy.Module
+ data map[string]any
+
+ // If non-nil, the event has been aborted, meaning
+ // propagation has stopped to other handlers and
+ // the code should stop what it was doing. Emitters
+ // may choose to use this as a signal to adjust their
+ // code path appropriately.
+ Aborted error
+}
+
+// CloudEvent exports event e as a structure that, when
+// serialized as JSON, is compatible with the
+// CloudEvents spec.
+func (e Event) CloudEvent() CloudEvent {
+ dataJSON, _ := json.Marshal(e.data)
+ return CloudEvent{
+ ID: e.id.String(),
+ Source: e.origin.CaddyModule().String(),
+ SpecVersion: "1.0",
+ Type: e.name,
+ Time: e.ts,
+ DataContentType: "application/json",
+ Data: dataJSON,
+ }
+}
+
+// CloudEvent is a JSON-serializable structure that
+// is compatible with the CloudEvents specification.
+// See https://cloudevents.io.
+type CloudEvent struct {
+ ID string `json:"id"`
+ Source string `json:"source"`
+ SpecVersion string `json:"specversion"`
+ Type string `json:"type"`
+ Time time.Time `json:"time"`
+ DataContentType string `json:"datacontenttype,omitempty"`
+ Data json.RawMessage `json:"data,omitempty"`
+}
+
+// ErrAborted cancels an event.
+var ErrAborted = errors.New("event aborted")
+
+// Handler is a type that can handle events.
+type Handler interface {
+ Handle(context.Context, Event) error
+}
+
+// Interface guards
+var (
+ _ caddy.App = (*App)(nil)
+ _ caddy.Provisioner = (*App)(nil)
+)
diff --git a/modules/caddyevents/eventsconfig/caddyfile.go b/modules/caddyevents/eventsconfig/caddyfile.go
new file mode 100644
index 0000000..9c3fae7
--- /dev/null
+++ b/modules/caddyevents/eventsconfig/caddyfile.go
@@ -0,0 +1,88 @@
+// Copyright 2015 Matthew Holt and The Caddy Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package eventsconfig is for configuring caddyevents.App with the
+// Caddyfile. This code can't be in the caddyevents package because
+// the httpcaddyfile package imports caddyhttp, which imports
+// caddyevents: hence, it creates an import cycle.
+package eventsconfig
+
+import (
+ "encoding/json"
+
+ "github.com/caddyserver/caddy/v2/caddyconfig"
+ "github.com/caddyserver/caddy/v2/caddyconfig/caddyfile"
+ "github.com/caddyserver/caddy/v2/caddyconfig/httpcaddyfile"
+ "github.com/caddyserver/caddy/v2/modules/caddyevents"
+)
+
+func init() {
+ httpcaddyfile.RegisterGlobalOption("events", parseApp)
+}
+
+// parseApp configures the "events" global option from Caddyfile to set up the events app.
+// Syntax:
+//
+// events {
+// on <event> <handler_module...>
+// }
+//
+// If <event> is *, then it will bind to all events.
+func parseApp(d *caddyfile.Dispenser, _ any) (any, error) {
+ app := new(caddyevents.App)
+
+ // consume the option name
+ if !d.Next() {
+ return nil, d.ArgErr()
+ }
+
+ // handle the block
+ for d.NextBlock(0) {
+ switch d.Val() {
+ case "on":
+ if !d.NextArg() {
+ return nil, d.ArgErr()
+ }
+ eventName := d.Val()
+ if eventName == "*" {
+ eventName = ""
+ }
+
+ if !d.NextArg() {
+ return nil, d.ArgErr()
+ }
+ handlerName := d.Val()
+ modID := "events.handlers." + handlerName
+ unm, err := caddyfile.UnmarshalModule(d, modID)
+ if err != nil {
+ return nil, err
+ }
+
+ app.Subscriptions = append(app.Subscriptions, &caddyevents.Subscription{
+ Events: []string{eventName},
+ HandlersRaw: []json.RawMessage{
+ caddyconfig.JSONModuleObject(unm, "handler", handlerName, nil),
+ },
+ })
+
+ default:
+ return nil, d.ArgErr()
+ }
+ }
+
+ return httpcaddyfile.App{
+ Name: "events",
+ Value: caddyconfig.JSON(app, nil),
+ }, nil
+}