summaryrefslogtreecommitdiff
path: root/modules/caddyhttp/reverseproxy/upstreams.go
diff options
context:
space:
mode:
authorMatthew Holt <mholt@users.noreply.github.com>2022-08-25 21:42:48 -0600
committerMatthew Holt <mholt@users.noreply.github.com>2022-08-25 21:42:48 -0600
commit5fb5b81439a521294a88cf4ad6da49fdc350f3aa (patch)
treeb0dbac5a90edc8ba3bbb87ffcfa2d9588c997794 /modules/caddyhttp/reverseproxy/upstreams.go
parent2cc5d38229d92838b2753fd0e67e083f70251f75 (diff)
reverseproxy: Multiple dynamic upstreams
This allows users to, for example, get upstreams from multiple SRV endpoints in order (such as primary and secondary clusters). Also, gofmt went to town on the comments, sigh
Diffstat (limited to 'modules/caddyhttp/reverseproxy/upstreams.go')
-rw-r--r--modules/caddyhttp/reverseproxy/upstreams.go81
1 files changed, 76 insertions, 5 deletions
diff --git a/modules/caddyhttp/reverseproxy/upstreams.go b/modules/caddyhttp/reverseproxy/upstreams.go
index f788dad..f49bb28 100644
--- a/modules/caddyhttp/reverseproxy/upstreams.go
+++ b/modules/caddyhttp/reverseproxy/upstreams.go
@@ -2,6 +2,7 @@ package reverseproxy
import (
"context"
+ "encoding/json"
"fmt"
weakrand "math/rand"
"net"
@@ -18,6 +19,7 @@ import (
func init() {
caddy.RegisterModule(SRVUpstreams{})
caddy.RegisterModule(AUpstreams{})
+ caddy.RegisterModule(MultiUpstreams{})
}
// SRVUpstreams provides upstreams from SRV lookups.
@@ -211,11 +213,6 @@ func (sl srvLookup) isFresh() bool {
return time.Since(sl.freshness) < time.Duration(sl.srvUpstreams.Refresh)
}
-var (
- srvs = make(map[string]srvLookup)
- srvsMu sync.RWMutex
-)
-
// AUpstreams provides upstreams from A/AAAA lookups.
// Results are cached and refreshed at the configured
// refresh interval.
@@ -355,6 +352,77 @@ func (al aLookup) isFresh() bool {
return time.Since(al.freshness) < time.Duration(al.aUpstreams.Refresh)
}
+// MultiUpstreams is a single dynamic upstream source that
+// aggregates the results of multiple dynamic upstream sources.
+// All configured sources will be queried in order, with their
+// results appended to the end of the list. Errors returned
+// from individual sources will be logged and the next source
+// will continue to be invoked.
+//
+// This module makes it easy to implement redundant cluster
+// failovers, especially in conjunction with the `first` load
+// balancing policy: if the first source returns an error or
+// no upstreams, the second source's upstreams will be used
+// naturally.
+type MultiUpstreams struct {
+ // The list of upstream source modules to get upstreams from.
+ // They will be queried in order, with their results appended
+ // in the order they are returned.
+ SourcesRaw []json.RawMessage `json:"sources,omitempty" caddy:"namespace=http.reverse_proxy.upstreams inline_key=source"`
+ sources []UpstreamSource
+
+ logger *zap.Logger
+}
+
+// CaddyModule returns the Caddy module information.
+func (MultiUpstreams) CaddyModule() caddy.ModuleInfo {
+ return caddy.ModuleInfo{
+ ID: "http.reverse_proxy.upstreams.multi",
+ New: func() caddy.Module { return new(MultiUpstreams) },
+ }
+}
+
+func (mu *MultiUpstreams) Provision(ctx caddy.Context) error {
+ mu.logger = ctx.Logger(mu)
+
+ if mu.SourcesRaw != nil {
+ mod, err := ctx.LoadModule(mu, "SourcesRaw")
+ if err != nil {
+ return fmt.Errorf("loading upstream source modules: %v", err)
+ }
+ for _, src := range mod.([]any) {
+ mu.sources = append(mu.sources, src.(UpstreamSource))
+ }
+ }
+
+ return nil
+}
+
+func (mu MultiUpstreams) GetUpstreams(r *http.Request) ([]*Upstream, error) {
+ var upstreams []*Upstream
+
+ for i, src := range mu.sources {
+ select {
+ case <-r.Context().Done():
+ return upstreams, context.Canceled
+ default:
+ }
+
+ up, err := src.GetUpstreams(r)
+ if err != nil {
+ mu.logger.Error("upstream source returned error",
+ zap.Int("source_idx", i),
+ zap.Error(err))
+ } else if len(up) == 0 {
+ mu.logger.Warn("upstream source returned 0 upstreams", zap.Int("source_idx", i))
+ } else {
+ upstreams = append(upstreams, up...)
+ }
+ }
+
+ return upstreams, nil
+}
+
// UpstreamResolver holds the set of addresses of DNS resolvers of
// upstream addresses
type UpstreamResolver struct {
@@ -391,6 +459,9 @@ func (u *UpstreamResolver) ParseAddresses() error {
}
var (
+ srvs = make(map[string]srvLookup)
+ srvsMu sync.RWMutex
+
aAaaa = make(map[string]aLookup)
aAaaaMu sync.RWMutex
)