diff options
author | Matthew Holt <mholt@users.noreply.github.com> | 2022-08-25 21:42:48 -0600 |
---|---|---|
committer | Matthew Holt <mholt@users.noreply.github.com> | 2022-08-25 21:42:48 -0600 |
commit | 5fb5b81439a521294a88cf4ad6da49fdc350f3aa (patch) | |
tree | b0dbac5a90edc8ba3bbb87ffcfa2d9588c997794 /modules/caddyhttp | |
parent | 2cc5d38229d92838b2753fd0e67e083f70251f75 (diff) |
reverseproxy: Multiple dynamic upstreams
This allows users to, for example, get upstreams from multiple SRV
endpoints in order (such as primary and secondary clusters).
Also, gofmt went to town on the comments, sigh
Diffstat (limited to 'modules/caddyhttp')
-rw-r--r-- | modules/caddyhttp/reverseproxy/caddyfile.go | 257 | ||||
-rw-r--r-- | modules/caddyhttp/reverseproxy/upstreams.go | 81 |
2 files changed, 218 insertions, 120 deletions
diff --git a/modules/caddyhttp/reverseproxy/caddyfile.go b/modules/caddyhttp/reverseproxy/caddyfile.go index a2c85f9..f746ee5 100644 --- a/modules/caddyhttp/reverseproxy/caddyfile.go +++ b/modules/caddyhttp/reverseproxy/caddyfile.go @@ -52,73 +52,73 @@ func parseCaddyfile(h httpcaddyfile.Helper) (caddyhttp.MiddlewareHandler, error) // UnmarshalCaddyfile sets up the handler from Caddyfile tokens. Syntax: // -// reverse_proxy [<matcher>] [<upstreams...>] { -// # backends -// to <upstreams...> -// dynamic <name> [...] +// reverse_proxy [<matcher>] [<upstreams...>] { +// # backends +// to <upstreams...> +// dynamic <name> [...] // -// # load balancing -// lb_policy <name> [<options...>] -// lb_retries <retries> -// lb_try_duration <duration> -// lb_try_interval <interval> -// lb_retry_match <request-matcher> +// # load balancing +// lb_policy <name> [<options...>] +// lb_retries <retries> +// lb_try_duration <duration> +// lb_try_interval <interval> +// lb_retry_match <request-matcher> // -// # active health checking -// health_uri <uri> -// health_port <port> -// health_interval <interval> -// health_timeout <duration> -// health_status <status> -// health_body <regexp> -// health_headers { -// <field> [<values...>] -// } +// # active health checking +// health_uri <uri> +// health_port <port> +// health_interval <interval> +// health_timeout <duration> +// health_status <status> +// health_body <regexp> +// health_headers { +// <field> [<values...>] +// } // -// # passive health checking -// fail_duration <duration> -// max_fails <num> -// unhealthy_status <status> -// unhealthy_latency <duration> -// unhealthy_request_count <num> +// # passive health checking +// fail_duration <duration> +// max_fails <num> +// unhealthy_status <status> +// unhealthy_latency <duration> +// unhealthy_request_count <num> // -// # streaming -// flush_interval <duration> -// buffer_requests -// buffer_responses -// max_buffer_size <size> +// # streaming +// flush_interval <duration> +// buffer_requests +// buffer_responses +// max_buffer_size <size> // -// # request manipulation -// trusted_proxies [private_ranges] <ranges...> -// header_up [+|-]<field> [<value|regexp> [<replacement>]] -// header_down [+|-]<field> [<value|regexp> [<replacement>]] -// method <method> -// rewrite <to> +// # request manipulation +// trusted_proxies [private_ranges] <ranges...> +// header_up [+|-]<field> [<value|regexp> [<replacement>]] +// header_down [+|-]<field> [<value|regexp> [<replacement>]] +// method <method> +// rewrite <to> // -// # round trip -// transport <name> { -// ... -// } +// # round trip +// transport <name> { +// ... +// } // -// # optionally intercept responses from upstream -// @name { -// status <code...> -// header <field> [<value>] -// } -// replace_status [<matcher>] <status_code> -// handle_response [<matcher>] { -// <directives...> +// # optionally intercept responses from upstream +// @name { +// status <code...> +// header <field> [<value>] +// } +// replace_status [<matcher>] <status_code> +// handle_response [<matcher>] { +// <directives...> // -// # special directives only available in handle_response -// copy_response [<matcher>] [<status>] { -// status <status> -// } -// copy_response_headers [<matcher>] { -// include <fields...> -// exclude <fields...> -// } -// } -// } +// # special directives only available in handle_response +// copy_response [<matcher>] [<status>] { +// status <status> +// } +// copy_response_headers [<matcher>] { +// include <fields...> +// exclude <fields...> +// } +// } +// } // // Proxy upstream addresses should be network dial addresses such // as `host:port`, or a URL such as `scheme://host:port`. Scheme @@ -824,33 +824,32 @@ func (h *Handler) FinalizeUnmarshalCaddyfile(helper httpcaddyfile.Helper) error // UnmarshalCaddyfile deserializes Caddyfile tokens into h. // -// transport http { -// read_buffer <size> -// write_buffer <size> -// max_response_header <size> -// dial_timeout <duration> -// dial_fallback_delay <duration> -// response_header_timeout <duration> -// expect_continue_timeout <duration> -// resolvers <resolvers...> -// tls -// tls_client_auth <automate_name> | <cert_file> <key_file> -// tls_insecure_skip_verify -// tls_timeout <duration> -// tls_trusted_ca_certs <cert_files...> -// tls_server_name <sni> -// tls_renegotiation <level> -// tls_except_ports <ports...> -// keepalive [off|<duration>] -// keepalive_interval <interval> -// keepalive_idle_conns <max_count> -// keepalive_idle_conns_per_host <count> -// versions <versions...> -// compression off -// max_conns_per_host <count> -// max_idle_conns_per_host <count> -// } -// +// transport http { +// read_buffer <size> +// write_buffer <size> +// max_response_header <size> +// dial_timeout <duration> +// dial_fallback_delay <duration> +// response_header_timeout <duration> +// expect_continue_timeout <duration> +// resolvers <resolvers...> +// tls +// tls_client_auth <automate_name> | <cert_file> <key_file> +// tls_insecure_skip_verify +// tls_timeout <duration> +// tls_trusted_ca_certs <cert_files...> +// tls_server_name <sni> +// tls_renegotiation <level> +// tls_except_ports <ports...> +// keepalive [off|<duration>] +// keepalive_interval <interval> +// keepalive_idle_conns <max_count> +// keepalive_idle_conns_per_host <count> +// versions <versions...> +// compression off +// max_conns_per_host <count> +// max_idle_conns_per_host <count> +// } func (h *HTTPTransport) UnmarshalCaddyfile(d *caddyfile.Dispenser) error { for d.Next() { for d.NextBlock(0) { @@ -1138,10 +1137,9 @@ func parseCopyResponseCaddyfile(h httpcaddyfile.Helper) (caddyhttp.MiddlewareHan // UnmarshalCaddyfile sets up the handler from Caddyfile tokens. Syntax: // -// copy_response [<matcher>] [<status>] { -// status <status> -// } -// +// copy_response [<matcher>] [<status>] { +// status <status> +// } func (h *CopyResponseHandler) UnmarshalCaddyfile(d *caddyfile.Dispenser) error { for d.Next() { args := d.RemainingArgs() @@ -1178,11 +1176,10 @@ func parseCopyResponseHeadersCaddyfile(h httpcaddyfile.Helper) (caddyhttp.Middle // UnmarshalCaddyfile sets up the handler from Caddyfile tokens. Syntax: // -// copy_response_headers [<matcher>] { -// include <fields...> -// exclude <fields...> -// } -// +// copy_response_headers [<matcher>] { +// include <fields...> +// exclude <fields...> +// } func (h *CopyResponseHeadersHandler) UnmarshalCaddyfile(d *caddyfile.Dispenser) error { for d.Next() { args := d.RemainingArgs() @@ -1208,16 +1205,15 @@ func (h *CopyResponseHeadersHandler) UnmarshalCaddyfile(d *caddyfile.Dispenser) // UnmarshalCaddyfile deserializes Caddyfile tokens into h. // -// dynamic srv [<name>] { -// service <service> -// proto <proto> -// name <name> -// refresh <interval> -// resolvers <resolvers...> -// dial_timeout <timeout> -// dial_fallback_delay <timeout> -// } -// +// dynamic srv [<name>] { +// service <service> +// proto <proto> +// name <name> +// refresh <interval> +// resolvers <resolvers...> +// dial_timeout <timeout> +// dial_fallback_delay <timeout> +// } func (u *SRVUpstreams) UnmarshalCaddyfile(d *caddyfile.Dispenser) error { for d.Next() { args := d.RemainingArgs() @@ -1307,15 +1303,14 @@ func (u *SRVUpstreams) UnmarshalCaddyfile(d *caddyfile.Dispenser) error { // UnmarshalCaddyfile deserializes Caddyfile tokens into h. // -// dynamic a [<name> <port] { -// name <name> -// port <port> -// refresh <interval> -// resolvers <resolvers...> -// dial_timeout <timeout> -// dial_fallback_delay <timeout> -// } -// +// dynamic a [<name> <port] { +// name <name> +// port <port> +// refresh <interval> +// resolvers <resolvers...> +// dial_timeout <timeout> +// dial_fallback_delay <timeout> +// } func (u *AUpstreams) UnmarshalCaddyfile(d *caddyfile.Dispenser) error { for d.Next() { args := d.RemainingArgs() @@ -1324,7 +1319,9 @@ func (u *AUpstreams) UnmarshalCaddyfile(d *caddyfile.Dispenser) error { } if len(args) > 0 { u.Name = args[0] - u.Port = args[1] + if len(args) == 2 { + u.Port = args[1] + } } for d.NextBlock(0) { @@ -1395,6 +1392,35 @@ func (u *AUpstreams) UnmarshalCaddyfile(d *caddyfile.Dispenser) error { return nil } +// UnmarshalCaddyfile deserializes Caddyfile tokens into h. +// +// dynamic multi { +// <source> [...] +// } +func (u *MultiUpstreams) UnmarshalCaddyfile(d *caddyfile.Dispenser) error { + for d.Next() { + if d.NextArg() { + return d.ArgErr() + } + + for nesting := d.Nesting(); d.NextBlock(nesting); { + dynModule := d.Val() + modID := "http.reverse_proxy.upstreams." + dynModule + unm, err := caddyfile.UnmarshalModule(d, modID) + if err != nil { + return err + } + source, ok := unm.(UpstreamSource) + if !ok { + return d.Errf("module %s (%T) is not an UpstreamSource", modID, unm) + } + u.SourcesRaw = append(u.SourcesRaw, caddyconfig.JSONModuleObject(source, "source", dynModule, nil)) + } + } + + return nil +} + const matcherPrefix = "@" // Interface guards @@ -1403,4 +1429,5 @@ var ( _ caddyfile.Unmarshaler = (*HTTPTransport)(nil) _ caddyfile.Unmarshaler = (*SRVUpstreams)(nil) _ caddyfile.Unmarshaler = (*AUpstreams)(nil) + _ caddyfile.Unmarshaler = (*MultiUpstreams)(nil) ) diff --git a/modules/caddyhttp/reverseproxy/upstreams.go b/modules/caddyhttp/reverseproxy/upstreams.go index f788dad..f49bb28 100644 --- a/modules/caddyhttp/reverseproxy/upstreams.go +++ b/modules/caddyhttp/reverseproxy/upstreams.go @@ -2,6 +2,7 @@ package reverseproxy import ( "context" + "encoding/json" "fmt" weakrand "math/rand" "net" @@ -18,6 +19,7 @@ import ( func init() { caddy.RegisterModule(SRVUpstreams{}) caddy.RegisterModule(AUpstreams{}) + caddy.RegisterModule(MultiUpstreams{}) } // SRVUpstreams provides upstreams from SRV lookups. @@ -211,11 +213,6 @@ func (sl srvLookup) isFresh() bool { return time.Since(sl.freshness) < time.Duration(sl.srvUpstreams.Refresh) } -var ( - srvs = make(map[string]srvLookup) - srvsMu sync.RWMutex -) - // AUpstreams provides upstreams from A/AAAA lookups. // Results are cached and refreshed at the configured // refresh interval. @@ -355,6 +352,77 @@ func (al aLookup) isFresh() bool { return time.Since(al.freshness) < time.Duration(al.aUpstreams.Refresh) } +// MultiUpstreams is a single dynamic upstream source that +// aggregates the results of multiple dynamic upstream sources. +// All configured sources will be queried in order, with their +// results appended to the end of the list. Errors returned +// from individual sources will be logged and the next source +// will continue to be invoked. +// +// This module makes it easy to implement redundant cluster +// failovers, especially in conjunction with the `first` load +// balancing policy: if the first source returns an error or +// no upstreams, the second source's upstreams will be used +// naturally. +type MultiUpstreams struct { + // The list of upstream source modules to get upstreams from. + // They will be queried in order, with their results appended + // in the order they are returned. + SourcesRaw []json.RawMessage `json:"sources,omitempty" caddy:"namespace=http.reverse_proxy.upstreams inline_key=source"` + sources []UpstreamSource + + logger *zap.Logger +} + +// CaddyModule returns the Caddy module information. +func (MultiUpstreams) CaddyModule() caddy.ModuleInfo { + return caddy.ModuleInfo{ + ID: "http.reverse_proxy.upstreams.multi", + New: func() caddy.Module { return new(MultiUpstreams) }, + } +} + +func (mu *MultiUpstreams) Provision(ctx caddy.Context) error { + mu.logger = ctx.Logger(mu) + + if mu.SourcesRaw != nil { + mod, err := ctx.LoadModule(mu, "SourcesRaw") + if err != nil { + return fmt.Errorf("loading upstream source modules: %v", err) + } + for _, src := range mod.([]any) { + mu.sources = append(mu.sources, src.(UpstreamSource)) + } + } + + return nil +} + +func (mu MultiUpstreams) GetUpstreams(r *http.Request) ([]*Upstream, error) { + var upstreams []*Upstream + + for i, src := range mu.sources { + select { + case <-r.Context().Done(): + return upstreams, context.Canceled + default: + } + + up, err := src.GetUpstreams(r) + if err != nil { + mu.logger.Error("upstream source returned error", + zap.Int("source_idx", i), + zap.Error(err)) + } else if len(up) == 0 { + mu.logger.Warn("upstream source returned 0 upstreams", zap.Int("source_idx", i)) + } else { + upstreams = append(upstreams, up...) + } + } + + return upstreams, nil +} + // UpstreamResolver holds the set of addresses of DNS resolvers of // upstream addresses type UpstreamResolver struct { @@ -391,6 +459,9 @@ func (u *UpstreamResolver) ParseAddresses() error { } var ( + srvs = make(map[string]srvLookup) + srvsMu sync.RWMutex + aAaaa = make(map[string]aLookup) aAaaaMu sync.RWMutex ) |