diff options
-rw-r--r-- | modules/caddyhttp/reverseproxy/caddyfile.go | 257 | ||||
-rw-r--r-- | modules/caddyhttp/reverseproxy/upstreams.go | 81 |
2 files changed, 218 insertions, 120 deletions
diff --git a/modules/caddyhttp/reverseproxy/caddyfile.go b/modules/caddyhttp/reverseproxy/caddyfile.go index a2c85f9..f746ee5 100644 --- a/modules/caddyhttp/reverseproxy/caddyfile.go +++ b/modules/caddyhttp/reverseproxy/caddyfile.go @@ -52,73 +52,73 @@ func parseCaddyfile(h httpcaddyfile.Helper) (caddyhttp.MiddlewareHandler, error) // UnmarshalCaddyfile sets up the handler from Caddyfile tokens. Syntax: // -// reverse_proxy [<matcher>] [<upstreams...>] { -// # backends -// to <upstreams...> -// dynamic <name> [...] +// reverse_proxy [<matcher>] [<upstreams...>] { +// # backends +// to <upstreams...> +// dynamic <name> [...] // -// # load balancing -// lb_policy <name> [<options...>] -// lb_retries <retries> -// lb_try_duration <duration> -// lb_try_interval <interval> -// lb_retry_match <request-matcher> +// # load balancing +// lb_policy <name> [<options...>] +// lb_retries <retries> +// lb_try_duration <duration> +// lb_try_interval <interval> +// lb_retry_match <request-matcher> // -// # active health checking -// health_uri <uri> -// health_port <port> -// health_interval <interval> -// health_timeout <duration> -// health_status <status> -// health_body <regexp> -// health_headers { -// <field> [<values...>] -// } +// # active health checking +// health_uri <uri> +// health_port <port> +// health_interval <interval> +// health_timeout <duration> +// health_status <status> +// health_body <regexp> +// health_headers { +// <field> [<values...>] +// } // -// # passive health checking -// fail_duration <duration> -// max_fails <num> -// unhealthy_status <status> -// unhealthy_latency <duration> -// unhealthy_request_count <num> +// # passive health checking +// fail_duration <duration> +// max_fails <num> +// unhealthy_status <status> +// unhealthy_latency <duration> +// unhealthy_request_count <num> // -// # streaming -// flush_interval <duration> -// buffer_requests -// buffer_responses -// max_buffer_size <size> +// # streaming +// flush_interval <duration> +// buffer_requests +// buffer_responses +// max_buffer_size <size> // -// # request manipulation -// trusted_proxies [private_ranges] <ranges...> -// header_up [+|-]<field> [<value|regexp> [<replacement>]] -// header_down [+|-]<field> [<value|regexp> [<replacement>]] -// method <method> -// rewrite <to> +// # request manipulation +// trusted_proxies [private_ranges] <ranges...> +// header_up [+|-]<field> [<value|regexp> [<replacement>]] +// header_down [+|-]<field> [<value|regexp> [<replacement>]] +// method <method> +// rewrite <to> // -// # round trip -// transport <name> { -// ... -// } +// # round trip +// transport <name> { +// ... +// } // -// # optionally intercept responses from upstream -// @name { -// status <code...> -// header <field> [<value>] -// } -// replace_status [<matcher>] <status_code> -// handle_response [<matcher>] { -// <directives...> +// # optionally intercept responses from upstream +// @name { +// status <code...> +// header <field> [<value>] +// } +// replace_status [<matcher>] <status_code> +// handle_response [<matcher>] { +// <directives...> // -// # special directives only available in handle_response -// copy_response [<matcher>] [<status>] { -// status <status> -// } -// copy_response_headers [<matcher>] { -// include <fields...> -// exclude <fields...> -// } -// } -// } +// # special directives only available in handle_response +// copy_response [<matcher>] [<status>] { +// status <status> +// } +// copy_response_headers [<matcher>] { +// include <fields...> +// exclude <fields...> +// } +// } +// } // // Proxy upstream addresses should be network dial addresses such // as `host:port`, or a URL such as `scheme://host:port`. Scheme @@ -824,33 +824,32 @@ func (h *Handler) FinalizeUnmarshalCaddyfile(helper httpcaddyfile.Helper) error // UnmarshalCaddyfile deserializes Caddyfile tokens into h. // -// transport http { -// read_buffer <size> -// write_buffer <size> -// max_response_header <size> -// dial_timeout <duration> -// dial_fallback_delay <duration> -// response_header_timeout <duration> -// expect_continue_timeout <duration> -// resolvers <resolvers...> -// tls -// tls_client_auth <automate_name> | <cert_file> <key_file> -// tls_insecure_skip_verify -// tls_timeout <duration> -// tls_trusted_ca_certs <cert_files...> -// tls_server_name <sni> -// tls_renegotiation <level> -// tls_except_ports <ports...> -// keepalive [off|<duration>] -// keepalive_interval <interval> -// keepalive_idle_conns <max_count> -// keepalive_idle_conns_per_host <count> -// versions <versions...> -// compression off -// max_conns_per_host <count> -// max_idle_conns_per_host <count> -// } -// +// transport http { +// read_buffer <size> +// write_buffer <size> +// max_response_header <size> +// dial_timeout <duration> +// dial_fallback_delay <duration> +// response_header_timeout <duration> +// expect_continue_timeout <duration> +// resolvers <resolvers...> +// tls +// tls_client_auth <automate_name> | <cert_file> <key_file> +// tls_insecure_skip_verify +// tls_timeout <duration> +// tls_trusted_ca_certs <cert_files...> +// tls_server_name <sni> +// tls_renegotiation <level> +// tls_except_ports <ports...> +// keepalive [off|<duration>] +// keepalive_interval <interval> +// keepalive_idle_conns <max_count> +// keepalive_idle_conns_per_host <count> +// versions <versions...> +// compression off +// max_conns_per_host <count> +// max_idle_conns_per_host <count> +// } func (h *HTTPTransport) UnmarshalCaddyfile(d *caddyfile.Dispenser) error { for d.Next() { for d.NextBlock(0) { @@ -1138,10 +1137,9 @@ func parseCopyResponseCaddyfile(h httpcaddyfile.Helper) (caddyhttp.MiddlewareHan // UnmarshalCaddyfile sets up the handler from Caddyfile tokens. Syntax: // -// copy_response [<matcher>] [<status>] { -// status <status> -// } -// +// copy_response [<matcher>] [<status>] { +// status <status> +// } func (h *CopyResponseHandler) UnmarshalCaddyfile(d *caddyfile.Dispenser) error { for d.Next() { args := d.RemainingArgs() @@ -1178,11 +1176,10 @@ func parseCopyResponseHeadersCaddyfile(h httpcaddyfile.Helper) (caddyhttp.Middle // UnmarshalCaddyfile sets up the handler from Caddyfile tokens. Syntax: // -// copy_response_headers [<matcher>] { -// include <fields...> -// exclude <fields...> -// } -// +// copy_response_headers [<matcher>] { +// include <fields...> +// exclude <fields...> +// } func (h *CopyResponseHeadersHandler) UnmarshalCaddyfile(d *caddyfile.Dispenser) error { for d.Next() { args := d.RemainingArgs() @@ -1208,16 +1205,15 @@ func (h *CopyResponseHeadersHandler) UnmarshalCaddyfile(d *caddyfile.Dispenser) // UnmarshalCaddyfile deserializes Caddyfile tokens into h. // -// dynamic srv [<name>] { -// service <service> -// proto <proto> -// name <name> -// refresh <interval> -// resolvers <resolvers...> -// dial_timeout <timeout> -// dial_fallback_delay <timeout> -// } -// +// dynamic srv [<name>] { +// service <service> +// proto <proto> +// name <name> +// refresh <interval> +// resolvers <resolvers...> +// dial_timeout <timeout> +// dial_fallback_delay <timeout> +// } func (u *SRVUpstreams) UnmarshalCaddyfile(d *caddyfile.Dispenser) error { for d.Next() { args := d.RemainingArgs() @@ -1307,15 +1303,14 @@ func (u *SRVUpstreams) UnmarshalCaddyfile(d *caddyfile.Dispenser) error { // UnmarshalCaddyfile deserializes Caddyfile tokens into h. // -// dynamic a [<name> <port] { -// name <name> -// port <port> -// refresh <interval> -// resolvers <resolvers...> -// dial_timeout <timeout> -// dial_fallback_delay <timeout> -// } -// +// dynamic a [<name> <port] { +// name <name> +// port <port> +// refresh <interval> +// resolvers <resolvers...> +// dial_timeout <timeout> +// dial_fallback_delay <timeout> +// } func (u *AUpstreams) UnmarshalCaddyfile(d *caddyfile.Dispenser) error { for d.Next() { args := d.RemainingArgs() @@ -1324,7 +1319,9 @@ func (u *AUpstreams) UnmarshalCaddyfile(d *caddyfile.Dispenser) error { } if len(args) > 0 { u.Name = args[0] - u.Port = args[1] + if len(args) == 2 { + u.Port = args[1] + } } for d.NextBlock(0) { @@ -1395,6 +1392,35 @@ func (u *AUpstreams) UnmarshalCaddyfile(d *caddyfile.Dispenser) error { return nil } +// UnmarshalCaddyfile deserializes Caddyfile tokens into h. +// +// dynamic multi { +// <source> [...] +// } +func (u *MultiUpstreams) UnmarshalCaddyfile(d *caddyfile.Dispenser) error { + for d.Next() { + if d.NextArg() { + return d.ArgErr() + } + + for nesting := d.Nesting(); d.NextBlock(nesting); { + dynModule := d.Val() + modID := "http.reverse_proxy.upstreams." + dynModule + unm, err := caddyfile.UnmarshalModule(d, modID) + if err != nil { + return err + } + source, ok := unm.(UpstreamSource) + if !ok { + return d.Errf("module %s (%T) is not an UpstreamSource", modID, unm) + } + u.SourcesRaw = append(u.SourcesRaw, caddyconfig.JSONModuleObject(source, "source", dynModule, nil)) + } + } + + return nil +} + const matcherPrefix = "@" // Interface guards @@ -1403,4 +1429,5 @@ var ( _ caddyfile.Unmarshaler = (*HTTPTransport)(nil) _ caddyfile.Unmarshaler = (*SRVUpstreams)(nil) _ caddyfile.Unmarshaler = (*AUpstreams)(nil) + _ caddyfile.Unmarshaler = (*MultiUpstreams)(nil) ) diff --git a/modules/caddyhttp/reverseproxy/upstreams.go b/modules/caddyhttp/reverseproxy/upstreams.go index f788dad..f49bb28 100644 --- a/modules/caddyhttp/reverseproxy/upstreams.go +++ b/modules/caddyhttp/reverseproxy/upstreams.go @@ -2,6 +2,7 @@ package reverseproxy import ( "context" + "encoding/json" "fmt" weakrand "math/rand" "net" @@ -18,6 +19,7 @@ import ( func init() { caddy.RegisterModule(SRVUpstreams{}) caddy.RegisterModule(AUpstreams{}) + caddy.RegisterModule(MultiUpstreams{}) } // SRVUpstreams provides upstreams from SRV lookups. @@ -211,11 +213,6 @@ func (sl srvLookup) isFresh() bool { return time.Since(sl.freshness) < time.Duration(sl.srvUpstreams.Refresh) } -var ( - srvs = make(map[string]srvLookup) - srvsMu sync.RWMutex -) - // AUpstreams provides upstreams from A/AAAA lookups. // Results are cached and refreshed at the configured // refresh interval. @@ -355,6 +352,77 @@ func (al aLookup) isFresh() bool { return time.Since(al.freshness) < time.Duration(al.aUpstreams.Refresh) } +// MultiUpstreams is a single dynamic upstream source that +// aggregates the results of multiple dynamic upstream sources. +// All configured sources will be queried in order, with their +// results appended to the end of the list. Errors returned +// from individual sources will be logged and the next source +// will continue to be invoked. +// +// This module makes it easy to implement redundant cluster +// failovers, especially in conjunction with the `first` load +// balancing policy: if the first source returns an error or +// no upstreams, the second source's upstreams will be used +// naturally. +type MultiUpstreams struct { + // The list of upstream source modules to get upstreams from. + // They will be queried in order, with their results appended + // in the order they are returned. + SourcesRaw []json.RawMessage `json:"sources,omitempty" caddy:"namespace=http.reverse_proxy.upstreams inline_key=source"` + sources []UpstreamSource + + logger *zap.Logger +} + +// CaddyModule returns the Caddy module information. +func (MultiUpstreams) CaddyModule() caddy.ModuleInfo { + return caddy.ModuleInfo{ + ID: "http.reverse_proxy.upstreams.multi", + New: func() caddy.Module { return new(MultiUpstreams) }, + } +} + +func (mu *MultiUpstreams) Provision(ctx caddy.Context) error { + mu.logger = ctx.Logger(mu) + + if mu.SourcesRaw != nil { + mod, err := ctx.LoadModule(mu, "SourcesRaw") + if err != nil { + return fmt.Errorf("loading upstream source modules: %v", err) + } + for _, src := range mod.([]any) { + mu.sources = append(mu.sources, src.(UpstreamSource)) + } + } + + return nil +} + +func (mu MultiUpstreams) GetUpstreams(r *http.Request) ([]*Upstream, error) { + var upstreams []*Upstream + + for i, src := range mu.sources { + select { + case <-r.Context().Done(): + return upstreams, context.Canceled + default: + } + + up, err := src.GetUpstreams(r) + if err != nil { + mu.logger.Error("upstream source returned error", + zap.Int("source_idx", i), + zap.Error(err)) + } else if len(up) == 0 { + mu.logger.Warn("upstream source returned 0 upstreams", zap.Int("source_idx", i)) + } else { + upstreams = append(upstreams, up...) + } + } + + return upstreams, nil +} + // UpstreamResolver holds the set of addresses of DNS resolvers of // upstream addresses type UpstreamResolver struct { @@ -391,6 +459,9 @@ func (u *UpstreamResolver) ParseAddresses() error { } var ( + srvs = make(map[string]srvLookup) + srvsMu sync.RWMutex + aAaaa = make(map[string]aLookup) aAaaaMu sync.RWMutex ) |