diff --git a/src/main/java/io/weaviate/client6/v1/api/collections/batch/BatchContext.java b/src/main/java/io/weaviate/client6/v1/api/collections/batch/BatchContext.java index 347ded16a..27b035064 100644 --- a/src/main/java/io/weaviate/client6/v1/api/collections/batch/BatchContext.java +++ b/src/main/java/io/weaviate/client6/v1/api/collections/batch/BatchContext.java @@ -136,6 +136,13 @@ public final class BatchContext implements Closeable { private final CollectionDescriptor collectionDescriptor; private final CollectionHandleDefaults collectionHandleDefaults; + /** + * Tally of the failed items. This value is only written to from + * {@link #retryService} thread, which processes the incoming + * {@link Event.Results}; making it {@code volatile} is sufficient. + */ + private volatile int numberOfErrors; + /** * Internal execution service. Its lifecycle is bound to that of the * BatchContext: it's started when the context is initialized @@ -314,12 +321,25 @@ private TaskHandle add(final TaskHandle taskHandle) throws InterruptedException // Remove the task from the WIP list as soon as it completes, // successfully or otherwise. Note, that TaskHandle::done future // only completes exceptionally after all retries have been exhausted. - taskHandle.done().whenComplete((__, t) -> wip.remove(taskHandle.id())); + taskHandle.done().whenComplete((__, t) -> { + if (t != null) { + numberOfErrors++; + } + wip.remove(taskHandle.id()); + }); queue.put(taskHandle); return taskHandle; } + /** + * Get the current tally of failed tasks. + * An object is only considered failed if it can no longer be retried. + */ + public int numberOfErrors() { + return numberOfErrors; + } + void start() { if (closed) { throw new IllegalStateException("context is closed"); diff --git a/src/test/java/io/weaviate/client6/v1/api/collections/batch/BatchContextTest.java b/src/test/java/io/weaviate/client6/v1/api/collections/batch/BatchContextTest.java index a7c84690a..3ccd86dc3 100644 --- a/src/test/java/io/weaviate/client6/v1/api/collections/batch/BatchContextTest.java +++ b/src/test/java/io/weaviate/client6/v1/api/collections/batch/BatchContextTest.java @@ -502,9 +502,13 @@ public void test_retryPolicy() throws Exception { .noneMatch(CompletableFuture::isCompletedExceptionally); Assertions.assertThat(tasks.subList(BATCH_SIZE - 1, BATCH_SIZE)) - .as("last %d tasks succeeded", BATCH_SIZE) + .as("last %d tasks failed", BATCH_SIZE) .extracting(TaskHandle::done) .allMatch(CompletableFuture::isCompletedExceptionally); + + Assertions.assertThat(context.numberOfErrors()) + .as("number of errors") + .isEqualTo(BATCH_SIZE); } @Test @@ -570,7 +574,6 @@ static String getBeacon(WeaviateProtoBatch.BatchReference reference) { private static final class OutboundStream { private final StreamObserver stream; private final Executor eventThread; - private final List pendingEvents = new ArrayList<>(); OutboundStream(StreamObserver stream, Executor eventThread) { this.stream = stream;