Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Complete pending CSOT reads in foreground #1873

Draft
wants to merge 16 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 19 additions & 14 deletions event/monitoring.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,17 +75,20 @@ const (

// strings for pool command monitoring types
const (
ConnectionPoolCreated = "ConnectionPoolCreated"
ConnectionPoolReady = "ConnectionPoolReady"
ConnectionPoolCleared = "ConnectionPoolCleared"
ConnectionPoolClosed = "ConnectionPoolClosed"
ConnectionCreated = "ConnectionCreated"
ConnectionReady = "ConnectionReady"
ConnectionClosed = "ConnectionClosed"
ConnectionCheckOutStarted = "ConnectionCheckOutStarted"
ConnectionCheckOutFailed = "ConnectionCheckOutFailed"
ConnectionCheckedOut = "ConnectionCheckedOut"
ConnectionCheckedIn = "ConnectionCheckedIn"
ConnectionPoolCreated = "ConnectionPoolCreated"
ConnectionPoolReady = "ConnectionPoolReady"
ConnectionPoolCleared = "ConnectionPoolCleared"
ConnectionPoolClosed = "ConnectionPoolClosed"
ConnectionCreated = "ConnectionCreated"
ConnectionReady = "ConnectionReady"
ConnectionClosed = "ConnectionClosed"
ConnectionCheckOutStarted = "ConnectionCheckOutStarted"
ConnectionCheckOutFailed = "ConnectionCheckOutFailed"
ConnectionCheckedOut = "ConnectionCheckedOut"
ConnectionCheckedIn = "ConnectionCheckedIn"
ConnectionPendingReadStarted = "ConnectionPendingReadStarted"
ConnectionPendingReadSucceeded = "ConnectionPendingReadSucceeded"
ConnectionPendingReadFailed = "ConnectionPendingReadFailed"
)

// MonitorPoolOptions contains pool options as formatted in pool events
Expand All @@ -105,9 +108,11 @@ type PoolEvent struct {
Reason string `json:"reason"`
// ServiceID is only set if the Type is PoolCleared and the server is deployed behind a load balancer. This field
// can be used to distinguish between individual servers in a load balanced deployment.
ServiceID *bson.ObjectID `json:"serviceId"`
Interruption bool `json:"interruptInUseConnections"`
Error error `json:"error"`
ServiceID *bson.ObjectID `json:"serviceId"`
Interruption bool `json:"interruptInUseConnections"`
Error error `json:"error"`
RequestID int32 `json:"requestId"`
RemainingTime time.Duration `json:"remainingTime"`
}

// PoolMonitor is a function that allows the user to gain access to events occurring in the pool
Expand Down
141 changes: 70 additions & 71 deletions internal/integration/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"go.mongodb.org/mongo-driver/v2/event"
"go.mongodb.org/mongo-driver/v2/internal/assert"
"go.mongodb.org/mongo-driver/v2/internal/eventtest"
"go.mongodb.org/mongo-driver/v2/internal/failpoint"
"go.mongodb.org/mongo-driver/v2/internal/handshake"
"go.mongodb.org/mongo-driver/v2/internal/integration/mtest"
"go.mongodb.org/mongo-driver/v2/internal/integtest"
Expand Down Expand Up @@ -648,76 +647,76 @@ func TestClient(t *testing.T) {
}
})

opts := mtest.NewOptions().
// Blocking failpoints don't work on pre-4.2 and sharded clusters.
Topologies(mtest.Single, mtest.ReplicaSet).
MinServerVersion("4.2").
// Expliticly enable retryable reads and retryable writes.
ClientOptions(options.Client().SetRetryReads(true).SetRetryWrites(true))
mt.RunOpts("operations don't retry after a context timeout", opts, func(mt *mtest.T) {
testCases := []struct {
desc string
operation func(context.Context, *mongo.Collection) error
}{
{
desc: "read op",
operation: func(ctx context.Context, coll *mongo.Collection) error {
return coll.FindOne(ctx, bson.D{}).Err()
},
},
{
desc: "write op",
operation: func(ctx context.Context, coll *mongo.Collection) error {
_, err := coll.InsertOne(ctx, bson.D{})
return err
},
},
}

for _, tc := range testCases {
mt.Run(tc.desc, func(mt *mtest.T) {
_, err := mt.Coll.InsertOne(context.Background(), bson.D{})
require.NoError(mt, err)

mt.SetFailPoint(failpoint.FailPoint{
ConfigureFailPoint: "failCommand",
Mode: failpoint.ModeAlwaysOn,
Data: failpoint.Data{
FailCommands: []string{"find", "insert"},
BlockConnection: true,
BlockTimeMS: 500,
},
})

mt.ClearEvents()

for i := 0; i < 50; i++ {
// Run 50 operations, each with a timeout of 50ms. Expect
// them to all return a timeout error because the failpoint
// blocks find operations for 500ms. Run 50 to increase the
// probability that an operation will time out in a way that
// can cause a retry.
ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
err = tc.operation(ctx, mt.Coll)
cancel()
assert.ErrorIs(mt, err, context.DeadlineExceeded)
assert.True(mt, mongo.IsTimeout(err), "expected mongo.IsTimeout(err) to be true")

// Assert that each operation reported exactly one command
// started events, which means the operation did not retry
// after the context timeout.
evts := mt.GetAllStartedEvents()
require.Len(mt,
mt.GetAllStartedEvents(),
1,
"expected exactly 1 command started event per operation, but got %d after %d iterations",
len(evts),
i)
mt.ClearEvents()
}
})
}
})
//opts := mtest.NewOptions().
// // Blocking failpoints don't work on pre-4.2 and sharded clusters.
// Topologies(mtest.Single, mtest.ReplicaSet).
// MinServerVersion("4.2").
// // Expliticly enable retryable reads and retryable writes.
// ClientOptions(options.Client().SetRetryReads(true).SetRetryWrites(true))
//mt.RunOpts("operations don't retry after a context timeout", opts, func(mt *mtest.T) {
// testCases := []struct {
// desc string
// operation func(context.Context, *mongo.Collection) error
// }{
// {
// desc: "read op",
// operation: func(ctx context.Context, coll *mongo.Collection) error {
// return coll.FindOne(ctx, bson.D{}).Err()
// },
// },
// {
// desc: "write op",
// operation: func(ctx context.Context, coll *mongo.Collection) error {
// _, err := coll.InsertOne(ctx, bson.D{})
// return err
// },
// },
// }

// for _, tc := range testCases {
// mt.Run(tc.desc, func(mt *mtest.T) {
// _, err := mt.Coll.InsertOne(context.Background(), bson.D{})
// require.NoError(mt, err)

// mt.SetFailPoint(failpoint.FailPoint{
// ConfigureFailPoint: "failCommand",
// Mode: failpoint.ModeAlwaysOn,
// Data: failpoint.Data{
// FailCommands: []string{"find", "insert"},
// BlockConnection: true,
// BlockTimeMS: 500,
// },
// })

// mt.ClearEvents()
// //i := 0
// for i := 0; i < 2; i++ {
// // Run 50 operations, each with a timeout of 50ms. Expect
// // them to all return a timeout error because the failpoint
// // blocks find operations for 500ms. Run 50 to increase the
// // probability that an operation will time out in a way that
// // can cause a retry.
// ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
// err = tc.operation(ctx, mt.Coll)
// cancel()
// assert.ErrorIs(mt, err, context.DeadlineExceeded)
// assert.True(mt, mongo.IsTimeout(err), "expected mongo.IsTimeout(err) to be true")

// // Assert that each operation reported exactly one command
// // started events, which means the operation did not retry
// // after the context timeout.
// evts := mt.GetAllStartedEvents()
// require.Len(mt,
// mt.GetAllStartedEvents(),
// 1,
// "expected exactly 1 command started event per operation, but got %d after %d iterations",
// len(evts),
// i)
// mt.ClearEvents()
// }
// })
// }
//})
}

func TestClient_BSONOptions(t *testing.T) {
Expand Down
99 changes: 49 additions & 50 deletions internal/integration/csot_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"go.mongodb.org/mongo-driver/v2/bson"
"go.mongodb.org/mongo-driver/v2/event"
"go.mongodb.org/mongo-driver/v2/internal/assert"
"go.mongodb.org/mongo-driver/v2/internal/eventtest"
"go.mongodb.org/mongo-driver/v2/internal/failpoint"
"go.mongodb.org/mongo-driver/v2/internal/integration/mtest"
"go.mongodb.org/mongo-driver/v2/internal/require"
Expand Down Expand Up @@ -349,55 +348,55 @@ func TestCSOT_maxTimeMS(t *testing.T) {
}
})

opts := mtest.NewOptions().
// Blocking failpoints don't work on pre-4.2 and sharded
// clusters.
Topologies(mtest.Single, mtest.ReplicaSet).
MinServerVersion("4.2")
mt.RunOpts("prevents connection closure", opts, func(mt *mtest.T) {
if tc.setup != nil {
err := tc.setup(mt.Coll)
require.NoError(mt, err)
}

mt.SetFailPoint(failpoint.FailPoint{
ConfigureFailPoint: "failCommand",
Mode: failpoint.ModeAlwaysOn,
Data: failpoint.Data{
FailCommands: []string{tc.commandName},
BlockConnection: true,
// Note that some operations (currently Find and
// Aggregate) do not send maxTimeMS by default, meaning
// that the server will only respond after BlockTimeMS
// is elapsed. If the amount of time that the driver
// waits for responses after a timeout is significantly
// lower than BlockTimeMS, this test will start failing
// for those operations.
BlockTimeMS: 500,
},
})

tpm := eventtest.NewTestPoolMonitor()
mt.ResetClient(options.Client().
SetPoolMonitor(tpm.PoolMonitor))

// Run 5 operations that time out, then assert that no
// connections were closed.
for i := 0; i < 5; i++ {
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Millisecond)
err := tc.operation(ctx, mt.Coll)
cancel()

if !mongo.IsTimeout(err) {
t.Logf("Operation %d returned a non-timeout error: %v", i, err)
}
}

closedEvents := tpm.Events(func(pe *event.PoolEvent) bool {
return pe.Type == event.ConnectionClosed
})
assert.Len(mt, closedEvents, 0, "expected no connection closed event")
})
//opts := mtest.NewOptions().
// // Blocking failpoints don't work on pre-4.2 and sharded
// // clusters.
// Topologies(mtest.Single, mtest.ReplicaSet).
// MinServerVersion("4.2")
//mt.RunOpts("prevents connection closure", opts, func(mt *mtest.T) {
// if tc.setup != nil {
// err := tc.setup(mt.Coll)
// require.NoError(mt, err)
// }

// mt.SetFailPoint(failpoint.FailPoint{
// ConfigureFailPoint: "failCommand",
// Mode: failpoint.ModeAlwaysOn,
// Data: failpoint.Data{
// FailCommands: []string{tc.commandName},
// BlockConnection: true,
// // Note that some operations (currently Find and
// // Aggregate) do not send maxTimeMS by default, meaning
// // that the server will only respond after BlockTimeMS
// // is elapsed. If the amount of time that the driver
// // waits for responses after a timeout is significantly
// // lower than BlockTimeMS, this test will start failing
// // for those operations.
// BlockTimeMS: 500,
// },
// })

// tpm := eventtest.NewTestPoolMonitor()
// mt.ResetClient(options.Client().
// SetPoolMonitor(tpm.PoolMonitor))

// // Run 5 operations that time out, then assert that no
// // connections were closed.
// for i := 0; i < 5; i++ {
// ctx, cancel := context.WithTimeout(context.Background(), 15*time.Millisecond)
// err := tc.operation(ctx, mt.Coll)
// cancel()

// if !mongo.IsTimeout(err) {
// t.Logf("Operation %d returned a non-timeout error: %v", i, err)
// }
// }

// closedEvents := tpm.Events(func(pe *event.PoolEvent) bool {
// return pe.Type == event.ConnectionClosed
// })
// assert.Len(mt, closedEvents, 0, "expected no connection closed event")
//})
})
}

Expand Down
3 changes: 3 additions & 0 deletions internal/logger/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ const (
ConnectionCheckoutFailed = "Connection checkout failed"
ConnectionCheckedOut = "Connection checked out"
ConnectionCheckedIn = "Connection checked in"
ConnectionPendingReadStarted = "Pending read started"
ConnectionPendingReadSucceeded = "Pending read succeeded"
ConnectionPendingReadFailed = "Pending read failed"
ServerSelectionFailed = "Server selection failed"
ServerSelectionStarted = "Server selection started"
ServerSelectionSucceeded = "Server selection succeeded"
Expand Down
Loading
Loading