diff options
author | Matthew Holt <mholt@users.noreply.github.com> | 2022-08-25 21:42:48 -0600 |
---|---|---|
committer | Matthew Holt <mholt@users.noreply.github.com> | 2022-08-25 21:42:48 -0600 |
commit | 5fb5b81439a521294a88cf4ad6da49fdc350f3aa (patch) | |
tree | b0dbac5a90edc8ba3bbb87ffcfa2d9588c997794 /modules/caddyhttp/reverseproxy/upstreams.go | |
parent | 2cc5d38229d92838b2753fd0e67e083f70251f75 (diff) |
reverseproxy: Multiple dynamic upstreams
This allows users to, for example, get upstreams from multiple SRV
endpoints in order (such as primary and secondary clusters).
Also, gofmt went to town on the comments, sigh
Diffstat (limited to 'modules/caddyhttp/reverseproxy/upstreams.go')
-rw-r--r-- | modules/caddyhttp/reverseproxy/upstreams.go | 81 |
1 files changed, 76 insertions, 5 deletions
diff --git a/modules/caddyhttp/reverseproxy/upstreams.go b/modules/caddyhttp/reverseproxy/upstreams.go index f788dad..f49bb28 100644 --- a/modules/caddyhttp/reverseproxy/upstreams.go +++ b/modules/caddyhttp/reverseproxy/upstreams.go @@ -2,6 +2,7 @@ package reverseproxy import ( "context" + "encoding/json" "fmt" weakrand "math/rand" "net" @@ -18,6 +19,7 @@ import ( func init() { caddy.RegisterModule(SRVUpstreams{}) caddy.RegisterModule(AUpstreams{}) + caddy.RegisterModule(MultiUpstreams{}) } // SRVUpstreams provides upstreams from SRV lookups. @@ -211,11 +213,6 @@ func (sl srvLookup) isFresh() bool { return time.Since(sl.freshness) < time.Duration(sl.srvUpstreams.Refresh) } -var ( - srvs = make(map[string]srvLookup) - srvsMu sync.RWMutex -) - // AUpstreams provides upstreams from A/AAAA lookups. // Results are cached and refreshed at the configured // refresh interval. @@ -355,6 +352,77 @@ func (al aLookup) isFresh() bool { return time.Since(al.freshness) < time.Duration(al.aUpstreams.Refresh) } +// MultiUpstreams is a single dynamic upstream source that +// aggregates the results of multiple dynamic upstream sources. +// All configured sources will be queried in order, with their +// results appended to the end of the list. Errors returned +// from individual sources will be logged and the next source +// will continue to be invoked. +// +// This module makes it easy to implement redundant cluster +// failovers, especially in conjunction with the `first` load +// balancing policy: if the first source returns an error or +// no upstreams, the second source's upstreams will be used +// naturally. +type MultiUpstreams struct { + // The list of upstream source modules to get upstreams from. + // They will be queried in order, with their results appended + // in the order they are returned. + SourcesRaw []json.RawMessage `json:"sources,omitempty" caddy:"namespace=http.reverse_proxy.upstreams inline_key=source"` + sources []UpstreamSource + + logger *zap.Logger +} + +// CaddyModule returns the Caddy module information. +func (MultiUpstreams) CaddyModule() caddy.ModuleInfo { + return caddy.ModuleInfo{ + ID: "http.reverse_proxy.upstreams.multi", + New: func() caddy.Module { return new(MultiUpstreams) }, + } +} + +func (mu *MultiUpstreams) Provision(ctx caddy.Context) error { + mu.logger = ctx.Logger(mu) + + if mu.SourcesRaw != nil { + mod, err := ctx.LoadModule(mu, "SourcesRaw") + if err != nil { + return fmt.Errorf("loading upstream source modules: %v", err) + } + for _, src := range mod.([]any) { + mu.sources = append(mu.sources, src.(UpstreamSource)) + } + } + + return nil +} + +func (mu MultiUpstreams) GetUpstreams(r *http.Request) ([]*Upstream, error) { + var upstreams []*Upstream + + for i, src := range mu.sources { + select { + case <-r.Context().Done(): + return upstreams, context.Canceled + default: + } + + up, err := src.GetUpstreams(r) + if err != nil { + mu.logger.Error("upstream source returned error", + zap.Int("source_idx", i), + zap.Error(err)) + } else if len(up) == 0 { + mu.logger.Warn("upstream source returned 0 upstreams", zap.Int("source_idx", i)) + } else { + upstreams = append(upstreams, up...) + } + } + + return upstreams, nil +} + // UpstreamResolver holds the set of addresses of DNS resolvers of // upstream addresses type UpstreamResolver struct { @@ -391,6 +459,9 @@ func (u *UpstreamResolver) ParseAddresses() error { } var ( + srvs = make(map[string]srvLookup) + srvsMu sync.RWMutex + aAaaa = make(map[string]aLookup) aAaaaMu sync.RWMutex ) |