From a02b31ae056137073edaa1112190b03a4a1f8ec5 Mon Sep 17 00:00:00 2001 From: sacOO7 Date: Mon, 11 Aug 2025 16:03:33 +0530 Subject: [PATCH 1/4] [ECO-5517] Implemented implicit attach for getRoot method - Adjusted channel state checks prior to channel modes for read and write objects operations - Added getChannel method to adapter interface, added relevant ensureAttached method with relevant implementation --- .../java/io/ably/lib/objects/Adapter.java | 13 +++++ .../ably/lib/objects/LiveObjectsAdapter.java | 11 +++++ .../io/ably/lib/realtime/ChannelBase.java | 3 ++ .../io/ably/lib/objects/DefaultLiveObjects.kt | 1 + .../kotlin/io/ably/lib/objects/Helpers.kt | 47 ++++++++++++++++++- .../io/ably/lib/objects/unit/UtilsTest.kt | 2 +- 6 files changed, 74 insertions(+), 3 deletions(-) diff --git a/lib/src/main/java/io/ably/lib/objects/Adapter.java b/lib/src/main/java/io/ably/lib/objects/Adapter.java index b8fa905cd..9203265dd 100644 --- a/lib/src/main/java/io/ably/lib/objects/Adapter.java +++ b/lib/src/main/java/io/ably/lib/objects/Adapter.java @@ -1,6 +1,7 @@ package io.ably.lib.objects; import io.ably.lib.realtime.AblyRealtime; +import io.ably.lib.realtime.ChannelBase; import io.ably.lib.realtime.ChannelState; import io.ably.lib.realtime.CompletionListener; import io.ably.lib.transport.ConnectionManager; @@ -8,6 +9,7 @@ import io.ably.lib.types.ChannelMode; import io.ably.lib.types.ChannelOptions; import io.ably.lib.types.ClientOptions; +import io.ably.lib.types.ErrorInfo; import io.ably.lib.types.ProtocolMessage; import io.ably.lib.util.Log; import org.jetbrains.annotations.NotNull; @@ -82,4 +84,15 @@ public ChannelState getChannelState(@NotNull String channelName) { public long getTime() throws AblyException { return ably.time(); } + + @Override + public ChannelBase getChannel(@NotNull String channelName) throws AblyException { + if (ably.channels.containsKey(channelName)) { + return ably.channels.get(channelName); + } else { + Log.e(TAG, "attachChannel(): channel not found: " + channelName); + ErrorInfo errorInfo = new ErrorInfo("Channel not found: " + channelName, 404); + throw AblyException.fromErrorInfo(errorInfo); + } + } } diff --git a/lib/src/main/java/io/ably/lib/objects/LiveObjectsAdapter.java b/lib/src/main/java/io/ably/lib/objects/LiveObjectsAdapter.java index fd074fd66..bfa4456b3 100644 --- a/lib/src/main/java/io/ably/lib/objects/LiveObjectsAdapter.java +++ b/lib/src/main/java/io/ably/lib/objects/LiveObjectsAdapter.java @@ -1,5 +1,6 @@ package io.ably.lib.objects; +import io.ably.lib.realtime.ChannelBase; import io.ably.lib.realtime.ChannelState; import io.ably.lib.realtime.CompletionListener; import io.ably.lib.transport.ConnectionManager; @@ -81,5 +82,15 @@ public interface LiveObjectsAdapter { */ @Blocking long getTime() throws AblyException; + + /** + * Retrieves the channel instance for the specified channel name. + * If the channel does not exist, an AblyException is thrown. + * + * @param channelName the name of the channel to retrieve + * @return the ChannelBase instance for the specified channel + * @throws AblyException if the channel is not found or cannot be retrieved + */ + ChannelBase getChannel(@NotNull String channelName) throws AblyException; } diff --git a/lib/src/main/java/io/ably/lib/realtime/ChannelBase.java b/lib/src/main/java/io/ably/lib/realtime/ChannelBase.java index 4dd78c8eb..b59251b9f 100644 --- a/lib/src/main/java/io/ably/lib/realtime/ChannelBase.java +++ b/lib/src/main/java/io/ably/lib/realtime/ChannelBase.java @@ -1286,6 +1286,9 @@ public Map getParams() { } public ChannelMode[] getModes() { + if (modes == null) { + return null; + } return modes.toArray(new ChannelMode[modes.size()]); } diff --git a/live-objects/src/main/kotlin/io/ably/lib/objects/DefaultLiveObjects.kt b/live-objects/src/main/kotlin/io/ably/lib/objects/DefaultLiveObjects.kt index eebe6dc4d..f6c740721 100644 --- a/live-objects/src/main/kotlin/io/ably/lib/objects/DefaultLiveObjects.kt +++ b/live-objects/src/main/kotlin/io/ably/lib/objects/DefaultLiveObjects.kt @@ -93,6 +93,7 @@ internal class DefaultLiveObjects(internal val channelName: String, internal val private suspend fun getRootAsync(): LiveMap = withContext(sequentialScope.coroutineContext) { adapter.throwIfInvalidAccessApiConfiguration(channelName) + adapter.ensureAttached(channelName) objectsManager.ensureSynced(state) objectsPool.get(ROOT_OBJECT_ID) as LiveMap } diff --git a/live-objects/src/main/kotlin/io/ably/lib/objects/Helpers.kt b/live-objects/src/main/kotlin/io/ably/lib/objects/Helpers.kt index 56af0cba2..018e7774c 100644 --- a/live-objects/src/main/kotlin/io/ably/lib/objects/Helpers.kt +++ b/live-objects/src/main/kotlin/io/ably/lib/objects/Helpers.kt @@ -5,6 +5,7 @@ import io.ably.lib.realtime.CompletionListener import io.ably.lib.types.ChannelMode import io.ably.lib.types.ErrorInfo import io.ably.lib.types.ProtocolMessage +import kotlinx.coroutines.CompletableDeferred import kotlinx.coroutines.suspendCancellableCoroutine import kotlin.coroutines.resume import kotlin.coroutines.resumeWithException @@ -28,6 +29,22 @@ internal suspend fun LiveObjectsAdapter.sendAsync(message: ProtocolMessage) = su } } +internal suspend fun LiveObjectsAdapter.attachAsync(channelName: String) = suspendCancellableCoroutine { continuation -> + try { + this.getChannel(channelName).attach(object : CompletionListener { + override fun onSuccess() { + continuation.resume(Unit) + } + + override fun onError(reason: ErrorInfo) { + continuation.resumeWithException(ablyException(reason)) + } + }) + } catch (e: Exception) { + continuation.resumeWithException(e) + } +} + /** * Spec: RTO15d */ @@ -47,16 +64,42 @@ internal fun LiveObjectsAdapter.setChannelSerial(channelName: String, protocolMe setChannelSerial(channelName, channelSerial) } +internal suspend fun LiveObjectsAdapter.ensureAttached(channelName: String) { + when (val currentChannelStatus = this.getChannelState(channelName)) { + ChannelState.initialized -> attachAsync(channelName) + ChannelState.attached -> return + ChannelState.attaching -> { + val attachDeferred = CompletableDeferred() + getChannel(channelName).once { + when(it.current) { + ChannelState.attached -> attachDeferred.complete(Unit) + else -> { + val exception = ablyException("Channel $channelName is in invalid state: ${it.current}, " + + "error: ${it.reason}", ErrorCode.ChannelStateError) + attachDeferred.completeExceptionally(exception) + } + } + } + if (this.getChannelState(channelName) == ChannelState.attached) { + attachDeferred.complete(Unit) + } + attachDeferred.await() + } + else -> + throw ablyException("Channel $channelName is in invalid state: $currentChannelStatus", ErrorCode.ChannelStateError) + } +} + // Spec: RTLO4b1, RTLO4b2 internal fun LiveObjectsAdapter.throwIfInvalidAccessApiConfiguration(channelName: String) { - throwIfMissingChannelMode(channelName, ChannelMode.object_subscribe) throwIfInChannelState(channelName, arrayOf(ChannelState.detached, ChannelState.failed)) + throwIfMissingChannelMode(channelName, ChannelMode.object_subscribe) } internal fun LiveObjectsAdapter.throwIfInvalidWriteApiConfiguration(channelName: String) { throwIfEchoMessagesDisabled() - throwIfMissingChannelMode(channelName, ChannelMode.object_publish) throwIfInChannelState(channelName, arrayOf(ChannelState.detached, ChannelState.failed, ChannelState.suspended)) + throwIfMissingChannelMode(channelName, ChannelMode.object_publish) } internal fun LiveObjectsAdapter.throwIfUnpublishableState(channelName: String) { diff --git a/live-objects/src/test/kotlin/io/ably/lib/objects/unit/UtilsTest.kt b/live-objects/src/test/kotlin/io/ably/lib/objects/unit/UtilsTest.kt index 4fbc63bc6..a6cd9bcf8 100644 --- a/live-objects/src/test/kotlin/io/ably/lib/objects/unit/UtilsTest.kt +++ b/live-objects/src/test/kotlin/io/ably/lib/objects/unit/UtilsTest.kt @@ -253,7 +253,7 @@ class UtilsTest { } asyncScope.launchWithCallback(callback) { - delay(1000) // Long delay + delay(10000) // Long delay "test result" } From 0ac1e1a6c3473c8e270a4746a0b5c3326630614a Mon Sep 17 00:00:00 2001 From: sacOO7 Date: Mon, 11 Aug 2025 18:20:19 +0530 Subject: [PATCH 2/4] [ECO-5517] Removed implicit channel attach from integration test setup --- .../integration/DefaultLiveObjectsTest.kt | 2 +- .../integration/setup/IntegrationTest.kt | 9 ++------- .../lib/objects/integration/setup/Sandbox.kt | 17 ----------------- 3 files changed, 3 insertions(+), 25 deletions(-) diff --git a/live-objects/src/test/kotlin/io/ably/lib/objects/integration/DefaultLiveObjectsTest.kt b/live-objects/src/test/kotlin/io/ably/lib/objects/integration/DefaultLiveObjectsTest.kt index 7bc4a5a6d..ebef88ea1 100644 --- a/live-objects/src/test/kotlin/io/ably/lib/objects/integration/DefaultLiveObjectsTest.kt +++ b/live-objects/src/test/kotlin/io/ably/lib/objects/integration/DefaultLiveObjectsTest.kt @@ -33,7 +33,7 @@ class DefaultLiveObjectsTest : IntegrationTest() { // Initialize the root map on the channel with initial data restObjects.initializeRootMap(channelName) - val channel = getRealtimeChannel(channelName, autoAttach = false) + val channel = getRealtimeChannel(channelName) val objects = channel.objects assertNotNull(objects) diff --git a/live-objects/src/test/kotlin/io/ably/lib/objects/integration/setup/IntegrationTest.kt b/live-objects/src/test/kotlin/io/ably/lib/objects/integration/setup/IntegrationTest.kt index 6cad20508..dd3f41a59 100644 --- a/live-objects/src/test/kotlin/io/ably/lib/objects/integration/setup/IntegrationTest.kt +++ b/live-objects/src/test/kotlin/io/ably/lib/objects/integration/setup/IntegrationTest.kt @@ -36,7 +36,7 @@ abstract class IntegrationTest { * @return The attached realtime channel. * @throws Exception If the channel fails to attach or the client fails to connect. */ - internal suspend fun getRealtimeChannel(channelName: String, clientId: String = "client1", autoAttach: Boolean = true): Channel { + internal suspend fun getRealtimeChannel(channelName: String, clientId: String = "client1"): Channel { val client = realtimeClients.getOrPut(clientId) { sandbox.createRealtimeClient { this.clientId = clientId @@ -46,12 +46,7 @@ abstract class IntegrationTest { val channelOpts = ChannelOptions().apply { modes = arrayOf(ChannelMode.object_publish, ChannelMode.object_subscribe) } - return client.channels.get(channelName, channelOpts).apply { - if (autoAttach) { - attach() - ensureAttached() - } - } + return client.channels.get(channelName, channelOpts) } /** diff --git a/live-objects/src/test/kotlin/io/ably/lib/objects/integration/setup/Sandbox.kt b/live-objects/src/test/kotlin/io/ably/lib/objects/integration/setup/Sandbox.kt index f38009450..cfcd4ed2b 100644 --- a/live-objects/src/test/kotlin/io/ably/lib/objects/integration/setup/Sandbox.kt +++ b/live-objects/src/test/kotlin/io/ably/lib/objects/integration/setup/Sandbox.kt @@ -92,20 +92,3 @@ internal suspend fun AblyRealtime.ensureConnected() { } connectedDeferred.await() } - -internal suspend fun Channel.ensureAttached() { - if (this.state == ChannelState.attached) { - return - } - val attachedDeferred = CompletableDeferred() - this.on { - if (it.event == ChannelEvent.attached) { - attachedDeferred.complete(Unit) - this.off() - } else if (it.event != ChannelEvent.attaching) { - attachedDeferred.completeExceptionally(ablyException(it.reason)) - this.off() - } - } - attachedDeferred.await() -} From 523dfe4fa7b0ef2b7071da21e050824b71bf01eb Mon Sep 17 00:00:00 2001 From: sacOO7 Date: Tue, 12 Aug 2025 14:34:53 +0530 Subject: [PATCH 3/4] [ECO-5517] Refactored Adapter implementation, moved to liveobjects Helpers - Removed unnecessary implementation from Adapter, made it clean - Updated unit tests/mocks accordingly --- .../java/io/ably/lib/objects/Adapter.java | 55 +------------------ .../ably/lib/objects/LiveObjectsAdapter.java | 52 +----------------- .../kotlin/io/ably/lib/objects/Helpers.kt | 42 +++++++++++--- .../lib/objects/unit/ObjectMessageSizeTest.kt | 12 ++-- .../io/ably/lib/objects/unit/TestHelpers.kt | 4 +- 5 files changed, 46 insertions(+), 119 deletions(-) diff --git a/lib/src/main/java/io/ably/lib/objects/Adapter.java b/lib/src/main/java/io/ably/lib/objects/Adapter.java index 9203265dd..4c7da7551 100644 --- a/lib/src/main/java/io/ably/lib/objects/Adapter.java +++ b/lib/src/main/java/io/ably/lib/objects/Adapter.java @@ -2,15 +2,10 @@ import io.ably.lib.realtime.AblyRealtime; import io.ably.lib.realtime.ChannelBase; -import io.ably.lib.realtime.ChannelState; -import io.ably.lib.realtime.CompletionListener; import io.ably.lib.transport.ConnectionManager; import io.ably.lib.types.AblyException; -import io.ably.lib.types.ChannelMode; -import io.ably.lib.types.ChannelOptions; import io.ably.lib.types.ClientOptions; import io.ably.lib.types.ErrorInfo; -import io.ably.lib.types.ProtocolMessage; import io.ably.lib.util.Log; import org.jetbrains.annotations.NotNull; @@ -22,54 +17,6 @@ public Adapter(@NotNull AblyRealtime ably) { this.ably = ably; } - @Override - public void setChannelSerial(@NotNull String channelName, @NotNull String channelSerial) { - if (ably.channels.containsKey(channelName)) { - ably.channels.get(channelName).properties.channelSerial = channelSerial; - } else { - Log.e(TAG, "setChannelSerial(): channel not found: " + channelName); - } - } - - @Override - public void send(@NotNull ProtocolMessage msg, @NotNull CompletionListener listener) throws AblyException { - // Always queue LiveObjects messages to ensure reliable state synchronization and proper acknowledgment - ably.connection.connectionManager.send(msg, true, listener); - } - - @Override - public int maxMessageSizeLimit() { - return ably.connection.connectionManager.maxMessageSize; - } - - @Override - public ChannelMode[] getChannelModes(@NotNull String channelName) { - if (ably.channels.containsKey(channelName)) { - // RTO2a - channel.modes is only populated on channel attachment, so use it only if it is set - ChannelMode[] modes = ably.channels.get(channelName).getModes(); - if (modes != null) { - return modes; - } - // RTO2b - otherwise as a best effort use user provided channel options - ChannelOptions options = ably.channels.get(channelName).getOptions(); - if (options != null && options.hasModes()) { - return options.modes; - } - return null; - } - Log.e(TAG, "getChannelMode(): channel not found: " + channelName); - return null; - } - - @Override - public ChannelState getChannelState(@NotNull String channelName) { - if (ably.channels.containsKey(channelName)) { - return ably.channels.get(channelName).state; - } - Log.e(TAG, "getChannelState(): channel not found: " + channelName); - return null; - } - @Override public @NotNull ClientOptions getClientOptions() { return ably.options; @@ -86,7 +33,7 @@ public long getTime() throws AblyException { } @Override - public ChannelBase getChannel(@NotNull String channelName) throws AblyException { + public @NotNull ChannelBase getChannel(@NotNull String channelName) throws AblyException { if (ably.channels.containsKey(channelName)) { return ably.channels.get(channelName); } else { diff --git a/lib/src/main/java/io/ably/lib/objects/LiveObjectsAdapter.java b/lib/src/main/java/io/ably/lib/objects/LiveObjectsAdapter.java index bfa4456b3..9f2db0595 100644 --- a/lib/src/main/java/io/ably/lib/objects/LiveObjectsAdapter.java +++ b/lib/src/main/java/io/ably/lib/objects/LiveObjectsAdapter.java @@ -1,63 +1,13 @@ package io.ably.lib.objects; import io.ably.lib.realtime.ChannelBase; -import io.ably.lib.realtime.ChannelState; -import io.ably.lib.realtime.CompletionListener; import io.ably.lib.transport.ConnectionManager; import io.ably.lib.types.AblyException; -import io.ably.lib.types.ChannelMode; import io.ably.lib.types.ClientOptions; -import io.ably.lib.types.ProtocolMessage; import org.jetbrains.annotations.Blocking; import org.jetbrains.annotations.NotNull; -import org.jetbrains.annotations.Nullable; public interface LiveObjectsAdapter { - /** - * Sends a protocol message to its intended recipient. - * This method transmits a protocol message, allowing for queuing events if necessary, - * and notifies the provided listener upon the success or failure of the send operation. - * - * @param msg the protocol message to send. - * @param listener a listener to be notified of the success or failure of the send operation. - * @throws AblyException if an error occurs during the send operation. - */ - void send(@NotNull ProtocolMessage msg, @NotNull CompletionListener listener) throws AblyException; - - /** - * Sets the channel serial for a specific channel. - * @param channelName the name of the channel for which to set the serial - * @param channelSerial the serial to set for the channel - */ - void setChannelSerial(@NotNull String channelName, @NotNull String channelSerial); - - /** - * Retrieves the maximum message size allowed for the messages. - * This method returns the maximum size in bytes that a message can have. - * - * @return the maximum message size limit in bytes. - */ - int maxMessageSizeLimit(); - - /** - * Retrieves the channel modes for a specific channel. - * This method returns the modes that are set for the specified channel. - * - * @param channelName the name of the channel for which to retrieve the modes - * @return the array of channel modes for the specified channel, or null if the channel is not found - * Spec: RTO2a, RTO2b - */ - @Nullable ChannelMode[] getChannelModes(@NotNull String channelName); - - /** - * Retrieves the current state of a specific channel. - * This method returns the state of the specified channel, which indicates its connection status. - * - * @param channelName the name of the channel for which to retrieve the state - * @return the current state of the specified channel, or null if the channel is not found - */ - @Nullable ChannelState getChannelState(@NotNull String channelName); - /** * Retrieves the client options configured for the Ably client. * Used to access client configuration parameters such as echoMessages setting @@ -91,6 +41,6 @@ public interface LiveObjectsAdapter { * @return the ChannelBase instance for the specified channel * @throws AblyException if the channel is not found or cannot be retrieved */ - ChannelBase getChannel(@NotNull String channelName) throws AblyException; + @NotNull ChannelBase getChannel(@NotNull String channelName) throws AblyException; } diff --git a/live-objects/src/main/kotlin/io/ably/lib/objects/Helpers.kt b/live-objects/src/main/kotlin/io/ably/lib/objects/Helpers.kt index 018e7774c..0d42d122d 100644 --- a/live-objects/src/main/kotlin/io/ably/lib/objects/Helpers.kt +++ b/live-objects/src/main/kotlin/io/ably/lib/objects/Helpers.kt @@ -15,7 +15,7 @@ import kotlin.coroutines.resumeWithException */ internal suspend fun LiveObjectsAdapter.sendAsync(message: ProtocolMessage) = suspendCancellableCoroutine { continuation -> try { - this.send(message, object : CompletionListener { + connectionManager.send(message, clientOptions.queueMessages, object : CompletionListener { override fun onSuccess() { continuation.resume(Unit) } @@ -31,7 +31,7 @@ internal suspend fun LiveObjectsAdapter.sendAsync(message: ProtocolMessage) = su internal suspend fun LiveObjectsAdapter.attachAsync(channelName: String) = suspendCancellableCoroutine { continuation -> try { - this.getChannel(channelName).attach(object : CompletionListener { + getChannel(channelName).attach(object : CompletionListener { override fun onSuccess() { continuation.resume(Unit) } @@ -45,11 +45,38 @@ internal suspend fun LiveObjectsAdapter.attachAsync(channelName: String) = suspe } } +/** + * Retrieves the channel modes for a specific channel. + * This method returns the modes that are set for the specified channel. + * + * @param channelName the name of the channel for which to retrieve the modes + * @return the array of channel modes for the specified channel, or null if the channel is not found + * Spec: RTO2a, RTO2b + */ +internal fun LiveObjectsAdapter.getChannelModes(channelName: String): Array? { + val channel = getChannel(channelName) + + // RTO2a - channel.modes is only populated on channel attachment, so use it only if it is set + channel.modes?.let { modes -> + if (modes.isNotEmpty()) { + return modes + } + } + + // RTO2b - otherwise as a best effort use user provided channel options + channel.options?.let { options -> + if (options.hasModes()) { + return options.modes + } + } + return null +} + /** * Spec: RTO15d */ internal fun LiveObjectsAdapter.ensureMessageSizeWithinLimit(objectMessages: Array) { - val maximumAllowedSize = maxMessageSizeLimit() + val maximumAllowedSize = connectionManager.maxMessageSize val objectsTotalMessageSize = objectMessages.sumOf { it.size() } if (objectsTotalMessageSize > maximumAllowedSize) { throw ablyException("ObjectMessages size $objectsTotalMessageSize exceeds maximum allowed size of $maximumAllowedSize bytes", @@ -61,11 +88,12 @@ internal fun LiveObjectsAdapter.setChannelSerial(channelName: String, protocolMe if (protocolMessage.action != ProtocolMessage.Action.`object`) return val channelSerial = protocolMessage.channelSerial if (channelSerial.isNullOrEmpty()) return - setChannelSerial(channelName, channelSerial) + getChannel(channelName).properties.channelSerial = channelSerial } internal suspend fun LiveObjectsAdapter.ensureAttached(channelName: String) { - when (val currentChannelStatus = this.getChannelState(channelName)) { + val channel = getChannel(channelName) + when (val currentChannelStatus = channel.state) { ChannelState.initialized -> attachAsync(channelName) ChannelState.attached -> return ChannelState.attaching -> { @@ -80,7 +108,7 @@ internal suspend fun LiveObjectsAdapter.ensureAttached(channelName: String) { } } } - if (this.getChannelState(channelName) == ChannelState.attached) { + if (channel.state == ChannelState.attached) { attachDeferred.complete(Unit) } attachDeferred.await() @@ -119,7 +147,7 @@ internal fun LiveObjectsAdapter.throwIfMissingChannelMode(channelName: String, c } internal fun LiveObjectsAdapter.throwIfInChannelState(channelName: String, channelStates: Array) { - val currentState = getChannelState(channelName) + val currentState = getChannel(channelName).state if (currentState == null || channelStates.contains(currentState)) { throw ablyException("Channel is in invalid state: $currentState", ErrorCode.ChannelStateError) } diff --git a/live-objects/src/test/kotlin/io/ably/lib/objects/unit/ObjectMessageSizeTest.kt b/live-objects/src/test/kotlin/io/ably/lib/objects/unit/ObjectMessageSizeTest.kt index 57ddde1e8..4d3df7e7d 100644 --- a/live-objects/src/test/kotlin/io/ably/lib/objects/unit/ObjectMessageSizeTest.kt +++ b/live-objects/src/test/kotlin/io/ably/lib/objects/unit/ObjectMessageSizeTest.kt @@ -12,7 +12,6 @@ import io.ably.lib.objects.ensureMessageSizeWithinLimit import io.ably.lib.objects.size import io.ably.lib.transport.Defaults import io.ably.lib.types.AblyException -import io.mockk.every import io.mockk.mockk import kotlinx.coroutines.test.runTest import org.junit.Test @@ -23,9 +22,9 @@ class ObjectMessageSizeTest { @Test fun testObjectMessageSizeWithinLimit() = runTest { - val mockAdapter = mockk() - every { mockAdapter.maxMessageSizeLimit() } returns Defaults.maxMessageSize // 64 kb - assertEquals(65536, mockAdapter.maxMessageSizeLimit()) + val mockAdapter = mockk(relaxed = true) + mockAdapter.connectionManager.maxMessageSize = Defaults.maxMessageSize // 64 kb + assertEquals(65536, mockAdapter.connectionManager.maxMessageSize) // ObjectMessage with all size-contributing fields val objectMessage = ObjectMessage( @@ -148,8 +147,9 @@ class ObjectMessageSizeTest { @Test fun testObjectMessageSizeAboveLimit() = runTest { - val mockAdapter = mockk() - every { mockAdapter.maxMessageSizeLimit() } returns Defaults.maxMessageSize // 64 kb + val mockAdapter = mockk(relaxed = true) + mockAdapter.connectionManager.maxMessageSize = Defaults.maxMessageSize // 64 kb + assertEquals(65536, mockAdapter.connectionManager.maxMessageSize) // Create ObjectMessage with dummy data that results in size 60kb val objectMessage1 = ObjectMessage( diff --git a/live-objects/src/test/kotlin/io/ably/lib/objects/unit/TestHelpers.kt b/live-objects/src/test/kotlin/io/ably/lib/objects/unit/TestHelpers.kt index 6db9d5ccb..5fc8c590e 100644 --- a/live-objects/src/test/kotlin/io/ably/lib/objects/unit/TestHelpers.kt +++ b/live-objects/src/test/kotlin/io/ably/lib/objects/unit/TestHelpers.kt @@ -45,7 +45,9 @@ internal fun getMockRealtimeChannel( } internal fun getMockLiveObjectsAdapter(): LiveObjectsAdapter { - return mockk(relaxed = true) + val mockkAdapter = mockk(relaxed = true) + every { mockkAdapter.getChannel(any()) } returns getMockRealtimeChannel("testChannelName") + return mockkAdapter } internal fun getMockObjectsPool(): ObjectsPool { From a035ee7c82bd3eb5f297740ac885650b468ba2a8 Mon Sep 17 00:00:00 2001 From: sacOO7 Date: Tue, 12 Aug 2025 16:02:02 +0530 Subject: [PATCH 4/4] [ECO-5517] Added comprehensive unit tests for liveobjects Helpers --- .../io/ably/lib/realtime/ChannelBase.java | 2 +- .../kotlin/io/ably/lib/objects/Helpers.kt | 4 +- .../integration/setup/IntegrationTest.kt | 5 +- .../io/ably/lib/objects/unit/HelpersTest.kt | 448 ++++++++++++++++++ 4 files changed, 453 insertions(+), 6 deletions(-) create mode 100644 live-objects/src/test/kotlin/io/ably/lib/objects/unit/HelpersTest.kt diff --git a/lib/src/main/java/io/ably/lib/realtime/ChannelBase.java b/lib/src/main/java/io/ably/lib/realtime/ChannelBase.java index b59251b9f..6ac064d56 100644 --- a/lib/src/main/java/io/ably/lib/realtime/ChannelBase.java +++ b/lib/src/main/java/io/ably/lib/realtime/ChannelBase.java @@ -1287,7 +1287,7 @@ public Map getParams() { public ChannelMode[] getModes() { if (modes == null) { - return null; + return new ChannelMode[0]; } return modes.toArray(new ChannelMode[modes.size()]); } diff --git a/live-objects/src/main/kotlin/io/ably/lib/objects/Helpers.kt b/live-objects/src/main/kotlin/io/ably/lib/objects/Helpers.kt index 0d42d122d..d0f0e94a9 100644 --- a/live-objects/src/main/kotlin/io/ably/lib/objects/Helpers.kt +++ b/live-objects/src/main/kotlin/io/ably/lib/objects/Helpers.kt @@ -138,7 +138,7 @@ internal fun LiveObjectsAdapter.throwIfUnpublishableState(channelName: String) { } // Spec: RTO2 -internal fun LiveObjectsAdapter.throwIfMissingChannelMode(channelName: String, channelMode: ChannelMode) { +private fun LiveObjectsAdapter.throwIfMissingChannelMode(channelName: String, channelMode: ChannelMode) { val channelModes = getChannelModes(channelName) if (channelModes == null || !channelModes.contains(channelMode)) { // Spec: RTO2a2, RTO2b2 @@ -146,7 +146,7 @@ internal fun LiveObjectsAdapter.throwIfMissingChannelMode(channelName: String, c } } -internal fun LiveObjectsAdapter.throwIfInChannelState(channelName: String, channelStates: Array) { +private fun LiveObjectsAdapter.throwIfInChannelState(channelName: String, channelStates: Array) { val currentState = getChannel(channelName).state if (currentState == null || channelStates.contains(currentState)) { throw ablyException("Channel is in invalid state: $currentState", ErrorCode.ChannelStateError) diff --git a/live-objects/src/test/kotlin/io/ably/lib/objects/integration/setup/IntegrationTest.kt b/live-objects/src/test/kotlin/io/ably/lib/objects/integration/setup/IntegrationTest.kt index dd3f41a59..cb46f2f89 100644 --- a/live-objects/src/test/kotlin/io/ably/lib/objects/integration/setup/IntegrationTest.kt +++ b/live-objects/src/test/kotlin/io/ably/lib/objects/integration/setup/IntegrationTest.kt @@ -29,12 +29,11 @@ abstract class IntegrationTest { /** * Retrieves a realtime channel for the specified channel name and client ID * If a client with the given clientID does not exist, a new client is created using the provided options. - * The channel is attached and ensured to be in the attached state before returning. * * @param channelName Name of the channel * @param clientId The ID of the client to use or create. Defaults to "client1". - * @return The attached realtime channel. - * @throws Exception If the channel fails to attach or the client fails to connect. + * @return The realtime channel in the INITIALIZED state. + * @throws Exception If the client fails to connect. */ internal suspend fun getRealtimeChannel(channelName: String, clientId: String = "client1"): Channel { val client = realtimeClients.getOrPut(clientId) { diff --git a/live-objects/src/test/kotlin/io/ably/lib/objects/unit/HelpersTest.kt b/live-objects/src/test/kotlin/io/ably/lib/objects/unit/HelpersTest.kt new file mode 100644 index 000000000..fe1824bad --- /dev/null +++ b/live-objects/src/test/kotlin/io/ably/lib/objects/unit/HelpersTest.kt @@ -0,0 +1,448 @@ +package io.ably.lib.objects.unit + +import io.ably.lib.objects.* +import io.ably.lib.realtime.Channel +import io.ably.lib.realtime.ChannelState +import io.ably.lib.realtime.ChannelStateListener +import io.ably.lib.realtime.CompletionListener +import io.ably.lib.transport.ConnectionManager +import io.ably.lib.types.* +import io.mockk.every +import io.mockk.mockk +import io.mockk.slot +import io.mockk.verify +import kotlinx.coroutines.test.runTest +import org.junit.Assert.* +import org.junit.Test +import kotlin.test.assertFailsWith + +class HelpersTest { + + // sendAsync + @Test + fun testSendAsyncShouldQueueAccordingToClientOptions() = runTest { + val adapter = mockk(relaxed = true) + val connManager = mockk(relaxed = true) + val clientOptions = ClientOptions().apply { queueMessages = false } + + every { adapter.connectionManager } returns connManager + every { adapter.clientOptions } returns clientOptions + + every { connManager.send(any(), any(), any()) } answers { + val listener = thirdArg() + listener.onSuccess() + } + + val pm = ProtocolMessage(ProtocolMessage.Action.message) + adapter.sendAsync(pm) + verify(exactly = 1) { connManager.send(any(), false, any()) } + + clientOptions.queueMessages = true + adapter.sendAsync(pm) + verify(exactly = 1) { connManager.send(any(), true, any()) } + } + + @Test + fun testSendAsyncErrorPropagatesAblyException() = runTest { + val adapter = mockk(relaxed = true) + val connManager = mockk(relaxed = true) + val clientOptions = ClientOptions() + + every { adapter.connectionManager } returns connManager + every { adapter.clientOptions } returns clientOptions + + every { connManager.send(any(), any(), any()) } answers { + val listener = thirdArg() + listener.onError(clientError("boom").errorInfo) + } + + val ex = assertFailsWith { + adapter.sendAsync(ProtocolMessage(ProtocolMessage.Action.message)) + } + assertEquals(400, ex.errorInfo.statusCode) + assertEquals(40000, ex.errorInfo.code) + } + + @Test + fun testSendAsyncThrowsWhenConnectionManagerThrows() = runTest { + val adapter = mockk(relaxed = true) + val connManager = mockk(relaxed = true) + val clientOptions = ClientOptions() + + every { adapter.connectionManager } returns connManager + every { adapter.clientOptions } returns clientOptions + + every { connManager.send(any(), any(), any()) } throws RuntimeException("send failed hard") + + val ex = assertFailsWith { + adapter.sendAsync(ProtocolMessage(ProtocolMessage.Action.message)) + } + assertEquals("send failed hard", ex.message) + } + + // attachAsync + @Test + fun testAttachAsyncSuccess() = runTest { + val adapter = mockk(relaxed = true) + val channel = mockk(relaxed = true) + every { adapter.getChannel("ch") } returns channel + every { channel.attach(any()) } answers { + val listener = firstArg() + listener.onSuccess() + } + + adapter.attachAsync("ch") + verify(exactly = 1) { channel.attach(any()) } + } + + @Test + fun testAttachAsyncError() = runTest { + val adapter = mockk(relaxed = true) + val channel = mockk(relaxed = true) + every { adapter.getChannel("ch") } returns channel + every { channel.attach(any()) } answers { + val listener = firstArg() + listener.onError(serverError("attach failed").errorInfo) + } + + val ex = assertFailsWith { adapter.attachAsync("ch") } + assertEquals(50000, ex.errorInfo.code) + assertEquals(500, ex.errorInfo.statusCode) + } + + // getChannelModes + @Test + fun testGetChannelModesPrefersChannelModes() { + val adapter = mockk(relaxed = true) + val channel = mockk(relaxed = true) + every { adapter.getChannel("ch") } returns channel + every { channel.modes } returns arrayOf(ChannelMode.object_publish) + every { channel.options } returns ChannelOptions().apply { modes = arrayOf(ChannelMode.object_subscribe) } + + val modes = adapter.getChannelModes("ch") + assertArrayEquals(arrayOf(ChannelMode.object_publish), modes) + } + + @Test + fun testGetChannelModesFallsBackToOptions() { + val adapter = mockk(relaxed = true) + val channel = mockk(relaxed = true) + every { adapter.getChannel("ch") } returns channel + every { channel.modes } returns emptyArray() + every { channel.options } returns ChannelOptions().apply { modes = arrayOf(ChannelMode.object_subscribe) } + + val modes = adapter.getChannelModes("ch") + assertArrayEquals(arrayOf(ChannelMode.object_subscribe), modes) + } + + @Test + fun testGetChannelModesReturnsNullWhenNoModes() { + val adapter = mockk(relaxed = true) + val channel = mockk(relaxed = true) + every { adapter.getChannel("ch") } returns channel + every { channel.modes } returns null + every { channel.options } returns ChannelOptions().apply { modes = null } + + val modes = adapter.getChannelModes("ch") + assertNull(modes) + } + + @Test + fun testGetChannelModesIgnoresEmptyModes() { + val adapter = mockk(relaxed = true) + val channel = mockk(relaxed = true) + every { adapter.getChannel("ch") } returns channel + every { channel.modes } returns emptyArray() + every { channel.options } returns ChannelOptions().apply { modes = null } + + val modes = adapter.getChannelModes("ch") + assertNull(modes) + } + + // setChannelSerial + @Test + fun testSetChannelSerialSetsWhenObjectActionAndNonEmpty() { + val adapter = mockk(relaxed = true) + val channel = mockk(relaxed = true) + val props = ChannelProperties() + channel.properties = props + every { adapter.getChannel("ch") } returns channel + + val msg = ProtocolMessage(ProtocolMessage.Action.`object`) + msg.channelSerial = "abc:123" + + adapter.setChannelSerial("ch", msg) + assertEquals("abc:123", props.channelSerial) + } + + @Test + fun testSetChannelSerialNoOpForNonObjectActionOrEmpty() { + val adapter = mockk(relaxed = true) + val channel = mockk(relaxed = true) + val props = ChannelProperties() + channel.properties = props + every { adapter.getChannel("ch") } returns channel + + // Non-object action + val msg1 = ProtocolMessage(ProtocolMessage.Action.message) + msg1.channelSerial = "abc" + adapter.setChannelSerial("ch", msg1) + assertNull(props.channelSerial) + + // Object action but empty serial + val msg2 = ProtocolMessage(ProtocolMessage.Action.`object`) + msg2.channelSerial = "" + adapter.setChannelSerial("ch", msg2) + assertNull(props.channelSerial) + } + + // ensureAttached + @Test + fun testEnsureAttachedFromInitializedAttaches() = runTest { + val adapter = mockk(relaxed = true) + val channel = mockk(relaxed = true) + + every { adapter.getChannel("ch") } returns channel + channel.state = ChannelState.initialized + + val attachCalled = slot() + every { channel.attach(capture(attachCalled)) } answers { + attachCalled.captured.onSuccess() + } + + adapter.ensureAttached("ch") + verify(exactly = 1) { channel.attach(any()) } + } + + @Test + fun testEnsureAttachedWhenAlreadyAttachedReturns() = runTest { + val adapter = mockk(relaxed = true) + val channel = mockk(relaxed = true) + every { adapter.getChannel("ch") } returns channel + channel.state = ChannelState.attached + + adapter.ensureAttached("ch") + // no attach call + verify(exactly = 0) { channel.attach(any()) } + } + + @Test + fun testEnsureAttachedWaitsForAttachingThenAttached() = runTest { + val adapter = mockk(relaxed = true) + val channel = mockk(relaxed = true) + every { adapter.getChannel("ch") } returns channel + channel.state = ChannelState.attaching + + every { channel.once(any()) } answers { + val listener = firstArg() + val stateChange = mockk(relaxed = true) { + setPrivateField("current", ChannelState.attached) + } + listener.onChannelStateChanged(stateChange) + } + + adapter.ensureAttached("ch") + verify(exactly = 1) { channel.once(any()) } + } + + @Test + fun testEnsureAttachedAttachingButReceivesNonAttachedEmitsError() = runTest { + val adapter = mockk(relaxed = true) + val channel = mockk(relaxed = true) + every { adapter.getChannel("ch") } returns channel + channel.state = ChannelState.attaching + every { channel.once(any()) } answers { + val listener = firstArg() + val stateChange = mockk(relaxed = true) { + setPrivateField("current", ChannelState.suspended) + setPrivateField("reason", clientError("Not attached").errorInfo) + } + listener.onChannelStateChanged(stateChange) + } + val ex = assertFailsWith { adapter.ensureAttached("ch") } + assertEquals(ErrorCode.ChannelStateError.code, ex.errorInfo.code) + assertTrue(ex.errorInfo.message.contains("Not attached")) + verify(exactly = 1) { channel.once(any()) } + } + + @Test + fun testEnsureAttachedThrowsForInvalidState() = runTest { + val adapter = mockk(relaxed = true) + val channel = mockk(relaxed = true) + every { adapter.getChannel("ch") } returns channel + channel.state = ChannelState.failed + + val ex = assertFailsWith { adapter.ensureAttached("ch") } + assertEquals(ErrorCode.ChannelStateError.code, ex.errorInfo.code) + } + + // throwIfInvalidAccessApiConfiguration + @Test + fun testThrowIfInvalidAccessApiConfigurationStateDetached() { + val adapter = mockk(relaxed = true) + val channel = mockk(relaxed = true) + every { adapter.getChannel("ch") } returns channel + channel.state = ChannelState.detached + + val ex = assertFailsWith { adapter.throwIfInvalidAccessApiConfiguration("ch") } + assertEquals(ErrorCode.ChannelStateError.code, ex.errorInfo.code) + } + + @Test + fun testThrowIfInvalidAccessApiConfigurationMissingMode() { + val adapter = mockk(relaxed = true) + val channel = mockk(relaxed = true) + every { adapter.getChannel("ch") } returns channel + channel.state = ChannelState.attached + every { channel.modes } returns emptyArray() + every { channel.options } returns ChannelOptions().apply { modes = null } + + val ex = assertFailsWith { adapter.throwIfInvalidAccessApiConfiguration("ch") } + assertEquals(ErrorCode.ChannelModeRequired.code, ex.errorInfo.code) + assertTrue(ex.errorInfo.message.contains("object_subscribe")) + } + + // throwIfInvalidWriteApiConfiguration + @Test + fun testThrowIfInvalidWriteApiConfigurationEchoDisabled() { + val adapter = mockk(relaxed = true) + val clientOptions = ClientOptions().apply { echoMessages = false } + every { adapter.clientOptions } returns clientOptions + + val ex = assertFailsWith { adapter.throwIfInvalidWriteApiConfiguration("ch") } + assertEquals(ErrorCode.BadRequest.code, ex.errorInfo.code) + assertTrue(ex.errorInfo.message.contains("echoMessages")) + } + + @Test + fun testThrowIfInvalidWriteApiConfigurationInvalidState() { + val adapter = mockk(relaxed = true) + every { adapter.clientOptions } returns ClientOptions() + val channel = mockk(relaxed = true) + every { adapter.getChannel("ch") } returns channel + channel.state = ChannelState.suspended + + val ex = assertFailsWith { adapter.throwIfInvalidWriteApiConfiguration("ch") } + assertEquals(ErrorCode.ChannelStateError.code, ex.errorInfo.code) + } + + @Test + fun testThrowIfInvalidWriteApiConfigurationMissingMode() { + val adapter = mockk(relaxed = true) + every { adapter.clientOptions } returns ClientOptions() + val channel = mockk(relaxed = true) + every { adapter.getChannel("ch") } returns channel + channel.state = ChannelState.attached + every { channel.modes } returns emptyArray() + every { channel.options } returns ChannelOptions().apply { modes = null } + + val ex = assertFailsWith { adapter.throwIfInvalidWriteApiConfiguration("ch") } + assertEquals(ErrorCode.ChannelModeRequired.code, ex.errorInfo.code) + assertTrue(ex.errorInfo.message.contains("object_publish")) + } + + // throwIfUnpublishableState + @Test + fun testThrowIfUnpublishableStateInactiveConnection() { + val adapter = mockk(relaxed = true) + val connManager = mockk(relaxed = true) + every { adapter.connectionManager } returns connManager + every { connManager.isActive } returns false + every { connManager.stateErrorInfo } returns serverError("not active").errorInfo + + val ex = assertFailsWith { adapter.throwIfUnpublishableState("ch") } + assertEquals(500, ex.errorInfo.statusCode) + assertEquals(50000, ex.errorInfo.code) + } + + @Test + fun testThrowIfUnpublishableStateChannelFailed() { + val adapter = mockk(relaxed = true) + val connManager = mockk(relaxed = true) + every { adapter.connectionManager } returns connManager + every { connManager.isActive } returns true + val channel = mockk(relaxed = true) + every { adapter.getChannel("ch") } returns channel + channel.state = ChannelState.failed + + val ex = assertFailsWith { adapter.throwIfUnpublishableState("ch") } + assertEquals(ErrorCode.ChannelStateError.code, ex.errorInfo.code) + } + + @Test + fun testAccessConfigThrowsWhenRequiredModeMissing() { + val adapter = mockk(relaxed = true) + val channel = mockk(relaxed = true) + every { adapter.getChannel("ch") } returns channel + channel.state = ChannelState.attached + // No modes anywhere + every { channel.modes } returns null + every { channel.options } returns ChannelOptions().apply { modes = null } + + val ex = assertFailsWith { adapter.throwIfInvalidAccessApiConfiguration("ch") } + assertEquals(ErrorCode.ChannelModeRequired.code, ex.errorInfo.code) + assertTrue(ex.errorInfo.message.contains("object_subscribe")) + } + + @Test + fun testWriteConfigThrowsWhenRequiredModeMissing() { + val adapter = mockk(relaxed = true) + every { adapter.clientOptions } returns ClientOptions() // echo enabled + val channel = mockk(relaxed = true) + every { adapter.getChannel("ch") } returns channel + channel.state = ChannelState.attached + every { channel.modes } returns emptyArray() + every { channel.options } returns ChannelOptions().apply { modes = null } + + val ex = assertFailsWith { adapter.throwIfInvalidWriteApiConfiguration("ch") } + assertEquals(ErrorCode.ChannelModeRequired.code, ex.errorInfo.code) + assertTrue(ex.errorInfo.message.contains("object_publish")) + } + + @Test + fun testAccessConfigThrowsOnInvalidChannelState() { + val adapter = mockk(relaxed = true) + val channel = mockk(relaxed = true) + every { adapter.getChannel("ch") } returns channel + channel.state = ChannelState.detached + + val ex = assertFailsWith { adapter.throwIfInvalidAccessApiConfiguration("ch") } + assertEquals(ErrorCode.ChannelStateError.code, ex.errorInfo.code) + } + + @Test + fun testWriteConfigThrowsOnInvalidChannelStates() { + val adapter = mockk(relaxed = true) + every { adapter.clientOptions } returns ClientOptions() + val channel = mockk(relaxed = true) + every { adapter.getChannel("ch") } returns channel + + // Suspended should be rejected for write config + channel.state = ChannelState.suspended + val ex1 = assertFailsWith { adapter.throwIfInvalidWriteApiConfiguration("ch") } + assertEquals(ErrorCode.ChannelStateError.code, ex1.errorInfo.code) + + // Failed should also be rejected + channel.state = ChannelState.failed + val ex2 = assertFailsWith { adapter.throwIfInvalidWriteApiConfiguration("ch") } + assertEquals(ErrorCode.ChannelStateError.code, ex2.errorInfo.code) + } + + // Binary utilities + @Test + fun testBinaryEqualityHashCodeAndSize() { + val data1 = byteArrayOf(1, 2, 3, 4) + val data2 = byteArrayOf(1, 2, 3, 4) + val data3 = byteArrayOf(4, 3, 2, 1) + + val b1 = Binary(data1) + val b2 = Binary(data2) + val b3 = Binary(data3) + + assertEquals(b1, b2) + assertEquals(b1.hashCode(), b2.hashCode()) + assertNotEquals(b1, b3) + + assertEquals(4, b1.size()) + } +}