From ccfb12347b1d2f65b279352116527df430e0fba6 Mon Sep 17 00:00:00 2001 From: Matthew Holt Date: Tue, 3 Sep 2019 12:10:11 -0600 Subject: reverse_proxy: Implement active health checks --- modules/caddyhttp/reverseproxy/healthchecks.go | 220 +++++++++++++++++++++++++ 1 file changed, 220 insertions(+) create mode 100644 modules/caddyhttp/reverseproxy/healthchecks.go (limited to 'modules/caddyhttp/reverseproxy/healthchecks.go') diff --git a/modules/caddyhttp/reverseproxy/healthchecks.go b/modules/caddyhttp/reverseproxy/healthchecks.go new file mode 100644 index 0000000..96649a4 --- /dev/null +++ b/modules/caddyhttp/reverseproxy/healthchecks.go @@ -0,0 +1,220 @@ +// Copyright 2015 Matthew Holt and The Caddy Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package reverseproxy + +import ( + "fmt" + "io" + "io/ioutil" + "log" + "net" + "net/http" + "net/url" + "regexp" + "strconv" + "time" + + "github.com/caddyserver/caddy/v2" + "github.com/caddyserver/caddy/v2/modules/caddyhttp" +) + +type HealthChecks struct { + Active *ActiveHealthChecks `json:"active,omitempty"` + Passive *PassiveHealthChecks `json:"passive,omitempty"` +} + +type ActiveHealthChecks struct { + Path string `json:"path,omitempty"` + Port int `json:"port,omitempty"` + Interval caddy.Duration `json:"interval,omitempty"` + Timeout caddy.Duration `json:"timeout,omitempty"` + MaxSize int64 `json:"max_size,omitempty"` + ExpectStatus int `json:"expect_status,omitempty"` + ExpectBody string `json:"expect_body,omitempty"` + + stopChan chan struct{} + httpClient *http.Client + bodyRegexp *regexp.Regexp +} + +type PassiveHealthChecks struct { + MaxFails int `json:"max_fails,omitempty"` + FailDuration caddy.Duration `json:"fail_duration,omitempty"` + UnhealthyRequestCount int `json:"unhealthy_request_count,omitempty"` + UnhealthyStatus []int `json:"unhealthy_status,omitempty"` + UnhealthyLatency caddy.Duration `json:"unhealthy_latency,omitempty"` +} + +func (h *Handler) activeHealthChecker() { + ticker := time.NewTicker(time.Duration(h.HealthChecks.Active.Interval)) + h.doActiveHealthChecksForAllHosts() + for { + select { + case <-ticker.C: + h.doActiveHealthChecksForAllHosts() + case <-h.HealthChecks.Active.stopChan: + ticker.Stop() + return + } + } +} + +func (h *Handler) doActiveHealthChecksForAllHosts() { + hosts.Range(func(key, value interface{}) bool { + addr := key.(string) + host := value.(Host) + + go func(addr string, host Host) { + err := h.doActiveHealthCheck(addr, host) + if err != nil { + log.Printf("[ERROR] reverse_proxy: active health check for host %s: %v", addr, err) + } + }(addr, host) + + // continue to iterate all hosts + return true + }) +} + +// doActiveHealthCheck performs a health check to host which +// can be reached at address hostAddr. The actual address for +// the request will be built according to active health checker +// config. The health status of the host will be updated +// according to whether it passes the health check. An error is +// returned only if the health check fails to occur or if marking +// the host's health status fails. +func (h *Handler) doActiveHealthCheck(hostAddr string, host Host) error { + // create the URL for the health check + u, err := url.Parse(hostAddr) + if err != nil { + return err + } + if h.HealthChecks.Active.Path != "" { + u.Path = h.HealthChecks.Active.Path + } + if h.HealthChecks.Active.Port != 0 { + portStr := strconv.Itoa(h.HealthChecks.Active.Port) + u.Host = net.JoinHostPort(u.Hostname(), portStr) + } + + req, err := http.NewRequest(http.MethodGet, u.String(), nil) + if err != nil { + return err + } + + // do the request, careful to tame the response body + resp, err := h.HealthChecks.Active.httpClient.Do(req) + if err != nil { + log.Printf("[INFO] reverse_proxy: active health check: %s is down (HTTP request failed: %v)", hostAddr, err) + _, err2 := host.SetHealthy(false) + if err2 != nil { + return fmt.Errorf("marking unhealthy: %v", err2) + } + return nil + } + var body io.Reader = resp.Body + if h.HealthChecks.Active.MaxSize > 0 { + body = io.LimitReader(body, h.HealthChecks.Active.MaxSize) + } + defer func() { + // drain any remaining body so connection can be re-used + io.Copy(ioutil.Discard, body) + resp.Body.Close() + }() + + // if status code is outside criteria, mark down + if h.HealthChecks.Active.ExpectStatus > 0 { + if !caddyhttp.StatusCodeMatches(resp.StatusCode, h.HealthChecks.Active.ExpectStatus) { + log.Printf("[INFO] reverse_proxy: active health check: %s is down (status code %d unexpected)", hostAddr, resp.StatusCode) + _, err := host.SetHealthy(false) + if err != nil { + return fmt.Errorf("marking unhealthy: %v", err) + } + return nil + } + } else if resp.StatusCode < 200 || resp.StatusCode >= 400 { + log.Printf("[INFO] reverse_proxy: active health check: %s is down (status code %d out of tolerances)", hostAddr, resp.StatusCode) + _, err := host.SetHealthy(false) + if err != nil { + return fmt.Errorf("marking unhealthy: %v", err) + } + return nil + } + + // if body does not match regex, mark down + if h.HealthChecks.Active.bodyRegexp != nil { + bodyBytes, err := ioutil.ReadAll(body) + if err != nil { + log.Printf("[INFO] reverse_proxy: active health check: %s is down (failed to read response body)", hostAddr) + _, err := host.SetHealthy(false) + if err != nil { + return fmt.Errorf("marking unhealthy: %v", err) + } + return nil + } + if !h.HealthChecks.Active.bodyRegexp.Match(bodyBytes) { + log.Printf("[INFO] reverse_proxy: active health check: %s is down (response body failed expectations)", hostAddr) + _, err := host.SetHealthy(false) + if err != nil { + return fmt.Errorf("marking unhealthy: %v", err) + } + return nil + } + } + + // passed health check parameters, so mark as healthy + swapped, err := host.SetHealthy(true) + if swapped { + log.Printf("[INFO] reverse_proxy: active health check: %s is back up", hostAddr) + } + if err != nil { + return fmt.Errorf("marking healthy: %v", err) + } + + return nil +} + +// countFailure is used with passive health checks. It +// remembers 1 failure for upstream for the configured +// duration. If passive health checks are disabled or +// failure expiry is 0, this is a no-op. +func (h Handler) countFailure(upstream *Upstream) { + // only count failures if passive health checking is enabled + // and if failures are configured have a non-zero expiry + if h.HealthChecks == nil || h.HealthChecks.Passive == nil { + return + } + failDuration := time.Duration(h.HealthChecks.Passive.FailDuration) + if failDuration == 0 { + return + } + + // count failure immediately + err := upstream.Host.CountFail(1) + if err != nil { + log.Printf("[ERROR] proxy: upstream %s: counting failure: %v", + upstream.hostURL, err) + } + + // forget it later + go func(host Host, failDuration time.Duration) { + time.Sleep(failDuration) + err := host.CountFail(-1) + if err != nil { + log.Printf("[ERROR] proxy: upstream %s: expiring failure: %v", + upstream.hostURL, err) + } + }(upstream.Host, failDuration) +} -- cgit v1.2.3 From 652460e03e11a037d9f86b09b3546c9e42733d2d Mon Sep 17 00:00:00 2001 From: Matthew Holt Date: Tue, 3 Sep 2019 16:56:09 -0600 Subject: Some cleanup and godoc --- modules/caddyhttp/reverseproxy/healthchecks.go | 12 ++++++++++++ 1 file changed, 12 insertions(+) (limited to 'modules/caddyhttp/reverseproxy/healthchecks.go') diff --git a/modules/caddyhttp/reverseproxy/healthchecks.go b/modules/caddyhttp/reverseproxy/healthchecks.go index 96649a4..0b46d04 100644 --- a/modules/caddyhttp/reverseproxy/healthchecks.go +++ b/modules/caddyhttp/reverseproxy/healthchecks.go @@ -30,11 +30,15 @@ import ( "github.com/caddyserver/caddy/v2/modules/caddyhttp" ) +// HealthChecks holds configuration related to health checking. type HealthChecks struct { Active *ActiveHealthChecks `json:"active,omitempty"` Passive *PassiveHealthChecks `json:"passive,omitempty"` } +// ActiveHealthChecks holds configuration related to active +// health checks (that is, health checks which occur in a +// background goroutine independently). type ActiveHealthChecks struct { Path string `json:"path,omitempty"` Port int `json:"port,omitempty"` @@ -49,6 +53,9 @@ type ActiveHealthChecks struct { bodyRegexp *regexp.Regexp } +// PassiveHealthChecks holds configuration related to passive +// health checks (that is, health checks which occur during +// the normal flow of request proxying). type PassiveHealthChecks struct { MaxFails int `json:"max_fails,omitempty"` FailDuration caddy.Duration `json:"fail_duration,omitempty"` @@ -57,6 +64,9 @@ type PassiveHealthChecks struct { UnhealthyLatency caddy.Duration `json:"unhealthy_latency,omitempty"` } +// activeHealthChecker runs active health checks on a +// regular basis and blocks until +// h.HealthChecks.Active.stopChan is closed. func (h *Handler) activeHealthChecker() { ticker := time.NewTicker(time.Duration(h.HealthChecks.Active.Interval)) h.doActiveHealthChecksForAllHosts() @@ -71,6 +81,8 @@ func (h *Handler) activeHealthChecker() { } } +// doActiveHealthChecksForAllHosts immediately performs a +// health checks for all hosts in the global repository. func (h *Handler) doActiveHealthChecksForAllHosts() { hosts.Range(func(key, value interface{}) bool { addr := key.(string) -- cgit v1.2.3 From acb8f0e0c26acd95cbee8981469b4ac62535d164 Mon Sep 17 00:00:00 2001 From: Matthew Holt Date: Tue, 3 Sep 2019 19:06:54 -0600 Subject: Integrate circuit breaker modules with reverse proxy --- modules/caddyhttp/reverseproxy/healthchecks.go | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) (limited to 'modules/caddyhttp/reverseproxy/healthchecks.go') diff --git a/modules/caddyhttp/reverseproxy/healthchecks.go b/modules/caddyhttp/reverseproxy/healthchecks.go index 0b46d04..673f7c4 100644 --- a/modules/caddyhttp/reverseproxy/healthchecks.go +++ b/modules/caddyhttp/reverseproxy/healthchecks.go @@ -64,6 +64,14 @@ type PassiveHealthChecks struct { UnhealthyLatency caddy.Duration `json:"unhealthy_latency,omitempty"` } +// CircuitBreaker is a type that can act as an early-warning +// system for the health checker when backends are getting +// overloaded. +type CircuitBreaker interface { + OK() bool + RecordMetric(statusCode int, latency time.Duration) +} + // activeHealthChecker runs active health checks on a // regular basis and blocks until // h.HealthChecks.Active.stopChan is closed. @@ -202,7 +210,7 @@ func (h *Handler) doActiveHealthCheck(hostAddr string, host Host) error { // remembers 1 failure for upstream for the configured // duration. If passive health checks are disabled or // failure expiry is 0, this is a no-op. -func (h Handler) countFailure(upstream *Upstream) { +func (h *Handler) countFailure(upstream *Upstream) { // only count failures if passive health checking is enabled // and if failures are configured have a non-zero expiry if h.HealthChecks == nil || h.HealthChecks.Passive == nil { -- cgit v1.2.3 From 0830fbad0347ead1dbea60e664556b263e44653f Mon Sep 17 00:00:00 2001 From: Matthew Holt Date: Thu, 5 Sep 2019 13:14:39 -0600 Subject: Reconcile upstream dial addresses and request host/URL information My goodness that was complicated Blessed be request.Context Sort of --- modules/caddyhttp/reverseproxy/healthchecks.go | 68 +++++++++++++++++++------- 1 file changed, 49 insertions(+), 19 deletions(-) (limited to 'modules/caddyhttp/reverseproxy/healthchecks.go') diff --git a/modules/caddyhttp/reverseproxy/healthchecks.go b/modules/caddyhttp/reverseproxy/healthchecks.go index 673f7c4..abe0f9c 100644 --- a/modules/caddyhttp/reverseproxy/healthchecks.go +++ b/modules/caddyhttp/reverseproxy/healthchecks.go @@ -15,6 +15,7 @@ package reverseproxy import ( + "context" "fmt" "io" "io/ioutil" @@ -93,15 +94,31 @@ func (h *Handler) activeHealthChecker() { // health checks for all hosts in the global repository. func (h *Handler) doActiveHealthChecksForAllHosts() { hosts.Range(func(key, value interface{}) bool { - addr := key.(string) + networkAddr := key.(string) host := value.(Host) - go func(addr string, host Host) { - err := h.doActiveHealthCheck(addr, host) + go func(networkAddr string, host Host) { + network, addrs, err := caddy.ParseNetworkAddress(networkAddr) if err != nil { - log.Printf("[ERROR] reverse_proxy: active health check for host %s: %v", addr, err) + log.Printf("[ERROR] reverse_proxy: active health check for host %s: bad network address: %v", networkAddr, err) + return } - }(addr, host) + if len(addrs) != 1 { + log.Printf("[ERROR] reverse_proxy: active health check for host %s: multiple addresses (upstream must map to only one address)", networkAddr) + return + } + hostAddr := addrs[0] + if network == "unix" || network == "unixgram" || network == "unixpacket" { + // this will be used as the Host portion of a http.Request URL, and + // paths to socket files would produce an error when creating URL, + // so use a fake Host value instead + hostAddr = network + } + err = h.doActiveHealthCheck(DialInfo{network, addrs[0]}, hostAddr, host) + if err != nil { + log.Printf("[ERROR] reverse_proxy: active health check for host %s: %v", networkAddr, err) + } + }(networkAddr, host) // continue to iterate all hosts return true @@ -115,26 +132,39 @@ func (h *Handler) doActiveHealthChecksForAllHosts() { // according to whether it passes the health check. An error is // returned only if the health check fails to occur or if marking // the host's health status fails. -func (h *Handler) doActiveHealthCheck(hostAddr string, host Host) error { - // create the URL for the health check - u, err := url.Parse(hostAddr) - if err != nil { - return err +func (h *Handler) doActiveHealthCheck(dialInfo DialInfo, hostAddr string, host Host) error { + // create the URL for the request that acts as a health check + scheme := "http" + if ht, ok := h.Transport.(*http.Transport); ok && ht.TLSClientConfig != nil { + // this is kind of a hacky way to know if we should use HTTPS, but whatever + scheme = "https" } - if h.HealthChecks.Active.Path != "" { - u.Path = h.HealthChecks.Active.Path + u := &url.URL{ + Scheme: scheme, + Host: hostAddr, + Path: h.HealthChecks.Active.Path, } + + // adjust the port, if configured to be different if h.HealthChecks.Active.Port != 0 { portStr := strconv.Itoa(h.HealthChecks.Active.Port) - u.Host = net.JoinHostPort(u.Hostname(), portStr) + host, _, err := net.SplitHostPort(hostAddr) + if err != nil { + host = hostAddr + } + u.Host = net.JoinHostPort(host, portStr) } - req, err := http.NewRequest(http.MethodGet, u.String(), nil) + // attach dialing information to this request + ctx := context.Background() + ctx = context.WithValue(ctx, caddy.ReplacerCtxKey, caddy.NewReplacer()) + ctx = context.WithValue(ctx, DialInfoCtxKey, dialInfo) + req, err := http.NewRequestWithContext(ctx, http.MethodGet, u.String(), nil) if err != nil { - return err + return fmt.Errorf("making request: %v", err) } - // do the request, careful to tame the response body + // do the request, being careful to tame the response body resp, err := h.HealthChecks.Active.httpClient.Do(req) if err != nil { log.Printf("[INFO] reverse_proxy: active health check: %s is down (HTTP request failed: %v)", hostAddr, err) @@ -149,7 +179,7 @@ func (h *Handler) doActiveHealthCheck(hostAddr string, host Host) error { body = io.LimitReader(body, h.HealthChecks.Active.MaxSize) } defer func() { - // drain any remaining body so connection can be re-used + // drain any remaining body so connection could be re-used io.Copy(ioutil.Discard, body) resp.Body.Close() }() @@ -225,7 +255,7 @@ func (h *Handler) countFailure(upstream *Upstream) { err := upstream.Host.CountFail(1) if err != nil { log.Printf("[ERROR] proxy: upstream %s: counting failure: %v", - upstream.hostURL, err) + upstream.dialInfo, err) } // forget it later @@ -234,7 +264,7 @@ func (h *Handler) countFailure(upstream *Upstream) { err := host.CountFail(-1) if err != nil { log.Printf("[ERROR] proxy: upstream %s: expiring failure: %v", - upstream.hostURL, err) + upstream.dialInfo, err) } }(upstream.Host, failDuration) } -- cgit v1.2.3