summaryrefslogtreecommitdiff
path: root/modules/caddyhttp/reverseproxy
diff options
context:
space:
mode:
Diffstat (limited to 'modules/caddyhttp/reverseproxy')
-rw-r--r--modules/caddyhttp/reverseproxy/caddyfile.go257
-rw-r--r--modules/caddyhttp/reverseproxy/upstreams.go81
2 files changed, 218 insertions, 120 deletions
diff --git a/modules/caddyhttp/reverseproxy/caddyfile.go b/modules/caddyhttp/reverseproxy/caddyfile.go
index a2c85f9..f746ee5 100644
--- a/modules/caddyhttp/reverseproxy/caddyfile.go
+++ b/modules/caddyhttp/reverseproxy/caddyfile.go
@@ -52,73 +52,73 @@ func parseCaddyfile(h httpcaddyfile.Helper) (caddyhttp.MiddlewareHandler, error)
// UnmarshalCaddyfile sets up the handler from Caddyfile tokens. Syntax:
//
-// reverse_proxy [<matcher>] [<upstreams...>] {
-// # backends
-// to <upstreams...>
-// dynamic <name> [...]
+// reverse_proxy [<matcher>] [<upstreams...>] {
+// # backends
+// to <upstreams...>
+// dynamic <name> [...]
//
-// # load balancing
-// lb_policy <name> [<options...>]
-// lb_retries <retries>
-// lb_try_duration <duration>
-// lb_try_interval <interval>
-// lb_retry_match <request-matcher>
+// # load balancing
+// lb_policy <name> [<options...>]
+// lb_retries <retries>
+// lb_try_duration <duration>
+// lb_try_interval <interval>
+// lb_retry_match <request-matcher>
//
-// # active health checking
-// health_uri <uri>
-// health_port <port>
-// health_interval <interval>
-// health_timeout <duration>
-// health_status <status>
-// health_body <regexp>
-// health_headers {
-// <field> [<values...>]
-// }
+// # active health checking
+// health_uri <uri>
+// health_port <port>
+// health_interval <interval>
+// health_timeout <duration>
+// health_status <status>
+// health_body <regexp>
+// health_headers {
+// <field> [<values...>]
+// }
//
-// # passive health checking
-// fail_duration <duration>
-// max_fails <num>
-// unhealthy_status <status>
-// unhealthy_latency <duration>
-// unhealthy_request_count <num>
+// # passive health checking
+// fail_duration <duration>
+// max_fails <num>
+// unhealthy_status <status>
+// unhealthy_latency <duration>
+// unhealthy_request_count <num>
//
-// # streaming
-// flush_interval <duration>
-// buffer_requests
-// buffer_responses
-// max_buffer_size <size>
+// # streaming
+// flush_interval <duration>
+// buffer_requests
+// buffer_responses
+// max_buffer_size <size>
//
-// # request manipulation
-// trusted_proxies [private_ranges] <ranges...>
-// header_up [+|-]<field> [<value|regexp> [<replacement>]]
-// header_down [+|-]<field> [<value|regexp> [<replacement>]]
-// method <method>
-// rewrite <to>
+// # request manipulation
+// trusted_proxies [private_ranges] <ranges...>
+// header_up [+|-]<field> [<value|regexp> [<replacement>]]
+// header_down [+|-]<field> [<value|regexp> [<replacement>]]
+// method <method>
+// rewrite <to>
//
-// # round trip
-// transport <name> {
-// ...
-// }
+// # round trip
+// transport <name> {
+// ...
+// }
//
-// # optionally intercept responses from upstream
-// @name {
-// status <code...>
-// header <field> [<value>]
-// }
-// replace_status [<matcher>] <status_code>
-// handle_response [<matcher>] {
-// <directives...>
+// # optionally intercept responses from upstream
+// @name {
+// status <code...>
+// header <field> [<value>]
+// }
+// replace_status [<matcher>] <status_code>
+// handle_response [<matcher>] {
+// <directives...>
//
-// # special directives only available in handle_response
-// copy_response [<matcher>] [<status>] {
-// status <status>
-// }
-// copy_response_headers [<matcher>] {
-// include <fields...>
-// exclude <fields...>
-// }
-// }
-// }
+// # special directives only available in handle_response
+// copy_response [<matcher>] [<status>] {
+// status <status>
+// }
+// copy_response_headers [<matcher>] {
+// include <fields...>
+// exclude <fields...>
+// }
+// }
+// }
//
// Proxy upstream addresses should be network dial addresses such
// as `host:port`, or a URL such as `scheme://host:port`. Scheme
@@ -824,33 +824,32 @@ func (h *Handler) FinalizeUnmarshalCaddyfile(helper httpcaddyfile.Helper) error
// UnmarshalCaddyfile deserializes Caddyfile tokens into h.
//
-// transport http {
-// read_buffer <size>
-// write_buffer <size>
-// max_response_header <size>
-// dial_timeout <duration>
-// dial_fallback_delay <duration>
-// response_header_timeout <duration>
-// expect_continue_timeout <duration>
-// resolvers <resolvers...>
-// tls
-// tls_client_auth <automate_name> | <cert_file> <key_file>
-// tls_insecure_skip_verify
-// tls_timeout <duration>
-// tls_trusted_ca_certs <cert_files...>
-// tls_server_name <sni>
-// tls_renegotiation <level>
-// tls_except_ports <ports...>
-// keepalive [off|<duration>]
-// keepalive_interval <interval>
-// keepalive_idle_conns <max_count>
-// keepalive_idle_conns_per_host <count>
-// versions <versions...>
-// compression off
-// max_conns_per_host <count>
-// max_idle_conns_per_host <count>
-// }
-//
+// transport http {
+// read_buffer <size>
+// write_buffer <size>
+// max_response_header <size>
+// dial_timeout <duration>
+// dial_fallback_delay <duration>
+// response_header_timeout <duration>
+// expect_continue_timeout <duration>
+// resolvers <resolvers...>
+// tls
+// tls_client_auth <automate_name> | <cert_file> <key_file>
+// tls_insecure_skip_verify
+// tls_timeout <duration>
+// tls_trusted_ca_certs <cert_files...>
+// tls_server_name <sni>
+// tls_renegotiation <level>
+// tls_except_ports <ports...>
+// keepalive [off|<duration>]
+// keepalive_interval <interval>
+// keepalive_idle_conns <max_count>
+// keepalive_idle_conns_per_host <count>
+// versions <versions...>
+// compression off
+// max_conns_per_host <count>
+// max_idle_conns_per_host <count>
+// }
func (h *HTTPTransport) UnmarshalCaddyfile(d *caddyfile.Dispenser) error {
for d.Next() {
for d.NextBlock(0) {
@@ -1138,10 +1137,9 @@ func parseCopyResponseCaddyfile(h httpcaddyfile.Helper) (caddyhttp.MiddlewareHan
// UnmarshalCaddyfile sets up the handler from Caddyfile tokens. Syntax:
//
-// copy_response [<matcher>] [<status>] {
-// status <status>
-// }
-//
+// copy_response [<matcher>] [<status>] {
+// status <status>
+// }
func (h *CopyResponseHandler) UnmarshalCaddyfile(d *caddyfile.Dispenser) error {
for d.Next() {
args := d.RemainingArgs()
@@ -1178,11 +1176,10 @@ func parseCopyResponseHeadersCaddyfile(h httpcaddyfile.Helper) (caddyhttp.Middle
// UnmarshalCaddyfile sets up the handler from Caddyfile tokens. Syntax:
//
-// copy_response_headers [<matcher>] {
-// include <fields...>
-// exclude <fields...>
-// }
-//
+// copy_response_headers [<matcher>] {
+// include <fields...>
+// exclude <fields...>
+// }
func (h *CopyResponseHeadersHandler) UnmarshalCaddyfile(d *caddyfile.Dispenser) error {
for d.Next() {
args := d.RemainingArgs()
@@ -1208,16 +1205,15 @@ func (h *CopyResponseHeadersHandler) UnmarshalCaddyfile(d *caddyfile.Dispenser)
// UnmarshalCaddyfile deserializes Caddyfile tokens into h.
//
-// dynamic srv [<name>] {
-// service <service>
-// proto <proto>
-// name <name>
-// refresh <interval>
-// resolvers <resolvers...>
-// dial_timeout <timeout>
-// dial_fallback_delay <timeout>
-// }
-//
+// dynamic srv [<name>] {
+// service <service>
+// proto <proto>
+// name <name>
+// refresh <interval>
+// resolvers <resolvers...>
+// dial_timeout <timeout>
+// dial_fallback_delay <timeout>
+// }
func (u *SRVUpstreams) UnmarshalCaddyfile(d *caddyfile.Dispenser) error {
for d.Next() {
args := d.RemainingArgs()
@@ -1307,15 +1303,14 @@ func (u *SRVUpstreams) UnmarshalCaddyfile(d *caddyfile.Dispenser) error {
// UnmarshalCaddyfile deserializes Caddyfile tokens into h.
//
-// dynamic a [<name> <port] {
-// name <name>
-// port <port>
-// refresh <interval>
-// resolvers <resolvers...>
-// dial_timeout <timeout>
-// dial_fallback_delay <timeout>
-// }
-//
+// dynamic a [<name> <port] {
+// name <name>
+// port <port>
+// refresh <interval>
+// resolvers <resolvers...>
+// dial_timeout <timeout>
+// dial_fallback_delay <timeout>
+// }
func (u *AUpstreams) UnmarshalCaddyfile(d *caddyfile.Dispenser) error {
for d.Next() {
args := d.RemainingArgs()
@@ -1324,7 +1319,9 @@ func (u *AUpstreams) UnmarshalCaddyfile(d *caddyfile.Dispenser) error {
}
if len(args) > 0 {
u.Name = args[0]
- u.Port = args[1]
+ if len(args) == 2 {
+ u.Port = args[1]
+ }
}
for d.NextBlock(0) {
@@ -1395,6 +1392,35 @@ func (u *AUpstreams) UnmarshalCaddyfile(d *caddyfile.Dispenser) error {
return nil
}
+// UnmarshalCaddyfile deserializes Caddyfile tokens into h.
+//
+// dynamic multi {
+// <source> [...]
+// }
+func (u *MultiUpstreams) UnmarshalCaddyfile(d *caddyfile.Dispenser) error {
+ for d.Next() {
+ if d.NextArg() {
+ return d.ArgErr()
+ }
+
+ for nesting := d.Nesting(); d.NextBlock(nesting); {
+ dynModule := d.Val()
+ modID := "http.reverse_proxy.upstreams." + dynModule
+ unm, err := caddyfile.UnmarshalModule(d, modID)
+ if err != nil {
+ return err
+ }
+ source, ok := unm.(UpstreamSource)
+ if !ok {
+ return d.Errf("module %s (%T) is not an UpstreamSource", modID, unm)
+ }
+ u.SourcesRaw = append(u.SourcesRaw, caddyconfig.JSONModuleObject(source, "source", dynModule, nil))
+ }
+ }
+
+ return nil
+}
+
const matcherPrefix = "@"
// Interface guards
@@ -1403,4 +1429,5 @@ var (
_ caddyfile.Unmarshaler = (*HTTPTransport)(nil)
_ caddyfile.Unmarshaler = (*SRVUpstreams)(nil)
_ caddyfile.Unmarshaler = (*AUpstreams)(nil)
+ _ caddyfile.Unmarshaler = (*MultiUpstreams)(nil)
)
diff --git a/modules/caddyhttp/reverseproxy/upstreams.go b/modules/caddyhttp/reverseproxy/upstreams.go
index f788dad..f49bb28 100644
--- a/modules/caddyhttp/reverseproxy/upstreams.go
+++ b/modules/caddyhttp/reverseproxy/upstreams.go
@@ -2,6 +2,7 @@ package reverseproxy
import (
"context"
+ "encoding/json"
"fmt"
weakrand "math/rand"
"net"
@@ -18,6 +19,7 @@ import (
func init() {
caddy.RegisterModule(SRVUpstreams{})
caddy.RegisterModule(AUpstreams{})
+ caddy.RegisterModule(MultiUpstreams{})
}
// SRVUpstreams provides upstreams from SRV lookups.
@@ -211,11 +213,6 @@ func (sl srvLookup) isFresh() bool {
return time.Since(sl.freshness) < time.Duration(sl.srvUpstreams.Refresh)
}
-var (
- srvs = make(map[string]srvLookup)
- srvsMu sync.RWMutex
-)
-
// AUpstreams provides upstreams from A/AAAA lookups.
// Results are cached and refreshed at the configured
// refresh interval.
@@ -355,6 +352,77 @@ func (al aLookup) isFresh() bool {
return time.Since(al.freshness) < time.Duration(al.aUpstreams.Refresh)
}
+// MultiUpstreams is a single dynamic upstream source that
+// aggregates the results of multiple dynamic upstream sources.
+// All configured sources will be queried in order, with their
+// results appended to the end of the list. Errors returned
+// from individual sources will be logged and the next source
+// will continue to be invoked.
+//
+// This module makes it easy to implement redundant cluster
+// failovers, especially in conjunction with the `first` load
+// balancing policy: if the first source returns an error or
+// no upstreams, the second source's upstreams will be used
+// naturally.
+type MultiUpstreams struct {
+ // The list of upstream source modules to get upstreams from.
+ // They will be queried in order, with their results appended
+ // in the order they are returned.
+ SourcesRaw []json.RawMessage `json:"sources,omitempty" caddy:"namespace=http.reverse_proxy.upstreams inline_key=source"`
+ sources []UpstreamSource
+
+ logger *zap.Logger
+}
+
+// CaddyModule returns the Caddy module information.
+func (MultiUpstreams) CaddyModule() caddy.ModuleInfo {
+ return caddy.ModuleInfo{
+ ID: "http.reverse_proxy.upstreams.multi",
+ New: func() caddy.Module { return new(MultiUpstreams) },
+ }
+}
+
+func (mu *MultiUpstreams) Provision(ctx caddy.Context) error {
+ mu.logger = ctx.Logger(mu)
+
+ if mu.SourcesRaw != nil {
+ mod, err := ctx.LoadModule(mu, "SourcesRaw")
+ if err != nil {
+ return fmt.Errorf("loading upstream source modules: %v", err)
+ }
+ for _, src := range mod.([]any) {
+ mu.sources = append(mu.sources, src.(UpstreamSource))
+ }
+ }
+
+ return nil
+}
+
+func (mu MultiUpstreams) GetUpstreams(r *http.Request) ([]*Upstream, error) {
+ var upstreams []*Upstream
+
+ for i, src := range mu.sources {
+ select {
+ case <-r.Context().Done():
+ return upstreams, context.Canceled
+ default:
+ }
+
+ up, err := src.GetUpstreams(r)
+ if err != nil {
+ mu.logger.Error("upstream source returned error",
+ zap.Int("source_idx", i),
+ zap.Error(err))
+ } else if len(up) == 0 {
+ mu.logger.Warn("upstream source returned 0 upstreams", zap.Int("source_idx", i))
+ } else {
+ upstreams = append(upstreams, up...)
+ }
+ }
+
+ return upstreams, nil
+}
+
// UpstreamResolver holds the set of addresses of DNS resolvers of
// upstream addresses
type UpstreamResolver struct {
@@ -391,6 +459,9 @@ func (u *UpstreamResolver) ParseAddresses() error {
}
var (
+ srvs = make(map[string]srvLookup)
+ srvsMu sync.RWMutex
+
aAaaa = make(map[string]aLookup)
aAaaaMu sync.RWMutex
)