From 361946eb0c08791ad16ebc3e82a79512895e650f Mon Sep 17 00:00:00 2001 From: Saber Haj Rabiee Date: Tue, 20 Jun 2023 10:42:58 -0700 Subject: reverseproxy: weighted_round_robin load balancing policy (#5579) * added weighted round robin algorithm to load balancer * added an adapt integration test for wrr and fixed a typo * changed args format to Caddyfile args convention * added provisioner and validator for wrr * simplified the code and improved doc --- .../caddyhttp/reverseproxy/selectionpolicies.go | 91 +++++++++++++++++++++- .../reverseproxy/selectionpolicies_test.go | 57 ++++++++++++++ 2 files changed, 147 insertions(+), 1 deletion(-) (limited to 'modules') diff --git a/modules/caddyhttp/reverseproxy/selectionpolicies.go b/modules/caddyhttp/reverseproxy/selectionpolicies.go index 35fb143..f89c48f 100644 --- a/modules/caddyhttp/reverseproxy/selectionpolicies.go +++ b/modules/caddyhttp/reverseproxy/selectionpolicies.go @@ -40,6 +40,7 @@ func init() { caddy.RegisterModule(RandomChoiceSelection{}) caddy.RegisterModule(LeastConnSelection{}) caddy.RegisterModule(RoundRobinSelection{}) + caddy.RegisterModule(WeightedRoundRobinSelection{}) caddy.RegisterModule(FirstSelection{}) caddy.RegisterModule(IPHashSelection{}) caddy.RegisterModule(ClientIPHashSelection{}) @@ -78,6 +79,90 @@ func (r *RandomSelection) UnmarshalCaddyfile(d *caddyfile.Dispenser) error { return nil } +// WeightedRoundRobinSelection is a policy that selects +// a host based on weighted round-robin ordering. +type WeightedRoundRobinSelection struct { + // The weight of each upstream in order, + // corresponding with the list of upstreams configured. + Weights []int `json:"weights,omitempty"` + index uint32 + totalWeight int +} + +// CaddyModule returns the Caddy module information. +func (WeightedRoundRobinSelection) CaddyModule() caddy.ModuleInfo { + return caddy.ModuleInfo{ + ID: "http.reverse_proxy.selection_policies.weighted_round_robin", + New: func() caddy.Module { + return new(WeightedRoundRobinSelection) + }, + } +} + +// UnmarshalCaddyfile sets up the module from Caddyfile tokens. +func (r *WeightedRoundRobinSelection) UnmarshalCaddyfile(d *caddyfile.Dispenser) error { + for d.Next() { + args := d.RemainingArgs() + if len(args) == 0 { + return d.ArgErr() + } + + for _, weight := range args { + weightInt, err := strconv.Atoi(weight) + if err != nil { + return d.Errf("invalid weight value '%s': %v", weight, err) + } + if weightInt < 1 { + return d.Errf("invalid weight value '%s': weight should be non-zero and positive", weight) + } + r.Weights = append(r.Weights, weightInt) + } + } + return nil +} + +// Provision sets up r. +func (r *WeightedRoundRobinSelection) Provision(ctx caddy.Context) error { + for _, weight := range r.Weights { + r.totalWeight += weight + } + return nil +} + +// Select returns an available host, if any. +func (r *WeightedRoundRobinSelection) Select(pool UpstreamPool, _ *http.Request, _ http.ResponseWriter) *Upstream { + if len(pool) == 0 { + return nil + } + if len(r.Weights) < 2 { + return pool[0] + } + var index, totalWeight int + currentWeight := int(atomic.AddUint32(&r.index, 1)) % r.totalWeight + for i, weight := range r.Weights { + totalWeight += weight + if currentWeight < totalWeight { + index = i + break + } + } + + upstreams := make([]*Upstream, 0, len(r.Weights)) + for _, upstream := range pool { + if !upstream.Available() { + continue + } + upstreams = append(upstreams, upstream) + if len(upstreams) == cap(upstreams) { + break + } + } + if len(upstreams) == 0 { + return nil + } + return upstreams[index%len(upstreams)] +} + // RandomChoiceSelection is a policy that selects // two or more available hosts at random, then // chooses the one with the least load. @@ -762,6 +847,7 @@ var ( _ Selector = (*RandomChoiceSelection)(nil) _ Selector = (*LeastConnSelection)(nil) _ Selector = (*RoundRobinSelection)(nil) + _ Selector = (*WeightedRoundRobinSelection)(nil) _ Selector = (*FirstSelection)(nil) _ Selector = (*IPHashSelection)(nil) _ Selector = (*ClientIPHashSelection)(nil) @@ -770,8 +856,11 @@ var ( _ Selector = (*HeaderHashSelection)(nil) _ Selector = (*CookieHashSelection)(nil) - _ caddy.Validator = (*RandomChoiceSelection)(nil) + _ caddy.Validator = (*RandomChoiceSelection)(nil) + _ caddy.Provisioner = (*RandomChoiceSelection)(nil) + _ caddy.Provisioner = (*WeightedRoundRobinSelection)(nil) _ caddyfile.Unmarshaler = (*RandomChoiceSelection)(nil) + _ caddyfile.Unmarshaler = (*WeightedRoundRobinSelection)(nil) ) diff --git a/modules/caddyhttp/reverseproxy/selectionpolicies_test.go b/modules/caddyhttp/reverseproxy/selectionpolicies_test.go index 93dcb77..dc613a5 100644 --- a/modules/caddyhttp/reverseproxy/selectionpolicies_test.go +++ b/modules/caddyhttp/reverseproxy/selectionpolicies_test.go @@ -74,6 +74,63 @@ func TestRoundRobinPolicy(t *testing.T) { } } +func TestWeightedRoundRobinPolicy(t *testing.T) { + pool := testPool() + wrrPolicy := WeightedRoundRobinSelection{ + Weights: []int{3, 2, 1}, + totalWeight: 6, + } + req, _ := http.NewRequest("GET", "/", nil) + + h := wrrPolicy.Select(pool, req, nil) + if h != pool[0] { + t.Error("Expected first weighted round robin host to be first host in the pool.") + } + h = wrrPolicy.Select(pool, req, nil) + if h != pool[0] { + t.Error("Expected second weighted round robin host to be first host in the pool.") + } + // Third selected host is 1, because counter starts at 0 + // and increments before host is selected + h = wrrPolicy.Select(pool, req, nil) + if h != pool[1] { + t.Error("Expected third weighted round robin host to be second host in the pool.") + } + h = wrrPolicy.Select(pool, req, nil) + if h != pool[1] { + t.Error("Expected fourth weighted round robin host to be second host in the pool.") + } + h = wrrPolicy.Select(pool, req, nil) + if h != pool[2] { + t.Error("Expected fifth weighted round robin host to be third host in the pool.") + } + h = wrrPolicy.Select(pool, req, nil) + if h != pool[0] { + t.Error("Expected sixth weighted round robin host to be first host in the pool.") + } + + // mark host as down + pool[0].setHealthy(false) + h = wrrPolicy.Select(pool, req, nil) + if h != pool[1] { + t.Error("Expected to skip down host.") + } + // mark host as up + pool[0].setHealthy(true) + + h = wrrPolicy.Select(pool, req, nil) + if h != pool[0] { + t.Error("Expected to select first host on availablity.") + } + // mark host as full + pool[1].countRequest(1) + pool[1].MaxRequests = 1 + h = wrrPolicy.Select(pool, req, nil) + if h != pool[2] { + t.Error("Expected to skip full host.") + } +} + func TestLeastConnPolicy(t *testing.T) { pool := testPool() lcPolicy := LeastConnSelection{} -- cgit v1.2.3