Diffstat (limited to 'modules/caddyhttp/reverseproxy/reverseproxy.go')
-rw-r--r--	modules/caddyhttp/reverseproxy/reverseproxy.go	275
1 file changed, 128 insertions(+), 147 deletions(-)
diff --git a/modules/caddyhttp/reverseproxy/reverseproxy.go b/modules/caddyhttp/reverseproxy/reverseproxy.go
index 1449785..08be40d 100644
--- a/modules/caddyhttp/reverseproxy/reverseproxy.go
+++ b/modules/caddyhttp/reverseproxy/reverseproxy.go
@@ -27,30 +27,23 @@ import (
"net/netip"
"net/textproto"
"net/url"
- "regexp"
- "runtime"
"strconv"
"strings"
"sync"
"time"
+ "go.uber.org/zap"
+ "golang.org/x/net/http/httpguts"
+
"github.com/caddyserver/caddy/v2"
"github.com/caddyserver/caddy/v2/caddyconfig/caddyfile"
"github.com/caddyserver/caddy/v2/modules/caddyevents"
"github.com/caddyserver/caddy/v2/modules/caddyhttp"
"github.com/caddyserver/caddy/v2/modules/caddyhttp/headers"
"github.com/caddyserver/caddy/v2/modules/caddyhttp/rewrite"
- "go.uber.org/zap"
- "golang.org/x/net/http/httpguts"
)
-var supports1xx bool
-
func init() {
- // Caddy requires at least Go 1.18, but Early Hints requires Go 1.19; thus we can simply check for 1.18 in version string
- // TODO: remove this once our minimum Go version is 1.19
- supports1xx = !strings.Contains(runtime.Version(), "go1.18")
-
caddy.RegisterModule(Handler{})
}
@@ -158,6 +151,19 @@ type Handler struct {
// could be useful if the backend has tighter memory constraints.
ResponseBuffers int64 `json:"response_buffers,omitempty"`
+ // If nonzero, streaming requests such as WebSockets will be
+ // forcibly closed at the end of the timeout. Default: no timeout.
+ StreamTimeout caddy.Duration `json:"stream_timeout,omitempty"`
+
+ // If nonzero, streaming requests such as WebSockets will not be
+ // closed when the proxy config is unloaded, and instead the stream
+ // will remain open until the delay is complete. In other words,
+ // enabling this prevents streams from closing when Caddy's config
+ // is reloaded. Enabling this can help avoid a thundering herd of
+ // reconnecting clients whose connections were closed when the
+ // previous config was unloaded. Default: no delay.
+ StreamCloseDelay caddy.Duration `json:"stream_close_delay,omitempty"`
+
// If configured, rewrites the copy of the upstream request.
// Allows changing the request method and URI (path and query).
// Since the rewrite is applied to the copy, it does not persist
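The two new stream duration fields only affect hijacked/streaming connections such as WebSockets. A rough, hypothetical sketch of the intended semantics using only the standard library (the handler's real implementation manages a shared connections map and a close timer; every name below is made up for illustration):

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	streamTimeout := 2 * time.Second    // stands in for stream_timeout
	streamCloseDelay := 1 * time.Second // stands in for stream_close_delay

	var once sync.Once
	closed := make(chan struct{})
	closeStream := func(reason string) {
		once.Do(func() {
			fmt.Println("closing stream:", reason)
			close(closed)
		})
	}

	// stream_timeout: forcibly close the stream once the timeout elapses,
	// no matter what else happens.
	if streamTimeout > 0 {
		time.AfterFunc(streamTimeout, func() { closeStream("stream timeout reached") })
	}

	// Simulate the config being unloaded while the stream is still open.
	time.Sleep(500 * time.Millisecond)
	if streamCloseDelay > 0 {
		// stream_close_delay: keep the stream open a little longer instead of
		// closing it the moment the old config goes away.
		time.AfterFunc(streamCloseDelay, func() { closeStream("close delay elapsed") })
	} else {
		closeStream("config unloaded")
	}

	<-closed
}
```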
@@ -185,6 +191,13 @@ type Handler struct {
// - `{http.reverse_proxy.header.*}` The headers from the response
HandleResponse []caddyhttp.ResponseHandler `json:"handle_response,omitempty"`
+ // If set, the proxy will write very detailed logs about its
+ // inner workings. Enable this only when debugging, as it
+ // will produce a lot of output.
+ //
+ // EXPERIMENTAL: This feature is subject to change or removal.
+ VerboseLogs bool `json:"verbose_logs,omitempty"`
+
Transport http.RoundTripper `json:"-"`
CB CircuitBreaker `json:"-"`
DynamicUpstreams UpstreamSource `json:"-"`
@@ -199,8 +212,9 @@ type Handler struct {
handleResponseSegments []*caddyfile.Dispenser
// Stores upgraded requests (hijacked connections) for proper cleanup
- connections map[io.ReadWriteCloser]openConnection
- connectionsMu *sync.Mutex
+ connections map[io.ReadWriteCloser]openConnection
+ connectionsCloseTimer *time.Timer
+ connectionsMu *sync.Mutex
ctx caddy.Context
logger *zap.Logger
@@ -243,20 +257,6 @@ func (h *Handler) Provision(ctx caddy.Context) error {
h.logger.Warn("UNLIMITED BUFFERING: buffering is enabled without any cap on buffer size, which can result in OOM crashes")
}
- // verify SRV compatibility - TODO: LookupSRV deprecated; will be removed
- for i, v := range h.Upstreams {
- if v.LookupSRV == "" {
- continue
- }
- h.logger.Warn("DEPRECATED: lookup_srv: will be removed in a near-future version of Caddy; use the http.reverse_proxy.upstreams.srv module instead")
- if h.HealthChecks != nil && h.HealthChecks.Active != nil {
- return fmt.Errorf(`upstream: lookup_srv is incompatible with active health checks: %d: {"dial": %q, "lookup_srv": %q}`, i, v.Dial, v.LookupSRV)
- }
- if v.Dial != "" {
- return fmt.Errorf(`upstream: specifying dial address is incompatible with lookup_srv: %d: {"dial": %q, "lookup_srv": %q}`, i, v.Dial, v.LookupSRV)
- }
- }
-
// start by loading modules
if h.TransportRaw != nil {
mod, err := ctx.LoadModule(h, "TransportRaw")
@@ -363,62 +363,22 @@ func (h *Handler) Provision(ctx caddy.Context) error {
if h.HealthChecks != nil {
// set defaults on passive health checks, if necessary
if h.HealthChecks.Passive != nil {
- if h.HealthChecks.Passive.FailDuration > 0 && h.HealthChecks.Passive.MaxFails == 0 {
+ h.HealthChecks.Passive.logger = h.logger.Named("health_checker.passive")
+ if h.HealthChecks.Passive.MaxFails == 0 {
h.HealthChecks.Passive.MaxFails = 1
}
}
// if active health checks are enabled, configure them and start a worker
- if h.HealthChecks.Active != nil && (h.HealthChecks.Active.Path != "" ||
- h.HealthChecks.Active.URI != "" ||
- h.HealthChecks.Active.Port != 0) {
-
- h.HealthChecks.Active.logger = h.logger.Named("health_checker.active")
-
- timeout := time.Duration(h.HealthChecks.Active.Timeout)
- if timeout == 0 {
- timeout = 5 * time.Second
- }
-
- if h.HealthChecks.Active.Path != "" {
- h.HealthChecks.Active.logger.Warn("the 'path' option is deprecated, please use 'uri' instead!")
- }
-
- // parse the URI string (supports path and query)
- if h.HealthChecks.Active.URI != "" {
- parsedURI, err := url.Parse(h.HealthChecks.Active.URI)
- if err != nil {
- return err
- }
- h.HealthChecks.Active.uri = parsedURI
- }
-
- h.HealthChecks.Active.httpClient = &http.Client{
- Timeout: timeout,
- Transport: h.Transport,
- }
-
- for _, upstream := range h.Upstreams {
- // if there's an alternative port for health-check provided in the config,
- // then use it, otherwise use the port of upstream.
- if h.HealthChecks.Active.Port != 0 {
- upstream.activeHealthCheckPort = h.HealthChecks.Active.Port
- }
+ if h.HealthChecks.Active != nil {
+ err := h.HealthChecks.Active.Provision(ctx, h)
+ if err != nil {
+ return err
}
- if h.HealthChecks.Active.Interval == 0 {
- h.HealthChecks.Active.Interval = caddy.Duration(30 * time.Second)
+ if h.HealthChecks.Active.IsEnabled() {
+ go h.activeHealthChecker()
}
-
- if h.HealthChecks.Active.ExpectBody != "" {
- var err error
- h.HealthChecks.Active.bodyRegexp, err = regexp.Compile(h.HealthChecks.Active.ExpectBody)
- if err != nil {
- return fmt.Errorf("expect_body: compiling regular expression: %v", err)
- }
- }
-
- go h.activeHealthChecker()
}
}
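The inline active health check setup is replaced by a Provision/IsEnabled pair on the active health check type. A simplified, self-contained sketch of what that split could look like, with field names and defaults mirroring the removed code above; the real Provision takes the caddy context and handler (as the hunk shows) and lives elsewhere in the package:

```go
package health

import (
	"net/http"
	"net/url"
	"regexp"
	"time"
)

// ActiveHealthChecks mirrors only the fields referenced by the removed code above;
// the real type in this package has more fields and different plumbing.
type ActiveHealthChecks struct {
	Path       string
	URI        string
	Port       int
	Timeout    time.Duration
	Interval   time.Duration
	ExpectBody string

	uri        *url.URL
	bodyRegexp *regexp.Regexp
	httpClient *http.Client
}

// Provision applies the same defaults the removed inline code applied.
func (a *ActiveHealthChecks) Provision(transport http.RoundTripper) error {
	if a.Timeout == 0 {
		a.Timeout = 5 * time.Second
	}
	if a.Interval == 0 {
		a.Interval = 30 * time.Second
	}
	if a.URI != "" {
		u, err := url.Parse(a.URI)
		if err != nil {
			return err
		}
		a.uri = u
	}
	if a.ExpectBody != "" {
		re, err := regexp.Compile(a.ExpectBody)
		if err != nil {
			return err
		}
		a.bodyRegexp = re
	}
	a.httpClient = &http.Client{Timeout: a.Timeout, Transport: transport}
	return nil
}

// IsEnabled reports whether any active health checking option was configured.
func (a *ActiveHealthChecks) IsEnabled() bool {
	return a.Path != "" || a.URI != "" || a.Port != 0
}
```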
@@ -438,25 +398,7 @@ func (h *Handler) Provision(ctx caddy.Context) error {
// Cleanup cleans up the resources made by h.
func (h *Handler) Cleanup() error {
- // close hijacked connections (both to client and backend)
- var err error
- h.connectionsMu.Lock()
- for _, oc := range h.connections {
- if oc.gracefulClose != nil {
- // this is potentially blocking while we have the lock on the connections
- // map, but that should be OK since the server has in theory shut down
- // and we are no longer using the connections map
- gracefulErr := oc.gracefulClose()
- if gracefulErr != nil && err == nil {
- err = gracefulErr
- }
- }
- closeErr := oc.conn.Close()
- if closeErr != nil && err == nil {
- err = closeErr
- }
- }
- h.connectionsMu.Unlock()
+ err := h.cleanupConnections()
// remove hosts from our config from the pool
for _, upstream := range h.Upstreams {
@@ -517,7 +459,8 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request, next caddyht
// It returns true when the loop is done and should break; false otherwise. The error value returned should
// be assigned to the proxyErr value for the next iteration of the loop (or the error handled after break).
func (h *Handler) proxyLoopIteration(r *http.Request, origReq *http.Request, w http.ResponseWriter, proxyErr error, start time.Time, retries int,
- repl *caddy.Replacer, reqHeader http.Header, reqHost string, next caddyhttp.Handler) (bool, error) {
+ repl *caddy.Replacer, reqHeader http.Header, reqHost string, next caddyhttp.Handler,
+) (bool, error) {
// get the updated list of upstreams
upstreams := h.Upstreams
if h.DynamicUpstreams != nil {
@@ -544,7 +487,7 @@ func (h *Handler) proxyLoopIteration(r *http.Request, origReq *http.Request, w h
upstream := h.LoadBalancing.SelectionPolicy.Select(upstreams, r, w)
if upstream == nil {
if proxyErr == nil {
- proxyErr = caddyhttp.Error(http.StatusServiceUnavailable, fmt.Errorf("no upstreams available"))
+ proxyErr = caddyhttp.Error(http.StatusServiceUnavailable, noUpstreamsAvailable)
}
if !h.LoadBalancing.tryAgain(h.ctx, start, retries, proxyErr, r) {
return true, proxyErr
@@ -646,7 +589,8 @@ func (h Handler) prepareRequest(req *http.Request, repl *caddy.Replacer) (*http.
// feature if absolutely required, if read timeouts are
// set, and if body size is limited
if h.RequestBuffers != 0 && req.Body != nil {
- req.Body, _ = h.bufferedBody(req.Body, h.RequestBuffers)
+ req.Body, req.ContentLength = h.bufferedBody(req.Body, h.RequestBuffers)
+ req.Header.Set("Content-Length", strconv.FormatInt(req.ContentLength, 10))
}
if req.ContentLength == 0 {
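With bufferedBody now returning the buffered length, the rewritten Content-Length header agrees with what will actually be sent upstream. A simplified, standalone illustration of the idea, assuming a hypothetical bufferBody helper (the package's real bufferedBody uses pooled buffers and honors a size cap):

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"net/http"
	"strconv"
	"strings"
)

// bufferBody is a hypothetical helper: read the whole body so its exact length
// is known, then make ContentLength and the Content-Length header agree with it.
func bufferBody(req *http.Request) error {
	if req.Body == nil {
		return nil
	}
	var buf bytes.Buffer
	_, err := io.Copy(&buf, req.Body)
	req.Body.Close()
	if err != nil {
		return err
	}
	req.Body = io.NopCloser(&buf)
	req.ContentLength = int64(buf.Len())
	req.Header.Set("Content-Length", strconv.FormatInt(req.ContentLength, 10))
	return nil
}

func main() {
	req, _ := http.NewRequest("POST", "http://localhost/upload", strings.NewReader("hello"))
	req.ContentLength = -1 // pretend the length was unknown
	if err := bufferBody(req); err != nil {
		panic(err)
	}
	fmt.Println(req.ContentLength, req.Header.Get("Content-Length")) // 5 5
}
```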
@@ -687,8 +631,24 @@ func (h Handler) prepareRequest(req *http.Request, repl *caddy.Replacer) (*http.
req.Header.Set("Upgrade", reqUpType)
}
+ // Set up the PROXY protocol info
+ address := caddyhttp.GetVar(req.Context(), caddyhttp.ClientIPVarKey).(string)
+ addrPort, err := netip.ParseAddrPort(address)
+ if err != nil {
+ // OK; probably didn't have a port
+ addr, err := netip.ParseAddr(address)
+ if err != nil {
+ // Doesn't seem like a valid IP address at all
+ } else {
+ // OK, only the port was missing
+ addrPort = netip.AddrPortFrom(addr, 0)
+ }
+ }
+ proxyProtocolInfo := ProxyProtocolInfo{AddrPort: addrPort}
+ caddyhttp.SetVar(req.Context(), proxyProtocolInfoVarKey, proxyProtocolInfo)
+
// Add the supported X-Forwarded-* headers
- err := h.addForwardedHeaders(req)
+ err = h.addForwardedHeaders(req)
if err != nil {
return nil, err
}
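The fallback from netip.ParseAddrPort to netip.ParseAddr handles client IPs that were stored without a port. A standalone demonstration of that stdlib behavior (toAddrPort is a made-up helper name):

```go
package main

import (
	"fmt"
	"net/netip"
)

// toAddrPort shows the same fallback: a bare IP fails ParseAddrPort,
// so parse it as an address and attach port 0.
func toAddrPort(address string) (netip.AddrPort, bool) {
	if ap, err := netip.ParseAddrPort(address); err == nil {
		return ap, true
	}
	addr, err := netip.ParseAddr(address)
	if err != nil {
		return netip.AddrPort{}, false // not a valid IP address at all
	}
	return netip.AddrPortFrom(addr, 0), true
}

func main() {
	for _, s := range []string{"203.0.113.7:4321", "203.0.113.7", "not-an-ip"} {
		ap, ok := toAddrPort(s)
		fmt.Println(s, "->", ap, ok)
	}
}
```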
@@ -795,25 +755,23 @@ func (h *Handler) reverseProxy(rw http.ResponseWriter, req *http.Request, origRe
server := req.Context().Value(caddyhttp.ServerCtxKey).(*caddyhttp.Server)
shouldLogCredentials := server.Logs != nil && server.Logs.ShouldLogCredentials
- if supports1xx {
- // Forward 1xx status codes, backported from https://github.com/golang/go/pull/53164
- trace := &httptrace.ClientTrace{
- Got1xxResponse: func(code int, header textproto.MIMEHeader) error {
- h := rw.Header()
- copyHeader(h, http.Header(header))
- rw.WriteHeader(code)
-
- // Clear headers coming from the backend
- // (it's not automatically done by ResponseWriter.WriteHeader() for 1xx responses)
- for k := range header {
- delete(h, k)
- }
+ // Forward 1xx status codes, backported from https://github.com/golang/go/pull/53164
+ trace := &httptrace.ClientTrace{
+ Got1xxResponse: func(code int, header textproto.MIMEHeader) error {
+ h := rw.Header()
+ copyHeader(h, http.Header(header))
+ rw.WriteHeader(code)
+
+ // Clear headers coming from the backend
+ // (it's not automatically done by ResponseWriter.WriteHeader() for 1xx responses)
+ for k := range header {
+ delete(h, k)
+ }
- return nil
- },
- }
- req = req.WithContext(httptrace.WithClientTrace(req.Context(), trace))
+ return nil
+ },
}
+ req = req.WithContext(httptrace.WithClientTrace(req.Context(), trace))
// if FlushInterval is explicitly configured to -1 (i.e. flush continuously to achieve
// low-latency streaming), don't let the transport cancel the request if the client
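With the minimum Go version now past 1.19, the Got1xxResponse trace hook is available unconditionally. A self-contained example of the hook observing a 103 Early Hints response (the test server and handler here are illustrative only):

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"net/http/httptrace"
	"net/textproto"
)

func main() {
	// Test server that sends 103 Early Hints before the final response.
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.Header().Set("Link", "</style.css>; rel=preload; as=style")
		w.WriteHeader(http.StatusEarlyHints) // informational; the final status follows
		w.WriteHeader(http.StatusOK)
	}))
	defer srv.Close()

	trace := &httptrace.ClientTrace{
		Got1xxResponse: func(code int, header textproto.MIMEHeader) error {
			fmt.Println("informational response:", code, header.Get("Link"))
			return nil
		},
	}
	req, _ := http.NewRequest("GET", srv.URL, nil)
	req = req.WithContext(httptrace.WithClientTrace(req.Context(), trace))

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		panic(err)
	}
	resp.Body.Close()
	fmt.Println("final status:", resp.Status)
}
```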
@@ -821,7 +779,7 @@ func (h *Handler) reverseProxy(rw http.ResponseWriter, req *http.Request, origRe
// regardless, and we should expect client disconnection in low-latency streaming
// scenarios (see issue #4922)
if h.FlushInterval == -1 {
- req = req.WithContext(ignoreClientGoneContext{req.Context(), h.ctx.Done()})
+ req = req.WithContext(ignoreClientGoneContext{req.Context()})
}
// do the round-trip; emit debug log with values we know are
@@ -897,12 +855,6 @@ func (h *Handler) reverseProxy(rw http.ResponseWriter, req *http.Request, origRe
break
}
- // otherwise, if there are any routes configured, execute those as the
- // actual response instead of what we got from the proxy backend
- if len(rh.Routes) == 0 {
- continue
- }
-
// set up the replacer so that parts of the original response can be
// used for routing decisions
for field, value := range res.Header {
@@ -911,7 +863,7 @@ func (h *Handler) reverseProxy(rw http.ResponseWriter, req *http.Request, origRe
repl.Set("http.reverse_proxy.status_code", res.StatusCode)
repl.Set("http.reverse_proxy.status_text", res.Status)
- h.logger.Debug("handling response", zap.Int("handler", i))
+ logger.Debug("handling response", zap.Int("handler", i))
// we make some data available via request context to child routes
// so that they may inherit some options and functions from the
@@ -956,7 +908,7 @@ func (h *Handler) reverseProxy(rw http.ResponseWriter, req *http.Request, origRe
}
// finalizeResponse prepares and copies the response.
-func (h Handler) finalizeResponse(
+func (h *Handler) finalizeResponse(
rw http.ResponseWriter,
req *http.Request,
res *http.Response,
@@ -998,15 +950,21 @@ func (h Handler) finalizeResponse(
}
rw.WriteHeader(res.StatusCode)
+ if h.VerboseLogs {
+ logger.Debug("wrote header")
+ }
- err := h.copyResponse(rw, res.Body, h.flushInterval(req, res))
- res.Body.Close() // close now, instead of defer, to populate res.Trailer
+ err := h.copyResponse(rw, res.Body, h.flushInterval(req, res), logger)
+ errClose := res.Body.Close() // close now, instead of defer, to populate res.Trailer
+ if h.VerboseLogs || errClose != nil {
+ logger.Debug("closed response body from upstream", zap.Error(errClose))
+ }
if err != nil {
// we're streaming the response and we've already written headers, so
// there's nothing an error handler can do to recover at this point;
// the standard lib's proxy panics at this point, but we'll just log
// the error and abort the stream here
- h.logger.Error("aborting with incomplete response", zap.Error(err))
+ logger.Error("aborting with incomplete response", zap.Error(err))
return nil
}
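zap.Error(nil) resolves to a skipped field, so the debug line above can pass errClose unconditionally when verbose logs are enabled. A tiny demonstration:

```go
package main

import (
	"errors"

	"go.uber.org/zap"
)

func main() {
	logger, _ := zap.NewDevelopment() // development config logs at Debug level
	defer logger.Sync()

	// With a nil error the field is skipped entirely.
	logger.Debug("closed response body from upstream", zap.Error(nil))

	// With a real error an "error" key appears in the entry.
	logger.Debug("closed response body from upstream", zap.Error(errors.New("connection reset")))
}
```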
@@ -1014,9 +972,8 @@ func (h Handler) finalizeResponse(
// Force chunking if we saw a response trailer.
// This prevents net/http from calculating the length for short
// bodies and adding a Content-Length.
- if fl, ok := rw.(http.Flusher); ok {
- fl.Flush()
- }
+ //nolint:bodyclose
+ http.NewResponseController(rw).Flush()
}
// total duration spent proxying, including writing response body
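http.NewResponseController (Go 1.20+) replaces the http.Flusher type assertion and reports when flushing is unsupported, instead of silently doing nothing. A minimal standalone example (names are illustrative):

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
)

func handler(w http.ResponseWriter, r *http.Request) {
	fmt.Fprintln(w, "first chunk")
	// Flush reports an error (e.g. http.ErrNotSupported) rather than silently
	// doing nothing, which a failed http.Flusher type assertion would.
	if err := http.NewResponseController(w).Flush(); err != nil {
		fmt.Println("flush failed:", err)
	}
	fmt.Fprintln(w, "second chunk")
}

func main() {
	srv := httptest.NewServer(http.HandlerFunc(handler))
	defer srv.Close()

	resp, err := http.Get(srv.URL)
	if err != nil {
		panic(err)
	}
	body, _ := io.ReadAll(resp.Body)
	resp.Body.Close()
	fmt.Print(string(body))
}
```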
@@ -1035,6 +992,10 @@ func (h Handler) finalizeResponse(
}
}
+ if h.VerboseLogs {
+ logger.Debug("response finalized")
+ }
+
return nil
}
@@ -1066,17 +1027,23 @@ func (lb LoadBalancing) tryAgain(ctx caddy.Context, start time.Time, retries int
// should be safe to retry, since without a connection, no
// HTTP request can be transmitted; but if the error is not
// specifically a dialer error, we need to be careful
- if _, ok := proxyErr.(DialError); proxyErr != nil && !ok {
+ if proxyErr != nil {
+ _, isDialError := proxyErr.(DialError)
+ herr, isHandlerError := proxyErr.(caddyhttp.HandlerError)
+
// if the error occurred after a connection was established,
// we have to assume the upstream received the request, and
// retries need to be carefully decided, because some requests
// are not idempotent
- if lb.RetryMatch == nil && req.Method != "GET" {
- // by default, don't retry requests if they aren't GET
- return false
- }
- if !lb.RetryMatch.AnyMatch(req) {
- return false
+ if !isDialError && !(isHandlerError && errors.Is(herr, noUpstreamsAvailable)) {
+ if lb.RetryMatch == nil && req.Method != "GET" {
+ // by default, don't retry requests if they aren't GET
+ return false
+ }
+
+ if !lb.RetryMatch.AnyMatch(req) {
+ return false
+ }
}
}
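The retry logic now treats the noUpstreamsAvailable sentinel as always safe to retry, since no backend ever received the request. A small illustration of the sentinel-plus-errors.Is pattern it relies on (errNoUpstreams stands in for the package's sentinel):

```go
package main

import (
	"errors"
	"fmt"
)

// errNoUpstreams stands in for the package's noUpstreamsAvailable sentinel.
var errNoUpstreams = errors.New("no upstreams available")

func retryHint(err error) string {
	if errors.Is(err, errNoUpstreams) {
		return "safe to retry: no backend ever received the request"
	}
	return "retry only if the request is idempotent or matches retry_match"
}

func main() {
	wrapped := fmt.Errorf("selecting upstream: %w", errNoUpstreams)
	fmt.Println(retryHint(wrapped))
	fmt.Println(retryHint(errors.New("connection reset during response")))
}
```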
@@ -1128,12 +1095,11 @@ func (h Handler) provisionUpstream(upstream *Upstream) {
// without MaxRequests), copy the value into this upstream, since the
// value in the upstream (MaxRequests) is what is used during
// availability checks
- if h.HealthChecks != nil && h.HealthChecks.Passive != nil {
- h.HealthChecks.Passive.logger = h.logger.Named("health_checker.passive")
- if h.HealthChecks.Passive.UnhealthyRequestCount > 0 &&
- upstream.MaxRequests == 0 {
- upstream.MaxRequests = h.HealthChecks.Passive.UnhealthyRequestCount
- }
+ if h.HealthChecks != nil &&
+ h.HealthChecks.Passive != nil &&
+ h.HealthChecks.Passive.UnhealthyRequestCount > 0 &&
+ upstream.MaxRequests == 0 {
+ upstream.MaxRequests = h.HealthChecks.Passive.UnhealthyRequestCount
}
// upstreams need independent access to the passive
@@ -1450,21 +1416,36 @@ type handleResponseContext struct {
// ignoreClientGoneContext is a special context.Context type
// intended for use when doing a RoundTrip where you don't
// want a client disconnection to cancel the request during
-// the roundtrip. Set its done field to a Done() channel
-// of a context that doesn't get canceled when the client
-// disconnects, such as caddy.Context.Done() instead.
+// the roundtrip.
+// This context clears cancellation, error, and deadline methods,
+// but still allows values to pass through from its embedded
+// context.
+//
+// TODO: This can be replaced with context.WithoutCancel once
+// the minimum required version of Go is 1.21.
type ignoreClientGoneContext struct {
context.Context
- done <-chan struct{}
}
-func (c ignoreClientGoneContext) Done() <-chan struct{} { return c.done }
+func (c ignoreClientGoneContext) Deadline() (deadline time.Time, ok bool) {
+ return
+}
+
+func (c ignoreClientGoneContext) Done() <-chan struct{} {
+ return nil
+}
+
+func (c ignoreClientGoneContext) Err() error {
+ return nil
+}
// proxyHandleResponseContextCtxKey is the context key for the active proxy handler
// so that handle_response routes can inherit some config options
// from the proxy handler.
const proxyHandleResponseContextCtxKey caddy.CtxKey = "reverse_proxy_handle_response_context"
+var noUpstreamsAvailable = fmt.Errorf("no upstreams available")
+
// Interface guards
var (
_ caddy.Provisioner = (*Handler)(nil)
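As the TODO on ignoreClientGoneContext notes, context.WithoutCancel (Go 1.21+) provides the same behavior the hand-written type implements: values flow through while cancellation, errors, and deadlines are cleared. A quick standalone check of that equivalence:

```go
package main

import (
	"context"
	"fmt"
)

type ctxKey string

func main() {
	parent, cancel := context.WithCancel(context.WithValue(context.Background(), ctxKey("user"), "alice"))
	cancel() // the client has gone away

	detached := context.WithoutCancel(parent)
	fmt.Println(detached.Err())                 // <nil>: cancellation does not propagate
	fmt.Println(detached.Value(ctxKey("user"))) // alice: values still pass through
	_, hasDeadline := detached.Deadline()
	fmt.Println(hasDeadline)                    // false: no deadline either
}
```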