From 026df7c5cb33331d223afc6a9599274e8c89dfd9 Mon Sep 17 00:00:00 2001 From: Matthew Holt Date: Mon, 2 Sep 2019 22:01:02 -0600 Subject: reverse_proxy: WIP refactor and support for FastCGI --- modules/caddyhttp/reverseproxy/upstream.go | 450 ----------------------------- 1 file changed, 450 deletions(-) delete mode 100755 modules/caddyhttp/reverseproxy/upstream.go (limited to 'modules/caddyhttp/reverseproxy/upstream.go') diff --git a/modules/caddyhttp/reverseproxy/upstream.go b/modules/caddyhttp/reverseproxy/upstream.go deleted file mode 100755 index 1f0693e..0000000 --- a/modules/caddyhttp/reverseproxy/upstream.go +++ /dev/null @@ -1,450 +0,0 @@ -// Copyright 2015 Matthew Holt and The Caddy Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -// Package reverseproxy implements a load-balanced reverse proxy. -package reverseproxy - -import ( - "context" - "encoding/json" - "fmt" - "math/rand" - "net" - "net/http" - "net/url" - "strings" - "sync" - "sync/atomic" - "time" - - "github.com/caddyserver/caddy/v2" - "github.com/caddyserver/caddy/v2/modules/caddyhttp" -) - -// CircuitBreaker defines the functionality of a circuit breaker module. -type CircuitBreaker interface { - Ok() bool - RecordMetric(statusCode int, latency time.Duration) -} - -type noopCircuitBreaker struct{} - -func (ncb noopCircuitBreaker) RecordMetric(statusCode int, latency time.Duration) {} -func (ncb noopCircuitBreaker) Ok() bool { - return true -} - -const ( - // TypeBalanceRoundRobin represents the value to use for configuring a load balanced reverse proxy to use round robin load balancing. - TypeBalanceRoundRobin = iota - - // TypeBalanceRandom represents the value to use for configuring a load balanced reverse proxy to use random load balancing. - TypeBalanceRandom - - // TODO: add random with two choices - - // msgNoHealthyUpstreams is returned if there are no upstreams that are healthy to proxy a request to - msgNoHealthyUpstreams = "No healthy upstreams." - - // by default perform health checks every 30 seconds - defaultHealthCheckDur = time.Second * 30 - - // used when an upstream is unhealthy, health checks can be configured to perform at a faster rate - defaultFastHealthCheckDur = time.Second * 1 -) - -var ( - // defaultTransport is the default transport to use for the reverse proxy. - defaultTransport = &http.Transport{ - Dial: (&net.Dialer{ - Timeout: 5 * time.Second, - }).Dial, - TLSHandshakeTimeout: 5 * time.Second, - } - - // defaultHTTPClient is the default http client to use for the healthchecker. - defaultHTTPClient = &http.Client{ - Timeout: time.Second * 10, - Transport: defaultTransport, - } - - // typeMap maps caddy load balance configuration to the internal representation of the loadbalance algorithm type. - typeMap = map[string]int{ - "round_robin": TypeBalanceRoundRobin, - "random": TypeBalanceRandom, - } -) - -// NewLoadBalancedReverseProxy returns a collection of Upstreams that are to be loadbalanced. -func NewLoadBalancedReverseProxy(lb *LoadBalanced, ctx caddy.Context) error { - // set defaults - if lb.NoHealthyUpstreamsMessage == "" { - lb.NoHealthyUpstreamsMessage = msgNoHealthyUpstreams - } - - if lb.TryInterval == "" { - lb.TryInterval = "20s" - } - - // set request retry interval - ti, err := time.ParseDuration(lb.TryInterval) - if err != nil { - return fmt.Errorf("NewLoadBalancedReverseProxy: %v", err.Error()) - } - lb.tryInterval = ti - - // set load balance algorithm - t, ok := typeMap[lb.LoadBalanceType] - if !ok { - t = TypeBalanceRandom - } - lb.loadBalanceType = t - - // setup each upstream - var us []*upstream - for _, uc := range lb.Upstreams { - // pass the upstream decr and incr methods to keep track of unhealthy nodes - nu, err := newUpstream(uc, lb.decrUnhealthy, lb.incrUnhealthy) - if err != nil { - return err - } - - // setup any configured circuit breakers - var cbModule = "http.handlers.reverse_proxy.circuit_breaker" - var cb CircuitBreaker - - if uc.CircuitBreaker != nil { - if _, err := caddy.GetModule(cbModule); err == nil { - val, err := ctx.LoadModule(cbModule, uc.CircuitBreaker) - if err == nil { - cbv, ok := val.(CircuitBreaker) - if ok { - cb = cbv - } else { - fmt.Printf("\nerr: %v; cannot load circuit_breaker, using noop", err.Error()) - cb = noopCircuitBreaker{} - } - } else { - fmt.Printf("\nerr: %v; cannot load circuit_breaker, using noop", err.Error()) - cb = noopCircuitBreaker{} - } - } else { - fmt.Println("circuit_breaker module not loaded, using noop") - cb = noopCircuitBreaker{} - } - } else { - cb = noopCircuitBreaker{} - } - nu.CB = cb - - // start a healthcheck worker which will periodically check to see if an upstream is healthy - // to proxy requests to. - nu.healthChecker = NewHealthCheckWorker(nu, defaultHealthCheckDur, defaultHTTPClient) - - // TODO :- if path is empty why does this empty the entire Target? - // nu.Target.Path = uc.HealthCheckPath - - nu.healthChecker.ScheduleChecks(nu.Target.String()) - lb.HealthCheckers = append(lb.HealthCheckers, nu.healthChecker) - - us = append(us, nu) - } - - lb.upstreams = us - - return nil -} - -// LoadBalanced represents a collection of upstream hosts that are loadbalanced. It -// contains multiple features like health checking and circuit breaking functionality -// for upstreams. -type LoadBalanced struct { - mu sync.Mutex - numUnhealthy int32 - selectedServer int // used during round robin load balancing - loadBalanceType int - tryInterval time.Duration - upstreams []*upstream - - // The following struct fields are set by caddy configuration. - // TryInterval is the max duration for which request retrys will be performed for a request. - TryInterval string `json:"try_interval,omitempty"` - - // Upstreams are the configs for upstream hosts - Upstreams []*UpstreamConfig `json:"upstreams,omitempty"` - - // LoadBalanceType is the string representation of what loadbalancing algorithm to use. i.e. "random" or "round_robin". - LoadBalanceType string `json:"load_balance_type,omitempty"` - - // NoHealthyUpstreamsMessage is returned as a response when there are no healthy upstreams to loadbalance to. - NoHealthyUpstreamsMessage string `json:"no_healthy_upstreams_message,omitempty"` - - // TODO :- store healthcheckers as package level state where each upstream gets a single healthchecker - // currently a healthchecker is created for each upstream defined, even if a healthchecker was previously created - // for that upstream - HealthCheckers []*HealthChecker `json:"health_checkers,omitempty"` -} - -// Cleanup stops all health checkers on a loadbalanced reverse proxy. -func (lb *LoadBalanced) Cleanup() error { - for _, hc := range lb.HealthCheckers { - hc.Stop() - } - - return nil -} - -// Provision sets up a new loadbalanced reverse proxy. -func (lb *LoadBalanced) Provision(ctx caddy.Context) error { - return NewLoadBalancedReverseProxy(lb, ctx) -} - -// ServeHTTP implements the caddyhttp.MiddlewareHandler interface to -// dispatch an HTTP request to the proper server. -func (lb *LoadBalanced) ServeHTTP(w http.ResponseWriter, r *http.Request, _ caddyhttp.Handler) error { - // ensure requests don't hang if an upstream does not respond or is not eventually healthy - var u *upstream - var done bool - - retryTimer := time.NewTicker(lb.tryInterval) - defer retryTimer.Stop() - - go func() { - select { - case <-retryTimer.C: - done = true - } - }() - - // keep trying to get an available upstream to process the request - for { - switch lb.loadBalanceType { - case TypeBalanceRandom: - u = lb.random() - case TypeBalanceRoundRobin: - u = lb.roundRobin() - } - - // if we can't get an upstream and our retry interval has ended return an error response - if u == nil && done { - w.WriteHeader(http.StatusBadGateway) - fmt.Fprint(w, lb.NoHealthyUpstreamsMessage) - - return fmt.Errorf(msgNoHealthyUpstreams) - } - - // attempt to get an available upstream - if u == nil { - continue - } - - start := time.Now() - - // if we get an error retry until we get a healthy upstream - res, err := u.ReverseProxy.ServeHTTP(w, r) - if err != nil { - if err == context.Canceled { - return nil - } - - continue - } - - // record circuit breaker metrics - go u.CB.RecordMetric(res.StatusCode, time.Now().Sub(start)) - - return nil - } -} - -// incrUnhealthy increments the amount of unhealthy nodes in a loadbalancer. -func (lb *LoadBalanced) incrUnhealthy() { - atomic.AddInt32(&lb.numUnhealthy, 1) -} - -// decrUnhealthy decrements the amount of unhealthy nodes in a loadbalancer. -func (lb *LoadBalanced) decrUnhealthy() { - atomic.AddInt32(&lb.numUnhealthy, -1) -} - -// roundRobin implements a round robin load balancing algorithm to select -// which server to forward requests to. -func (lb *LoadBalanced) roundRobin() *upstream { - if atomic.LoadInt32(&lb.numUnhealthy) == int32(len(lb.upstreams)) { - return nil - } - - selected := lb.upstreams[lb.selectedServer] - - lb.mu.Lock() - lb.selectedServer++ - if lb.selectedServer >= len(lb.upstreams) { - lb.selectedServer = 0 - } - lb.mu.Unlock() - - if selected.IsHealthy() && selected.CB.Ok() { - return selected - } - - return nil -} - -// random implements a random server selector for load balancing. -func (lb *LoadBalanced) random() *upstream { - if atomic.LoadInt32(&lb.numUnhealthy) == int32(len(lb.upstreams)) { - return nil - } - - n := rand.Int() % len(lb.upstreams) - selected := lb.upstreams[n] - - if selected.IsHealthy() && selected.CB.Ok() { - return selected - } - - return nil -} - -// UpstreamConfig represents the config of an upstream. -type UpstreamConfig struct { - // Host is the host name of the upstream server. - Host string `json:"host,omitempty"` - - // FastHealthCheckDuration is the duration for which a health check is performed when a node is considered unhealthy. - FastHealthCheckDuration string `json:"fast_health_check_duration,omitempty"` - - CircuitBreaker json.RawMessage `json:"circuit_breaker,omitempty"` - - // // CircuitBreakerConfig is the config passed to setup a circuit breaker. - // CircuitBreakerConfig *circuitbreaker.Config `json:"circuit_breaker,omitempty"` - circuitbreaker CircuitBreaker - - // HealthCheckDuration is the default duration for which a health check is performed. - HealthCheckDuration string `json:"health_check_duration,omitempty"` - - // HealthCheckPath is the path at the upstream host to use for healthchecks. - HealthCheckPath string `json:"health_check_path,omitempty"` -} - -// upstream represents an upstream host. -type upstream struct { - Healthy int32 // 0 = false, 1 = true - Target *url.URL - ReverseProxy *ReverseProxy - Incr func() - Decr func() - CB CircuitBreaker - healthChecker *HealthChecker - healthCheckDur time.Duration - fastHealthCheckDur time.Duration -} - -// newUpstream returns a new upstream. -func newUpstream(uc *UpstreamConfig, d func(), i func()) (*upstream, error) { - host := strings.TrimSpace(uc.Host) - protoIdx := strings.Index(host, "://") - if protoIdx == -1 || len(host[:protoIdx]) == 0 { - return nil, fmt.Errorf("protocol is required for host") - } - - hostURL, err := url.Parse(host) - if err != nil { - return nil, err - } - - // parse healthcheck durations - hcd, err := time.ParseDuration(uc.HealthCheckDuration) - if err != nil { - hcd = defaultHealthCheckDur - } - - fhcd, err := time.ParseDuration(uc.FastHealthCheckDuration) - if err != nil { - fhcd = defaultFastHealthCheckDur - } - - u := upstream{ - healthCheckDur: hcd, - fastHealthCheckDur: fhcd, - Target: hostURL, - Decr: d, - Incr: i, - Healthy: int32(0), // assume is unhealthy on start - } - - u.ReverseProxy = newReverseProxy(hostURL, u.SetHealthiness) - return &u, nil -} - -// SetHealthiness sets whether an upstream is healthy or not. The health check worker is updated to -// perform checks faster if a node is unhealthy. -func (u *upstream) SetHealthiness(ok bool) { - h := atomic.LoadInt32(&u.Healthy) - var wasHealthy bool - if h == 1 { - wasHealthy = true - } else { - wasHealthy = false - } - - if ok { - u.healthChecker.Ticker = time.NewTicker(u.healthCheckDur) - - if !wasHealthy { - atomic.AddInt32(&u.Healthy, 1) - u.Decr() - } - } else { - u.healthChecker.Ticker = time.NewTicker(u.fastHealthCheckDur) - - if wasHealthy { - atomic.AddInt32(&u.Healthy, -1) - u.Incr() - } - } -} - -// IsHealthy returns whether an Upstream is healthy or not. -func (u *upstream) IsHealthy() bool { - i := atomic.LoadInt32(&u.Healthy) - if i == 1 { - return true - } - - return false -} - -// newReverseProxy returns a new reverse proxy handler. -func newReverseProxy(target *url.URL, setHealthiness func(bool)) *ReverseProxy { - errorHandler := func(w http.ResponseWriter, r *http.Request, err error) { - // we don't need to worry about cancelled contexts since this doesn't necessarilly mean that - // the upstream is unhealthy. - if err != context.Canceled { - setHealthiness(false) - } - } - - rp := NewSingleHostReverseProxy(target) - rp.ErrorHandler = errorHandler - rp.Transport = defaultTransport // use default transport that times out in 5 seconds - return rp -} - -// Interface guards -var ( - _ caddyhttp.MiddlewareHandler = (*LoadBalanced)(nil) - _ caddy.Provisioner = (*LoadBalanced)(nil) - _ caddy.CleanerUpper = (*LoadBalanced)(nil) -) -- cgit v1.2.3