summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--modules/caddyhttp/reverseproxy/healthchecks.go220
-rw-r--r--modules/caddyhttp/reverseproxy/reverseproxy.go155
-rw-r--r--usagepool.go8
3 files changed, 309 insertions, 74 deletions
diff --git a/modules/caddyhttp/reverseproxy/healthchecks.go b/modules/caddyhttp/reverseproxy/healthchecks.go
new file mode 100644
index 0000000..96649a4
--- /dev/null
+++ b/modules/caddyhttp/reverseproxy/healthchecks.go
@@ -0,0 +1,220 @@
+// Copyright 2015 Matthew Holt and The Caddy Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package reverseproxy
+
+import (
+ "fmt"
+ "io"
+ "io/ioutil"
+ "log"
+ "net"
+ "net/http"
+ "net/url"
+ "regexp"
+ "strconv"
+ "time"
+
+ "github.com/caddyserver/caddy/v2"
+ "github.com/caddyserver/caddy/v2/modules/caddyhttp"
+)
+
+type HealthChecks struct {
+ Active *ActiveHealthChecks `json:"active,omitempty"`
+ Passive *PassiveHealthChecks `json:"passive,omitempty"`
+}
+
+type ActiveHealthChecks struct {
+ Path string `json:"path,omitempty"`
+ Port int `json:"port,omitempty"`
+ Interval caddy.Duration `json:"interval,omitempty"`
+ Timeout caddy.Duration `json:"timeout,omitempty"`
+ MaxSize int64 `json:"max_size,omitempty"`
+ ExpectStatus int `json:"expect_status,omitempty"`
+ ExpectBody string `json:"expect_body,omitempty"`
+
+ stopChan chan struct{}
+ httpClient *http.Client
+ bodyRegexp *regexp.Regexp
+}
+
+type PassiveHealthChecks struct {
+ MaxFails int `json:"max_fails,omitempty"`
+ FailDuration caddy.Duration `json:"fail_duration,omitempty"`
+ UnhealthyRequestCount int `json:"unhealthy_request_count,omitempty"`
+ UnhealthyStatus []int `json:"unhealthy_status,omitempty"`
+ UnhealthyLatency caddy.Duration `json:"unhealthy_latency,omitempty"`
+}
+
+func (h *Handler) activeHealthChecker() {
+ ticker := time.NewTicker(time.Duration(h.HealthChecks.Active.Interval))
+ h.doActiveHealthChecksForAllHosts()
+ for {
+ select {
+ case <-ticker.C:
+ h.doActiveHealthChecksForAllHosts()
+ case <-h.HealthChecks.Active.stopChan:
+ ticker.Stop()
+ return
+ }
+ }
+}
+
+func (h *Handler) doActiveHealthChecksForAllHosts() {
+ hosts.Range(func(key, value interface{}) bool {
+ addr := key.(string)
+ host := value.(Host)
+
+ go func(addr string, host Host) {
+ err := h.doActiveHealthCheck(addr, host)
+ if err != nil {
+ log.Printf("[ERROR] reverse_proxy: active health check for host %s: %v", addr, err)
+ }
+ }(addr, host)
+
+ // continue to iterate all hosts
+ return true
+ })
+}
+
+// doActiveHealthCheck performs a health check to host which
+// can be reached at address hostAddr. The actual address for
+// the request will be built according to active health checker
+// config. The health status of the host will be updated
+// according to whether it passes the health check. An error is
+// returned only if the health check fails to occur or if marking
+// the host's health status fails.
+func (h *Handler) doActiveHealthCheck(hostAddr string, host Host) error {
+ // create the URL for the health check
+ u, err := url.Parse(hostAddr)
+ if err != nil {
+ return err
+ }
+ if h.HealthChecks.Active.Path != "" {
+ u.Path = h.HealthChecks.Active.Path
+ }
+ if h.HealthChecks.Active.Port != 0 {
+ portStr := strconv.Itoa(h.HealthChecks.Active.Port)
+ u.Host = net.JoinHostPort(u.Hostname(), portStr)
+ }
+
+ req, err := http.NewRequest(http.MethodGet, u.String(), nil)
+ if err != nil {
+ return err
+ }
+
+ // do the request, careful to tame the response body
+ resp, err := h.HealthChecks.Active.httpClient.Do(req)
+ if err != nil {
+ log.Printf("[INFO] reverse_proxy: active health check: %s is down (HTTP request failed: %v)", hostAddr, err)
+ _, err2 := host.SetHealthy(false)
+ if err2 != nil {
+ return fmt.Errorf("marking unhealthy: %v", err2)
+ }
+ return nil
+ }
+ var body io.Reader = resp.Body
+ if h.HealthChecks.Active.MaxSize > 0 {
+ body = io.LimitReader(body, h.HealthChecks.Active.MaxSize)
+ }
+ defer func() {
+ // drain any remaining body so connection can be re-used
+ io.Copy(ioutil.Discard, body)
+ resp.Body.Close()
+ }()
+
+ // if status code is outside criteria, mark down
+ if h.HealthChecks.Active.ExpectStatus > 0 {
+ if !caddyhttp.StatusCodeMatches(resp.StatusCode, h.HealthChecks.Active.ExpectStatus) {
+ log.Printf("[INFO] reverse_proxy: active health check: %s is down (status code %d unexpected)", hostAddr, resp.StatusCode)
+ _, err := host.SetHealthy(false)
+ if err != nil {
+ return fmt.Errorf("marking unhealthy: %v", err)
+ }
+ return nil
+ }
+ } else if resp.StatusCode < 200 || resp.StatusCode >= 400 {
+ log.Printf("[INFO] reverse_proxy: active health check: %s is down (status code %d out of tolerances)", hostAddr, resp.StatusCode)
+ _, err := host.SetHealthy(false)
+ if err != nil {
+ return fmt.Errorf("marking unhealthy: %v", err)
+ }
+ return nil
+ }
+
+ // if body does not match regex, mark down
+ if h.HealthChecks.Active.bodyRegexp != nil {
+ bodyBytes, err := ioutil.ReadAll(body)
+ if err != nil {
+ log.Printf("[INFO] reverse_proxy: active health check: %s is down (failed to read response body)", hostAddr)
+ _, err := host.SetHealthy(false)
+ if err != nil {
+ return fmt.Errorf("marking unhealthy: %v", err)
+ }
+ return nil
+ }
+ if !h.HealthChecks.Active.bodyRegexp.Match(bodyBytes) {
+ log.Printf("[INFO] reverse_proxy: active health check: %s is down (response body failed expectations)", hostAddr)
+ _, err := host.SetHealthy(false)
+ if err != nil {
+ return fmt.Errorf("marking unhealthy: %v", err)
+ }
+ return nil
+ }
+ }
+
+ // passed health check parameters, so mark as healthy
+ swapped, err := host.SetHealthy(true)
+ if swapped {
+ log.Printf("[INFO] reverse_proxy: active health check: %s is back up", hostAddr)
+ }
+ if err != nil {
+ return fmt.Errorf("marking healthy: %v", err)
+ }
+
+ return nil
+}
+
+// countFailure is used with passive health checks. It
+// remembers 1 failure for upstream for the configured
+// duration. If passive health checks are disabled or
+// failure expiry is 0, this is a no-op.
+func (h Handler) countFailure(upstream *Upstream) {
+ // only count failures if passive health checking is enabled
+ // and if failures are configured have a non-zero expiry
+ if h.HealthChecks == nil || h.HealthChecks.Passive == nil {
+ return
+ }
+ failDuration := time.Duration(h.HealthChecks.Passive.FailDuration)
+ if failDuration == 0 {
+ return
+ }
+
+ // count failure immediately
+ err := upstream.Host.CountFail(1)
+ if err != nil {
+ log.Printf("[ERROR] proxy: upstream %s: counting failure: %v",
+ upstream.hostURL, err)
+ }
+
+ // forget it later
+ go func(host Host, failDuration time.Duration) {
+ time.Sleep(failDuration)
+ err := host.CountFail(-1)
+ if err != nil {
+ log.Printf("[ERROR] proxy: upstream %s: expiring failure: %v",
+ upstream.hostURL, err)
+ }
+ }(upstream.Host, failDuration)
+}
diff --git a/modules/caddyhttp/reverseproxy/reverseproxy.go b/modules/caddyhttp/reverseproxy/reverseproxy.go
index e312d71..ebf6ac1 100644
--- a/modules/caddyhttp/reverseproxy/reverseproxy.go
+++ b/modules/caddyhttp/reverseproxy/reverseproxy.go
@@ -15,15 +15,14 @@
package reverseproxy
import (
- "bytes"
"context"
"encoding/json"
"fmt"
"io"
- "log"
"net"
"net/http"
"net/url"
+ "regexp"
"strings"
"sync"
"sync/atomic"
@@ -90,11 +89,41 @@ func (h *Handler) Provision(ctx caddy.Context) error {
if h.LoadBalancing.TryDuration > 0 && h.LoadBalancing.TryInterval == 0 {
// a non-zero try_duration with a zero try_interval
// will always spin the CPU for try_duration if the
- // upstream is local or low-latency; default to some
- // sane waiting period before try attempts
+ // upstream is local or low-latency; avoid that by
+ // defaulting to a sane wait period between attempts
h.LoadBalancing.TryInterval = caddy.Duration(250 * time.Millisecond)
}
+ // if active health checks are enabled, configure them and start a worker
+ if h.HealthChecks != nil &&
+ h.HealthChecks.Active != nil &&
+ (h.HealthChecks.Active.Path != "" || h.HealthChecks.Active.Port != 0) {
+ timeout := time.Duration(h.HealthChecks.Active.Timeout)
+ if timeout == 0 {
+ timeout = 10 * time.Second
+ }
+
+ h.HealthChecks.Active.stopChan = make(chan struct{})
+ h.HealthChecks.Active.httpClient = &http.Client{
+ Timeout: timeout,
+ Transport: h.Transport,
+ }
+
+ if h.HealthChecks.Active.Interval == 0 {
+ h.HealthChecks.Active.Interval = caddy.Duration(30 * time.Second)
+ }
+
+ if h.HealthChecks.Active.ExpectBody != "" {
+ var err error
+ h.HealthChecks.Active.bodyRegexp, err = regexp.Compile(h.HealthChecks.Active.ExpectBody)
+ if err != nil {
+ return fmt.Errorf("expect_body: compiling regular expression: %v", err)
+ }
+ }
+
+ go h.activeHealthChecker()
+ }
+
for _, upstream := range h.Upstreams {
// url parser requires a scheme
if !strings.Contains(upstream.Address, "://") {
@@ -130,8 +159,6 @@ func (h *Handler) Provision(ctx caddy.Context) error {
upstream.MaxRequests = h.HealthChecks.Passive.UnhealthyRequestCount
}
- // TODO: active health checks
-
if h.HealthChecks != nil {
// upstreams need independent access to the passive
// health check policy so they can, you know, passively
@@ -143,11 +170,20 @@ func (h *Handler) Provision(ctx caddy.Context) error {
return nil
}
+// Cleanup cleans up the resources made by h during provisioning.
func (h *Handler) Cleanup() error {
- // TODO: finish this up, make sure it takes care of any active health checkers or whatever
+ // stop the active health checker
+ if h.HealthChecks != nil &&
+ h.HealthChecks.Active != nil &&
+ h.HealthChecks.Active.stopChan != nil {
+ close(h.HealthChecks.Active.stopChan)
+ }
+
+ // remove hosts from our config from the pool
for _, upstream := range h.Upstreams {
hosts.Delete(upstream.hostURL.String())
}
+
return nil
}
@@ -539,38 +575,6 @@ func (h Handler) copyBuffer(dst io.Writer, src io.Reader, buf []byte) (int64, er
}
}
-// countFailure remembers 1 failure for upstream for the
-// configured duration. If passive health checks are
-// disabled or failure expiry is 0, this is a no-op.
-func (h Handler) countFailure(upstream *Upstream) {
- // only count failures if passive health checking is enabled
- // and if failures are configured have a non-zero expiry
- if h.HealthChecks == nil || h.HealthChecks.Passive == nil {
- return
- }
- failDuration := time.Duration(h.HealthChecks.Passive.FailDuration)
- if failDuration == 0 {
- return
- }
-
- // count failure immediately
- err := upstream.Host.CountFail(1)
- if err != nil {
- log.Printf("[ERROR] proxy: upstream %s: counting failure: %v",
- upstream.hostURL, err)
- }
-
- // forget it later
- go func(host Host, failDuration time.Duration) {
- time.Sleep(failDuration)
- err := host.CountFail(-1)
- if err != nil {
- log.Printf("[ERROR] proxy: upstream %s: expiring failure: %v",
- upstream.hostURL, err)
- }
- }(upstream.Host, failDuration)
-}
-
type writeFlusher interface {
io.Writer
http.Flusher
@@ -722,29 +726,6 @@ type Selector interface {
Select(HostPool, *http.Request) *Upstream
}
-type HealthChecks struct {
- Active *ActiveHealthChecks `json:"active,omitempty"`
- Passive *PassiveHealthChecks `json:"passive,omitempty"`
-}
-
-type ActiveHealthChecks struct {
- Path string `json:"path,omitempty"`
- Port int `json:"port,omitempty"`
- Interval caddy.Duration `json:"interval,omitempty"`
- Timeout caddy.Duration `json:"timeout,omitempty"`
- MaxSize int `json:"max_size,omitempty"`
- ExpectStatus int `json:"expect_status,omitempty"`
- ExpectBody string `json:"expect_body,omitempty"`
-}
-
-type PassiveHealthChecks struct {
- MaxFails int `json:"max_fails,omitempty"`
- FailDuration caddy.Duration `json:"fail_duration,omitempty"`
- UnhealthyRequestCount int `json:"unhealthy_request_count,omitempty"`
- UnhealthyStatus []int `json:"unhealthy_status,omitempty"`
- UnhealthyLatency caddy.Duration `json:"unhealthy_latency,omitempty"`
-}
-
// Hop-by-hop headers. These are removed when sent to the backend.
// As of RFC 7230, hop-by-hop headers are required to appear in the
// Connection header field. These are the headers defined by the
@@ -762,22 +743,33 @@ var hopHeaders = []string{
"Upgrade",
}
-var bufPool = sync.Pool{
- New: func() interface{} {
- return new(bytes.Buffer)
- },
-}
-
-//////////////////////////////////
-// TODO:
-
+// Host represents a remote host which can be proxied to.
+// Its methods must be safe for concurrent use.
type Host interface {
+ // NumRequests returns the numnber of requests
+ // currently in process with the host.
NumRequests() int
+
+ // Fails returns the count of recent failures.
Fails() int
+
+ // Unhealthy returns true if the backend is unhealthy.
Unhealthy() bool
+ // CountRequest counts the given number of requests
+ // as currently in process with the host. The count
+ // should not go below 0.
CountRequest(int) error
+
+ // CountFail counts the given number of failures
+ // with the host. The count should not go below 0.
CountFail(int) error
+
+ // SetHealthy marks the host as either healthy (true)
+ // or unhealthy (false). If the given status is the
+ // same, this should be a no-op. It returns true if
+ // the given status was different, false otherwise.
+ SetHealthy(bool) (bool, error)
}
type HostPool []*Upstream
@@ -788,13 +780,13 @@ type upstreamHost struct {
unhealthy int32
}
-func (uh upstreamHost) NumRequests() int {
+func (uh *upstreamHost) NumRequests() int {
return int(atomic.LoadInt64(&uh.numRequests))
}
-func (uh upstreamHost) Fails() int {
+func (uh *upstreamHost) Fails() int {
return int(atomic.LoadInt64(&uh.fails))
}
-func (uh upstreamHost) Unhealthy() bool {
+func (uh *upstreamHost) Unhealthy() bool {
return atomic.LoadInt32(&uh.unhealthy) == 1
}
func (uh *upstreamHost) CountRequest(delta int) error {
@@ -811,6 +803,14 @@ func (uh *upstreamHost) CountFail(delta int) error {
}
return nil
}
+func (uh *upstreamHost) SetHealthy(healthy bool) (bool, error) {
+ var unhealthy, compare int32 = 1, 0
+ if healthy {
+ unhealthy, compare = 0, 1
+ }
+ swapped := atomic.CompareAndSwapInt32(&uh.unhealthy, compare, unhealthy)
+ return swapped, nil
+}
type Upstream struct {
Host `json:"-"`
@@ -854,6 +854,13 @@ var hosts = caddy.NewUsagePool()
type UpstreamProvider interface {
}
+// TODO: see if we can use this
+// var bufPool = sync.Pool{
+// New: func() interface{} {
+// return new(bytes.Buffer)
+// },
+// }
+
// Interface guards
var (
_ caddyhttp.MiddlewareHandler = (*Handler)(nil)
diff --git a/usagepool.go b/usagepool.go
index 3b8e975..dd4f606 100644
--- a/usagepool.go
+++ b/usagepool.go
@@ -80,6 +80,14 @@ func (up *UsagePool) LoadOrStore(key, val interface{}) (actual interface{}, load
return
}
+// Range iterates the pool the same way sync.Map.Range does.
+// This does not affect usage counts.
+func (up *UsagePool) Range(f func(key, value interface{}) bool) {
+ up.pool.Range(func(key, value interface{}) bool {
+ return f(key, value.(*usagePoolVal).value)
+ })
+}
+
type usagePoolVal struct {
usage int32 // accessed atomically; must be 64-bit aligned for 32-bit systems
value interface{}