Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Long running tests #16

Merged
merged 5 commits into from
Nov 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,12 @@ GOBUILD=$(GOCMD) build
GOCLEAN=$(GOCMD) clean
GOTEST=$(GOCMD) test
BINARY_NAME=mongo-bench
MAIN_FILE=main.go

.PHONY: all
all: build

build: test
$(GOBUILD) -o $(BINARY_NAME) $(MAIN_FILE)
$(GOBUILD) -o $(BINARY_NAME) *.go
@echo "Build complete: $(BINARY_NAME)"

run:
Expand Down
10 changes: 7 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,17 @@ After building the tool, run it with customizable parameters:

- `-threads`: Number of concurrent threads to use for inserting, updating, deleting, or upserting documents.
- `-docs`: Total number of documents to process during the benchmark.
- `-duration`: Duration of the test in seconds (default: 0 seconds).
- `-largeDocs`: Use large documents (2K) (default: false).
- `-dropDb`: Drop the database before running the test (default: true).
- `-uri`: MongoDB connection URI.
- `-type`: Type of test to run. Accepts `insert`, `update`, `delete`, `upsert`, or `runAll`:
- `insert`: The tool will insert new documents.
- `update`: The tool will update existing documents (requires that documents have been inserted in a prior run).
- `delete`: The tool will delete existing documents.
- `upsert`: The tool will perform upserts, repeatedly updating a specified range.
- `runAll`: Runs the `insert`, `update`, `delete`, and `upsert` tests sequentially.
- `delete`: The tool will delete existing documents. (just if `docs` is given)
- `upsert`: The tool will perform upserts, repeatedly updating a specified range. (just if `docs` is given)
- `runAll`: Runs the `insert`, `update`, `delete`, and `upsert` tests sequentially. (just if `docs` is given)
- `runAll`: Runs the `insert`, `update` tests sequentially. (just if `duration` is given)

### Example Commands

Expand Down
80 changes: 80 additions & 0 deletions collection_api.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package main

import (
"context"
"fmt"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"log"
)

// CollectionAPI defines an interface for MongoDB operations, allowing for testing
type CollectionAPI interface {
InsertOne(ctx context.Context, document interface{}) (*mongo.InsertOneResult, error)
UpdateOne(ctx context.Context, filter interface{}, update interface{}, opts ...*options.UpdateOptions) (*mongo.UpdateResult, error)
DeleteOne(ctx context.Context, filter interface{}) (*mongo.DeleteResult, error)
CountDocuments(ctx context.Context, filter interface{}) (int64, error)
Drop(ctx context.Context) error
Find(ctx context.Context, filter interface{}, opts ...*options.FindOptions) (*mongo.Cursor, error)
}

// MongoDBCollection is a wrapper around mongo.Collection to implement CollectionAPI
type MongoDBCollection struct {
*mongo.Collection
}

func (c *MongoDBCollection) InsertOne(ctx context.Context, document interface{}) (*mongo.InsertOneResult, error) {
return c.Collection.InsertOne(ctx, document)
}

func (c *MongoDBCollection) UpdateOne(ctx context.Context, filter interface{}, update interface{}, opts ...*options.UpdateOptions) (*mongo.UpdateResult, error) {
return c.Collection.UpdateOne(ctx, filter, update, opts...)
}

func (c *MongoDBCollection) DeleteOne(ctx context.Context, filter interface{}) (*mongo.DeleteResult, error) {
return c.Collection.DeleteOne(ctx, filter)
}

func (c *MongoDBCollection) CountDocuments(ctx context.Context, filter interface{}) (int64, error) {
return c.Collection.CountDocuments(ctx, filter)
}

func (c *MongoDBCollection) Drop(ctx context.Context) error {
return c.Collection.Drop(ctx)
}

func (c *MongoDBCollection) Find(ctx context.Context, filter interface{}, opts ...*options.FindOptions) (*mongo.Cursor, error) {
return c.Collection.Find(ctx, filter, opts...)
}

func fetchDocumentIDs(collection CollectionAPI) ([]primitive.ObjectID, error) {
var docIDs []primitive.ObjectID

cursor, err := collection.Find(context.Background(), bson.M{}, options.Find().SetProjection(bson.M{"_id": 1}))
if err != nil {
return nil, fmt.Errorf("failed to fetch document IDs: %v", err)
}
defer cursor.Close(context.Background())

for cursor.Next(context.Background()) {
var result bson.M
if err := cursor.Decode(&result); err != nil {
log.Printf("Failed to decode document: %v", err)
continue
}
// Check if `_id` is of type `ObjectId` and add to `docIDs`
if id, ok := result["_id"].(primitive.ObjectID); ok {
docIDs = append(docIDs, id)
} else {
log.Printf("Skipping document with unsupported _id type: %T", result["_id"])
}
}

if err := cursor.Err(); err != nil {
return nil, fmt.Errorf("cursor error: %v", err)
}

return docIDs, nil
}
200 changes: 200 additions & 0 deletions docs_testing_strategy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,200 @@
package main

import (
"context"
"encoding/csv"
"fmt"
"github.com/rcrowley/go-metrics"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo/options"
"log"
"math/rand"
"os"
"sync"
"time"
)

type DocCountTestingStrategy struct{}

func (t DocCountTestingStrategy) runTestSequence(collection CollectionAPI, config TestingConfig) {
tests := []string{"insert", "update", "delete", "upsert"}
for _, test := range tests {
t.runTest(collection, test, config, fetchDocumentIDs)
}
}

func (t DocCountTestingStrategy) runTest(collection CollectionAPI, testType string, config TestingConfig, fetchDocIDs func(CollectionAPI) ([]primitive.ObjectID, error)) {
if testType == "insert" || testType == "upsert" {
if config.DropDb {
if err := collection.Drop(context.Background()); err != nil {
log.Fatalf("Failed to drop collection: %v", err)
}
log.Println("Collection dropped. Starting new rate test...")
} else {
log.Println("Collection stays. Dropping disabled.")
}
} else {
log.Printf("Starting %s test...\n", testType)
}

insertRate := metrics.NewMeter()
var records [][]string
records = append(records, []string{"t", "count", "mean", "m1_rate", "m5_rate", "m15_rate", "mean_rate"})

var partitions [][]primitive.ObjectID

var threads = config.Threads
var docCount = config.DocCount

// Prepare partitions based on test type
switch testType {
case "delete":
// Fetch document IDs as ObjectId and partition them
docIDs, err := fetchDocIDs(collection)
if err != nil {
log.Fatalf("Failed to fetch document IDs: %v", err)
}
partitions = make([][]primitive.ObjectID, threads)
for i, id := range docIDs {
partitions[i%threads] = append(partitions[i%threads], id)
}

case "insert", "upsert":
partitions = make([][]primitive.ObjectID, threads)
for i := 0; i < docCount; i++ {
partitions[i%threads] = append(partitions[i%threads], primitive.NewObjectID())
}

case "update":
docIDs, err := fetchDocIDs(collection)
if err != nil {
log.Fatalf("Failed to fetch document IDs: %v", err)
}

partitions = make([][]primitive.ObjectID, threads)
for i := 0; i < len(docIDs); i++ {
docID := docIDs[rand.Intn(len(docIDs))]
partitions[i%threads] = append(partitions[i%threads], docID)
}
}

// Start the ticker just before starting the main workload goroutines
secondTicker := time.NewTicker(1 * time.Second)
defer secondTicker.Stop()
go func() {
for range secondTicker.C {
timestamp := time.Now().Unix()
count := insertRate.Count()
mean := insertRate.RateMean()
m1Rate := insertRate.Rate1()
m5Rate := insertRate.Rate5()
m15Rate := insertRate.Rate15()

log.Printf("Timestamp: %d, Document Count: %d, Mean Rate: %.2f docs/sec, m1_rate: %.2f, m5_rate: %.2f, m15_rate: %.2f",
timestamp, count, mean, m1Rate, m5Rate, m15Rate)

record := []string{
fmt.Sprintf("%d", timestamp),
fmt.Sprintf("%d", count),
fmt.Sprintf("%.6f", mean),
fmt.Sprintf("%.6f", m1Rate),
fmt.Sprintf("%.6f", m5Rate),
fmt.Sprintf("%.6f", m15Rate),
}
records = append(records, record)
}
}()

// Launch threads based on the specific workload type
var wg sync.WaitGroup
wg.Add(threads)

for i := 0; i < threads; i++ {
go func(partition []primitive.ObjectID) {
defer wg.Done()
for _, docID := range partition {
switch testType {
case "insert":
// Let MongoDB generate the _id automatically
doc := bson.M{"threadRunCount": i, "rnd": rand.Int63(), "v": 1}
_, err := collection.InsertOne(context.Background(), doc)
if err == nil {
insertRate.Mark(1)
} else {
log.Printf("Insert failed: %v", err)
}

case "update":
filter := bson.M{"_id": docID}
update := bson.M{"$set": bson.M{"updatedAt": time.Now().Unix(), "rnd": rand.Int63()}}
_, err := collection.UpdateOne(context.Background(), filter, update)
if err == nil {
insertRate.Mark(1)
} else {
log.Printf("Update failed for _id %v: %v", docID, err)
}

case "upsert":
randomDocID := partition[rand.Intn(len(partition)/2)]
filter := bson.M{"_id": randomDocID}
update := bson.M{"$set": bson.M{"updatedAt": time.Now().Unix(), "rnd": rand.Int63()}}
opts := options.Update().SetUpsert(true)
_, err := collection.UpdateOne(context.Background(), filter, update, opts)
if err == nil {
insertRate.Mark(1)
} else {
log.Printf("Upsert failed for _id %v: %v", docID, err)
}

case "delete":
// Use ObjectId in the filter for delete
filter := bson.M{"_id": docID}
result, err := collection.DeleteOne(context.Background(), filter)
if err != nil {
log.Printf("Delete failed for _id %v: %v", docID, err)
continue // Move to next document without retrying
}
if result.DeletedCount > 0 {
insertRate.Mark(1)
}
}
}
}(partitions[i])
}

wg.Wait()

// Final metrics recording
timestamp := time.Now().Unix()
count := insertRate.Count()
mean := insertRate.RateMean()
m1Rate := insertRate.Rate1()
m5Rate := insertRate.Rate5()
m15Rate := insertRate.Rate15()

finalRecord := []string{
fmt.Sprintf("%d", timestamp),
fmt.Sprintf("%d", count),
fmt.Sprintf("%.6f", mean),
fmt.Sprintf("%.6f", m1Rate),
fmt.Sprintf("%.6f", m5Rate),
fmt.Sprintf("%.6f", m15Rate),
}
records = append(records, finalRecord)

filename := fmt.Sprintf("benchmark_results_%s.csv", testType)
file, err := os.Create(filename)
if err != nil {
log.Fatalf("Failed to create CSV file: %v", err)
}
defer file.Close()

writer := csv.NewWriter(file)
if err := writer.WriteAll(records); err != nil {
log.Fatalf("Failed to write records to CSV: %v", err)
}
writer.Flush()

fmt.Printf("Benchmarking completed. Results saved to %s\n", filename)
}
Loading