Skip to content

Commit 111c4bc

Browse files
committed
WIP
1 parent f541322 commit 111c4bc

File tree

1 file changed

+58
-33
lines changed

1 file changed

+58
-33
lines changed

internal/integration/sessions_test.go

Lines changed: 58 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"fmt"
1414
"reflect"
1515
"sync"
16+
"sync/atomic"
1617
"testing"
1718
"time"
1819

@@ -597,46 +598,70 @@ func TestSessionsProse(t *testing.T) {
597598
})
598599
*/
599600

601+
mt.ResetClient(options.Client())
602+
client := mt.Client
603+
heartbeatStarted := make(chan struct{})
604+
heartbeatSucceeded := make(chan struct{})
605+
var clusterTimeAdvanced uint32
606+
serverMonitor := &event.ServerMonitor{
607+
ServerHeartbeatStarted: func(e *event.ServerHeartbeatStartedEvent) {
608+
fmt.Println("Server heartbeat started:", e.ConnectionID)
609+
if atomic.LoadUint32(&clusterTimeAdvanced) == 1 {
610+
fmt.Println("ServerHeartbeatStartedEvent: cluster time advanced")
611+
select {
612+
case heartbeatStarted <- struct{}{}:
613+
// NOOP
614+
default:
615+
// NOOP
616+
}
617+
}
618+
},
619+
ServerHeartbeatSucceeded: func(e *event.ServerHeartbeatSucceededEvent) {
620+
fmt.Println("Server heartbeat succeeded:", e.ConnectionID, e.Duration, e.Reply)
621+
if atomic.LoadUint32(&clusterTimeAdvanced) == 1 {
622+
fmt.Println("ServerHeartbeatSucceededEvent: cluster time advanced")
623+
select {
624+
case heartbeatSucceeded <- struct{}{}:
625+
// NOOP
626+
default:
627+
// NOOP
628+
}
629+
}
630+
},
631+
}
600632
pingOpts := mtest.NewOptions().
601633
CreateCollection(false).
602-
Topologies(mtest.ReplicaSet, mtest.LoadBalanced)
634+
ClientOptions(options.Client().
635+
SetServerMonitor(serverMonitor).
636+
SetHeartbeatInterval(500 * time.Millisecond). // Minimum interval
637+
SetDirect(true)).
638+
ClientType(mtest.Pinned)
603639
mt.RunOpts("ping test", pingOpts, func(mt *mtest.T) {
604-
serverMonitor := &event.ServerMonitor{
605-
ServerHeartbeatStarted: func(e *event.ServerHeartbeatStartedEvent) {
606-
fmt.Println("Server heartbeat started:", e.ConnectionID)
607-
},
608-
ServerHeartbeatSucceeded: func(e *event.ServerHeartbeatSucceededEvent) {
609-
fmt.Println("Server heartbeat succeeded:", e.ConnectionID, e.Duration, e.Reply)
610-
},
611-
}
640+
err := mt.Client.Ping(context.Background(), readpref.Primary())
641+
assert.NoError(mt, err, "expected no error, got: %v", err)
612642

613-
commandMonitor := &event.CommandMonitor{
614-
Started: func(_ context.Context, cse *event.CommandStartedEvent) {
615-
fmt.Println("Command started:", cse.CommandName, cse.Command)
616-
},
617-
Succeeded: func(_ context.Context, cse *event.CommandSucceededEvent) {
618-
fmt.Println("Command succeeded:", cse.CommandName, cse.Reply)
619-
},
620-
}
643+
_, err = client.Database("test").Collection("test").InsertOne(context.Background(), bson.D{{"advance", "$clusterTime"}})
644+
require.NoError(mt, err, "expected no error inserting document, got: %v", err)
621645

622-
opts := options.Client().
623-
ApplyURI(mtest.ClusterURI()).
624-
SetHosts([]string{mtest.ClusterConnString().Hosts[0]}).
625-
SetDirect(true).
626-
SetHeartbeatInterval(500 * time.Millisecond). // Minimum interval
627-
SetServerMonitor(serverMonitor).
628-
SetMonitor(commandMonitor)
646+
atomic.StoreUint32(&clusterTimeAdvanced, 1)
647+
<-heartbeatStarted
648+
<-heartbeatSucceeded
629649

630-
client, err := mongo.Connect(opts)
631-
require.NoError(mt, err, "expected no error connecting to client, got: %v", err)
632-
defer func() {
633-
err = client.Disconnect(context.Background())
634-
require.NoError(mt, err, "expected no error disconnecting client, got: %v", err)
635-
}()
650+
err = mt.Client.Ping(context.Background(), readpref.Primary())
651+
require.NoError(mt, err, "expected no error, got: %v", err)
636652

637-
err = client.Ping(context.Background(), readpref.Primary())
638-
assert.NoError(mt, err, "expected no error, got: %v", err)
639-
// mt.Fatal(mtest.ClusterURI(), mtest.ClusterConnString().Hosts, mtest.GlobalTopology().Description().Servers)
653+
succeededEvents := mt.GetAllSucceededEvents()
654+
require.Len(mt, succeededEvents, 2, "expected 2 succeeded events, got: %v", len(succeededEvents))
655+
require.Equal(mt, "ping", succeededEvents[0].CommandName, "expected first command to be ping, got: %v", succeededEvents[0].CommandName)
656+
initialClusterTime, err := succeededEvents[0].Reply.LookupErr("$clusterTime")
657+
require.NoError(mt, err, "$clusterTime not found in response")
658+
659+
startedEvents := mt.GetAllStartedEvents()
660+
require.Len(mt, startedEvents, 2, "expected 2 started events, got: %v", len(startedEvents))
661+
require.Equal(mt, "ping", startedEvents[1].CommandName, "expected second command to be ping, got: %v", startedEvents[1].CommandName)
662+
currentClusterTime, err := startedEvents[1].Command.LookupErr("$clusterTime")
663+
require.NoError(mt, err, "$clusterTime not found in commane")
664+
assert.Equal(mt, initialClusterTime, currentClusterTime, "expected same cluster time, got %v and %v", initialClusterTime, currentClusterTime)
640665
})
641666
}
642667

0 commit comments

Comments
 (0)