diff --git a/.dockerignore b/.dockerignore index 55946b4..3fea7ce 100644 --- a/.dockerignore +++ b/.dockerignore @@ -1,5 +1,6 @@ .git LICENSE +docker-compose.yaml *.md assets Dockerfile diff --git a/README.md b/README.md index 555cade..f1fe625 100644 --- a/README.md +++ b/README.md @@ -1,9 +1,10 @@ -# framer -framer is the most performant grpc load generator +# ozon-framer +ozon-framer is the most performant grpc load generator ## Performance ![benchmark chart](./assets/benchmark_chart.png) +Benchmarks are done with `11th Gen Intel(R) Core(TM) i7-1165G7 @ 2.80GHz`. Load generators was limited in 2 CPU. Load generators configurations are available in [benchmarks directory](./benchmarks) @@ -17,7 +18,7 @@ Load generators configurations are available in [benchmarks directory](./benchma This is alpha version. Public api and request file format may be changed. ## Install -Download binary from [github release page](./releases/latest) and place it in your PATH. +Download binary from [github release page](https://github.com/ozontech/framer/releases/latest) and place it in your PATH. ### Compile **Build using go** @@ -77,6 +78,13 @@ framer load --addr=localhost:9090 --requests-file=test_files/requests --clients It makes 10 rps from 10 clients in 10 second. ## Converter +`framer convert` command may be used to convert requests file between different formats. +Now is supported next formats: +* ozon.binary - [see format description above](#ozon.binary-file-format); +* pandora.json - grpc json format of pandora load generator. [See documentation](https://yandex.cloud/ru/docs/load-testing/concepts/payloads/grpc-json); +* ozon.json - same as pandora.json, but has ability to store repeatable meta value. + +### Supported formats ### Ozon.binary file format Rules are using [ABNF syntax](https://tools.ietf.org/html/rfc5234). @@ -94,6 +102,9 @@ Body = 1*({any byte}) [Example requests file](https://github.com/ozontech/framer/-/blob/master/test_files/requests) +#### Programatic ozon.binary generation example +[Full example](./examples/requestsgen) + ### Usage ``` Usage: framer convert --help @@ -123,12 +134,10 @@ framer convert --from=ozon.json --to=ozon.binary --reflection-proto=formats/grpc ``` It converts requests file from ozon.json format to ozon.binary format using protofile. -### Programatic requests generation example -[Full example](./examples/requestsgen) - ## TODO -- [ ] Installation from homebrew for macOS; -- [ ] Publish to dockerhub; +- [ ] Installation + - [ ] Homebrew suport for macOS; + - [ ] Publish to dockerhub; - [ ] Configuration file support; - [ ] Requests scheduling strategys combination; - [ ] More reporting variants; diff --git a/assets/benchmark_chart.png b/assets/benchmark_chart.png index 58b67a1..b0886d4 100644 Binary files a/assets/benchmark_chart.png and b/assets/benchmark_chart.png differ diff --git a/benchmarks/dumb-server/Dockerfile b/benchmarks/dumb-server/Dockerfile new file mode 100644 index 0000000..b307298 --- /dev/null +++ b/benchmarks/dumb-server/Dockerfile @@ -0,0 +1,13 @@ +FROM golang:1.22-alpine AS build-stage +WORKDIR /app +RUN apk add make +COPY go.mod go.sum ./ +RUN go mod download +COPY . . +WORKDIR /app/benchmarks/dumb-server +RUN make build + +FROM alpine +WORKDIR / +COPY --from=build-stage /tmp/bin/dumb-server /dumb-server +ENTRYPOINT ["/dumb-server"] diff --git a/benchmarks/dumb-server/docker-compose.yaml b/benchmarks/dumb-server/docker-compose.yaml new file mode 100644 index 0000000..0694d20 --- /dev/null +++ b/benchmarks/dumb-server/docker-compose.yaml @@ -0,0 +1,9 @@ +version: "3.8" +services: + server: + container_name: server-${COMPOSE_PROJECT_NAME} + image: server + build: + context: ../.. + dockerfile: benchmarks/dumb-server/Dockerfile + network_mode: host diff --git a/benchmarks/dumb-server/main.go b/benchmarks/dumb-server/main.go index 799aee4..710add1 100644 --- a/benchmarks/dumb-server/main.go +++ b/benchmarks/dumb-server/main.go @@ -122,8 +122,6 @@ func newResps() (resp1 []byte, resp1Mod respModifier, resp2 []byte, resp2Mod res resp = append(resp, headerBuf.Bytes()...) msgPrefix := make([]byte, 5) // не кодированный ответ 0 длины = любое сообщение со всеми пустыми полями - // TODO: remove debug - // msgPrefix := []byte{255, 254, 253, 252, 251} dataLen := len(msgPrefix) respMod.dataStreamIDindx = len(resp) + 5 @@ -305,8 +303,16 @@ func handleConn(conn net.Conn, i int) (err error) { processor := reciever.NewProcessor([]reciever.FrameTypeProcessor{ http2.FrameData: endStreamProcessor, http2.FrameHeaders: endStreamProcessor, - http2.FrameContinuation: endStreamProcessor, http2.FramePing: &pingProcessor{cmds}, + http2.FrameContinuation: endStreamProcessor, + + http2.FrameRSTStream: noopFrameProcessor{}, + http2.FrameGoAway: noopFrameProcessor{}, + http2.FrameWindowUpdate: noopFrameProcessor{}, + + // http.FramePriority not supported + http2.FrameSettings: settingsProcessor{}, + // http.FramePushPromise not supported }) g.Go(func() error { defer println("processor done") @@ -369,7 +375,6 @@ func handleConn(conn net.Conn, i int) (err error) { } func writeLoop(ctx context.Context, conn io.Writer, commands <-chan *command, _ types.FlowControl, _ int) (err error) { - f := http2.NewFramer(conn, nil) r1Original, r1mod, r2Original, r2mod := newResps() select { case cmd := <-commands: @@ -381,7 +386,7 @@ func writeLoop(ctx context.Context, conn io.Writer, commands <-chan *command, _ return err } case commandTypePing: - err := f.WritePing(true, cmd.pingPayload) + err := http2.NewFramer(conn, nil).WritePing(true, cmd.pingPayload) if err != nil { return err } @@ -393,8 +398,9 @@ func writeLoop(ctx context.Context, conn io.Writer, commands <-chan *command, _ const limit = 1024 type t struct { - bufSrc [limit][]byte - requests [limit][]byte + bufSrc [limit][]byte + requests [limit][]byte + pingFrame []byte } var t1, t2 t @@ -403,16 +409,23 @@ func writeLoop(ctx context.Context, conn io.Writer, commands <-chan *command, _ t2.requests[i] = bytes.Clone(r2Original) } - writeChan := make(chan net.Buffers, 1) - go func() { - ticker := time.NewTicker(time.Millisecond) - defer close(writeChan) - defer ticker.Stop() + { pingFrame := make([]byte, 9+8) pingHeader := frameheader.FrameHeader(pingFrame) pingHeader.SetType(http2.FramePing) + pingHeader.SetLength(8) pingHeader.SetFlags(http2.FlagPingAck) + t1.pingFrame = pingFrame + t2.pingFrame = bytes.Clone(pingFrame) + } + + writeChan := make(chan net.Buffers) // важно чтобы канал был небуферизованный! + go func() { + ticker := time.NewTicker(time.Millisecond) + defer close(writeChan) + defer ticker.Stop() + nextT, t := &t1, &t2 for { t, nextT = nextT, t @@ -430,16 +443,16 @@ func writeLoop(ctx context.Context, conn io.Writer, commands <-chan *command, _ scheduledBytesOUT.Add(uint64(len(r2))) if len(buf) == limit { - writeBuf = buf[1:] // не пишем резервную позицию + writeBuf = buf[1:] // НЕ пишем резервную позицию } case commandTypePing: - copy(pingFrame[9:], cmd.pingPayload[:]) - buf[0] = pingFrame - writeBuf = buf + copy(t.pingFrame[9:], cmd.pingPayload[:]) + buf[0] = t.pingFrame + writeBuf = buf // пишем резервную позицию } commandsPool.Release(cmd) case <-ticker.C: - writeBuf = buf[1:] // не пишем резервную позицию + writeBuf = buf[1:] // НЕ пишем резервную позицию case <-ctx.Done(): return } @@ -463,7 +476,7 @@ func writeLoop(ctx context.Context, conn io.Writer, commands <-chan *command, _ cmdsAfter := len(commands) since := time.Since(writeStart) - if since > time.Millisecond*10 { + if since > time.Millisecond*10 && false { println(since.String(), cmdsBefore, cmdsAfter) } bytesOUT.Add(l * uint64(len(r2Original))) @@ -503,18 +516,16 @@ func configureConn(conn io.ReadWriter, buf []byte) (settings, error) { return s, fmt.Errorf("first frame from other end is not settings, got %T", frame) } - if !sf.IsAck() { - if val, ok := sf.Value(http2.SettingInitialWindowSize); ok { - s.InitialWindowSize = val - } - if val, ok := sf.Value(http2.SettingMaxConcurrentStreams); ok { - s.MaxConcurrentStreams = val - } + if val, ok := sf.Value(http2.SettingInitialWindowSize); ok { + s.InitialWindowSize = val + } + if val, ok := sf.Value(http2.SettingMaxConcurrentStreams); ok { + s.MaxConcurrentStreams = val + } - err = framer.WriteSettingsAck() - if err != nil { - return s, fmt.Errorf("writing settings ack: %w", err) - } + err = framer.WriteSettingsAck() + if err != nil { + return s, fmt.Errorf("writing settings ack: %w", err) } // у h2load на валидное значение происходит переполнение буффера с отвалом соедиенинения, поэтому: - 65_535 @@ -527,6 +538,16 @@ func configureConn(conn io.ReadWriter, buf []byte) (settings, error) { return s, nil } +type noopFrameProcessor struct{} + +func (p noopFrameProcessor) Process( + _ frameheader.FrameHeader, + _ []byte, + incomplete bool, +) error { + return nil +} + type endStreamProcessor struct { respChan chan<- *command } @@ -557,11 +578,11 @@ type pingProcessor struct { } func (p *pingProcessor) Process( - _ frameheader.FrameHeader, + header frameheader.FrameHeader, payload []byte, incomplete bool, ) error { - if incomplete { + if incomplete || !header.Flags().Has(http2.FlagPingAck) { return nil } @@ -575,3 +596,20 @@ func (p *pingProcessor) Process( return nil } + +type settingsProcessor struct{} + +func (p settingsProcessor) Process( + header frameheader.FrameHeader, + payload []byte, + incomplete bool, +) error { + if incomplete { + return nil + } + + if !header.Flags().Has(http2.FlagSettingsAck) { + return errors.New("update settings in runtime not supported") + } + return nil +} diff --git a/benchmarks/framer/README.md b/benchmarks/framer/README.md new file mode 100644 index 0000000..0181536 --- /dev/null +++ b/benchmarks/framer/README.md @@ -0,0 +1,6 @@ +How to run +- Run in docker (needs docker-compose v2): `docker compose up --abort-on-container-exit` +- Run local: + * Install [framer](../../README.md#install) + * Run dumb server: `(cd ../dumb-server && make run)` + * Execute `./run.sh` diff --git a/benchmarks/framer/REAME.md b/benchmarks/framer/REAME.md deleted file mode 100644 index 5705f01..0000000 --- a/benchmarks/framer/REAME.md +++ /dev/null @@ -1,4 +0,0 @@ -How to run -- Install [framer](../../README.md#install) -- Run dumb server: `(cd ../dumb-server && make run)` -- Execute `./run.sh` diff --git a/benchmarks/framer/docker-compose.yaml b/benchmarks/framer/docker-compose.yaml new file mode 100644 index 0000000..26eb442 --- /dev/null +++ b/benchmarks/framer/docker-compose.yaml @@ -0,0 +1,22 @@ +name: framer-benchmark +include: + - ../dumb-server/docker-compose.yaml +services: + framer: + container_name: framer + image: framer + build: + context: ../.. + dockerfile: Dockerfile + network_mode: host + volumes: + - ./requests.bin:/tmp/requests.bin + command: > + load --addr=localhost:9090 --inmem-requests + --requests-file=/tmp/requests.bin --clients 10 + unlimited --duration 1m + deploy: + resources: + limits: + cpus: '2' + memory: 2G diff --git a/benchmarks/gatling/.dockerignore b/benchmarks/gatling/.dockerignore new file mode 100644 index 0000000..378eac2 --- /dev/null +++ b/benchmarks/gatling/.dockerignore @@ -0,0 +1 @@ +build diff --git a/benchmarks/gatling/Dockerfile b/benchmarks/gatling/Dockerfile new file mode 100644 index 0000000..ac58c20 --- /dev/null +++ b/benchmarks/gatling/Dockerfile @@ -0,0 +1,5 @@ +FROM openjdk:17-oracle +WORKDIR /app +COPY . /app +RUN ./gradlew +ENTRYPOINT ["./gradlew", "gatlingRun-bench.BenchKt"] diff --git a/benchmarks/gatling/README.md b/benchmarks/gatling/README.md index 0fad7e0..fc0479f 100644 --- a/benchmarks/gatling/README.md +++ b/benchmarks/gatling/README.md @@ -1,4 +1,6 @@ How to run -- Install java/gradle -- Run dumb server: `(cd ../dumb-server && make run)` -- Execute `./run.sh` \ No newline at end of file +- Run in docker (needs docker-compose v2): `docker compose up --abort-on-container-exit` +- Run local: + * Install java and gradle + * Run dumb server: `(cd ../dumb-server && make run)` + * Execute `./run.sh` diff --git a/benchmarks/gatling/docker-compose.yaml b/benchmarks/gatling/docker-compose.yaml new file mode 100644 index 0000000..c264fb7 --- /dev/null +++ b/benchmarks/gatling/docker-compose.yaml @@ -0,0 +1,19 @@ +name: gatling-benchmark +include: + - ../dumb-server/docker-compose.yaml +services: + gatling: + container_name: gatling + image: gatling + build: + context: ./ + dockerfile: Dockerfile + network_mode: host + working_dir: /app + volumes: + - ./:/app + deploy: + resources: + limits: + cpus: '2' + memory: 2G diff --git a/benchmarks/ghz/Dockerfile b/benchmarks/ghz/Dockerfile new file mode 100644 index 0000000..b54c863 --- /dev/null +++ b/benchmarks/ghz/Dockerfile @@ -0,0 +1,5 @@ +FROM ubuntu:24.04 +WORKDIR /app +ADD https://github.com/bojand/ghz/releases/download/v0.120.0/ghz-linux-x86_64.tar.gz ./ +RUN tar xvf ghz-linux-x86_64.tar.gz +ENTRYPOINT ["/app/ghz"] diff --git a/benchmarks/ghz/README.md b/benchmarks/ghz/README.md new file mode 100644 index 0000000..b395057 --- /dev/null +++ b/benchmarks/ghz/README.md @@ -0,0 +1,6 @@ +How to run +- Run in docker (needs docker-compose v2): `docker compose up --abort-on-container-exit` +- Run local: + - Install [ghz](https://ghz.sh/docs/install) + - Run dumb server: `(cd ../dumb-server && make run)` + - Execute `./run.sh` diff --git a/benchmarks/ghz/REAME.md b/benchmarks/ghz/REAME.md deleted file mode 100644 index 9677fb5..0000000 --- a/benchmarks/ghz/REAME.md +++ /dev/null @@ -1,4 +0,0 @@ -How to run -- Install [framer](https://ghz.sh/docs/install) -- Run dumb server: `(cd ../dumb-server && make run)` -- Execute `./run.sh` diff --git a/benchmarks/ghz/docker-compose.yaml b/benchmarks/ghz/docker-compose.yaml new file mode 100644 index 0000000..e282ee1 --- /dev/null +++ b/benchmarks/ghz/docker-compose.yaml @@ -0,0 +1,25 @@ +name: ghz-benchmark +include: + - ../dumb-server/docker-compose.yaml +services: + ghz: + container_name: ghz + image: ghz + build: + context: ./ + dockerfile: Dockerfile + network_mode: host + volumes: + - ../dumb-server/api/api.proto:/tmp/api.proto + - ./ghz-data.bin:/tmp/ghz-data.bin + command: > + --insecure --async + --proto /tmp/api.proto + --call test.api.TestApi/Test + -c 10 --total 100000 + -B /tmp/ghz-data.bin localhost:9090 + deploy: + resources: + limits: + cpus: '2' + memory: 2G diff --git a/benchmarks/h2load/Dockerfile b/benchmarks/h2load/Dockerfile new file mode 100644 index 0000000..c0613db --- /dev/null +++ b/benchmarks/h2load/Dockerfile @@ -0,0 +1,3 @@ +FROM ubuntu:24.04 +RUN apt-get update && apt-get install -y nghttp2-client +ENTRYPOINT ["h2load"] diff --git a/benchmarks/h2load/README.md b/benchmarks/h2load/README.md new file mode 100644 index 0000000..eb4ea08 --- /dev/null +++ b/benchmarks/h2load/README.md @@ -0,0 +1,6 @@ +How to run +- Run in docker (needs docker-compose v2): `docker compose up --abort-on-container-exit` +- Run local: + * Install h2load: `sudo apt install nghttp2-client` + * Run dumb server: `(cd ../dumb-server && make run)` + * Execute `./run.sh` diff --git a/benchmarks/h2load/REAME.md b/benchmarks/h2load/REAME.md deleted file mode 100644 index 7391af4..0000000 --- a/benchmarks/h2load/REAME.md +++ /dev/null @@ -1,4 +0,0 @@ -How to run -- Install h2load: `sudo apt install nghttp2-client` -- Run dumb server: `(cd ../dumb-server && make run)` -- Execute `./run.sh` diff --git a/benchmarks/h2load/docker-compose.yaml b/benchmarks/h2load/docker-compose.yaml new file mode 100644 index 0000000..e6f3a5c --- /dev/null +++ b/benchmarks/h2load/docker-compose.yaml @@ -0,0 +1,24 @@ +name: h2load-benchmark +include: + - ../dumb-server/docker-compose.yaml +services: + h2load: + container_name: h2load + image: h2load + build: + context: ./ + dockerfile: Dockerfile + network_mode: host + volumes: + - ./data:/tmp/data + command: > + http://localhost:9090/test.api.TestService/Test + -n 6000000 -c 10 -m 200 + --data=/tmp/data + -H 'x-my-header-key1: my-header-val1' -H 'x-my-header-key2: my-header-val2' + -H 'te: trailers' -H 'content-type: application/grpc' + deploy: + resources: + limits: + cpus: '2' + memory: 2G diff --git a/benchmarks/k6/README.md b/benchmarks/k6/README.md new file mode 100644 index 0000000..f561e8b --- /dev/null +++ b/benchmarks/k6/README.md @@ -0,0 +1,6 @@ +How to run +- Run in docker (needs docker-compose v2): `docker compose up --abort-on-container-exit` +- Run local: + * [Install k6](https://k6.io/docs/get-started/installation/) + * Run dumb server: `(cd ../dumb-server && make run)` + * Execute `./run.sh` diff --git a/benchmarks/k6/REAME.md b/benchmarks/k6/REAME.md deleted file mode 100644 index 92edb3f..0000000 --- a/benchmarks/k6/REAME.md +++ /dev/null @@ -1,4 +0,0 @@ -How to run -- [Install k6](https://k6.io/docs/get-started/installation/) -- Run dumb server: `(cd ../dumb-server && make run)` -- Execute `./run.sh` diff --git a/benchmarks/k6/docker-compose.yaml b/benchmarks/k6/docker-compose.yaml new file mode 100644 index 0000000..e345c4e --- /dev/null +++ b/benchmarks/k6/docker-compose.yaml @@ -0,0 +1,19 @@ +name: k6-benchmark +include: + - ../dumb-server/docker-compose.yaml +services: + k6: + container_name: k6 + image: grafana/k6:0.51.0 + network_mode: host + volumes: + - ./script.js:/app/k6/script.js + - ../dumb-server/api/api.proto:/app/dumb-server/api/api.proto + working_dir: /app + command: > + run /app/k6/script.js + deploy: + resources: + limits: + cpus: '2' + memory: 2G diff --git a/benchmarks/pandora/Dockerfile b/benchmarks/pandora/Dockerfile new file mode 100644 index 0000000..70a66cd --- /dev/null +++ b/benchmarks/pandora/Dockerfile @@ -0,0 +1,5 @@ +FROM ubuntu:24.04 +WORKDIR /app +ADD https://github.com/yandex/pandora/releases/download/v0.5.26/pandora_0.5.26_linux_amd64 /usr/local/bin/pandora +RUN chmod +x /usr/local/bin/pandora +ENTRYPOINT ["pandora"] diff --git a/benchmarks/pandora/README.md b/benchmarks/pandora/README.md new file mode 100644 index 0000000..9930db1 --- /dev/null +++ b/benchmarks/pandora/README.md @@ -0,0 +1,6 @@ +How to run +- Run in docker (needs docker-compose v2): `docker compose up --abort-on-container-exit` +- Run local: + * Install [pandora](https://github.com/yandex/pandora#how-to-start) + * Run dumb server: `(cd ../dumb-server && make run)` + * Execute `./run.sh` diff --git a/benchmarks/pandora/REAME.md b/benchmarks/pandora/REAME.md deleted file mode 100644 index bd4b9a5..0000000 --- a/benchmarks/pandora/REAME.md +++ /dev/null @@ -1,4 +0,0 @@ -How to run -- Install [pandora](https://github.com/yandex/pandora#how-to-start) -- Run dumb server: `(cd ../dumb-server && make run)` -- Execute `./run.sh` diff --git a/benchmarks/pandora/config.yaml b/benchmarks/pandora/config.yaml index 3eaf129..97cc276 100644 --- a/benchmarks/pandora/config.yaml +++ b/benchmarks/pandora/config.yaml @@ -12,8 +12,9 @@ pools: type: discard rps: - duration: 1m - type: unlimited - discard_overflow: true + ops: 200_000 + type: const # type unlimited has bug + discard_overflow: false startup: type: once # Clients count. Yandex pandora don't use http/2 multiplexing, diff --git a/benchmarks/pandora/docker-compose.yaml b/benchmarks/pandora/docker-compose.yaml new file mode 100644 index 0000000..3736a4e --- /dev/null +++ b/benchmarks/pandora/docker-compose.yaml @@ -0,0 +1,20 @@ +name: pandora-benchmark +include: + - ../dumb-server/docker-compose.yaml +services: + pandora: + container_name: pandora + image: pandora + build: + context: ./ + dockerfile: Dockerfile + network_mode: host + volumes: + - ./:/app/ + command: > + config.yaml + deploy: + resources: + limits: + cpus: '2' + memory: 2G diff --git a/cmd/framer/cmd_load.go b/cmd/framer/cmd_load.go index 2210aa5..ab48667 100644 --- a/cmd/framer/cmd_load.go +++ b/cmd/framer/cmd_load.go @@ -15,6 +15,7 @@ import ( "go.uber.org/zap" "golang.org/x/sync/errgroup" + "github.com/ozontech/framer/consts" "github.com/ozontech/framer/datasource" "github.com/ozontech/framer/loader" "github.com/ozontech/framer/loader/types" @@ -131,33 +132,29 @@ func (c *LoadCommand) Run( g, ctx := errgroup.WithContext(ctx) - loaderConfig := loader.DefaultConfig() - - var reporter types.Reporter = supersimpleReporter.New(loaderConfig.Timeout) + timeout := consts.DefaultTimeout + var reporter types.Reporter = supersimpleReporter.New(timeout) if c.Phout != "" { f, err := os.Create(c.Phout) if err != nil { return fmt.Errorf("creating phout file(%s): %w", c.Phout, err) } - phoutReporter := phoutReporter.New(f, loaderConfig.Timeout) - + phoutReporter := phoutReporter.New(f, timeout) reporter = multi.NewMutli(phoutReporter, reporter) } g.Go(reporter.Run) loaders := make([]*loader.Loader, clients) for i := 0; i < clients; i++ { - conn, err := createConn(ctx, loaderConfig.Timeout, addr) - // conn, err := createUnixConn(addr) + conn, err := createConn(ctx, timeout, addr) if err != nil { return fmt.Errorf("dialing: %w", err) } l, err := loader.NewLoader( - ctx, conn, reporter, + timeout, log, - loader.DefaultConfig(), ) if err != nil { return fmt.Errorf("loader setup: %w", err) diff --git a/cmd/framer/main.go b/cmd/framer/main.go index d4f76f2..57fd021 100644 --- a/cmd/framer/main.go +++ b/cmd/framer/main.go @@ -6,6 +6,7 @@ import ( "math" "net/http" _ "net/http/pprof" //nolint:gosec + "runtime/debug" "github.com/alecthomas/kong" mangokong "github.com/alecthomas/mango-kong" @@ -19,14 +20,12 @@ var CLI struct { DebugServer bool `help:"Enable debug server."` } -var Version = "unknown" - type VersionFlag string func (v VersionFlag) Decode(ctx *kong.DecodeContext) error { return nil } func (v VersionFlag) IsBool() bool { return true } func (v VersionFlag) BeforeApply(app *kong.Kong, vars kong.Vars) error { - fmt.Println(Version) + fmt.Println(getVersion()) app.Exit(0) return nil } @@ -66,3 +65,29 @@ The framer is used to generate test requests to grpc servers and measure codes a err := kongCtx.Run() kongCtx.FatalIfErrorf(err) } + +const unknownVersion = "unknown" + +var Version = unknownVersion + +func getVersion() string { + if Version != unknownVersion { + return Version + } + + info, ok := debug.ReadBuildInfo() + if !ok { + return Version + } + + for _, kv := range info.Settings { + if kv.Value == "" { + continue + } + if kv.Key == "vcs.revision" && kv.Value != "" { + return kv.Value + } + } + + return Version +} diff --git a/cmd/framer/main_benchmark_test.go b/cmd/framer/main_benchmark_test.go index 348e157..5ad1d1a 100644 --- a/cmd/framer/main_benchmark_test.go +++ b/cmd/framer/main_benchmark_test.go @@ -7,6 +7,7 @@ import ( "testing" "time" + "github.com/ozontech/framer/consts" "github.com/ozontech/framer/datasource" "github.com/ozontech/framer/loader" "github.com/ozontech/framer/report/simple" @@ -52,7 +53,7 @@ func BenchmarkE2E(b *testing.B) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - conn, err := createConn(ctx, 5*time.Second, "localhost:9090") + conn, err := createConn(ctx, consts.DefaultTimeout, "localhost:9090") if err != nil { b.Fatal(err) } @@ -61,14 +62,11 @@ func BenchmarkE2E(b *testing.B) { go func() { a.NoError(reporter.Run()) }() defer func() { a.NoError(reporter.Close()) }() - conf := loader.DefaultConfig() - conf.StreamsLimit = 16 l, err := loader.NewLoader( - ctx, conn, reporter, + consts.DefaultTimeout, zaptest.NewLogger(b), - loader.DefaultConfig(), ) if err != nil { b.Fatal(fmt.Errorf("loader setup: %w", err)) @@ -127,14 +125,11 @@ func BenchmarkE2EInMemDatasource(b *testing.B) { reportErr := make(chan error) go func() { reportErr <- reporter.Run() }() - conf := loader.DefaultConfig() - conf.StreamsLimit = 16 l, err := loader.NewLoader( - ctx, conn, reporter, + consts.DefaultTimeout, zaptest.NewLogger(b), - loader.DefaultConfig(), ) a.NoError(err) diff --git a/consts/consts.go b/consts/consts.go index eec6a72..d6939a7 100644 --- a/consts/consts.go +++ b/consts/consts.go @@ -7,4 +7,8 @@ const ( RecieveBufferSize = 2048 SendBatchTimeout = time.Millisecond RecieveBatchTimeout = time.Millisecond + + DefaultInitialWindowSize = 65_535 + DefaultTimeout = 11 * time.Second + DefaultMaxFrameSize = 16384 // Максимальная длина пейлоада фрейма в grpc. У http2 ограничение больше. ) diff --git a/datasource/file_benchmark_test.go b/datasource/file_benchmark_test.go index dd32c3f..5054497 100644 --- a/datasource/file_benchmark_test.go +++ b/datasource/file_benchmark_test.go @@ -5,6 +5,7 @@ import ( "os" "testing" + "github.com/ozontech/framer/consts" "github.com/ozontech/framer/loader/types" hpackwrapper "github.com/ozontech/framer/utils/hpack_wrapper" ) @@ -27,7 +28,7 @@ func BenchmarkFileDataSource(b *testing.B) { go func() { defer close(done) for r := range rr { - r.SetUp(0, &noopHpackFieldWriter{}) + r.SetUp(consts.DefaultMaxFrameSize, 0, &noopHpackFieldWriter{}) b.SetBytes(int64(r.Size())) r.Release() } @@ -60,7 +61,7 @@ func BenchmarkRequestSetupNoop(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { - r.SetUp(0, &noopHpackFieldWriter{}) + r.SetUp(consts.DefaultMaxFrameSize, 0, &noopHpackFieldWriter{}) b.SetBytes(int64(r.Size())) } } @@ -81,7 +82,7 @@ func BenchmarkRequestSetupHpack(b *testing.B) { hpackwrapper := hpackwrapper.NewWrapper() b.ResetTimer() for i := 0; i < b.N; i++ { - r.SetUp(0, hpackwrapper) + r.SetUp(consts.DefaultMaxFrameSize, 0, hpackwrapper) b.SetBytes(int64(r.Size())) } } diff --git a/datasource/inmem_benchmark_test.go b/datasource/inmem_benchmark_test.go index 993a93d..0247766 100644 --- a/datasource/inmem_benchmark_test.go +++ b/datasource/inmem_benchmark_test.go @@ -6,6 +6,7 @@ import ( "github.com/stretchr/testify/assert" + "github.com/ozontech/framer/consts" "github.com/ozontech/framer/loader/types" ) @@ -33,7 +34,7 @@ func BenchmarkInmemDataSource(b *testing.B) { }() for r := range rr { - r.SetUp(0, &noopHpackFieldWriter{}) + r.SetUp(consts.DefaultMaxFrameSize, 0, &noopHpackFieldWriter{}) b.SetBytes(int64(r.Size())) r.Release() } diff --git a/datasource/request.go b/datasource/request.go index 7e1b934..9d81fad 100644 --- a/datasource/request.go +++ b/datasource/request.go @@ -41,31 +41,6 @@ func NewRequestAdapterFactory(ops ...Option) *RequestAdapterFactory { return f } -type additionalHeadersOpts []string - -func (h additionalHeadersOpts) apply(f *RequestAdapterFactory) { - for i := 0; i < len(h); i += 2 { - k, v := h[i], h[i+1] - if strings.HasPrefix(k, ":") { - f.staticPseudoHeaders = append(f.staticPseudoHeaders, k, v) - } else { - f.staticRegularHeaders = append(f.staticRegularHeaders, k, v) - } - } -} - -func WithAdditionalHeader(k, v string) Option { - return additionalHeadersOpts([]string{k, v}) -} - -func WithAdditionalHeaders(headers []string) Option { - return additionalHeadersOpts(headers) -} - -func WithTimeout(t time.Duration) Option { - return additionalHeadersOpts([]string{"grpc-timeout", grpcutil.EncodeDuration(t)}) -} - func (f *RequestAdapterFactory) Build() *RequestAdapter { return NewRequestAdapter( f.isAllowedMeta, @@ -142,8 +117,6 @@ func (a *RequestAdapter) setData(data model.Data) { a.data = data } func (a *RequestAdapter) FullMethodName() string { return unsafeString(a.data.Method) } func (a *RequestAdapter) Tag() string { return unsafeString(a.data.Tag) } -const maxFramePayloadLen int = 16384 // Максимальная длина пейлоада фрейма в grpc. У http2 ограничение больше. - // TODO(pgribanov): после реализации собственной системы энкодинга хедеров, // отказаться от unsafe func unsafeString(b []byte) string { @@ -152,6 +125,7 @@ func unsafeString(b []byte) string { } func (a *RequestAdapter) setUpHeaders( + maxFramePayloadLen int, streamID uint32, hpack types.HPackFieldWriter, ) { @@ -219,6 +193,7 @@ func (a *RequestAdapter) setUpHeaders( } func (a *RequestAdapter) setUpPayload( + maxFramePayloadLen int, streamID uint32, ) { data := a.data @@ -293,6 +268,7 @@ func (a *RequestAdapter) Size() int { } func (a *RequestAdapter) SetUp( + maxFramePayloadLen int, streamID uint32, hpackFieldWriter types.HPackFieldWriter, ) []types.Frame { @@ -301,8 +277,33 @@ func (a *RequestAdapter) SetUp( a.frames = a.frames[:0] - a.setUpHeaders(streamID, hpackFieldWriter) - a.setUpPayload(streamID) + a.setUpHeaders(maxFramePayloadLen, streamID, hpackFieldWriter) + a.setUpPayload(maxFramePayloadLen, streamID) return a.frames } + +func WithAdditionalHeader(k, v string) Option { + return additionalHeadersOpts([]string{k, v}) +} + +func WithAdditionalHeaders(headers []string) Option { + return additionalHeadersOpts(headers) +} + +func WithTimeout(t time.Duration) Option { + return additionalHeadersOpts([]string{"grpc-timeout", grpcutil.EncodeDuration(t)}) +} + +type additionalHeadersOpts []string + +func (h additionalHeadersOpts) apply(f *RequestAdapterFactory) { + for i := 0; i < len(h); i += 2 { + k, v := h[i], h[i+1] + if strings.HasPrefix(k, ":") { + f.staticPseudoHeaders = append(f.staticPseudoHeaders, k, v) + } else { + f.staticRegularHeaders = append(f.staticRegularHeaders, k, v) + } + } +} diff --git a/datasource/request_test.go b/datasource/request_test.go index 5b10eb2..eaa8231 100644 --- a/datasource/request_test.go +++ b/datasource/request_test.go @@ -9,6 +9,7 @@ import ( "golang.org/x/net/http2" "golang.org/x/net/http2/hpack" + "github.com/ozontech/framer/consts" "github.com/ozontech/framer/formats/model" hpackwrapper "github.com/ozontech/framer/utils/hpack_wrapper" ) @@ -76,7 +77,7 @@ func TestRequest1(t *testing.T) { hpw := hpackwrapper.NewWrapper() const streamID uint32 = 123 - frames := r.SetUp(streamID, hpw) + frames := r.SetUp(consts.DefaultMaxFrameSize, streamID, hpw) a.Len(frames, 2) for _, f := range frames { for _, c := range f.Chunks { diff --git a/loader/e2e_test.go b/loader/e2e_test.go index cc0f472..93f384e 100644 --- a/loader/e2e_test.go +++ b/loader/e2e_test.go @@ -10,6 +10,7 @@ import ( "os" "testing" + "github.com/ozontech/framer/consts" "github.com/ozontech/framer/datasource" "github.com/ozontech/framer/loader/types" "github.com/stretchr/testify/assert" @@ -26,7 +27,10 @@ func TestE2E(t *testing.T) { log := zaptest.NewLogger(t) a := assert.New(t) clientConn, serverConn := net.Pipe() - l := newLoader(clientConn, nooReporter{}, log, DefaultConfig()) + l := newLoader( + clientConn, nooReporter{}, + loaderConfig{timeout: consts.DefaultTimeout}, log, + ) requestsFile, err := os.Open("../test_files/requests") if err != nil { diff --git a/loader/loader.go b/loader/loader.go index e255450..f98cce2 100644 --- a/loader/loader.go +++ b/loader/loader.go @@ -15,12 +15,14 @@ import ( "golang.org/x/net/http2" "golang.org/x/sync/errgroup" + "github.com/ozontech/framer/consts" fc "github.com/ozontech/framer/loader/flowcontrol" "github.com/ozontech/framer/loader/reciever" "github.com/ozontech/framer/loader/sender" streamsPool "github.com/ozontech/framer/loader/streams/pool" streamsStore "github.com/ozontech/framer/loader/streams/store" "github.com/ozontech/framer/loader/types" + hpackwrapper "github.com/ozontech/framer/utils/hpack_wrapper" ) var clientPreface = []byte(http2.ClientPreface) @@ -37,8 +39,6 @@ type Loader struct { streamsStore types.StreamStore timeoutQueue TimeoutQueue - conf Config - sender *sender.Sender reciever *reciever.Reciever loaderID int32 @@ -46,61 +46,79 @@ type Loader struct { log *zap.Logger } -type Config struct { - Timeout time.Duration // таймаут для запросов - StreamsLimit uint32 `config:"max-open-streams" validate:"min=0"` -} - -const defaultTimeout = 11 * time.Second - -func DefaultConfig() Config { - return Config{ - Timeout: defaultTimeout, - StreamsLimit: 0, - } -} - -var i int32 - func NewLoader( - ctx context.Context, conn net.Conn, reporter types.Reporter, + timeout time.Duration, log *zap.Logger, - conf Config, ) (*Loader, error) { - l := newLoader(conn, reporter, log, conf) - setupCtx, setupCancel := context.WithTimeout(ctx, conf.Timeout) - defer setupCancel() - return l, l.setup(setupCtx) + conf := loaderConfig{timeout: timeout} + + err := conn.SetDeadline(time.Now().Add(conf.timeout)) + if err != nil { + return nil, fmt.Errorf("set conn deadline: %w", err) + } + + err = setupHTTP2(conn, conn, &conf) + if err != nil { + return nil, err + } + + return newLoader(conn, reporter, conf, log), nil } +type loaderConfig struct { + timeout time.Duration + maxConcurrentStreams uint32 + initialWindowSize uint32 + maxDymanicTableSize uint32 + maxFrameSize uint32 +} + +var i int32 + func newLoader( conn net.Conn, reporter types.LoaderReporter, + conf loaderConfig, log *zap.Logger, - conf Config, ) *Loader { + if conf.timeout == 0 { + conf.timeout = consts.DefaultTimeout + } loaderID := atomic.AddInt32(&i, 1) log = log.Named("loader").With(zap.Int32("loader-id", loaderID)) log.Debug("loader created") - // streamsStore := streamsStore.NewStreamsNoop() streamsStore := streamsStore.NewShardedStreamsMap(16, func() types.StreamStore { return streamsStore.NewStreamsMap() }) - // streamsStore := NewLimitedStreams(NewStreamsSyncMap(), loaderID, conf.StreamsLimit) - // streamsStore := NewLimitedStreams(NewStreamsMap(), loaderID, conf.StreamsLimit) - timeoutQueue := NewTimeoutQueue(conf.Timeout) + timeoutQueue := NewTimeoutQueue(conf.timeout) + fcConn := fc.NewFlowControl(consts.DefaultInitialWindowSize) // для соединения (по спеке игнорирует SETTINGS_INITIAL_WINDOW_SIZE) - // TODO(pgribanov): math.MaxUint32? - fcConn := fc.NewFlowControl(math.MaxUint32) // для соединения (не может меняться в течение жизни соединения) + var streamPoolOpts []streamsPool.Opt + if conf.maxConcurrentStreams != 0 { + streamPoolOpts = append(streamPoolOpts, streamsPool.WithMaxConcurrentStreams(conf.maxConcurrentStreams)) + } + if conf.initialWindowSize != 0 { + streamPoolOpts = append(streamPoolOpts, streamsPool.WithInitialWindowSize(conf.initialWindowSize)) + } + streamsPool := streamsPool.NewStreamsPool(reporter, streamPoolOpts...) + + var hpackWrapperOpts []hpackwrapper.Opt + if conf.maxDymanicTableSize != 0 { + hpackWrapperOpts = append(hpackWrapperOpts, hpackwrapper.WithMaxDynamicTableSize(conf.maxDymanicTableSize)) + } + hpackWrapper := hpackwrapper.NewWrapper(hpackWrapperOpts...) + + maxFrameSize := consts.DefaultMaxFrameSize + if conf.maxFrameSize != 0 { + maxFrameSize = int(conf.maxFrameSize) + } priorityFramesCh := make(chan []byte, 1) - streamsPool := streamsPool.NewStreamsPool(reporter, 1024, conf.StreamsLimit) return &Loader{ conn: conn, - conf: conf, timeoutQueue: timeoutQueue, log: log, loaderID: loaderID, @@ -108,34 +126,16 @@ func newLoader( streamsStore: streamsStore, streamsPool: streamsPool, - sender: sender.NewSender(conn, fcConn, priorityFramesCh, streamsPool, streamsStore), - reciever: reciever.NewReciever(conn, fcConn, priorityFramesCh, streamsStore), + sender: sender.NewSender( + conn, fcConn, priorityFramesCh, streamsPool, + streamsStore, hpackWrapper, maxFrameSize, + ), + reciever: reciever.NewReciever( + conn, fcConn, priorityFramesCh, streamsStore, + ), } } -func (l *Loader) setup(ctx context.Context) (err error) { - deadline, ok := ctx.Deadline() - if ok { - err = l.conn.SetDeadline(deadline) - if err != nil { - return fmt.Errorf("set conn deadline: %w", err) - } - } - - connConf, err := setupHTTP2(l.conn, l.conn) - if err != nil { - return err - } - - if connConf.InitialWindowSize != 0 { - l.streamsPool.SetInitialWindowSize(connConf.InitialWindowSize) - } - if connConf.MaxConcurrentStreams != 0 { - l.streamsPool.SetLimit(connConf.MaxConcurrentStreams) - } - return nil -} - func (l *Loader) Shutdown(ctx context.Context) (err error) { defer func() { l.timeoutQueue.Close() @@ -264,58 +264,59 @@ func (l *Loader) DoRequest(req types.Req) { l.sender.Send(req) } -type connConfig struct { - InitialWindowSize uint32 - MaxConcurrentStreams uint32 -} - -func setupHTTP2(r io.Reader, w io.Writer) (connConfig, error) { - var conf connConfig - +func setupHTTP2(r io.Reader, w io.Writer, conf *loaderConfig) error { // we should not check n, because Write must return error on n < len(clientPreface) _, err := w.Write(clientPreface) if err != nil { - return conf, fmt.Errorf("write http2 preface: %w", err) + return fmt.Errorf("write http2 preface: %w", err) } framer := http2.NewFramer(w, r) // handle settings - { - frame, err := framer.ReadFrame() - if err != nil { - return conf, fmt.Errorf("read settings frame: %w", err) - } + frame, err := framer.ReadFrame() + if err != nil { + return fmt.Errorf("read settings frame: %w", err) + } - sf, ok := frame.(*http2.SettingsFrame) - if !ok { - return conf, errors.New("protocol error: first frame from server is not settings") - } - if val, ok := sf.Value(http2.SettingInitialWindowSize); ok { - conf.InitialWindowSize = val - } - if val, ok := sf.Value(http2.SettingMaxConcurrentStreams); ok { - conf.MaxConcurrentStreams = val - } + sf, ok := frame.(*http2.SettingsFrame) + if !ok { + return errors.New("protocol error: first frame from server is not settings") + } - err = framer.WriteSettings(http2.Setting{ - ID: http2.SettingInitialWindowSize, - Val: math.MaxUint32 & 0x7fffffff, // mask off high reserved bit - }) - if err != nil { - return conf, fmt.Errorf("write settings frame: %w", err) + for i := 0; i < sf.NumSettings(); i++ { + s := sf.Setting(i) + switch s.ID { + case http2.SettingInitialWindowSize: + conf.initialWindowSize = s.Val + case http2.SettingMaxConcurrentStreams: + conf.maxConcurrentStreams = s.Val + case http2.SettingHeaderTableSize: + conf.maxDymanicTableSize = s.Val + case http2.SettingMaxFrameSize: + conf.maxFrameSize = s.Val + default: + return fmt.Errorf("got not supported setting: %s (%d)", s.ID.String(), s.Val) } + } - err = framer.WriteSettingsAck() - if err != nil { - return conf, fmt.Errorf("write settings ack: %w", err) - } + err = framer.WriteSettings(http2.Setting{ + ID: http2.SettingInitialWindowSize, + Val: math.MaxUint32 & 0x7fffffff, // mask off high reserved bit + }) + if err != nil { + return fmt.Errorf("write settings frame: %w", err) + } - err = framer.WriteWindowUpdate(0, math.MaxUint32&0x7fffffff) - if err != nil { - return conf, fmt.Errorf("write window update frame: %w", err) - } + err = framer.WriteSettingsAck() + if err != nil { + return fmt.Errorf("write settings ack: %w", err) } - return conf, nil + err = framer.WriteWindowUpdate(0, math.MaxUint32&0x7fffffff) + if err != nil { + return fmt.Errorf("write window update frame: %w", err) + } + + return nil } diff --git a/loader/reciever/processor.go b/loader/reciever/processor.go index e3c5002..0f12598 100644 --- a/loader/reciever/processor.go +++ b/loader/reciever/processor.go @@ -2,6 +2,7 @@ package reciever import ( "bytes" + "errors" "fmt" "github.com/ozontech/framer/frameheader" @@ -10,6 +11,8 @@ import ( "golang.org/x/net/http2/hpack" ) +var ErrFrameTypeNotSupported = errors.New("frame type not supported") + type FrameTypeProcessor interface { Process(header frameheader.FrameHeader, payload []byte, incomplete bool) error } @@ -30,9 +33,12 @@ func NewDefaultProcessor( ) *Processor { headersFrameProcessor := newHeadersFrameProcessor(streams) return NewProcessor([]FrameTypeProcessor{ - http2.FrameData: newDataFrameProcessor(priorityFramesChan, streams), - http2.FrameHeaders: headersFrameProcessor, - http2.FrameRSTStream: newRSTStreamFrameProcessor(streams), + http2.FrameData: newDataFrameProcessor(priorityFramesChan, streams), + http2.FrameHeaders: headersFrameProcessor, + // http2.FramePriority not supported + http2.FrameRSTStream: newRSTStreamFrameProcessor(streams), + http2.FrameSettings: settingsProcessor{}, + // http2.FramePushPromise not supported http2.FramePing: newPingFrameProcessor(priorityFramesChan), http2.FrameGoAway: newGoAwayFrameProcessor(), http2.FrameWindowUpdate: newWindowUpdateFrameProcessor(streams, fcConn), @@ -70,11 +76,13 @@ func (p *Processor) process(buf []byte) error { // } sp := p.subprocessors[header.Type()] - if sp != nil { - err = sp.Process(header, b, status == StatusPayloadIncomplete) - if err != nil { - return err - } + if sp == nil { + return fmt.Errorf("%w: %s", ErrFrameTypeNotSupported, header.Type().String()) + } + + err = sp.Process(header, b, status == StatusPayloadIncomplete) + if err != nil { + return err } if status == StatusFrameDone { @@ -175,12 +183,6 @@ func newHeadersFrameProcessor(streams types.StreamStore) *headersFrameProcessor return p } -// func printAllocs() { -// var m runtime.MemStats -// runtime.ReadMemStats(&m) -// println(m.Mallocs, m.Frees) -// } - func (p *headersFrameProcessor) OnHeader(f hpack.HeaderField) { p.currentStream.OnHeader(f.Name, f.Value) } @@ -314,3 +316,20 @@ func (p *goAwayFrameProcessor) Process(_ frameheader.FrameHeader, payload []byte p.debugData = p.debugData[:0] return err } + +type settingsProcessor struct{} + +func (p settingsProcessor) Process( + header frameheader.FrameHeader, + payload []byte, + incomplete bool, +) error { + if incomplete { + return nil + } + + if !header.Flags().Has(http2.FlagSettingsAck) { + return errors.New("update settings in runtime not supported") + } + return nil +} diff --git a/loader/sender/sender.go b/loader/sender/sender.go index 10f73e7..1a30be9 100644 --- a/loader/sender/sender.go +++ b/loader/sender/sender.go @@ -23,8 +23,9 @@ type frame struct { } type Sender struct { - streamPool *streamsPool.StreamsPool - streamStore types.StreamStore + maxFrameSize int + streamPool *streamsPool.StreamsPool + streamStore types.StreamStore fcConn types.FlowControl conn io.Writer @@ -43,13 +44,16 @@ func NewSender( priorityChunkChan chan []byte, streamPool *streamsPool.StreamsPool, streamStore types.StreamStore, + hpackEncWrapper *hpackwrapper.Wrapper, + maxFrameSize int, ) *Sender { return &Sender{ + maxFrameSize, streamPool, streamStore, fcConn, conn, - hpackwrapper.NewWrapper(), + hpackEncWrapper, 1, make(chan writeCmd), @@ -212,7 +216,7 @@ func (s *Sender) Send(a types.Req) { func (s *Sender) send(a types.Req) { // n.Add(1) s.streamID += 2 - frames := a.SetUp(s.streamID, s.hpackEncWrapper) + frames := a.SetUp(s.maxFrameSize, s.streamID, s.hpackEncWrapper) stream := s.streamPool.Acquire(s.streamID, a.Tag()) stream.SetSize(a.Size()) diff --git a/loader/streams/pool/pool.go b/loader/streams/pool/pool.go index 0ef2bc9..9d3b7d0 100644 --- a/loader/streams/pool/pool.go +++ b/loader/streams/pool/pool.go @@ -4,12 +4,11 @@ import ( "math" "sync" + "github.com/ozontech/framer/consts" "github.com/ozontech/framer/loader/flowcontrol" "github.com/ozontech/framer/loader/types" ) -const defaultInitialWindowSize = 65535 - type StreamsPool struct { reporter types.LoaderReporter @@ -21,21 +20,18 @@ type StreamsPool struct { initialWindowSize uint32 } -func NewStreamsPool( - reporter types.LoaderReporter, - initSize uint32, - limit uint32, // limit = 0 интерпретируется как неограниченное количество -) *StreamsPool { - if limit == 0 { - limit = math.MaxUint32 - } - return &StreamsPool{ +func NewStreamsPool(reporter types.LoaderReporter, opts ...Opt) *StreamsPool { + p := &StreamsPool{ reporter: reporter, cond: sync.NewCond(&sync.Mutex{}), - pool: make([]*streamImpl, 0, initSize), - maxConcurrentStreams: limit, - initialWindowSize: defaultInitialWindowSize, + pool: make([]*streamImpl, 0, 1024), + maxConcurrentStreams: math.MaxUint32, + initialWindowSize: consts.DefaultInitialWindowSize, + } + for _, o := range opts { + o.apply(p) } + return p } func (p *StreamsPool) Acquire(streamID uint32, tag string) types.Stream { @@ -79,18 +75,6 @@ func (p *StreamsPool) InUse() uint32 { return p.inUse } -func (p *StreamsPool) SetInitialWindowSize(size uint32) { - p.cond.L.Lock() - defer p.cond.L.Unlock() - p.initialWindowSize = size -} - -func (p *StreamsPool) SetLimit(limit uint32) { - p.cond.L.Lock() - defer p.cond.L.Unlock() - p.maxConcurrentStreams = limit -} - func (p *StreamsPool) WaitAllReleased() <-chan struct{} { ch := make(chan struct{}) @@ -121,3 +105,19 @@ func (s *streamImpl) End() { s.StreamState.End() s.pool.release(s) } + +type Opt interface { + apply(*StreamsPool) +} + +type WithMaxConcurrentStreams uint32 + +func (s WithMaxConcurrentStreams) apply(p *StreamsPool) { + p.maxConcurrentStreams = uint32(s) +} + +type WithInitialWindowSize uint32 + +func (s WithInitialWindowSize) apply(p *StreamsPool) { + p.initialWindowSize = uint32(s) +} diff --git a/loader/streams/pool/pool_benchmark_test.go b/loader/streams/pool/pool_benchmark_test.go index 7530428..14ae755 100644 --- a/loader/streams/pool/pool_benchmark_test.go +++ b/loader/streams/pool/pool_benchmark_test.go @@ -8,7 +8,7 @@ import ( ) func BenchmarkStreamsPool(b *testing.B) { - p := pool.NewStreamsPool(noop.New(), 1024, 0) + p := pool.NewStreamsPool(noop.New()) for i := 0; i < b.N; i++ { s := p.Acquire(0, "") s.End() diff --git a/loader/streams/store/store.go b/loader/streams/store/store.go index dd4ed26..7027526 100644 --- a/loader/streams/store/store.go +++ b/loader/streams/store/store.go @@ -95,7 +95,7 @@ type ShardedStreamsMap struct { func NewShardedStreamsMap(size uint32, build func() types.StreamStore) *ShardedStreamsMap { shards := make([]types.StreamStore, size*2) - for i := 1; i < len(shards); i++ { + for i := 1; i < len(shards); i += 2 { shards[i] = build() } return &ShardedStreamsMap{shards, size - 1} @@ -106,7 +106,8 @@ func (s *ShardedStreamsMap) shard(id uint32) types.StreamStore { } func (s *ShardedStreamsMap) Each(fn func(types.Stream)) { - for _, shard := range s.shards { + for i := 1; i < len(s.shards); i += 2 { + shard := s.shards[i] shard.Each(fn) } } diff --git a/loader/types/req.go b/loader/types/req.go index 3b28d98..5083e2c 100644 --- a/loader/types/req.go +++ b/loader/types/req.go @@ -8,7 +8,7 @@ type HPackFieldWriter interface { } type Req interface { - SetUp(streamID uint32, fieldWriter HPackFieldWriter) []Frame + SetUp(maxFramePayloadLen int, streamID uint32, fieldWriter HPackFieldWriter) []Frame Tag() string Size() int Releaser diff --git a/report/phout/phout.go b/report/phout/phout.go index fd445d3..0dadd48 100644 --- a/report/phout/phout.go +++ b/report/phout/phout.go @@ -141,7 +141,7 @@ func (s *streamState) result() []byte { s.reportLine = strconv.AppendInt(s.reportLine, rtt, 10) s.reportLine = append(s.reportLine, tabChar) - // keyConnectMicro // TODO (skipor): set all for HTTP using httptrace and helper structs + // keyConnectMicro s.reportLine = append(s.reportLine, '0', tabChar) // keySendMicro s.reportLine = append(s.reportLine, '0', tabChar) @@ -149,7 +149,7 @@ func (s *streamState) result() []byte { s.reportLine = append(s.reportLine, '0', tabChar) // keyReceiveMicro s.reportLine = append(s.reportLine, '0', tabChar) - // keyIntervalEventMicro // TODO: understand WTF is that mean and set it right. + // keyIntervalEventMicro s.reportLine = append(s.reportLine, '0', tabChar) // keyRequestBytes s.reportLine = strconv.AppendInt(s.reportLine, int64(s.reqSize), 10) diff --git a/utils/hpack_wrapper/wrapper.go b/utils/hpack_wrapper/wrapper.go index f0f4da6..f4e2156 100644 --- a/utils/hpack_wrapper/wrapper.go +++ b/utils/hpack_wrapper/wrapper.go @@ -11,9 +11,12 @@ type Wrapper struct { enc *hpack.Encoder } -func NewWrapper() *Wrapper { +func NewWrapper(opts ...Opt) *Wrapper { wrapper := &Wrapper{} wrapper.enc = hpack.NewEncoder(wrapper) + for _, o := range opts { + o.apply(wrapper) + } return wrapper } @@ -24,3 +27,13 @@ func (ww *Wrapper) WriteField(k, v string) error { Value: v, }) } + +type Opt interface { + apply(*Wrapper) +} + +type WithMaxDynamicTableSize uint32 + +func (s WithMaxDynamicTableSize) apply(w *Wrapper) { + w.enc.SetMaxDynamicTableSize(uint32(s)) +}