diff --git a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/spark/BatchedOrphanFilesDeletionSparkApp.java b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/spark/BatchedOrphanFilesDeletionSparkApp.java
new file mode 100644
index 000000000..cd336e6f5
--- /dev/null
+++ b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/spark/BatchedOrphanFilesDeletionSparkApp.java
@@ -0,0 +1,371 @@
+package com.linkedin.openhouse.jobs.spark;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Lists;
+import com.linkedin.openhouse.common.metrics.DefaultOtelConfig;
+import com.linkedin.openhouse.common.metrics.OtelEmitter;
+import com.linkedin.openhouse.jobs.spark.state.StateManager;
+import com.linkedin.openhouse.jobs.util.AppConstants;
+import com.linkedin.openhouse.jobs.util.AppsOtelEmitter;
+import io.opentelemetry.api.common.AttributeKey;
+import io.opentelemetry.api.common.Attributes;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import lombok.Builder;
+import lombok.Value;
+import lombok.extern.slf4j.Slf4j;
+import okhttp3.MediaType;
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.RequestBody;
+import okhttp3.Response;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.lang3.math.NumberUtils;
+import org.apache.iceberg.Table;
+
+/**
+ * Runs orphan file deletion for a batch of tables in one Spark session.
+ *
+ * <p>Spinning up a Spark session per table is expensive. This app amortizes that cost by processing
+ * many tables in a single job, using a driver-side thread pool so each table's deletion runs
+ * concurrently without competing for executors.
+ *
+ * <p>When {@code --resultsEndpoint} is supplied, each table's outcome is POSTed to the optimizer
+ * service's update-operation endpoint after the whole batch has finished, letting the service
+ * track per-table status independently of the overall job.
+ */
+@Slf4j
+public class BatchedOrphanFilesDeletionSparkApp extends BaseSparkApp {
+ private static final int DEFAULT_MAX_ORPHAN_FILE_SAMPLE_SIZE = 20000; // CLI default
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); // JSON for POST bodies
+
+ private final List<String> tableNames; // tables to process
+ private final int parallelism; // upper bound on driver-side worker threads
+ private final long ttlSeconds; // age cutoff: only files older than this are considered
+ private final String backupDir; // backup directory handed to the deletion call
+ private final int concurrentDeletes;
+ private final boolean streamResults;
+ private final int maxOrphanFileSampleSize;
+ private final List<String> operationIds; // operationIds[i] belongs to tableNames[i]
+ private final String resultsEndpoint; // null disables the per-table result POSTs
+ /** Stores the job configuration as given; list sizes are validated later in runInner. */
+ public BatchedOrphanFilesDeletionSparkApp(
+     String jobId,
+     StateManager stateManager,
+     List<String> tableNames,
+     int parallelism,
+     long ttlSeconds,
+     OtelEmitter otelEmitter,
+     String backupDir,
+     int concurrentDeletes,
+     boolean streamResults,
+     int maxOrphanFileSampleSize,
+     List<String> operationIds,
+     String resultsEndpoint) {
+   super(jobId, stateManager, otelEmitter);
+   this.tableNames = tableNames;
+   this.parallelism = parallelism;
+   this.ttlSeconds = ttlSeconds;
+   this.backupDir = backupDir;
+   this.concurrentDeletes = concurrentDeletes;
+   this.streamResults = streamResults;
+   this.maxOrphanFileSampleSize = maxOrphanFileSampleSize;
+   this.operationIds = operationIds;
+   this.resultsEndpoint = resultsEndpoint;
+ }
+
+ @Override
+ protected void runInner(Operations ops) throws Exception {
+   // Results are matched to operation ids by position, so both lists must have equal length.
+   if (resultsEndpoint != null && operationIds.size() != tableNames.size()) {
+     throw new IllegalArgumentException(
+         "operationIds count (" + operationIds.size() + ") must equal tableNames count ("
+             + tableNames.size() + ") when resultsEndpoint is provided");
+   }
+
+   Map<String, String> tableToOperationId = new HashMap<>();
+   if (resultsEndpoint != null) {
+     for (int i = 0; i < tableNames.size(); i++) {
+       tableToOperationId.put(tableNames.get(i), operationIds.get(i));
+     }
+   }
+
+   // One cutoff for the whole batch: only files older than this are eligible for deletion.
+   long olderThanTimestampMillis =
+       System.currentTimeMillis() - TimeUnit.SECONDS.toMillis(ttlSeconds);
+
+   log.info(
+       "Starting batched orphan files deletion for {} tables with parallelism {}",
+       tableNames.size(), parallelism);
+
+   // newFixedThreadPool rejects sizes below 1; clamp for an empty batch or parallelism <= 0.
+   int poolSize = Math.max(1, Math.min(parallelism, tableNames.size()));
+   ExecutorService executor = Executors.newFixedThreadPool(poolSize);
+   List<Future<OrphanDeletionResult>> futures = new ArrayList<>();
+   try {
+     for (String tableName : tableNames) {
+       futures.add(executor.submit(() -> processTable(ops, tableName, olderThanTimestampMillis)));
+     }
+     executor.shutdown();
+     executor.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
+   } finally {
+     // No-op after a clean drain; if the wait was interrupted this stops the remaining workers.
+     executor.shutdownNow();
+   }
+
+   List<OrphanDeletionResult> allResults = new ArrayList<>();
+   for (Future<OrphanDeletionResult> future : futures) {
+     try {
+       allResults.add(future.get());
+     } catch (ExecutionException e) {
+       throw new RuntimeException("Unexpected exception in table processing", e.getCause());
+     }
+   }
+
+   reportResults(allResults, tableToOperationId);
+ }
+
+ /** Deletes orphan files for one table; any exception is captured in the returned result. */
+ private OrphanDeletionResult processTable(
+     Operations ops, String tableName, long olderThanTimestampMillis) {
+   long startTime = System.currentTimeMillis();
+   try {
+     Table table = ops.getTable(tableName);
+     boolean backupEnabled =
+         Boolean.parseBoolean(
+             table.properties().getOrDefault(AppConstants.BACKUP_ENABLED_KEY, "false"));
+
+     log.info("Processing orphan files for table: {}", tableName);
+     Operations.OrphanFilesResult result =
+         ops.deleteOrphanFilesWithMetrics(
+             table,
+             olderThanTimestampMillis,
+             backupEnabled,
+             backupDir,
+             concurrentDeletes,
+             streamResults,
+             maxOrphanFileSampleSize);
+
+     List<String> orphanFiles = Lists.newArrayList(result.orphanFileLocations().iterator());
+     long durationMs = System.currentTimeMillis() - startTime;
+
+     log.info(
+         "Successfully processed table {}: {} orphan files deleted in {}ms",
+         tableName,
+         orphanFiles.size(),
+         durationMs);
+
+     return OrphanDeletionResult.success(
+         tableName, orphanFiles.size(), result.getBytesDeleted(), durationMs);
+   } catch (Exception e) {
+     long durationMs = System.currentTimeMillis() - startTime;
+     log.error("Failed to process table: {}", tableName, e);
+     return OrphanDeletionResult.failure(tableName, e, durationMs);
+   }
+ }
+
+ /** Emits per-table metrics, POSTs each outcome when an endpoint is set, then raises failures. */
+ private void reportResults(
+     List<OrphanDeletionResult> results, Map<String, String> tableToOperationId) throws Exception {
+   OkHttpClient client =
+       resultsEndpoint != null
+           ? new OkHttpClient.Builder()
+               .connectTimeout(10, TimeUnit.SECONDS).readTimeout(30, TimeUnit.SECONDS).build()
+           : null;
+   int failureCount = 0;
+   Exception firstReportingError = null;
+   for (OrphanDeletionResult result : results) {
+     if (result.isSuccess()) {
+       otelEmitter.count(
+           METRICS_SCOPE, AppConstants.ORPHAN_FILE_COUNT, result.getOrphanFilesDeleted(),
+           Attributes.of(AttributeKey.stringKey(AppConstants.TABLE_NAME), result.getTableName()));
+     } else {
+       failureCount++;
+     }
+     String opId = client == null ? null : tableToOperationId.get(result.getTableName());
+     try {
+       if (opId != null) {
+         updateOperation(client, opId, result);
+       }
+     } catch (Exception e) { // keep going so the remaining tables still get reported
+       log.error("Failed to report result for table {}", result.getTableName(), e);
+       firstReportingError = firstReportingError == null ? e : firstReportingError;
+     }
+   }
+   if (firstReportingError != null) {
+     throw firstReportingError;
+   }
+   // Without an endpoint any table failure fails the job; with one, only a total failure does.
+   if (failureCount > 0 && (client == null || failureCount == results.size())) {
+     throw new RuntimeException(
+         client == null ? failureCount + " table(s) failed in batch" : "All tables failed in batch");
+   }
+ }
+
+ /** POSTs one table's outcome to {@code resultsEndpoint/<id>/update}; a non-2xx reply throws. */
+ private void updateOperation(OkHttpClient client, String id, OrphanDeletionResult result)
+     throws Exception {
+   OperationResult payload =
+       result.isSuccess()
+           ? OperationResult.success(result.getOrphanFilesDeleted(), result.getBytesDeleted())
+           : OperationResult.failure(result.getErrorMessage(), result.getErrorType());
+   MediaType jsonType = MediaType.parse("application/json; charset=utf-8");
+   RequestBody body = RequestBody.create(OBJECT_MAPPER.writeValueAsString(payload), jsonType);
+   String url = resultsEndpoint + "/" + id + "/update";
+   Request request = new Request.Builder().url(url).post(body).build();
+   try (Response response = client.newCall(request).execute()) {
+     int status = response.code();
+     if (!(status >= 200 && status < 300)) {
+       throw new RuntimeException("POST operation/" + id + "/update returned HTTP " + status);
+     }
+   }
+ }
+
+ /** Outcome of orphan file deletion for a single table (metrics or error details). */
+ @Value
+ @Builder
+ public static class OrphanDeletionResult implements Serializable {
+   String tableName;
+   boolean success;
+   int orphanFilesDeleted;
+   long bytesDeleted;
+   long durationMs;
+   String errorMessage; // null on success
+   String errorType; // simple class name of the failure; null on success
+
+   public static OrphanDeletionResult success(
+       String tableName, int orphanFileCount, long bytesDeleted, long durationMs) {
+     return OrphanDeletionResult.builder()
+         .tableName(tableName)
+         .success(true)
+         .orphanFilesDeleted(orphanFileCount)
+         .bytesDeleted(bytesDeleted)
+         .durationMs(durationMs)
+         .build();
+   }
+
+   public static OrphanDeletionResult failure(String tableName, Throwable error, long durationMs) {
+     // Some exceptions carry no message (e.g. a bare NullPointerException); never report null.
+     String message = error.getMessage() != null ? error.getMessage() : error.toString();
+     return OrphanDeletionResult.builder() // counters keep the builder's zero defaults
+         .tableName(tableName)
+         .success(false)
+         .durationMs(durationMs)
+         .errorMessage(message)
+         .errorType(error.getClass().getSimpleName())
+         .build();
+   }
+ }
+
+ /** JSON body POSTed to the optimizer service's update-operation endpoint; nulls are omitted. */
+ @JsonInclude(JsonInclude.Include.NON_NULL)
+ static class OperationResult {
+   // Public fields are what gets serialized; a field left null is dropped from the JSON.
+   public final String status;
+   // Boxed types so that metrics passed as null are omitted instead of being written as 0.
+   public final Integer orphanFilesDeleted;
+   public final Long orphanBytesDeleted;
+   public final String errorMessage;
+   public final String errorType;
+
+   private OperationResult(
+       String status, Integer filesDeleted, Long bytesDeleted, String message, String type) {
+     this.status = status;
+     orphanFilesDeleted = filesDeleted;
+     orphanBytesDeleted = bytesDeleted;
+     errorMessage = message;
+     errorType = type;
+   }
+
+   /** Success payload: metrics only, error fields stay null. */
+   static OperationResult success(int orphanFilesDeleted, long orphanBytesDeleted) {
+     return new OperationResult("SUCCESS", orphanFilesDeleted, orphanBytesDeleted, null, null);
+   }
+
+   /** Failure payload: error fields only, metrics stay null. */
+   static OperationResult failure(String errorMessage, String errorType) {
+     return new OperationResult("FAILED", null, null, errorMessage, errorType);
+   }
+ }
+
+ public static void main(String[] args) {
+   // Entry point: builds the app from CLI args with the default OpenTelemetry emitter, runs it.
+   createApp(args, new AppsOtelEmitter(Arrays.asList(DefaultOtelConfig.getOpenTelemetry())))
+       .run();
+ }
+
+ /**
+  * Builds the app from command-line arguments.
+  *
+  * <p>Comma-separated values are trimmed. A missing or unparsable {@code --ttl} means 7 days;
+  * {@code 0} is kept as-is and any other value is raised to at least 1 day.
+  */
+ public static BatchedOrphanFilesDeletionSparkApp createApp(
+     String[] args, OtelEmitter otelEmitter) {
+   List<Option> extraOptions = new ArrayList<>();
+   extraOptions.add(
+       new Option("tn", "tableNames", true, "Comma-separated fully-qualified table names"));
+   extraOptions.add(new Option("p", "parallelism", true, "Number of parallel table processes"));
+   extraOptions.add(
+       new Option(
+           "r", "ttl", true,
+           "How old files should be to be considered orphaned in seconds, minimum 1d is"
+               + " enforced"));
+   extraOptions.add(new Option("b", "backupDir", true, "Backup directory for deleted data"));
+   extraOptions.add(new Option("c", "concurrentDeletes", true, "Number of concurrent deletes"));
+   extraOptions.add(
+       new Option(
+           null, "streamResults", false,
+           "Stream orphan file deletions instead of collecting all paths into driver memory"));
+   extraOptions.add(
+       new Option(
+           null, "maxOrphanFileSampleSize", true,
+           "Maximum number of orphan file paths to return in the result when streaming"));
+   extraOptions.add(new Option("oi", "operationIds", true, "Comma-separated operation IDs"));
+   extraOptions.add(new Option("re", "resultsEndpoint", true, "Base URL for per-table POST"));
+
+   CommandLine cmdLine = createCommandLine(args, extraOptions);
+
+   String separator = "\\s*,\\s*"; // a comma plus any whitespace around it
+   String namesArg = cmdLine.getOptionValue("tableNames");
+   List<String> tableNames =
+       namesArg != null ? Arrays.asList(namesArg.trim().split(separator)) : new ArrayList<>();
+   String idsArg = cmdLine.getOptionValue("operationIds");
+   List<String> operationIds =
+       idsArg != null ? Arrays.asList(idsArg.trim().split(separator)) : Collections.emptyList();
+   String resultsEndpoint = cmdLine.getOptionValue("resultsEndpoint");
+
+   long rawTtl = NumberUtils.toLong(cmdLine.getOptionValue("ttl"), TimeUnit.DAYS.toSeconds(7));
+   // TTL=0 bypasses the minimum-age guard (for tests that seed orphan files and need
+   // them deleted immediately). Any other explicit value is clamped to the 1-day minimum.
+   long ttlSeconds = rawTtl == 0 ? 0 : Math.max(rawTtl, TimeUnit.DAYS.toSeconds(1));
+
+   return new BatchedOrphanFilesDeletionSparkApp(
+       getJobId(cmdLine),
+       createStateManager(cmdLine, otelEmitter),
+       tableNames,
+       Integer.parseInt(cmdLine.getOptionValue("parallelism", "10")),
+       ttlSeconds,
+       otelEmitter,
+       cmdLine.getOptionValue("backupDir", ".backup"),
+       Integer.parseInt(cmdLine.getOptionValue("concurrentDeletes", "10")),
+       cmdLine.hasOption("streamResults"),
+       Integer.parseInt(
+           cmdLine.getOptionValue(
+               "maxOrphanFileSampleSize", String.valueOf(DEFAULT_MAX_ORPHAN_FILE_SAMPLE_SIZE))),
+       operationIds,
+       resultsEndpoint);
+ }
+}
diff --git a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/spark/Operations.java b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/spark/Operations.java
index bd301c729..afc2df075 100644
--- a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/spark/Operations.java
+++ b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/spark/Operations.java
@@ -26,6 +26,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import lombok.AccessLevel;
@@ -112,6 +113,29 @@ public DeleteOrphanFiles.Result deleteOrphanFiles(
int concurrentDeletes,
boolean streamResults,
int maxOrphanFileSampleSize) {
+ return deleteOrphanFilesWithMetrics(
+ table,
+ olderThanTimestampMillis,
+ backupEnabled,
+ backupDir,
+ concurrentDeletes,
+ streamResults,
+ maxOrphanFileSampleSize)
+ .getIcebergResult();
+ }
+
+ /**
+ * Same as {@link #deleteOrphanFiles} but also tracks and returns the total bytes deleted,
+ * suitable for callers that emit byte-level metrics (e.g. batched OFD).
+ */
+ public OrphanFilesResult deleteOrphanFilesWithMetrics(
+ Table table,
+ long olderThanTimestampMillis,
+ boolean backupEnabled,
+ String backupDir,
+ int concurrentDeletes,
+ boolean streamResults,
+ int maxOrphanFileSampleSize) {
DeleteOrphanFiles operation =
SparkActions.get(spark)
@@ -128,6 +152,7 @@ public DeleteOrphanFiles.Result deleteOrphanFiles(
Map dataManifestsCache = new ConcurrentHashMap<>();
Path backupDirRoot = new Path(table.location(), backupDir);
Path dataDirRoot = new Path(table.location(), "data");
+ AtomicLong bytesAccumulator = new AtomicLong(0);
operation =
operation.deleteWith(
file -> {
@@ -147,6 +172,7 @@ && isExistBackupDataManifests(table, file, backupDir, dataManifestsCache)) {
Path backupFilePath = getTrashPath(table, file, backupDir);
log.info("Moving orphan file {} to {}", file, backupFilePath);
try {
+ bytesAccumulator.addAndGet(fs().getFileStatus(new Path(file)).getLen());
rename(new Path(file), backupFilePath);
// update modification time to current time
fs().setTimes(backupFilePath, System.currentTimeMillis(), -1);
@@ -156,13 +182,41 @@ && isExistBackupDataManifests(table, file, backupDir, dataManifestsCache)) {
} else {
log.info("Deleting orphan file {}", file);
try {
+ bytesAccumulator.addAndGet(fs().getFileStatus(new Path(file)).getLen());
fs().delete(new Path(file), false);
} catch (IOException e) {
log.error(String.format("Delete operation failed for file: %s", file), e);
}
}
});
- return operation.execute();
+ DeleteOrphanFiles.Result icebergResult = operation.execute();
+ return new OrphanFilesResult(icebergResult, bytesAccumulator.get());
+ }
+
+ /**
+  * Result of a {@link #deleteOrphanFilesWithMetrics} call: the Iceberg action result plus the
+  * byte total accumulated by the delete callback.
+  */
+ public static class OrphanFilesResult {
+   private final DeleteOrphanFiles.Result result; // untouched Iceberg action result
+   private final long bytesDeleted; // counted before each delete/rename, so failed ones count too
+
+   public OrphanFilesResult(DeleteOrphanFiles.Result result, long bytesDeleted) {
+     this.result = result;
+     this.bytesDeleted = bytesDeleted;
+   }
+
+   public DeleteOrphanFiles.Result getIcebergResult() {
+     return result;
+   }
+
+   public Iterable<String> orphanFileLocations() { // pass-through to the Iceberg result
+     return result.orphanFileLocations();
+   }
+
+   public long getBytesDeleted() {
+     return bytesDeleted;
+   }
  }
private ExecutorService removeFilesService(int concurrentDeletes) {
diff --git a/apps/spark/src/test/java/com/linkedin/openhouse/jobs/spark/BatchedOrphanFilesDeletionSparkAppTest.java b/apps/spark/src/test/java/com/linkedin/openhouse/jobs/spark/BatchedOrphanFilesDeletionSparkAppTest.java
new file mode 100644
index 000000000..9f057a8d7
--- /dev/null
+++ b/apps/spark/src/test/java/com/linkedin/openhouse/jobs/spark/BatchedOrphanFilesDeletionSparkAppTest.java
@@ -0,0 +1,345 @@
+package com.linkedin.openhouse.jobs.spark;
+
+import com.linkedin.openhouse.common.metrics.DefaultOtelConfig;
+import com.linkedin.openhouse.common.metrics.OtelEmitter;
+import com.linkedin.openhouse.jobs.util.AppsOtelEmitter;
+import com.linkedin.openhouse.tablestest.OpenHouseSparkITest;
+import com.sun.net.httpserver.HttpServer;
+import java.net.InetSocketAddress;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.iceberg.Table;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Integration tests for {@link BatchedOrphanFilesDeletionSparkApp}.
+ *
+ * These tests run the app's logic in-process against a real Spark session backed by an in-memory
+ * Iceberg catalog ({@link OpenHouseSparkITest}). The app is never submitted as an external Spark
+ * job; {@code runInner(ops)} is called directly.
+ *
+ * <p>When testing the per-table complete-operation callback path, the Optimizer service is replaced
+ * by an embedded {@link com.sun.net.httpserver.HttpServer} that captures request bodies. This
+ * verifies the shape and content of the JSON payloads without requiring a live service.
+ */
+@Slf4j
+public class BatchedOrphanFilesDeletionSparkAppTest extends OpenHouseSparkITest {
+  private static final int MAX_SAMPLE_SIZE = 20000;
+
+  private final OtelEmitter otelEmitter =
+      new AppsOtelEmitter(Arrays.asList(DefaultOtelConfig.getOpenTelemetry()));
+
+  @Test
+  public void testSuccessfulOrphanFilesDeletionForMultipleTables() throws Exception {
+    final List<String> tableNames = Arrays.asList("db.test_batch1", "db.test_batch2");
+    final int parallelism = 2;
+
+    try (Operations ops = Operations.withCatalog(getSparkSession(), otelEmitter)) {
+      for (String tableName : tableNames) {
+        prepareTable(ops, tableName);
+        populateTable(ops, tableName, 2);
+      }
+
+      BatchedOrphanFilesDeletionSparkApp app =
+          newApp(
+              tableNames, parallelism, TimeUnit.DAYS.toSeconds(1), Collections.emptyList(), null);
+
+      app.runInner(ops);
+
+      for (String tableName : tableNames) {
+        Table table = ops.getTable(tableName);
+        Assertions.assertNotNull(table);
+      }
+    }
+  }
+
+  @Test
+  public void testBatchedDeletionWithSingleTable() throws Exception {
+    final List<String> tableNames = Arrays.asList("db.test_single");
+
+    try (Operations ops = Operations.withCatalog(getSparkSession(), otelEmitter)) {
+      prepareTable(ops, tableNames.get(0));
+      populateTable(ops, tableNames.get(0), 3);
+
+      BatchedOrphanFilesDeletionSparkApp app =
+          newApp(tableNames, 1, TimeUnit.DAYS.toSeconds(1), Collections.emptyList(), null);
+
+      app.runInner(ops);
+
+      Assertions.assertNotNull(ops.getTable(tableNames.get(0)));
+    }
+  }
+
+  @Test
+  public void testBatchedDeletionWithInvalidTable() throws Exception {
+    final List<String> tableNames =
+        Arrays.asList("db.test_valid1", "db.nonexistent_table", "db.test_valid2");
+
+    try (Operations ops = Operations.withCatalog(getSparkSession(), otelEmitter)) {
+      prepareTable(ops, "db.test_valid1");
+      populateTable(ops, "db.test_valid1", 2);
+      prepareTable(ops, "db.test_valid2");
+      populateTable(ops, "db.test_valid2", 2);
+
+      BatchedOrphanFilesDeletionSparkApp app =
+          newApp(tableNames, 3, TimeUnit.DAYS.toSeconds(1), Collections.emptyList(), null);
+      // No results endpoint: a single failed table must fail the whole job.
+      Assertions.assertThrows(RuntimeException.class, () -> app.runInner(ops));
+    }
+  }
+
+  @Test
+  public void testBatchedDeletionWithEmptyTables() throws Exception {
+    final List<String> tableNames = Arrays.asList("db.test_empty1", "db.test_empty2");
+
+    try (Operations ops = Operations.withCatalog(getSparkSession(), otelEmitter)) {
+      for (String tableName : tableNames) {
+        prepareTable(ops, tableName);
+      }
+
+      BatchedOrphanFilesDeletionSparkApp app =
+          newApp(tableNames, 2, TimeUnit.DAYS.toSeconds(1), Collections.emptyList(), null);
+
+      app.runInner(ops);
+
+      for (String tableName : tableNames) {
+        Assertions.assertNotNull(ops.getTable(tableName));
+      }
+    }
+  }
+
+  @Test
+  public void testOrphanDeletionResultSuccess() {
+    BatchedOrphanFilesDeletionSparkApp.OrphanDeletionResult result =
+        BatchedOrphanFilesDeletionSparkApp.OrphanDeletionResult.success(
+            "db.test_result", 42, 1024L, 5000L);
+
+    Assertions.assertTrue(result.isSuccess());
+    Assertions.assertEquals("db.test_result", result.getTableName());
+    Assertions.assertEquals(42, result.getOrphanFilesDeleted());
+    Assertions.assertEquals(1024L, result.getBytesDeleted());
+    Assertions.assertEquals(5000L, result.getDurationMs());
+    Assertions.assertNull(result.getErrorMessage());
+    Assertions.assertNull(result.getErrorType());
+  }
+
+  @Test
+  public void testOrphanDeletionResultFailure() {
+    BatchedOrphanFilesDeletionSparkApp.OrphanDeletionResult result =
+        BatchedOrphanFilesDeletionSparkApp.OrphanDeletionResult.failure(
+            "db.test_failure", new RuntimeException("Test error"), 1000L);
+
+    Assertions.assertFalse(result.isSuccess());
+    Assertions.assertEquals("db.test_failure", result.getTableName());
+    Assertions.assertEquals(0, result.getOrphanFilesDeleted());
+    Assertions.assertEquals(0, result.getBytesDeleted());
+    Assertions.assertEquals(1000L, result.getDurationMs());
+    Assertions.assertEquals("Test error", result.getErrorMessage());
+    Assertions.assertEquals("RuntimeException", result.getErrorType());
+  }
+
+  @Test
+  public void testCreateAppFromCommandLineArgs() {
+    String[] args = {
+      "--jobId", "test-job-123",
+      "--storageURL", "http://localhost:8080",
+      "--tableNames", "db.table1,db.table2,db.table3",
+      "--parallelism", "5",
+      "--ttl", "86400",
+      "--backupDir", "/backup",
+      "--concurrentDeletes", "20",
+      "--operationIds", "uuid-1,uuid-2,uuid-3",
+      "--resultsEndpoint", "http://localhost:8083/v1/optimizer/operations"
+    };
+
+    Assertions.assertNotNull(BatchedOrphanFilesDeletionSparkApp.createApp(args, otelEmitter));
+  }
+
+  @Test
+  public void testCreateAppWithDefaultValues() {
+    String[] args = {
+      "--jobId", "test-job-456",
+      "--storageURL", "http://localhost:8080",
+      "--tableNames", "db.table1"
+    };
+
+    Assertions.assertNotNull(BatchedOrphanFilesDeletionSparkApp.createApp(args, otelEmitter));
+  }
+
+  @Test
+  public void testCreateAppWithOperationIdsAndEndpoint() {
+    String[] args = {
+      "--jobId", "test-job-789",
+      "--storageURL", "http://localhost:8080",
+      "--tableNames", "db.table1,db.table2",
+      "--operationIds", "uuid-1,uuid-2",
+      "--resultsEndpoint", "http://localhost:8083/v1/optimizer/operations"
+    };
+
+    Assertions.assertNotNull(BatchedOrphanFilesDeletionSparkApp.createApp(args, otelEmitter));
+  }
+
+  @Test
+  public void testBatchedDeletionWithActualOrphanFiles() throws Exception {
+    final String tableName = "db.test_orphans";
+    final List<String> tableNames = Arrays.asList(tableName);
+
+    // NOTE(review): nothing here writes an unreferenced file and nothing asserts a deletion, so
+    // despite its name this only checks that a TTL=0 run leaves the table's snapshot intact.
+    try (Operations ops = Operations.withCatalog(getSparkSession(), otelEmitter)) {
+      prepareTable(ops, tableName);
+      populateTable(ops, tableName, 3);
+
+      Assertions.assertNotNull(ops.getTable(tableName).currentSnapshot());
+
+      // TTL=0 bypasses the minimum-age guard so seeded orphan files are eligible.
+      BatchedOrphanFilesDeletionSparkApp app =
+          newApp(tableNames, 1, 0L, Collections.emptyList(), null);
+
+      app.runInner(ops);
+
+      Assertions.assertNotNull(ops.getTable(tableName).currentSnapshot());
+    }
+  }
+
+  @Test
+  public void testBatchedDeletionPartialSuccessWithEndpoint() throws Exception {
+    final List<String> tableNames = Arrays.asList("db.test_ep_valid", "db.nonexistent_ep");
+    final List<String> operationIds = Arrays.asList("op-100", "op-200");
+
+    List<String> receivedBodies = Collections.synchronizedList(new ArrayList<>());
+    HttpServer httpServer = startCapturingServer(receivedBodies);
+    String endpoint = "http://localhost:" + httpServer.getAddress().getPort() + "/ops";
+
+    try (Operations ops = Operations.withCatalog(getSparkSession(), otelEmitter)) {
+      prepareTable(ops, "db.test_ep_valid");
+      populateTable(ops, "db.test_ep_valid", 2);
+
+      BatchedOrphanFilesDeletionSparkApp app =
+          newApp(tableNames, 2, TimeUnit.DAYS.toSeconds(1), operationIds, endpoint);
+
+      app.runInner(ops);
+
+      Assertions.assertEquals(2, receivedBodies.size());
+      long successCount = receivedBodies.stream().filter(b -> b.contains("\"SUCCESS\"")).count();
+      long failureCount = receivedBodies.stream().filter(b -> b.contains("\"FAILED\"")).count();
+      Assertions.assertEquals(1, successCount);
+      Assertions.assertEquals(1, failureCount);
+
+      // SUCCESS payload carries orphan-file/byte metrics; no error fields.
+      String successBody =
+          receivedBodies.stream().filter(b -> b.contains("\"SUCCESS\"")).findFirst().get();
+      Assertions.assertTrue(successBody.contains("\"orphanFilesDeleted\""));
+      Assertions.assertTrue(successBody.contains("\"orphanBytesDeleted\""));
+      Assertions.assertFalse(successBody.contains("\"errorMessage\""));
+      Assertions.assertFalse(successBody.contains("\"errorType\""));
+
+      // FAILED payload carries error fields; no orphan metrics.
+      String failureBody =
+          receivedBodies.stream().filter(b -> b.contains("\"FAILED\"")).findFirst().get();
+      Assertions.assertTrue(failureBody.contains("\"errorMessage\""));
+      Assertions.assertTrue(failureBody.contains("\"errorType\""));
+      Assertions.assertFalse(failureBody.contains("\"orphanFilesDeleted\""));
+      Assertions.assertFalse(failureBody.contains("\"orphanBytesDeleted\""));
+    } finally {
+      httpServer.stop(0);
+    }
+  }
+
+  @Test
+  public void testBatchedDeletionAllFailWithEndpoint() throws Exception {
+    final List<String> tableNames = Arrays.asList("db.nonexistent_ep1", "db.nonexistent_ep2");
+    final List<String> operationIds = Arrays.asList("op-300", "op-400");
+
+    List<String> receivedBodies = Collections.synchronizedList(new ArrayList<>());
+    HttpServer httpServer = startCapturingServer(receivedBodies);
+    String endpoint = "http://localhost:" + httpServer.getAddress().getPort() + "/ops";
+
+    try (Operations ops = Operations.withCatalog(getSparkSession(), otelEmitter)) {
+      BatchedOrphanFilesDeletionSparkApp app =
+          newApp(tableNames, 2, TimeUnit.DAYS.toSeconds(1), operationIds, endpoint);
+
+      Assertions.assertThrows(RuntimeException.class, () -> app.runInner(ops));
+
+      Assertions.assertEquals(2, receivedBodies.size());
+      long failures = receivedBodies.stream().filter(b -> b.contains("\"FAILED\"")).count();
+      Assertions.assertEquals(2, failures);
+      for (String body : receivedBodies) {
+        Assertions.assertTrue(body.contains("\"errorMessage\""));
+        Assertions.assertTrue(body.contains("\"errorType\""));
+      }
+    } finally {
+      httpServer.stop(0);
+    }
+  }
+
+  @Test
+  public void testBatchedDeletionMismatchedOperationIdsThrows() throws Exception {
+    final List<String> tableNames = Arrays.asList("db.table1", "db.table2");
+    final List<String> operationIds = Arrays.asList("op-1", "op-2", "op-3");
+
+    BatchedOrphanFilesDeletionSparkApp app =
+        newApp(
+            tableNames, 2, TimeUnit.DAYS.toSeconds(1), operationIds, "http://localhost:9999/ops");
+
+    try (Operations ops = Operations.withCatalog(getSparkSession(), otelEmitter)) {
+      Assertions.assertThrows(IllegalArgumentException.class, () -> app.runInner(ops));
+    }
+  }
+
+  /** Builds an app with fixed defaults: no state manager, ".backup", 10 concurrent deletes. */
+  private BatchedOrphanFilesDeletionSparkApp newApp(
+      List<String> tableNames,
+      int parallelism,
+      long ttlSeconds,
+      List<String> operationIds,
+      String resultsEndpoint) {
+    return new BatchedOrphanFilesDeletionSparkApp(
+        "test-job",
+        null,
+        tableNames,
+        parallelism,
+        ttlSeconds,
+        otelEmitter,
+        ".backup",
+        10,
+        false,
+        MAX_SAMPLE_SIZE,
+        operationIds,
+        resultsEndpoint);
+  }
+
+  /** Starts a local HTTP server on a free port that records every request body sent to /ops. */
+  private static HttpServer startCapturingServer(List<String> receivedBodies) throws Exception {
+    HttpServer httpServer = HttpServer.create(new InetSocketAddress(0), 0);
+    httpServer.createContext(
+        "/ops",
+        exchange -> {
+          byte[] body = exchange.getRequestBody().readAllBytes();
+          receivedBodies.add(new String(body, StandardCharsets.UTF_8));
+          exchange.sendResponseHeaders(200, 0);
+          exchange.close();
+        });
+    httpServer.start();
+    return httpServer;
+  }
+
+  private static void prepareTable(Operations ops, String tableName) {
+    ops.spark().sql(String.format("DROP TABLE IF EXISTS %s", tableName)).show();
+    ops.spark().sql(String.format("CREATE TABLE %s (data string, ts timestamp)", tableName)).show();
+  }
+
+  private static void populateTable(Operations ops, String tableName, int numRows) {
+    for (int row = 0; row < numRows; ++row) {
+      ops.spark()
+          .sql(String.format("INSERT INTO %s VALUES ('v%d', current_timestamp())", tableName, row))
+          .show();
+    }
+  }
+}
diff --git a/build.gradle b/build.gradle
index ec75fd89d..1f951a91c 100644
--- a/build.gradle
+++ b/build.gradle
@@ -178,6 +178,8 @@ tasks.register('CopyGitHooksTask', Copy) {
// housetables-service.Dockerfile -> :services:housetables:bootJar
// jobs-service.Dockerfile -> :services:jobs:bootJar
// optimizer-service.Dockerfile -> :services:optimizer:bootJar
+// optimizer-analyzer.Dockerfile -> :apps:optimizer:analyzerapp:bootJar
+// optimizer-scheduler.Dockerfile -> :apps:optimizer:schedulerapp:bootJar
// jobs-scheduler.Dockerfile -> :apps:openhouse-spark-apps_2.12:shadowJar (uber JAR)
// spark-base-hadoop2.8.dockerfile ->
// :integrations:spark:spark-3.1:openhouse-spark-runtime_2.12:shadowJar (uber JAR)
@@ -198,6 +200,8 @@ tasks.register('dockerPrereqs') {
dependsOn ':services:housetables:bootJar'
dependsOn ':services:jobs:bootJar'
dependsOn ':services:optimizer:bootJar'
+ dependsOn ':apps:optimizer:analyzerapp:bootJar'
+ dependsOn ':apps:optimizer:schedulerapp:bootJar'
// Spark runtime uber JARs (shadowJar)
dependsOn ':integrations:spark:spark-3.1:openhouse-spark-runtime_2.12:shadowJar'
@@ -222,6 +226,8 @@ tasks.register('dockerPrereqs') {
println ' build/housetables/libs/housetables.jar'
println ' build/jobs/libs/jobs.jar'
println ' build/optimizer/libs/optimizer.jar'
+ println ' build/analyzerapp/libs/analyzerapp.jar'
+ println ' build/schedulerapp/libs/schedulerapp.jar'
println ' build/openhouse-spark-runtime_2.12/libs/openhouse-spark-runtime_2.12-uber.jar'
println ' build/openhouse-spark-3.5-runtime_2.12/libs/openhouse-spark-3.5-runtime_2.12-uber.jar'
println ' build/openhouse-spark-apps_2.12/libs/openhouse-spark-apps_2.12-uber.jar'
diff --git a/demo-check.sh b/demo-check.sh
new file mode 100755
index 000000000..d7c60c55d
--- /dev/null
+++ b/demo-check.sh
@@ -0,0 +1,27 @@
+#!/usr/bin/env bash
+# demo-check.sh — Run after demo-setup.sh. Prints the number of .orc data files left in HDFS
+# for each table, then each table's operation history from the optimizer service.
+set -e
+
+OPT_API="http://localhost:8005"
+
+if [ ! -f /tmp/demo_ofd_locs.txt ] || [ ! -f /tmp/demo_ofd_uuids.txt ]; then
+  echo "ERROR: /tmp/demo_ofd_locs.txt or /tmp/demo_ofd_uuids.txt not found — run demo-setup.sh first"
+  exit 1
+fi
+
+echo "=== HDFS data files after OFD ==="
+while IFS='=' read -r TABLE TABLE_LOC; do
+  # grep -c prints 0 itself when nothing matches (exit 1); '|| true' only masks that status.
+  FILE_COUNT=$(docker exec local.namenode hdfs dfs -ls -R "$TABLE_LOC/data/" 2>/dev/null \
+    | grep -c "\.orc" || true)
+  echo " $TABLE: $FILE_COUNT files remaining"
+done < /tmp/demo_ofd_locs.txt
+
+echo ""
+echo "=== Operation history per table ==="
+while IFS='=' read -r TABLE TABLE_UUID; do
+  echo " --- $TABLE ($TABLE_UUID) ---"
+  curl -sf "$OPT_API/v1/optimizer/operations-history/$TABLE_UUID?limit=10" \
+    | jq '.[] | {status, orphanFilesDeleted, orphanBytesDeleted, errorMessage, completedAt}'
+done < /tmp/demo_ofd_uuids.txt
diff --git a/demo-setup.sh b/demo-setup.sh
new file mode 100755
index 000000000..6fc5491f7
--- /dev/null
+++ b/demo-setup.sh
@@ -0,0 +1,174 @@
+#!/usr/bin/env bash
+# demo-setup.sh — Populate tables, expire snapshots, wait for the continuous
+# analyzer + scheduler to schedule OFD jobs.
+#
+# Prerequisites: ./gradlew dockerUp (stack must be healthy)
+#
+# After this script completes, batched-OFD Spark jobs have been submitted via
+# the continuous scheduler. Use demo-check.sh to watch for completion.
+set -e
+
+SCRIPT_DIR="$(cd "$(dirname "$0")" && pwd)"
+TOKEN=$(cat "$SCRIPT_DIR/services/common/src/main/resources/dummy.token")
+TABLES_API="http://localhost:8000"
+JOBS_API="http://localhost:8002/jobs"
+OPT_API="http://localhost:8005"
+LIVY_API="http://localhost:9003"
+
+TABLES="demo_ofd_a:5 demo_ofd_b:7 demo_ofd_c:4"
+TABLE_COUNT=3
+ANALYZER_WAIT_SECS=90
+SCHEDULER_WAIT_SECS=90
+
+wait_for_job() {  # args: JOB_ID LABEL [MAX_SECS=180]; polls $JOBS_API/$JOB_ID every 5s
+  local JOB_ID="$1" LABEL="$2" MAX_SECS="${3:-180}"
+  local i=0
+  while [ $i -lt "$MAX_SECS" ]; do
+    STATE=$(curl -sf "$JOBS_API/$JOB_ID" | jq -r '.state // empty' 2>/dev/null || echo "")
+    # Only SUCCEEDED and FAILED end the wait; any other (or empty) state keeps polling.
+    case "$STATE" in SUCCEEDED) return 0 ;; FAILED) echo "FAIL: $LABEL job $JOB_ID FAILED"; exit 1 ;; esac
+    sleep 5
+    i=$((i + 5))
+  done
+  echo "FAIL: $LABEL job $JOB_ID timed out after ${MAX_SECS}s (last state: $STATE)"
+  exit 1
+}
+
+kill_idle_session() {
+ IDLE=$(curl -sf "$LIVY_API/sessions" \
+ | jq -r '[.sessions[] | select(.state=="idle")] | first | .id // empty')
+ if [ -n "$IDLE" ]; then
+ curl -sf -X DELETE "$LIVY_API/sessions/$IDLE" > /dev/null
+ echo " Freed idle Livy session $IDLE"
+ fi
+}
+
+wait_for_count() {  # args: URL EXPECTED LABEL MAX_SECS; polls URL every 5s until its JSON length >= EXPECTED
+  local URL="$1" EXPECTED="$2" LABEL="$3" MAX_SECS="$4"
+  local i=0 COUNT=0
+  while [ $i -lt "$MAX_SECS" ]; do
+    COUNT=$(curl -sf "$URL" | jq 'length' 2>/dev/null || true); COUNT="${COUNT:-0}"  # jq prints nothing (exit 0) when curl fails
+    [ "$COUNT" -ge "$EXPECTED" ] && return 0
+    printf " %s: %d/%d (waited %ds)\r" "$LABEL" "$COUNT" "$EXPECTED" "$i"
+    sleep 5
+    i=$((i + 5))
+  done
+  echo ""
+  echo "FAIL: $LABEL expected $EXPECTED, got $COUNT after ${MAX_SECS}s"
+  exit 1
+}
+
+echo "=== [1/4] Create tables and populate with data ==="
+rm -f /tmp/demo_ofd_locs.txt /tmp/demo_ofd_uuids.txt
+
+for entry in $TABLES; do
+ TABLE="${entry%%:*}"
+ WRITES="${entry##*:}"
+ ORPHANS=$((WRITES - 2))
+
+ "$SCRIPT_DIR/local-spark-sql.sh" "DROP TABLE IF EXISTS openhouse.db1.$TABLE" > /dev/null
+ "$SCRIPT_DIR/local-spark-sql.sh" "CREATE TABLE openhouse.db1.$TABLE (
+ id STRING, val STRING
+ ) TBLPROPERTIES (
+ 'maintenance.optimizer.ofd.enabled'='true',
+ 'maintenance.optimizer.stats.enabled'='true'
+ )" > /dev/null
+
+ for i in $(seq 1 "$WRITES"); do
+ "$SCRIPT_DIR/local-spark-sql.sh" \
+ "INSERT OVERWRITE openhouse.db1.$TABLE VALUES ('$i', 'row$i')" > /dev/null
+ printf " $TABLE: insert %d/%d\r" "$i" "$WRITES"
+ done
+ echo ""
+
+ TBL_JSON=$(curl -sf -H "Authorization: Bearer $TOKEN" \
+ "$TABLES_API/v1/databases/db1/tables/$TABLE")
+ TABLE_LOC=$(dirname "$(echo "$TBL_JSON" | jq -r '.tableLocation')")
+ TABLE_UUID=$(echo "$TBL_JSON" | jq -r '.tableUUID')
+ echo " $TABLE -> $TABLE_LOC ($WRITES snapshots, $ORPHANS will become orphans)"
+ echo " $TABLE uuid=$TABLE_UUID"
+ echo "$TABLE=$TABLE_LOC" >> /tmp/demo_ofd_locs.txt
+ echo "$TABLE=$TABLE_UUID" >> /tmp/demo_ofd_uuids.txt
+done
+
+# Stats push is async fire-and-forget; the dispatcher subscribes on the Netty event
+# loop after the commit thread returns. Poll briefly for the rows to settle so we
+# don't false-fail before the loop wakes up.
+echo ""
+echo "=== [Wait] Tables on-commit stats push to land in optimizer DB ==="
+STATS_WAIT_SECS=30
+i=0; STATS_COUNT=0
+while [ $i -lt "$STATS_WAIT_SECS" ]; do
+  # A failed curl leaves the count empty and a non-JSON reply makes jq fail; treat both as 0 rows instead of aborting under set -e.
+  STATS_COUNT=$(curl -sf "$OPT_API/v1/optimizer/stats?limit=100" | jq 'length' 2>/dev/null || true); STATS_COUNT="${STATS_COUNT:-0}"
+  [ "$STATS_COUNT" -ge "$TABLE_COUNT" ] && break
+  printf " stats rows: %d/%d (waited %ds)\r" "$STATS_COUNT" "$TABLE_COUNT" "$i"
+  sleep 2
+  i=$((i + 2))
+done
+echo ""
+[ "$STATS_COUNT" -ge "$TABLE_COUNT" ] \
+  || { echo "FAIL: expected $TABLE_COUNT stats rows, got $STATS_COUNT after ${STATS_WAIT_SECS}s"; exit 1; }
+echo "PASS: $STATS_COUNT stats rows posted by Tables Service on-commit hook"
+
+# Verify the new payload shape carries snapshot stats, not just identity fields.
+# Pick one table and confirm the optimizer recorded a non-zero numCurrentFiles, a
+# non-zero tableSizeBytes, and the snapshot ID we sent.
+SAMPLE_UUID=$(head -n1 /tmp/demo_ofd_uuids.txt | cut -d= -f2)
+SAMPLE_ROW=$(curl -sf "$OPT_API/v1/optimizer/stats/$SAMPLE_UUID")
+SAMPLE_NUM_FILES=$(echo "$SAMPLE_ROW" | jq -r '.stats.snapshot.numCurrentFiles // 0')
+SAMPLE_SIZE=$(echo "$SAMPLE_ROW" | jq -r '.stats.snapshot.tableSizeBytes // 0')
+SAMPLE_SNAPSHOT_ID=$(echo "$SAMPLE_ROW" | jq -r '.stats.snapshot.snapshotId // empty')
+[ "$SAMPLE_NUM_FILES" -gt 0 ] \
+ || { echo "FAIL: sample stats row has numCurrentFiles=$SAMPLE_NUM_FILES, expected > 0"; exit 1; }
+[ "$SAMPLE_SIZE" -gt 0 ] \
+ || { echo "FAIL: sample stats row has tableSizeBytes=$SAMPLE_SIZE, expected > 0"; exit 1; }
+[ -n "$SAMPLE_SNAPSHOT_ID" ] \
+ || { echo "FAIL: sample stats row missing snapshotId"; exit 1; }
+echo "PASS: sample stats row carries snapshot payload (files=$SAMPLE_NUM_FILES, bytes=$SAMPLE_SIZE, snapshotId=$SAMPLE_SNAPSHOT_ID)"
+
+echo ""
+echo "=== [2/4] Expire old snapshots (creates orphan data files) ==="
+kill_idle_session
+
+EXPIRE_JOBS=""
+for entry in $TABLES; do
+  TABLE="${entry%%:*}"
+  BODY=$(jq -n --arg n "demo-expire-$TABLE" --arg t "db1.$TABLE" \
+    '{jobName:$n, clusterId:"LocalHadoopCluster",
+      jobConf:{jobType:"SNAPSHOTS_EXPIRATION",
+               args:["--tableName",$t,"--maxAge","1","--granularity","days","--versions","1"]}}')
+  JOB_ID=$(curl -sf -X POST "$JOBS_API" -H "Content-Type: application/json" -d "$BODY" \
+    | jq -r '.jobId // empty')  # "// empty": a missing jobId must be empty, not the string "null", for the check below
+  [ -n "$JOB_ID" ] || { echo "FAIL: could not submit expiration job for $TABLE"; exit 1; }
+  echo " $TABLE: submitted $JOB_ID"
+  EXPIRE_JOBS="$EXPIRE_JOBS $JOB_ID"
+done
+
+for JOB_ID in $EXPIRE_JOBS; do
+ wait_for_job "$JOB_ID" "snapshot-expiration" 180
+ echo " $JOB_ID: SUCCEEDED"
+done
+
+while IFS='=' read -r TABLE TABLE_LOC; do
+  COUNT=$(docker exec local.namenode \
+    hdfs dfs -ls -R "$TABLE_LOC/data/" 2>/dev/null | grep -c "\.orc" || true)  # grep -c already prints 0 on no match
+  [ "$COUNT" -ge 2 ] \
+    || { echo "FAIL: $TABLE has $COUNT files after expiration, expected >= 2"; exit 1; }
+  echo " $TABLE: $COUNT files on HDFS ($((COUNT-1)) orphans + 1 live)"
+done < /tmp/demo_ofd_locs.txt
+
+echo ""
+echo "=== [3/4] Wait for analyzer → scheduler → SCHEDULED ==="
+echo " (analyzer and scheduler both run every ~30s; PENDING is transient)"
+wait_for_count "$OPT_API/v1/optimizer/operations?status=SCHEDULED&limit=100" \
+ "$TABLE_COUNT" "SCHEDULED ops" "$(($ANALYZER_WAIT_SECS + $SCHEDULER_WAIT_SECS))"
+echo ""
+echo "PASS: $TABLE_COUNT operations SCHEDULED"
+
+echo ""
+echo "=== [4/4] (skip) — SCHEDULED is the SUCCESS-readiness signal ==="
+
+echo ""
+echo "Setup complete. Batched-OFD Spark jobs have been launched."
+echo "Run ./demo-check.sh to verify SUCCESS in history + orphan files deleted."
diff --git a/demo.sh b/demo.sh
new file mode 100755
index 000000000..89ea74af3
--- /dev/null
+++ b/demo.sh
@@ -0,0 +1,39 @@
+#!/usr/bin/env bash
+# demo.sh — Full end-to-end optimizer demo from a clean docker stack.
+#
+# Prerequisites: ./gradlew dockerUp (stack must be healthy)
+set -e
+
+SCRIPT_DIR="$(cd "$(dirname "$0")" && pwd)"
+OPT_API="http://localhost:8005"
+
+TABLE_COUNT=3
+SUCCESS_WAIT_SECS=300
+
+bash "$SCRIPT_DIR/demo-setup.sh"
+
+echo ""
+echo "=== [Wait] OFD Spark jobs to complete (up to 5 min) ==="
+for i in $(seq 1 $((SUCCESS_WAIT_SECS / 10))); do  # one attempt per 10s sleep, bounded by SUCCESS_WAIT_SECS
+  TOTAL_HISTORY=0
+  SUCCESS_HISTORY=0
+  if [ -f /tmp/demo_ofd_uuids.txt ]; then
+    while IFS='=' read -r _ UUID; do
+      LATEST=$(curl -sf "$OPT_API/v1/optimizer/operations-history/$UUID?limit=1" \
+        | jq -r '.[0].status // empty' 2>/dev/null || echo "")
+      [ -n "$LATEST" ] && TOTAL_HISTORY=$((TOTAL_HISTORY + 1))
+      [ "$LATEST" = "SUCCESS" ] && SUCCESS_HISTORY=$((SUCCESS_HISTORY + 1))
+    done < /tmp/demo_ofd_uuids.txt
+  fi
+  [ "$SUCCESS_HISTORY" -ge "$TABLE_COUNT" ] && break
+  echo " $i/$((SUCCESS_WAIT_SECS / 10)): $SUCCESS_HISTORY SUCCESS / $TOTAL_HISTORY total history rows..."
+  sleep 10
+done
+
+[ "$SUCCESS_HISTORY" -ge "$TABLE_COUNT" ] \
+ || { echo "FAIL: only $SUCCESS_HISTORY/$TABLE_COUNT operations reached SUCCESS"; bash "$SCRIPT_DIR/demo-check.sh"; exit 1; }
+
+echo ""
+echo "PASS: all $TABLE_COUNT operations completed with SUCCESS"
+echo ""
+bash "$SCRIPT_DIR/demo-check.sh"
diff --git a/infra/recipes/docker-compose/common/oh-services.yml b/infra/recipes/docker-compose/common/oh-services.yml
index 73ef5a656..71c4f889e 100644
--- a/infra/recipes/docker-compose/common/oh-services.yml
+++ b/infra/recipes/docker-compose/common/oh-services.yml
@@ -22,6 +22,20 @@ services:
build:
context: ../../../..
dockerfile: jobs-scheduler.Dockerfile
+ openhouse-optimizer:
+ build:
+ context: ../../../..
+ dockerfile: optimizer-service.Dockerfile
+ ports:
+ - 8005:8080
+ openhouse-optimizer-analyzer:
+ build:
+ context: ../../../..
+ dockerfile: optimizer-analyzer.Dockerfile
+ openhouse-optimizer-scheduler:
+ build:
+ context: ../../../..
+ dockerfile: optimizer-scheduler.Dockerfile
prometheus:
image: prom/prometheus:v2.21.0
ports:
diff --git a/infra/recipes/docker-compose/oh-hadoop-spark/cluster.yaml b/infra/recipes/docker-compose/oh-hadoop-spark/cluster.yaml
index bb72176c5..81d142815 100644
--- a/infra/recipes/docker-compose/oh-hadoop-spark/cluster.yaml
+++ b/infra/recipes/docker-compose/oh-hadoop-spark/cluster.yaml
@@ -23,6 +23,8 @@ cluster:
database:
type: "MYSQL"
url: "jdbc:mysql://mysql:3306/oh_db?allowPublicKeyRetrieval=true&useSSL=false"
+ optimizer:
+ base-uri: "http://openhouse-optimizer:8080"
security:
token:
interceptor:
diff --git a/infra/recipes/docker-compose/oh-hadoop-spark/docker-compose.yml b/infra/recipes/docker-compose/oh-hadoop-spark/docker-compose.yml
index 0b8df01ac..bcf4787bf 100644
--- a/infra/recipes/docker-compose/oh-hadoop-spark/docker-compose.yml
+++ b/infra/recipes/docker-compose/oh-hadoop-spark/docker-compose.yml
@@ -22,6 +22,10 @@ services:
- OTEL_METRICS_EXPORTER=none
- OTEL_LOGS_EXPORTER=none
- OTEL_TRACES_SAMPLER=always_on
+ # Post-commit operation framework: enable the dispatcher and the optimizer-stats op.
+ - TABLES_POSTCOMMIT_ENABLED=true
+ - OPTIMIZER_STATS_ENABLED=true
+ - OPTIMIZER_STATS_BASE_URI=http://openhouse-optimizer:8080
openhouse-jobs:
container_name: local.openhouse-jobs
@@ -81,6 +85,53 @@ services:
- OTEL_LOGS_EXPORTER=none
- OTEL_TRACES_SAMPLER=always_on
+ openhouse-optimizer:
+ container_name: local.openhouse-optimizer
+ extends:
+ file: ../common/oh-services.yml
+ service: openhouse-optimizer
+ volumes:
+ - ./:/var/config/
+ depends_on:
+ - mysql
+ environment:
+ - OPTIMIZER_DB_URL=jdbc:mysql://mysql:3306/oh_db?allowPublicKeyRetrieval=true&useSSL=false
+ - OPTIMIZER_DB_USER=oh_user
+ - OPTIMIZER_DB_PASSWORD=oh_password
+
+ openhouse-optimizer-analyzer:
+ container_name: local.openhouse-optimizer-analyzer
+ extends:
+ file: ../common/oh-services.yml
+ service: openhouse-optimizer-analyzer
+ volumes:
+ - ./:/var/config/
+ depends_on:
+ - openhouse-optimizer
+ environment:
+ - OPTIMIZER_DB_URL=jdbc:mysql://mysql:3306/oh_db?allowPublicKeyRetrieval=true&useSSL=false
+ - OPTIMIZER_DB_USER=oh_user
+ - OPTIMIZER_DB_PASSWORD=oh_password
+ - ANALYZER_INTERVAL_SECONDS=30
+
+ openhouse-optimizer-scheduler:
+ container_name: local.openhouse-optimizer-scheduler
+ extends:
+ file: ../common/oh-services.yml
+ service: openhouse-optimizer-scheduler
+ volumes:
+ - ./:/var/config/
+ depends_on:
+ - openhouse-optimizer
+ - openhouse-jobs
+ environment:
+ - OPTIMIZER_DB_URL=jdbc:mysql://mysql:3306/oh_db?allowPublicKeyRetrieval=true&useSSL=false
+ - OPTIMIZER_DB_USER=oh_user
+ - OPTIMIZER_DB_PASSWORD=oh_password
+ - JOBS_BASE_URI=http://openhouse-jobs:8080
+ - SCHEDULER_RESULTS_ENDPOINT=http://openhouse-optimizer:8080/v1/optimizer/operations
+ - SCHEDULER_INTERVAL_SECONDS=30
+
jaeger:
container_name: local.jaeger
image: jaegertracing/all-in-one:1.54
diff --git a/infra/recipes/docker-compose/oh-hadoop-spark/jobs.yaml b/infra/recipes/docker-compose/oh-hadoop-spark/jobs.yaml
index 3e795d4db..7a0c51e01 100644
--- a/infra/recipes/docker-compose/oh-hadoop-spark/jobs.yaml
+++ b/infra/recipes/docker-compose/oh-hadoop-spark/jobs.yaml
@@ -51,8 +51,8 @@ jobs:
<< : *apps-defaults
<< : *livy-engine
- type: ORPHAN_FILES_DELETION
- class-name: com.linkedin.openhouse.jobs.spark.OrphanFilesDeletionSparkApp
- args: ["--backupDir", ".backup"]
+ class-name: com.linkedin.openhouse.jobs.spark.BatchedOrphanFilesDeletionSparkApp
+ args: ["--backupDir", ".backup", "--ttl", "0"]
<<: *apps-defaults
spark-properties:
<<: *spark-defaults
diff --git a/local-spark-sql.sh b/local-spark-sql.sh
new file mode 100755
index 000000000..45582f698
--- /dev/null
+++ b/local-spark-sql.sh
@@ -0,0 +1,85 @@
+#!/usr/bin/env bash
+# local-spark-sql.sh — Execute a SQL statement against the local OpenHouse docker cluster.
+#
+# Reuses an existing idle Livy session if one exists. First run pays the cold start (~20s),
+# subsequent runs execute immediately.
+#
+# Usage: ./local-spark-sql.sh "INSERT INTO openhouse.db1.smoke_tbl VALUES ('3', 'c')"
+# ./local-spark-sql.sh --kill # tear down the reusable session
+set -e
+
+LIVY="http://localhost:9003"
+TOKEN=$(cat "$(cd "$(dirname "$0")" && pwd)/services/common/src/main/resources/dummy.token")  # repo-relative, not a developer's home dir
+
+if [ "$1" = "--kill" ]; then
+ SESSIONS=$(curl -sf "$LIVY/sessions" | jq -r '.sessions[].id')
+ for sid in $SESSIONS; do
+ curl -sf -X DELETE "$LIVY/sessions/$sid" > /dev/null 2>&1
+ echo "Killed session $sid"
+ done
+ exit 0
+fi
+
+if [ -z "$1" ]; then
+  echo "Usage: local-spark-sql.sh \"<sql statement>\""
+  echo " local-spark-sql.sh --kill"
+  exit 1
+fi
+
+SQL="$1"
+
+SESSION_ID=$(curl -sf "$LIVY/sessions" | jq -r '[.sessions[] | select(.state == "idle")] | first | .id // empty')
+
+if [ -z "$SESSION_ID" ]; then
+ SPARK_CONF=$(jq -n --arg token "$TOKEN" '{
+ kind: "sql",
+ conf: {
+ "spark.jars": "local:/opt/spark/openhouse-spark-runtime_2.12-latest-all.jar",
+ "spark.jars.packages": "org.apache.iceberg:iceberg-spark-runtime-3.1_2.12:1.2.0",
+ "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,com.linkedin.openhouse.spark.extensions.OpenhouseSparkSessionExtensions",
+ "spark.sql.catalog.openhouse": "org.apache.iceberg.spark.SparkCatalog",
+ "spark.sql.catalog.openhouse.catalog-impl": "com.linkedin.openhouse.spark.OpenHouseCatalog",
+ "spark.sql.catalog.openhouse.uri": "http://openhouse-tables:8080",
+ "spark.sql.catalog.openhouse.auth-token": $token,
+ "spark.sql.catalog.openhouse.cluster": "LocalHadoopCluster"
+ }
+ }')
+
+ echo "No idle session found, creating one..."
+ SESSION_ID=$(curl -sf -X POST "$LIVY/sessions" \
+ -H "Content-Type: application/json" \
+ -d "$SPARK_CONF" | jq -r '.id')
+
+ for i in $(seq 1 60); do
+ STATE=$(curl -sf "$LIVY/sessions/$SESSION_ID/state" | jq -r '.state')
+ [ "$STATE" = "idle" ] && break
+ if [ "$STATE" = "error" ] || [ "$STATE" = "dead" ]; then
+ echo "FAIL: session state=$STATE"
+ exit 1
+ fi
+ sleep 2
+ done
+ [ "$STATE" = "idle" ] || { echo "FAIL: session never reached idle (state=$STATE)"; exit 1; }
+fi
+
+STMT_ID=$(curl -sf -X POST "$LIVY/sessions/$SESSION_ID/statements" \
+ -H "Content-Type: application/json" \
+ -d "$(jq -n --arg code "$SQL" '{code: $code}')" | jq -r '.id')
+
+for i in $(seq 1 60); do
+  STMT_STATE=$(curl -sf "$LIVY/sessions/$SESSION_ID/statements/$STMT_ID" | jq -r '.state')
+  [ "$STMT_STATE" = "available" ] && break
+  [ "$STMT_STATE" = "error" ] && break
+  sleep 1
+done
+[ "$STMT_STATE" = "available" ] || [ "$STMT_STATE" = "error" ] || { echo "FAIL: statement $STMT_ID never finished (state=$STMT_STATE)"; exit 1; }
+RESULT=$(curl -sf "$LIVY/sessions/$SESSION_ID/statements/$STMT_ID")
+STATUS=$(echo "$RESULT" | jq -r '.output.status')
+
+if [ "$STATUS" = "error" ]; then
+ echo "FAIL: SQL execution error:"
+ echo "$RESULT" | jq -r '.output.evalue, .output.traceback[]?'
+ exit 1
+fi
+
+echo "$RESULT" | jq -r '.output.data["text/plain"] // ""'
diff --git a/optimizer-analyzer.Dockerfile b/optimizer-analyzer.Dockerfile
new file mode 100644
index 000000000..925018dc7
--- /dev/null
+++ b/optimizer-analyzer.Dockerfile
@@ -0,0 +1,44 @@
+FROM openjdk:23-ea-11-slim
+
+ARG USER=openhouse
+ARG USER_ID=1000
+ARG GROUP_ID=1000
+ENV APP_NAME=optimizer-analyzer
+ENV USER_HOME=/home/$USER
+ENV ANALYZER_INTERVAL_SECONDS=30
+
+# Create an openhouse user as there's no reason to run as root user
+RUN groupadd --force -g $GROUP_ID $USER && useradd -l -d $USER_HOME -m $USER -u $USER_ID -g $GROUP_ID
+
+WORKDIR $USER_HOME
+
+# IMAGE does not set the necessary paths by default.
+ENV PATH=$PATH:/export/apps/jdk/JDK-1_8_0_172/bin/:$USER_HOME
+
+ARG VERSION="1.0.0-SNAPSHOT"
+ARG BUILD_DIR="build/analyzerapp/libs"
+ARG JAR_FILES=$BUILD_DIR/*.jar
+
+COPY $JAR_FILES ./
+
+# Delete unwanted JAR files
+RUN find . -name "*-sources.jar" -delete
+RUN find . -name "*-javadoc.jar" -delete
+RUN find . -name "*-lib.jar" -delete
+
+# Rename the JAR file.
+RUN find ./ -name "*.jar" -exec mv {} $APP_NAME.jar \;
+RUN ls $APP_NAME.jar
+
+# Ensure that everything in $USER_HOME is owned by openhouse user
+RUN chown -R $USER:$USER $USER_HOME
+
+# Setup default path for Java
+RUN mkdir -p /usr/java && ln -sfn /export/apps/jdk/JDK-1_8_0_172 /usr/java/default
+
+USER $USER
+
+# Loop the analyzer on a fixed interval. Each pass is a fresh JVM, so opt-3's
+# analyzer (which runs once per CommandLineRunner invocation and exits) becomes
+# a continuous service in this container.
+ENTRYPOINT ["sh", "-c", "while true; do echo \"Running $APP_NAME at $(date)\"; java -Xmx256M -Xms64M -XX:NativeMemoryTracking=summary -jar $APP_NAME.jar; echo \"Exited; sleeping ${ANALYZER_INTERVAL_SECONDS}s\"; sleep ${ANALYZER_INTERVAL_SECONDS}; done"]
diff --git a/optimizer-scheduler.Dockerfile b/optimizer-scheduler.Dockerfile
new file mode 100644
index 000000000..9be0f3d4c
--- /dev/null
+++ b/optimizer-scheduler.Dockerfile
@@ -0,0 +1,43 @@
+FROM openjdk:23-ea-11-slim
+
+ARG USER=openhouse
+ARG USER_ID=1000
+ARG GROUP_ID=1000
+ENV APP_NAME=optimizer-scheduler
+ENV USER_HOME=/home/$USER
+ENV SCHEDULER_INTERVAL_SECONDS=30
+
+# Create an openhouse user as there's no reason to run as root user
+RUN groupadd --force -g $GROUP_ID $USER && useradd -l -d $USER_HOME -m $USER -u $USER_ID -g $GROUP_ID
+
+WORKDIR $USER_HOME
+
+# IMAGE does not set the necessary paths by default.
+ENV PATH=$PATH:/export/apps/jdk/JDK-1_8_0_172/bin/:$USER_HOME
+
+ARG VERSION="1.0.0-SNAPSHOT"
+ARG BUILD_DIR="build/schedulerapp/libs"
+ARG JAR_FILES=$BUILD_DIR/*.jar
+
+COPY $JAR_FILES ./
+
+# Delete unwanted JAR files
+RUN find . -name "*-sources.jar" -delete
+RUN find . -name "*-javadoc.jar" -delete
+RUN find . -name "*-lib.jar" -delete
+
+# Rename the JAR file.
+RUN find ./ -name "*.jar" -exec mv {} $APP_NAME.jar \;
+RUN ls $APP_NAME.jar
+
+# Ensure that everything in $USER_HOME is owned by openhouse user
+RUN chown -R $USER:$USER $USER_HOME
+
+# Setup default path for Java
+RUN mkdir -p /usr/java && ln -sfn /export/apps/jdk/JDK-1_8_0_172 /usr/java/default
+
+USER $USER
+
+# Loop the scheduler on a fixed interval; each pass is a fresh JVM (opt-4's
+# scheduler runs once per CommandLineRunner invocation and exits).
+ENTRYPOINT ["sh", "-c", "while true; do echo \"Running $APP_NAME at $(date)\"; java -Xmx256M -Xms64M -XX:NativeMemoryTracking=summary -jar $APP_NAME.jar; echo \"Exited; sleeping ${SCHEDULER_INTERVAL_SECONDS}s\"; sleep ${SCHEDULER_INTERVAL_SECONDS}; done"]
diff --git a/optimizer-service.Dockerfile b/optimizer-service.Dockerfile
new file mode 100644
index 000000000..8f2af923c
--- /dev/null
+++ b/optimizer-service.Dockerfile
@@ -0,0 +1,47 @@
+FROM openjdk:23-ea-11-slim
+
+ARG USER=openhouse
+ARG USER_ID=1000
+ARG GROUP_ID=1000
+ENV APP_NAME=optimizer
+ENV USER_HOME=/home/$USER
+
+# Create an openhouse user as there's no reason to run as root user
+RUN groupadd --force -g $GROUP_ID $USER && useradd -l -d $USER_HOME -m $USER -u $USER_ID -g $GROUP_ID
+
+WORKDIR $USER_HOME
+
+# IMAGE does not set the necessary paths by default.
+ENV PATH=$PATH:/export/apps/jdk/JDK-1_8_0_172/bin/:$USER_HOME
+
+ARG VERSION="1.0.0-SNAPSHOT"
+ARG BUILD_DIR="build/$APP_NAME/libs"
+ARG JAR_FILES=$BUILD_DIR/*.jar
+
+COPY $JAR_FILES ./
+
+# Delete unwanted JAR files
+RUN ls ./
+RUN find . -name "*-sources.jar" -delete
+RUN find . -name "*-javadoc.jar" -delete
+RUN find . -name "*-lib.jar" -delete
+
+# Rename the JAR file.
+RUN ls ./
+RUN find ./ -name "*.jar" -exec mv {} $APP_NAME.jar \;
+
+RUN ls $APP_NAME.jar
+
+COPY run.sh .
+
+
+# Ensure that everything in $USER_HOME is owned by openhouse user
+RUN chown -R $USER:$USER $USER_HOME
+
+# Setup default path for Java
+RUN mkdir -p /usr/java && ln -sfn /export/apps/jdk/JDK-1_8_0_172 /usr/java/default
+
+USER $USER
+
+EXPOSE 8080
+ENTRYPOINT ["sh", "-c", "./run.sh $APP_NAME.jar $@"]
diff --git a/services/common/src/main/java/com/linkedin/openhouse/common/metrics/MetricsConstant.java b/services/common/src/main/java/com/linkedin/openhouse/common/metrics/MetricsConstant.java
index baf65c627..1c7945f9c 100644
--- a/services/common/src/main/java/com/linkedin/openhouse/common/metrics/MetricsConstant.java
+++ b/services/common/src/main/java/com/linkedin/openhouse/common/metrics/MetricsConstant.java
@@ -63,4 +63,12 @@ private MetricsConstant() {}
public static final String HTS_LIST_TABLES_REQUEST = "hts_list_tables_request";
public static final String HTS_LIST_TABLES_TIME = "hts_list_tables_time";
public static final String HTS_SEARCH_TABLES_TIME = "hts_search_tables_time";
+
+ // Tables post-commit operation framework (bounded, best-effort actions after commit).
+ // Tagged with op={operation name}; on failure additionally tagged with outcome={timeout,
+ // network_error, server_error, client_error, prepare_threw, unknown_error}.
+ // The duration timer is bounded by tables.postcommit.per-op-timeout-ms.
+ public static final String POSTCOMMIT_OP_DURATION = "postcommit_op_duration";
+ public static final String POSTCOMMIT_OP_SKIPPED = "postcommit_op_skipped";
+ public static final String POSTCOMMIT_OP_FAILED = "postcommit_op_failed";
}
diff --git a/services/optimizer/analyzer/src/main/java/com/linkedin/openhouse/optimizer/analyzer/CadenceBasedOrphanFilesDeletionAnalyzer.java b/services/optimizer/analyzer/src/main/java/com/linkedin/openhouse/optimizer/analyzer/CadenceBasedOrphanFilesDeletionAnalyzer.java
index 1f4b31542..f344dcb2b 100644
--- a/services/optimizer/analyzer/src/main/java/com/linkedin/openhouse/optimizer/analyzer/CadenceBasedOrphanFilesDeletionAnalyzer.java
+++ b/services/optimizer/analyzer/src/main/java/com/linkedin/openhouse/optimizer/analyzer/CadenceBasedOrphanFilesDeletionAnalyzer.java
@@ -51,6 +51,7 @@ public class CadenceBasedOrphanFilesDeletionAnalyzer implements OperationAnalyze
private final CadencePolicy cadencePolicy;
+ @org.springframework.beans.factory.annotation.Autowired
public CadenceBasedOrphanFilesDeletionAnalyzer(
@Value("${ofd.success-retry-hours:16}") long successRetryHours,
@Value("${ofd.failure-retry-hours:1}") long failureRetryHours) {
diff --git a/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/controller/TableOperationsController.java b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/controller/TableOperationsController.java
index 2ee40802f..ae4ee20b3 100644
--- a/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/controller/TableOperationsController.java
+++ b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/controller/TableOperationsController.java
@@ -9,13 +9,11 @@
import io.swagger.v3.oas.annotations.responses.ApiResponse;
import io.swagger.v3.oas.annotations.responses.ApiResponses;
import java.util.List;
-import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
import lombok.RequiredArgsConstructor;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
-import org.springframework.util.StringUtils;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
@@ -34,10 +32,10 @@ public class TableOperationsController {
private final OptimizerDataService service;
/**
- * Report an update to an operation. {@code id} comes from the URL; the body's {@code operationId}
- * must match (the controller rejects mismatched requests with 400). The backend looks up the
- * operation row, writes a history entry with the operation's table metadata, and returns 201
- * Created with the history row, or 404 if the operation does not exist.
+ * Report an update to an operation. {@code id} comes from the URL; the body carries the terminal
+ * status and any per-operation metrics or error details. The backend looks up the operation row,
+ * writes a history entry with the operation's table metadata plus the supplied metrics, and
+ * returns 201 Created with the history row, or 404 if the operation does not exist.
*/
@ApiResponses(
value = {
@@ -48,21 +46,17 @@ public class TableOperationsController {
@PostMapping("/{id}/update")
public ResponseEntity updateOperation(
@PathVariable String id, @RequestBody UpdateOperationRequest request) {
- if (!StringUtils.hasText(request.getOperationId())) {
- throw new ResponseStatusException(HttpStatus.BAD_REQUEST, "operationId is required");
- }
- if (!Objects.equals(id, request.getOperationId())) {
- throw new ResponseStatusException(
- HttpStatus.BAD_REQUEST,
- String.format(
- "operationId in body (%s) does not match path id (%s)",
- request.getOperationId(), id));
- }
if (request.getStatus() == null) {
throw new ResponseStatusException(HttpStatus.BAD_REQUEST, "status is required");
}
return service
- .updateOperation(id, request.getStatus().toModel())
+ .updateOperation(
+ id,
+ request.getStatus().toModel(),
+ request.getOrphanFilesDeleted(),
+ request.getOrphanBytesDeleted(),
+ request.getErrorMessage(),
+ request.getErrorType())
.map(
history ->
ResponseEntity.status(HttpStatus.CREATED)
diff --git a/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/controller/TableStatsController.java b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/controller/TableStatsController.java
index b119dd1c7..87fbfa709 100644
--- a/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/controller/TableStatsController.java
+++ b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/controller/TableStatsController.java
@@ -24,10 +24,22 @@
/** REST controller for managing per-table stats in the optimizer DB. */
@RestController
-@RequestMapping("/v1/optimizer/stats")
+@RequestMapping(TableStatsController.BASE_PATH)
@RequiredArgsConstructor
public class TableStatsController {
+ /**
+ * Base path for the stats endpoints. Single source of truth; external callers must reference it.
+ */
+ public static final String BASE_PATH = "/v1/optimizer/stats";
+
+ /**
+ * URI template (with {@code {tableUuid}} placeholder) for the per-table stats upsert/fetch
+ * endpoint. Use this from any client that calls the optimizer over HTTP, rather than rebuilding
+ * the path inline.
+ */
+ public static final String TABLE_PATH_TEMPLATE = BASE_PATH + "/{tableUuid}";
+
private final OptimizerDataService service;
/**
diff --git a/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/spec/TableOperationsHistory.java b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/spec/TableOperationsHistory.java
index 7a000f840..307a3a892 100644
--- a/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/spec/TableOperationsHistory.java
+++ b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/spec/TableOperationsHistory.java
@@ -35,6 +35,18 @@ public class TableOperationsHistory {
/** {@code SUCCESS} or {@code FAILED}. */
private HistoryStatus status;
+ /** OFD-specific: number of orphan files deleted; null if not OFD or on failure. */
+ private Long orphanFilesDeleted;
+
+ /** OFD-specific: bytes reclaimed by orphan file deletion; null if not OFD or on failure. */
+ private Long orphanBytesDeleted;
+
+ /** On failure, the message from the Spark-side exception. Null on success. */
+ private String errorMessage;
+
+ /** On failure, the simple name of the Spark-side exception class. Null on success. */
+ private String errorType;
+
/** Convert to the internal-model counterpart. */
public TableOperationsHistoryDto toModel() {
return TableOperationsHistoryDto.builder()
@@ -45,6 +57,10 @@ public TableOperationsHistoryDto toModel() {
.operationType(operationType == null ? null : operationType.toModel())
.completedAt(completedAt)
.status(status == null ? null : status.toModel())
+ .orphanFilesDeleted(orphanFilesDeleted)
+ .orphanBytesDeleted(orphanBytesDeleted)
+ .errorMessage(errorMessage)
+ .errorType(errorType)
.build();
}
@@ -61,6 +77,10 @@ public static TableOperationsHistory fromModel(TableOperationsHistoryDto h) {
.operationType(OperationType.fromModel(h.getOperationType()))
.completedAt(h.getCompletedAt())
.status(HistoryStatus.fromModel(h.getStatus()))
+ .orphanFilesDeleted(h.getOrphanFilesDeleted())
+ .orphanBytesDeleted(h.getOrphanBytesDeleted())
+ .errorMessage(h.getErrorMessage())
+ .errorType(h.getErrorType())
.build();
}
}
diff --git a/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/spec/UpdateOperationRequest.java b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/spec/UpdateOperationRequest.java
index a216e9db3..3931242db 100644
--- a/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/spec/UpdateOperationRequest.java
+++ b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/spec/UpdateOperationRequest.java
@@ -6,20 +6,19 @@
import lombok.NoArgsConstructor;
/**
- * Request body for {@code POST /v1/table-operations/update}.
+ * Request body for {@code POST /v1/optimizer/operations/{id}/update}.
*
- * Reports the outcome of a single operation update. The service looks up the operation row by
- * {@link #operationId} and writes a history entry for it.
+ * <p>Reports an update to a single operation. The operation row's UUID lives in the URL path; the
+ * body carries the terminal status and any operation-type-specific result metrics.
*
* <p>A single Spark job typically processes N tables and yields N independent (status) outcomes —
* one per operation. Callers issue one update request per operation; the service does not
* bulk-update by job.
*
- * <p>The remaining fields ({@link #tableUuid}, {@link #databaseName}, {@link #tableName}, {@link
- * #operationType}) are debug-only echo information. The server does not key off them; they are
- * preserved on log lines and traces so an operator looking at a failing update call can see which
- * (db, table, operation) the caller believed it was updating without joining back to the operation
- * row.
+ * <p>The debug-echo fields ({@link #tableUuid}, {@link #databaseName}, {@link #tableName}, {@link
+ * #operationType}) are optional. The server does not key off them; they are preserved on log lines
+ * and traces so an operator looking at a failing update call can see which (db, table, operation)
+ * the caller believed it was updating without joining back to the operation row.
*/
@Data
@Builder
@@ -27,12 +26,21 @@
@AllArgsConstructor
public class UpdateOperationRequest {
- /** Operation row's UUID — the primary lookup key. */
- private String operationId;
-
/** Terminal outcome for this single operation. */
private HistoryStatus status;
+ /** OFD-specific: number of orphan files deleted. Null on failure or non-OFD operation. */
+ private Long orphanFilesDeleted;
+
+ /** OFD-specific: bytes reclaimed. Null on failure or non-OFD operation. */
+ private Long orphanBytesDeleted;
+
+ /** On failure, the exception message from the Spark-side worker. Null on success. */
+ private String errorMessage;
+
+ /** On failure, the simple name of the exception class. Null on success. */
+ private String errorType;
+
/** Debug echo: stable table identity the caller believed it was completing. */
private String tableUuid;
diff --git a/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/db/TableOperationsHistoryRow.java b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/db/TableOperationsHistoryRow.java
index 5f4a598d9..484c3e8dd 100644
--- a/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/db/TableOperationsHistoryRow.java
+++ b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/db/TableOperationsHistoryRow.java
@@ -72,4 +72,20 @@ public class TableOperationsHistoryRow {
@Enumerated(EnumType.STRING)
@Column(name = "status", nullable = false, length = 20)
private HistoryStatus status;
+
+ /** OFD-specific: number of orphan files deleted; null if not an OFD operation or on failure. */
+ @Column(name = "orphan_files_deleted")
+ private Long orphanFilesDeleted;
+
+ /** OFD-specific: bytes reclaimed by orphan file deletion; null if not OFD or on failure. */
+ @Column(name = "orphan_bytes_deleted")
+ private Long orphanBytesDeleted;
+
+ /** On failure, the message from the Spark-side exception. Null on success. */
+ @Column(name = "error_message", length = 1024)
+ private String errorMessage;
+
+ /** On failure, the simple name of the Spark-side exception class. Null on success. */
+ @Column(name = "error_type", length = 256)
+ private String errorType;
}
diff --git a/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/model/TableOperationsHistoryDto.java b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/model/TableOperationsHistoryDto.java
index 74922e7b0..20225fc84 100644
--- a/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/model/TableOperationsHistoryDto.java
+++ b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/model/TableOperationsHistoryDto.java
@@ -40,6 +40,18 @@ public class TableOperationsHistoryDto {
/** Terminal outcome: {@link HistoryStatusDto#SUCCESS} or {@link HistoryStatusDto#FAILED}. */
private HistoryStatusDto status;
+ /** OFD-specific: number of orphan files deleted; null if not an OFD operation or on failure. */
+ private Long orphanFilesDeleted;
+
+ /** OFD-specific: bytes reclaimed by orphan file deletion; null if not OFD or on failure. */
+ private Long orphanBytesDeleted;
+
+ /** On failure, the message from the Spark-side exception. Null on success. */
+ private String errorMessage;
+
+ /** On failure, the simple name of the Spark-side exception class. Null on success. */
+ private String errorType;
+
/** Convert to the corresponding DB row. */
public TableOperationsHistoryRow toRow() {
return TableOperationsHistoryRow.builder()
@@ -50,6 +62,10 @@ public TableOperationsHistoryRow toRow() {
.operationType(operationType == null ? null : operationType.toDb())
.completedAt(completedAt)
.status(status == null ? null : status.toDb())
+ .orphanFilesDeleted(orphanFilesDeleted)
+ .orphanBytesDeleted(orphanBytesDeleted)
+ .errorMessage(errorMessage)
+ .errorType(errorType)
.build();
}
@@ -66,6 +82,10 @@ public static TableOperationsHistoryDto fromRow(TableOperationsHistoryRow row) {
.operationType(OperationTypeDto.fromDb(row.getOperationType()))
.completedAt(row.getCompletedAt())
.status(HistoryStatusDto.fromDb(row.getStatus()))
+ .orphanFilesDeleted(row.getOrphanFilesDeleted())
+ .orphanBytesDeleted(row.getOrphanBytesDeleted())
+ .errorMessage(row.getErrorMessage())
+ .errorType(row.getErrorType())
.build();
}
diff --git a/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/service/OptimizerDataService.java b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/service/OptimizerDataService.java
index c20ae7bf2..365e062b8 100644
--- a/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/service/OptimizerDataService.java
+++ b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/service/OptimizerDataService.java
@@ -37,10 +37,20 @@ List listTableOperations(
/**
* Update an operation by writing a history entry. Looks up the operation row by {@code
* operationId}, copies its table metadata into a new history row with the supplied terminal
- * {@code status}, and saves it. Returns the history record, or empty if the operation does not
- * exist.
+ * {@code status} and optional per-operation metrics / error details, and saves it. Returns the
+ * history record, or empty if the operation does not exist.
+ *
+ * The four trailing parameters are all nullable. Successful OFD operations populate {@code
+ * orphanFilesDeleted} / {@code orphanBytesDeleted}; failures populate {@code errorMessage} /
+ * {@code errorType}. Non-OFD operations leave all four null.
*/
- Optional updateOperation(String operationId, HistoryStatusDto status);
+ Optional updateOperation(
+ String operationId,
+ HistoryStatusDto status,
+ Long orphanFilesDeleted,
+ Long orphanBytesDeleted,
+ String errorMessage,
+ String errorType);
/**
* Return the operation row for {@code id} regardless of status, or empty if it does not exist.
diff --git a/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/service/OptimizerDataServiceImpl.java b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/service/OptimizerDataServiceImpl.java
index 29fd0eeee..26408abb9 100644
--- a/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/service/OptimizerDataServiceImpl.java
+++ b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/service/OptimizerDataServiceImpl.java
@@ -67,7 +67,12 @@ public List listTableOperations(
@Override
@Transactional
public Optional updateOperation(
- String operationId, HistoryStatusDto status) {
+ String operationId,
+ HistoryStatusDto status,
+ Long orphanFilesDeleted,
+ Long orphanBytesDeleted,
+ String errorMessage,
+ String errorType) {
return operationsRepository
.findById(operationId)
.map(
@@ -80,6 +85,10 @@ public Optional updateOperation(
.operationType(OperationTypeDto.fromDb(row.getOperationType()))
.completedAt(Instant.now())
.status(status)
+ .orphanFilesDeleted(orphanFilesDeleted)
+ .orphanBytesDeleted(orphanBytesDeleted)
+ .errorMessage(errorMessage)
+ .errorType(errorType)
.build())
.map(history -> TableOperationsHistoryDto.fromRow(historyRepository.save(history.toRow())));
}
diff --git a/services/optimizer/src/main/resources/db/optimizer-schema.sql b/services/optimizer/src/main/resources/db/optimizer-schema.sql
index 892c1c55f..24071e2e8 100644
--- a/services/optimizer/src/main/resources/db/optimizer-schema.sql
+++ b/services/optimizer/src/main/resources/db/optimizer-schema.sql
@@ -38,13 +38,17 @@ CREATE TABLE IF NOT EXISTS table_stats_history (
);
CREATE TABLE IF NOT EXISTS table_operations_history (
- id VARCHAR(36) NOT NULL,
- table_uuid VARCHAR(36) NOT NULL,
- database_name VARCHAR(128) NOT NULL,
- table_name VARCHAR(128) NOT NULL,
- operation_type VARCHAR(50) NOT NULL,
- completed_at TIMESTAMP(6) NOT NULL,
- status VARCHAR(20) NOT NULL,
+ id VARCHAR(36) NOT NULL,
+ table_uuid VARCHAR(36) NOT NULL,
+ database_name VARCHAR(128) NOT NULL,
+ table_name VARCHAR(128) NOT NULL,
+ operation_type VARCHAR(50) NOT NULL,
+ completed_at TIMESTAMP(6) NOT NULL,
+ status VARCHAR(20) NOT NULL,
+ orphan_files_deleted BIGINT,
+ orphan_bytes_deleted BIGINT,
+ error_message VARCHAR(1024),
+ error_type VARCHAR(256),
PRIMARY KEY (id),
INDEX idx_toph_db_table (database_name, table_name),
-- Drives TableOperationHistoryRepository.findLatestPerTable: the correlated
diff --git a/services/optimizer/src/test/java/com/linkedin/openhouse/optimizer/api/controller/ControllerErrorHandlingTest.java b/services/optimizer/src/test/java/com/linkedin/openhouse/optimizer/api/controller/ControllerErrorHandlingTest.java
index b9c8dc3dc..18a077d45 100644
--- a/services/optimizer/src/test/java/com/linkedin/openhouse/optimizer/api/controller/ControllerErrorHandlingTest.java
+++ b/services/optimizer/src/test/java/com/linkedin/openhouse/optimizer/api/controller/ControllerErrorHandlingTest.java
@@ -20,12 +20,12 @@
import org.springframework.transaction.annotation.Transactional;
/**
- * Exercises what the controllers own: server-side validation on {@code updateOperation} (path/body
- * mismatch, missing fields) and 404s on missing rows. Assertions are status-code-only: MockMvc does
- * not trigger Spring's error-dispatch to {@code BasicErrorController}, so the response body of a
- * {@link org.springframework.web.server.ResponseStatusException} is empty in tests even though it
- * is populated in production (with {@code server.error.include-message=always}). Framework-level
- * 4xx (missing query param, malformed JSON, etc.) is left to Spring's defaults and not asserted.
+ * Exercises what the controllers own: server-side validation on {@code updateOperation} (status
+ * required) and 404s on missing rows. Assertions are status-code-only: MockMvc does not trigger
+ * Spring's error-dispatch to {@code BasicErrorController}, so the response body of a {@link
+ * org.springframework.web.server.ResponseStatusException} is empty in tests even though it is
+ * populated in production (with {@code server.error.include-message=always}). Framework-level 4xx
+ * (missing query param, malformed JSON, etc.) is left to Spring's defaults and not asserted.
*/
@SpringBootTest
@AutoConfigureMockMvc
@@ -39,7 +39,7 @@ class ControllerErrorHandlingTest {
@Test
void updateOperation_notFound_returns404() throws Exception {
String id = UUID.randomUUID().toString();
- String body = String.format("{\"operationId\":\"%s\",\"status\":\"SUCCESS\"}", id);
+ String body = "{\"status\":\"SUCCESS\"}";
mockMvc
.perform(
post("/v1/optimizer/operations/" + id + "/update")
@@ -48,40 +48,14 @@ void updateOperation_notFound_returns404() throws Exception {
.andExpect(status().isNotFound());
}
- @Test
- void updateOperation_pathBodyMismatch_returns400() throws Exception {
- String pathId = UUID.randomUUID().toString();
- String bodyId = UUID.randomUUID().toString();
- String body = String.format("{\"operationId\":\"%s\",\"status\":\"SUCCESS\"}", bodyId);
- mockMvc
- .perform(
- post("/v1/optimizer/operations/" + pathId + "/update")
- .contentType(MediaType.APPLICATION_JSON)
- .content(body))
- .andExpect(status().isBadRequest());
- }
-
- @Test
- void updateOperation_missingOperationId_returns400() throws Exception {
- String pathId = UUID.randomUUID().toString();
- String body = "{\"status\":\"SUCCESS\"}";
- mockMvc
- .perform(
- post("/v1/optimizer/operations/" + pathId + "/update")
- .contentType(MediaType.APPLICATION_JSON)
- .content(body))
- .andExpect(status().isBadRequest());
- }
-
@Test
void updateOperation_missingStatus_returns400() throws Exception {
String id = UUID.randomUUID().toString();
- String body = String.format("{\"operationId\":\"%s\"}", id);
mockMvc
.perform(
post("/v1/optimizer/operations/" + id + "/update")
.contentType(MediaType.APPLICATION_JSON)
- .content(body))
+ .content("{}"))
.andExpect(status().isBadRequest());
}
@@ -112,7 +86,7 @@ void updateOperation_happyPath_returns201() throws Exception {
.scheduledAt(Instant.now())
.jobId("job-x")
.build());
- String body = String.format("{\"operationId\":\"%s\",\"status\":\"SUCCESS\"}", id);
+ String body = "{\"status\":\"SUCCESS\"}";
mockMvc
.perform(
post("/v1/optimizer/operations/" + id + "/update")
diff --git a/services/optimizer/src/test/java/com/linkedin/openhouse/optimizer/service/OptimizerDataServiceImplTest.java b/services/optimizer/src/test/java/com/linkedin/openhouse/optimizer/service/OptimizerDataServiceImplTest.java
index 2a3c1e676..8c4d6ecf2 100644
--- a/services/optimizer/src/test/java/com/linkedin/openhouse/optimizer/service/OptimizerDataServiceImplTest.java
+++ b/services/optimizer/src/test/java/com/linkedin/openhouse/optimizer/service/OptimizerDataServiceImplTest.java
@@ -37,7 +37,7 @@ class OptimizerDataServiceImplTest {
// --- updateOperation ---
@Test
- void completeOperation_writesHistoryFromOperationRow() {
+ void updateOperation_writesHistoryFromOperationRow() {
String operationId = UUID.randomUUID().toString();
String tableUuid = UUID.randomUUID().toString();
operationsRepository.save(
@@ -54,7 +54,7 @@ void completeOperation_writesHistoryFromOperationRow() {
.build());
Optional result =
- service.updateOperation(operationId, HistoryStatusDto.SUCCESS);
+ service.updateOperation(operationId, HistoryStatusDto.SUCCESS, 42L, 1024L, null, null);
assertThat(result).isPresent();
assertThat(result.get().getStatus()).isEqualTo(HistoryStatusDto.SUCCESS);
@@ -62,12 +62,45 @@ void completeOperation_writesHistoryFromOperationRow() {
assertThat(result.get().getOperationType()).isEqualTo(OperationTypeDto.ORPHAN_FILES_DELETION);
assertThat(result.get().getDatabaseName()).isEqualTo("db1");
assertThat(result.get().getCompletedAt()).isNotNull();
+ assertThat(result.get().getOrphanFilesDeleted()).isEqualTo(42L);
+ assertThat(result.get().getOrphanBytesDeleted()).isEqualTo(1024L);
+ assertThat(result.get().getErrorMessage()).isNull();
+ assertThat(result.get().getErrorType()).isNull();
}
@Test
- void completeOperation_notFound_returnsEmpty() {
+ void updateOperation_failurePersistsErrorFields() {
+ String operationId = UUID.randomUUID().toString();
+ operationsRepository.save(
+ TableOperationsRow.builder()
+ .id(operationId)
+ .tableUuid(UUID.randomUUID().toString())
+ .databaseName("db1")
+ .tableName("tbl1")
+ .operationType(com.linkedin.openhouse.optimizer.db.OperationType.ORPHAN_FILES_DELETION)
+ .status(com.linkedin.openhouse.optimizer.db.OperationStatus.SCHEDULED)
+ .createdAt(Instant.now())
+ .scheduledAt(Instant.now())
+ .jobId("spark-job-456")
+ .build());
+
+ Optional result =
+ service.updateOperation(
+ operationId, HistoryStatusDto.FAILED, null, null, "boom", "RuntimeException");
+
+ assertThat(result).isPresent();
+ assertThat(result.get().getStatus()).isEqualTo(HistoryStatusDto.FAILED);
+ assertThat(result.get().getOrphanFilesDeleted()).isNull();
+ assertThat(result.get().getOrphanBytesDeleted()).isNull();
+ assertThat(result.get().getErrorMessage()).isEqualTo("boom");
+ assertThat(result.get().getErrorType()).isEqualTo("RuntimeException");
+ }
+
+ @Test
+ void updateOperation_notFound_returnsEmpty() {
Optional result =
- service.updateOperation(UUID.randomUUID().toString(), HistoryStatusDto.FAILED);
+ service.updateOperation(
+ UUID.randomUUID().toString(), HistoryStatusDto.FAILED, null, null, null, null);
assertThat(result).isEmpty();
}
diff --git a/services/tables/build.gradle b/services/tables/build.gradle
index c85a57131..da42c32ef 100644
--- a/services/tables/build.gradle
+++ b/services/tables/build.gradle
@@ -44,6 +44,7 @@ dependencies {
testImplementation 'org.junit.jupiter:junit-jupiter-engine:' + junit_version
testImplementation 'org.springframework.security:spring-security-test:5.7.3'
testImplementation 'org.springframework:spring-context-support:5.3.18'
+ testImplementation 'com.squareup.okhttp3:mockwebserver:4.10.0'
testImplementation(testFixtures(project(':services:common')))
testImplementation (project(':tables-test-fixtures:tables-test-fixtures_2.12')) {
exclude group: 'com.linkedin.iceberg'
diff --git a/services/tables/src/main/java/com/linkedin/openhouse/tables/dto/mapper/TablesMapper.java b/services/tables/src/main/java/com/linkedin/openhouse/tables/dto/mapper/TablesMapper.java
index 08111cd82..9315df9d8 100644
--- a/services/tables/src/main/java/com/linkedin/openhouse/tables/dto/mapper/TablesMapper.java
+++ b/services/tables/src/main/java/com/linkedin/openhouse/tables/dto/mapper/TablesMapper.java
@@ -73,6 +73,7 @@ public interface TablesMapper {
@Mapping(source = "requestBody.sortOrder", target = "sortOrder"),
@Mapping(target = "lastModifiedTime", ignore = true),
@Mapping(target = "creationTime", ignore = true),
+ @Mapping(target = "currentSnapshot", ignore = true)
})
TableDto toTableDto(TableDto tableDto, CreateUpdateTableRequestBody requestBody);
@@ -120,7 +121,8 @@ public interface TablesMapper {
defaultExpression = "java(TableType.PRIMARY_TABLE)"),
@Mapping(source = "requestBody.createUpdateTableRequestBody.sortOrder", target = "sortOrder"),
@Mapping(target = "lastModifiedTime", ignore = true),
- @Mapping(target = "creationTime", ignore = true)
+ @Mapping(target = "creationTime", ignore = true),
+ @Mapping(target = "currentSnapshot", ignore = true)
})
TableDto toTableDto(TableDto tableDto, IcebergSnapshotsRequestBody requestBody);
diff --git a/services/tables/src/main/java/com/linkedin/openhouse/tables/model/CurrentSnapshotInfo.java b/services/tables/src/main/java/com/linkedin/openhouse/tables/model/CurrentSnapshotInfo.java
new file mode 100644
index 000000000..90437d418
--- /dev/null
+++ b/services/tables/src/main/java/com/linkedin/openhouse/tables/model/CurrentSnapshotInfo.java
@@ -0,0 +1,30 @@
+package com.linkedin.openhouse.tables.model;
+
+import java.util.Map;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+
+// In-memory snapshot of the Iceberg current-snapshot metadata that was loaded alongside a
+// TableDto.
+//
+// This carries only fields already materialized by the catalog client. Constructing it never
+// triggers a separate HDFS or object-store read.
+//
+// The value is present whenever the underlying table has at least one committed snapshot at
+// construction time. It is absent (modeled as Optional.empty() on TableDto) for tables with no
+// committed data, such as a CREATE TABLE with no rows yet.
+@Getter
+@Builder
+@AllArgsConstructor
+@EqualsAndHashCode
+public class CurrentSnapshotInfo {
+
+  // Iceberg snapshot ID. Stable per commit; usable as an idempotency token.
+  private final long snapshotId;
+
+  // Iceberg Snapshot.summary() map, unmodified. Keys include total-data-files, total-files-size,
+  // added-data-files, deleted-data-files, added-files-size, removed-files-size.
+  private final Map<String, String> summary;
+}
diff --git a/services/tables/src/main/java/com/linkedin/openhouse/tables/model/TableDto.java b/services/tables/src/main/java/com/linkedin/openhouse/tables/model/TableDto.java
index 917c632d8..59dc2e80f 100644
--- a/services/tables/src/main/java/com/linkedin/openhouse/tables/model/TableDto.java
+++ b/services/tables/src/main/java/com/linkedin/openhouse/tables/model/TableDto.java
@@ -11,11 +11,13 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import javax.persistence.Convert;
import javax.persistence.ElementCollection;
import javax.persistence.Entity;
import javax.persistence.Id;
import javax.persistence.IdClass;
+import javax.persistence.Transient;
import lombok.AccessLevel;
import lombok.AllArgsConstructor;
import lombok.Builder;
@@ -83,6 +85,23 @@ public class TableDto {
private boolean replaceCommit;
+ // In-memory current-snapshot metadata captured when this TableDto was built from an Iceberg
+ // Table.
+ //
+ // Present whenever the underlying table has at least one committed snapshot at that point.
+ // Absent for tables with no committed data, such as a CREATE TABLE with no rows yet.
+ //
+ // Not persisted and not part of equality. Read through getCurrentSnapshot().
+ @Getter(AccessLevel.NONE)
+ @Transient
+ @EqualsAndHashCode.Exclude
+ private CurrentSnapshotInfo currentSnapshot;
+
+  // Returns the current-snapshot metadata if any, else Optional.empty().
+  public Optional<CurrentSnapshotInfo> getCurrentSnapshot() {
+    return Optional.ofNullable(currentSnapshot);
+  }
+
/**
* Bundling eligible string type field into a map as {@link org.mapstruct.Mapper} doesn't provide
* easy interface to achieve so.
diff --git a/services/tables/src/main/java/com/linkedin/openhouse/tables/repository/impl/InternalRepositoryUtils.java b/services/tables/src/main/java/com/linkedin/openhouse/tables/repository/impl/InternalRepositoryUtils.java
index 0dde039c0..d3ced5458 100644
--- a/services/tables/src/main/java/com/linkedin/openhouse/tables/repository/impl/InternalRepositoryUtils.java
+++ b/services/tables/src/main/java/com/linkedin/openhouse/tables/repository/impl/InternalRepositoryUtils.java
@@ -9,6 +9,7 @@
import com.linkedin.openhouse.tables.dto.mapper.iceberg.PartitionSpecMapper;
import com.linkedin.openhouse.tables.dto.mapper.iceberg.PoliciesSpecMapper;
import com.linkedin.openhouse.tables.dto.mapper.iceberg.TableTypeMapper;
+import com.linkedin.openhouse.tables.model.CurrentSnapshotInfo;
import com.linkedin.openhouse.tables.model.TableDto;
import com.linkedin.openhouse.tables.repository.PreservedKeyChecker;
import java.net.URI;
@@ -135,6 +136,13 @@ static TableDto convertToTableDto(
.jsonSnapshots(null)
.tableProperties(megaProps)
.sortOrder(SortOrderParser.toJson(table.sortOrder()))
+ .currentSnapshot(
+ table.currentSnapshot() == null
+ ? null
+ : CurrentSnapshotInfo.builder()
+ .snapshotId(table.currentSnapshot().snapshotId())
+ .summary(table.currentSnapshot().summary())
+ .build())
.build();
return tableDto;
diff --git a/services/tables/src/main/java/com/linkedin/openhouse/tables/services/IcebergSnapshotsServiceImpl.java b/services/tables/src/main/java/com/linkedin/openhouse/tables/services/IcebergSnapshotsServiceImpl.java
index af43a169e..6623b5f37 100644
--- a/services/tables/src/main/java/com/linkedin/openhouse/tables/services/IcebergSnapshotsServiceImpl.java
+++ b/services/tables/src/main/java/com/linkedin/openhouse/tables/services/IcebergSnapshotsServiceImpl.java
@@ -10,6 +10,7 @@
import com.linkedin.openhouse.tables.model.TableDto;
import com.linkedin.openhouse.tables.model.TableDtoPrimaryKey;
import com.linkedin.openhouse.tables.repository.OpenHouseInternalRepository;
+import com.linkedin.openhouse.tables.services.postcommit.PostCommitDispatcher;
import com.linkedin.openhouse.tables.utils.AuthorizationUtils;
import com.linkedin.openhouse.tables.utils.TableUUIDGenerator;
import java.util.Optional;
@@ -32,6 +33,13 @@ public class IcebergSnapshotsServiceImpl implements IcebergSnapshotsService {
@Autowired AuthorizationUtils authorizationUtils;
+ /**
+ * Present only when {@code tables.postcommit.enabled=true}. When absent, the on-commit hook is a
+ * literal no-op and no post-commit operations run.
+ */
+ @Autowired(required = false)
+ Optional postCommitDispatcher = Optional.empty();
+
@Override
public Pair putIcebergSnapshots(
String databaseId,
@@ -83,7 +91,9 @@ public Pair putIcebergSnapshots(
databaseId, tableCreatorUpdater, Privileges.CREATE_TABLE);
}
try {
- return Pair.of(openHouseInternalRepository.save(tableDtoToSave), !tableDto.isPresent());
+ TableDto savedDto = openHouseInternalRepository.save(tableDtoToSave);
+ postCommitDispatcher.ifPresent(d -> d.dispatch(savedDto));
+ return Pair.of(savedDto, !tableDto.isPresent());
} catch (BadRequestException e) {
throw new RequestValidationFailureException(e.getMessage(), e);
} catch (CommitFailedException ce) {
diff --git a/services/tables/src/main/java/com/linkedin/openhouse/tables/services/optimizer/OptimizerStatsConfig.java b/services/tables/src/main/java/com/linkedin/openhouse/tables/services/optimizer/OptimizerStatsConfig.java
new file mode 100644
index 000000000..47613f98c
--- /dev/null
+++ b/services/tables/src/main/java/com/linkedin/openhouse/tables/services/optimizer/OptimizerStatsConfig.java
@@ -0,0 +1,36 @@
+package com.linkedin.openhouse.tables.services.optimizer;
+
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.http.client.reactive.ReactorClientHttpConnector;
+import org.springframework.web.reactive.function.client.WebClient;
+import reactor.netty.http.client.HttpClient;
+
+// Spring wiring for the HTTP client used by the post-commit Tables-to-Optimizer stats push.
+//
+// Guarded by optimizer.stats.enabled=true: with the flag off, this class contributes no beans,
+// so no WebClient exists and the dispatcher sees no optimizer-stats operation.
+@Configuration
+@EnableConfigurationProperties(OptimizerStatsProperties.class)
+@ConditionalOnProperty(prefix = "optimizer.stats", name = "enabled", havingValue = "true")
+public class OptimizerStatsConfig {
+
+  // Builds the WebClient dedicated to the optimizer stats endpoint.
+  //
+  // No timeout is set on the underlying Netty client. The per-attempt timeout is applied on the
+  // Reactor chain in OptimizerStatsPostCommitOperation, and the outer per-op timeout is applied
+  // by PostCommitDispatcher.
+  //
+  // Keeping timeouts out of Netty means every timeout surfaces as a standard
+  // java.util.concurrent.TimeoutException rather than a Netty ReadTimeoutException, which keeps
+  // the dispatcher's outcome classification simple.
+  @Bean("optimizerStatsWebClient")
+  public WebClient optimizerStatsWebClient(OptimizerStatsProperties properties) {
+    HttpClient nettyClient = HttpClient.create();
+    WebClient.Builder builder = WebClient.builder().baseUrl(properties.getBaseUri());
+    builder.clientConnector(new ReactorClientHttpConnector(nettyClient));
+    return builder.build();
+  }
+}
diff --git a/services/tables/src/main/java/com/linkedin/openhouse/tables/services/optimizer/OptimizerStatsPostCommitOperation.java b/services/tables/src/main/java/com/linkedin/openhouse/tables/services/optimizer/OptimizerStatsPostCommitOperation.java
new file mode 100644
index 000000000..756da2d10
--- /dev/null
+++ b/services/tables/src/main/java/com/linkedin/openhouse/tables/services/optimizer/OptimizerStatsPostCommitOperation.java
@@ -0,0 +1,168 @@
+package com.linkedin.openhouse.tables.services.optimizer;
+
+import com.linkedin.openhouse.tables.model.CurrentSnapshotInfo;
+import com.linkedin.openhouse.tables.model.TableDto;
+import com.linkedin.openhouse.tables.services.postcommit.PostCommitOperation;
+import java.time.Duration;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.TimeoutException;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.stereotype.Component;
+import org.springframework.web.reactive.function.client.WebClient;
+import org.springframework.web.reactive.function.client.WebClientRequestException;
+import org.springframework.web.reactive.function.client.WebClientResponseException;
+import reactor.core.publisher.Mono;
+import reactor.util.retry.Retry;
+import reactor.util.retry.RetrySpec;
+
+// A PostCommitOperation that PUTs a snapshot-stats record to the optimizer's per-table stats
+// endpoint.
+//
+// prepare() returns a Mono that completes on HTTP 2xx and signals an error otherwise. The
+// dispatcher owns the timeout, the subscription, error swallowing, and metric emission. The
+// returned value is Optional.empty() when the table is not opted in via OPT_IN_PROPERTY or has
+// no current snapshot.
+//
+// Internal retry is bounded by OptimizerStatsProperties.maxAttempts and fires only on retryable
+// errors: network failures, a TimeoutException, or HTTP 408 / 429 / 5xx responses. The
+// dispatcher's per-op timeout is the hard ceiling on the whole chain.
+//
+// The bean is wired only when optimizer.stats.enabled=true. PATH_TEMPLATE is intentionally
+// duplicated from TableStatsController.TABLE_PATH_TEMPLATE so the tables service does not take a
+// compile-time dependency on the optimizer jar. Keep both copies in sync.
+@Slf4j
+@Component
+@ConditionalOnProperty(prefix = "optimizer.stats", name = "enabled", havingValue = "true")
+public class OptimizerStatsPostCommitOperation implements PostCommitOperation {
+
+  // Metric tag value for the "op" tag.
+  static final String OP_NAME = "optimizer_stats";
+
+  // Per-call URL path. Intentionally duplicated from TableStatsController.TABLE_PATH_TEMPLATE so
+  // that we avoid a compile-time dependency on the optimizer jar. Keep both copies in sync.
+  static final String PATH_TEMPLATE = "/v1/optimizer/stats/{tableUuid}";
+
+  // Table-property key that opts a table in for the post-commit stats push.
+  static final String OPT_IN_PROPERTY = "maintenance.optimizer.stats.enabled";
+
+  // Iceberg snapshot-summary keys we read. All values are decimal-string longs.
+  static final String SUMMARY_TOTAL_DATA_FILES = "total-data-files";
+  static final String SUMMARY_TOTAL_FILES_SIZE = "total-files-size";
+  static final String SUMMARY_ADDED_DATA_FILES = "added-data-files";
+  static final String SUMMARY_DELETED_DATA_FILES = "deleted-data-files";
+  static final String SUMMARY_ADDED_FILES_SIZE = "added-files-size";
+  static final String SUMMARY_REMOVED_FILES_SIZE = "removed-files-size";
+
+  private final WebClient webClient;
+  private final OptimizerStatsProperties properties;
+
+  public OptimizerStatsPostCommitOperation(
+      @Qualifier("optimizerStatsWebClient") WebClient webClient,
+      OptimizerStatsProperties properties) {
+    this.webClient = webClient;
+    this.properties = properties;
+  }
+
+  @Override
+  public String name() {
+    return OP_NAME;
+  }
+
+  @Override
+  public Optional<Mono<Void>> prepare(TableDto savedDto) {
+    if (!isOptedIn(savedDto)) {
+      return Optional.empty();
+    }
+    Optional<CurrentSnapshotInfo> snapshot = savedDto.getCurrentSnapshot();
+    if (!snapshot.isPresent()) {
+      return Optional.empty();
+    }
+
+    OptimizerStatsRequest body = buildRequest(savedDto, snapshot.get());
+    String tableUuid = savedDto.getTableUUID();
+
+    RetrySpec retrySpec =
+        Retry.max(Math.max(0, properties.getMaxAttempts() - 1)).filter(this::isRetryable);
+
+    Mono<Void> chain =
+        webClient
+            .put()
+            .uri(PATH_TEMPLATE, tableUuid)
+            .bodyValue(body)
+            .retrieve()
+            .toBodilessEntity()
+            .timeout(Duration.ofMillis(properties.getPerAttemptTimeoutMs()))
+            .retryWhen(retrySpec.onRetryExhaustedThrow((spec, signal) -> signal.failure()))
+            .then();
+    return Optional.of(chain);
+  }
+
+  // Returns true when the table's properties contain the literal opt-in value "true" for the
+  // configured OPT_IN_PROPERTY.
+  private boolean isOptedIn(TableDto saved) {
+    Map<String, String> props = saved.getTableProperties();
+    return props != null && "true".equals(props.get(OPT_IN_PROPERTY));
+  }
+
+  // Builds the wire body. Missing summary keys default to 0L.
+  OptimizerStatsRequest buildRequest(TableDto saved, CurrentSnapshotInfo snapshot) {
+    Map<String, String> summary = snapshot.getSummary();
+    OptimizerStatsRequest.Snapshot snapshotPayload =
+        OptimizerStatsRequest.Snapshot.builder()
+            .snapshotId(snapshot.getSnapshotId())
+            .tableVersion(saved.getTableVersion())
+            .tableLocation(saved.getTableLocation())
+            .tableSizeBytes(longOrZero(summary, SUMMARY_TOTAL_FILES_SIZE))
+            .numCurrentFiles(longOrZero(summary, SUMMARY_TOTAL_DATA_FILES))
+            .build();
+    OptimizerStatsRequest.Delta delta =
+        OptimizerStatsRequest.Delta.builder()
+            .numFilesAdded(longOrZero(summary, SUMMARY_ADDED_DATA_FILES))
+            .numFilesDeleted(longOrZero(summary, SUMMARY_DELETED_DATA_FILES))
+            .addedSizeBytes(longOrZero(summary, SUMMARY_ADDED_FILES_SIZE))
+            .deletedSizeBytes(longOrZero(summary, SUMMARY_REMOVED_FILES_SIZE))
+            .build();
+    return OptimizerStatsRequest.builder()
+        .databaseName(saved.getDatabaseId())
+        .tableName(saved.getTableId())
+        .stats(OptimizerStatsRequest.Stats.builder().snapshot(snapshotPayload).delta(delta).build())
+        .tableProperties(saved.getTableProperties())
+        .build();
+  }
+
+  // Returns true for errors that a later attempt could plausibly succeed against: a per-attempt
+  // timeout, a network-level failure, or an HTTP 5xx / 408 / 429 response. Other 4xx responses
+  // fail fast. The raw status code is used because getStatusCode() throws
+  // IllegalArgumentException for codes outside Spring's HttpStatus enum (Spring 5.x).
+  boolean isRetryable(Throwable e) {
+    if (e instanceof TimeoutException) {
+      return true;
+    }
+    if (e instanceof WebClientRequestException) {
+      return true;
+    }
+    if (e instanceof WebClientResponseException) {
+      int code = ((WebClientResponseException) e).getRawStatusCode();
+      return code == 408 || code == 429 || (code >= 500 && code < 600);
+    }
+    return false;
+  }
+
+  private static long longOrZero(Map<String, String> m, String key) {
+    if (m == null) {
+      return 0L;
+    }
+    String v = m.get(key);
+    if (v == null) {
+      return 0L;
+    }
+    try {
+      return Long.parseLong(v);
+    } catch (NumberFormatException nfe) {
+      return 0L;
+    }
+  }
+}
diff --git a/services/tables/src/main/java/com/linkedin/openhouse/tables/services/optimizer/OptimizerStatsProperties.java b/services/tables/src/main/java/com/linkedin/openhouse/tables/services/optimizer/OptimizerStatsProperties.java
new file mode 100644
index 000000000..877ce3423
--- /dev/null
+++ b/services/tables/src/main/java/com/linkedin/openhouse/tables/services/optimizer/OptimizerStatsProperties.java
@@ -0,0 +1,35 @@
+package com.linkedin.openhouse.tables.services.optimizer;
+
+import lombok.Getter;
+import lombok.Setter;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+
+// Configuration for the post-commit stats push from the Tables Service to the Optimizer's stats
+// endpoint. Property prefix is optimizer.stats.
+//
+// When enabled is false (the default), no client bean is constructed, and the operation is
+// absent from the dispatcher. Environments that want the feed turn it on and set baseUri.
+@ConfigurationProperties("optimizer.stats")
+@Getter
+@Setter
+public class OptimizerStatsProperties {
+
+ // Master switch. When false, no HTTP push is attempted and no client bean is wired.
+ private boolean enabled = false;
+
+ // Base URI of the Optimizer service. The path /v1/optimizer/stats/{tableUuid} is appended at
+ // call time. No default; required when enabled is true (this class does not validate that).
+ private String baseUri;
+
+ // Per-attempt HTTP timeout in milliseconds.
+ //
+ // Bounds each individual attempt so retries can fit inside the dispatcher's outer per-op budget
+ // (tables.postcommit.per-op-timeout-ms, default 3000 ms). Default is 1000 ms.
+ private long perAttemptTimeoutMs = 1000L;
+
+ // Total attempt count (the initial try plus retries). Default is 3. Retries fire only on
+ // retryable errors: network failures, a timeout, or an HTTP 408 / 429 / 5xx response; other
+ // 4xx responses fail fast. NOTE(review): 3 attempts x 1000 ms equals the default 3000 ms outer
+ // budget, leaving no headroom for any delay between attempts; confirm the defaults are intended.
+ private int maxAttempts = 3;
+}
diff --git a/services/tables/src/main/java/com/linkedin/openhouse/tables/services/optimizer/OptimizerStatsRequest.java b/services/tables/src/main/java/com/linkedin/openhouse/tables/services/optimizer/OptimizerStatsRequest.java
new file mode 100644
index 000000000..2cb2b2873
--- /dev/null
+++ b/services/tables/src/main/java/com/linkedin/openhouse/tables/services/optimizer/OptimizerStatsRequest.java
@@ -0,0 +1,67 @@
+package com.linkedin.openhouse.tables.services.optimizer;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import java.util.Map;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+// Wire body for PUT /v1/optimizer/stats/{tableUuid}. Null fields are left out of the JSON
+// (JsonInclude.Include.NON_NULL is declared on this class and on each nested type).
+// This mirrors the optimizer's UpsertTableStatsRequest field-for-field. The type is duplicated
+// here so that the tables service does not take a compile-time dependency on the optimizer jar.
+// Keep both copies in sync.
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+@JsonInclude(JsonInclude.Include.NON_NULL)
+public class OptimizerStatsRequest {
+
+ private String databaseName;
+ private String tableName;
+ private Stats stats;
+ private Map tableProperties;
+
+ @Data
+ @Builder
+ @NoArgsConstructor
+ @AllArgsConstructor
+ @JsonInclude(JsonInclude.Include.NON_NULL)
+ public static class Stats { // container for the two metric groups declared below
+ private Snapshot snapshot;
+ private Delta delta;
+ }
+
+ // Point-in-time snapshot metrics. Maps to the optimizer's
+ // TableStatsPayload.SnapshotMetricsDto.
+ @Data
+ @Builder
+ @NoArgsConstructor
+ @AllArgsConstructor
+ @JsonInclude(JsonInclude.Include.NON_NULL)
+ public static class Snapshot {
+ // Iceberg snapshot ID. Sent so the optimizer can use it as an idempotency token on upsert and
+ // reject out-of-order replays.
+ private Long snapshotId;
+
+ private String tableVersion;
+ private String tableLocation;
+ private Long tableSizeBytes;
+ private Long numCurrentFiles;
+ }
+
+ // Per-commit incremental counters. Maps to the optimizer's TableStatsPayload.CommitDeltaDto.
+ @Data
+ @Builder
+ @NoArgsConstructor
+ @AllArgsConstructor
+ @JsonInclude(JsonInclude.Include.NON_NULL)
+ public static class Delta {
+ private Long numFilesAdded;
+ private Long numFilesDeleted;
+ private Long addedSizeBytes;
+ private Long deletedSizeBytes;
+ }
+}
diff --git a/services/tables/src/main/java/com/linkedin/openhouse/tables/services/postcommit/PostCommitDispatcher.java b/services/tables/src/main/java/com/linkedin/openhouse/tables/services/postcommit/PostCommitDispatcher.java
new file mode 100644
index 000000000..5a04e2df7
--- /dev/null
+++ b/services/tables/src/main/java/com/linkedin/openhouse/tables/services/postcommit/PostCommitDispatcher.java
@@ -0,0 +1,136 @@
+package com.linkedin.openhouse.tables.services.postcommit;
+
+import com.linkedin.openhouse.common.metrics.MetricsConstant;
+import com.linkedin.openhouse.tables.model.TableDto;
+import io.micrometer.core.instrument.MeterRegistry;
+import io.micrometer.core.instrument.Timer;
+import java.time.Duration;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.TimeoutException;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.stereotype.Component;
+import org.springframework.web.reactive.function.client.WebClientRequestException;
+import org.springframework.web.reactive.function.client.WebClientResponseException;
+import reactor.core.publisher.Mono;
+
+// Runs PostCommitOperations after a successful Iceberg commit.
+//
+// Each operation receives a wall-clock timeout from tables.postcommit.per-op-timeout-ms. Any
+// error the operation signals is recorded as a metric and a log line, then swallowed. Dispatch
+// is fire-and-forget, so the commit thread is never blocked on operation work.
+//
+// Operations describe payload and endpoint only. The timeout, error swallowing, subscription,
+// and metric emission all live here, so the contract across operations stays uniform.
+//
+// The bean is constructed only when tables.postcommit.enabled=true.
+@Slf4j
+@Component
+@EnableConfigurationProperties(PostCommitProperties.class)
+@ConditionalOnProperty(prefix = "tables.postcommit", name = "enabled", havingValue = "true")
+public class PostCommitDispatcher {
+
+  private final List<PostCommitOperation> operations;
+  private final PostCommitProperties properties;
+  private final MeterRegistry meterRegistry;
+
+  public PostCommitDispatcher(
+      List<PostCommitOperation> operations,
+      PostCommitProperties properties,
+      MeterRegistry meterRegistry) {
+    this.operations = operations;
+    this.properties = properties;
+    this.meterRegistry = meterRegistry;
+  }
+
+  // Dispatches all registered operations for savedDto.
+  //
+  // Returns immediately on the calling thread. Each operation runs on its underlying reactive
+  // scheduler. This method never throws.
+  public void dispatch(TableDto savedDto) {
+    for (PostCommitOperation op : operations) {
+      decorate(op, savedDto).ifPresent(Mono::subscribe);
+    }
+  }
+
+  // Returns the fully-decorated Mono for op without subscribing to it.
+  //
+  // The decoration applies the per-op timeout, records the success or error metric, and swallows
+  // any error. When the operation does not apply it emits "skipped"; when prepare() throws
+  // synchronously or returns null it emits "prepare_threw". Either way it returns Optional.empty().
+  //
+  // Package-private so that tests can .block() on the chain rather than poll for metric emission
+  // after a fire-and-forget subscription.
+  Optional<Mono<Void>> decorate(PostCommitOperation op, TableDto savedDto) {
+    Optional<Mono<Void>> work;
+    try {
+      work = java.util.Objects.requireNonNull(op.prepare(savedDto), "prepare() returned null");
+    } catch (RuntimeException e) {
+      // Defensive: a prepare() that throws or returns null must not break dispatch of later ops.
+      meterRegistry
+          .counter(
+              MetricsConstant.POSTCOMMIT_OP_FAILED, "op", op.name(), "outcome", "prepare_threw")
+          .increment();
+      log.warn("Post-commit op {} prepare() threw {}", op.name(), e.toString());
+      return Optional.empty();
+    }
+    if (!work.isPresent()) {
+      meterRegistry.counter(MetricsConstant.POSTCOMMIT_OP_SKIPPED, "op", op.name()).increment();
+      return Optional.empty();
+    }
+    Timer.Sample sample = Timer.start(meterRegistry);
+    Mono<Void> decorated =
+        work.get()
+            .timeout(Duration.ofMillis(properties.getPerOpTimeoutMs()))
+            .doOnSuccess(
+                ignored ->
+                    sample.stop(
+                        meterRegistry.timer(
+                            MetricsConstant.POSTCOMMIT_OP_DURATION,
+                            "op",
+                            op.name(),
+                            "outcome",
+                            "success")))
+            .onErrorResume(
+                e -> {
+                  String outcome = classifyOutcome(e);
+                  sample.stop(
+                      meterRegistry.timer(
+                          MetricsConstant.POSTCOMMIT_OP_DURATION,
+                          "op",
+                          op.name(),
+                          "outcome",
+                          outcome));
+                  meterRegistry
+                      .counter(
+                          MetricsConstant.POSTCOMMIT_OP_FAILED, "op", op.name(), "outcome", outcome)
+                      .increment();
+                  log.warn("Post-commit op {} failed ({}): {}", op.name(), outcome, e.toString());
+                  return Mono.empty();
+                });
+    return Optional.of(decorated);
+  }
+
+  // Maps a terminal error to a small set of outcome tags shared by all operations. Note that
+  // HTTP 408 and 429 are tagged "client_error" here, like every other 4xx status.
+  private static String classifyOutcome(Throwable e) {
+    if (e instanceof TimeoutException) {
+      return "timeout";
+    }
+    if (e instanceof WebClientRequestException) {
+      return "network_error";
+    }
+    if (e instanceof WebClientResponseException) {
+      int code = ((WebClientResponseException) e).getStatusCode().value();
+      if (code >= 500 && code < 600) {
+        return "server_error";
+      }
+      if (code >= 400 && code < 500) {
+        return "client_error";
+      }
+    }
+    return "unknown_error";
+  }
+}
diff --git a/services/tables/src/main/java/com/linkedin/openhouse/tables/services/postcommit/PostCommitOperation.java b/services/tables/src/main/java/com/linkedin/openhouse/tables/services/postcommit/PostCommitOperation.java
new file mode 100644
index 000000000..6cf99cd7e
--- /dev/null
+++ b/services/tables/src/main/java/com/linkedin/openhouse/tables/services/postcommit/PostCommitOperation.java
@@ -0,0 +1,26 @@
+package com.linkedin.openhouse.tables.services.postcommit;
+
+import com.linkedin.openhouse.tables.model.TableDto;
+import java.util.Optional;
+import reactor.core.publisher.Mono;
+
+// A best-effort, bounded, asynchronous action that the Tables Service runs after a successful
+// Iceberg commit.
+//
+// Operations are invoked by PostCommitDispatcher, which owns the per-op timeout, the
+// subscription, error swallowing, and metric emission. An error signaled by the Mono that
+// prepare() returns is recorded and dropped, so it never affects commit correctness.
+//
+// Implementations describe what to push and where. They may apply internal retries within a
+// bounded budget. They must not subscribe themselves or apply an outer timeout, because the
+// dispatcher already owns both concerns.
+public interface PostCommitOperation {
+
+  // Returns a short, stable identifier for this operation. Used as the "op" metric tag.
+  String name();
+
+  // Builds the work to run for savedDto, or returns Optional.empty() (never null) when the
+  // operation does not apply to this commit, for example when the table is not opted in or has no
+  // committed snapshot. The dispatcher records an empty return as a "skipped" metric.
+  Optional<Mono<Void>> prepare(TableDto savedDto);
+}
diff --git a/services/tables/src/main/java/com/linkedin/openhouse/tables/services/postcommit/PostCommitProperties.java b/services/tables/src/main/java/com/linkedin/openhouse/tables/services/postcommit/PostCommitProperties.java
new file mode 100644
index 000000000..98e8b02eb
--- /dev/null
+++ b/services/tables/src/main/java/com/linkedin/openhouse/tables/services/postcommit/PostCommitProperties.java
@@ -0,0 +1,27 @@
+package com.linkedin.openhouse.tables.services.postcommit;
+
+import lombok.Getter;
+import lombok.Setter;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+
+// Configuration for the Tables Service post-commit operation framework. Property prefix is
+// tables.postcommit.
+//
+// When enabled is false (the default), the dispatcher bean is not constructed, and any
+// registered PostCommitOperation beans are never invoked.
+@ConfigurationProperties("tables.postcommit")
+@Getter
+@Setter
+public class PostCommitProperties {
+
+ // Master switch. When false, no operations are dispatched on commit.
+ private boolean enabled = false;
+
+ // Wall-clock ceiling, in milliseconds, that the dispatcher applies to each operation's
+ // prepared Mono.
+ // This bounds resource occupancy (connections, threads) for a misbehaving operation. It does
+ // not block the commit thread, because operations run on the underlying reactive scheduler.
+ //
+ // Default is 3000 ms.
+ private long perOpTimeoutMs = 3000L;
+}
diff --git a/services/tables/src/main/resources/application.properties b/services/tables/src/main/resources/application.properties
index 4a6c82a89..24f5554c5 100644
--- a/services/tables/src/main/resources/application.properties
+++ b/services/tables/src/main/resources/application.properties
@@ -23,4 +23,16 @@ management.metrics.distribution.maximum-expected-value.catalog_metadata_retrieva
management.metrics.distribution.maximum-expected-value.catalog_metadata_update_latency=600s
management.metrics.distribution.maximum-expected-value.http.server.requests=600s
server.shutdown=graceful
-spring.lifecycle.timeout-per-shutdown-phase=60s
\ No newline at end of file
+spring.lifecycle.timeout-per-shutdown-phase=60s
+
+# Post-commit operation framework. Disabled by default; enable it per OpenHouse (OH) instance.
+# Outer per-op wall-clock budget is enforced by PostCommitDispatcher.
+tables.postcommit.enabled=${TABLES_POSTCOMMIT_ENABLED:false}
+tables.postcommit.per-op-timeout-ms=${TABLES_POSTCOMMIT_PER_OP_TIMEOUT_MS:3000}
+
+# Optimizer stats post-commit operation (one impl of PostCommitOperation).
+# Both tables.postcommit.enabled and optimizer.stats.enabled must be true for the push to be wired and dispatched.
+optimizer.stats.enabled=${OPTIMIZER_STATS_ENABLED:false}
+optimizer.stats.base-uri=${OPTIMIZER_STATS_BASE_URI:}
+optimizer.stats.per-attempt-timeout-ms=${OPTIMIZER_STATS_PER_ATTEMPT_TIMEOUT_MS:1000}
+optimizer.stats.max-attempts=${OPTIMIZER_STATS_MAX_ATTEMPTS:3}
\ No newline at end of file
diff --git a/services/tables/src/test/java/com/linkedin/openhouse/tables/mock/service/IcebergSnapshotsServiceTest.java b/services/tables/src/test/java/com/linkedin/openhouse/tables/mock/service/IcebergSnapshotsServiceTest.java
index 44467b705..648ae1871 100644
--- a/services/tables/src/test/java/com/linkedin/openhouse/tables/mock/service/IcebergSnapshotsServiceTest.java
+++ b/services/tables/src/test/java/com/linkedin/openhouse/tables/mock/service/IcebergSnapshotsServiceTest.java
@@ -11,11 +11,14 @@
import com.linkedin.openhouse.tables.api.spec.v0.request.components.Policies;
import com.linkedin.openhouse.tables.dto.mapper.TablesMapper;
import com.linkedin.openhouse.tables.dto.mapper.TablesMapperImpl;
+import com.linkedin.openhouse.tables.model.CurrentSnapshotInfo;
import com.linkedin.openhouse.tables.model.TableDto;
import com.linkedin.openhouse.tables.model.TableDtoPrimaryKey;
import com.linkedin.openhouse.tables.repository.OpenHouseInternalRepository;
import com.linkedin.openhouse.tables.services.IcebergSnapshotsService;
+import com.linkedin.openhouse.tables.services.postcommit.PostCommitDispatcher;
import com.linkedin.openhouse.tables.utils.TableUUIDGenerator;
+import java.util.Collections;
import java.util.Optional;
import java.util.UUID;
import org.apache.iceberg.exceptions.BadRequestException;
@@ -45,6 +48,13 @@ public class IcebergSnapshotsServiceTest {
@MockBean private TableUUIDGenerator tableUUIDGenerator;
+ /**
+ * Forces the optional post-commit dispatcher into the service so that we can verify it is invoked
+ * after a successful save. Production wiring is conditional on {@code
+ * tables.postcommit.enabled=true}; this {@code @MockBean} bypasses that condition.
+ */
+ @MockBean private PostCommitDispatcher postCommitDispatcher;
+
private OpenHouseInternalRepository mockRepository;
@Captor ArgumentCaptor tableDtoArgumentCaptor;
@@ -90,6 +100,32 @@ public void testTableCreated() {
verifyCalls(key, TEST_TABLE_CREATOR, requestBody.getCreateUpdateTableRequestBody());
}
+ @Test
+ public void testPostCommitDispatcherInvokedAfterSuccessfulCommit() {
+ final IcebergSnapshotsRequestBody requestBody =
+ TEST_ICEBERG_SNAPSHOTS_INITIAL_VERSION_REQUEST_BODY;
+ final String dbId = requestBody.getCreateUpdateTableRequestBody().getDatabaseId();
+ final String tableId = requestBody.getCreateUpdateTableRequestBody().getTableId();
+ final TableDtoPrimaryKey key =
+ TableDtoPrimaryKey.builder().databaseId(dbId).tableId(tableId).build();
+ final CurrentSnapshotInfo snapshot =
+ CurrentSnapshotInfo.builder()
+ .snapshotId(42L)
+ .summary(Collections.singletonMap("total-data-files", "7"))
+ .build();
+ final TableDto savedDto = // the DTO the mocked repository hands back from save()
+ TableDto.builder().databaseId(dbId).tableId(tableId).currentSnapshot(snapshot).build();
+
+ Mockito.when(tableUUIDGenerator.generateUUID(Mockito.any(IcebergSnapshotsRequestBody.class)))
+ .thenReturn(UUID.randomUUID());
+ Mockito.when(mockRepository.findById(key)).thenReturn(Optional.empty()); // no existing row for this key
+ Mockito.when(mockRepository.save(Mockito.any(TableDto.class))).thenReturn(savedDto);
+
+ service.putIcebergSnapshots(dbId, tableId, requestBody, TEST_TABLE_CREATOR);
+
+ Mockito.verify(postCommitDispatcher, Mockito.times(1)).dispatch(savedDto); // exactly once, with the saved DTO
+ }
+
@Test
public void testPutTableExceptionHandling() {
final IcebergSnapshotsRequestBody requestBody =
diff --git a/services/tables/src/test/java/com/linkedin/openhouse/tables/model/TableDtoMappingTest.java b/services/tables/src/test/java/com/linkedin/openhouse/tables/model/TableDtoMappingTest.java
index e3e19a7d8..53fb633e3 100644
--- a/services/tables/src/test/java/com/linkedin/openhouse/tables/model/TableDtoMappingTest.java
+++ b/services/tables/src/test/java/com/linkedin/openhouse/tables/model/TableDtoMappingTest.java
@@ -33,7 +33,8 @@ public class TableDtoMappingTest {
"jsonSnapshots",
"snapshotRefs",
"policies",
- "tableType");
+ "tableType",
+ "currentSnapshot");
/** Making all fields making it to map is expected, and all expected field are making it there. */
@Test
diff --git a/services/tables/src/test/java/com/linkedin/openhouse/tables/services/optimizer/OptimizerStatsPostCommitOperationTest.java b/services/tables/src/test/java/com/linkedin/openhouse/tables/services/optimizer/OptimizerStatsPostCommitOperationTest.java
new file mode 100644
index 000000000..417db2abf
--- /dev/null
+++ b/services/tables/src/test/java/com/linkedin/openhouse/tables/services/optimizer/OptimizerStatsPostCommitOperationTest.java
@@ -0,0 +1,211 @@
+package com.linkedin.openhouse.tables.services.optimizer;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.linkedin.openhouse.tables.model.CurrentSnapshotInfo;
+import com.linkedin.openhouse.tables.model.TableDto;
+import java.io.IOException;
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import okhttp3.mockwebserver.MockResponse;
+import okhttp3.mockwebserver.MockWebServer;
+import okhttp3.mockwebserver.RecordedRequest;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.springframework.http.client.reactive.ReactorClientHttpConnector;
+import org.springframework.web.reactive.function.client.WebClient;
+import org.springframework.web.reactive.function.client.WebClientResponseException;
+import reactor.core.publisher.Mono;
+import reactor.netty.http.client.HttpClient;
+
+class OptimizerStatsPostCommitOperationTest {
+
+ private static final String TABLE_UUID = "uuid-1";
+ private static final String DB = "db1";
+ private static final String TABLE = "tbl1";
+ private static final long SNAPSHOT_ID = 9876543210L; // larger than Integer.MAX_VALUE
+ private static final Duration BLOCK_MAX = Duration.ofSeconds(5); // bound for every block() call in this class
+
+ private MockWebServer server;
+ private OptimizerStatsProperties properties;
+ private OptimizerStatsPostCommitOperation op;
+ private final ObjectMapper mapper = new ObjectMapper();
+
+ @BeforeEach
+ void setUp() throws IOException {
+ server = new MockWebServer();
+ server.start();
+ properties = new OptimizerStatsProperties();
+ properties.setEnabled(true);
+ properties.setBaseUri(server.url("/").toString()); // route requests to the local MockWebServer
+ properties.setPerAttemptTimeoutMs(1000L);
+ properties.setMaxAttempts(3);
+ op = new OptimizerStatsPostCommitOperation(buildWebClient(), properties);
+ }
+
+ @AfterEach
+ void tearDown() throws IOException {
+ server.shutdown();
+ }
+
+ private WebClient buildWebClient() {
+ return WebClient.builder()
+ .baseUrl(properties.getBaseUri())
+ .clientConnector(new ReactorClientHttpConnector(HttpClient.create()))
+ .build();
+ }
+
+ @Test
+ void name_isStableTagValue() {
+ assertThat(op.name()).isEqualTo("optimizer_stats");
+ }
+
+ @Test
+ void prepare_optedIn_emitsRequestWithFullPayload() throws Exception {
+ server.enqueue(new MockResponse().setResponseCode(200));
+
+ TableDto saved =
+ optedInBuilder()
+ .tableVersion("v3")
+ .tableLocation("/data/tables/db1/tbl1")
+ .currentSnapshot(snapshot())
+ .build();
+
+ Optional> work = op.prepare(saved);
+ assertThat(work).isPresent();
+ work.get().block(BLOCK_MAX);
+
+ RecordedRequest req = server.takeRequest(2, TimeUnit.SECONDS);
+ assertThat(req).isNotNull();
+ assertThat(req.getMethod()).isEqualTo("PUT");
+ assertThat(req.getPath()).isEqualTo("/v1/optimizer/stats/" + TABLE_UUID);
+
+ @SuppressWarnings("unchecked")
+ Map body = mapper.readValue(req.getBody().readUtf8(), Map.class);
+ assertThat(body).containsEntry("databaseName", DB).containsEntry("tableName", TABLE);
+ @SuppressWarnings("unchecked")
+ Map stats = (Map) body.get("stats");
+ @SuppressWarnings("unchecked")
+ Map snapshot = (Map) stats.get("snapshot");
+ @SuppressWarnings("unchecked")
+ Map delta = (Map) stats.get("delta");
+ assertThat(snapshot)
+ .containsEntry("snapshotId", SNAPSHOT_ID) // compared as a boxed Long; the int literals below box to Integer
+ .containsEntry("tableVersion", "v3")
+ .containsEntry("tableLocation", "/data/tables/db1/tbl1")
+ .containsEntry("tableSizeBytes", 4096)
+ .containsEntry("numCurrentFiles", 12);
+ assertThat(delta)
+ .containsEntry("numFilesAdded", 5)
+ .containsEntry("numFilesDeleted", 2)
+ .containsEntry("addedSizeBytes", 2048)
+ .containsEntry("deletedSizeBytes", 1024);
+ }
+
+ @Test
+ void prepare_notOptedIn_returnsEmpty() {
+ TableDto saved = builderWithoutOptIn().currentSnapshot(snapshot()).build();
+ assertThat(op.prepare(saved)).isEmpty();
+ assertThat(server.getRequestCount()).isZero();
+ }
+
+ @Test
+ void prepare_noSnapshot_returnsEmpty() {
+ TableDto saved = optedInBuilder().currentSnapshot(null).build();
+ assertThat(op.prepare(saved)).isEmpty();
+ assertThat(server.getRequestCount()).isZero();
+ }
+
+ @Test
+ void prepare_5xxThenSuccess_retriesAndCompletes() {
+ server.enqueue(new MockResponse().setResponseCode(503)); // first attempt fails with a 5xx
+ server.enqueue(new MockResponse().setResponseCode(200)); // second attempt succeeds
+
+ TableDto saved = optedInBuilder().currentSnapshot(snapshot()).build();
+ op.prepare(saved).orElseThrow(AssertionError::new).block(BLOCK_MAX);
+
+ assertThat(server.getRequestCount()).isEqualTo(2);
+ }
+
+ @Test
+ void prepare_allAttemptsFail_propagatesUnderlyingError() {
+ properties.setMaxAttempts(2); // NOTE(review): op was built in setUp(); assumes it reads maxAttempts lazily - confirm
+ server.enqueue(new MockResponse().setResponseCode(503));
+ server.enqueue(new MockResponse().setResponseCode(503));
+
+ TableDto saved = optedInBuilder().currentSnapshot(snapshot()).build();
+ Mono chain = op.prepare(saved).orElseThrow(AssertionError::new);
+
+ assertThatChainErrors(chain, WebClientResponseException.class);
+ assertThat(server.getRequestCount()).isEqualTo(2);
+ }
+
+ @Test
+ void prepare_4xxNonRetryable_doesNotRetry() {
+ server.enqueue(new MockResponse().setResponseCode(400));
+
+ TableDto saved = optedInBuilder().currentSnapshot(snapshot()).build();
+ Mono chain = op.prepare(saved).orElseThrow(AssertionError::new);
+
+ assertThatChainErrors(chain, WebClientResponseException.class);
+ assertThat(server.getRequestCount()).isEqualTo(1);
+ }
+
+ // ---- helpers ----
+
+ private void assertThatChainErrors(Mono chain, Class extends Throwable> errorType) {
+ try {
+ chain.block(BLOCK_MAX);
+ throw new AssertionError("expected chain to signal an error of type " + errorType); // an Error, so the catch below does not swallow it
+ } catch (RuntimeException e) {
+ Throwable cause = unwrap(e);
+ assertThat(cause).isInstanceOf(errorType);
+ }
+ }
+
+ private static Throwable unwrap(Throwable t) {
+ Throwable cur = t;
+ while (cur.getCause() != null && cur.getCause() != cur) { // descend to the root cause; stop on a self-referencing cause
+ cur = cur.getCause();
+ }
+ return cur;
+ }
+
+ private static CurrentSnapshotInfo snapshot() {
+ return CurrentSnapshotInfo.builder().snapshotId(SNAPSHOT_ID).summary(summary()).build();
+ }
+
+ private static Map summary() {
+ Map s = new HashMap<>();
+ s.put("total-data-files", "12");
+ s.put("total-files-size", "4096");
+ s.put("added-data-files", "5");
+ s.put("deleted-data-files", "2");
+ s.put("added-files-size", "2048");
+ s.put("removed-files-size", "1024");
+ return s;
+ }
+
+ private static TableDto.TableDtoBuilder optedInBuilder() {
+ Map props = new HashMap<>();
+ props.put(OptimizerStatsPostCommitOperation.OPT_IN_PROPERTY, "true");
+ return TableDto.builder()
+ .tableUUID(TABLE_UUID)
+ .databaseId(DB)
+ .tableId(TABLE)
+ .tableProperties(props);
+ }
+
+ private static TableDto.TableDtoBuilder builderWithoutOptIn() {
+ return TableDto.builder()
+ .tableUUID(TABLE_UUID)
+ .databaseId(DB)
+ .tableId(TABLE)
+ .tableProperties(new HashMap<>());
+ }
+}
diff --git a/services/tables/src/test/java/com/linkedin/openhouse/tables/services/postcommit/PostCommitDispatcherTest.java b/services/tables/src/test/java/com/linkedin/openhouse/tables/services/postcommit/PostCommitDispatcherTest.java
new file mode 100644
index 000000000..14b6f2dde
--- /dev/null
+++ b/services/tables/src/test/java/com/linkedin/openhouse/tables/services/postcommit/PostCommitDispatcherTest.java
@@ -0,0 +1,177 @@
+package com.linkedin.openhouse.tables.services.postcommit;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import com.linkedin.openhouse.common.metrics.MetricsConstant;
+import com.linkedin.openhouse.tables.model.TableDto;
+import io.micrometer.core.instrument.MeterRegistry;
+import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import reactor.core.publisher.Mono;
+
+class PostCommitDispatcherTest {
+
+ private static final Duration BLOCK_MAX = Duration.ofSeconds(5); // bound for every block() call in this class
+
+ private MeterRegistry meterRegistry;
+ private PostCommitProperties properties;
+ private final TableDto savedDto = TableDto.builder().databaseId("db").tableId("t").build();
+
+ @BeforeEach
+ void setUp() {
+ meterRegistry = new SimpleMeterRegistry(); // fresh registry per test, so counts start at zero
+ properties = new PostCommitProperties();
+ properties.setEnabled(true);
+ properties.setPerOpTimeoutMs(500L);
+ }
+
+ private PostCommitDispatcher dispatcherWith(PostCommitOperation... ops) {
+ return new PostCommitDispatcher(Arrays.asList(ops), properties, meterRegistry);
+ }
+
+ @Test
+ void dispatch_invokesEveryRegisteredOperationsPrepare() {
+ AtomicInteger aCount = new AtomicInteger();
+ AtomicInteger bCount = new AtomicInteger();
+ dispatcherWith(new EmptyOp("a", aCount), new EmptyOp("b", bCount)).dispatch(savedDto);
+
+ assertThat(aCount.get()).isEqualTo(1);
+ assertThat(bCount.get()).isEqualTo(1);
+ }
+
+ @Test
+ void decorate_emptyPrepare_incrementsSkippedAndReturnsEmpty() {
+ PostCommitOperation op = new EmptyOp("opx", new AtomicInteger());
+
+ Optional> decorated = dispatcherWith(op).decorate(op, savedDto);
+
+ assertThat(decorated).isEmpty();
+ assertThat(meterRegistry.counter(MetricsConstant.POSTCOMMIT_OP_SKIPPED, "op", "opx").count())
+ .isEqualTo(1.0);
+ }
+
+ @Test
+ void decorate_successfulWork_recordsDurationWithSuccessOutcome() {
+ PostCommitOperation op = new SimpleOp("ok", Mono::empty); // work completes immediately with no value
+
+ dispatcherWith(op).decorate(op, savedDto).orElseThrow(AssertionError::new).block(BLOCK_MAX);
+
+ assertThat(
+ meterRegistry
+ .timer(MetricsConstant.POSTCOMMIT_OP_DURATION, "op", "ok", "outcome", "success")
+ .count())
+ .isEqualTo(1L);
+ }
+
+ @Test
+ void decorate_workSignalsError_incrementsFailedWithClassifiedOutcomeAndSwallowsError() {
+ RuntimeException boom = new RuntimeException("boom");
+ PostCommitOperation op = new SimpleOp("broke", () -> Mono.error(boom));
+
+ // Dispatcher swallows the error — block() returns normally.
+ dispatcherWith(op).decorate(op, savedDto).orElseThrow(AssertionError::new).block(BLOCK_MAX);
+
+ assertThat(
+ meterRegistry
+ .counter(
+ MetricsConstant.POSTCOMMIT_OP_FAILED, "op", "broke", "outcome", "unknown_error")
+ .count())
+ .isEqualTo(1.0);
+ }
+
+ @Test
+ void decorate_prepareThrowsSynchronously_incrementsFailedAndDispatchContinuesToLaterOps() {
+ AtomicInteger laterCount = new AtomicInteger();
+ PostCommitOperation thrower =
+ new PostCommitOperation() {
+ @Override
+ public String name() {
+ return "thrower";
+ }
+
+ @Override
+ public Optional> prepare(TableDto dto) {
+ throw new IllegalStateException("nope");
+ }
+ };
+ PostCommitOperation later = new EmptyOp("later", laterCount);
+
+ dispatcherWith(thrower, later).dispatch(savedDto); // thrower is listed first, so "later" runs after the throw
+
+ assertThat(
+ meterRegistry
+ .counter(
+ MetricsConstant.POSTCOMMIT_OP_FAILED,
+ "op",
+ "thrower",
+ "outcome",
+ "prepare_threw")
+ .count())
+ .isEqualTo(1.0);
+ assertThat(laterCount.get())
+ .as("later operations must still run after a synchronous prepare() throw")
+ .isEqualTo(1);
+ }
+
+ @Test
+ void decorate_workExceedsPerOpTimeout_incrementsFailedWithTimeoutOutcome() {
+ properties.setPerOpTimeoutMs(50L); // read by the dispatcher when decorate() runs
+ PostCommitOperation op = new SimpleOp("slow", Mono::never); // never signals, so only the timeout can end it
+
+ dispatcherWith(op).decorate(op, savedDto).orElseThrow(AssertionError::new).block(BLOCK_MAX);
+
+ assertThat(
+ meterRegistry
+ .counter(MetricsConstant.POSTCOMMIT_OP_FAILED, "op", "slow", "outcome", "timeout")
+ .count())
+ .isEqualTo(1.0);
+ }
+
+ // ---- helpers ----
+
+ private static class EmptyOp implements PostCommitOperation { // counts prepare() calls and always opts out
+ private final String name;
+ private final AtomicInteger calls;
+
+ EmptyOp(String name, AtomicInteger calls) {
+ this.name = name;
+ this.calls = calls;
+ }
+
+ @Override
+ public String name() {
+ return name;
+ }
+
+ @Override
+ public Optional> prepare(TableDto dto) {
+ calls.incrementAndGet();
+ return Optional.empty();
+ }
+ }
+
+ private static class SimpleOp implements PostCommitOperation { // always opts in with the supplier's Mono
+ private final String name;
+ private final java.util.function.Supplier> work;
+
+ SimpleOp(String name, java.util.function.Supplier> work) {
+ this.name = name;
+ this.work = work;
+ }
+
+ @Override
+ public String name() {
+ return name;
+ }
+
+ @Override
+ public Optional> prepare(TableDto dto) {
+ return Optional.of(work.get());
+ }
+ }
+}