From 415addb1d4dab2118020d0cb5dfc2ebfcf77c135 Mon Sep 17 00:00:00 2001 From: dyma solovei Date: Fri, 27 Mar 2026 15:04:53 +0100 Subject: [PATCH 1/3] feat(batch): publish the total number of failed tasks --- .../v1/api/collections/batch/BatchContext.java | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/src/main/java/io/weaviate/client6/v1/api/collections/batch/BatchContext.java b/src/main/java/io/weaviate/client6/v1/api/collections/batch/BatchContext.java index 347ded16a..5f6743965 100644 --- a/src/main/java/io/weaviate/client6/v1/api/collections/batch/BatchContext.java +++ b/src/main/java/io/weaviate/client6/v1/api/collections/batch/BatchContext.java @@ -136,6 +136,8 @@ public final class BatchContext implements Closeable { private final CollectionDescriptor collectionDescriptor; private final CollectionHandleDefaults collectionHandleDefaults; + private volatile int numberOfErrors; + /** * Internal execution service. Its lifecycle is bound to that of the * BatchContext: it's started when the context is initialized @@ -314,12 +316,25 @@ private TaskHandle add(final TaskHandle taskHandle) throws InterruptedException // Remove the task from the WIP list as soon as it completes, // successfully or otherwise. Note, that TaskHandle::done future // only completes exceptionally after all retries have been exhausted. - taskHandle.done().whenComplete((__, t) -> wip.remove(taskHandle.id())); + taskHandle.done().whenComplete((__, t) -> { + if (t != null) { + numberOfErrors++; + } + wip.remove(taskHandle.id()); + }); queue.put(taskHandle); return taskHandle; } + /** + * Get the current tally of failed tasks. + * An object is only considered failed if it can no longer be retried. + */ + public int numberOfErrors() { + return numberOfErrors; + } + void start() { if (closed) { throw new IllegalStateException("context is closed"); From 54aaefb2bff1a1f1442bdee847a31b814403b898 Mon Sep 17 00:00:00 2001 From: dyma solovei Date: Fri, 27 Mar 2026 15:07:38 +0100 Subject: [PATCH 2/3] test(batch): add test for numberOfErrors --- .../client6/v1/api/collections/batch/BatchContextTest.java | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/test/java/io/weaviate/client6/v1/api/collections/batch/BatchContextTest.java b/src/test/java/io/weaviate/client6/v1/api/collections/batch/BatchContextTest.java index a7c84690a..3ccd86dc3 100644 --- a/src/test/java/io/weaviate/client6/v1/api/collections/batch/BatchContextTest.java +++ b/src/test/java/io/weaviate/client6/v1/api/collections/batch/BatchContextTest.java @@ -502,9 +502,13 @@ public void test_retryPolicy() throws Exception { .noneMatch(CompletableFuture::isCompletedExceptionally); Assertions.assertThat(tasks.subList(BATCH_SIZE - 1, BATCH_SIZE)) - .as("last %d tasks succeeded", BATCH_SIZE) + .as("last %d tasks failed", BATCH_SIZE) .extracting(TaskHandle::done) .allMatch(CompletableFuture::isCompletedExceptionally); + + Assertions.assertThat(context.numberOfErrors()) + .as("number of errors") + .isEqualTo(BATCH_SIZE); } @Test @@ -570,7 +574,6 @@ static String getBeacon(WeaviateProtoBatch.BatchReference reference) { private static final class OutboundStream { private final StreamObserver stream; private final Executor eventThread; - private final List pendingEvents = new ArrayList<>(); OutboundStream(StreamObserver stream, Executor eventThread) { this.stream = stream; From 6d3e4fec605a1bd5299eb54d62eba7d5ce41a0ae Mon Sep 17 00:00:00 2001 From: dyma solovei Date: Fri, 27 Mar 2026 15:11:10 +0100 Subject: [PATCH 3/3] docs(batch): document numberOfErrors field --- .../client6/v1/api/collections/batch/BatchContext.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/main/java/io/weaviate/client6/v1/api/collections/batch/BatchContext.java b/src/main/java/io/weaviate/client6/v1/api/collections/batch/BatchContext.java index 5f6743965..27b035064 100644 --- a/src/main/java/io/weaviate/client6/v1/api/collections/batch/BatchContext.java +++ b/src/main/java/io/weaviate/client6/v1/api/collections/batch/BatchContext.java @@ -136,6 +136,11 @@ public final class BatchContext implements Closeable { private final CollectionDescriptor collectionDescriptor; private final CollectionHandleDefaults collectionHandleDefaults; + /** + * Tally of the failed items. This value is only written to from + * {@link #retryService} thread, which processes the incoming + * {@link Event.Results}; making it {@code volatile} is sufficient. + */ private volatile int numberOfErrors; /**