summaryrefslogtreecommitdiff
path: root/modules/caddyevents/app.go
blob: 3ae40e81390d0b42de812eeb74fe3815a6c25903 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
// Copyright 2015 Matthew Holt and The Caddy Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package caddyevents

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"strings"
	"time"

	"github.com/caddyserver/caddy/v2"
	"github.com/google/uuid"
	"go.uber.org/zap"
)

func init() {
	caddy.RegisterModule(App{})
}

// App implements a global eventing system within Caddy.
// Modules can emit and subscribe to events, providing
// hooks into deep parts of the code base that aren't
// otherwise accessible. Events provide information about
// what and when things are happening, and this facility
// allows handlers to take action when events occur,
// add information to the event's metadata, and even
// control program flow in some cases.
//
// Events are propagated in a DOM-like fashion. An event
// emitted from module `a.b.c` (the "origin") will first
// invoke handlers listening to `a.b.c`, then `a.b`,
// then `a`, then those listening regardless of origin.
// If a handler returns the special error Aborted, then
// propagation immediately stops and the event is marked
// as aborted. Emitters may optionally choose to adjust
// program flow based on an abort.
//
// Modules can subscribe to events by origin and/or name.
// A handler is invoked only if it is subscribed to the
// event by name and origin. Subscriptions should be
// registered during the provisioning phase, before apps
// are started.
//
// Event handlers are fired synchronously as part of the
// regular flow of the program. This allows event handlers
// to control the flow of the program if the origin permits
// it and also allows handlers to convey new information
// back into the origin module before it continues.
// In essence, event handlers are similar to HTTP
// middleware handlers.
//
// Event bindings/subscribers are unordered; i.e.
// event handlers are invoked in an arbitrary order.
// Event handlers should not rely on the logic of other
// handlers to succeed.
//
// The entirety of this app module is EXPERIMENTAL and
// subject to change. Pay attention to release notes.
type App struct {
	// Subscriptions bind handlers to one or more events
	// either globally or scoped to specific modules or module
	// namespaces.
	Subscriptions []*Subscription `json:"subscriptions,omitempty"`

	// Map of event name to map of module ID/namespace to handlers
	subscriptions map[string]map[caddy.ModuleID][]Handler

	logger  *zap.Logger
	started bool
}

// Subscription represents binding of one or more handlers to
// one or more events.
type Subscription struct {
	// The name(s) of the event(s) to bind to. Default: all events.
	Events []string `json:"events,omitempty"`

	// The ID or namespace of the module(s) from which events
	// originate to listen to for events. Default: all modules.
	//
	// Events propagate up, so events emitted by module "a.b.c"
	// will also trigger the event for "a.b" and "a". Thus, to
	// receive all events from "a.b.c" and "a.b.d", for example,
	// one can subscribe to either "a.b" or all of "a" entirely.
	Modules []caddy.ModuleID `json:"modules,omitempty"`

	// The event handler modules. These implement the actual
	// behavior to invoke when an event occurs. At least one
	// handler is required.
	HandlersRaw []json.RawMessage `json:"handlers,omitempty" caddy:"namespace=events.handlers inline_key=handler"`

	// The decoded handlers; Go code that is subscribing to
	// an event should set this field directly; HandlersRaw
	// is meant for JSON configuration to fill out this field.
	Handlers []Handler `json:"-"`
}

// CaddyModule returns the Caddy module information.
func (App) CaddyModule() caddy.ModuleInfo {
	return caddy.ModuleInfo{
		ID:  "events",
		New: func() caddy.Module { return new(App) },
	}
}

// Provision sets up the app.
func (app *App) Provision(ctx caddy.Context) error {
	app.logger = ctx.Logger(app)
	app.subscriptions = make(map[string]map[caddy.ModuleID][]Handler)

	for _, sub := range app.Subscriptions {
		if sub.HandlersRaw != nil {
			handlersIface, err := ctx.LoadModule(sub, "HandlersRaw")
			if err != nil {
				return fmt.Errorf("loading event subscriber modules: %v", err)
			}
			for _, h := range handlersIface.([]any) {
				sub.Handlers = append(sub.Handlers, h.(Handler))
			}
			if len(sub.Handlers) == 0 {
				// pointless to bind without any handlers
				return fmt.Errorf("no handlers defined")
			}
		}
	}

	return nil
}

// Start runs the app.
func (app *App) Start() error {
	for _, sub := range app.Subscriptions {
		if err := app.Subscribe(sub); err != nil {
			return err
		}
	}

	app.started = true

	return nil
}

// Stop gracefully shuts down the app.
func (app *App) Stop() error {
	return nil
}

// Subscribe binds one or more event handlers to one or more events
// according to the subscription s. For now, subscriptions can only
// be created during the provision phase; new bindings cannot be
// created after the events app has started.
func (app *App) Subscribe(s *Subscription) error {
	if app.started {
		return fmt.Errorf("events already started; new subscriptions closed")
	}

	// handle special case of catch-alls (omission of event name or module space implies all)
	if len(s.Events) == 0 {
		s.Events = []string{""}
	}
	if len(s.Modules) == 0 {
		s.Modules = []caddy.ModuleID{""}
	}

	for _, eventName := range s.Events {
		if app.subscriptions[eventName] == nil {
			app.subscriptions[eventName] = make(map[caddy.ModuleID][]Handler)
		}
		for _, originModule := range s.Modules {
			app.subscriptions[eventName][originModule] = append(app.subscriptions[eventName][originModule], s.Handlers...)
		}
	}

	return nil
}

// On is syntactic sugar for Subscribe() that binds a single handler
// to a single event from any module. If the eventName is empty string,
// it counts for all events.
func (app *App) On(eventName string, handler Handler) error {
	return app.Subscribe(&Subscription{
		Events:   []string{eventName},
		Handlers: []Handler{handler},
	})
}

// Emit creates and dispatches an event named eventName to all relevant handlers with
// the metadata data. Events are emitted and propagated synchronously. The returned Event
// value will have any additional information from the invoked handlers.
func (app *App) Emit(ctx caddy.Context, eventName string, data map[string]any) Event {
	logger := app.logger.With(zap.String("name", eventName))

	id, err := uuid.NewRandom()
	if err != nil {
		logger.Error("failed generating new event ID", zap.Error(err))
	}

	eventName = strings.ToLower(eventName)

	e := Event{
		id:     id,
		ts:     time.Now(),
		name:   eventName,
		origin: ctx.Module(),
		data:   data,
	}

	logger = logger.With(
		zap.String("id", e.id.String()),
		zap.String("origin", e.origin.CaddyModule().String()))

	// add event info to replacer, make sure it's in the context
	repl, ok := ctx.Context.Value(caddy.ReplacerCtxKey).(*caddy.Replacer)
	if !ok {
		repl = caddy.NewReplacer()
		ctx.Context = context.WithValue(ctx.Context, caddy.ReplacerCtxKey, repl)
	}
	repl.Map(func(key string) (any, bool) {
		switch key {
		case "event":
			return e, true
		case "event.id":
			return e.id, true
		case "event.name":
			return e.name, true
		case "event.time":
			return e.ts, true
		case "event.time_unix":
			return e.ts.UnixMilli(), true
		case "event.module":
			return e.origin.CaddyModule().ID, true
		case "event.data":
			return e.data, true
		}

		if strings.HasPrefix(key, "event.data.") {
			key = strings.TrimPrefix(key, "event.data.")
			if val, ok := data[key]; ok {
				return val, true
			}
		}

		return nil, false
	})

	logger.Debug("event", zap.Any("data", e.data))

	// invoke handlers bound to the event by name and also all events; this for loop
	// iterates twice at most: once for the event name, once for "" (all events)
	for {
		moduleID := e.origin.CaddyModule().ID

		// implement propagation up the module tree (i.e. start with "a.b.c" then "a.b" then "a" then "")
		for {
			if app.subscriptions[eventName] == nil {
				break // shortcut if event not bound at all
			}

			for _, handler := range app.subscriptions[eventName][moduleID] {
				select {
				case <-ctx.Done():
					logger.Error("context canceled; event handling stopped")
					return e
				default:
				}

				if err := handler.Handle(ctx, e); err != nil {
					aborted := errors.Is(err, ErrAborted)

					logger.Error("handler error",
						zap.Error(err),
						zap.Bool("aborted", aborted))

					if aborted {
						e.Aborted = err
						return e
					}
				}
			}

			if moduleID == "" {
				break
			}
			lastDot := strings.LastIndex(string(moduleID), ".")
			if lastDot < 0 {
				moduleID = "" // include handlers bound to events regardless of module
			} else {
				moduleID = moduleID[:lastDot]
			}
		}

		// include handlers listening to all events
		if eventName == "" {
			break
		}
		eventName = ""
	}

	return e
}

// Event represents something that has happened or is happening.
type Event struct {
	id     uuid.UUID
	ts     time.Time
	name   string
	origin caddy.Module
	data   map[string]any

	// If non-nil, the event has been aborted, meaning
	// propagation has stopped to other handlers and
	// the code should stop what it was doing. Emitters
	// may choose to use this as a signal to adjust their
	// code path appropriately.
	Aborted error
}

// CloudEvent exports event e as a structure that, when
// serialized as JSON, is compatible with the
// CloudEvents spec.
func (e Event) CloudEvent() CloudEvent {
	dataJSON, _ := json.Marshal(e.data)
	return CloudEvent{
		ID:              e.id.String(),
		Source:          e.origin.CaddyModule().String(),
		SpecVersion:     "1.0",
		Type:            e.name,
		Time:            e.ts,
		DataContentType: "application/json",
		Data:            dataJSON,
	}
}

// CloudEvent is a JSON-serializable structure that
// is compatible with the CloudEvents specification.
// See https://cloudevents.io.
type CloudEvent struct {
	ID              string          `json:"id"`
	Source          string          `json:"source"`
	SpecVersion     string          `json:"specversion"`
	Type            string          `json:"type"`
	Time            time.Time       `json:"time"`
	DataContentType string          `json:"datacontenttype,omitempty"`
	Data            json.RawMessage `json:"data,omitempty"`
}

// ErrAborted cancels an event.
var ErrAborted = errors.New("event aborted")

// Handler is a type that can handle events.
type Handler interface {
	Handle(context.Context, Event) error
}

// Interface guards
var (
	_ caddy.App         = (*App)(nil)
	_ caddy.Provisioner = (*App)(nil)
)