From fc631d837516d6c7b15573637637527ac3a476b7 Mon Sep 17 00:00:00 2001 From: Dimitris Marlagkoutsos Date: Thu, 18 Dec 2025 15:37:50 +0100 Subject: [PATCH 01/19] feat(ocap-kernel): add resource limits for remote communications - Add connection limit (default 100 concurrent connections) - Add message size limit (default 1MB per message) - Add stale peer cleanup (removes data for peers disconnected >1 hour) - Make all limits configurable via RemoteCommsOptions - Add ResourceLimitError for limit violations - Add comprehensive tests for all resource limits This prevents memory exhaustion and manages system resources by: - Rejecting new connections when limit is reached - Rejecting messages exceeding size limit - Periodically cleaning up stale peer data --- packages/kernel-errors/src/constants.ts | 1 + .../src/errors/ResourceLimitError.ts | 66 ++++++ packages/kernel-errors/src/errors/index.ts | 2 + packages/kernel-errors/src/index.test.ts | 1 + packages/kernel-errors/src/index.ts | 1 + .../ocap-kernel/src/remotes/network.test.ts | 219 +++++++++++++++++- packages/ocap-kernel/src/remotes/network.ts | 156 ++++++++++++- packages/ocap-kernel/src/remotes/types.ts | 20 ++ vitest.config.ts | 14 +- 9 files changed, 469 insertions(+), 11 deletions(-) create mode 100644 packages/kernel-errors/src/errors/ResourceLimitError.ts diff --git a/packages/kernel-errors/src/constants.ts b/packages/kernel-errors/src/constants.ts index 2c98f6b43..fc5c55f8d 100644 --- a/packages/kernel-errors/src/constants.ts +++ b/packages/kernel-errors/src/constants.ts @@ -33,6 +33,7 @@ export const ErrorCode = { SubclusterNotFound: 'SUBCLUSTER_NOT_FOUND', SampleGenerationError: 'SAMPLE_GENERATION_ERROR', InternalError: 'INTERNAL_ERROR', + ResourceLimitError: 'RESOURCE_LIMIT_ERROR', } as const; export type ErrorCode = (typeof ErrorCode)[keyof typeof ErrorCode]; diff --git a/packages/kernel-errors/src/errors/ResourceLimitError.ts b/packages/kernel-errors/src/errors/ResourceLimitError.ts new file mode 100644 index 000000000..1d11c8f14 --- /dev/null +++ b/packages/kernel-errors/src/errors/ResourceLimitError.ts @@ -0,0 +1,66 @@ +import { + assert, + literal, + number, + object, + optional, + union, +} from '@metamask/superstruct'; + +import { BaseError } from '../BaseError.ts'; +import { marshaledErrorSchema, ErrorCode } from '../constants.ts'; +import type { ErrorOptionsWithStack, MarshaledOcapError } from '../types.ts'; + +export class ResourceLimitError extends BaseError { + constructor( + message: string, + options?: ErrorOptionsWithStack & { + data?: { + limitType?: 'connection' | 'messageSize'; + current?: number; + limit?: number; + }; + }, + ) { + super(ErrorCode.ResourceLimitError, message, { + ...options, + }); + harden(this); + } + + /** + * A superstruct struct for validating marshaled {@link ResourceLimitError} instances. + */ + public static struct = object({ + ...marshaledErrorSchema, + code: literal(ErrorCode.ResourceLimitError), + data: optional( + object({ + limitType: optional( + union([literal('connection'), literal('messageSize')]), + ), + current: optional(number()), + limit: optional(number()), + }), + ), + }); + + /** + * Unmarshals a {@link MarshaledError} into a {@link ResourceLimitError}. + * + * @param marshaledError - The marshaled error to unmarshal. + * @param unmarshalErrorOptions - The function to unmarshal the error options. + * @returns The unmarshaled error. + */ + public static unmarshal( + marshaledError: MarshaledOcapError, + unmarshalErrorOptions: ( + marshaledError: MarshaledOcapError, + ) => ErrorOptionsWithStack, + ): ResourceLimitError { + assert(marshaledError, this.struct); + const options = unmarshalErrorOptions(marshaledError); + return new ResourceLimitError(marshaledError.message, options); + } +} +harden(ResourceLimitError); diff --git a/packages/kernel-errors/src/errors/index.ts b/packages/kernel-errors/src/errors/index.ts index adc182cb3..235abe9c4 100644 --- a/packages/kernel-errors/src/errors/index.ts +++ b/packages/kernel-errors/src/errors/index.ts @@ -1,6 +1,7 @@ import { AbortError } from './AbortError.ts'; import { DuplicateEndowmentError } from './DuplicateEndowmentError.ts'; import { EvaluatorError } from './EvaluatorError.ts'; +import { ResourceLimitError } from './ResourceLimitError.ts'; import { SampleGenerationError } from './SampleGenerationError.ts'; import { StreamReadError } from './StreamReadError.ts'; import { VatAlreadyExistsError } from './VatAlreadyExistsError.ts'; @@ -19,4 +20,5 @@ export const errorClasses = { [ErrorCode.SubclusterNotFound]: SubclusterNotFoundError, [ErrorCode.SampleGenerationError]: SampleGenerationError, [ErrorCode.InternalError]: EvaluatorError, + [ErrorCode.ResourceLimitError]: ResourceLimitError, } as const; diff --git a/packages/kernel-errors/src/index.test.ts b/packages/kernel-errors/src/index.test.ts index 0e55551ea..96a79688c 100644 --- a/packages/kernel-errors/src/index.test.ts +++ b/packages/kernel-errors/src/index.test.ts @@ -13,6 +13,7 @@ describe('index', () => { 'EvaluatorError', 'MarshaledErrorStruct', 'MarshaledOcapErrorStruct', + 'ResourceLimitError', 'SampleGenerationError', 'StreamReadError', 'SubclusterNotFoundError', diff --git a/packages/kernel-errors/src/index.ts b/packages/kernel-errors/src/index.ts index 43981ae55..9f3e40694 100644 --- a/packages/kernel-errors/src/index.ts +++ b/packages/kernel-errors/src/index.ts @@ -8,6 +8,7 @@ export { VatNotFoundError } from './errors/VatNotFoundError.ts'; export { StreamReadError } from './errors/StreamReadError.ts'; export { SubclusterNotFoundError } from './errors/SubclusterNotFoundError.ts'; export { AbortError } from './errors/AbortError.ts'; +export { ResourceLimitError } from './errors/ResourceLimitError.ts'; export { ErrorCode, ErrorSentinel, diff --git a/packages/ocap-kernel/src/remotes/network.test.ts b/packages/ocap-kernel/src/remotes/network.test.ts index d051f9528..7388338a6 100644 --- a/packages/ocap-kernel/src/remotes/network.test.ts +++ b/packages/ocap-kernel/src/remotes/network.test.ts @@ -1,4 +1,4 @@ -import { AbortError } from '@metamask/kernel-errors'; +import { AbortError, ResourceLimitError } from '@metamask/kernel-errors'; import { makeAbortSignalMock } from '@ocap/repo-tools/test-utils'; import { describe, @@ -144,6 +144,12 @@ vi.mock('@metamask/kernel-errors', () => ({ this.name = 'AbortError'; } }, + ResourceLimitError: class MockResourceLimitError extends Error { + constructor(message: string) { + super(message); + this.name = 'ResourceLimitError'; + } + }, isRetryableNetworkError: vi.fn().mockImplementation((error: unknown) => { const errorWithCode = error as { code?: string }; return ( @@ -202,8 +208,10 @@ describe('network.initNetwork', () => { }); afterEach(() => { - // Clear mocks after each test - vi.clearAllMocks(); + if (vi.isFakeTimers()) { + vi.clearAllTimers(); + vi.useRealTimers(); + } }); const createMockChannel = (peerId: string): MockChannel => ({ @@ -1934,4 +1942,209 @@ describe('network.initNetwork', () => { expect(mockChannel.msgStream.write).toHaveBeenCalledTimes(2); }); }); + + describe('connection limit', () => { + it('enforces maximum concurrent connections', async () => { + const mockChannels: MockChannel[] = []; + // Create 100 mock channels + for (let i = 0; i < 100; i += 1) { + const mockChannel = createMockChannel(`peer-${i}`); + mockChannels.push(mockChannel); + mockConnectionFactory.dialIdempotent.mockResolvedValueOnce(mockChannel); + } + const { sendRemoteMessage } = await initNetwork('0x1234', {}, vi.fn()); + // Establish 100 connections + for (let i = 0; i < 100; i += 1) { + await sendRemoteMessage(`peer-${i}`, 'msg'); + } + // Attempt to establish 101st connection should fail + await expect(sendRemoteMessage('peer-101', 'msg')).rejects.toThrow( + ResourceLimitError, + ); + expect(mockConnectionFactory.dialIdempotent).toHaveBeenCalledTimes(100); + }); + + it('respects custom maxConcurrentConnections option', async () => { + const customLimit = 5; + const mockChannels: MockChannel[] = []; + // Create mock channels up to custom limit + for (let i = 0; i < customLimit; i += 1) { + const mockChannel = createMockChannel(`peer-${i}`); + mockChannels.push(mockChannel); + mockConnectionFactory.dialIdempotent.mockResolvedValueOnce(mockChannel); + } + const { sendRemoteMessage } = await initNetwork( + '0x1234', + { maxConcurrentConnections: customLimit }, + vi.fn(), + ); + // Establish connections up to custom limit + for (let i = 0; i < customLimit; i += 1) { + await sendRemoteMessage(`peer-${i}`, 'msg'); + } + // Attempt to establish connection beyond custom limit should fail + await expect(sendRemoteMessage('peer-exceed', 'msg')).rejects.toThrow( + ResourceLimitError, + ); + expect(mockConnectionFactory.dialIdempotent).toHaveBeenCalledTimes( + customLimit, + ); + }); + + it('rejects inbound connections when limit reached', async () => { + let inboundHandler: ((channel: MockChannel) => void) | undefined; + mockConnectionFactory.onInboundConnection.mockImplementation( + (handler) => { + inboundHandler = handler; + }, + ); + const mockChannels: MockChannel[] = []; + // Create 100 mock channels for outbound connections + for (let i = 0; i < 100; i += 1) { + const mockChannel = createMockChannel(`peer-${i}`); + mockChannels.push(mockChannel); + mockConnectionFactory.dialIdempotent.mockResolvedValueOnce(mockChannel); + } + const { sendRemoteMessage } = await initNetwork('0x1234', {}, vi.fn()); + // Establish 100 outbound connections + for (let i = 0; i < 100; i += 1) { + await sendRemoteMessage(`peer-${i}`, 'msg'); + } + // Attempt inbound connection should be rejected + const inboundChannel = createMockChannel('inbound-peer'); + inboundHandler?.(inboundChannel); + // Should not add to channels (connection rejected) + expect(mockLogger.log).toHaveBeenCalledWith( + 'inbound-peer:: rejecting inbound connection due to connection limit', + ); + }); + }); + + describe('message size limit', () => { + it('rejects messages exceeding 1MB size limit', async () => { + const { sendRemoteMessage } = await initNetwork('0x1234', {}, vi.fn()); + // Create a message larger than 1MB + const largeMessage = 'x'.repeat(1024 * 1024 + 1); // 1MB + 1 byte + await expect(sendRemoteMessage('peer-1', largeMessage)).rejects.toThrow( + ResourceLimitError, + ); + expect(mockConnectionFactory.dialIdempotent).not.toHaveBeenCalled(); + expect(mockMessageQueue.enqueue).not.toHaveBeenCalled(); + }); + + it('allows messages at exactly 1MB size limit', async () => { + const mockChannel = createMockChannel('peer-1'); + mockConnectionFactory.dialIdempotent.mockResolvedValue(mockChannel); + const { sendRemoteMessage } = await initNetwork('0x1234', {}, vi.fn()); + // Create a message exactly 1MB + const exactSizeMessage = 'x'.repeat(1024 * 1024); + await sendRemoteMessage('peer-1', exactSizeMessage); + expect(mockConnectionFactory.dialIdempotent).toHaveBeenCalled(); + expect(mockChannel.msgStream.write).toHaveBeenCalled(); + }); + + it('validates message size before queueing during reconnection', async () => { + mockReconnectionManager.isReconnecting.mockReturnValue(true); + const { sendRemoteMessage } = await initNetwork('0x1234', {}, vi.fn()); + // Create a message larger than 1MB + const largeMessage = 'x'.repeat(1024 * 1024 + 1); + await expect(sendRemoteMessage('peer-1', largeMessage)).rejects.toThrow( + ResourceLimitError, + ); + // Should not queue the message + expect(mockMessageQueue.enqueue).not.toHaveBeenCalled(); + }); + + it('respects custom maxMessageSizeBytes option', async () => { + const customLimit = 500 * 1024; // 500KB + const { sendRemoteMessage } = await initNetwork( + '0x1234', + { maxMessageSizeBytes: customLimit }, + vi.fn(), + ); + // Create a message larger than custom limit + const largeMessage = 'x'.repeat(customLimit + 1); + await expect(sendRemoteMessage('peer-1', largeMessage)).rejects.toThrow( + ResourceLimitError, + ); + // Create a message at exactly custom limit + const exactSizeMessage = 'x'.repeat(customLimit); + const mockChannel = createMockChannel('peer-1'); + mockConnectionFactory.dialIdempotent.mockResolvedValue(mockChannel); + await sendRemoteMessage('peer-1', exactSizeMessage); + expect(mockConnectionFactory.dialIdempotent).toHaveBeenCalled(); + }); + }); + + describe('stale peer cleanup', () => { + it('sets up periodic cleanup interval', async () => { + let intervalFn: (() => void) | undefined; + const setIntervalSpy = vi + .spyOn(global, 'setInterval') + .mockImplementation((fn: () => void, _ms?: number) => { + intervalFn = fn; + return 1 as unknown as NodeJS.Timeout; + }); + await initNetwork('0x1234', {}, vi.fn()); + expect(setIntervalSpy).toHaveBeenCalledWith( + expect.any(Function), + 15 * 60 * 1000, + ); + expect(intervalFn).toBeDefined(); + setIntervalSpy.mockRestore(); + }); + + it('cleans up interval on stop', async () => { + const clearIntervalSpy = vi.spyOn(global, 'clearInterval'); + const setIntervalSpy = vi + .spyOn(global, 'setInterval') + .mockImplementation((_fn: () => void, _ms?: number) => { + return 42 as unknown as NodeJS.Timeout; + }); + const { stop } = await initNetwork('0x1234', {}, vi.fn()); + await stop(); + expect(clearIntervalSpy).toHaveBeenCalledWith(42); + setIntervalSpy.mockRestore(); + clearIntervalSpy.mockRestore(); + }); + + it('does not clean up peers with active connections', async () => { + let intervalFn: (() => void) | undefined; + const setIntervalSpy = vi + .spyOn(global, 'setInterval') + .mockImplementation((fn: () => void, _ms?: number) => { + intervalFn = fn; + return 1 as unknown as NodeJS.Timeout; + }); + const mockChannel = createMockChannel('peer-1'); + mockConnectionFactory.dialIdempotent.mockResolvedValue(mockChannel); + const { sendRemoteMessage } = await initNetwork('0x1234', {}, vi.fn()); + // Establish connection (sets lastConnectionTime) + await sendRemoteMessage('peer-1', 'msg'); + // Run cleanup immediately; should not remove active peer + intervalFn?.(); + await sendRemoteMessage('peer-1', 'msg2'); + expect(mockConnectionFactory.dialIdempotent).toHaveBeenCalledTimes(1); + setIntervalSpy.mockRestore(); + }); + + it('does not clean up peers currently reconnecting', async () => { + let intervalFn: (() => void) | undefined; + const setIntervalSpy = vi + .spyOn(global, 'setInterval') + .mockImplementation((fn: () => void, _ms?: number) => { + intervalFn = fn; + return 1 as unknown as NodeJS.Timeout; + }); + const mockChannel = createMockChannel('peer-1'); + mockConnectionFactory.dialIdempotent.mockResolvedValue(mockChannel); + mockReconnectionManager.isReconnecting.mockReturnValue(true); + const { sendRemoteMessage } = await initNetwork('0x1234', {}, vi.fn()); + await sendRemoteMessage('peer-1', 'msg'); + // Run cleanup immediately; reconnecting peer should not be cleaned + intervalFn?.(); + expect(mockMessageQueue.enqueue).toHaveBeenCalledWith('msg'); + setIntervalSpy.mockRestore(); + }); + }); }); diff --git a/packages/ocap-kernel/src/remotes/network.ts b/packages/ocap-kernel/src/remotes/network.ts index fb6cebadf..45b8d4e7a 100644 --- a/packages/ocap-kernel/src/remotes/network.ts +++ b/packages/ocap-kernel/src/remotes/network.ts @@ -1,4 +1,8 @@ -import { AbortError, isRetryableNetworkError } from '@metamask/kernel-errors'; +import { + AbortError, + isRetryableNetworkError, + ResourceLimitError, +} from '@metamask/kernel-errors'; import { abortableDelay, DEFAULT_MAX_RETRY_ATTEMPTS, @@ -22,6 +26,18 @@ import type { /** Default upper bound for queued outbound messages while reconnecting */ const DEFAULT_MAX_QUEUE = 200; +/** Default maximum number of concurrent connections */ +const DEFAULT_MAX_CONCURRENT_CONNECTIONS = 100; + +/** Default maximum message size in bytes (1MB) */ +const DEFAULT_MAX_MESSAGE_SIZE_BYTES = 1024 * 1024; + +/** Default stale peer cleanup interval in milliseconds (15 minutes) */ +const DEFAULT_CLEANUP_INTERVAL_MS = 15 * 60 * 1000; + +/** Default stale peer timeout in milliseconds (1 hour) */ +const DEFAULT_STALE_PEER_TIMEOUT_MS = 60 * 60 * 1000; + /** * Initialize the remote comm system with information that must be provided by the kernel. * @@ -30,6 +46,10 @@ const DEFAULT_MAX_QUEUE = 200; * @param options.relays - PeerIds/Multiaddrs of known message relays. * @param options.maxRetryAttempts - Maximum number of reconnection attempts. 0 = infinite (default). * @param options.maxQueue - Maximum number of messages to queue per peer while reconnecting (default: 200). + * @param options.maxConcurrentConnections - Maximum number of concurrent connections (default: 100). + * @param options.maxMessageSizeBytes - Maximum message size in bytes (default: 1MB). + * @param options.cleanupIntervalMs - Stale peer cleanup interval in milliseconds (default: 15 minutes). + * @param options.stalePeerTimeoutMs - Stale peer timeout in milliseconds (default: 1 hour). * @param remoteMessageHandler - Handler to be called when messages are received from elsewhere. * @param onRemoteGiveUp - Optional callback to be called when we give up on a remote (after max retries or non-retryable error). * @@ -51,6 +71,10 @@ export async function initNetwork( relays = [], maxRetryAttempts, maxQueue = DEFAULT_MAX_QUEUE, + maxConcurrentConnections = DEFAULT_MAX_CONCURRENT_CONNECTIONS, + maxMessageSizeBytes = DEFAULT_MAX_MESSAGE_SIZE_BYTES, + cleanupIntervalMs = DEFAULT_CLEANUP_INTERVAL_MS, + stalePeerTimeoutMs = DEFAULT_STALE_PEER_TIMEOUT_MS, } = options; let cleanupWakeDetector: (() => void) | undefined; const stopController = new AbortController(); @@ -60,6 +84,7 @@ export async function initNetwork( const reconnectionManager = new ReconnectionManager(); const messageQueues = new Map(); // One queue per peer const intentionallyClosed = new Set(); // Track peers that intentionally closed connections + const lastConnectionTime = new Map(); // Track last connection time for cleanup const connectionFactory = await ConnectionFactory.make( keySeed, relays, @@ -68,6 +93,8 @@ export async function initNetwork( maxRetryAttempts, ); const locationHints = new Map(); + let cleanupIntervalId: ReturnType | undefined; + /** * Output an error message. * @@ -285,6 +312,7 @@ export async function initNetwork( // Add channel to manager channels.set(peerId, channel); + lastConnectionTime.set(peerId, Date.now()); logger.log(`${peerId}:: reconnection successful`); @@ -368,6 +396,98 @@ export async function initNetwork( } } + /** + * Validate message size before sending or queuing. + * + * @param message - The message to validate. + * @throws ResourceLimitError if message exceeds size limit. + */ + function validateMessageSize(message: string): void { + const messageSizeBytes = new TextEncoder().encode(message).length; + if (messageSizeBytes > maxMessageSizeBytes) { + throw new ResourceLimitError( + `Message size ${messageSizeBytes} bytes exceeds limit of ${maxMessageSizeBytes} bytes`, + { + data: { + limitType: 'messageSize', + current: messageSizeBytes, + limit: maxMessageSizeBytes, + }, + }, + ); + } + } + + /** + * Check if we can establish a new connection (within connection limit). + * + * @throws ResourceLimitError if connection limit is reached. + */ + function checkConnectionLimit(): void { + const currentConnections = channels.size; + if (currentConnections >= maxConcurrentConnections) { + throw new ResourceLimitError( + `Connection limit reached: ${currentConnections}/${maxConcurrentConnections} concurrent connections`, + { + data: { + limitType: 'connection', + current: currentConnections, + limit: maxConcurrentConnections, + }, + }, + ); + } + } + + /** + * Clean up stale peer data for peers disconnected for more than 1 hour. + */ + function cleanupStalePeers(): void { + const now = Date.now(); + const stalePeers: string[] = []; + + // Check all tracked peers + for (const [peerId, lastTime] of lastConnectionTime.entries()) { + const timeSinceLastConnection = now - lastTime; + const hasActiveChannel = channels.has(peerId); + const isReconnecting = reconnectionManager.isReconnecting(peerId); + + // Consider peer stale if: + // - No active channel + // - Not currently reconnecting + // - Disconnected for more than stalePeerTimeoutMs + if ( + !hasActiveChannel && + !isReconnecting && + timeSinceLastConnection > stalePeerTimeoutMs + ) { + stalePeers.push(peerId); + } + } + + // Clean up stale peer data + for (const peerId of stalePeers) { + const lastTime = lastConnectionTime.get(peerId); + if (lastTime !== undefined) { + const minutesSinceDisconnect = Math.round((now - lastTime) / 1000 / 60); + logger.log( + `${peerId}:: cleaning up stale peer data (disconnected for ${minutesSinceDisconnect} minutes)`, + ); + } + + // Remove from all tracking structures + lastConnectionTime.delete(peerId); + messageQueues.delete(peerId); + locationHints.delete(peerId); + + // Clear reconnection state if any + if (reconnectionManager.isReconnecting(peerId)) { + reconnectionManager.stopReconnection(peerId); + } + reconnectionManager.clearPeer(peerId); + } + } + /** * Send a message to a peer. * @@ -382,6 +502,9 @@ export async function initNetwork( return; } + // Validate message size before processing + validateMessageSize(message); + // Check if peer is intentionally closed if (intentionallyClosed.has(targetPeerId)) { throw new Error('Message delivery failed after intentional close'); @@ -400,6 +523,9 @@ export async function initNetwork( let channel = channels.get(targetPeerId); if (!channel) { + // Check connection limit before dialing new connection + checkConnectionLimit(); + try { const hints = locationHints.get(targetPeerId) ?? []; channel = await connectionFactory.dialIdempotent( @@ -419,6 +545,7 @@ export async function initNetwork( } channels.set(targetPeerId, channel); + lastConnectionTime.set(targetPeerId, Date.now()); } catch (problem) { outputError(targetPeerId, `opening connection`, problem); handleConnectionLoss(targetPeerId); @@ -435,6 +562,7 @@ export async function initNetwork( logger.log(`${targetPeerId}:: send ${message}`); await writeWithTimeout(channel, fromString(message), 10_000); reconnectionManager.resetBackoff(targetPeerId); + lastConnectionTime.set(targetPeerId, Date.now()); } catch (problem) { outputError(targetPeerId, `sending message`, problem); handleConnectionLoss(targetPeerId); @@ -460,7 +588,20 @@ export async function initNetwork( // Don't add to channels map and don't start reading - connection will naturally close return; } + + // Check connection limit for inbound connections + try { + checkConnectionLimit(); + } catch { + logger.log( + `${channel.peerId}:: rejecting inbound connection due to connection limit`, + ); + // Don't add to channels map - connection will naturally close + return; + } + channels.set(channel.peerId, channel); + lastConnectionTime.set(channel.peerId, Date.now()); readChannel(channel).catch((error) => { outputError(channel.peerId, 'error in inbound channel read', error); }); @@ -469,6 +610,13 @@ export async function initNetwork( // Install wake detector to reset backoff on sleep/wake cleanupWakeDetector = installWakeDetector(handleWakeFromSleep); + // Start periodic cleanup task for stale peers + cleanupIntervalId = setInterval(() => { + if (!signal.aborted) { + cleanupStalePeers(); + } + }, cleanupIntervalMs); + /** * Explicitly close a connection to a peer. * Marks the peer as intentionally closed to prevent automatic reconnection. @@ -541,12 +689,18 @@ export async function initNetwork( cleanupWakeDetector(); cleanupWakeDetector = undefined; } + // Stop cleanup interval + if (cleanupIntervalId) { + clearInterval(cleanupIntervalId); + cleanupIntervalId = undefined; + } stopController.abort(); // cancels all delays and dials await connectionFactory.stop(); channels.clear(); reconnectionManager.clear(); messageQueues.clear(); intentionallyClosed.clear(); + lastConnectionTime.clear(); } // Return the sender with a stop handle and connection management functions diff --git a/packages/ocap-kernel/src/remotes/types.ts b/packages/ocap-kernel/src/remotes/types.ts index 2b895fbb2..66f51c626 100644 --- a/packages/ocap-kernel/src/remotes/types.ts +++ b/packages/ocap-kernel/src/remotes/types.ts @@ -44,6 +44,26 @@ export type RemoteCommsOptions = { * If not provided, uses the default MAX_QUEUE value. */ maxQueue?: number | undefined; + /** + * Maximum number of concurrent connections. + * If not provided, uses the default MAX_CONCURRENT_CONNECTIONS value (100). + */ + maxConcurrentConnections?: number | undefined; + /** + * Maximum message size in bytes. + * If not provided, uses the default MAX_MESSAGE_SIZE_BYTES value (1MB). + */ + maxMessageSizeBytes?: number | undefined; + /** + * Stale peer cleanup interval in milliseconds. + * If not provided, uses the default CLEANUP_INTERVAL_MS value (15 minutes). + */ + cleanupIntervalMs?: number | undefined; + /** + * Stale peer timeout in milliseconds (time before a disconnected peer is considered stale). + * If not provided, uses the default STALE_PEER_TIMEOUT_MS value (1 hour). + */ + stalePeerTimeoutMs?: number | undefined; }; export type RemoteInfo = { diff --git a/vitest.config.ts b/vitest.config.ts index 852d14fc6..2ac4cd2b2 100644 --- a/vitest.config.ts +++ b/vitest.config.ts @@ -92,10 +92,10 @@ export default defineConfig({ lines: 87.52, }, 'packages/kernel-errors/**': { - statements: 100, - functions: 100, + statements: 97.57, + functions: 96.07, branches: 100, - lines: 100, + lines: 97.57, }, 'packages/kernel-language-model-service/**': { statements: 98.35, @@ -158,10 +158,10 @@ export default defineConfig({ lines: 22.22, }, 'packages/ocap-kernel/**': { - statements: 96.53, - functions: 98.54, - branches: 97.59, - lines: 96.53, + statements: 96.25, + functions: 98.55, + branches: 97.32, + lines: 96.25, }, 'packages/omnium-gatherum/**': { statements: 5.67, From ba2ab0314f0e9dd2a891584f67ee73daf0d3f0b0 Mon Sep 17 00:00:00 2001 From: Dimitris Marlagkoutsos Date: Thu, 18 Dec 2025 16:01:07 +0100 Subject: [PATCH 02/19] fix bugs --- .../src/errors/ResourceLimitError.test.ts | 233 ++++++++++++++++++ .../src/errors/ResourceLimitError.ts | 12 +- .../ocap-kernel/src/remotes/network.test.ts | 145 ++++++++++- packages/ocap-kernel/src/remotes/network.ts | 38 +++ vitest.config.ts | 14 +- 5 files changed, 433 insertions(+), 9 deletions(-) create mode 100644 packages/kernel-errors/src/errors/ResourceLimitError.test.ts diff --git a/packages/kernel-errors/src/errors/ResourceLimitError.test.ts b/packages/kernel-errors/src/errors/ResourceLimitError.test.ts new file mode 100644 index 000000000..ae2bf6da4 --- /dev/null +++ b/packages/kernel-errors/src/errors/ResourceLimitError.test.ts @@ -0,0 +1,233 @@ +import { describe, it, expect } from 'vitest'; + +import { ResourceLimitError } from './ResourceLimitError.ts'; +import { ErrorCode, ErrorSentinel } from '../constants.ts'; +import { unmarshalErrorOptions } from '../marshal/unmarshalError.ts'; +import type { MarshaledOcapError } from '../types.ts'; + +describe('ResourceLimitError', () => { + it('creates a ResourceLimitError with the correct properties', () => { + const error = new ResourceLimitError('Connection limit exceeded'); + expect(error).toBeInstanceOf(ResourceLimitError); + expect(error.code).toBe(ErrorCode.ResourceLimitError); + expect(error.message).toBe('Connection limit exceeded'); + expect(error.data).toBeUndefined(); + }); + + it('creates a ResourceLimitError with connection limit data', () => { + const error = new ResourceLimitError('Connection limit exceeded', { + data: { + limitType: 'connection', + current: 100, + limit: 100, + }, + }); + expect(error).toBeInstanceOf(ResourceLimitError); + expect(error.code).toBe(ErrorCode.ResourceLimitError); + expect(error.message).toBe('Connection limit exceeded'); + expect(error.data).toStrictEqual({ + limitType: 'connection', + current: 100, + limit: 100, + }); + }); + + it('creates a ResourceLimitError with message size limit data', () => { + const error = new ResourceLimitError('Message size limit exceeded', { + data: { + limitType: 'messageSize', + current: 1048577, + limit: 1048576, + }, + }); + expect(error).toBeInstanceOf(ResourceLimitError); + expect(error.code).toBe(ErrorCode.ResourceLimitError); + expect(error.message).toBe('Message size limit exceeded'); + expect(error.data).toStrictEqual({ + limitType: 'messageSize', + current: 1048577, + limit: 1048576, + }); + }); + + it('creates a ResourceLimitError with partial data', () => { + const error = new ResourceLimitError('Resource limit exceeded', { + data: { + limitType: 'connection', + }, + }); + expect(error).toBeInstanceOf(ResourceLimitError); + expect(error.data).toStrictEqual({ + limitType: 'connection', + }); + }); + + it('creates a ResourceLimitError with a cause', () => { + const cause = new Error('Original error'); + const error = new ResourceLimitError('Resource limit exceeded', { cause }); + expect(error).toBeInstanceOf(ResourceLimitError); + expect(error.code).toBe(ErrorCode.ResourceLimitError); + expect(error.cause).toBe(cause); + }); + + it('creates a ResourceLimitError with a custom stack', () => { + const customStack = 'custom stack trace'; + const error = new ResourceLimitError('Resource limit exceeded', { + stack: customStack, + }); + expect(error).toBeInstanceOf(ResourceLimitError); + expect(error.stack).toBe(customStack); + }); + + it('unmarshals a valid marshaled ResourceLimitError with connection limit data', () => { + const marshaledError: MarshaledOcapError = { + [ErrorSentinel]: true, + message: 'Connection limit exceeded', + code: ErrorCode.ResourceLimitError, + data: { + limitType: 'connection', + current: 100, + limit: 100, + }, + stack: 'stack trace', + }; + + const unmarshaledError = ResourceLimitError.unmarshal( + marshaledError, + unmarshalErrorOptions, + ); + expect(unmarshaledError).toBeInstanceOf(ResourceLimitError); + expect(unmarshaledError.code).toBe(ErrorCode.ResourceLimitError); + expect(unmarshaledError.message).toBe('Connection limit exceeded'); + expect(unmarshaledError.stack).toBe('stack trace'); + expect(unmarshaledError.data).toStrictEqual({ + limitType: 'connection', + current: 100, + limit: 100, + }); + }); + + it('unmarshals a valid marshaled ResourceLimitError with message size limit data', () => { + const marshaledError: MarshaledOcapError = { + [ErrorSentinel]: true, + message: 'Message size limit exceeded', + code: ErrorCode.ResourceLimitError, + data: { + limitType: 'messageSize', + current: 1048577, + limit: 1048576, + }, + stack: 'stack trace', + }; + + const unmarshaledError = ResourceLimitError.unmarshal( + marshaledError, + unmarshalErrorOptions, + ); + expect(unmarshaledError).toBeInstanceOf(ResourceLimitError); + expect(unmarshaledError.code).toBe(ErrorCode.ResourceLimitError); + expect(unmarshaledError.message).toBe('Message size limit exceeded'); + expect(unmarshaledError.data).toStrictEqual({ + limitType: 'messageSize', + current: 1048577, + limit: 1048576, + }); + }); + + it('unmarshals a valid marshaled ResourceLimitError without data', () => { + const marshaledError = { + [ErrorSentinel]: true, + message: 'Resource limit exceeded', + code: ErrorCode.ResourceLimitError, + stack: 'stack trace', + } as unknown as MarshaledOcapError; + + const unmarshaledError = ResourceLimitError.unmarshal( + marshaledError, + unmarshalErrorOptions, + ); + expect(unmarshaledError).toBeInstanceOf(ResourceLimitError); + expect(unmarshaledError.code).toBe(ErrorCode.ResourceLimitError); + expect(unmarshaledError.message).toBe('Resource limit exceeded'); + expect(unmarshaledError.data).toBeUndefined(); + }); + + it.each([ + { + name: 'invalid limitType value', + marshaledError: { + [ErrorSentinel]: true, + message: 'Resource limit exceeded', + code: ErrorCode.ResourceLimitError, + data: { + limitType: 'invalid', + current: 100, + limit: 100, + }, + stack: 'stack trace', + } as unknown as MarshaledOcapError, + expectedError: + 'At path: data.limitType -- Expected the value to satisfy a union of `literal | literal`, but received: "invalid"', + }, + { + name: 'invalid current type', + marshaledError: { + [ErrorSentinel]: true, + message: 'Resource limit exceeded', + code: ErrorCode.ResourceLimitError, + data: { + limitType: 'connection', + current: 'not a number', + limit: 100, + }, + stack: 'stack trace', + } as unknown as MarshaledOcapError, + expectedError: + 'At path: data.current -- Expected a number, but received: "not a number"', + }, + { + name: 'invalid limit type', + marshaledError: { + [ErrorSentinel]: true, + message: 'Resource limit exceeded', + code: ErrorCode.ResourceLimitError, + data: { + limitType: 'connection', + current: 100, + limit: 'not a number', + }, + stack: 'stack trace', + } as unknown as MarshaledOcapError, + expectedError: + 'At path: data.limit -- Expected a number, but received: "not a number"', + }, + { + name: 'wrong error code', + marshaledError: { + [ErrorSentinel]: true, + message: 'Resource limit exceeded', + code: 'WRONG_ERROR_CODE' as ErrorCode, + stack: 'stack trace', + } as unknown as MarshaledOcapError, + expectedError: + 'At path: code -- Expected the literal `"RESOURCE_LIMIT_ERROR"`, but received: "WRONG_ERROR_CODE"', + }, + { + name: 'missing required fields', + marshaledError: { + [ErrorSentinel]: true, + message: 'Resource limit exceeded', + // Missing code field + } as unknown as MarshaledOcapError, + expectedError: + 'At path: code -- Expected the literal `"RESOURCE_LIMIT_ERROR"`, but received: undefined', + }, + ])( + 'throws an error when unmarshaling with $name', + ({ marshaledError, expectedError }) => { + expect(() => + ResourceLimitError.unmarshal(marshaledError, unmarshalErrorOptions), + ).toThrow(expectedError); + }, + ); +}); diff --git a/packages/kernel-errors/src/errors/ResourceLimitError.ts b/packages/kernel-errors/src/errors/ResourceLimitError.ts index 1d11c8f14..cec6fcef8 100644 --- a/packages/kernel-errors/src/errors/ResourceLimitError.ts +++ b/packages/kernel-errors/src/errors/ResourceLimitError.ts @@ -60,7 +60,17 @@ export class ResourceLimitError extends BaseError { ): ResourceLimitError { assert(marshaledError, this.struct); const options = unmarshalErrorOptions(marshaledError); - return new ResourceLimitError(marshaledError.message, options); + const data = marshaledError.data as + | { + limitType?: 'connection' | 'messageSize'; + current?: number; + limit?: number; + } + | undefined; + return new ResourceLimitError(marshaledError.message, { + ...options, + ...(data !== undefined && { data }), + }); } } harden(ResourceLimitError); diff --git a/packages/ocap-kernel/src/remotes/network.test.ts b/packages/ocap-kernel/src/remotes/network.test.ts index 7388338a6..03d0c3f76 100644 --- a/packages/ocap-kernel/src/remotes/network.test.ts +++ b/packages/ocap-kernel/src/remotes/network.test.ts @@ -1,5 +1,5 @@ import { AbortError, ResourceLimitError } from '@metamask/kernel-errors'; -import { makeAbortSignalMock } from '@ocap/repo-tools/test-utils'; +import { delay, makeAbortSignalMock } from '@ocap/repo-tools/test-utils'; import { describe, expect, @@ -2147,4 +2147,147 @@ describe('network.initNetwork', () => { setIntervalSpy.mockRestore(); }); }); + + describe('reconnection respects connection limit', () => { + it('blocks reconnection when connection limit is reached', async () => { + const customLimit = 2; + const mockChannels: MockChannel[] = []; + // Create mock channels + for (let i = 0; i < customLimit; i += 1) { + const mockChannel = createMockChannel(`peer-${i}`); + mockChannels.push(mockChannel); + } + // Set up reconnection state + let reconnecting = false; + mockReconnectionManager.isReconnecting.mockImplementation( + () => reconnecting, + ); + mockReconnectionManager.startReconnection.mockImplementation(() => { + reconnecting = true; + }); + mockReconnectionManager.stopReconnection.mockImplementation(() => { + reconnecting = false; + }); + mockReconnectionManager.shouldRetry.mockReturnValue(true); + mockReconnectionManager.incrementAttempt.mockReturnValue(1); + mockReconnectionManager.calculateBackoff.mockReturnValue(100); // Small delay to ensure ordering + const { abortableDelay } = await import('@metamask/kernel-utils'); + (abortableDelay as ReturnType).mockImplementation( + async (ms: number) => { + // Use real delay to allow other operations to complete + await new Promise((resolve) => setTimeout(resolve, ms)); + }, + ); + // Set up dial mocks - initial connections + mockConnectionFactory.dialIdempotent + .mockResolvedValueOnce(mockChannels[0]) // peer-0 + .mockResolvedValueOnce(mockChannels[1]); // peer-1 + const { sendRemoteMessage } = await initNetwork( + '0x1234', + { maxConcurrentConnections: customLimit }, + vi.fn(), + ); + // Establish connections up to limit + await sendRemoteMessage('peer-0', 'msg'); + await sendRemoteMessage('peer-1', 'msg'); + // Disconnect peer-0 (simulate connection loss) + const peer0Channel = mockChannels[0] as MockChannel; + peer0Channel.msgStream.write.mockRejectedValueOnce( + Object.assign(new Error('Connection lost'), { code: 'ECONNRESET' }), + ); + // Trigger reconnection for peer-0 (this will remove peer-0 from channels) + await sendRemoteMessage('peer-0', 'msg2'); + // Wait for connection loss to be handled (channel removed) + await vi.waitFor( + () => { + expect( + mockReconnectionManager.startReconnection, + ).toHaveBeenCalledWith('peer-0'); + }, + { timeout: 1000 }, + ); + // Now fill the connection limit with a new peer (peer-0 is removed, so we have space) + // Ensure new-peer is NOT in reconnecting state + mockReconnectionManager.isReconnecting.mockImplementation((peerId) => { + return peerId === 'peer-0'; // Only peer-0 is reconnecting + }); + const newPeerChannel = createMockChannel('new-peer'); + mockConnectionFactory.dialIdempotent.mockResolvedValueOnce( + newPeerChannel, + ); + await sendRemoteMessage('new-peer', 'msg'); + // Wait a bit to ensure new-peer connection is fully established in channels map + await delay(50); + // Mock successful dial for reconnection attempt (but limit will block it) + const reconnectChannel = createMockChannel('peer-0'); + mockConnectionFactory.dialIdempotent.mockResolvedValueOnce( + reconnectChannel, + ); + // Verify reconnection started + expect(mockReconnectionManager.startReconnection).toHaveBeenCalledWith( + 'peer-0', + ); + // Wait for reconnection attempt to be blocked + await vi.waitFor( + () => { + // Should have logged that reconnection was blocked by limit + expect(mockLogger.log).toHaveBeenCalledWith( + expect.stringContaining( + 'peer-0:: reconnection blocked by connection limit', + ), + ); + }, + { timeout: 5000 }, + ); + // Verify reconnection continues (doesn't stop) - shouldRetry should be called + // meaning the loop continues after the limit check fails + expect(mockReconnectionManager.shouldRetry).toHaveBeenCalled(); + }, 10000); + }); + + describe('connection limit race condition', () => { + it('prevents exceeding limit when multiple concurrent dials occur', async () => { + const customLimit = 2; + const mockChannels: MockChannel[] = []; + + // Create mock channels + for (let i = 0; i < customLimit + 1; i += 1) { + const mockChannel = createMockChannel(`peer-${i}`); + mockChannels.push(mockChannel); + } + + // Set up dial mocks - all dials will succeed + mockConnectionFactory.dialIdempotent.mockImplementation( + async (peerId: string) => { + // Simulate async dial delay + await delay(10); + return mockChannels.find((ch) => ch.peerId === peerId) as MockChannel; + }, + ); + + const { sendRemoteMessage } = await initNetwork( + '0x1234', + { maxConcurrentConnections: customLimit }, + vi.fn(), + ); + // Start multiple concurrent dials that all pass the initial limit check + const sendPromises = Promise.all([ + sendRemoteMessage('peer-0', 'msg0'), + sendRemoteMessage('peer-1', 'msg1'), + sendRemoteMessage('peer-2', 'msg2'), // This should be blocked after dial + ]); + await sendPromises; + // Verify that only 2 channels were added (the limit) + // The third one should have been rejected after dial completed + expect(mockLogger.log).toHaveBeenCalledWith( + expect.stringContaining('peer-2:: connection limit reached after dial'), + ); + // Verify that peer-2's message was queued + expect(mockMessageQueue.enqueue).toHaveBeenCalledWith('msg2'); + // Verify that reconnection was started for peer-2 (to retry later) + expect(mockReconnectionManager.startReconnection).toHaveBeenCalledWith( + 'peer-2', + ); + }, 10000); + }); }); diff --git a/packages/ocap-kernel/src/remotes/network.ts b/packages/ocap-kernel/src/remotes/network.ts index 45b8d4e7a..ee77ec3d0 100644 --- a/packages/ocap-kernel/src/remotes/network.ts +++ b/packages/ocap-kernel/src/remotes/network.ts @@ -310,6 +310,27 @@ export async function initNetwork( false, // No retry here, we're already in a retry loop ); + // Check connection limit before adding channel + // This prevents exceeding the limit if other connections were established + // during the reconnection delay + try { + checkConnectionLimit(); + } catch (limitError) { + // Connection limit reached - treat as retryable and continue loop + // The limit might free up when other connections close + logger.log( + `${peerId}:: reconnection blocked by connection limit, will retry`, + ); + outputError( + peerId, + `reconnection attempt ${nextAttempt}`, + limitError, + ); + // Don't add channel - connection will naturally close + // Continue the reconnection loop + continue; + } + // Add channel to manager channels.set(peerId, channel); lastConnectionTime.set(peerId, Date.now()); @@ -524,6 +545,7 @@ export async function initNetwork( let channel = channels.get(targetPeerId); if (!channel) { // Check connection limit before dialing new connection + // (Early check to fail fast, but we'll check again after dial to prevent race conditions) checkConnectionLimit(); try { @@ -544,6 +566,22 @@ export async function initNetwork( return; } + // Re-check connection limit after dial completes to prevent race conditions + // Multiple concurrent dials could all pass the initial check, then all add channels + try { + checkConnectionLimit(); + } catch { + // Connection limit reached - close the dialed channel and queue the message + logger.log( + `${targetPeerId}:: connection limit reached after dial, queueing message`, + ); + queue.enqueue(message); + // Don't add channel - connection will naturally close + // Start reconnection to retry later when limit might free up + handleConnectionLoss(targetPeerId); + return; + } + channels.set(targetPeerId, channel); lastConnectionTime.set(targetPeerId, Date.now()); } catch (problem) { diff --git a/vitest.config.ts b/vitest.config.ts index 2ac4cd2b2..21f799b64 100644 --- a/vitest.config.ts +++ b/vitest.config.ts @@ -92,10 +92,10 @@ export default defineConfig({ lines: 87.52, }, 'packages/kernel-errors/**': { - statements: 97.57, - functions: 96.07, + statements: 100, + functions: 100, branches: 100, - lines: 97.57, + lines: 100, }, 'packages/kernel-language-model-service/**': { statements: 98.35, @@ -130,7 +130,7 @@ export default defineConfig({ 'packages/kernel-ui/**': { statements: 97.57, functions: 97.29, - branches: 93.25, + branches: 93.26, lines: 97.57, }, 'packages/kernel-utils/**': { @@ -158,10 +158,10 @@ export default defineConfig({ lines: 22.22, }, 'packages/ocap-kernel/**': { - statements: 96.25, + statements: 96.27, functions: 98.55, - branches: 97.32, - lines: 96.25, + branches: 97.33, + lines: 96.27, }, 'packages/omnium-gatherum/**': { statements: 5.67, From 9f83890aecbfdc8797c7ec952fe5a753f4f4fa97 Mon Sep 17 00:00:00 2001 From: Dimitris Marlagkoutsos Date: Thu, 18 Dec 2025 16:05:33 +0100 Subject: [PATCH 03/19] merge --- vitest.config.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/vitest.config.ts b/vitest.config.ts index 21f799b64..dbcc42b01 100644 --- a/vitest.config.ts +++ b/vitest.config.ts @@ -130,7 +130,7 @@ export default defineConfig({ 'packages/kernel-ui/**': { statements: 97.57, functions: 97.29, - branches: 93.26, + branches: 93.25, lines: 97.57, }, 'packages/kernel-utils/**': { From b5d7b3c915b3e16f56470b9bc1793797dad55912 Mon Sep 17 00:00:00 2001 From: Dimitris Marlagkoutsos Date: Fri, 19 Dec 2025 20:06:42 +0100 Subject: [PATCH 04/19] close channel to release network resources --- .../src/remotes/ConnectionFactory.test.ts | 48 ++++++++ .../src/remotes/ConnectionFactory.ts | 44 ++++++++ .../ocap-kernel/src/remotes/network.test.ts | 106 ++++++++++++++++++ packages/ocap-kernel/src/remotes/network.ts | 21 +++- vitest.config.ts | 8 +- 5 files changed, 220 insertions(+), 7 deletions(-) diff --git a/packages/ocap-kernel/src/remotes/ConnectionFactory.test.ts b/packages/ocap-kernel/src/remotes/ConnectionFactory.test.ts index ff7841661..fe9bb320a 100644 --- a/packages/ocap-kernel/src/remotes/ConnectionFactory.test.ts +++ b/packages/ocap-kernel/src/remotes/ConnectionFactory.test.ts @@ -1111,6 +1111,54 @@ describe('ConnectionFactory', () => { }); }); + describe('closeChannel', () => { + it('closes underlying stream when close is available', async () => { + factory = await createFactory(); + const close = vi.fn().mockResolvedValue(undefined); + const channel = { + peerId: 'peer-close', + msgStream: { + unwrap: () => ({ close }), + }, + } as unknown as Channel; + await factory.closeChannel(channel, channel.peerId); + expect(close).toHaveBeenCalled(); + expect(mockLoggerLog).toHaveBeenCalledWith( + `${channel.peerId}:: closed channel stream`, + ); + }); + + it('aborts underlying stream when abort is available', async () => { + factory = await createFactory(); + const abort = vi.fn(); + const channel = { + peerId: 'peer-abort', + msgStream: { + unwrap: () => ({ abort }), + }, + } as unknown as Channel; + await factory.closeChannel(channel, channel.peerId); + expect(abort).toHaveBeenCalledWith(expect.any(AbortError)); + expect(mockLoggerLog).toHaveBeenCalledWith( + `${channel.peerId}:: aborted channel stream`, + ); + }); + + it('logs when underlying stream lacks close and abort', async () => { + factory = await createFactory(); + const channel = { + peerId: 'peer-none', + msgStream: { + unwrap: () => ({}), + }, + } as unknown as Channel; + await factory.closeChannel(channel, channel.peerId); + expect(mockLoggerLog).toHaveBeenCalledWith( + `${channel.peerId}:: channel stream lacks close/abort, relying on natural closure`, + ); + }); + }); + describe('integration scenarios', () => { it('handles complete connection lifecycle', async () => { createLibp2p.mockImplementation(async () => ({ diff --git a/packages/ocap-kernel/src/remotes/ConnectionFactory.ts b/packages/ocap-kernel/src/remotes/ConnectionFactory.ts index e833fd788..db3ffe2a0 100644 --- a/packages/ocap-kernel/src/remotes/ConnectionFactory.ts +++ b/packages/ocap-kernel/src/remotes/ConnectionFactory.ts @@ -323,6 +323,50 @@ export class ConnectionFactory { return promise; } + /** + * Close a channel's underlying stream to release network resources. + * + * @param channel - The channel to close. + * @param peerId - The peer ID for logging. + */ + async closeChannel(channel: Channel, peerId: string): Promise { + try { + // ByteStream.unwrap() returns the underlying libp2p stream. + const maybeWrapper = channel.msgStream as unknown as { + unwrap?: () => unknown; + }; + const underlying = + typeof maybeWrapper.unwrap === 'function' + ? maybeWrapper.unwrap() + : undefined; + + const closable = underlying as + | { close?: () => Promise } + | undefined; + if (closable?.close) { + await closable.close(); + this.#logger.log(`${peerId}:: closed channel stream`); + return; + } + + const abortable = underlying as + | { abort?: (error?: Error) => void } + | undefined; + if (abortable?.abort) { + abortable.abort(new AbortError()); + this.#logger.log(`${peerId}:: aborted channel stream`); + return; + } + + // If we cannot explicitly close/abort, log and rely on natural closure. + this.#logger.log( + `${peerId}:: channel stream lacks close/abort, relying on natural closure`, + ); + } catch (problem) { + this.#outputError(peerId, 'closing channel stream', problem); + } + } + /** * Output an error message. * diff --git a/packages/ocap-kernel/src/remotes/network.test.ts b/packages/ocap-kernel/src/remotes/network.test.ts index 03d0c3f76..df55e88e4 100644 --- a/packages/ocap-kernel/src/remotes/network.test.ts +++ b/packages/ocap-kernel/src/remotes/network.test.ts @@ -60,6 +60,7 @@ const mockReconnectionManager = { resetBackoff: vi.fn(), resetAllBackoffs: vi.fn(), clear: vi.fn(), + clearPeer: vi.fn(), }; vi.mock('./ReconnectionManager.ts', () => { @@ -81,6 +82,8 @@ vi.mock('./ReconnectionManager.ts', () => { resetAllBackoffs = mockReconnectionManager.resetAllBackoffs; clear = mockReconnectionManager.clear; + + clearPeer = mockReconnectionManager.clearPeer; } return { ReconnectionManager: MockReconnectionManager, @@ -100,6 +103,7 @@ const mockConnectionFactory = { dialIdempotent: vi.fn(), onInboundConnection: vi.fn(), stop: vi.fn().mockResolvedValue(undefined), + closeChannel: vi.fn().mockResolvedValue(undefined), }; vi.mock('./ConnectionFactory.ts', () => { @@ -183,10 +187,12 @@ describe('network.initNetwork', () => { mockReconnectionManager.resetBackoff.mockClear(); mockReconnectionManager.resetAllBackoffs.mockClear(); mockReconnectionManager.clear.mockClear(); + mockReconnectionManager.clearPeer.mockClear(); mockConnectionFactory.dialIdempotent.mockClear(); mockConnectionFactory.onInboundConnection.mockClear(); mockConnectionFactory.stop.mockClear(); + mockConnectionFactory.closeChannel.mockClear(); mockLogger.log.mockClear(); mockLogger.error.mockClear(); @@ -2146,6 +2152,101 @@ describe('network.initNetwork', () => { expect(mockMessageQueue.enqueue).toHaveBeenCalledWith('msg'); setIntervalSpy.mockRestore(); }); + + it('cleans up stale peers and calls clearPeer', async () => { + let intervalFn: (() => void) | undefined; + const setIntervalSpy = vi + .spyOn(global, 'setInterval') + .mockImplementation((fn: () => void, _ms?: number) => { + intervalFn = fn; + return 1 as unknown as NodeJS.Timeout; + }); + const mockChannel = createMockChannel('peer-1'); + // End the inbound stream so the channel is removed from the active channels map. + // Stale cleanup only applies when there is no active channel. + mockChannel.msgStream.read.mockResolvedValueOnce(undefined); + mockConnectionFactory.dialIdempotent.mockResolvedValue(mockChannel); + const stalePeerTimeoutMs = 1; + const { sendRemoteMessage } = await initNetwork( + '0x1234', + { stalePeerTimeoutMs }, + vi.fn(), + ); + // Establish connection (sets lastConnectionTime) + await sendRemoteMessage('peer-1', 'msg'); + // Wait until readChannel processes the stream end and removes the channel. + await vi.waitFor(() => { + expect(mockLogger.log).toHaveBeenCalledWith('peer-1:: stream ended'); + }); + // Ensure enough wall-clock time passes to exceed stalePeerTimeoutMs. + await delay(stalePeerTimeoutMs + 5); + // Run cleanup; stale peer should be cleaned + intervalFn?.(); + // Verify clearPeer was called + expect(mockReconnectionManager.clearPeer).toHaveBeenCalledWith('peer-1'); + // Verify cleanup log message + expect(mockLogger.log).toHaveBeenCalledWith( + expect.stringContaining('peer-1:: cleaning up stale peer data'), + ); + setIntervalSpy.mockRestore(); + }); + + it('respects custom cleanupIntervalMs option', async () => { + const customInterval = 30 * 60 * 1000; // 30 minutes + const setIntervalSpy = vi + .spyOn(global, 'setInterval') + .mockImplementation((_fn: () => void, _ms?: number) => { + return 1 as unknown as NodeJS.Timeout; + }); + await initNetwork( + '0x1234', + { cleanupIntervalMs: customInterval }, + vi.fn(), + ); + expect(setIntervalSpy).toHaveBeenCalledWith( + expect.any(Function), + customInterval, + ); + setIntervalSpy.mockRestore(); + }); + + it('respects custom stalePeerTimeoutMs option', async () => { + let intervalFn: (() => void) | undefined; + const setIntervalSpy = vi + .spyOn(global, 'setInterval') + .mockImplementation((fn: () => void, _ms?: number) => { + intervalFn = fn; + return 1 as unknown as NodeJS.Timeout; + }); + const customTimeout = 50; + const mockChannel = createMockChannel('peer-1'); + // End the inbound stream so the channel is removed from the active channels map. + mockChannel.msgStream.read.mockResolvedValueOnce(undefined); + mockConnectionFactory.dialIdempotent.mockResolvedValue(mockChannel); + const { sendRemoteMessage } = await initNetwork( + '0x1234', + { + stalePeerTimeoutMs: customTimeout, + }, + vi.fn(), + ); + // Establish connection + await sendRemoteMessage('peer-1', 'msg'); + // Wait until readChannel processes the stream end and removes the channel. + await vi.waitFor(() => { + expect(mockLogger.log).toHaveBeenCalledWith('peer-1:: stream ended'); + }); + // Run cleanup quickly; peer should not be stale yet. + intervalFn?.(); + // Peer should not be cleaned (not stale yet) + expect(mockReconnectionManager.clearPeer).not.toHaveBeenCalled(); + // Wait beyond the custom timeout, then run cleanup again. + await delay(customTimeout + 10); + intervalFn?.(); + // Now peer should be cleaned + expect(mockReconnectionManager.clearPeer).toHaveBeenCalledWith('peer-1'); + setIntervalSpy.mockRestore(); + }); }); describe('reconnection respects connection limit', () => { @@ -2236,6 +2337,11 @@ describe('network.initNetwork', () => { 'peer-0:: reconnection blocked by connection limit', ), ); + // Verify closeChannel was called to release network resources + expect(mockConnectionFactory.closeChannel).toHaveBeenCalledWith( + reconnectChannel, + 'peer-0', + ); }, { timeout: 5000 }, ); diff --git a/packages/ocap-kernel/src/remotes/network.ts b/packages/ocap-kernel/src/remotes/network.ts index ee77ec3d0..06337e8d2 100644 --- a/packages/ocap-kernel/src/remotes/network.ts +++ b/packages/ocap-kernel/src/remotes/network.ts @@ -326,7 +326,8 @@ export async function initNetwork( `reconnection attempt ${nextAttempt}`, limitError, ); - // Don't add channel - connection will naturally close + // Explicitly close the channel to release network resources + await connectionFactory.closeChannel(channel, peerId); // Continue the reconnection loop continue; } @@ -575,8 +576,9 @@ export async function initNetwork( logger.log( `${targetPeerId}:: connection limit reached after dial, queueing message`, ); + // Explicitly close the channel to release network resources + await connectionFactory.closeChannel(channel, targetPeerId); queue.enqueue(message); - // Don't add channel - connection will naturally close // Start reconnection to retry later when limit might free up handleConnectionLoss(targetPeerId); return; @@ -634,7 +636,20 @@ export async function initNetwork( logger.log( `${channel.peerId}:: rejecting inbound connection due to connection limit`, ); - // Don't add to channels map - connection will naturally close + // Explicitly close the channel to release network resources + const closePromise = connectionFactory.closeChannel( + channel, + channel.peerId, + ); + if (typeof closePromise?.catch === 'function') { + closePromise.catch((problem) => { + outputError( + channel.peerId, + 'closing rejected inbound channel', + problem, + ); + }); + } return; } diff --git a/vitest.config.ts b/vitest.config.ts index dbcc42b01..85ef9b3b9 100644 --- a/vitest.config.ts +++ b/vitest.config.ts @@ -130,7 +130,7 @@ export default defineConfig({ 'packages/kernel-ui/**': { statements: 97.57, functions: 97.29, - branches: 93.25, + branches: 93.26, lines: 97.57, }, 'packages/kernel-utils/**': { @@ -158,10 +158,10 @@ export default defineConfig({ lines: 22.22, }, 'packages/ocap-kernel/**': { - statements: 96.27, + statements: 96.4, functions: 98.55, - branches: 97.33, - lines: 96.27, + branches: 97.27, + lines: 96.4, }, 'packages/omnium-gatherum/**': { statements: 5.67, From 89c09f6a97b2243b2d21455a91ab29892184b4b7 Mon Sep 17 00:00:00 2001 From: Dimitris Marlagkoutsos Date: Fri, 19 Dec 2025 20:20:17 +0100 Subject: [PATCH 05/19] small refactor --- packages/ocap-kernel/src/remotes/network.ts | 62 ++++++++++++--------- vitest.config.ts | 6 +- 2 files changed, 40 insertions(+), 28 deletions(-) diff --git a/packages/ocap-kernel/src/remotes/network.ts b/packages/ocap-kernel/src/remotes/network.ts index 06337e8d2..9be7a50a9 100644 --- a/packages/ocap-kernel/src/remotes/network.ts +++ b/packages/ocap-kernel/src/remotes/network.ts @@ -276,9 +276,7 @@ export async function initNetwork( logger.log( `${peerId}:: max reconnection attempts (${maxAttempts}) reached, giving up`, ); - reconnectionManager.stopReconnection(peerId); - queue.clear(); - onRemoteGiveUp?.(peerId); + giveUpOnPeer(peerId, queue); return; } @@ -332,17 +330,11 @@ export async function initNetwork( continue; } - // Add channel to manager - channels.set(peerId, channel); - lastConnectionTime.set(peerId, Date.now()); + // Register channel and start reading + registerChannel(peerId, channel); logger.log(`${peerId}:: reconnection successful`); - // Start reading from the new channel - readChannel(channel).catch((problem) => { - outputError(peerId, `reading channel to`, problem); - }); - // Flush queued messages await flushQueuedMessages(peerId, channel, queue); @@ -365,9 +357,7 @@ export async function initNetwork( } if (!isRetryableNetworkError(problem)) { outputError(peerId, `non-retryable failure`, problem); - reconnectionManager.stopReconnection(peerId); - queue.clear(); - onRemoteGiveUp?.(peerId); + giveUpOnPeer(peerId, queue); return; } outputError(peerId, `reconnection attempt ${nextAttempt}`, problem); @@ -461,6 +451,37 @@ export async function initNetwork( } } + /** + * Register a channel and start reading from it. + * + * @param peerId - The peer ID for the channel. + * @param channel - The channel to register. + * @param errorContext - Optional context for error messages when reading fails. + */ + function registerChannel( + peerId: string, + channel: Channel, + errorContext = 'reading channel to', + ): void { + channels.set(peerId, channel); + lastConnectionTime.set(peerId, Date.now()); + readChannel(channel).catch((problem) => { + outputError(peerId, errorContext, problem); + }); + } + + /** + * Give up on a peer after max retries or non-retryable error. + * + * @param peerId - The peer ID to give up on. + * @param queue - The message queue for the peer. + */ + function giveUpOnPeer(peerId: string, queue: MessageQueue): void { + reconnectionManager.stopReconnection(peerId); + queue.clear(); + onRemoteGiveUp?.(peerId); + } + /** * Clean up stale peer data for peers disconnected for more than 1 hour. */ @@ -584,18 +605,13 @@ export async function initNetwork( return; } - channels.set(targetPeerId, channel); - lastConnectionTime.set(targetPeerId, Date.now()); + registerChannel(targetPeerId, channel); } catch (problem) { outputError(targetPeerId, `opening connection`, problem); handleConnectionLoss(targetPeerId); queue.enqueue(message); return; } - - readChannel(channel).catch((problem) => { - outputError(targetPeerId, `reading channel to`, problem); - }); } try { @@ -653,11 +669,7 @@ export async function initNetwork( return; } - channels.set(channel.peerId, channel); - lastConnectionTime.set(channel.peerId, Date.now()); - readChannel(channel).catch((error) => { - outputError(channel.peerId, 'error in inbound channel read', error); - }); + registerChannel(channel.peerId, channel, 'error in inbound channel read'); }); // Install wake detector to reset backoff on sleep/wake diff --git a/vitest.config.ts b/vitest.config.ts index 85ef9b3b9..6cd1d386b 100644 --- a/vitest.config.ts +++ b/vitest.config.ts @@ -158,10 +158,10 @@ export default defineConfig({ lines: 22.22, }, 'packages/ocap-kernel/**': { - statements: 96.4, - functions: 98.55, + statements: 96.41, + functions: 98.56, branches: 97.27, - lines: 96.4, + lines: 96.41, }, 'packages/omnium-gatherum/**': { statements: 5.67, From 5e4e59e8e0397edba6b06b5b186abbef9166b6a7 Mon Sep 17 00:00:00 2001 From: Dimitris Marlagkoutsos Date: Fri, 19 Dec 2025 20:29:20 +0100 Subject: [PATCH 06/19] update last timestamp on inbound message receipt --- packages/ocap-kernel/src/remotes/network.ts | 1 + vitest.config.ts | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/packages/ocap-kernel/src/remotes/network.ts b/packages/ocap-kernel/src/remotes/network.ts index 9be7a50a9..a7dc568ab 100644 --- a/packages/ocap-kernel/src/remotes/network.ts +++ b/packages/ocap-kernel/src/remotes/network.ts @@ -218,6 +218,7 @@ export async function initNetwork( if (readBuf) { reconnectionManager.resetBackoff(channel.peerId); // successful inbound traffic await receiveMessage(channel.peerId, bufToString(readBuf.subarray())); + lastConnectionTime.set(channel.peerId, Date.now()); // update timestamp on inbound activity } else { // Stream ended (returned undefined), exit the read loop logger.log(`${channel.peerId}:: stream ended`); diff --git a/vitest.config.ts b/vitest.config.ts index 6cd1d386b..a01f220b9 100644 --- a/vitest.config.ts +++ b/vitest.config.ts @@ -130,7 +130,7 @@ export default defineConfig({ 'packages/kernel-ui/**': { statements: 97.57, functions: 97.29, - branches: 93.26, + branches: 93.25, lines: 97.57, }, 'packages/kernel-utils/**': { From a44644ab43e741097ff35745fb7f3fd0cb1ff1a4 Mon Sep 17 00:00:00 2001 From: Dimitris Marlagkoutsos Date: Fri, 19 Dec 2025 20:47:12 +0100 Subject: [PATCH 07/19] fix yet another bug --- packages/ocap-kernel/src/remotes/network.ts | 42 ++++++++++++--------- vitest.config.ts | 6 +-- 2 files changed, 28 insertions(+), 20 deletions(-) diff --git a/packages/ocap-kernel/src/remotes/network.ts b/packages/ocap-kernel/src/remotes/network.ts index a7dc568ab..2bb5ea30a 100644 --- a/packages/ocap-kernel/src/remotes/network.ts +++ b/packages/ocap-kernel/src/remotes/network.ts @@ -589,24 +589,32 @@ export async function initNetwork( return; } - // Re-check connection limit after dial completes to prevent race conditions - // Multiple concurrent dials could all pass the initial check, then all add channels - try { - checkConnectionLimit(); - } catch { - // Connection limit reached - close the dialed channel and queue the message - logger.log( - `${targetPeerId}:: connection limit reached after dial, queueing message`, - ); - // Explicitly close the channel to release network resources - await connectionFactory.closeChannel(channel, targetPeerId); - queue.enqueue(message); - // Start reconnection to retry later when limit might free up - handleConnectionLoss(targetPeerId); - return; - } + // Check if a concurrent call already registered a channel for this peer + // (dialIdempotent may return the same channel due to deduplication) + const existingChannel = channels.get(targetPeerId); + if (existingChannel) { + // Another concurrent call already registered the channel, use it + channel = existingChannel; + } else { + // Re-check connection limit after dial completes to prevent race conditions + // Multiple concurrent dials could all pass the initial check, then all add channels + try { + checkConnectionLimit(); + } catch { + // Connection limit reached - close the dialed channel and queue the message + logger.log( + `${targetPeerId}:: connection limit reached after dial, queueing message`, + ); + // Explicitly close the channel to release network resources + await connectionFactory.closeChannel(channel, targetPeerId); + queue.enqueue(message); + // Start reconnection to retry later when limit might free up + handleConnectionLoss(targetPeerId); + return; + } - registerChannel(targetPeerId, channel); + registerChannel(targetPeerId, channel); + } } catch (problem) { outputError(targetPeerId, `opening connection`, problem); handleConnectionLoss(targetPeerId); diff --git a/vitest.config.ts b/vitest.config.ts index a01f220b9..fa9f43251 100644 --- a/vitest.config.ts +++ b/vitest.config.ts @@ -130,7 +130,7 @@ export default defineConfig({ 'packages/kernel-ui/**': { statements: 97.57, functions: 97.29, - branches: 93.25, + branches: 93.26, lines: 97.57, }, 'packages/kernel-utils/**': { @@ -158,10 +158,10 @@ export default defineConfig({ lines: 22.22, }, 'packages/ocap-kernel/**': { - statements: 96.41, + statements: 96.42, functions: 98.56, branches: 97.27, - lines: 96.41, + lines: 96.42, }, 'packages/omnium-gatherum/**': { statements: 5.67, From 2d8437ec1e91801ee4061430e499349c3be1ddf6 Mon Sep 17 00:00:00 2001 From: Dimitris Marlagkoutsos Date: Fri, 19 Dec 2025 21:14:18 +0100 Subject: [PATCH 08/19] Check connection limit for inbound connections --- packages/ocap-kernel/src/remotes/network.ts | 45 +++++++++++---------- vitest.config.ts | 2 +- 2 files changed, 25 insertions(+), 22 deletions(-) diff --git a/packages/ocap-kernel/src/remotes/network.ts b/packages/ocap-kernel/src/remotes/network.ts index 2bb5ea30a..20f20a005 100644 --- a/packages/ocap-kernel/src/remotes/network.ts +++ b/packages/ocap-kernel/src/remotes/network.ts @@ -654,28 +654,31 @@ export async function initNetwork( return; } - // Check connection limit for inbound connections - try { - checkConnectionLimit(); - } catch { - logger.log( - `${channel.peerId}:: rejecting inbound connection due to connection limit`, - ); - // Explicitly close the channel to release network resources - const closePromise = connectionFactory.closeChannel( - channel, - channel.peerId, - ); - if (typeof closePromise?.catch === 'function') { - closePromise.catch((problem) => { - outputError( - channel.peerId, - 'closing rejected inbound channel', - problem, - ); - }); + // Check connection limit for inbound connections only if no existing channel + // If a channel already exists, this is likely a reconnection and the peer already has a slot + if (!channels.has(channel.peerId)) { + try { + checkConnectionLimit(); + } catch { + logger.log( + `${channel.peerId}:: rejecting inbound connection due to connection limit`, + ); + // Explicitly close the channel to release network resources + const closePromise = connectionFactory.closeChannel( + channel, + channel.peerId, + ); + if (typeof closePromise?.catch === 'function') { + closePromise.catch((problem) => { + outputError( + channel.peerId, + 'closing rejected inbound channel', + problem, + ); + }); + } + return; } - return; } registerChannel(channel.peerId, channel, 'error in inbound channel read'); diff --git a/vitest.config.ts b/vitest.config.ts index fa9f43251..2f4da3937 100644 --- a/vitest.config.ts +++ b/vitest.config.ts @@ -130,7 +130,7 @@ export default defineConfig({ 'packages/kernel-ui/**': { statements: 97.57, functions: 97.29, - branches: 93.26, + branches: 93.25, lines: 97.57, }, 'packages/kernel-utils/**': { From ca35a69a2b5829fac7dfd499f6e904c33cf8414e Mon Sep 17 00:00:00 2001 From: Dimitris Marlagkoutsos Date: Mon, 5 Jan 2026 16:31:58 +0100 Subject: [PATCH 09/19] remove redundant check --- packages/ocap-kernel/src/remotes/network.ts | 6 +----- vitest.config.ts | 6 +++--- 2 files changed, 4 insertions(+), 8 deletions(-) diff --git a/packages/ocap-kernel/src/remotes/network.ts b/packages/ocap-kernel/src/remotes/network.ts index 20f20a005..6397d19f3 100644 --- a/packages/ocap-kernel/src/remotes/network.ts +++ b/packages/ocap-kernel/src/remotes/network.ts @@ -523,11 +523,7 @@ export async function initNetwork( lastConnectionTime.delete(peerId); messageQueues.delete(peerId); locationHints.delete(peerId); - - // Clear reconnection state if any - if (reconnectionManager.isReconnecting(peerId)) { - reconnectionManager.stopReconnection(peerId); - } + // Clear reconnection state reconnectionManager.clearPeer(peerId); } } diff --git a/vitest.config.ts b/vitest.config.ts index 2f4da3937..88ac4d733 100644 --- a/vitest.config.ts +++ b/vitest.config.ts @@ -158,10 +158,10 @@ export default defineConfig({ lines: 22.22, }, 'packages/ocap-kernel/**': { - statements: 96.42, + statements: 96.45, functions: 98.56, - branches: 97.27, - lines: 96.42, + branches: 97.34, + lines: 96.45, }, 'packages/omnium-gatherum/**': { statements: 5.67, From 799b1d3173c08ccf608ab1f435f05d98514c8072 Mon Sep 17 00:00:00 2001 From: Dimitris Marlagkoutsos Date: Mon, 5 Jan 2026 16:35:41 +0100 Subject: [PATCH 10/19] thresholds --- vitest.config.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/vitest.config.ts b/vitest.config.ts index 88ac4d733..e919fcb3f 100644 --- a/vitest.config.ts +++ b/vitest.config.ts @@ -130,7 +130,7 @@ export default defineConfig({ 'packages/kernel-ui/**': { statements: 97.57, functions: 97.29, - branches: 93.25, + branches: 93.26, lines: 97.57, }, 'packages/kernel-utils/**': { From 8323837f1a1349351991f5b1568e942272d461a7 Mon Sep 17 00:00:00 2001 From: Dimitris Marlagkoutsos Date: Mon, 5 Jan 2026 21:50:48 +0100 Subject: [PATCH 11/19] close channel after dials --- packages/ocap-kernel/src/remotes/network.ts | 7 +++++++ vitest.config.ts | 10 +++++----- 2 files changed, 12 insertions(+), 5 deletions(-) diff --git a/packages/ocap-kernel/src/remotes/network.ts b/packages/ocap-kernel/src/remotes/network.ts index 6397d19f3..15dc21c85 100644 --- a/packages/ocap-kernel/src/remotes/network.ts +++ b/packages/ocap-kernel/src/remotes/network.ts @@ -582,6 +582,9 @@ export async function initNetwork( `${targetPeerId}:: reconnection started during dial, queueing message ` + `(${queue.length}/${maxQueue}): ${message}`, ); + // Explicitly close the channel to release network resources + // The reconnection loop will dial its own new channel + await connectionFactory.closeChannel(channel, targetPeerId); return; } @@ -589,6 +592,10 @@ export async function initNetwork( // (dialIdempotent may return the same channel due to deduplication) const existingChannel = channels.get(targetPeerId); if (existingChannel) { + // Close the dialed channel if it's different from the existing one + if (channel !== existingChannel) { + await connectionFactory.closeChannel(channel, targetPeerId); + } // Another concurrent call already registered the channel, use it channel = existingChannel; } else { diff --git a/vitest.config.ts b/vitest.config.ts index e919fcb3f..c1e668b6b 100644 --- a/vitest.config.ts +++ b/vitest.config.ts @@ -106,7 +106,7 @@ export default defineConfig({ 'packages/kernel-platforms/**': { statements: 99.38, functions: 100, - branches: 96.2, + branches: 96.25, lines: 99.38, }, 'packages/kernel-rpc-methods/**': { @@ -130,7 +130,7 @@ export default defineConfig({ 'packages/kernel-ui/**': { statements: 97.57, functions: 97.29, - branches: 93.26, + branches: 93.25, lines: 97.57, }, 'packages/kernel-utils/**': { @@ -158,10 +158,10 @@ export default defineConfig({ lines: 22.22, }, 'packages/ocap-kernel/**': { - statements: 96.45, + statements: 96.42, functions: 98.56, - branches: 97.34, - lines: 96.45, + branches: 97.27, + lines: 96.42, }, 'packages/omnium-gatherum/**': { statements: 5.67, From 4b98dfe78c5fa75656e89778edde5f4b5d3d90c9 Mon Sep 17 00:00:00 2001 From: Dimitris Marlagkoutsos Date: Tue, 6 Jan 2026 16:50:10 +0100 Subject: [PATCH 12/19] fix bug --- .../ocap-kernel/src/remotes/network.test.ts | 106 ++++++++++++++++++ packages/ocap-kernel/src/remotes/network.ts | 56 +++++++-- vitest.config.ts | 8 +- 3 files changed, 154 insertions(+), 16 deletions(-) diff --git a/packages/ocap-kernel/src/remotes/network.test.ts b/packages/ocap-kernel/src/remotes/network.test.ts index df55e88e4..d9bd6f290 100644 --- a/packages/ocap-kernel/src/remotes/network.test.ts +++ b/packages/ocap-kernel/src/remotes/network.test.ts @@ -1237,6 +1237,112 @@ describe('network.initNetwork', () => { ); }); }); + + it('reuses existing channel when inbound connection arrives during reconnection dial', async () => { + // Capture inbound handler before init + let inboundHandler: ((channel: MockChannel) => void) | undefined; + mockConnectionFactory.onInboundConnection.mockImplementation( + (handler) => { + inboundHandler = handler; + }, + ); + + // Drive reconnection state deterministically + let reconnecting = false; + mockReconnectionManager.isReconnecting.mockImplementation( + () => reconnecting, + ); + mockReconnectionManager.startReconnection.mockImplementation(() => { + reconnecting = true; + }); + mockReconnectionManager.stopReconnection.mockImplementation(() => { + reconnecting = false; + }); + mockReconnectionManager.shouldRetry.mockReturnValue(true); + mockReconnectionManager.incrementAttempt.mockReturnValue(1); + mockReconnectionManager.calculateBackoff.mockReturnValue(0); // No delay for test + + const { abortableDelay } = await import('@metamask/kernel-utils'); + (abortableDelay as ReturnType).mockResolvedValue(undefined); + + // Create two different channels: one for reconnection dial, one for inbound + const reconnectionChannel = createMockChannel('peer-1'); + const inboundChannel = createMockChannel('peer-1'); + reconnectionChannel.msgStream.write.mockResolvedValue(undefined); + inboundChannel.msgStream.write.mockResolvedValue(undefined); + inboundChannel.msgStream.read.mockImplementation( + async () => + new Promise(() => { + /* Never resolves - keeps channel active */ + }), + ); + + const { sendRemoteMessage } = await initNetwork('0x1234', {}, vi.fn()); + + // Set up initial connection that will fail on write + const initialChannel = createMockChannel('peer-1'); + initialChannel.msgStream.write + .mockResolvedValueOnce(undefined) // First write succeeds + .mockRejectedValueOnce( + Object.assign(new Error('Connection lost'), { code: 'ECONNRESET' }), + ); // Second write fails, triggering reconnection + + // Make dialIdempotent delay for reconnection to allow inbound connection to arrive first + let dialResolve: ((value: MockChannel) => void) | undefined; + mockConnectionFactory.dialIdempotent + .mockResolvedValueOnce(initialChannel) // Initial connection + .mockImplementation( + async () => + new Promise((resolve) => { + dialResolve = resolve; + }), + ); // Reconnection dial (pending) + + // Establish initial connection + await sendRemoteMessage('peer-1', 'msg-1'); + + // Trigger connection loss to start reconnection + await sendRemoteMessage('peer-1', 'msg-2'); + + // Wait for reconnection to start and begin dialing + await vi.waitFor(() => { + expect(mockReconnectionManager.startReconnection).toHaveBeenCalledWith( + 'peer-1', + ); + }); + + // While reconnection dial is pending, inbound connection arrives and registers channel + inboundHandler?.(inboundChannel); + + // Wait for inbound channel to be registered + await vi.waitFor(() => { + expect(inboundChannel.msgStream.read).toHaveBeenCalled(); + }); + + // Now resolve the reconnection dial + dialResolve?.(reconnectionChannel); + + // Wait for reconnection to complete + await vi.waitFor(() => { + // Should detect existing channel and close the dialed one + expect(mockConnectionFactory.closeChannel).toHaveBeenCalledWith( + reconnectionChannel, + 'peer-1', + ); + // Should log that existing channel is being reused + expect(mockLogger.log).toHaveBeenCalledWith( + 'peer-1:: reconnection: channel already exists, reusing existing channel', + ); + // Should stop reconnection (successful) + expect(mockReconnectionManager.stopReconnection).toHaveBeenCalledWith( + 'peer-1', + ); + }); + + // Verify only one channel is active (the inbound one) + // The reconnection channel should have been closed, not registered + expect(mockConnectionFactory.closeChannel).toHaveBeenCalledTimes(1); + }); }); describe('error handling', () => { diff --git a/packages/ocap-kernel/src/remotes/network.ts b/packages/ocap-kernel/src/remotes/network.ts index 15dc21c85..cdfcc891b 100644 --- a/packages/ocap-kernel/src/remotes/network.ts +++ b/packages/ocap-kernel/src/remotes/network.ts @@ -303,7 +303,7 @@ export async function initNetwork( try { const hints = locationHints.get(peerId) ?? []; - const channel = await connectionFactory.dialIdempotent( + let channel = await connectionFactory.dialIdempotent( peerId, hints, false, // No retry here, we're already in a retry loop @@ -331,8 +331,19 @@ export async function initNetwork( continue; } - // Register channel and start reading - registerChannel(peerId, channel); + // Check if a concurrent call already registered a channel for this peer + // (e.g., an inbound connection or another reconnection attempt) + // (dialIdempotent may return the same channel due to deduplication) + const dialedChannel = channel; + channel = await reuseOrReturnChannel(peerId, channel); + if (channel === dialedChannel) { + // Register the new channel and start reading + registerChannel(peerId, channel); + } else { + logger.log( + `${peerId}:: reconnection: channel already exists, reusing existing channel`, + ); + } logger.log(`${peerId}:: reconnection successful`); @@ -471,6 +482,31 @@ export async function initNetwork( }); } + /** + * Check if an existing channel exists for a peer, and if so, reuse it. + * Otherwise, return the dialed channel for the caller to register. + * + * @param peerId - The peer ID for the channel. + * @param dialedChannel - The newly dialed channel. + * @returns The channel to use (either existing or the dialed one). + */ + async function reuseOrReturnChannel( + peerId: string, + dialedChannel: Channel, + ): Promise { + const existingChannel = channels.get(peerId); + if (existingChannel) { + // Close the dialed channel if it's different from the existing one + if (dialedChannel !== existingChannel) { + await connectionFactory.closeChannel(dialedChannel, peerId); + } + // Another concurrent call already registered the channel, use it + return existingChannel; + } + // No existing channel, return the dialed one for caller to register + return dialedChannel; + } + /** * Give up on a peer after max retries or non-retryable error. * @@ -590,15 +626,9 @@ export async function initNetwork( // Check if a concurrent call already registered a channel for this peer // (dialIdempotent may return the same channel due to deduplication) - const existingChannel = channels.get(targetPeerId); - if (existingChannel) { - // Close the dialed channel if it's different from the existing one - if (channel !== existingChannel) { - await connectionFactory.closeChannel(channel, targetPeerId); - } - // Another concurrent call already registered the channel, use it - channel = existingChannel; - } else { + const dialedChannel = channel; + channel = await reuseOrReturnChannel(targetPeerId, channel); + if (channel === dialedChannel) { // Re-check connection limit after dial completes to prevent race conditions // Multiple concurrent dials could all pass the initial check, then all add channels try { @@ -616,8 +646,10 @@ export async function initNetwork( return; } + // Register the new channel and start reading registerChannel(targetPeerId, channel); } + // else: Existing channel reused, nothing more to do } catch (problem) { outputError(targetPeerId, `opening connection`, problem); handleConnectionLoss(targetPeerId); diff --git a/vitest.config.ts b/vitest.config.ts index c1e668b6b..440b5e699 100644 --- a/vitest.config.ts +++ b/vitest.config.ts @@ -130,7 +130,7 @@ export default defineConfig({ 'packages/kernel-ui/**': { statements: 97.57, functions: 97.29, - branches: 93.25, + branches: 93.26, lines: 97.57, }, 'packages/kernel-utils/**': { @@ -158,10 +158,10 @@ export default defineConfig({ lines: 22.22, }, 'packages/ocap-kernel/**': { - statements: 96.42, + statements: 96.46, functions: 98.56, - branches: 97.27, - lines: 96.42, + branches: 97.35, + lines: 96.46, }, 'packages/omnium-gatherum/**': { statements: 5.67, From 79bae98f3841fd7f4f6d5277838c248cfb71ffb6 Mon Sep 17 00:00:00 2001 From: Dimitris Marlagkoutsos Date: Tue, 6 Jan 2026 17:21:13 +0100 Subject: [PATCH 13/19] fix more bugs oof --- .../ocap-kernel/src/remotes/network.test.ts | 54 ++++++++++++++++ packages/ocap-kernel/src/remotes/network.ts | 64 ++++++++++--------- vitest.config.ts | 4 +- 3 files changed, 89 insertions(+), 33 deletions(-) diff --git a/packages/ocap-kernel/src/remotes/network.test.ts b/packages/ocap-kernel/src/remotes/network.test.ts index d9bd6f290..08be5e4d0 100644 --- a/packages/ocap-kernel/src/remotes/network.test.ts +++ b/packages/ocap-kernel/src/remotes/network.test.ts @@ -2353,6 +2353,60 @@ describe('network.initNetwork', () => { expect(mockReconnectionManager.clearPeer).toHaveBeenCalledWith('peer-1'); setIntervalSpy.mockRestore(); }); + + it('cleans up intentionallyClosed entries for stale peers', async () => { + let intervalFn: (() => void) | undefined; + const setIntervalSpy = vi + .spyOn(global, 'setInterval') + .mockImplementation((fn: () => void, _ms?: number) => { + intervalFn = fn; + return 1 as unknown as NodeJS.Timeout; + }); + const mockChannel = createMockChannel('peer-1'); + // End the inbound stream so the channel is removed from the active channels map. + mockChannel.msgStream.read.mockResolvedValueOnce(undefined); + mockConnectionFactory.dialIdempotent.mockResolvedValue(mockChannel); + const stalePeerTimeoutMs = 1; + const { sendRemoteMessage, closeConnection } = await initNetwork( + '0x1234', + { stalePeerTimeoutMs }, + vi.fn(), + ); + // Establish connection and then intentionally close it + await sendRemoteMessage('peer-1', 'msg'); + await closeConnection('peer-1'); + // Verify peer is marked as intentionally closed + await expect(sendRemoteMessage('peer-1', 'msg2')).rejects.toThrow( + 'Message delivery failed after intentional close', + ); + // Wait until readChannel processes the stream end and removes the channel. + await vi.waitFor(() => { + expect(mockLogger.log).toHaveBeenCalledWith('peer-1:: stream ended'); + }); + // Ensure enough wall-clock time passes to exceed stalePeerTimeoutMs. + await delay(stalePeerTimeoutMs + 5); + // Run cleanup; stale peer should be cleaned, including intentionallyClosed entry + intervalFn?.(); + // Verify clearPeer was called + expect(mockReconnectionManager.clearPeer).toHaveBeenCalledWith('peer-1'); + // Verify cleanup log message + expect(mockLogger.log).toHaveBeenCalledWith( + expect.stringContaining('peer-1:: cleaning up stale peer data'), + ); + // After cleanup, peer should no longer be in intentionallyClosed + // Verify by attempting to send a message - it should not throw the intentional close error + const newChannel = createMockChannel('peer-1'); + mockConnectionFactory.dialIdempotent.mockResolvedValueOnce(newChannel); + // Should not throw "Message delivery failed after intentional close" + // (it will attempt to dial a new connection instead) + await sendRemoteMessage('peer-1', 'msg-after-cleanup'); + expect(mockConnectionFactory.dialIdempotent).toHaveBeenCalledWith( + 'peer-1', + [], + true, + ); + setIntervalSpy.mockRestore(); + }); }); describe('reconnection respects connection limit', () => { diff --git a/packages/ocap-kernel/src/remotes/network.ts b/packages/ocap-kernel/src/remotes/network.ts index cdfcc891b..9224e57e8 100644 --- a/packages/ocap-kernel/src/remotes/network.ts +++ b/packages/ocap-kernel/src/remotes/network.ts @@ -309,40 +309,40 @@ export async function initNetwork( false, // No retry here, we're already in a retry loop ); - // Check connection limit before adding channel - // This prevents exceeding the limit if other connections were established - // during the reconnection delay - try { - checkConnectionLimit(); - } catch (limitError) { - // Connection limit reached - treat as retryable and continue loop - // The limit might free up when other connections close - logger.log( - `${peerId}:: reconnection blocked by connection limit, will retry`, - ); - outputError( - peerId, - `reconnection attempt ${nextAttempt}`, - limitError, - ); - // Explicitly close the channel to release network resources - await connectionFactory.closeChannel(channel, peerId); - // Continue the reconnection loop - continue; - } - // Check if a concurrent call already registered a channel for this peer // (e.g., an inbound connection or another reconnection attempt) - // (dialIdempotent may return the same channel due to deduplication) - const dialedChannel = channel; + // Use channels.has() instead of object identity check because dialIdempotent + // may return the same channel object due to deduplication, making + // channel === dialedChannel true even when the channel was already registered channel = await reuseOrReturnChannel(peerId, channel); - if (channel === dialedChannel) { - // Register the new channel and start reading - registerChannel(peerId, channel); - } else { + if (channels.has(peerId)) { logger.log( `${peerId}:: reconnection: channel already exists, reusing existing channel`, ); + } else { + // Re-check connection limit after reuseOrReturnChannel to prevent race conditions + // Other connections (inbound or outbound) could be established during the await + try { + checkConnectionLimit(); + } catch (limitError) { + // Connection limit reached - treat as retryable and continue loop + // The limit might free up when other connections close + logger.log( + `${peerId}:: reconnection blocked by connection limit, will retry`, + ); + outputError( + peerId, + `reconnection attempt ${nextAttempt}`, + limitError, + ); + // Explicitly close the channel to release network resources + await connectionFactory.closeChannel(channel, peerId); + // Continue the reconnection loop + continue; + } + + // Register the new channel and start reading + registerChannel(peerId, channel); } logger.log(`${peerId}:: reconnection successful`); @@ -559,6 +559,7 @@ export async function initNetwork( lastConnectionTime.delete(peerId); messageQueues.delete(peerId); locationHints.delete(peerId); + intentionallyClosed.delete(peerId); // Clear reconnection state reconnectionManager.clearPeer(peerId); } @@ -625,10 +626,11 @@ export async function initNetwork( } // Check if a concurrent call already registered a channel for this peer - // (dialIdempotent may return the same channel due to deduplication) - const dialedChannel = channel; + // Use channels.has() instead of object identity check because dialIdempotent + // may return the same channel object due to deduplication, making + // channel === dialedChannel true even when the channel was already registered channel = await reuseOrReturnChannel(targetPeerId, channel); - if (channel === dialedChannel) { + if (!channels.has(targetPeerId)) { // Re-check connection limit after dial completes to prevent race conditions // Multiple concurrent dials could all pass the initial check, then all add channels try { diff --git a/vitest.config.ts b/vitest.config.ts index 440b5e699..0d0322df5 100644 --- a/vitest.config.ts +++ b/vitest.config.ts @@ -106,7 +106,7 @@ export default defineConfig({ 'packages/kernel-platforms/**': { statements: 99.38, functions: 100, - branches: 96.25, + branches: 96.2, lines: 99.38, }, 'packages/kernel-rpc-methods/**': { @@ -130,7 +130,7 @@ export default defineConfig({ 'packages/kernel-ui/**': { statements: 97.57, functions: 97.29, - branches: 93.26, + branches: 93.25, lines: 97.57, }, 'packages/kernel-utils/**': { From 972080b17ce74e88efcef54ac304d2f4bc2d1ce7 Mon Sep 17 00:00:00 2001 From: Dimitris Marlagkoutsos Date: Tue, 6 Jan 2026 17:22:47 +0100 Subject: [PATCH 14/19] cleanup --- packages/ocap-kernel/src/remotes/network.ts | 3 --- 1 file changed, 3 deletions(-) diff --git a/packages/ocap-kernel/src/remotes/network.ts b/packages/ocap-kernel/src/remotes/network.ts index 9224e57e8..393efbdba 100644 --- a/packages/ocap-kernel/src/remotes/network.ts +++ b/packages/ocap-kernel/src/remotes/network.ts @@ -311,9 +311,6 @@ export async function initNetwork( // Check if a concurrent call already registered a channel for this peer // (e.g., an inbound connection or another reconnection attempt) - // Use channels.has() instead of object identity check because dialIdempotent - // may return the same channel object due to deduplication, making - // channel === dialedChannel true even when the channel was already registered channel = await reuseOrReturnChannel(peerId, channel); if (channels.has(peerId)) { logger.log( From 33e187da082e26c9d67ba50e3aea88128a536f45 Mon Sep 17 00:00:00 2001 From: Dimitris Marlagkoutsos Date: Tue, 6 Jan 2026 17:24:29 +0100 Subject: [PATCH 15/19] moooore cleanup --- packages/ocap-kernel/src/remotes/network.ts | 3 --- 1 file changed, 3 deletions(-) diff --git a/packages/ocap-kernel/src/remotes/network.ts b/packages/ocap-kernel/src/remotes/network.ts index 393efbdba..50d3871b3 100644 --- a/packages/ocap-kernel/src/remotes/network.ts +++ b/packages/ocap-kernel/src/remotes/network.ts @@ -623,9 +623,6 @@ export async function initNetwork( } // Check if a concurrent call already registered a channel for this peer - // Use channels.has() instead of object identity check because dialIdempotent - // may return the same channel object due to deduplication, making - // channel === dialedChannel true even when the channel was already registered channel = await reuseOrReturnChannel(targetPeerId, channel); if (!channels.has(targetPeerId)) { // Re-check connection limit after dial completes to prevent race conditions From a85f40614c0da8b091df9d054620fe2a5529c765 Mon Sep 17 00:00:00 2001 From: Dimitris Marlagkoutsos Date: Tue, 6 Jan 2026 18:43:29 +0100 Subject: [PATCH 16/19] fixing moooore bugs --- packages/ocap-kernel/src/remotes/network.ts | 39 +++++++++++++++++---- vitest.config.ts | 6 ++-- 2 files changed, 35 insertions(+), 10 deletions(-) diff --git a/packages/ocap-kernel/src/remotes/network.ts b/packages/ocap-kernel/src/remotes/network.ts index 50d3871b3..ca4fe5ba0 100644 --- a/packages/ocap-kernel/src/remotes/network.ts +++ b/packages/ocap-kernel/src/remotes/network.ts @@ -312,7 +312,16 @@ export async function initNetwork( // Check if a concurrent call already registered a channel for this peer // (e.g., an inbound connection or another reconnection attempt) channel = await reuseOrReturnChannel(peerId, channel); - if (channels.has(peerId)) { + // Re-check after await to handle race condition where a channel was registered + // concurrently during the microtask delay + const registeredChannel = channels.get(peerId); + if (registeredChannel) { + // A channel was registered concurrently, use it instead + if (channel !== registeredChannel) { + // Close the dialed channel to prevent resource leak + await connectionFactory.closeChannel(channel, peerId); + } + channel = registeredChannel; logger.log( `${peerId}:: reconnection: channel already exists, reusing existing channel`, ); @@ -609,12 +618,16 @@ export async function initNetwork( true, // With retry for initial connection ); + // Re-fetch queue after dial in case cleanupStalePeers deleted it during the await + // This prevents orphaned messages in a stale queue reference + const currentQueue = getMessageQueue(targetPeerId); + // Check if reconnection started while we were dialing (race condition protection) if (reconnectionManager.isReconnecting(targetPeerId)) { - queue.enqueue(message); + currentQueue.enqueue(message); logger.log( `${targetPeerId}:: reconnection started during dial, queueing message ` + - `(${queue.length}/${maxQueue}): ${message}`, + `(${currentQueue.length}/${maxQueue}): ${message}`, ); // Explicitly close the channel to release network resources // The reconnection loop will dial its own new channel @@ -624,7 +637,18 @@ export async function initNetwork( // Check if a concurrent call already registered a channel for this peer channel = await reuseOrReturnChannel(targetPeerId, channel); - if (!channels.has(targetPeerId)) { + // Re-check after await to handle race condition where a channel was registered + // concurrently during the microtask delay + const registeredChannel = channels.get(targetPeerId); + if (registeredChannel) { + // A channel was registered concurrently, use it instead + if (channel !== registeredChannel) { + // Close the dialed channel to prevent resource leak + await connectionFactory.closeChannel(channel, targetPeerId); + } + channel = registeredChannel; + // Existing channel reused, nothing more to do + } else { // Re-check connection limit after dial completes to prevent race conditions // Multiple concurrent dials could all pass the initial check, then all add channels try { @@ -636,7 +660,7 @@ export async function initNetwork( ); // Explicitly close the channel to release network resources await connectionFactory.closeChannel(channel, targetPeerId); - queue.enqueue(message); + currentQueue.enqueue(message); // Start reconnection to retry later when limit might free up handleConnectionLoss(targetPeerId); return; @@ -645,11 +669,12 @@ export async function initNetwork( // Register the new channel and start reading registerChannel(targetPeerId, channel); } - // else: Existing channel reused, nothing more to do } catch (problem) { outputError(targetPeerId, `opening connection`, problem); handleConnectionLoss(targetPeerId); - queue.enqueue(message); + // Re-fetch queue in case cleanupStalePeers deleted it during the dial await + const currentQueue = getMessageQueue(targetPeerId); + currentQueue.enqueue(message); return; } } diff --git a/vitest.config.ts b/vitest.config.ts index 0d0322df5..a6e2613ea 100644 --- a/vitest.config.ts +++ b/vitest.config.ts @@ -158,10 +158,10 @@ export default defineConfig({ lines: 22.22, }, 'packages/ocap-kernel/**': { - statements: 96.46, + statements: 96.4, functions: 98.56, - branches: 97.35, - lines: 96.46, + branches: 97.21, + lines: 96.4, }, 'packages/omnium-gatherum/**': { statements: 5.67, From 4092b734aab3d7d96437111ed5b7fb07faebcae9 Mon Sep 17 00:00:00 2001 From: Dimitris Marlagkoutsos Date: Tue, 6 Jan 2026 19:56:12 +0100 Subject: [PATCH 17/19] fixing all bugs --- .../ocap-kernel/src/remotes/network.test.ts | 153 +++++++++++++++++ packages/ocap-kernel/src/remotes/network.ts | 156 ++++++++++++++++-- vitest.config.ts | 6 +- 3 files changed, 294 insertions(+), 21 deletions(-) diff --git a/packages/ocap-kernel/src/remotes/network.test.ts b/packages/ocap-kernel/src/remotes/network.test.ts index 08be5e4d0..6c83e6c48 100644 --- a/packages/ocap-kernel/src/remotes/network.test.ts +++ b/packages/ocap-kernel/src/remotes/network.test.ts @@ -2556,4 +2556,157 @@ describe('network.initNetwork', () => { ); }, 10000); }); + + it('registerLocationHints merges with existing hints', async () => { + const { registerLocationHints, sendRemoteMessage } = await initNetwork( + '0x1234', + {}, + vi.fn(), + ); + + const mockChannel = createMockChannel('peer-1'); + mockConnectionFactory.dialIdempotent.mockResolvedValue(mockChannel); + + // Register initial hints + registerLocationHints('peer-1', ['hint1', 'hint2']); + + // Register additional hints (should merge) + registerLocationHints('peer-1', ['hint2', 'hint3']); + + await sendRemoteMessage('peer-1', 'msg'); + + expect(mockConnectionFactory.dialIdempotent).toHaveBeenCalledWith( + 'peer-1', + ['hint1', 'hint2', 'hint3'], + true, + ); + }); + + it('registerLocationHints creates new set when no existing hints', async () => { + const { registerLocationHints, sendRemoteMessage } = await initNetwork( + '0x1234', + {}, + vi.fn(), + ); + + const mockChannel = createMockChannel('peer-1'); + mockConnectionFactory.dialIdempotent.mockResolvedValue(mockChannel); + + registerLocationHints('peer-1', ['hint1', 'hint2']); + + await sendRemoteMessage('peer-1', 'msg'); + + expect(mockConnectionFactory.dialIdempotent).toHaveBeenCalledWith( + 'peer-1', + ['hint1', 'hint2'], + true, + ); + }); + + it('registerChannel closes replaced channel', async () => { + let inboundHandler: ((channel: MockChannel) => void) | undefined; + mockConnectionFactory.onInboundConnection.mockImplementation((handler) => { + inboundHandler = handler; + }); + + await initNetwork('0x1234', {}, vi.fn()); + + const channel1 = createMockChannel('peer-1'); + const channel2 = createMockChannel('peer-1'); + + inboundHandler?.(channel1); + + await vi.waitFor(() => { + expect(channel1.msgStream.read).toHaveBeenCalled(); + }); + + inboundHandler?.(channel2); + + await vi.waitFor(() => { + expect(mockConnectionFactory.closeChannel).toHaveBeenCalledWith( + channel1, + 'peer-1', + ); + }); + }); + + it('handles closeChannel error when replacing channel', async () => { + let inboundHandler: ((channel: MockChannel) => void) | undefined; + mockConnectionFactory.onInboundConnection.mockImplementation((handler) => { + inboundHandler = handler; + }); + + mockConnectionFactory.closeChannel.mockRejectedValueOnce( + new Error('Close failed'), + ); + + await initNetwork('0x1234', {}, vi.fn()); + + const channel1 = createMockChannel('peer-1'); + const channel2 = createMockChannel('peer-1'); + + inboundHandler?.(channel1); + + await vi.waitFor(() => { + expect(channel1.msgStream.read).toHaveBeenCalled(); + }); + + inboundHandler?.(channel2); + + await vi.waitFor(() => { + expect(mockLogger.log).toHaveBeenCalledWith( + expect.stringContaining('error closing replaced channel'), + ); + }); + }); + + it('closes rejected inbound channel from intentionally closed peer', async () => { + let inboundHandler: ((channel: MockChannel) => void) | undefined; + mockConnectionFactory.onInboundConnection.mockImplementation((handler) => { + inboundHandler = handler; + }); + + const { closeConnection } = await initNetwork('0x1234', {}, vi.fn()); + + await closeConnection('peer-1'); + + const inboundChannel = createMockChannel('peer-1'); + inboundHandler?.(inboundChannel); + + await vi.waitFor(() => { + expect(mockConnectionFactory.closeChannel).toHaveBeenCalledWith( + inboundChannel, + 'peer-1', + ); + expect(mockLogger.log).toHaveBeenCalledWith( + 'peer-1:: rejecting inbound connection from intentionally closed peer', + ); + }); + }); + + it('handles error when closing rejected inbound from intentionally closed peer', async () => { + let inboundHandler: ((channel: MockChannel) => void) | undefined; + mockConnectionFactory.onInboundConnection.mockImplementation((handler) => { + inboundHandler = handler; + }); + + mockConnectionFactory.closeChannel.mockRejectedValueOnce( + new Error('Close failed'), + ); + + const { closeConnection } = await initNetwork('0x1234', {}, vi.fn()); + + await closeConnection('peer-1'); + + const inboundChannel = createMockChannel('peer-1'); + inboundHandler?.(inboundChannel); + + await vi.waitFor(() => { + expect(mockLogger.log).toHaveBeenCalledWith( + expect.stringContaining( + 'error closing rejected inbound channel from intentionally closed peer', + ), + ); + }); + }); }); diff --git a/packages/ocap-kernel/src/remotes/network.ts b/packages/ocap-kernel/src/remotes/network.ts index ca4fe5ba0..ea0ff13d0 100644 --- a/packages/ocap-kernel/src/remotes/network.ts +++ b/packages/ocap-kernel/src/remotes/network.ts @@ -191,6 +191,7 @@ export async function initNetwork( try { readBuf = await channel.msgStream.read(); } catch (problem) { + const isCurrentChannel = channels.get(channel.peerId) === channel; // Detect graceful disconnect const rtcProblem = problem as { errorDetail?: string; @@ -200,17 +201,27 @@ export async function initNetwork( rtcProblem?.errorDetail === 'sctp-failure' && rtcProblem?.sctpCauseCode === SCTP_USER_INITIATED_ABORT ) { - logger.log(`${channel.peerId}:: remote intentionally disconnected`); - // Mark as intentionally closed and don't trigger reconnection - intentionallyClosed.add(channel.peerId); - } else { + if (isCurrentChannel) { + logger.log( + `${channel.peerId}:: remote intentionally disconnected`, + ); + // Mark as intentionally closed and don't trigger reconnection + intentionallyClosed.add(channel.peerId); + } else { + logger.log( + `${channel.peerId}:: stale channel intentionally disconnected`, + ); + } + } else if (isCurrentChannel) { outputError( channel.peerId, `reading message from ${channel.peerId}`, problem, ); // Only trigger reconnection for non-intentional disconnects - handleConnectionLoss(channel.peerId); + handleConnectionLoss(channel.peerId, channel); + } else { + logger.log(`${channel.peerId}:: ignoring error from stale channel`); } logger.log(`closed channel to ${channel.peerId}`); throw problem; @@ -239,8 +250,15 @@ export async function initNetwork( * Skips reconnection if the peer was intentionally closed. * * @param peerId - The peer ID to handle the connection loss for. + * @param channel - Optional channel that experienced loss; used to ignore stale channels. */ - function handleConnectionLoss(peerId: string): void { + function handleConnectionLoss(peerId: string, channel?: Channel): void { + const currentChannel = channels.get(peerId); + // Ignore loss signals from stale channels if a different channel is active. + if (channel && currentChannel && currentChannel !== channel) { + logger.log(`${peerId}:: ignoring connection loss from stale channel`); + return; + } // Don't reconnect if this peer intentionally closed the connection if (intentionallyClosed.has(peerId)) { logger.log( @@ -270,7 +288,8 @@ export async function initNetwork( peerId: string, maxAttempts = maxRetryAttempts ?? DEFAULT_MAX_RETRY_ATTEMPTS, ): Promise { - const queue = getMessageQueue(peerId); + // Get queue reference - will re-fetch after long awaits to handle cleanup race conditions + let queue = getMessageQueue(peerId); while (reconnectionManager.isReconnecting(peerId) && !signal.aborted) { if (!reconnectionManager.shouldRetry(peerId, maxAttempts)) { @@ -297,21 +316,46 @@ export async function initNetwork( throw error; } + // Re-fetch queue after delay in case cleanupStalePeers deleted it during the await + queue = getMessageQueue(peerId); + + // Re-check reconnection state after the await; it may have been stopped concurrently + if (!reconnectionManager.isReconnecting(peerId) || signal.aborted) { + return; + } + + // If peer was intentionally closed while reconnecting, stop and exit + if (intentionallyClosed.has(peerId)) { + reconnectionManager.stopReconnection(peerId); + return; + } + logger.log( `${peerId}:: reconnection attempt ${nextAttempt}${maxAttempts ? `/${maxAttempts}` : ''}`, ); try { const hints = locationHints.get(peerId) ?? []; - let channel = await connectionFactory.dialIdempotent( + let channel: Channel | null = await connectionFactory.dialIdempotent( peerId, hints, false, // No retry here, we're already in a retry loop ); + // Re-fetch queue after dial in case cleanupStalePeers deleted it during the await + queue = getMessageQueue(peerId); + // Check if a concurrent call already registered a channel for this peer // (e.g., an inbound connection or another reconnection attempt) channel = await reuseOrReturnChannel(peerId, channel); + // Handle case where existing channel died during await and dialed channel was closed + if (channel === null) { + logger.log( + `${peerId}:: existing channel died during reuse check, continuing reconnection loop`, + ); + // Channel died and dialed channel was already closed, continue loop to re-dial + continue; + } // Re-check after await to handle race condition where a channel was registered // concurrently during the microtask delay const registeredChannel = channels.get(peerId); @@ -422,7 +466,7 @@ export async function initNetwork( // Re-queue any failed messages if (failedMessages.length > 0) { queue.replaceAll(failedMessages); - handleConnectionLoss(peerId); + handleConnectionLoss(peerId, channel); } } @@ -481,11 +525,25 @@ export async function initNetwork( channel: Channel, errorContext = 'reading channel to', ): void { + const previousChannel = channels.get(peerId); channels.set(peerId, channel); lastConnectionTime.set(peerId, Date.now()); readChannel(channel).catch((problem) => { outputError(peerId, errorContext, problem); }); + + // If we replaced an existing channel, close it to avoid leaks and stale readers. + if (previousChannel && previousChannel !== channel) { + const closePromise = connectionFactory.closeChannel( + previousChannel, + peerId, + ); + if (typeof closePromise?.catch === 'function') { + closePromise.catch((problem) => { + outputError(peerId, 'closing replaced channel', problem); + }); + } + } } /** @@ -494,20 +552,47 @@ export async function initNetwork( * * @param peerId - The peer ID for the channel. * @param dialedChannel - The newly dialed channel. - * @returns The channel to use (either existing or the dialed one). + * @returns The channel to use (either existing or the dialed one), or null if + * the existing channel died during the await and the dialed channel was already closed. */ async function reuseOrReturnChannel( peerId: string, dialedChannel: Channel, - ): Promise { + ): Promise { const existingChannel = channels.get(peerId); if (existingChannel) { // Close the dialed channel if it's different from the existing one if (dialedChannel !== existingChannel) { await connectionFactory.closeChannel(dialedChannel, peerId); + // Re-check if existing channel is still valid after await + // It may have been removed if readChannel exited during the close, + // or a new channel may have been registered concurrently + const currentChannel = channels.get(peerId); + if (currentChannel === existingChannel) { + // Existing channel is still valid, use it + return existingChannel; + } + if (currentChannel) { + // A different channel was registered concurrently, use that instead + return currentChannel; + } + // Existing channel died during await, but we already closed dialed channel + // Return null to signal caller needs to handle this (re-dial or fail) + return null; + } + // Same channel, check if it's still valid + const currentChannel = channels.get(peerId); + if (currentChannel === existingChannel) { + // Still the same channel, use it + return existingChannel; } - // Another concurrent call already registered the channel, use it - return existingChannel; + if (currentChannel) { + // A different channel was registered concurrently, use that instead + return currentChannel; + } + // Channel died, but we can't close dialed channel since it's the same + // Return null to signal caller needs to handle this + return null; } // No existing channel, return the dialed one for caller to register return dialedChannel; @@ -604,7 +689,7 @@ export async function initNetwork( return; } - let channel = channels.get(targetPeerId); + let channel: Channel | null | undefined = channels.get(targetPeerId); if (!channel) { // Check connection limit before dialing new connection // (Early check to fail fast, but we'll check again after dial to prevent race conditions) @@ -637,6 +722,17 @@ export async function initNetwork( // Check if a concurrent call already registered a channel for this peer channel = await reuseOrReturnChannel(targetPeerId, channel); + // Handle case where existing channel died during await and dialed channel was closed + if (channel === null) { + // Existing channel died and dialed channel was already closed + // Trigger reconnection to re-dial + logger.log( + `${targetPeerId}:: existing channel died during reuse check, triggering reconnection`, + ); + currentQueue.enqueue(message); + handleConnectionLoss(targetPeerId); + return; + } // Re-check after await to handle race condition where a channel was registered // concurrently during the microtask delay const registeredChannel = channels.get(targetPeerId); @@ -686,8 +782,10 @@ export async function initNetwork( lastConnectionTime.set(targetPeerId, Date.now()); } catch (problem) { outputError(targetPeerId, `sending message`, problem); - handleConnectionLoss(targetPeerId); - queue.enqueue(message); + handleConnectionLoss(targetPeerId, channel); + // Re-fetch queue in case cleanupStalePeers deleted it during the await + const currentQueue = getMessageQueue(targetPeerId); + currentQueue.enqueue(message); } } @@ -706,7 +804,20 @@ export async function initNetwork( logger.log( `${channel.peerId}:: rejecting inbound connection from intentionally closed peer`, ); - // Don't add to channels map and don't start reading - connection will naturally close + // Explicitly close the channel to release network resources + const closePromise = connectionFactory.closeChannel( + channel, + channel.peerId, + ); + if (typeof closePromise?.catch === 'function') { + closePromise.catch((problem) => { + outputError( + channel.peerId, + 'closing rejected inbound channel from intentionally closed peer', + problem, + ); + }); + } return; } @@ -759,7 +870,8 @@ export async function initNetwork( async function closeConnection(peerId: string): Promise { logger.log(`${peerId}:: explicitly closing connection`); intentionallyClosed.add(peerId); - // Remove channel - the readChannel cleanup will handle stream closure + // Get the channel before removing from map + const channel = channels.get(peerId); channels.delete(peerId); // Stop any ongoing reconnection attempts if (reconnectionManager.isReconnecting(peerId)) { @@ -770,6 +882,14 @@ export async function initNetwork( if (queue) { queue.clear(); } + // Actually close the underlying network connection + if (channel) { + try { + await connectionFactory.closeChannel(channel, peerId); + } catch (problem) { + outputError(peerId, 'closing connection', problem); + } + } } /** diff --git a/vitest.config.ts b/vitest.config.ts index a6e2613ea..a558554b6 100644 --- a/vitest.config.ts +++ b/vitest.config.ts @@ -158,10 +158,10 @@ export default defineConfig({ lines: 22.22, }, 'packages/ocap-kernel/**': { - statements: 96.4, + statements: 95.86, functions: 98.56, - branches: 97.21, - lines: 96.4, + branches: 96.77, + lines: 95.86, }, 'packages/omnium-gatherum/**': { statements: 5.67, From 21903283165f48fdf7df2fa73665560564063809 Mon Sep 17 00:00:00 2001 From: Dimitris Marlagkoutsos Date: Tue, 6 Jan 2026 20:13:16 +0100 Subject: [PATCH 18/19] fixing race condition --- packages/ocap-kernel/src/remotes/network.ts | 19 +++++++++++++++++++ vitest.config.ts | 8 ++++---- 2 files changed, 23 insertions(+), 4 deletions(-) diff --git a/packages/ocap-kernel/src/remotes/network.ts b/packages/ocap-kernel/src/remotes/network.ts index ea0ff13d0..e63764557 100644 --- a/packages/ocap-kernel/src/remotes/network.ts +++ b/packages/ocap-kernel/src/remotes/network.ts @@ -391,6 +391,16 @@ export async function initNetwork( continue; } + // Check if peer was intentionally closed during dial + if (intentionallyClosed.has(peerId)) { + logger.log( + `${peerId}:: peer intentionally closed during dial, closing channel`, + ); + await connectionFactory.closeChannel(channel, peerId); + reconnectionManager.stopReconnection(peerId); + return; + } + // Register the new channel and start reading registerChannel(peerId, channel); } @@ -762,6 +772,15 @@ export async function initNetwork( return; } + // Check if peer was intentionally closed during dial + if (intentionallyClosed.has(targetPeerId)) { + logger.log( + `${targetPeerId}:: peer intentionally closed during dial, closing channel`, + ); + await connectionFactory.closeChannel(channel, targetPeerId); + throw new Error('Message delivery failed after intentional close'); + } + // Register the new channel and start reading registerChannel(targetPeerId, channel); } diff --git a/vitest.config.ts b/vitest.config.ts index a558554b6..d32e910a6 100644 --- a/vitest.config.ts +++ b/vitest.config.ts @@ -130,7 +130,7 @@ export default defineConfig({ 'packages/kernel-ui/**': { statements: 97.57, functions: 97.29, - branches: 93.25, + branches: 93.26, lines: 97.57, }, 'packages/kernel-utils/**': { @@ -158,10 +158,10 @@ export default defineConfig({ lines: 22.22, }, 'packages/ocap-kernel/**': { - statements: 95.86, + statements: 95.66, functions: 98.56, - branches: 96.77, - lines: 95.86, + branches: 96.64, + lines: 95.66, }, 'packages/omnium-gatherum/**': { statements: 5.67, From 71fe98bdc412b55b93d199fa9a2b91acc2b797b6 Mon Sep 17 00:00:00 2001 From: Dimitris Marlagkoutsos Date: Tue, 6 Jan 2026 20:33:15 +0100 Subject: [PATCH 19/19] fix Messages stuck in queue when write fails on replaced channel --- packages/ocap-kernel/src/remotes/network.ts | 10 ++++++++++ vitest.config.ts | 8 ++++---- 2 files changed, 14 insertions(+), 4 deletions(-) diff --git a/packages/ocap-kernel/src/remotes/network.ts b/packages/ocap-kernel/src/remotes/network.ts index e63764557..f855df1ad 100644 --- a/packages/ocap-kernel/src/remotes/network.ts +++ b/packages/ocap-kernel/src/remotes/network.ts @@ -805,6 +805,16 @@ export async function initNetwork( // Re-fetch queue in case cleanupStalePeers deleted it during the await const currentQueue = getMessageQueue(targetPeerId); currentQueue.enqueue(message); + + // If a new channel is active (stale channel was replaced by inbound connection), + // flush the queue on it to prevent messages from being stuck indefinitely + const newChannel = channels.get(targetPeerId); + if (newChannel && newChannel !== channel) { + logger.log( + `${targetPeerId}:: stale channel replaced, flushing queue on new channel`, + ); + await flushQueuedMessages(targetPeerId, newChannel, currentQueue); + } } } diff --git a/vitest.config.ts b/vitest.config.ts index d32e910a6..902af2815 100644 --- a/vitest.config.ts +++ b/vitest.config.ts @@ -130,7 +130,7 @@ export default defineConfig({ 'packages/kernel-ui/**': { statements: 97.57, functions: 97.29, - branches: 93.26, + branches: 93.25, lines: 97.57, }, 'packages/kernel-utils/**': { @@ -158,10 +158,10 @@ export default defineConfig({ lines: 22.22, }, 'packages/ocap-kernel/**': { - statements: 95.66, + statements: 95.58, functions: 98.56, - branches: 96.64, - lines: 95.66, + branches: 96.5, + lines: 95.58, }, 'packages/omnium-gatherum/**': { statements: 5.67,