diff --git a/config.go b/config.go index a22f0c50..5dde4340 100644 --- a/config.go +++ b/config.go @@ -7,8 +7,14 @@ import ( "slices" ) -// ConfigContext is a context that carries a configuration for quorum calls. -// It embeds context.Context and provides access to the Configuration. +// Configuration represents a static set of nodes on which multicast or +// quorum calls may be invoked. A configuration is created using [NewConfig]. +// A configuration should be treated as immutable. Therefore, methods that +// operate on a configuration always return a new Configuration instance. +type Configuration []*Node + +// ConfigContext is a context that carries a configuration for multicast or +// quorum calls. It embeds context.Context and provides access to the configuration. // // Use [Configuration.Context] to create a ConfigContext from an existing context. type ConfigContext struct { @@ -16,23 +22,17 @@ type ConfigContext struct { cfg Configuration } -// Configuration returns the Configuration associated with this context. +// Configuration returns the configuration associated with this context. func (c ConfigContext) Configuration() Configuration { return c.cfg } -// Configuration represents a static set of nodes on which quorum calls may be invoked. -// A configuration is created using [NewConfiguration] or [NewConfig]. A configuration -// should be treated as immutable. Therefore, methods that operate on a configuration -// always return a new Configuration instance. -type Configuration []*Node - // Context creates a new ConfigContext from the given parent context // and this configuration. // // Example: // -// config, _ := gorums.NewConfiguration(mgr, gorums.WithNodeList(addrs)) +// config, _ := gorums.NewConfig(gorums.WithNodeList(addrs), dialOpts...) // cfgCtx := config.Context(context.Background()) // resp, err := paxos.Prepare(cfgCtx, req) func (c Configuration) Context(parent context.Context) *ConfigContext { @@ -42,14 +42,6 @@ func (c Configuration) Context(parent context.Context) *ConfigContext { return &ConfigContext{Context: parent, cfg: c} } -// Deprecated: Use [NewConfig] instead. -func NewConfiguration(mgr *Manager, opt NodeListOption) (nodes Configuration, err error) { - if opt == nil { - return nil, fmt.Errorf("config: missing required node list") - } - return opt.newConfig(mgr) -} - // NewConfig returns a new [Configuration] based on the provided nodes and dial options. // // Example: @@ -60,10 +52,10 @@ func NewConfiguration(mgr *Manager, opt NodeListOption) (nodes Configuration, er // ) func NewConfig(nodes NodeListOption, opts ...DialOption) (Configuration, error) { if nodes == nil { - return nil, fmt.Errorf("gorums: missing required NodeListOption") + return nil, fmt.Errorf("config: missing required node list") } mgr := newOutboundManager(opts...) - cfg, err := NewConfiguration(mgr, nodes) + cfg, err := nodes.newConfig(mgr) if err != nil { _ = mgr.Close() return nil, err @@ -72,7 +64,6 @@ func NewConfig(nodes NodeListOption, opts ...DialOption) (Configuration, error) } // Extend returns a new Configuration combining c with new nodes from the provided NodeListOption. -// This is the only way to add nodes that are not yet registered with the manager. func (c Configuration) Extend(opt NodeListOption) (Configuration, error) { if len(c) == 0 { return nil, fmt.Errorf("config: cannot extend empty configuration") @@ -120,14 +111,6 @@ func (c Configuration) Equal(b Configuration) bool { return true } -// Manager returns the Manager that manages this configuration's nodes. -// Returns nil if the configuration is empty. -// -// Deprecated: Use [Configuration.Close] to close the configuration instead. -func (c Configuration) Manager() *Manager { - return c.mgr() -} - // mgr returns the outboundManager for this configuration's nodes. func (c Configuration) mgr() *outboundManager { if len(c) == 0 { diff --git a/config_test.go b/config_test.go index 4829aabd..142c6c44 100644 --- a/config_test.go +++ b/config_test.go @@ -25,7 +25,7 @@ func (n testNode) Addr() string { return n.addr } -func TestNewConfiguration(t *testing.T) { +func TestNewConfig(t *testing.T) { tests := []struct { name string opt gorums.NodeListOption diff --git a/doc/user-guide.md b/doc/user-guide.md index df7650d7..91ffc290 100644 --- a/doc/user-guide.md +++ b/doc/user-guide.md @@ -324,13 +324,11 @@ func ExampleStorageServer(port int) { ## Implementing the StorageClient Next, we write client code to call RPCs on our servers. -The first thing we need to do is to create an instance of the `Manager` type. -The manager maintains a pool of connections to nodes. -Nodes are added to the connection pool via new configurations, as shown below. +The first thing we need to do is to create a `Configuration` using `gorums.NewConfig`. +`NewConfig` establishes connections to the given nodes and returns a configuration +ready for making RPC calls. -The manager takes as arguments a set of optional manager options. -We can forward gRPC dial options to the manager if needed. -The manager will use these options when connecting to nodes. +We can forward gRPC dial options to `NewConfig` if needed. Below we use only a simple insecure connection option. ```go @@ -345,32 +343,28 @@ import ( ) func ExampleStorageClient() { - mgr := NewManager( - gorums.WithDialOptions( - grpc.WithTransportCredentials(insecure.NewCredentials()), - ), - ) -``` - -A configuration is a set of nodes on which our RPC calls can be invoked. -Using the `WithNodeList` option, the manager assigns a unique identifier to each node. -The code below shows how to create a configuration: - -```go - // Get all all available node ids, 3 nodes addrs := []string{ "127.0.0.1:8080", "127.0.0.1:8081", "127.0.0.1:8082", } // Create a configuration including all nodes - allNodesConfig, err := NewConfiguration(mgr, gorums.WithNodeList(addrs)) + allNodesConfig, err := gorums.NewConfig( + gorums.WithNodeList(addrs), + gorums.WithDialOptions( + grpc.WithTransportCredentials(insecure.NewCredentials()), + ), + ) if err != nil { log.Fatalln("error creating read config:", err) } + defer allNodesConfig.Close() ``` -The `Manager` and `Configuration` types also have a few other available methods. +A configuration is a set of nodes on which RPC calls can be invoked. +`WithNodeList` assigns a unique identifier to each node by address. + +The `Configuration` type has several useful methods for combining and filtering configurations. Inspect the package documentation or source code for details. We can now invoke the WriteUnicast RPC on each `node` in the configuration: @@ -597,17 +591,17 @@ func ExampleStorageClient() { "127.0.0.1:8082", } - mgr := gorums.NewManager( + // Create a configuration with all nodes + config, err := gorums.NewConfig( + gorums.WithNodeList(addrs), gorums.WithDialOptions( grpc.WithTransportCredentials(insecure.NewCredentials()), ), ) - - // Create a configuration with all nodes - cfg, err := NewConfiguration(mgr, gorums.WithNodeList(addrs)) if err != nil { log.Fatalln("error creating configuration:", err) } + defer config.Close() ctx := context.Background() cfgCtx := config.Context(ctx) @@ -1090,7 +1084,7 @@ Gorums defines several sentinel errors that commonly appear as the cause of a `Q Here's how to properly handle errors from a quorum call: ```go -func handleQuorumCall(cfg *gorums.Configuration, req *ReadRequest) { +func handleQuorumCall(config *gorums.Configuration, req *ReadRequest) { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() @@ -1168,18 +1162,17 @@ if err != nil { var qcErr gorums.QuorumCallError if errors.As(err, &qcErr) { // Option 1: Exclude all failed nodes - newConfig, err := NewConfiguration(mgr, config.WithoutErrors(qcErr)) + newConfig := config.WithoutErrors(qcErr) // Option 2: Exclude only nodes with specific error types // For example, exclude only nodes that timed out - newConfig, err := NewConfiguration(mgr, config.WithoutErrors(qcErr, context.DeadlineExceeded)) + newConfig = config.WithoutErrors(qcErr, context.DeadlineExceeded) // Option 3: Exclude nodes with multiple specific error types - newConfig, err := NewConfiguration(mgr, config.WithoutErrors(qcErr, - context.DeadlineExceeded, - context.Canceled, - io.EOF, - ), + newConfig = config.WithoutErrors(qcErr, + context.DeadlineExceeded, + context.Canceled, + io.EOF, ) // Retry the operation with the new configuration @@ -1196,7 +1189,7 @@ This allows you to filter nodes based on the underlying cause of their failures, Below is an example demonstrating how to work with configurations. These configurations are viewed from the client's perspective, and to actually make quorum calls on these configurations, there must be server endpoints to connect to. -We ignore the construction of `mgr` and error handling (except for the last configuration). +Error handling is omitted for brevity except where the result is used. In the example below, we simply use fixed quorum sizes. @@ -1207,56 +1200,49 @@ func ExampleConfigClient() { "127.0.0.1:8081", "127.0.0.1:8082", } - // Make configuration c1 from addrs, giving |c1| = |addrs| = 3 - c1, _ := NewConfiguration(mgr, + // Create base configuration c1 from addrs, giving |c1| = 3. + c1, err := gorums.NewConfig( gorums.WithNodeList(addrs), + gorums.WithDialOptions( + grpc.WithTransportCredentials(insecure.NewCredentials()), + ), ) + if err != nil { + log.Fatalln("error creating configuration:", err) + } + defer c1.Close() newAddrs := []string{ "127.0.0.1:9080", "127.0.0.1:9081", } - // Make configuration c2 from newAddrs, giving |c2| = |newAddrs| = 2 - c2, _ := NewConfiguration(mgr, - gorums.WithNodeList(newAddrs), - ) + // Extend c1 with newAddrs; c2 shares c1's connection pool, |c2| = |c1| + |newAddrs| = 5. + c2, _ := c1.Extend(gorums.WithNodeList(newAddrs)) - // Make new configuration c3 from c1 and newAddrs, giving |c3| = |c1| + |newAddrs| = 3+2=5 - c3, _ := NewConfiguration(mgr, - c1.WithNewNodes(gorums.WithNodeList(newAddrs)), - ) + // c3 = nodes in c2 not in c1, giving |c3| = |newAddrs| = 2. + c3 := c2.Difference(c1) - // Make new configuration c4 from c1 and c2, giving |c4| = |c1| + |c2| = 3+2=5 - c4, _ := NewConfiguration(mgr, - c1.And(c2), - ) + // c4 = union of c1 and c3, giving |c4| = |c1| + |c3| = 3+2 = 5. + c4 := c1.Union(c3) - // Make new configuration c5 from c1 except the first node from c1, giving |c5| = |c1| - 1 = 3-1 = 2 - c5, _ := NewConfiguration(mgr, - c1.WithoutNodes(c1.NodeIDs()[0]), - ) + // c5 = c1 without its first node, giving |c5| = |c1| - 1 = 2. + c5 := c1.Remove(c1.NodeIDs()[0]) - // Make new configuration c6 from c3 except c1, giving |c6| = |c3| - |c1| = 5-3 = 2 - c6, _ := NewConfiguration(mgr, - c3.Except(c1), - ) + // c6 = c2 without c1, giving |c6| = |c2| - |c1| = 5-3 = 2. + c6 := c2.Difference(c1) // Example: Handling quorum call failures and creating a new configuration - // without failed nodes - cfgCtx := c1.Context(ctx) + // without failed nodes. + cfgCtx := c1.Context(context.Background()) state, err := ReadQC(cfgCtx, &ReadRequest{}).Majority() if err != nil { var qcErr gorums.QuorumCallError if errors.As(err, &qcErr) { - // Create a new configuration excluding all nodes that failed - c7, _ := NewConfiguration(mgr, - c1.WithoutErrors(qcErr), - ) - - // Or exclude only nodes with specific error types (e.g., timeout errors) - c8, _ := NewConfiguration(mgr, - c1.WithoutErrors(qcErr, context.DeadlineExceeded), - ) + // Create a new configuration excluding all nodes that failed. + c7 := c1.WithoutErrors(qcErr) + + // Or exclude only nodes with specific error types (e.g., timeout errors). + c8 := c1.WithoutErrors(qcErr, context.DeadlineExceeded) } } } @@ -1394,14 +1380,14 @@ Without `Release()`, the server would block all other inbound messages until the // ReadNestedQC is a quorum-call handler that fans out a nested ReadQC // to all known connected peers and returns the most recent value. func (s *storageServer) ReadNestedQC(ctx gorums.ServerCtx, req *pb.ReadRequest) (*pb.ReadResponse, error) { - cfg := ctx.Config() - if len(cfg) == 0 { + config := ctx.Config() + if len(config) == 0 { return nil, fmt.Errorf("ReadNestedQC requires a server peer configuration") } // Release the handler lock before making nested outbound calls to avoid // blocking inbound message processing on this server. ctx.Release() - return newestValue(pb.ReadQC(cfg.Context(ctx), req)) + return newestValue(pb.ReadQC(config.Context(ctx), req)) } ``` @@ -1409,13 +1395,13 @@ The same pattern applies to nested multicast: ```go func (s *storageServer) WriteNestedMulticast(ctx gorums.ServerCtx, req *pb.WriteRequest) (*pb.WriteResponse, error) { - cfg := ctx.Config() - if len(cfg) == 0 { - return pb.WriteResponse_builder{New: false}.Build(), fmt.Errorf("WriteNestedMulticast requires a server peer configuration") + config := ctx.Config() + if len(config) == 0 { + return nil, fmt.Errorf("write_nested_multicast: requires server peer configuration") } ctx.Release() - if err := pb.WriteMulticast(cfg.Context(ctx), req); err != nil { - return pb.WriteResponse_builder{New: false}.Build(), err + if err := pb.WriteMulticast(config.Context(ctx), req); err != nil { + return nil, fmt.Errorf("write_nested_multicast: %w", err) } return pb.WriteResponse_builder{New: true}.Build(), nil } @@ -1499,7 +1485,7 @@ clientSrv := gorums.NewServer() clientSrv.RegisterHandler(pb.MyMethod, myHandler) // Connect to the server; NewConfig wires up the back-channel dispatcher automatically. -cfg, err := clientSrv.NewConfig( +config, err := clientSrv.NewConfig( gorums.WithNodeList(serverAddrs), gorums.WithDialOptions(grpc.WithTransportCredentials(insecure.NewCredentials())), ) @@ -1532,12 +1518,12 @@ The handler reads `ctx.ClientConfig()` to reach all currently connected client p ```go // ReadNestedQC fans out a ReadQC to all clients that have connected. func (s *storageServer) ReadNestedQC(ctx gorums.ServerCtx, req *pb.ReadRequest) (*pb.ReadResponse, error) { - cfg := ctx.ClientConfig() - if len(cfg) == 0 { + config := ctx.ClientConfig() + if len(config) == 0 { return nil, fmt.Errorf("ReadNestedQC: no client peers connected") } ctx.Release() - return newestValue(pb.ReadQC(cfg.Context(ctx), req)) + return newestValue(pb.ReadQC(config.Context(ctx), req)) } ``` diff --git a/examples/storage/repl.go b/examples/storage/repl.go index 92b2453f..b37dd51c 100644 --- a/examples/storage/repl.go +++ b/examples/storage/repl.go @@ -438,9 +438,9 @@ func (r repl) parseConfiguration(cfgStr string) (pb.Configuration, error) { } nodes := make([]*pb.Node, 0, len(indices)) - mgrNodes := r.cfg.Nodes() + cfgNodes := r.cfg.Nodes() for _, i := range indices { - nodes = append(nodes, mgrNodes[i]) + nodes = append(nodes, cfgNodes[i]) } gorums.OrderedBy(gorums.ID).Sort(nodes) return pb.Configuration(nodes), nil diff --git a/inbound_manager_test.go b/inbound_manager_test.go index 828d2469..64bc50ff 100644 --- a/inbound_manager_test.go +++ b/inbound_manager_test.go @@ -423,19 +423,19 @@ func peerNodes() NodeListOption { }) } -// connectAsPeer creates a Manager that identifies itself as peerID by sending -// gorumsNodeIDKey metadata, connects to addrs, and returns the manager. -// Manager cleanup is registered via t.Cleanup; callers may also close it +// connectAsPeer creates a Configuration that identifies itself as peerID by sending +// gorumsNodeIDKey metadata, connects to addrs, and returns the configuration. +// Configuration cleanup is registered via t.Cleanup; callers may also close it // explicitly (e.g., to test disconnect) — Close is idempotent. -func connectAsPeer(t *testing.T, peerID uint32, addrs []string) *Manager { +func connectAsPeer(t *testing.T, peerID uint32, addrs []string) Configuration { t.Helper() peerMD := metadata.Pairs(gorumsNodeIDKey, strconv.FormatUint(uint64(peerID), 10)) - mgr := TestManager(t, WithMetadata(peerMD)) - _, err := NewConfiguration(mgr, WithNodeList(addrs)) + cfg, err := NewConfig(WithNodeList(addrs), TestDialOptions(t), WithMetadata(peerMD)) if err != nil { - t.Fatalf("NewConfiguration() error: %v", err) + t.Fatalf("NewConfig() error: %v", err) } - return mgr + t.Cleanup(Closer(t, cfg)) + return cfg } // TestKnownPeerConnects verifies the end-to-end path: @@ -459,13 +459,13 @@ func TestKnownPeerConnects(t *testing.T) { func TestKnownPeerDisconnects(t *testing.T) { srv, addrs := testPeerServer(t) - mgr := connectAsPeer(t, 2, addrs) + cfg := connectAsPeer(t, 2, addrs) WaitForConfigCondition(t, srv.Config, equalNodeIDs([]uint32{1, 2})) - // Close the peer manager to trigger disconnect; Close is idempotent so - // t.Cleanup (registered by connectAsPeer via TestManager) is harmless. - if err := mgr.Close(); err != nil { - t.Fatalf("mgr.Close() error: %v", err) + // Close the configuration to trigger disconnect; Close is idempotent so + // t.Cleanup (registered by connectAsPeer) is harmless. + if err := cfg.Close(); err != nil { + t.Fatalf("cfg.Close() error: %v", err) } WaitForConfigCondition(t, srv.Config, equalNodeIDs([]uint32{1})) checkIDs(t, srv.Config(), []uint32{1}, "after disconnect") @@ -477,11 +477,11 @@ func TestUnknownPeerIgnored(t *testing.T) { srv, addrs := testPeerServer(t) // Connect without metadata (external client) and with an unknown ID. - external := TestManager(t) - _, err := NewConfiguration(external, WithNodeList(addrs)) + cfg, err := NewConfig(WithNodeList(addrs), TestDialOptions(t)) if err != nil { - t.Fatalf("NewConfiguration() error: %v", err) + t.Fatalf("NewConfig() error: %v", err) } + t.Cleanup(Closer(t, cfg)) connectAsPeer(t, 99, addrs) // ID 99 not in known set @@ -490,30 +490,6 @@ func TestUnknownPeerIgnored(t *testing.T) { checkIDs(t, srv.Config(), []uint32{1}, "external and unknown peers must not appear") } -type mockRequestHandler struct { - handlers map[string]Handler -} - -func (m mockRequestHandler) HandleRequest(ctx context.Context, msg *stream.Message, release func(), send func(*stream.Message)) { - srvCtx := ServerCtx{Context: ctx, release: release, send: send} - handler, ok := m.handlers[msg.GetMethod()] - if !ok { - release() - return - } - defer release() - inMsg, err := unmarshalRequest(msg) - in := &Message{Msg: inMsg, Message: msg} - if err != nil { - _ = srvCtx.SendMessage(MessageWithError(in, nil, err)) - return - } - out, err := handler(srvCtx, in) - if out != nil || err != nil { - _ = srvCtx.SendMessage(MessageWithError(in, out, err)) - } -} - // TestKnownPeerServerCallsClient verifies the full symmetric communication path: // server sends a request to a connected client via an inbound channel, // the client's Channel.receiver dispatches to a registered handler, @@ -521,19 +497,18 @@ func (m mockRequestHandler) HandleRequest(ctx context.Context, msg *stream.Messa func TestKnownPeerServerCallsClient(t *testing.T) { srv, addrs := testPeerServer(t) - // Client connects as peer 2 with a handler injected via withRequestHandler. - clientHandlers := map[string]Handler{ - mock.TestMethod: func(_ ServerCtx, in *Message) (*Message, error) { - req := AsProto[*pb.StringValue](in) - return NewResponseMessage(in, pb.String("echo: "+req.GetValue())), nil - }, - } + // Client connects as peer 2 with handlers registered on a server via WithServer. + clientSrv := NewServer() + clientSrv.RegisterHandler(mock.TestMethod, func(_ ServerCtx, in *Message) (*Message, error) { + req := AsProto[*pb.StringValue](in) + return NewResponseMessage(in, pb.String("echo: "+req.GetValue())), nil + }) peerMD := metadata.Pairs(gorumsNodeIDKey, "2") - mgr := TestManager(t, WithMetadata(peerMD), withRequestHandler(mockRequestHandler{handlers: clientHandlers}, 0)) - _, err := NewConfiguration(mgr, WithNodeList(addrs)) + cfg, err := NewConfig(WithNodeList(addrs), TestDialOptions(t), WithMetadata(peerMD), WithServer(clientSrv)) if err != nil { - t.Fatalf("NewConfiguration() error: %v", err) + t.Fatalf("NewConfig() error: %v", err) } + t.Cleanup(Closer(t, cfg)) // Wait for the peer to appear in the inbound config. WaitForConfigCondition(t, srv.Config, equalNodeIDs([]uint32{1, 2})) @@ -601,18 +576,18 @@ func testClientServer(t *testing.T) (*Server, []string) { return srv, addrs } -// connectAsPeerClient creates a Manager that advertises back-channel -// capability by sending the gorums-node-id key (via [withRequestHandler]), -// connects to addrs, and returns the manager. The server will include it in +// connectAsPeerClient creates a Configuration that advertises back-channel +// capability by sending the gorums-node-id key (via [WithServer]), +// connects to addrs, and returns the configuration. The server will include it in // ClientConfig and may dispatch server-initiated calls to it. -func connectAsPeerClient(t *testing.T, addrs []string) *Manager { +func connectAsPeerClient(t *testing.T, addrs []string) Configuration { t.Helper() - mgr := TestManager(t, withRequestHandler(NewServer(), 0)) - _, err := NewConfiguration(mgr, WithNodeList(addrs)) + cfg, err := NewConfig(WithNodeList(addrs), TestDialOptions(t), WithServer(NewServer())) if err != nil { - t.Fatalf("NewConfiguration() error: %v", err) + t.Fatalf("NewConfig() error: %v", err) } - return mgr + t.Cleanup(Closer(t, cfg)) + return cfg } // TestClientConfigConnects verifies that a server accepts a peer-capable @@ -641,7 +616,7 @@ func TestClientConfigConnects(t *testing.T) { func TestClientConfigDisconnects(t *testing.T) { srv, addrs := testClientServer(t) - mgr := connectAsPeerClient(t, addrs) + cfg := connectAsPeerClient(t, addrs) // Wait for the client peer to appear. WaitForConfigCondition(t, srv.ClientConfig, func(cfg Configuration) bool { return len(cfg) > 0 }) @@ -650,8 +625,8 @@ func TestClientConfigDisconnects(t *testing.T) { } // Disconnect the client peer. - if err := mgr.Close(); err != nil { - t.Fatalf("mgr.Close() error: %v", err) + if err := cfg.Close(); err != nil { + t.Fatalf("cfg.Close() error: %v", err) } // Wait for config to become empty. @@ -709,17 +684,17 @@ func TestClientConfigServerCallsClient(t *testing.T) { var wg sync.WaitGroup wg.Add(1) - // Client: a Server whose reverse-direction mock.Stream handler is wired in via withRequestHandler. + // Client: a Server whose reverse-direction mock.Stream handler is wired in via WithServer. clientSrv := NewServer() clientSrv.RegisterHandler(mock.Stream, func(_ ServerCtx, _ *Message) (*Message, error) { wg.Done() return nil, nil }) - mgr := TestManager(t, withRequestHandler(clientSrv, 0)) - clientConfig, err := NewConfiguration(mgr, WithNodeList(addrs)) + clientConfig, err := NewConfig(WithNodeList(addrs), TestDialOptions(t), WithServer(clientSrv)) if err != nil { - t.Fatalf("NewConfiguration() error: %v", err) + t.Fatalf("NewConfig() error: %v", err) } + t.Cleanup(Closer(t, clientConfig)) // Wait for the client to appear in the server's ClientConfig. WaitForConfigCondition(t, srv.ClientConfig, func(cfg Configuration) bool { return len(cfg) > 0 }) diff --git a/internal/tests/config/config_test.go b/internal/tests/config/config_test.go index 2027aa98..a65d0907 100644 --- a/internal/tests/config/config_test.go +++ b/internal/tests/config/config_test.go @@ -40,22 +40,22 @@ func TestConfig(t *testing.T) { } } - c1 := gorums.TestConfiguration(t, 4, serverFn) + c1 := gorums.TestConfiguration(t, 6, serverFn) fmt.Println("--- c1 ", c1.Nodes()) callRPC(c1) - // Create a new configuration c2 with 2 new nodes not in c1, using the same manager as c1. - c2 := gorums.TestConfiguration(t, 2, serverFn, gorums.WithManager(t, c1)) + // Create c2 by removing 2 nodes from c1. + c2 := c1.Remove(1, 2) fmt.Println("--- c2 ", c2.Nodes()) callRPC(c2) - // Create c3 = c1 ∪ c2, using the same manager as c1 (and c2). + // Create c3 = c1 ∪ c2 c3 := c1.Union(c2) fmt.Println("--- c3 ", c3.Nodes()) callRPC(c3) - // Create c4 = c3 \ c1, using the same manager as c1 (and c2, c3). - c4 := c3.Difference(c1) + // Create c4 = c3 \ c2 + c4 := c3.Difference(c2) fmt.Println("--- c4 ", c4.Nodes()) callRPC(c4) } diff --git a/mgr.go b/mgr.go index b82abdd1..e471cc58 100644 --- a/mgr.go +++ b/mgr.go @@ -23,10 +23,6 @@ type outboundManager struct { nextMsgID uint64 } -// Deprecated: Manager is an alias for outboundManager and will be removed in a -// future release. Use [Configuration] instead. -type Manager = outboundManager - // newOutboundManager returns a new outboundManager for managing connection to // nodes added to the manager. func newOutboundManager(opts ...DialOption) *outboundManager { @@ -51,11 +47,6 @@ func newOutboundManager(opts ...DialOption) *outboundManager { return m } -// Deprecated: Use [NewConfig] instead. -func NewManager(opts ...DialOption) *Manager { - return newOutboundManager(opts...) -} - // Close closes all node connections and any client streams. func (m *outboundManager) Close() error { var err error @@ -67,18 +58,6 @@ func (m *outboundManager) Close() error { return err } -// NodeIDs returns the identifier of each available node. IDs are returned in -// the same order as they were provided in the creation of the Manager. -func (m *outboundManager) NodeIDs() []uint32 { - m.mu.Lock() - defer m.mu.Unlock() - ids := make([]uint32, 0, len(m.nodes)) - for _, node := range m.nodes { - ids = append(ids, node.ID()) - } - return ids -} - // Node returns the node with the given identifier if present. func (m *outboundManager) Node(id uint32) (node *Node, found bool) { m.mu.Lock() @@ -95,13 +74,6 @@ func (m *outboundManager) Nodes() []*Node { return m.nodes } -// Size returns the number of nodes in the Manager. -func (m *outboundManager) Size() (nodes int) { - m.mu.Lock() - defer m.mu.Unlock() - return len(m.nodes) -} - func (m *outboundManager) addNode(node *Node) { m.mu.Lock() defer m.mu.Unlock() diff --git a/node.go b/node.go index 7e552078..c8a85261 100644 --- a/node.go +++ b/node.go @@ -48,7 +48,7 @@ type Node struct { // Only assigned at creation. id uint32 addr string - mgr *outboundManager // only used for backward compatibility to allow Configuration.Manager() + mgr *outboundManager // owning manager for this node msgIDGen func() uint64 router *stream.MessageRouter @@ -78,7 +78,7 @@ type nodeOptions struct { PerNodeMD func(uint32) metadata.MD DialOpts []grpc.DialOption RequestHandler stream.RequestHandler - Manager *outboundManager // only used for backward compatibility to allow Configuration.Manager() + Manager *outboundManager // owning manager } // newOutboundNode creates a new node using the provided options. It establishes diff --git a/opts.go b/opts.go index 663d2158..9e8e3a62 100644 --- a/opts.go +++ b/opts.go @@ -22,11 +22,6 @@ type DialOption func(*dialOptions) func (DialOption) isOption() {} -// ManagerOption is a deprecated alias for [DialOption]. -// -// Deprecated: Use [DialOption] instead. -type ManagerOption = DialOption - type dialOptions struct { grpcDialOpts []grpc.DialOption logger *log.Logger diff --git a/server_test.go b/server_test.go index 1cfcb495..b6d36fb1 100644 --- a/server_test.go +++ b/server_test.go @@ -24,9 +24,9 @@ func TestServerCallback(t *testing.T) { message = m.Get("message")[0] signal <- struct{}{} }) - mgrOption := gorums.WithMetadata(metadata.New(map[string]string{"message": "hello"})) + dialOption := gorums.WithMetadata(metadata.New(map[string]string{"message": "hello"})) - gorums.TestNode(t, nil, srvOption, mgrOption) + gorums.TestNode(t, nil, srvOption, dialOption) select { case <-time.After(100 * time.Millisecond): diff --git a/testing_bufconn.go b/testing_bufconn.go index 2e3fd758..d7f12553 100644 --- a/testing_bufconn.go +++ b/testing_bufconn.go @@ -116,33 +116,21 @@ func testSetupServers(t testing.TB, numServers int, srvFn func(int) ServerIface) return setupServers(t, numServers, srvFn, listenFn) } -// getOrCreateManager returns the existing manager or creates a new one with bufconn dialing. -// If a new manager is created, its cleanup is registered via t.Cleanup. -func (to *testOptions) getOrCreateManager(t testing.TB) *outboundManager { - if to.existingCfg != nil { - // Don't register cleanup - caller is responsible for closing the configuration - return to.existingCfg.mgr() - } - - // Create an indirect dialer that looks up from the registry at dial time. - // This allows the manager to dial addresses that are registered after manager creation. - bufconnDialer := func(ctx context.Context, addr string) (net.Conn, error) { - dialer, err := globalBufconnRegistry.getDialer(t) +// TestDialOptions returns a [DialOption] that configures a bufconn-based in-memory +// dialer for tests. The dialer looks up the registered listener at dial time, so +// tests may register listeners after calling TestDialOptions. +func TestDialOptions(t testing.TB) DialOption { + dialer := func(ctx context.Context, addr string) (net.Conn, error) { + d, err := globalBufconnRegistry.getDialer(t) if err != nil { return nil, err } - return dialer(ctx, addr) + return d(ctx, addr) } - - // Create manager with bufconn dialer and register its cleanup LAST so it runs FIRST (LIFO) - dialOpts := []grpc.DialOption{ - grpc.WithContextDialer(bufconnDialer), + return WithDialOptions( + grpc.WithContextDialer(dialer), grpc.WithTransportCredentials(insecure.NewCredentials()), - } - mgrOpts := append([]DialOption{WithDialOptions(dialOpts...)}, to.managerOpts...) - mgr := newOutboundManager(mgrOpts...) - t.Cleanup(func() { Closer(t, mgr)() }) - return mgr + ) } // bufconnListener wraps bufconn.Listener to implement net.Listener diff --git a/testing_integration.go b/testing_integration.go index 0c9cb504..55b2afc3 100644 --- a/testing_integration.go +++ b/testing_integration.go @@ -22,16 +22,7 @@ func testSetupServers(t testing.TB, numServers int, srvFn func(i int) ServerIfac return setupServers(t, numServers, srvFn, listenFn) } -// getOrCreateManager returns the existing manager or creates a new one with real network dialing. -// If a new manager is created, its cleanup is registered via t.Cleanup. -func (to *testOptions) getOrCreateManager(t testing.TB) *outboundManager { - if to.existingCfg != nil { - // Don't register cleanup - caller is responsible for closing the configuration - return to.existingCfg.mgr() - } - // Create manager and register its cleanup LAST so it runs FIRST (LIFO) - mgrOpts := append([]DialOption{InsecureDialOptions(t)}, to.managerOpts...) - mgr := newOutboundManager(mgrOpts...) - t.Cleanup(Closer(t, mgr)) - return mgr +// TestDialOptions returns a [DialOption] with insecure TCP credentials for integration tests. +func TestDialOptions(t testing.TB) DialOption { + return InsecureDialOptions(t) } diff --git a/testing_shared.go b/testing_shared.go index f49f1204..2a6d82b0 100644 --- a/testing_shared.go +++ b/testing_shared.go @@ -72,14 +72,6 @@ func TestQuorumCallError(_ testing.TB, nodeErrors map[uint32]error) QuorumCallEr return QuorumCallError{cause: ErrIncomplete, errors: errs} } -// TestManager creates a new Manager with real network dial support and any additional -// DialOption (e.g., WithMetadata). The manager is automatically closed via t.Cleanup. -func TestManager(t testing.TB, opts ...DialOption) *outboundManager { - t.Helper() - to := &testOptions{managerOpts: opts} - return to.getOrCreateManager(t) -} - // TestConfiguration creates servers and a configuration for testing. // Both server and manager cleanup are handled via t.Cleanup in the correct order: // manager is closed first, then servers are stopped. @@ -121,11 +113,13 @@ func TestConfiguration(t testing.TB, numServers int, srvFn func(i int) ServerIfa testOpts.preConnectHook(stopAllFn) } - mgr := testOpts.getOrCreateManager(t) - cfg, err := NewConfiguration(mgr, testOpts.nodeListOption(addrs)) + // Create configuration and register its cleanup LAST so it runs FIRST (LIFO) + dialOptions := append([]DialOption{TestDialOptions(t)}, testOpts.managerOpts...) + cfg, err := NewConfig(testOpts.nodeListOption(addrs), dialOptions...) if err != nil { t.Fatal(err) } + t.Cleanup(Closer(t, cfg)) return cfg } @@ -158,8 +152,8 @@ func TestNode(t testing.TB, srvFn func(i int) ServerIface, opts ...TestOption) * // Example usage: // // addrs := gorums.TestServers(t, 3, serverFn) -// mgr := gorums.NewManager(gorums.InsecureDialOptions(t)) -// t.Cleanup(gorums.Closer(t, mgr)) +// cfg, err := gorums.NewConfig(gorums.WithNodeList(addrs), gorums.TestDialOptions(t)) +// t.Cleanup(gorums.Closer(t, cfg)) // ... // // This function can be used by other packages for testing purposes, as long as diff --git a/testopts.go b/testopts.go index 9ed3ed7e..717b1c62 100644 --- a/testopts.go +++ b/testopts.go @@ -5,7 +5,7 @@ import "testing" // TestOption is a marker interface that can hold DialOption, // ServerOption, or NodeListOption. This allows test helpers to accept // a single variadic parameter that can be filtered and passed to the -// appropriate constructors (NewManager, NewServer, NewConfiguration). +// appropriate constructors: NewServer or NewConfig. // // Each option type (DialOption, ServerOption, NodeListOption) embeds // this interface, so they can be passed directly without wrapping: @@ -22,17 +22,14 @@ type testOptions struct { managerOpts []DialOption serverOpts []ServerOption nodeListOpts []NodeListOption - existingCfg Configuration stopFuncPtr *func(...int) // pointer to capture the variadic stop function preConnectHook func(stopFn func()) // called before connecting to servers skipGoleak bool // skip goleak checks (useful for synctest) } // shouldSkipGoleak returns true if goleak checks should be skipped. -// This includes cases where an existing configuration is reused (since it may -// already have its own goleak checks) or when SkipGoleak option is set. func (to *testOptions) shouldSkipGoleak() bool { - return to.existingCfg != nil || to.skipGoleak + return to.skipGoleak } // serverFunc returns a server creation function based on the server options. @@ -76,8 +73,6 @@ func extractTestOptions(opts []TestOption) testOptions { result.serverOpts = append(result.serverOpts, o) case NodeListOption: result.nodeListOpts = append(result.nodeListOpts, o) - case Configuration: - result.existingCfg = o case stopFuncProvider: result.stopFuncPtr = o.stopFunc case preConnectProvider: @@ -89,21 +84,6 @@ func extractTestOptions(opts []TestOption) testOptions { return result } -// WithManager returns a TestOption that provides an existing configuration whose -// manager will be reused instead of creating a new one. This is useful when -// creating multiple configurations that should share the same manager. -// -// When using WithManager, the caller is responsible for closing the original -// configuration. SetupConfiguration will NOT register a cleanup function for the manager. -// -// This option is intended for testing purposes only. -func WithManager(_ testing.TB, cfg Configuration) TestOption { - if cfg == nil { - panic("gorums: WithManager called with nil configuration") - } - return cfg -} - // stopFuncProvider is a TestOption that captures the server stop function. type stopFuncProvider struct { stopFunc *func(...int)