@@ -430,17 +430,18 @@ public void test_reconnect_DrainAfterStreamHangup() throws Exception {
430430 out .emitEventAsync (Event .STARTED );
431431 Future <?> backgroundAcks = backgroundThread .submit (() -> {
432432 try {
433- recvDataAndAck ();
434- recvDataAndAck ();
435- recvDataAndAck ();
433+ List <String > received = new ArrayList <>();
434+
435+ received .addAll (recvDataAndAck ());
436+ received .addAll (recvDataAndAck ());
437+ received .addAll (recvDataAndAck ());
438+
439+ out .emitEvent (new Event .Results (received , Collections .emptyMap ()));
436440 } catch (Exception e ) {
437441 throw new RuntimeException (e );
438442 }
439443 });
440444
441- List <String > submitted = tasks .stream ().map (TaskHandle ::id ).toList ();
442- out .emitEventAsync (new Event .Results (submitted , Collections .emptyMap ()));
443-
444445 closeContext ();
445446 backgroundAcks .get ();
446447
@@ -546,14 +547,20 @@ public void test_startAfterClose() throws Exception {
546547 /**
547548 * Read the next Data message from the stream and ACK it.
548549 * This method does not wait for the server to process the Acks.
550+ *
551+ * @return IDs of tasks that were received.
549552 */
550553 private List <String > recvDataAndAck () throws InterruptedException {
551554 List <String > received = recvData ();
552555 out .emitEventAsync (new Event .Acks (received ));
553556 return received ;
554557 }
555558
556- /** Read the next Data message from the stream. */
559+ /**
560+ * Read the next Data message from the stream.
561+ *
562+ * @return IDs of tasks that were received.
563+ */
557564 private List <String > recvData () throws InterruptedException {
558565 WeaviateProtoBatch .BatchStreamRequest .Data data = in .expectMessage (DATA ).getData ();
559566 return Stream .concat (
0 commit comments