diff --git a/bmq-sdk/src/test/java/com/bloomberg/bmq/it/BrokerSessionIT.java b/bmq-sdk/src/test/java/com/bloomberg/bmq/it/BrokerSessionIT.java index ff293e9..4d25ca5 100644 --- a/bmq-sdk/src/test/java/com/bloomberg/bmq/it/BrokerSessionIT.java +++ b/bmq-sdk/src/test/java/com/bloomberg/bmq/it/BrokerSessionIT.java @@ -86,6 +86,7 @@ import java.util.concurrent.SynchronousQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.function.BooleanSupplier; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.slf4j.Logger; @@ -98,6 +99,18 @@ public class BrokerSessionIT { static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + private static void waitUntil(BooleanSupplier condition, Duration timeout) + throws InterruptedException { + long deadlineMs = System.currentTimeMillis() + timeout.toMillis(); + while (!condition.getAsBoolean()) { + long remaining = deadlineMs - System.currentTimeMillis(); + if (remaining <= 0) { + throw new AssertionError("Condition not met within " + timeout); + } + Thread.sleep(Math.min(10, remaining)); + } + } + private static Uri createUri() { return BmqBroker.Domains.Priority.generateQueueUri(); } @@ -464,7 +477,7 @@ void openConfigureCloseQueueTest() { * */ @Test - void pushMessageTest() throws IOException { + void pushMessageTest() throws IOException, InterruptedException { logger.info("#TEST_BEGIN BrokerSessionIT pushMessageTest"); BmqBrokerSimulator server = new BmqBrokerSimulator(Mode.BMQ_AUTO_MODE); @@ -498,6 +511,9 @@ void pushMessageTest() throws IOException { server.writePushRequest(1); + // Wait for the push event to be received before closing + waitUntil(() -> !events.isEmpty(), Duration.ofSeconds(3)); + assertEquals(CloseQueueResult.SUCCESS, queue.close(TEST_REQUEST_TIMEOUT)); assertEquals(QueueState.e_CLOSED, queue.getState()); verifyConfigureRequest(++reqId, server.nextClientRequest());