Skip to content

Commit ecf37ee

Browse files
committed
fix: await until tasks are acked
1 parent 9ab63b3 commit ecf37ee

1 file changed

Lines changed: 12 additions & 5 deletions

File tree

src/test/java/io/weaviate/client6/v1/api/collections/batch/BatchContextTest.java

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,7 @@ private void closeContext() throws Exception {
159159

160160
try {
161161
context.close();
162-
eof.get();
162+
eof.get(5, TimeUnit.SECONDS);
163163
} finally {
164164
contextClosed = true;
165165
}
@@ -177,15 +177,19 @@ public void test_sendOneBatch() throws Exception {
177177
List<String> received = recvDataAndAck();
178178
Assertions.assertThat(tasks)
179179
.extracting(TaskHandle::id).containsExactlyInAnyOrderElementsOf(received);
180-
Assertions.assertThat(tasks)
181-
.extracting(TaskHandle::isAcked).allMatch(CompletableFuture::isDone);
180+
181+
CompletableFuture<?>[] tasksAcked = tasks.stream()
182+
.map(TaskHandle::isAcked).toArray(CompletableFuture[]::new);
183+
Assertions.assertThat(CompletableFuture.allOf(tasksAcked))
184+
.succeedsWithin(5, TimeUnit.SECONDS);
182185

183186
out.beforeEof(new Event.Results(received, Collections.emptyMap()));
184187

185188
// Since MockServer runs in the same thread as this test,
186189
// the context will be updated before the last emitEvent returns.
187190
closeContext();
188191

192+
// By the time context.close() returns all tasks MUST have results set.
189193
Assertions.assertThat(tasks).extracting(TaskHandle::result)
190194
.allMatch(CompletableFuture::isDone)
191195
.extracting(CompletableFuture::get).extracting(TaskHandle.Result::error)
@@ -207,8 +211,11 @@ public void test_drainOnClose() throws Exception {
207211
List<String> received = recvDataAndAck();
208212
Assertions.assertThat(tasks).extracting(TaskHandle::id)
209213
.containsExactlyInAnyOrderElementsOf(received);
210-
Assertions.assertThat(tasks).extracting(TaskHandle::isAcked)
211-
.allMatch(CompletableFuture::isDone);
214+
215+
CompletableFuture<?>[] tasksAcked = tasks.stream()
216+
.map(TaskHandle::isAcked).toArray(CompletableFuture[]::new);
217+
Assertions.assertThat(CompletableFuture.allOf(tasksAcked))
218+
.succeedsWithin(5, TimeUnit.SECONDS);
212219
} catch (Exception e) {
213220
throw new RuntimeException(e);
214221
}

0 commit comments

Comments
 (0)