summaryrefslogtreecommitdiff
path: root/modules/caddyhttp/reverseproxy/upstreams.go
diff options
context:
space:
mode:
Diffstat (limited to 'modules/caddyhttp/reverseproxy/upstreams.go')
-rw-r--r--modules/caddyhttp/reverseproxy/upstreams.go81
1 files changed, 76 insertions, 5 deletions
diff --git a/modules/caddyhttp/reverseproxy/upstreams.go b/modules/caddyhttp/reverseproxy/upstreams.go
index f788dad..f49bb28 100644
--- a/modules/caddyhttp/reverseproxy/upstreams.go
+++ b/modules/caddyhttp/reverseproxy/upstreams.go
@@ -2,6 +2,7 @@ package reverseproxy
import (
"context"
+ "encoding/json"
"fmt"
weakrand "math/rand"
"net"
@@ -18,6 +19,7 @@ import (
func init() {
caddy.RegisterModule(SRVUpstreams{})
caddy.RegisterModule(AUpstreams{})
+ caddy.RegisterModule(MultiUpstreams{})
}
// SRVUpstreams provides upstreams from SRV lookups.
@@ -211,11 +213,6 @@ func (sl srvLookup) isFresh() bool {
return time.Since(sl.freshness) < time.Duration(sl.srvUpstreams.Refresh)
}
-var (
- srvs = make(map[string]srvLookup)
- srvsMu sync.RWMutex
-)
-
// AUpstreams provides upstreams from A/AAAA lookups.
// Results are cached and refreshed at the configured
// refresh interval.
@@ -355,6 +352,77 @@ func (al aLookup) isFresh() bool {
return time.Since(al.freshness) < time.Duration(al.aUpstreams.Refresh)
}
+// MultiUpstreams is a single dynamic upstream source that
+// aggregates the results of multiple dynamic upstream sources.
+// All configured sources will be queried in order, with their
+// results appended to the end of the list. Errors returned
+// from individual sources will be logged and the next source
+// will continue to be invoked.
+//
+// This module makes it easy to implement redundant cluster
+// failovers, especially in conjunction with the `first` load
+// balancing policy: if the first source returns an error or
+// no upstreams, the second source's upstreams will be used
+// naturally.
+type MultiUpstreams struct {
+ // The list of upstream source modules to get upstreams from.
+ // They will be queried in order, with their results appended
+ // in the order they are returned.
+ SourcesRaw []json.RawMessage `json:"sources,omitempty" caddy:"namespace=http.reverse_proxy.upstreams inline_key=source"`
+ sources []UpstreamSource
+
+ logger *zap.Logger
+}
+
+// CaddyModule returns the Caddy module information.
+func (MultiUpstreams) CaddyModule() caddy.ModuleInfo {
+ return caddy.ModuleInfo{
+ ID: "http.reverse_proxy.upstreams.multi",
+ New: func() caddy.Module { return new(MultiUpstreams) },
+ }
+}
+
+func (mu *MultiUpstreams) Provision(ctx caddy.Context) error {
+ mu.logger = ctx.Logger(mu)
+
+ if mu.SourcesRaw != nil {
+ mod, err := ctx.LoadModule(mu, "SourcesRaw")
+ if err != nil {
+ return fmt.Errorf("loading upstream source modules: %v", err)
+ }
+ for _, src := range mod.([]any) {
+ mu.sources = append(mu.sources, src.(UpstreamSource))
+ }
+ }
+
+ return nil
+}
+
+func (mu MultiUpstreams) GetUpstreams(r *http.Request) ([]*Upstream, error) {
+ var upstreams []*Upstream
+
+ for i, src := range mu.sources {
+ select {
+ case <-r.Context().Done():
+ return upstreams, context.Canceled
+ default:
+ }
+
+ up, err := src.GetUpstreams(r)
+ if err != nil {
+ mu.logger.Error("upstream source returned error",
+ zap.Int("source_idx", i),
+ zap.Error(err))
+ } else if len(up) == 0 {
+ mu.logger.Warn("upstream source returned 0 upstreams", zap.Int("source_idx", i))
+ } else {
+ upstreams = append(upstreams, up...)
+ }
+ }
+
+ return upstreams, nil
+}
+
// UpstreamResolver holds the set of addresses of DNS resolvers of
// upstream addresses
type UpstreamResolver struct {
@@ -391,6 +459,9 @@ func (u *UpstreamResolver) ParseAddresses() error {
}
var (
+ srvs = make(map[string]srvLookup)
+ srvsMu sync.RWMutex
+
aAaaa = make(map[string]aLookup)
aAaaaMu sync.RWMutex
)