diff options
Diffstat (limited to 'modules/caddyhttp/reverseproxy/upstreams.go')
-rw-r--r-- | modules/caddyhttp/reverseproxy/upstreams.go | 81 |
1 files changed, 76 insertions, 5 deletions
diff --git a/modules/caddyhttp/reverseproxy/upstreams.go b/modules/caddyhttp/reverseproxy/upstreams.go index f788dad..f49bb28 100644 --- a/modules/caddyhttp/reverseproxy/upstreams.go +++ b/modules/caddyhttp/reverseproxy/upstreams.go @@ -2,6 +2,7 @@ package reverseproxy import ( "context" + "encoding/json" "fmt" weakrand "math/rand" "net" @@ -18,6 +19,7 @@ import ( func init() { caddy.RegisterModule(SRVUpstreams{}) caddy.RegisterModule(AUpstreams{}) + caddy.RegisterModule(MultiUpstreams{}) } // SRVUpstreams provides upstreams from SRV lookups. @@ -211,11 +213,6 @@ func (sl srvLookup) isFresh() bool { return time.Since(sl.freshness) < time.Duration(sl.srvUpstreams.Refresh) } -var ( - srvs = make(map[string]srvLookup) - srvsMu sync.RWMutex -) - // AUpstreams provides upstreams from A/AAAA lookups. // Results are cached and refreshed at the configured // refresh interval. @@ -355,6 +352,77 @@ func (al aLookup) isFresh() bool { return time.Since(al.freshness) < time.Duration(al.aUpstreams.Refresh) } +// MultiUpstreams is a single dynamic upstream source that +// aggregates the results of multiple dynamic upstream sources. +// All configured sources will be queried in order, with their +// results appended to the end of the list. Errors returned +// from individual sources will be logged and the next source +// will continue to be invoked. +// +// This module makes it easy to implement redundant cluster +// failovers, especially in conjunction with the `first` load +// balancing policy: if the first source returns an error or +// no upstreams, the second source's upstreams will be used +// naturally. +type MultiUpstreams struct { + // The list of upstream source modules to get upstreams from. + // They will be queried in order, with their results appended + // in the order they are returned. + SourcesRaw []json.RawMessage `json:"sources,omitempty" caddy:"namespace=http.reverse_proxy.upstreams inline_key=source"` + sources []UpstreamSource + + logger *zap.Logger +} + +// CaddyModule returns the Caddy module information. +func (MultiUpstreams) CaddyModule() caddy.ModuleInfo { + return caddy.ModuleInfo{ + ID: "http.reverse_proxy.upstreams.multi", + New: func() caddy.Module { return new(MultiUpstreams) }, + } +} + +func (mu *MultiUpstreams) Provision(ctx caddy.Context) error { + mu.logger = ctx.Logger(mu) + + if mu.SourcesRaw != nil { + mod, err := ctx.LoadModule(mu, "SourcesRaw") + if err != nil { + return fmt.Errorf("loading upstream source modules: %v", err) + } + for _, src := range mod.([]any) { + mu.sources = append(mu.sources, src.(UpstreamSource)) + } + } + + return nil +} + +func (mu MultiUpstreams) GetUpstreams(r *http.Request) ([]*Upstream, error) { + var upstreams []*Upstream + + for i, src := range mu.sources { + select { + case <-r.Context().Done(): + return upstreams, context.Canceled + default: + } + + up, err := src.GetUpstreams(r) + if err != nil { + mu.logger.Error("upstream source returned error", + zap.Int("source_idx", i), + zap.Error(err)) + } else if len(up) == 0 { + mu.logger.Warn("upstream source returned 0 upstreams", zap.Int("source_idx", i)) + } else { + upstreams = append(upstreams, up...) + } + } + + return upstreams, nil +} + // UpstreamResolver holds the set of addresses of DNS resolvers of // upstream addresses type UpstreamResolver struct { @@ -391,6 +459,9 @@ func (u *UpstreamResolver) ParseAddresses() error { } var ( + srvs = make(map[string]srvLookup) + srvsMu sync.RWMutex + aAaaa = make(map[string]aLookup) aAaaaMu sync.RWMutex ) |