From 83b26975bd9330c4cfc44d52b106da739240e72b Mon Sep 17 00:00:00 2001 From: WeidiDeng Date: Sat, 3 Sep 2022 06:57:55 +0800 Subject: fastcgi: Optimize FastCGI transport (#4978) * break up code and use lazy reading and pool bufio.Writer * close underlying connection when operation failed * allocate bufWriter and streamWriter only once * refactor record writing * rebase from master * handle err * Fix type assertion Also reduce some duplication * Refactor client and clientCloser for logging Should reduce allocations * Minor cosmetic adjustments; apply Apache license * Appease the linter Co-authored-by: Matthew Holt --- modules/caddyhttp/reverseproxy/fastcgi/client.go | 345 ++++----------------- .../caddyhttp/reverseproxy/fastcgi/client_test.go | 6 +- modules/caddyhttp/reverseproxy/fastcgi/fastcgi.go | 57 ++-- modules/caddyhttp/reverseproxy/fastcgi/header.go | 32 ++ modules/caddyhttp/reverseproxy/fastcgi/pool.go | 26 ++ modules/caddyhttp/reverseproxy/fastcgi/reader.go | 44 +++ modules/caddyhttp/reverseproxy/fastcgi/record.go | 58 ++++ modules/caddyhttp/reverseproxy/fastcgi/writer.go | 145 +++++++++ 8 files changed, 399 insertions(+), 314 deletions(-) create mode 100644 modules/caddyhttp/reverseproxy/fastcgi/header.go create mode 100644 modules/caddyhttp/reverseproxy/fastcgi/pool.go create mode 100644 modules/caddyhttp/reverseproxy/fastcgi/reader.go create mode 100644 modules/caddyhttp/reverseproxy/fastcgi/record.go create mode 100644 modules/caddyhttp/reverseproxy/fastcgi/writer.go (limited to 'modules/caddyhttp') diff --git a/modules/caddyhttp/reverseproxy/fastcgi/client.go b/modules/caddyhttp/reverseproxy/fastcgi/client.go index bccb49c..ae36dd8 100644 --- a/modules/caddyhttp/reverseproxy/fastcgi/client.go +++ b/modules/caddyhttp/reverseproxy/fastcgi/client.go @@ -26,9 +26,6 @@ package fastcgi import ( "bufio" "bytes" - "context" - "encoding/binary" - "errors" "io" "mime/multipart" "net" @@ -40,7 +37,6 @@ import ( "path/filepath" "strconv" "strings" - "sync" "time" "go.uber.org/zap" @@ -122,285 +118,60 @@ const ( maxPad = 255 ) -type header struct { - Version uint8 - Type uint8 - ID uint16 - ContentLength uint16 - PaddingLength uint8 - Reserved uint8 -} - // for padding so we don't have to allocate all the time // not synchronized because we don't care what the contents are var pad [maxPad]byte -func (h *header) init(recType uint8, reqID uint16, contentLength int) { - h.Version = 1 - h.Type = recType - h.ID = reqID - h.ContentLength = uint16(contentLength) - h.PaddingLength = uint8(-contentLength & 7) -} - -type record struct { - h header - rbuf []byte -} - -func (rec *record) read(r io.Reader) (buf []byte, err error) { - if err = binary.Read(r, binary.BigEndian, &rec.h); err != nil { - return - } - if rec.h.Version != 1 { - err = errors.New("fcgi: invalid header version") - return - } - if rec.h.Type == EndRequest { - err = io.EOF - return - } - n := int(rec.h.ContentLength) + int(rec.h.PaddingLength) - if len(rec.rbuf) < n { - rec.rbuf = make([]byte, n) - } - if _, err = io.ReadFull(r, rec.rbuf[:n]); err != nil { - return - } - buf = rec.rbuf[:int(rec.h.ContentLength)] - - return -} - -// FCGIClient implements a FastCGI client, which is a standard for +// client implements a FastCGI client, which is a standard for // interfacing external applications with Web servers. -type FCGIClient struct { - mutex sync.Mutex - rwc io.ReadWriteCloser - h header - buf bytes.Buffer - stderr bytes.Buffer - keepAlive bool - reqID uint16 - logger *zap.Logger -} - -// DialWithDialerContext connects to the fcgi responder at the specified network address, using custom net.Dialer -// and a context. -// See func net.Dial for a description of the network and address parameters. -func DialWithDialerContext(ctx context.Context, network, address string, dialer net.Dialer) (fcgi *FCGIClient, err error) { - var conn net.Conn - conn, err = dialer.DialContext(ctx, network, address) - if err != nil { - return - } - - fcgi = &FCGIClient{ - rwc: conn, - keepAlive: false, - reqID: 1, - } - - return -} - -// DialContext is like Dial but passes ctx to dialer.Dial. -func DialContext(ctx context.Context, network, address string) (fcgi *FCGIClient, err error) { - // TODO: why not set timeout here? - return DialWithDialerContext(ctx, network, address, net.Dialer{}) -} - -// Dial connects to the fcgi responder at the specified network address, using default net.Dialer. -// See func net.Dial for a description of the network and address parameters. -func Dial(network, address string) (fcgi *FCGIClient, err error) { - return DialContext(context.Background(), network, address) -} - -// Close closes fcgi connection -func (c *FCGIClient) Close() { - c.rwc.Close() -} - -func (c *FCGIClient) writeRecord(recType uint8, content []byte) (err error) { - c.mutex.Lock() - defer c.mutex.Unlock() - c.buf.Reset() - c.h.init(recType, c.reqID, len(content)) - if err := binary.Write(&c.buf, binary.BigEndian, c.h); err != nil { - return err - } - if _, err := c.buf.Write(content); err != nil { - return err - } - if _, err := c.buf.Write(pad[:c.h.PaddingLength]); err != nil { - return err - } - _, err = c.rwc.Write(c.buf.Bytes()) - return err -} - -func (c *FCGIClient) writeBeginRequest(role uint16, flags uint8) error { - b := [8]byte{byte(role >> 8), byte(role), flags} - return c.writeRecord(BeginRequest, b[:]) -} - -func (c *FCGIClient) writePairs(recType uint8, pairs map[string]string) error { - w := newWriter(c, recType) - b := make([]byte, 8) - nn := 0 - for k, v := range pairs { - m := 8 + len(k) + len(v) - if m > maxWrite { - // param data size exceed 65535 bytes" - vl := maxWrite - 8 - len(k) - v = v[:vl] - } - n := encodeSize(b, uint32(len(k))) - n += encodeSize(b[n:], uint32(len(v))) - m = n + len(k) + len(v) - if (nn + m) > maxWrite { - w.Flush() - nn = 0 - } - nn += m - if _, err := w.Write(b[:n]); err != nil { - return err - } - if _, err := w.WriteString(k); err != nil { - return err - } - if _, err := w.WriteString(v); err != nil { - return err - } - } - w.Close() - return nil -} - -func encodeSize(b []byte, size uint32) int { - if size > 127 { - size |= 1 << 31 - binary.BigEndian.PutUint32(b, size) - return 4 - } - b[0] = byte(size) - return 1 -} - -// bufWriter encapsulates bufio.Writer but also closes the underlying stream when -// Closed. -type bufWriter struct { - closer io.Closer - *bufio.Writer -} - -func (w *bufWriter) Close() error { - if err := w.Writer.Flush(); err != nil { - w.closer.Close() - return err - } - return w.closer.Close() -} - -func newWriter(c *FCGIClient, recType uint8) *bufWriter { - s := &streamWriter{c: c, recType: recType} - w := bufio.NewWriterSize(s, maxWrite) - return &bufWriter{s, w} -} - -// streamWriter abstracts out the separation of a stream into discrete records. -// It only writes maxWrite bytes at a time. -type streamWriter struct { - c *FCGIClient - recType uint8 -} - -func (w *streamWriter) Write(p []byte) (int, error) { - nn := 0 - for len(p) > 0 { - n := len(p) - if n > maxWrite { - n = maxWrite - } - if err := w.c.writeRecord(w.recType, p[:n]); err != nil { - return nn, err - } - nn += n - p = p[n:] - } - return nn, nil -} - -func (w *streamWriter) Close() error { - // send empty record to close the stream - return w.c.writeRecord(w.recType, nil) -} - -type streamReader struct { - c *FCGIClient - buf []byte -} - -func (w *streamReader) Read(p []byte) (n int, err error) { - if len(p) > 0 { - if len(w.buf) == 0 { - - // filter outputs for error log - for { - rec := &record{} - var buf []byte - buf, err = rec.read(w.c.rwc) - if err != nil { - return - } - // standard error output - if rec.h.Type == Stderr { - w.c.stderr.Write(buf) - continue - } - w.buf = buf - break - } - } - - n = len(p) - if n > len(w.buf) { - n = len(w.buf) - } - copy(p, w.buf[:n]) - w.buf = w.buf[n:] - } - - return +type client struct { + rwc net.Conn + // keepAlive bool // TODO: implement + reqID uint16 + stderr bool + logger *zap.Logger } // Do made the request and returns a io.Reader that translates the data read // from fcgi responder out of fcgi packet before returning it. -func (c *FCGIClient) Do(p map[string]string, req io.Reader) (r io.Reader, err error) { - err = c.writeBeginRequest(uint16(Responder), 0) +func (c *client) Do(p map[string]string, req io.Reader) (r io.Reader, err error) { + writer := &streamWriter{c: c} + writer.buf = bufPool.Get().(*bytes.Buffer) + writer.buf.Reset() + defer bufPool.Put(writer.buf) + + err = writer.writeBeginRequest(uint16(Responder), 0) if err != nil { return } - err = c.writePairs(Params, p) + writer.recType = Params + err = writer.writePairs(p) if err != nil { return } - body := newWriter(c, Stdin) + writer.recType = Stdin if req != nil { - _, _ = io.Copy(body, req) + _, err = io.Copy(writer, req) + if err != nil { + return nil, err + } + } + err = writer.FlushStream() + if err != nil { + return nil, err } - body.Close() r = &streamReader{c: c} return } // clientCloser is a io.ReadCloser. It wraps a io.Reader with a Closer -// that closes FCGIClient connection. +// that closes the client connection. type clientCloser struct { - *FCGIClient + rwc net.Conn + r *streamReader io.Reader status int @@ -408,9 +179,9 @@ type clientCloser struct { } func (f clientCloser) Close() error { - stderr := f.FCGIClient.stderr.Bytes() + stderr := f.r.stderr.Bytes() if len(stderr) == 0 { - return f.FCGIClient.rwc.Close() + return f.rwc.Close() } if f.status >= 400 { @@ -418,12 +189,13 @@ func (f clientCloser) Close() error { } else { f.logger.Warn("stderr", zap.ByteString("body", stderr)) } - return f.FCGIClient.rwc.Close() + + return f.rwc.Close() } // Request returns a HTTP Response with Header and Body // from fcgi responder -func (c *FCGIClient) Request(p map[string]string, req io.Reader) (resp *http.Response, err error) { +func (c *client) Request(p map[string]string, req io.Reader) (resp *http.Response, err error) { r, err := c.Do(p, req) if err != nil { return @@ -458,26 +230,27 @@ func (c *FCGIClient) Request(p map[string]string, req io.Reader) (resp *http.Res resp.TransferEncoding = resp.Header["Transfer-Encoding"] resp.ContentLength, _ = strconv.ParseInt(resp.Header.Get("Content-Length"), 10, 64) + // wrap the response body in our closer + closer := clientCloser{ + rwc: c.rwc, + r: r.(*streamReader), + Reader: rb, + status: resp.StatusCode, + logger: noopLogger, + } if chunked(resp.TransferEncoding) { - resp.Body = clientCloser{ - FCGIClient: c, - Reader: httputil.NewChunkedReader(rb), - status: resp.StatusCode, - logger: c.logger, - } - } else { - resp.Body = clientCloser{ - FCGIClient: c, - Reader: rb, - status: resp.StatusCode, - logger: c.logger, - } + closer.Reader = httputil.NewChunkedReader(rb) } + if c.stderr { + closer.logger = c.logger + } + resp.Body = closer + return } // Get issues a GET request to the fcgi responder. -func (c *FCGIClient) Get(p map[string]string, body io.Reader, l int64) (resp *http.Response, err error) { +func (c *client) Get(p map[string]string, body io.Reader, l int64) (resp *http.Response, err error) { p["REQUEST_METHOD"] = "GET" p["CONTENT_LENGTH"] = strconv.FormatInt(l, 10) @@ -486,7 +259,7 @@ func (c *FCGIClient) Get(p map[string]string, body io.Reader, l int64) (resp *ht } // Head issues a HEAD request to the fcgi responder. -func (c *FCGIClient) Head(p map[string]string) (resp *http.Response, err error) { +func (c *client) Head(p map[string]string) (resp *http.Response, err error) { p["REQUEST_METHOD"] = "HEAD" p["CONTENT_LENGTH"] = "0" @@ -495,7 +268,7 @@ func (c *FCGIClient) Head(p map[string]string) (resp *http.Response, err error) } // Options issues an OPTIONS request to the fcgi responder. -func (c *FCGIClient) Options(p map[string]string) (resp *http.Response, err error) { +func (c *client) Options(p map[string]string) (resp *http.Response, err error) { p["REQUEST_METHOD"] = "OPTIONS" p["CONTENT_LENGTH"] = "0" @@ -505,7 +278,7 @@ func (c *FCGIClient) Options(p map[string]string) (resp *http.Response, err erro // Post issues a POST request to the fcgi responder. with request body // in the format that bodyType specified -func (c *FCGIClient) Post(p map[string]string, method string, bodyType string, body io.Reader, l int64) (resp *http.Response, err error) { +func (c *client) Post(p map[string]string, method string, bodyType string, body io.Reader, l int64) (resp *http.Response, err error) { if p == nil { p = make(map[string]string) } @@ -528,7 +301,7 @@ func (c *FCGIClient) Post(p map[string]string, method string, bodyType string, b // PostForm issues a POST to the fcgi responder, with form // as a string key to a list values (url.Values) -func (c *FCGIClient) PostForm(p map[string]string, data url.Values) (resp *http.Response, err error) { +func (c *client) PostForm(p map[string]string, data url.Values) (resp *http.Response, err error) { body := bytes.NewReader([]byte(data.Encode())) return c.Post(p, "POST", "application/x-www-form-urlencoded", body, int64(body.Len())) } @@ -536,7 +309,7 @@ func (c *FCGIClient) PostForm(p map[string]string, data url.Values) (resp *http. // PostFile issues a POST to the fcgi responder in multipart(RFC 2046) standard, // with form as a string key to a list values (url.Values), // and/or with file as a string key to a list file path. -func (c *FCGIClient) PostFile(p map[string]string, data url.Values, file map[string]string) (resp *http.Response, err error) { +func (c *client) PostFile(p map[string]string, data url.Values, file map[string]string) (resp *http.Response, err error) { buf := &bytes.Buffer{} writer := multipart.NewWriter(buf) bodyType := writer.FormDataContentType() @@ -577,18 +350,18 @@ func (c *FCGIClient) PostFile(p map[string]string, data url.Values, file map[str // SetReadTimeout sets the read timeout for future calls that read from the // fcgi responder. A zero value for t means no timeout will be set. -func (c *FCGIClient) SetReadTimeout(t time.Duration) error { - if conn, ok := c.rwc.(net.Conn); ok && t != 0 { - return conn.SetReadDeadline(time.Now().Add(t)) +func (c *client) SetReadTimeout(t time.Duration) error { + if t != 0 { + return c.rwc.SetReadDeadline(time.Now().Add(t)) } return nil } // SetWriteTimeout sets the write timeout for future calls that send data to // the fcgi responder. A zero value for t means no timeout will be set. -func (c *FCGIClient) SetWriteTimeout(t time.Duration) error { - if conn, ok := c.rwc.(net.Conn); ok && t != 0 { - return conn.SetWriteDeadline(time.Now().Add(t)) +func (c *client) SetWriteTimeout(t time.Duration) error { + if t != 0 { + return c.rwc.SetWriteDeadline(time.Now().Add(t)) } return nil } diff --git a/modules/caddyhttp/reverseproxy/fastcgi/client_test.go b/modules/caddyhttp/reverseproxy/fastcgi/client_test.go index ef3474d..78e5713 100644 --- a/modules/caddyhttp/reverseproxy/fastcgi/client_test.go +++ b/modules/caddyhttp/reverseproxy/fastcgi/client_test.go @@ -118,12 +118,14 @@ func (s FastCGIServer) ServeHTTP(resp http.ResponseWriter, req *http.Request) { } func sendFcgi(reqType int, fcgiParams map[string]string, data []byte, posts map[string]string, files map[string]string) (content []byte) { - fcgi, err := Dial("tcp", ipPort) + conn, err := net.Dial("tcp", ipPort) if err != nil { log.Println("err:", err) return } + fcgi := client{rwc: conn, reqID: 1} + length := 0 var resp *http.Response @@ -168,7 +170,7 @@ func sendFcgi(reqType int, fcgiParams map[string]string, data []byte, posts map[ content, _ = io.ReadAll(resp.Body) log.Println("c: send data length ≈", length, string(content)) - fcgi.Close() + conn.Close() time.Sleep(1 * time.Second) if bytes.Contains(content, []byte("FAILED")) { diff --git a/modules/caddyhttp/reverseproxy/fastcgi/fastcgi.go b/modules/caddyhttp/reverseproxy/fastcgi/fastcgi.go index a9f8be1..6ff6ff4 100644 --- a/modules/caddyhttp/reverseproxy/fastcgi/fastcgi.go +++ b/modules/caddyhttp/reverseproxy/fastcgi/fastcgi.go @@ -15,7 +15,6 @@ package fastcgi import ( - "context" "crypto/tls" "fmt" "net" @@ -129,13 +128,7 @@ func (t Transport) RoundTrip(r *http.Request) (*http.Response, error) { return nil, fmt.Errorf("building environment: %v", err) } - // TODO: doesn't dialer have a Timeout field? ctx := r.Context() - if t.DialTimeout > 0 { - var cancel context.CancelFunc - ctx, cancel = context.WithTimeout(ctx, time.Duration(t.DialTimeout)) - defer cancel() - } // extract dial information from request (should have been embedded by the reverse proxy) network, address := "tcp", r.URL.Host @@ -150,32 +143,41 @@ func (t Transport) RoundTrip(r *http.Request) (*http.Response, error) { ShouldLogCredentials: logCreds, } loggableEnv := loggableEnv{vars: env, logCredentials: logCreds} - t.logger.Debug("roundtrip", + + logger := t.logger.With( zap.Object("request", loggableReq), - zap.String("dial", address), zap.Object("env", loggableEnv), ) + logger.Debug("roundtrip", + zap.String("dial", address), + zap.Object("env", loggableEnv), + zap.Object("request", loggableReq)) - fcgiBackend, err := DialContext(ctx, network, address) + // connect to the backend + dialer := net.Dialer{Timeout: time.Duration(t.DialTimeout)} + conn, err := dialer.DialContext(ctx, network, address) if err != nil { - // TODO: wrap in a special error type if the dial failed, so retries can happen if enabled return nil, fmt.Errorf("dialing backend: %v", err) } - if t.CaptureStderr { - fcgiBackend.logger = t.logger.With( - zap.Object("request", loggableReq), - zap.Object("env", loggableEnv), - ) - } else { - fcgiBackend.logger = noopLogger + defer func() { + // conn will be closed with the response body unless there's an error + if err != nil { + conn.Close() + } + }() + + // create the client that will facilitate the protocol + client := client{ + rwc: conn, + reqID: 1, + logger: logger, } - // fcgiBackend gets closed when response body is closed (see clientCloser) // read/write timeouts - if err := fcgiBackend.SetReadTimeout(time.Duration(t.ReadTimeout)); err != nil { + if err = client.SetReadTimeout(time.Duration(t.ReadTimeout)); err != nil { return nil, fmt.Errorf("setting read timeout: %v", err) } - if err := fcgiBackend.SetWriteTimeout(time.Duration(t.WriteTimeout)); err != nil { + if err = client.SetWriteTimeout(time.Duration(t.WriteTimeout)); err != nil { return nil, fmt.Errorf("setting write timeout: %v", err) } @@ -187,16 +189,19 @@ func (t Transport) RoundTrip(r *http.Request) (*http.Response, error) { var resp *http.Response switch r.Method { case http.MethodHead: - resp, err = fcgiBackend.Head(env) + resp, err = client.Head(env) case http.MethodGet: - resp, err = fcgiBackend.Get(env, r.Body, contentLength) + resp, err = client.Get(env, r.Body, contentLength) case http.MethodOptions: - resp, err = fcgiBackend.Options(env) + resp, err = client.Options(env) default: - resp, err = fcgiBackend.Post(env, r.Method, r.Header.Get("Content-Type"), r.Body, contentLength) + resp, err = client.Post(env, r.Method, r.Header.Get("Content-Type"), r.Body, contentLength) + } + if err != nil { + return nil, err } - return resp, err + return resp, nil } // buildEnv returns a set of CGI environment variables for the request. diff --git a/modules/caddyhttp/reverseproxy/fastcgi/header.go b/modules/caddyhttp/reverseproxy/fastcgi/header.go new file mode 100644 index 0000000..59dce71 --- /dev/null +++ b/modules/caddyhttp/reverseproxy/fastcgi/header.go @@ -0,0 +1,32 @@ +// Copyright 2015 Matthew Holt and The Caddy Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package fastcgi + +type header struct { + Version uint8 + Type uint8 + ID uint16 + ContentLength uint16 + PaddingLength uint8 + Reserved uint8 +} + +func (h *header) init(recType uint8, reqID uint16, contentLength int) { + h.Version = 1 + h.Type = recType + h.ID = reqID + h.ContentLength = uint16(contentLength) + h.PaddingLength = uint8(-contentLength & 7) +} diff --git a/modules/caddyhttp/reverseproxy/fastcgi/pool.go b/modules/caddyhttp/reverseproxy/fastcgi/pool.go new file mode 100644 index 0000000..29017f1 --- /dev/null +++ b/modules/caddyhttp/reverseproxy/fastcgi/pool.go @@ -0,0 +1,26 @@ +// Copyright 2015 Matthew Holt and The Caddy Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package fastcgi + +import ( + "bytes" + "sync" +) + +var bufPool = sync.Pool{ + New: func() any { + return new(bytes.Buffer) + }, +} diff --git a/modules/caddyhttp/reverseproxy/fastcgi/reader.go b/modules/caddyhttp/reverseproxy/fastcgi/reader.go new file mode 100644 index 0000000..3a8e91d --- /dev/null +++ b/modules/caddyhttp/reverseproxy/fastcgi/reader.go @@ -0,0 +1,44 @@ +// Copyright 2015 Matthew Holt and The Caddy Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package fastcgi + +import ( + "bytes" + "io" +) + +type streamReader struct { + c *client + rec record + stderr bytes.Buffer +} + +func (w *streamReader) Read(p []byte) (n int, err error) { + for !w.rec.hasMore() { + err = w.rec.fill(w.c.rwc) + if err != nil { + return 0, err + } + + // standard error output + if w.rec.h.Type == Stderr { + if _, err = io.Copy(&w.stderr, &w.rec); err != nil { + return 0, err + } + } + } + + return w.rec.Read(p) +} diff --git a/modules/caddyhttp/reverseproxy/fastcgi/record.go b/modules/caddyhttp/reverseproxy/fastcgi/record.go new file mode 100644 index 0000000..46c1f17 --- /dev/null +++ b/modules/caddyhttp/reverseproxy/fastcgi/record.go @@ -0,0 +1,58 @@ +// Copyright 2015 Matthew Holt and The Caddy Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package fastcgi + +import ( + "encoding/binary" + "errors" + "io" +) + +type record struct { + h header + lr io.LimitedReader + padding int64 +} + +func (rec *record) fill(r io.Reader) (err error) { + rec.lr.N = rec.padding + rec.lr.R = r + if _, err = io.Copy(io.Discard, rec); err != nil { + return + } + + if err = binary.Read(r, binary.BigEndian, &rec.h); err != nil { + return + } + if rec.h.Version != 1 { + err = errors.New("fcgi: invalid header version") + return + } + if rec.h.Type == EndRequest { + err = io.EOF + return + } + rec.lr.N = int64(rec.h.ContentLength) + rec.padding = int64(rec.h.PaddingLength) + return +} + +func (rec *record) Read(p []byte) (n int, err error) { + return rec.lr.Read(p) +} + +func (rec *record) hasMore() bool { + return rec.lr.N > 0 +} diff --git a/modules/caddyhttp/reverseproxy/fastcgi/writer.go b/modules/caddyhttp/reverseproxy/fastcgi/writer.go new file mode 100644 index 0000000..3af00d9 --- /dev/null +++ b/modules/caddyhttp/reverseproxy/fastcgi/writer.go @@ -0,0 +1,145 @@ +// Copyright 2015 Matthew Holt and The Caddy Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package fastcgi + +import ( + "bytes" + "encoding/binary" +) + +// streamWriter abstracts out the separation of a stream into discrete records. +// It only writes maxWrite bytes at a time. +type streamWriter struct { + c *client + h header + buf *bytes.Buffer + recType uint8 +} + +func (w *streamWriter) writeRecord(recType uint8, content []byte) (err error) { + w.h.init(recType, w.c.reqID, len(content)) + w.buf.Write(pad[:8]) + w.writeHeader() + w.buf.Write(content) + w.buf.Write(pad[:w.h.PaddingLength]) + _, err = w.buf.WriteTo(w.c.rwc) + return err +} + +func (w *streamWriter) writeBeginRequest(role uint16, flags uint8) error { + b := [8]byte{byte(role >> 8), byte(role), flags} + return w.writeRecord(BeginRequest, b[:]) +} + +func (w *streamWriter) Write(p []byte) (int, error) { + // init header + if w.buf.Len() < 8 { + w.buf.Write(pad[:8]) + } + + nn := 0 + for len(p) > 0 { + n := len(p) + nl := maxWrite + 8 - w.buf.Len() + if n > nl { + n = nl + w.buf.Write(p[:n]) + if err := w.Flush(); err != nil { + return nn, err + } + // reset headers + w.buf.Write(pad[:8]) + } else { + w.buf.Write(p[:n]) + } + nn += n + p = p[n:] + } + return nn, nil +} + +func (w *streamWriter) endStream() error { + // send empty record to close the stream + return w.writeRecord(w.recType, nil) +} + +func (w *streamWriter) writePairs(pairs map[string]string) error { + b := make([]byte, 8) + nn := 0 + // init headers + w.buf.Write(b) + for k, v := range pairs { + m := 8 + len(k) + len(v) + if m > maxWrite { + // param data size exceed 65535 bytes" + vl := maxWrite - 8 - len(k) + v = v[:vl] + } + n := encodeSize(b, uint32(len(k))) + n += encodeSize(b[n:], uint32(len(v))) + m = n + len(k) + len(v) + if (nn + m) > maxWrite { + if err := w.Flush(); err != nil { + return err + } + // reset headers + w.buf.Write(b) + nn = 0 + } + nn += m + w.buf.Write(b[:n]) + w.buf.WriteString(k) + w.buf.WriteString(v) + } + return w.FlushStream() +} + +func encodeSize(b []byte, size uint32) int { + if size > 127 { + size |= 1 << 31 + binary.BigEndian.PutUint32(b, size) + return 4 + } + b[0] = byte(size) + return 1 +} + +// writeHeader populate header wire data in buf, it abuses buffer.Bytes() modification +func (w *streamWriter) writeHeader() { + h := w.buf.Bytes()[:8] + h[0] = w.h.Version + h[1] = w.h.Type + binary.BigEndian.PutUint16(h[2:4], w.h.ID) + binary.BigEndian.PutUint16(h[4:6], w.h.ContentLength) + h[6] = w.h.PaddingLength + h[7] = w.h.Reserved +} + +// Flush write buffer data to the underlying connection, it assumes header data is the first 8 bytes of buf +func (w *streamWriter) Flush() error { + w.h.init(w.recType, w.c.reqID, w.buf.Len()-8) + w.writeHeader() + w.buf.Write(pad[:w.h.PaddingLength]) + _, err := w.buf.WriteTo(w.c.rwc) + return err +} + +// FlushStream flush data then end current stream +func (w *streamWriter) FlushStream() error { + if err := w.Flush(); err != nil { + return err + } + return w.endStream() +} -- cgit v1.2.3