diff --git a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/spark/BatchedOrphanFilesDeletionSparkApp.java b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/spark/BatchedOrphanFilesDeletionSparkApp.java new file mode 100644 index 000000000..cd336e6f5 --- /dev/null +++ b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/spark/BatchedOrphanFilesDeletionSparkApp.java @@ -0,0 +1,371 @@ +package com.linkedin.openhouse.jobs.spark; + +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.Lists; +import com.linkedin.openhouse.common.metrics.DefaultOtelConfig; +import com.linkedin.openhouse.common.metrics.OtelEmitter; +import com.linkedin.openhouse.jobs.spark.state.StateManager; +import com.linkedin.openhouse.jobs.util.AppConstants; +import com.linkedin.openhouse.jobs.util.AppsOtelEmitter; +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.common.Attributes; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import lombok.Builder; +import lombok.Value; +import lombok.extern.slf4j.Slf4j; +import okhttp3.MediaType; +import okhttp3.OkHttpClient; +import okhttp3.Request; +import okhttp3.RequestBody; +import okhttp3.Response; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.lang3.math.NumberUtils; +import org.apache.iceberg.Table; + +/** + * Runs orphan file deletion for a batch of tables in one Spark session. + * + *

Spinning up a Spark session per table is expensive. This app amortizes that cost by processing + * many tables in a single job, using a driver-side thread pool so each table's deletion runs + * concurrently without competing for executors. + * + *

When {@code --resultsEndpoint} is supplied, each table's outcome is POSTed to the optimizer + * service's update-operation endpoint after every table in the batch has finished, letting the + * service track per-table status independently of the overall job. + */ +@Slf4j +public class BatchedOrphanFilesDeletionSparkApp extends BaseSparkApp { + private static final int DEFAULT_MAX_ORPHAN_FILE_SAMPLE_SIZE = 20000; + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + private final List tableNames; + private final int parallelism; + private final long ttlSeconds; + private final String backupDir; + private final int concurrentDeletes; + private final boolean streamResults; + private final int maxOrphanFileSampleSize; + private final List operationIds; + private final String resultsEndpoint; + + public BatchedOrphanFilesDeletionSparkApp( + String jobId, + StateManager stateManager, + List tableNames, + int parallelism, + long ttlSeconds, + OtelEmitter otelEmitter, + String backupDir, + int concurrentDeletes, + boolean streamResults, + int maxOrphanFileSampleSize, + List operationIds, + String resultsEndpoint) { + super(jobId, stateManager, otelEmitter); + this.tableNames = tableNames; + this.parallelism = parallelism; + this.ttlSeconds = ttlSeconds; + this.backupDir = backupDir; + this.concurrentDeletes = concurrentDeletes; + this.streamResults = streamResults; + this.maxOrphanFileSampleSize = maxOrphanFileSampleSize; + this.operationIds = operationIds; + this.resultsEndpoint = resultsEndpoint; + } + + /** + * Runs orphan file deletion for every table in {@code tableNames} and then reports the outcomes. + * + * @throws IllegalArgumentException if {@code resultsEndpoint} is set and the number of operation + * ids differs from the number of table names + */ + @Override + protected void runInner(Operations ops) throws Exception { + // Operation ids are paired with table names by position, so the two lists must line up. + if (resultsEndpoint != null && operationIds.size() != tableNames.size()) { + throw new IllegalArgumentException( + "operationIds count (" + + operationIds.size() + + ") must equal tableNames count (" + + tableNames.size() + + ") when resultsEndpoint is provided"); + } + + Map tableToOperationId = new HashMap<>(); + if (resultsEndpoint != null) { + for (int i = 0; i < tableNames.size(); i++) { + 
tableToOperationId.put(tableNames.get(i), operationIds.get(i)); + } + } + + long olderThanTimestampMillis = + System.currentTimeMillis() - TimeUnit.SECONDS.toMillis(ttlSeconds); + + log.info( + "Starting batched orphan files deletion for {} tables with parallelism {}", + tableNames.size(), + parallelism); + + int poolSize = Math.min(parallelism, Math.max(1, tableNames.size())); + ExecutorService executor = Executors.newFixedThreadPool(poolSize); + List> futures = new ArrayList<>(); + + for (String tableName : tableNames) { + futures.add( + executor.submit( + () -> { + long startTime = System.currentTimeMillis(); + try { + Table table = ops.getTable(tableName); + boolean backupEnabled = + Boolean.parseBoolean( + table + .properties() + .getOrDefault(AppConstants.BACKUP_ENABLED_KEY, "false")); + + log.info("Processing orphan files for table: {}", tableName); + Operations.OrphanFilesResult result = + ops.deleteOrphanFilesWithMetrics( + table, + olderThanTimestampMillis, + backupEnabled, + backupDir, + concurrentDeletes, + streamResults, + maxOrphanFileSampleSize); + + List orphanFiles = + Lists.newArrayList(result.orphanFileLocations().iterator()); + long durationMs = System.currentTimeMillis() - startTime; + + log.info( + "Successfully processed table {}: {} orphan files deleted in {}ms", + tableName, + orphanFiles.size(), + durationMs); + + return OrphanDeletionResult.success( + tableName, orphanFiles.size(), result.getBytesDeleted(), durationMs); + + } catch (Exception e) { + long durationMs = System.currentTimeMillis() - startTime; + log.error("Failed to process table: {}", tableName, e); + return OrphanDeletionResult.failure(tableName, e, durationMs); + } + })); + } + + executor.shutdown(); + executor.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS); + + List allResults = new ArrayList<>(); + for (Future future : futures) { + try { + allResults.add(future.get()); + } catch (ExecutionException e) { + throw new RuntimeException("Unexpected exception in table 
processing", e.getCause()); + } + } + + reportResults(allResults, tableToOperationId); + } + + private void reportResults( + List results, Map tableToOperationId) throws Exception { + OkHttpClient client = + resultsEndpoint != null + ? new OkHttpClient.Builder() + .connectTimeout(10, TimeUnit.SECONDS) + .readTimeout(30, TimeUnit.SECONDS) + .build() + : null; + + int failureCount = 0; + for (OrphanDeletionResult result : results) { + if (result.isSuccess()) { + otelEmitter.count( + METRICS_SCOPE, + AppConstants.ORPHAN_FILE_COUNT, + result.getOrphanFilesDeleted(), + Attributes.of(AttributeKey.stringKey(AppConstants.TABLE_NAME), result.getTableName())); + } else { + failureCount++; + } + + if (client != null) { + String opId = tableToOperationId.get(result.getTableName()); + if (opId != null) { + updateOperation(client, opId, result); + } + } + } + + if (client == null && failureCount > 0) { + throw new RuntimeException(failureCount + " table(s) failed in batch"); + } + if (client != null && failureCount == results.size()) { + throw new RuntimeException("All tables failed in batch"); + } + } + + private void updateOperation(OkHttpClient client, String id, OrphanDeletionResult result) + throws Exception { + OperationResult opResult = + result.isSuccess() + ? OperationResult.success(result.getOrphanFilesDeleted(), result.getBytesDeleted()) + : OperationResult.failure(result.getErrorMessage(), result.getErrorType()); + + String json = OBJECT_MAPPER.writeValueAsString(opResult); + RequestBody body = RequestBody.create(json, MediaType.parse("application/json; charset=utf-8")); + Request request = + new Request.Builder().url(resultsEndpoint + "/" + id + "/update").post(body).build(); + try (Response response = client.newCall(request).execute()) { + int code = response.code(); + if (code < 200 || code >= 300) { + throw new RuntimeException("POST operation/" + id + "/update returned HTTP " + code); + } + } + } + + /** Result of orphan deletion for a single table. 
*/ + @Value + @Builder + public static class OrphanDeletionResult implements Serializable { + String tableName; + boolean success; + int orphanFilesDeleted; + long bytesDeleted; + long durationMs; + String errorMessage; + String errorType; + + public static OrphanDeletionResult success( + String tableName, int orphanFileCount, long bytesDeleted, long durationMs) { + return OrphanDeletionResult.builder() + .tableName(tableName) + .success(true) + .orphanFilesDeleted(orphanFileCount) + .bytesDeleted(bytesDeleted) + .durationMs(durationMs) + .build(); + } + + public static OrphanDeletionResult failure(String tableName, Throwable error, long durationMs) { + return OrphanDeletionResult.builder() + .tableName(tableName) + .success(false) + .orphanFilesDeleted(0) + .bytesDeleted(0) + .durationMs(durationMs) + .errorMessage(error.getMessage()) + .errorType(error.getClass().getSimpleName()) + .build(); + } + } + + /** POST payload sent to the optimizer service's update-operation endpoint. */ + @JsonInclude(JsonInclude.Include.NON_NULL) + static class OperationResult { + public final String status; + public final Integer orphanFilesDeleted; + public final Long orphanBytesDeleted; + public final String errorMessage; + public final String errorType; + + private OperationResult( + String status, + Integer orphanFilesDeleted, + Long orphanBytesDeleted, + String errorMessage, + String errorType) { + this.status = status; + this.orphanFilesDeleted = orphanFilesDeleted; + this.orphanBytesDeleted = orphanBytesDeleted; + this.errorMessage = errorMessage; + this.errorType = errorType; + } + + static OperationResult success(int orphanFilesDeleted, long orphanBytesDeleted) { + return new OperationResult("SUCCESS", orphanFilesDeleted, orphanBytesDeleted, null, null); + } + + static OperationResult failure(String errorMessage, String errorType) { + return new OperationResult("FAILED", null, null, errorMessage, errorType); + } + } + + public static void main(String[] args) { + OtelEmitter 
otelEmitter = + new AppsOtelEmitter(Arrays.asList(DefaultOtelConfig.getOpenTelemetry())); + createApp(args, otelEmitter).run(); + } + + public static BatchedOrphanFilesDeletionSparkApp createApp( + String[] args, OtelEmitter otelEmitter) { + List

These tests run the app's logic in-process against a real Spark session backed by an in-memory + * Iceberg catalog ({@link OpenHouseSparkITest}). The app is never submitted as an external Spark + * job; {@code runInner(ops)} is called directly. + * + *

When testing the per-table complete-operation callback path, the Optimizer service is replaced + * by an embedded {@link com.sun.net.httpserver.HttpServer} that captures request bodies. This + * verifies the shape and content of the JSON payloads without requiring a live service. + */ +@Slf4j +public class BatchedOrphanFilesDeletionSparkAppTest extends OpenHouseSparkITest { + private static final int MAX_SAMPLE_SIZE = 20000; + + private final OtelEmitter otelEmitter = + new AppsOtelEmitter(Arrays.asList(DefaultOtelConfig.getOpenTelemetry())); + + @Test + public void testSuccessfulOrphanFilesDeletionForMultipleTables() throws Exception { + final List tableNames = Arrays.asList("db.test_batch1", "db.test_batch2"); + final int parallelism = 2; + + try (Operations ops = Operations.withCatalog(getSparkSession(), otelEmitter)) { + for (String tableName : tableNames) { + prepareTable(ops, tableName); + populateTable(ops, tableName, 2); + } + + BatchedOrphanFilesDeletionSparkApp app = + newApp( + tableNames, parallelism, TimeUnit.DAYS.toSeconds(1), Collections.emptyList(), null); + + app.runInner(ops); + + for (String tableName : tableNames) { + Table table = ops.getTable(tableName); + Assertions.assertNotNull(table); + } + } + } + + @Test + public void testBatchedDeletionWithSingleTable() throws Exception { + final List tableNames = Arrays.asList("db.test_single"); + + try (Operations ops = Operations.withCatalog(getSparkSession(), otelEmitter)) { + prepareTable(ops, tableNames.get(0)); + populateTable(ops, tableNames.get(0), 3); + + BatchedOrphanFilesDeletionSparkApp app = + newApp(tableNames, 1, TimeUnit.DAYS.toSeconds(1), Collections.emptyList(), null); + + app.runInner(ops); + + Assertions.assertNotNull(ops.getTable(tableNames.get(0))); + } + } + + @Test + public void testBatchedDeletionWithInvalidTable() throws Exception { + final List tableNames = + Arrays.asList("db.test_valid1", "db.nonexistent_table", "db.test_valid2"); + + try (Operations ops = 
Operations.withCatalog(getSparkSession(), otelEmitter)) { + prepareTable(ops, "db.test_valid1"); + populateTable(ops, "db.test_valid1", 2); + prepareTable(ops, "db.test_valid2"); + populateTable(ops, "db.test_valid2", 2); + + BatchedOrphanFilesDeletionSparkApp app = + newApp(tableNames, 3, TimeUnit.DAYS.toSeconds(1), Collections.emptyList(), null); + + Assertions.assertThrows(RuntimeException.class, () -> app.runInner(ops)); + } + } + + @Test + public void testBatchedDeletionWithEmptyTables() throws Exception { + final List tableNames = Arrays.asList("db.test_empty1", "db.test_empty2"); + + try (Operations ops = Operations.withCatalog(getSparkSession(), otelEmitter)) { + for (String tableName : tableNames) { + prepareTable(ops, tableName); + } + + BatchedOrphanFilesDeletionSparkApp app = + newApp(tableNames, 2, TimeUnit.DAYS.toSeconds(1), Collections.emptyList(), null); + + app.runInner(ops); + + for (String tableName : tableNames) { + Assertions.assertNotNull(ops.getTable(tableName)); + } + } + } + + @Test + public void testOrphanDeletionResultSuccess() { + BatchedOrphanFilesDeletionSparkApp.OrphanDeletionResult result = + BatchedOrphanFilesDeletionSparkApp.OrphanDeletionResult.success( + "db.test_result", 42, 1024L, 5000L); + + Assertions.assertTrue(result.isSuccess()); + Assertions.assertEquals("db.test_result", result.getTableName()); + Assertions.assertEquals(42, result.getOrphanFilesDeleted()); + Assertions.assertEquals(1024L, result.getBytesDeleted()); + Assertions.assertEquals(5000L, result.getDurationMs()); + Assertions.assertNull(result.getErrorMessage()); + Assertions.assertNull(result.getErrorType()); + } + + @Test + public void testOrphanDeletionResultFailure() { + BatchedOrphanFilesDeletionSparkApp.OrphanDeletionResult result = + BatchedOrphanFilesDeletionSparkApp.OrphanDeletionResult.failure( + "db.test_failure", new RuntimeException("Test error"), 1000L); + + Assertions.assertFalse(result.isSuccess()); + Assertions.assertEquals("db.test_failure", 
result.getTableName()); + Assertions.assertEquals(0, result.getOrphanFilesDeleted()); + Assertions.assertEquals(0, result.getBytesDeleted()); + Assertions.assertEquals(1000L, result.getDurationMs()); + Assertions.assertEquals("Test error", result.getErrorMessage()); + Assertions.assertEquals("RuntimeException", result.getErrorType()); + } + + @Test + public void testCreateAppFromCommandLineArgs() { + String[] args = { + "--jobId", "test-job-123", + "--storageURL", "http://localhost:8080", + "--tableNames", "db.table1,db.table2,db.table3", + "--parallelism", "5", + "--ttl", "86400", + "--backupDir", "/backup", + "--concurrentDeletes", "20", + "--operationIds", "uuid-1,uuid-2,uuid-3", + "--resultsEndpoint", "http://localhost:8083/v1/optimizer/operations" + }; + + Assertions.assertNotNull(BatchedOrphanFilesDeletionSparkApp.createApp(args, otelEmitter)); + } + + @Test + public void testCreateAppWithDefaultValues() { + String[] args = { + "--jobId", "test-job-456", + "--storageURL", "http://localhost:8080", + "--tableNames", "db.table1" + }; + + Assertions.assertNotNull(BatchedOrphanFilesDeletionSparkApp.createApp(args, otelEmitter)); + } + + @Test + public void testCreateAppWithOperationIdsAndEndpoint() { + String[] args = { + "--jobId", "test-job-789", + "--storageURL", "http://localhost:8080", + "--tableNames", "db.table1,db.table2", + "--operationIds", "uuid-1,uuid-2", + "--resultsEndpoint", "http://localhost:8083/v1/optimizer/operations" + }; + + Assertions.assertNotNull(BatchedOrphanFilesDeletionSparkApp.createApp(args, otelEmitter)); + } + + @Test + public void testBatchedDeletionWithActualOrphanFiles() throws Exception { + final String tableName = "db.test_orphans"; + final List tableNames = Arrays.asList(tableName); + + try (Operations ops = Operations.withCatalog(getSparkSession(), otelEmitter)) { + prepareTable(ops, tableName); + populateTable(ops, tableName, 3); + + Assertions.assertNotNull(ops.getTable(tableName).currentSnapshot()); + + // TTL=0 bypasses 
the minimum-age guard so seeded orphan files are eligible. + BatchedOrphanFilesDeletionSparkApp app = + newApp(tableNames, 1, 0L, Collections.emptyList(), null); + + app.runInner(ops); + + Assertions.assertNotNull(ops.getTable(tableName).currentSnapshot()); + } + } + + @Test + public void testBatchedDeletionPartialSuccessWithEndpoint() throws Exception { + final List tableNames = Arrays.asList("db.test_ep_valid", "db.nonexistent_ep"); + final List operationIds = Arrays.asList("op-100", "op-200"); + + List receivedBodies = Collections.synchronizedList(new ArrayList<>()); + HttpServer httpServer = HttpServer.create(new InetSocketAddress(0), 0); + httpServer.createContext( + "/ops", + exchange -> { + byte[] body = exchange.getRequestBody().readAllBytes(); + receivedBodies.add(new String(body, StandardCharsets.UTF_8)); + exchange.sendResponseHeaders(200, 0); + exchange.close(); + }); + httpServer.start(); + String endpoint = "http://localhost:" + httpServer.getAddress().getPort() + "/ops"; + + try (Operations ops = Operations.withCatalog(getSparkSession(), otelEmitter)) { + prepareTable(ops, "db.test_ep_valid"); + populateTable(ops, "db.test_ep_valid", 2); + + BatchedOrphanFilesDeletionSparkApp app = + newApp(tableNames, 2, TimeUnit.DAYS.toSeconds(1), operationIds, endpoint); + + app.runInner(ops); + + Assertions.assertEquals(2, receivedBodies.size()); + long successCount = receivedBodies.stream().filter(b -> b.contains("\"SUCCESS\"")).count(); + long failureCount = receivedBodies.stream().filter(b -> b.contains("\"FAILED\"")).count(); + Assertions.assertEquals(1, successCount); + Assertions.assertEquals(1, failureCount); + + // SUCCESS payload carries orphan-file/byte metrics; no error fields. 
+ String successBody = + receivedBodies.stream().filter(b -> b.contains("\"SUCCESS\"")).findFirst().get(); + Assertions.assertTrue(successBody.contains("\"orphanFilesDeleted\"")); + Assertions.assertTrue(successBody.contains("\"orphanBytesDeleted\"")); + Assertions.assertFalse(successBody.contains("\"errorMessage\"")); + Assertions.assertFalse(successBody.contains("\"errorType\"")); + + // FAILED payload carries error fields; no orphan metrics. + String failureBody = + receivedBodies.stream().filter(b -> b.contains("\"FAILED\"")).findFirst().get(); + Assertions.assertTrue(failureBody.contains("\"errorMessage\"")); + Assertions.assertTrue(failureBody.contains("\"errorType\"")); + Assertions.assertFalse(failureBody.contains("\"orphanFilesDeleted\"")); + Assertions.assertFalse(failureBody.contains("\"orphanBytesDeleted\"")); + } finally { + httpServer.stop(0); + } + } + + @Test + public void testBatchedDeletionAllFailWithEndpoint() throws Exception { + final List tableNames = Arrays.asList("db.nonexistent_ep1", "db.nonexistent_ep2"); + final List operationIds = Arrays.asList("op-300", "op-400"); + + List receivedBodies = Collections.synchronizedList(new ArrayList<>()); + HttpServer httpServer = HttpServer.create(new InetSocketAddress(0), 0); + httpServer.createContext( + "/ops", + exchange -> { + byte[] body = exchange.getRequestBody().readAllBytes(); + receivedBodies.add(new String(body, StandardCharsets.UTF_8)); + exchange.sendResponseHeaders(200, 0); + exchange.close(); + }); + httpServer.start(); + String endpoint = "http://localhost:" + httpServer.getAddress().getPort() + "/ops"; + + try (Operations ops = Operations.withCatalog(getSparkSession(), otelEmitter)) { + BatchedOrphanFilesDeletionSparkApp app = + newApp(tableNames, 2, TimeUnit.DAYS.toSeconds(1), operationIds, endpoint); + + Assertions.assertThrows(RuntimeException.class, () -> app.runInner(ops)); + + Assertions.assertEquals(2, receivedBodies.size()); + long failures = receivedBodies.stream().filter(b -> 
b.contains("\"FAILED\"")).count(); + Assertions.assertEquals(2, failures); + for (String body : receivedBodies) { + Assertions.assertTrue(body.contains("\"errorMessage\"")); + Assertions.assertTrue(body.contains("\"errorType\"")); + } + } finally { + httpServer.stop(0); + } + } + + @Test + public void testBatchedDeletionMismatchedOperationIdsThrows() throws Exception { + final List tableNames = Arrays.asList("db.table1", "db.table2"); + final List operationIds = Arrays.asList("op-1", "op-2", "op-3"); + + BatchedOrphanFilesDeletionSparkApp app = + newApp( + tableNames, 2, TimeUnit.DAYS.toSeconds(1), operationIds, "http://localhost:9999/ops"); + + try (Operations ops = Operations.withCatalog(getSparkSession(), otelEmitter)) { + Assertions.assertThrows(IllegalArgumentException.class, () -> app.runInner(ops)); + } + } + + private BatchedOrphanFilesDeletionSparkApp newApp( + List tableNames, + int parallelism, + long ttlSeconds, + List operationIds, + String resultsEndpoint) { + return new BatchedOrphanFilesDeletionSparkApp( + "test-job", + null, + tableNames, + parallelism, + ttlSeconds, + otelEmitter, + ".backup", + 10, + false, + MAX_SAMPLE_SIZE, + operationIds, + resultsEndpoint); + } + + private static void prepareTable(Operations ops, String tableName) { + ops.spark().sql(String.format("DROP TABLE IF EXISTS %s", tableName)).show(); + ops.spark().sql(String.format("CREATE TABLE %s (data string, ts timestamp)", tableName)).show(); + } + + private static void populateTable(Operations ops, String tableName, int numRows) { + for (int row = 0; row < numRows; ++row) { + ops.spark() + .sql(String.format("INSERT INTO %s VALUES ('v%d', current_timestamp())", tableName, row)) + .show(); + } + } +} diff --git a/build.gradle b/build.gradle index ec75fd89d..1f951a91c 100644 --- a/build.gradle +++ b/build.gradle @@ -178,6 +178,8 @@ tasks.register('CopyGitHooksTask', Copy) { // housetables-service.Dockerfile -> :services:housetables:bootJar // jobs-service.Dockerfile -> 
:services:jobs:bootJar // optimizer-service.Dockerfile -> :services:optimizer:bootJar +// optimizer-analyzer.Dockerfile -> :apps:optimizer:analyzerapp:bootJar +// optimizer-scheduler.Dockerfile -> :apps:optimizer:schedulerapp:bootJar // jobs-scheduler.Dockerfile -> :apps:openhouse-spark-apps_2.12:shadowJar (uber JAR) // spark-base-hadoop2.8.dockerfile -> // :integrations:spark:spark-3.1:openhouse-spark-runtime_2.12:shadowJar (uber JAR) @@ -198,6 +200,8 @@ tasks.register('dockerPrereqs') { dependsOn ':services:housetables:bootJar' dependsOn ':services:jobs:bootJar' dependsOn ':services:optimizer:bootJar' + dependsOn ':apps:optimizer:analyzerapp:bootJar' + dependsOn ':apps:optimizer:schedulerapp:bootJar' // Spark runtime uber JARs (shadowJar) dependsOn ':integrations:spark:spark-3.1:openhouse-spark-runtime_2.12:shadowJar' @@ -222,6 +226,8 @@ tasks.register('dockerPrereqs') { println ' build/housetables/libs/housetables.jar' println ' build/jobs/libs/jobs.jar' println ' build/optimizer/libs/optimizer.jar' + println ' build/analyzerapp/libs/analyzerapp.jar' + println ' build/schedulerapp/libs/schedulerapp.jar' println ' build/openhouse-spark-runtime_2.12/libs/openhouse-spark-runtime_2.12-uber.jar' println ' build/openhouse-spark-3.5-runtime_2.12/libs/openhouse-spark-3.5-runtime_2.12-uber.jar' println ' build/openhouse-spark-apps_2.12/libs/openhouse-spark-apps_2.12-uber.jar' diff --git a/demo-check.sh b/demo-check.sh new file mode 100755 index 000000000..d7c60c55d --- /dev/null +++ b/demo-check.sh @@ -0,0 +1,27 @@ +#!/usr/bin/env bash +# demo-check.sh — Run after demo-setup.sh. Verifies SUCCESS history rows and +# shows HDFS file counts per table. +set -e + +OPT_API="http://localhost:8005" + +if [ ! -f /tmp/demo_ofd_locs.txt ] || [ ! 
-f /tmp/demo_ofd_uuids.txt ]; then + echo "ERROR: /tmp/demo_ofd_locs.txt or /tmp/demo_ofd_uuids.txt not found — run demo-setup.sh first" + exit 1 +fi + +echo "=== HDFS data files after OFD ===" +while IFS='=' read -r TABLE TABLE_LOC; do + FILE_COUNT=$(docker exec local.namenode \ + hdfs dfs -ls -R "$TABLE_LOC/data/" 2>/dev/null \ + | grep -c "\.orc" || echo "0") + echo " $TABLE: $FILE_COUNT files remaining" +done < /tmp/demo_ofd_locs.txt + +echo "" +echo "=== Operation history per table ===" +while IFS='=' read -r TABLE TABLE_UUID; do + echo " --- $TABLE ($TABLE_UUID) ---" + curl -sf "$OPT_API/v1/optimizer/operations-history/$TABLE_UUID?limit=10" \ + | jq '.[] | {status, orphanFilesDeleted, orphanBytesDeleted, errorMessage, completedAt}' +done < /tmp/demo_ofd_uuids.txt diff --git a/demo-setup.sh b/demo-setup.sh new file mode 100755 index 000000000..6fc5491f7 --- /dev/null +++ b/demo-setup.sh @@ -0,0 +1,174 @@ +#!/usr/bin/env bash +# demo-setup.sh — Populate tables, expire snapshots, wait for the continuous +# analyzer + scheduler to schedule OFD jobs. +# +# Prerequisites: ./gradlew dockerUp (stack must be healthy) +# +# After this script completes, batched-OFD Spark jobs have been submitted via +# the continuous scheduler. Use demo-check.sh to watch for completion. 
+set -e + +SCRIPT_DIR="$(cd "$(dirname "$0")" && pwd)" +TOKEN=$(cat "$SCRIPT_DIR/services/common/src/main/resources/dummy.token") +TABLES_API="http://localhost:8000" +JOBS_API="http://localhost:8002/jobs" +OPT_API="http://localhost:8005" +LIVY_API="http://localhost:9003" + +TABLES="demo_ofd_a:5 demo_ofd_b:7 demo_ofd_c:4" +TABLE_COUNT=3 +ANALYZER_WAIT_SECS=90 +SCHEDULER_WAIT_SECS=90 + +wait_for_job() { + local JOB_ID="$1" LABEL="$2" MAX_SECS="${3:-180}" + local i=0 + while [ $i -lt "$MAX_SECS" ]; do + STATE=$(curl -sf "$JOBS_API/$JOB_ID" | jq -r '.state // empty' 2>/dev/null || echo "") + [ "$STATE" = "SUCCEEDED" ] && return 0 + [ "$STATE" = "FAILED" ] && { echo "FAIL: $LABEL job $JOB_ID FAILED"; exit 1; } + sleep 5 + i=$((i + 5)) + done + echo "FAIL: $LABEL job $JOB_ID timed out after ${MAX_SECS}s (last state: $STATE)" + exit 1 +} + +kill_idle_session() { + IDLE=$(curl -sf "$LIVY_API/sessions" \ + | jq -r '[.sessions[] | select(.state=="idle")] | first | .id // empty') + if [ -n "$IDLE" ]; then + curl -sf -X DELETE "$LIVY_API/sessions/$IDLE" > /dev/null + echo " Freed idle Livy session $IDLE" + fi +} + +wait_for_count() { + local URL="$1" EXPECTED="$2" LABEL="$3" MAX_SECS="$4" + local i=0 + while [ $i -lt "$MAX_SECS" ]; do + COUNT=$(curl -sf "$URL" | jq 'length' 2>/dev/null || echo "0") + [ "$COUNT" -ge "$EXPECTED" ] && return 0 + printf " %s: %d/%d (waited %ds)\r" "$LABEL" "$COUNT" "$EXPECTED" "$i" + sleep 5 + i=$((i + 5)) + done + echo "" + echo "FAIL: $LABEL expected $EXPECTED, got $COUNT after ${MAX_SECS}s" + exit 1 +} + +echo "=== [1/4] Create tables and populate with data ===" +rm -f /tmp/demo_ofd_locs.txt /tmp/demo_ofd_uuids.txt + +for entry in $TABLES; do + TABLE="${entry%%:*}" + WRITES="${entry##*:}" + ORPHANS=$((WRITES - 2)) + + "$SCRIPT_DIR/local-spark-sql.sh" "DROP TABLE IF EXISTS openhouse.db1.$TABLE" > /dev/null + "$SCRIPT_DIR/local-spark-sql.sh" "CREATE TABLE openhouse.db1.$TABLE ( + id STRING, val STRING + ) TBLPROPERTIES ( + 
'maintenance.optimizer.ofd.enabled'='true', + 'maintenance.optimizer.stats.enabled'='true' + )" > /dev/null + + for i in $(seq 1 "$WRITES"); do + "$SCRIPT_DIR/local-spark-sql.sh" \ + "INSERT OVERWRITE openhouse.db1.$TABLE VALUES ('$i', 'row$i')" > /dev/null + printf " $TABLE: insert %d/%d\r" "$i" "$WRITES" + done + echo "" + + TBL_JSON=$(curl -sf -H "Authorization: Bearer $TOKEN" \ + "$TABLES_API/v1/databases/db1/tables/$TABLE") + TABLE_LOC=$(dirname "$(echo "$TBL_JSON" | jq -r '.tableLocation')") + TABLE_UUID=$(echo "$TBL_JSON" | jq -r '.tableUUID') + echo " $TABLE -> $TABLE_LOC ($WRITES snapshots, $ORPHANS will become orphans)" + echo " $TABLE uuid=$TABLE_UUID" + echo "$TABLE=$TABLE_LOC" >> /tmp/demo_ofd_locs.txt + echo "$TABLE=$TABLE_UUID" >> /tmp/demo_ofd_uuids.txt +done + +# Stats push is async fire-and-forget; the dispatcher subscribes on the Netty event +# loop after the commit thread returns. Poll briefly for the rows to settle so we +# don't false-fail before the loop wakes up. +echo "" +echo "=== [Wait] Tables on-commit stats push to land in optimizer DB ===" +STATS_WAIT_SECS=30 +i=0 +STATS_COUNT=0 +while [ $i -lt "$STATS_WAIT_SECS" ]; do + STATS_COUNT=$(curl -sf "$OPT_API/v1/optimizer/stats?limit=100" | jq 'length') + [ "$STATS_COUNT" -ge "$TABLE_COUNT" ] && break + printf " stats rows: %d/%d (waited %ds)\r" "$STATS_COUNT" "$TABLE_COUNT" "$i" + sleep 2 + i=$((i + 2)) +done +echo "" +[ "$STATS_COUNT" -ge "$TABLE_COUNT" ] \ + || { echo "FAIL: expected $TABLE_COUNT stats rows, got $STATS_COUNT after ${STATS_WAIT_SECS}s"; exit 1; } +echo "PASS: $STATS_COUNT stats rows posted by Tables Service on-commit hook" + +# Verify the new payload shape carries snapshot stats, not just identity fields. +# Pick one table and confirm the optimizer recorded a non-zero numCurrentFiles, a +# non-zero tableSizeBytes, and the snapshot ID we sent. 
+SAMPLE_UUID=$(head -n1 /tmp/demo_ofd_uuids.txt | cut -d= -f2) +SAMPLE_ROW=$(curl -sf "$OPT_API/v1/optimizer/stats/$SAMPLE_UUID") +SAMPLE_NUM_FILES=$(echo "$SAMPLE_ROW" | jq -r '.stats.snapshot.numCurrentFiles // 0') +SAMPLE_SIZE=$(echo "$SAMPLE_ROW" | jq -r '.stats.snapshot.tableSizeBytes // 0') +SAMPLE_SNAPSHOT_ID=$(echo "$SAMPLE_ROW" | jq -r '.stats.snapshot.snapshotId // empty') +[ "$SAMPLE_NUM_FILES" -gt 0 ] \ + || { echo "FAIL: sample stats row has numCurrentFiles=$SAMPLE_NUM_FILES, expected > 0"; exit 1; } +[ "$SAMPLE_SIZE" -gt 0 ] \ + || { echo "FAIL: sample stats row has tableSizeBytes=$SAMPLE_SIZE, expected > 0"; exit 1; } +[ -n "$SAMPLE_SNAPSHOT_ID" ] \ + || { echo "FAIL: sample stats row missing snapshotId"; exit 1; } +echo "PASS: sample stats row carries snapshot payload (files=$SAMPLE_NUM_FILES, bytes=$SAMPLE_SIZE, snapshotId=$SAMPLE_SNAPSHOT_ID)" + +echo "" +echo "=== [2/4] Expire old snapshots (creates orphan data files) ===" +kill_idle_session + +EXPIRE_JOBS="" +for entry in $TABLES; do + TABLE="${entry%%:*}" + BODY=$(jq -n --arg n "demo-expire-$TABLE" --arg t "db1.$TABLE" \ + '{jobName:$n, clusterId:"LocalHadoopCluster", + jobConf:{jobType:"SNAPSHOTS_EXPIRATION", + args:["--tableName",$t,"--maxAge","1","--granularity","days","--versions","1"]}}') + # "// empty" turns a missing or null jobId into an empty string so the guard below fires; + # plain "jq -r .jobId" would print the literal text null and slip past it. + JOB_ID=$(curl -sf -X POST "$JOBS_API" -H "Content-Type: application/json" -d "$BODY" \ + | jq -r '.jobId // empty') + [ -n "$JOB_ID" ] || { echo "FAIL: could not submit expiration job for $TABLE"; exit 1; } + echo " $TABLE: submitted $JOB_ID" + EXPIRE_JOBS="$EXPIRE_JOBS $JOB_ID" +done + +for JOB_ID in $EXPIRE_JOBS; do + wait_for_job "$JOB_ID" "snapshot-expiration" 180 + echo " $JOB_ID: SUCCEEDED" +done + +while IFS='=' read -r TABLE TABLE_LOC; do + # grep -c prints 0 itself when nothing matches but exits 1; "|| true" clears that status + # without appending a second "0" that would break the numeric test below. + COUNT=$(docker exec local.namenode \ + hdfs dfs -ls -R "$TABLE_LOC/data/" 2>/dev/null | grep -c "\.orc" || true) + [ "$COUNT" -ge 2 ] \ + || { echo "FAIL: $TABLE has $COUNT files after expiration, expected >= 2"; exit 1; } + echo " $TABLE: $COUNT files on HDFS 
($((COUNT-1)) orphans + 1 live)" +done < /tmp/demo_ofd_locs.txt + +echo "" +echo "=== [3/4] Wait for analyzer → scheduler → SCHEDULED ===" +echo " (analyzer and scheduler both run every ~30s; PENDING is transient)" +wait_for_count "$OPT_API/v1/optimizer/operations?status=SCHEDULED&limit=100" \ + "$TABLE_COUNT" "SCHEDULED ops" "$(($ANALYZER_WAIT_SECS + $SCHEDULER_WAIT_SECS))" +echo "" +echo "PASS: $TABLE_COUNT operations SCHEDULED" + +echo "" +echo "=== [4/4] (skip) — SCHEDULED is the SUCCESS-readiness signal ===" + +echo "" +echo "Setup complete. Batched-OFD Spark jobs have been launched." +echo "Run ./demo-check.sh to verify SUCCESS in history + orphan files deleted." diff --git a/demo.sh b/demo.sh new file mode 100755 index 000000000..89ea74af3 --- /dev/null +++ b/demo.sh @@ -0,0 +1,39 @@ +#!/usr/bin/env bash +# demo.sh — Full end-to-end optimizer demo from a clean docker stack. +# +# Prerequisites: ./gradlew dockerUp (stack must be healthy) +set -e + +SCRIPT_DIR="$(cd "$(dirname "$0")" && pwd)" +OPT_API="http://localhost:8005" + +TABLE_COUNT=3 +SUCCESS_WAIT_SECS=300 + +bash "$SCRIPT_DIR/demo-setup.sh" + +echo "" +echo "=== [Wait] OFD Spark jobs to complete (up to 5 min) ===" +for i in $(seq 1 30); do + TOTAL_HISTORY=0 + SUCCESS_HISTORY=0 + if [ -f /tmp/demo_ofd_uuids.txt ]; then + while IFS='=' read -r _ UUID; do + LATEST=$(curl -sf "$OPT_API/v1/optimizer/operations-history/$UUID?limit=1" \ + | jq -r '.[0].status // empty' 2>/dev/null || echo "") + [ -n "$LATEST" ] && TOTAL_HISTORY=$((TOTAL_HISTORY + 1)) + [ "$LATEST" = "SUCCESS" ] && SUCCESS_HISTORY=$((SUCCESS_HISTORY + 1)) + done < /tmp/demo_ofd_uuids.txt + fi + [ "$SUCCESS_HISTORY" -ge "$TABLE_COUNT" ] && break + echo " $i/30: $SUCCESS_HISTORY SUCCESS / $TOTAL_HISTORY total history rows..." 
+ sleep 10 +done + +[ "$SUCCESS_HISTORY" -ge "$TABLE_COUNT" ] \ + || { echo "FAIL: only $SUCCESS_HISTORY/$TABLE_COUNT operations reached SUCCESS"; bash "$SCRIPT_DIR/demo-check.sh"; exit 1; } + +echo "" +echo "PASS: all $TABLE_COUNT operations completed with SUCCESS" +echo "" +bash "$SCRIPT_DIR/demo-check.sh" diff --git a/infra/recipes/docker-compose/common/oh-services.yml b/infra/recipes/docker-compose/common/oh-services.yml index 73ef5a656..71c4f889e 100644 --- a/infra/recipes/docker-compose/common/oh-services.yml +++ b/infra/recipes/docker-compose/common/oh-services.yml @@ -22,6 +22,20 @@ services: build: context: ../../../.. dockerfile: jobs-scheduler.Dockerfile + openhouse-optimizer: + build: + context: ../../../.. + dockerfile: optimizer-service.Dockerfile + ports: + - 8005:8080 + openhouse-optimizer-analyzer: + build: + context: ../../../.. + dockerfile: optimizer-analyzer.Dockerfile + openhouse-optimizer-scheduler: + build: + context: ../../../.. + dockerfile: optimizer-scheduler.Dockerfile prometheus: image: prom/prometheus:v2.21.0 ports: diff --git a/infra/recipes/docker-compose/oh-hadoop-spark/cluster.yaml b/infra/recipes/docker-compose/oh-hadoop-spark/cluster.yaml index bb72176c5..81d142815 100644 --- a/infra/recipes/docker-compose/oh-hadoop-spark/cluster.yaml +++ b/infra/recipes/docker-compose/oh-hadoop-spark/cluster.yaml @@ -23,6 +23,8 @@ cluster: database: type: "MYSQL" url: "jdbc:mysql://mysql:3306/oh_db?allowPublicKeyRetrieval=true&useSSL=false" + optimizer: + base-uri: "http://openhouse-optimizer:8080" security: token: interceptor: diff --git a/infra/recipes/docker-compose/oh-hadoop-spark/docker-compose.yml b/infra/recipes/docker-compose/oh-hadoop-spark/docker-compose.yml index 0b8df01ac..bcf4787bf 100644 --- a/infra/recipes/docker-compose/oh-hadoop-spark/docker-compose.yml +++ b/infra/recipes/docker-compose/oh-hadoop-spark/docker-compose.yml @@ -22,6 +22,10 @@ services: - OTEL_METRICS_EXPORTER=none - OTEL_LOGS_EXPORTER=none - 
OTEL_TRACES_SAMPLER=always_on + # Post-commit operation framework: enable the dispatcher and the optimizer-stats op. + - TABLES_POSTCOMMIT_ENABLED=true + - OPTIMIZER_STATS_ENABLED=true + - OPTIMIZER_STATS_BASE_URI=http://openhouse-optimizer:8080 openhouse-jobs: container_name: local.openhouse-jobs @@ -81,6 +85,53 @@ services: - OTEL_LOGS_EXPORTER=none - OTEL_TRACES_SAMPLER=always_on + openhouse-optimizer: + container_name: local.openhouse-optimizer + extends: + file: ../common/oh-services.yml + service: openhouse-optimizer + volumes: + - ./:/var/config/ + depends_on: + - mysql + environment: + - OPTIMIZER_DB_URL=jdbc:mysql://mysql:3306/oh_db?allowPublicKeyRetrieval=true&useSSL=false + - OPTIMIZER_DB_USER=oh_user + - OPTIMIZER_DB_PASSWORD=oh_password + + openhouse-optimizer-analyzer: + container_name: local.openhouse-optimizer-analyzer + extends: + file: ../common/oh-services.yml + service: openhouse-optimizer-analyzer + volumes: + - ./:/var/config/ + depends_on: + - openhouse-optimizer + environment: + - OPTIMIZER_DB_URL=jdbc:mysql://mysql:3306/oh_db?allowPublicKeyRetrieval=true&useSSL=false + - OPTIMIZER_DB_USER=oh_user + - OPTIMIZER_DB_PASSWORD=oh_password + - ANALYZER_INTERVAL_SECONDS=30 + + openhouse-optimizer-scheduler: + container_name: local.openhouse-optimizer-scheduler + extends: + file: ../common/oh-services.yml + service: openhouse-optimizer-scheduler + volumes: + - ./:/var/config/ + depends_on: + - openhouse-optimizer + - openhouse-jobs + environment: + - OPTIMIZER_DB_URL=jdbc:mysql://mysql:3306/oh_db?allowPublicKeyRetrieval=true&useSSL=false + - OPTIMIZER_DB_USER=oh_user + - OPTIMIZER_DB_PASSWORD=oh_password + - JOBS_BASE_URI=http://openhouse-jobs:8080 + - SCHEDULER_RESULTS_ENDPOINT=http://openhouse-optimizer:8080/v1/optimizer/operations + - SCHEDULER_INTERVAL_SECONDS=30 + jaeger: container_name: local.jaeger image: jaegertracing/all-in-one:1.54 diff --git a/infra/recipes/docker-compose/oh-hadoop-spark/jobs.yaml 
b/infra/recipes/docker-compose/oh-hadoop-spark/jobs.yaml index 3e795d4db..7a0c51e01 100644 --- a/infra/recipes/docker-compose/oh-hadoop-spark/jobs.yaml +++ b/infra/recipes/docker-compose/oh-hadoop-spark/jobs.yaml @@ -51,8 +51,8 @@ jobs: << : *apps-defaults << : *livy-engine - type: ORPHAN_FILES_DELETION - class-name: com.linkedin.openhouse.jobs.spark.OrphanFilesDeletionSparkApp - args: ["--backupDir", ".backup"] + class-name: com.linkedin.openhouse.jobs.spark.BatchedOrphanFilesDeletionSparkApp + args: ["--backupDir", ".backup", "--ttl", "0"] <<: *apps-defaults spark-properties: <<: *spark-defaults diff --git a/local-spark-sql.sh b/local-spark-sql.sh new file mode 100755 index 000000000..45582f698 --- /dev/null +++ b/local-spark-sql.sh @@ -0,0 +1,85 @@ +#!/usr/bin/env bash +# local-spark-sql.sh — Execute a SQL statement against the local OpenHouse docker cluster. +# +# Reuses an existing idle Livy session if one exists. First run pays the cold start (~20s), +# subsequent runs execute immediately. 
+# +# Usage: ./local-spark-sql.sh "INSERT INTO openhouse.db1.smoke_tbl VALUES ('3', 'c')" +# ./local-spark-sql.sh --kill # tear down the reusable session +set -e + +LIVY="http://localhost:9003" +TOKEN=$(cat /Users/mkuchenb/code/openhouse/services/common/src/main/resources/dummy.token) + +if [ "$1" = "--kill" ]; then + SESSIONS=$(curl -sf "$LIVY/sessions" | jq -r '.sessions[].id') + for sid in $SESSIONS; do + curl -sf -X DELETE "$LIVY/sessions/$sid" > /dev/null 2>&1 + echo "Killed session $sid" + done + exit 0 +fi + +if [ -z "$1" ]; then + echo "Usage: local-spark-sql.sh \"\"" + echo " local-spark-sql.sh --kill" + exit 1 +fi + +SQL="$1" + +SESSION_ID=$(curl -sf "$LIVY/sessions" | jq -r '[.sessions[] | select(.state == "idle")] | first | .id // empty') + +if [ -z "$SESSION_ID" ]; then + SPARK_CONF=$(jq -n --arg token "$TOKEN" '{ + kind: "sql", + conf: { + "spark.jars": "local:/opt/spark/openhouse-spark-runtime_2.12-latest-all.jar", + "spark.jars.packages": "org.apache.iceberg:iceberg-spark-runtime-3.1_2.12:1.2.0", + "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,com.linkedin.openhouse.spark.extensions.OpenhouseSparkSessionExtensions", + "spark.sql.catalog.openhouse": "org.apache.iceberg.spark.SparkCatalog", + "spark.sql.catalog.openhouse.catalog-impl": "com.linkedin.openhouse.spark.OpenHouseCatalog", + "spark.sql.catalog.openhouse.uri": "http://openhouse-tables:8080", + "spark.sql.catalog.openhouse.auth-token": $token, + "spark.sql.catalog.openhouse.cluster": "LocalHadoopCluster" + } + }') + + echo "No idle session found, creating one..." 
+  SESSION_ID=$(curl -sf -X POST "$LIVY/sessions" \
+    -H "Content-Type: application/json" \
+    -d "$SPARK_CONF" | jq -r '.id')
+
+  for i in $(seq 1 60); do
+    STATE=$(curl -sf "$LIVY/sessions/$SESSION_ID/state" | jq -r '.state')
+    [ "$STATE" = "idle" ] && break
+    if [ "$STATE" = "error" ] || [ "$STATE" = "dead" ]; then
+      echo "FAIL: session state=$STATE"
+      exit 1
+    fi
+    sleep 2
+  done
+  [ "$STATE" = "idle" ] || { echo "FAIL: session never reached idle (state=$STATE)"; exit 1; }
+fi
+
+STMT_ID=$(curl -sf -X POST "$LIVY/sessions/$SESSION_ID/statements" \
+  -H "Content-Type: application/json" \
+  -d "$(jq -n --arg code "$SQL" '{code: $code}')" | jq -r '.id')
+
+for i in $(seq 1 60); do
+  STMT_STATE=$(curl -sf "$LIVY/sessions/$SESSION_ID/statements/$STMT_ID" | jq -r '.state')
+  [ "$STMT_STATE" = "available" ] && break
+  [ "$STMT_STATE" = "error" ] && break
+  sleep 1
+done
+
+RESULT=$(curl -sf "$LIVY/sessions/$SESSION_ID/statements/$STMT_ID")
+STATUS=$(echo "$RESULT" | jq -r '.output.status')
+# Anything but "ok" is a failure: an error, or a statement with no output yet after the 60s poll.
+if [ "$STATUS" != "ok" ]; then
+  echo "FAIL: SQL execution error (statement state=$STMT_STATE, output status=$STATUS):"
+  echo "$RESULT" | jq -r '.output.evalue // empty, .output.traceback[]?'
+  exit 1
+fi
+
+echo "$RESULT" | jq -r '.output.data["text/plain"] // ""'
diff --git a/optimizer-analyzer.Dockerfile b/optimizer-analyzer.Dockerfile
new file mode 100644
index 000000000..925018dc7
--- /dev/null
+++ b/optimizer-analyzer.Dockerfile
@@ -0,0 +1,44 @@
+FROM openjdk:23-ea-11-slim
+
+ARG USER=openhouse
+ARG USER_ID=1000
+ARG GROUP_ID=1000
+ENV APP_NAME=optimizer-analyzer
+ENV USER_HOME=/home/$USER
+ENV ANALYZER_INTERVAL_SECONDS=30
+
+# Create an openhouse user as there's no reason to run as root user
+RUN groupadd --force -g $GROUP_ID $USER && useradd -l -d $USER_HOME -m $USER -u $USER_ID -g $GROUP_ID
+
+WORKDIR $USER_HOME
+
+# IMAGE does not set the necessary paths by default.
+ENV PATH=$PATH:/export/apps/jdk/JDK-1_8_0_172/bin/:$USER_HOME + +ARG VERSION="1.0.0-SNAPSHOT" +ARG BUILD_DIR="build/analyzerapp/libs" +ARG JAR_FILES=$BUILD_DIR/*.jar + +COPY $JAR_FILES ./ + +# Delete unwanted JAR files +RUN find . -name "*-sources.jar" -delete +RUN find . -name "*-javadoc.jar" -delete +RUN find . -name "*-lib.jar" -delete + +# Rename the JAR file. +RUN find ./ -name "*.jar" -exec mv {} $APP_NAME.jar \; +RUN ls $APP_NAME.jar + +# Ensure that everything in $USER_HOME is owned by openhouse user +RUN chown -R openhouse:openhouse $USER_HOME + +# Setup default path for Java +RUN mkdir -p /usr/java && ln -sfn /export/apps/jdk/JDK-1_8_0_172 /usr/java/default + +USER $USER + +# Loop the analyzer on a fixed interval. Each pass is a fresh JVM, so opt-3's +# analyzer (which runs once per CommandLineRunner invocation and exits) becomes +# a continuous service in this container. +ENTRYPOINT ["sh", "-c", "while true; do echo \"Running $APP_NAME at $(date)\"; java -Xmx256M -Xms64M -XX:NativeMemoryTracking=summary -jar $APP_NAME.jar; echo \"Exited; sleeping ${ANALYZER_INTERVAL_SECONDS}s\"; sleep ${ANALYZER_INTERVAL_SECONDS}; done"] diff --git a/optimizer-scheduler.Dockerfile b/optimizer-scheduler.Dockerfile new file mode 100644 index 000000000..9be0f3d4c --- /dev/null +++ b/optimizer-scheduler.Dockerfile @@ -0,0 +1,43 @@ +FROM openjdk:23-ea-11-slim + +ARG USER=openhouse +ARG USER_ID=1000 +ARG GROUP_ID=1000 +ENV APP_NAME=optimizer-scheduler +ENV USER_HOME=/home/$USER +ENV SCHEDULER_INTERVAL_SECONDS=30 + +# Create an openhouse user as there's no reason to run as root user +RUN groupadd --force -g $GROUP_ID $USER && useradd -l -d $USER_HOME -m $USER -u $USER_ID -g $GROUP_ID + +WORKDIR $USER_HOME + +# IMAGE does not set the necessary paths by default. 
+ENV PATH=$PATH:/export/apps/jdk/JDK-1_8_0_172/bin/:$USER_HOME + +ARG VERSION="1.0.0-SNAPSHOT" +ARG BUILD_DIR="build/schedulerapp/libs" +ARG JAR_FILES=$BUILD_DIR/*.jar + +COPY $JAR_FILES ./ + +# Delete unwanted JAR files +RUN find . -name "*-sources.jar" -delete +RUN find . -name "*-javadoc.jar" -delete +RUN find . -name "*-lib.jar" -delete + +# Rename the JAR file. +RUN find ./ -name "*.jar" -exec mv {} $APP_NAME.jar \; +RUN ls $APP_NAME.jar + +# Ensure that everything in $USER_HOME is owned by openhouse user +RUN chown -R openhouse:openhouse $USER_HOME + +# Setup default path for Java +RUN mkdir -p /usr/java && ln -sfn /export/apps/jdk/JDK-1_8_0_172 /usr/java/default + +USER $USER + +# Loop the scheduler on a fixed interval; each pass is a fresh JVM (opt-4's +# scheduler runs once per CommandLineRunner invocation and exits). +ENTRYPOINT ["sh", "-c", "while true; do echo \"Running $APP_NAME at $(date)\"; java -Xmx256M -Xms64M -XX:NativeMemoryTracking=summary -jar $APP_NAME.jar; echo \"Exited; sleeping ${SCHEDULER_INTERVAL_SECONDS}s\"; sleep ${SCHEDULER_INTERVAL_SECONDS}; done"] diff --git a/optimizer-service.Dockerfile b/optimizer-service.Dockerfile new file mode 100644 index 000000000..8f2af923c --- /dev/null +++ b/optimizer-service.Dockerfile @@ -0,0 +1,47 @@ +FROM openjdk:23-ea-11-slim + +ARG USER=openhouse +ARG USER_ID=1000 +ARG GROUP_ID=1000 +ENV APP_NAME=optimizer +ENV USER_HOME=/home/$USER + +# Create an openhouse user as there's no reason to run as root user +RUN groupadd --force -g $GROUP_ID $USER && useradd -l -d $USER_HOME -m $USER -u $USER_ID -g $GROUP_ID + +WORKDIR $USER_HOME + +# IMAGE does not set the necessary paths by default. +ENV PATH=$PATH:/export/apps/jdk/JDK-1_8_0_172/bin/:$USER_HOME + +ARG VERSION="1.0.0-SNAPSHOT" +ARG BUILD_DIR="build/$APP_NAME/libs" +ARG JAR_FILES=$BUILD_DIR/*.jar + +COPY $JAR_FILES ./ + +# Delete unwanted JAR files +RUN ls ./ +RUN find . -name "*-sources.jar" -delete +RUN find . -name "*-javadoc.jar" -delete +RUN find . 
-name "*-lib.jar" -delete + +# Rename the JAR file. +RUN ls ./ +RUN find ./ -name "*.jar" -exec mv {} $APP_NAME.jar \; + +RUN ls $APP_NAME.jar + +COPY run.sh . + + +# Ensure that everything in $USER_HOME is owned by openhouse user +RUN chown -R openhouse:openhouse $USER_HOME + +# Setup default path for Java +RUN mkdir -p /usr/java && ln -sfn /export/apps/jdk/JDK-1_8_0_172 /usr/java/default + +USER $USER + +EXPOSE 8080 +ENTRYPOINT ["sh", "-c", "./run.sh $APP_NAME.jar $@"] diff --git a/services/common/src/main/java/com/linkedin/openhouse/common/metrics/MetricsConstant.java b/services/common/src/main/java/com/linkedin/openhouse/common/metrics/MetricsConstant.java index baf65c627..1c7945f9c 100644 --- a/services/common/src/main/java/com/linkedin/openhouse/common/metrics/MetricsConstant.java +++ b/services/common/src/main/java/com/linkedin/openhouse/common/metrics/MetricsConstant.java @@ -63,4 +63,12 @@ private MetricsConstant() {} public static final String HTS_LIST_TABLES_REQUEST = "hts_list_tables_request"; public static final String HTS_LIST_TABLES_TIME = "hts_list_tables_time"; public static final String HTS_SEARCH_TABLES_TIME = "hts_search_tables_time"; + + // Tables post-commit operation framework (bounded, best-effort actions after commit). + // Tagged with op={operation name}; on failure additionally tagged with outcome={timeout, + // network_error, server_error, client_error, prepare_threw, unknown_error}. + // The duration timer is bounded by tables.postcommit.per-op-timeout-ms. 
+ public static final String POSTCOMMIT_OP_DURATION = "postcommit_op_duration"; + public static final String POSTCOMMIT_OP_SKIPPED = "postcommit_op_skipped"; + public static final String POSTCOMMIT_OP_FAILED = "postcommit_op_failed"; } diff --git a/services/optimizer/analyzer/src/main/java/com/linkedin/openhouse/optimizer/analyzer/CadenceBasedOrphanFilesDeletionAnalyzer.java b/services/optimizer/analyzer/src/main/java/com/linkedin/openhouse/optimizer/analyzer/CadenceBasedOrphanFilesDeletionAnalyzer.java index 1f4b31542..f344dcb2b 100644 --- a/services/optimizer/analyzer/src/main/java/com/linkedin/openhouse/optimizer/analyzer/CadenceBasedOrphanFilesDeletionAnalyzer.java +++ b/services/optimizer/analyzer/src/main/java/com/linkedin/openhouse/optimizer/analyzer/CadenceBasedOrphanFilesDeletionAnalyzer.java @@ -51,6 +51,7 @@ public class CadenceBasedOrphanFilesDeletionAnalyzer implements OperationAnalyze private final CadencePolicy cadencePolicy; + @org.springframework.beans.factory.annotation.Autowired public CadenceBasedOrphanFilesDeletionAnalyzer( @Value("${ofd.success-retry-hours:16}") long successRetryHours, @Value("${ofd.failure-retry-hours:1}") long failureRetryHours) { diff --git a/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/controller/TableOperationsController.java b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/controller/TableOperationsController.java index 2ee40802f..ae4ee20b3 100644 --- a/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/controller/TableOperationsController.java +++ b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/controller/TableOperationsController.java @@ -9,13 +9,11 @@ import io.swagger.v3.oas.annotations.responses.ApiResponse; import io.swagger.v3.oas.annotations.responses.ApiResponses; import java.util.List; -import java.util.Objects; import java.util.Optional; import java.util.stream.Collectors; import lombok.RequiredArgsConstructor; import 
org.springframework.http.HttpStatus; import org.springframework.http.ResponseEntity; -import org.springframework.util.StringUtils; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.PostMapping; @@ -34,10 +32,10 @@ public class TableOperationsController { private final OptimizerDataService service; /** - * Report an update to an operation. {@code id} comes from the URL; the body's {@code operationId} - * must match (the controller rejects mismatched requests with 400). The backend looks up the - * operation row, writes a history entry with the operation's table metadata, and returns 201 - * Created with the history row, or 404 if the operation does not exist. + * Report an update to an operation. {@code id} comes from the URL; the body carries the terminal + * status and any per-operation metrics or error details. The backend looks up the operation row, + * writes a history entry with the operation's table metadata plus the supplied metrics, and + * returns 201 Created with the history row, or 404 if the operation does not exist. 
*/ @ApiResponses( value = { @@ -48,21 +46,17 @@ public class TableOperationsController { @PostMapping("/{id}/update") public ResponseEntity updateOperation( @PathVariable String id, @RequestBody UpdateOperationRequest request) { - if (!StringUtils.hasText(request.getOperationId())) { - throw new ResponseStatusException(HttpStatus.BAD_REQUEST, "operationId is required"); - } - if (!Objects.equals(id, request.getOperationId())) { - throw new ResponseStatusException( - HttpStatus.BAD_REQUEST, - String.format( - "operationId in body (%s) does not match path id (%s)", - request.getOperationId(), id)); - } if (request.getStatus() == null) { throw new ResponseStatusException(HttpStatus.BAD_REQUEST, "status is required"); } return service - .updateOperation(id, request.getStatus().toModel()) + .updateOperation( + id, + request.getStatus().toModel(), + request.getOrphanFilesDeleted(), + request.getOrphanBytesDeleted(), + request.getErrorMessage(), + request.getErrorType()) .map( history -> ResponseEntity.status(HttpStatus.CREATED) diff --git a/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/controller/TableStatsController.java b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/controller/TableStatsController.java index b119dd1c7..87fbfa709 100644 --- a/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/controller/TableStatsController.java +++ b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/controller/TableStatsController.java @@ -24,10 +24,22 @@ /** REST controller for managing per-table stats in the optimizer DB. */ @RestController -@RequestMapping("/v1/optimizer/stats") +@RequestMapping(TableStatsController.BASE_PATH) @RequiredArgsConstructor public class TableStatsController { + /** + * Base path for the stats endpoints. Single source of truth — external callers must use it. 
+ */ + public static final String BASE_PATH = "/v1/optimizer/stats"; + + /** + * URI template (with {@code {tableUuid}} placeholder) for the per-table stats upsert/fetch + * endpoint. Use this from any client that calls the optimizer over HTTP, rather than rebuilding + * the path inline. + */ + public static final String TABLE_PATH_TEMPLATE = BASE_PATH + "/{tableUuid}"; + private final OptimizerDataService service; /** diff --git a/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/spec/TableOperationsHistory.java b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/spec/TableOperationsHistory.java index 7a000f840..307a3a892 100644 --- a/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/spec/TableOperationsHistory.java +++ b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/spec/TableOperationsHistory.java @@ -35,6 +35,18 @@ public class TableOperationsHistory { /** {@code SUCCESS} or {@code FAILED}. */ private HistoryStatus status; + /** OFD-specific: number of orphan files deleted; null if not OFD or on failure. */ + private Long orphanFilesDeleted; + + /** OFD-specific: bytes reclaimed by orphan file deletion; null if not OFD or on failure. */ + private Long orphanBytesDeleted; + + /** On failure, the message from the Spark-side exception. Null on success. */ + private String errorMessage; + + /** On failure, the simple name of the Spark-side exception class. Null on success. */ + private String errorType; + /** Convert to the internal-model counterpart. */ public TableOperationsHistoryDto toModel() { return TableOperationsHistoryDto.builder() @@ -45,6 +57,10 @@ public TableOperationsHistoryDto toModel() { .operationType(operationType == null ? null : operationType.toModel()) .completedAt(completedAt) .status(status == null ? 
null : status.toModel()) + .orphanFilesDeleted(orphanFilesDeleted) + .orphanBytesDeleted(orphanBytesDeleted) + .errorMessage(errorMessage) + .errorType(errorType) .build(); } @@ -61,6 +77,10 @@ public static TableOperationsHistory fromModel(TableOperationsHistoryDto h) { .operationType(OperationType.fromModel(h.getOperationType())) .completedAt(h.getCompletedAt()) .status(HistoryStatus.fromModel(h.getStatus())) + .orphanFilesDeleted(h.getOrphanFilesDeleted()) + .orphanBytesDeleted(h.getOrphanBytesDeleted()) + .errorMessage(h.getErrorMessage()) + .errorType(h.getErrorType()) .build(); } } diff --git a/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/spec/UpdateOperationRequest.java b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/spec/UpdateOperationRequest.java index a216e9db3..3931242db 100644 --- a/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/spec/UpdateOperationRequest.java +++ b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/spec/UpdateOperationRequest.java @@ -6,20 +6,19 @@ import lombok.NoArgsConstructor; /** - * Request body for {@code POST /v1/table-operations/update}. + * Request body for {@code POST /v1/optimizer/operations/{id}/update}. * - *

Reports the outcome of a single operation update. The service looks up the operation row by - * {@link #operationId} and writes a history entry for it. + *

Reports an update to a single operation. The operation row's UUID lives in the URL path; the + * body carries the terminal status and any operation-type-specific result metrics. * *

A single Spark job typically processes N tables and yields N independent (status) outcomes — * one per operation. Callers issue one update request per operation; the service does not * bulk-update by job. * - *

The remaining fields ({@link #tableUuid}, {@link #databaseName}, {@link #tableName}, {@link - * #operationType}) are debug-only echo information. The server does not key off them; they are - * preserved on log lines and traces so an operator looking at a failing update call can see which - * (db, table, operation) the caller believed it was updating without joining back to the operation - * row. + *

The debug-echo fields ({@link #tableUuid}, {@link #databaseName}, {@link #tableName}, {@link + * #operationType}) are optional. The server does not key off them; they are preserved on log lines + * and traces so an operator looking at a failing update call can see which (db, table, operation) + * the caller believed it was updating without joining back to the operation row. */ @Data @Builder @@ -27,12 +26,21 @@ @AllArgsConstructor public class UpdateOperationRequest { - /** Operation row's UUID — the primary lookup key. */ - private String operationId; - /** Terminal outcome for this single operation. */ private HistoryStatus status; + /** OFD-specific: number of orphan files deleted. Null on failure or non-OFD operation. */ + private Long orphanFilesDeleted; + + /** OFD-specific: bytes reclaimed. Null on failure or non-OFD operation. */ + private Long orphanBytesDeleted; + + /** On failure, the exception message from the Spark-side worker. Null on success. */ + private String errorMessage; + + /** On failure, the simple name of the exception class. Null on success. */ + private String errorType; + /** Debug echo: stable table identity the caller believed it was completing. */ private String tableUuid; diff --git a/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/db/TableOperationsHistoryRow.java b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/db/TableOperationsHistoryRow.java index 5f4a598d9..484c3e8dd 100644 --- a/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/db/TableOperationsHistoryRow.java +++ b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/db/TableOperationsHistoryRow.java @@ -72,4 +72,20 @@ public class TableOperationsHistoryRow { @Enumerated(EnumType.STRING) @Column(name = "status", nullable = false, length = 20) private HistoryStatus status; + + /** OFD-specific: number of orphan files deleted; null if not an OFD operation or on failure. 
*/ + @Column(name = "orphan_files_deleted") + private Long orphanFilesDeleted; + + /** OFD-specific: bytes reclaimed by orphan file deletion; null if not OFD or on failure. */ + @Column(name = "orphan_bytes_deleted") + private Long orphanBytesDeleted; + + /** On failure, the message from the Spark-side exception. Null on success. */ + @Column(name = "error_message", length = 1024) + private String errorMessage; + + /** On failure, the simple name of the Spark-side exception class. Null on success. */ + @Column(name = "error_type", length = 256) + private String errorType; } diff --git a/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/model/TableOperationsHistoryDto.java b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/model/TableOperationsHistoryDto.java index 74922e7b0..20225fc84 100644 --- a/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/model/TableOperationsHistoryDto.java +++ b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/model/TableOperationsHistoryDto.java @@ -40,6 +40,18 @@ public class TableOperationsHistoryDto { /** Terminal outcome: {@link HistoryStatusDto#SUCCESS} or {@link HistoryStatusDto#FAILED}. */ private HistoryStatusDto status; + /** OFD-specific: number of orphan files deleted; null if not an OFD operation or on failure. */ + private Long orphanFilesDeleted; + + /** OFD-specific: bytes reclaimed by orphan file deletion; null if not OFD or on failure. */ + private Long orphanBytesDeleted; + + /** On failure, the message from the Spark-side exception. Null on success. */ + private String errorMessage; + + /** On failure, the simple name of the Spark-side exception class. Null on success. */ + private String errorType; + /** Convert to the corresponding DB row. */ public TableOperationsHistoryRow toRow() { return TableOperationsHistoryRow.builder() @@ -50,6 +62,10 @@ public TableOperationsHistoryRow toRow() { .operationType(operationType == null ? 
null : operationType.toDb()) .completedAt(completedAt) .status(status == null ? null : status.toDb()) + .orphanFilesDeleted(orphanFilesDeleted) + .orphanBytesDeleted(orphanBytesDeleted) + .errorMessage(errorMessage) + .errorType(errorType) .build(); } @@ -66,6 +82,10 @@ public static TableOperationsHistoryDto fromRow(TableOperationsHistoryRow row) { .operationType(OperationTypeDto.fromDb(row.getOperationType())) .completedAt(row.getCompletedAt()) .status(HistoryStatusDto.fromDb(row.getStatus())) + .orphanFilesDeleted(row.getOrphanFilesDeleted()) + .orphanBytesDeleted(row.getOrphanBytesDeleted()) + .errorMessage(row.getErrorMessage()) + .errorType(row.getErrorType()) .build(); } diff --git a/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/service/OptimizerDataService.java b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/service/OptimizerDataService.java index c20ae7bf2..365e062b8 100644 --- a/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/service/OptimizerDataService.java +++ b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/service/OptimizerDataService.java @@ -37,10 +37,20 @@ List listTableOperations( /** * Update an operation by writing a history entry. Looks up the operation row by {@code * operationId}, copies its table metadata into a new history row with the supplied terminal - * {@code status}, and saves it. Returns the history record, or empty if the operation does not - * exist. + * {@code status} and optional per-operation metrics / error details, and saves it. Returns the + * history record, or empty if the operation does not exist. + * + *

The four trailing parameters are all nullable. Successful OFD operations populate {@code + * orphanFilesDeleted} / {@code orphanBytesDeleted}; failures populate {@code errorMessage} / + * {@code errorType}. Non-OFD operations leave all four null. */ - Optional updateOperation(String operationId, HistoryStatusDto status); + Optional updateOperation( + String operationId, + HistoryStatusDto status, + Long orphanFilesDeleted, + Long orphanBytesDeleted, + String errorMessage, + String errorType); /** * Return the operation row for {@code id} regardless of status, or empty if it does not exist. diff --git a/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/service/OptimizerDataServiceImpl.java b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/service/OptimizerDataServiceImpl.java index 29fd0eeee..26408abb9 100644 --- a/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/service/OptimizerDataServiceImpl.java +++ b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/service/OptimizerDataServiceImpl.java @@ -67,7 +67,12 @@ public List listTableOperations( @Override @Transactional public Optional updateOperation( - String operationId, HistoryStatusDto status) { + String operationId, + HistoryStatusDto status, + Long orphanFilesDeleted, + Long orphanBytesDeleted, + String errorMessage, + String errorType) { return operationsRepository .findById(operationId) .map( @@ -80,6 +85,10 @@ public Optional updateOperation( .operationType(OperationTypeDto.fromDb(row.getOperationType())) .completedAt(Instant.now()) .status(status) + .orphanFilesDeleted(orphanFilesDeleted) + .orphanBytesDeleted(orphanBytesDeleted) + .errorMessage(errorMessage) + .errorType(errorType) .build()) .map(history -> TableOperationsHistoryDto.fromRow(historyRepository.save(history.toRow()))); } diff --git a/services/optimizer/src/main/resources/db/optimizer-schema.sql b/services/optimizer/src/main/resources/db/optimizer-schema.sql index 
892c1c55f..24071e2e8 100644 --- a/services/optimizer/src/main/resources/db/optimizer-schema.sql +++ b/services/optimizer/src/main/resources/db/optimizer-schema.sql @@ -38,13 +38,17 @@ CREATE TABLE IF NOT EXISTS table_stats_history ( ); CREATE TABLE IF NOT EXISTS table_operations_history ( - id VARCHAR(36) NOT NULL, - table_uuid VARCHAR(36) NOT NULL, - database_name VARCHAR(128) NOT NULL, - table_name VARCHAR(128) NOT NULL, - operation_type VARCHAR(50) NOT NULL, - completed_at TIMESTAMP(6) NOT NULL, - status VARCHAR(20) NOT NULL, + id VARCHAR(36) NOT NULL, + table_uuid VARCHAR(36) NOT NULL, + database_name VARCHAR(128) NOT NULL, + table_name VARCHAR(128) NOT NULL, + operation_type VARCHAR(50) NOT NULL, + completed_at TIMESTAMP(6) NOT NULL, + status VARCHAR(20) NOT NULL, + orphan_files_deleted BIGINT, + orphan_bytes_deleted BIGINT, + error_message VARCHAR(1024), + error_type VARCHAR(256), PRIMARY KEY (id), INDEX idx_toph_db_table (database_name, table_name), -- Drives TableOperationHistoryRepository.findLatestPerTable: the correlated diff --git a/services/optimizer/src/test/java/com/linkedin/openhouse/optimizer/api/controller/ControllerErrorHandlingTest.java b/services/optimizer/src/test/java/com/linkedin/openhouse/optimizer/api/controller/ControllerErrorHandlingTest.java index b9c8dc3dc..18a077d45 100644 --- a/services/optimizer/src/test/java/com/linkedin/openhouse/optimizer/api/controller/ControllerErrorHandlingTest.java +++ b/services/optimizer/src/test/java/com/linkedin/openhouse/optimizer/api/controller/ControllerErrorHandlingTest.java @@ -20,12 +20,12 @@ import org.springframework.transaction.annotation.Transactional; /** - * Exercises what the controllers own: server-side validation on {@code updateOperation} (path/body - * mismatch, missing fields) and 404s on missing rows. 
Assertions are status-code-only: MockMvc does - * not trigger Spring's error-dispatch to {@code BasicErrorController}, so the response body of a - * {@link org.springframework.web.server.ResponseStatusException} is empty in tests even though it - * is populated in production (with {@code server.error.include-message=always}). Framework-level - * 4xx (missing query param, malformed JSON, etc.) is left to Spring's defaults and not asserted. + * Exercises what the controllers own: server-side validation on {@code updateOperation} (status + * required) and 404s on missing rows. Assertions are status-code-only: MockMvc does not trigger + * Spring's error-dispatch to {@code BasicErrorController}, so the response body of a {@link + * org.springframework.web.server.ResponseStatusException} is empty in tests even though it is + * populated in production (with {@code server.error.include-message=always}). Framework-level 4xx + * (missing query param, malformed JSON, etc.) is left to Spring's defaults and not asserted. 
*/ @SpringBootTest @AutoConfigureMockMvc @@ -39,7 +39,7 @@ class ControllerErrorHandlingTest { @Test void updateOperation_notFound_returns404() throws Exception { String id = UUID.randomUUID().toString(); - String body = String.format("{\"operationId\":\"%s\",\"status\":\"SUCCESS\"}", id); + String body = "{\"status\":\"SUCCESS\"}"; mockMvc .perform( post("/v1/optimizer/operations/" + id + "/update") @@ -48,40 +48,14 @@ void updateOperation_notFound_returns404() throws Exception { .andExpect(status().isNotFound()); } - @Test - void updateOperation_pathBodyMismatch_returns400() throws Exception { - String pathId = UUID.randomUUID().toString(); - String bodyId = UUID.randomUUID().toString(); - String body = String.format("{\"operationId\":\"%s\",\"status\":\"SUCCESS\"}", bodyId); - mockMvc - .perform( - post("/v1/optimizer/operations/" + pathId + "/update") - .contentType(MediaType.APPLICATION_JSON) - .content(body)) - .andExpect(status().isBadRequest()); - } - - @Test - void updateOperation_missingOperationId_returns400() throws Exception { - String pathId = UUID.randomUUID().toString(); - String body = "{\"status\":\"SUCCESS\"}"; - mockMvc - .perform( - post("/v1/optimizer/operations/" + pathId + "/update") - .contentType(MediaType.APPLICATION_JSON) - .content(body)) - .andExpect(status().isBadRequest()); - } - @Test void updateOperation_missingStatus_returns400() throws Exception { String id = UUID.randomUUID().toString(); - String body = String.format("{\"operationId\":\"%s\"}", id); mockMvc .perform( post("/v1/optimizer/operations/" + id + "/update") .contentType(MediaType.APPLICATION_JSON) - .content(body)) + .content("{}")) .andExpect(status().isBadRequest()); } @@ -112,7 +86,7 @@ void updateOperation_happyPath_returns201() throws Exception { .scheduledAt(Instant.now()) .jobId("job-x") .build()); - String body = String.format("{\"operationId\":\"%s\",\"status\":\"SUCCESS\"}", id); + String body = "{\"status\":\"SUCCESS\"}"; mockMvc .perform( 
post("/v1/optimizer/operations/" + id + "/update") diff --git a/services/optimizer/src/test/java/com/linkedin/openhouse/optimizer/service/OptimizerDataServiceImplTest.java b/services/optimizer/src/test/java/com/linkedin/openhouse/optimizer/service/OptimizerDataServiceImplTest.java index 2a3c1e676..8c4d6ecf2 100644 --- a/services/optimizer/src/test/java/com/linkedin/openhouse/optimizer/service/OptimizerDataServiceImplTest.java +++ b/services/optimizer/src/test/java/com/linkedin/openhouse/optimizer/service/OptimizerDataServiceImplTest.java @@ -37,7 +37,7 @@ class OptimizerDataServiceImplTest { // --- updateOperation --- @Test - void completeOperation_writesHistoryFromOperationRow() { + void updateOperation_writesHistoryFromOperationRow() { String operationId = UUID.randomUUID().toString(); String tableUuid = UUID.randomUUID().toString(); operationsRepository.save( @@ -54,7 +54,7 @@ void completeOperation_writesHistoryFromOperationRow() { .build()); Optional result = - service.updateOperation(operationId, HistoryStatusDto.SUCCESS); + service.updateOperation(operationId, HistoryStatusDto.SUCCESS, 42L, 1024L, null, null); assertThat(result).isPresent(); assertThat(result.get().getStatus()).isEqualTo(HistoryStatusDto.SUCCESS); @@ -62,12 +62,45 @@ void completeOperation_writesHistoryFromOperationRow() { assertThat(result.get().getOperationType()).isEqualTo(OperationTypeDto.ORPHAN_FILES_DELETION); assertThat(result.get().getDatabaseName()).isEqualTo("db1"); assertThat(result.get().getCompletedAt()).isNotNull(); + assertThat(result.get().getOrphanFilesDeleted()).isEqualTo(42L); + assertThat(result.get().getOrphanBytesDeleted()).isEqualTo(1024L); + assertThat(result.get().getErrorMessage()).isNull(); + assertThat(result.get().getErrorType()).isNull(); } @Test - void completeOperation_notFound_returnsEmpty() { + void updateOperation_failurePersistsErrorFields() { + String operationId = UUID.randomUUID().toString(); + operationsRepository.save( + TableOperationsRow.builder() + 
.id(operationId) + .tableUuid(UUID.randomUUID().toString()) + .databaseName("db1") + .tableName("tbl1") + .operationType(com.linkedin.openhouse.optimizer.db.OperationType.ORPHAN_FILES_DELETION) + .status(com.linkedin.openhouse.optimizer.db.OperationStatus.SCHEDULED) + .createdAt(Instant.now()) + .scheduledAt(Instant.now()) + .jobId("spark-job-456") + .build()); + + Optional result = + service.updateOperation( + operationId, HistoryStatusDto.FAILED, null, null, "boom", "RuntimeException"); + + assertThat(result).isPresent(); + assertThat(result.get().getStatus()).isEqualTo(HistoryStatusDto.FAILED); + assertThat(result.get().getOrphanFilesDeleted()).isNull(); + assertThat(result.get().getOrphanBytesDeleted()).isNull(); + assertThat(result.get().getErrorMessage()).isEqualTo("boom"); + assertThat(result.get().getErrorType()).isEqualTo("RuntimeException"); + } + + @Test + void updateOperation_notFound_returnsEmpty() { Optional result = - service.updateOperation(UUID.randomUUID().toString(), HistoryStatusDto.FAILED); + service.updateOperation( + UUID.randomUUID().toString(), HistoryStatusDto.FAILED, null, null, null, null); assertThat(result).isEmpty(); } diff --git a/services/tables/build.gradle b/services/tables/build.gradle index c85a57131..da42c32ef 100644 --- a/services/tables/build.gradle +++ b/services/tables/build.gradle @@ -44,6 +44,7 @@ dependencies { testImplementation 'org.junit.jupiter:junit-jupiter-engine:' + junit_version testImplementation 'org.springframework.security:spring-security-test:5.7.3' testImplementation 'org.springframework:spring-context-support:5.3.18' + testImplementation 'com.squareup.okhttp3:mockwebserver:4.10.0' testImplementation(testFixtures(project(':services:common'))) testImplementation (project(':tables-test-fixtures:tables-test-fixtures_2.12')) { exclude group: 'com.linkedin.iceberg' diff --git a/services/tables/src/main/java/com/linkedin/openhouse/tables/dto/mapper/TablesMapper.java 
b/services/tables/src/main/java/com/linkedin/openhouse/tables/dto/mapper/TablesMapper.java index 08111cd82..9315df9d8 100644 --- a/services/tables/src/main/java/com/linkedin/openhouse/tables/dto/mapper/TablesMapper.java +++ b/services/tables/src/main/java/com/linkedin/openhouse/tables/dto/mapper/TablesMapper.java @@ -73,6 +73,7 @@ public interface TablesMapper { @Mapping(source = "requestBody.sortOrder", target = "sortOrder"), @Mapping(target = "lastModifiedTime", ignore = true), @Mapping(target = "creationTime", ignore = true), + @Mapping(target = "currentSnapshot", ignore = true) }) TableDto toTableDto(TableDto tableDto, CreateUpdateTableRequestBody requestBody); @@ -120,7 +121,8 @@ public interface TablesMapper { defaultExpression = "java(TableType.PRIMARY_TABLE)"), @Mapping(source = "requestBody.createUpdateTableRequestBody.sortOrder", target = "sortOrder"), @Mapping(target = "lastModifiedTime", ignore = true), - @Mapping(target = "creationTime", ignore = true) + @Mapping(target = "creationTime", ignore = true), + @Mapping(target = "currentSnapshot", ignore = true) }) TableDto toTableDto(TableDto tableDto, IcebergSnapshotsRequestBody requestBody); diff --git a/services/tables/src/main/java/com/linkedin/openhouse/tables/model/CurrentSnapshotInfo.java b/services/tables/src/main/java/com/linkedin/openhouse/tables/model/CurrentSnapshotInfo.java new file mode 100644 index 000000000..90437d418 --- /dev/null +++ b/services/tables/src/main/java/com/linkedin/openhouse/tables/model/CurrentSnapshotInfo.java @@ -0,0 +1,30 @@ +package com.linkedin.openhouse.tables.model; + +import java.util.Map; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.EqualsAndHashCode; +import lombok.Getter; + +// In-memory snapshot of the Iceberg current-snapshot metadata that was loaded alongside a +// TableDto. +// +// This carries only fields already materialized by the catalog client. Constructing it never +// triggers a separate HDFS or object-store read. 
+// +// The value is present whenever the underlying table has at least one committed snapshot at +// construction time. It is absent (modeled as Optional.empty() on TableDto) for tables with no +// committed data, such as a CREATE TABLE with no rows yet. +@Getter +@Builder +@AllArgsConstructor +@EqualsAndHashCode +public class CurrentSnapshotInfo { + + // Iceberg snapshot ID. Stable per commit; usable as an idempotency token. + private final long snapshotId; + + // Iceberg Snapshot.summary() map, unmodified. Keys include total-data-files, total-files-size, + // added-data-files, deleted-data-files, added-files-size, removed-files-size. + private final Map summary; +} diff --git a/services/tables/src/main/java/com/linkedin/openhouse/tables/model/TableDto.java b/services/tables/src/main/java/com/linkedin/openhouse/tables/model/TableDto.java index 917c632d8..59dc2e80f 100644 --- a/services/tables/src/main/java/com/linkedin/openhouse/tables/model/TableDto.java +++ b/services/tables/src/main/java/com/linkedin/openhouse/tables/model/TableDto.java @@ -11,11 +11,13 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import javax.persistence.Convert; import javax.persistence.ElementCollection; import javax.persistence.Entity; import javax.persistence.Id; import javax.persistence.IdClass; +import javax.persistence.Transient; import lombok.AccessLevel; import lombok.AllArgsConstructor; import lombok.Builder; @@ -83,6 +85,23 @@ public class TableDto { private boolean replaceCommit; + // In-memory current-snapshot metadata captured when this TableDto was built from an Iceberg + // Table. + // + // Present whenever the underlying table has at least one committed snapshot at that point. + // Absent for tables with no committed data, such as a CREATE TABLE with no rows yet. + // + // Not persisted and not part of equality. Read through getCurrentSnapshot(). 
+ @Getter(AccessLevel.NONE) + @Transient + @EqualsAndHashCode.Exclude + private CurrentSnapshotInfo currentSnapshot; + + // Returns the current-snapshot metadata if any, else Optional.empty(). + public Optional getCurrentSnapshot() { + return Optional.ofNullable(currentSnapshot); + } + /** * Bundling eligible string type field into a map as {@link org.mapstruct.Mapper} doesn't provide * easy interface to achieve so. diff --git a/services/tables/src/main/java/com/linkedin/openhouse/tables/repository/impl/InternalRepositoryUtils.java b/services/tables/src/main/java/com/linkedin/openhouse/tables/repository/impl/InternalRepositoryUtils.java index 0dde039c0..d3ced5458 100644 --- a/services/tables/src/main/java/com/linkedin/openhouse/tables/repository/impl/InternalRepositoryUtils.java +++ b/services/tables/src/main/java/com/linkedin/openhouse/tables/repository/impl/InternalRepositoryUtils.java @@ -9,6 +9,7 @@ import com.linkedin.openhouse.tables.dto.mapper.iceberg.PartitionSpecMapper; import com.linkedin.openhouse.tables.dto.mapper.iceberg.PoliciesSpecMapper; import com.linkedin.openhouse.tables.dto.mapper.iceberg.TableTypeMapper; +import com.linkedin.openhouse.tables.model.CurrentSnapshotInfo; import com.linkedin.openhouse.tables.model.TableDto; import com.linkedin.openhouse.tables.repository.PreservedKeyChecker; import java.net.URI; @@ -135,6 +136,13 @@ static TableDto convertToTableDto( .jsonSnapshots(null) .tableProperties(megaProps) .sortOrder(SortOrderParser.toJson(table.sortOrder())) + .currentSnapshot( + table.currentSnapshot() == null + ? 
null + : CurrentSnapshotInfo.builder() + .snapshotId(table.currentSnapshot().snapshotId()) + .summary(table.currentSnapshot().summary()) + .build()) .build(); return tableDto; diff --git a/services/tables/src/main/java/com/linkedin/openhouse/tables/services/IcebergSnapshotsServiceImpl.java b/services/tables/src/main/java/com/linkedin/openhouse/tables/services/IcebergSnapshotsServiceImpl.java index af43a169e..6623b5f37 100644 --- a/services/tables/src/main/java/com/linkedin/openhouse/tables/services/IcebergSnapshotsServiceImpl.java +++ b/services/tables/src/main/java/com/linkedin/openhouse/tables/services/IcebergSnapshotsServiceImpl.java @@ -10,6 +10,7 @@ import com.linkedin.openhouse.tables.model.TableDto; import com.linkedin.openhouse.tables.model.TableDtoPrimaryKey; import com.linkedin.openhouse.tables.repository.OpenHouseInternalRepository; +import com.linkedin.openhouse.tables.services.postcommit.PostCommitDispatcher; import com.linkedin.openhouse.tables.utils.AuthorizationUtils; import com.linkedin.openhouse.tables.utils.TableUUIDGenerator; import java.util.Optional; @@ -32,6 +33,13 @@ public class IcebergSnapshotsServiceImpl implements IcebergSnapshotsService { @Autowired AuthorizationUtils authorizationUtils; + /** + * Present only when {@code tables.postcommit.enabled=true}. When absent, the on-commit hook is a + * literal no-op and no post-commit operations run. 
+ */ + @Autowired(required = false) + Optional postCommitDispatcher = Optional.empty(); + @Override public Pair putIcebergSnapshots( String databaseId, @@ -83,7 +91,9 @@ public Pair putIcebergSnapshots( databaseId, tableCreatorUpdater, Privileges.CREATE_TABLE); } try { - return Pair.of(openHouseInternalRepository.save(tableDtoToSave), !tableDto.isPresent()); + TableDto savedDto = openHouseInternalRepository.save(tableDtoToSave); + postCommitDispatcher.ifPresent(d -> d.dispatch(savedDto)); + return Pair.of(savedDto, !tableDto.isPresent()); } catch (BadRequestException e) { throw new RequestValidationFailureException(e.getMessage(), e); } catch (CommitFailedException ce) { diff --git a/services/tables/src/main/java/com/linkedin/openhouse/tables/services/optimizer/OptimizerStatsConfig.java b/services/tables/src/main/java/com/linkedin/openhouse/tables/services/optimizer/OptimizerStatsConfig.java new file mode 100644 index 000000000..47613f98c --- /dev/null +++ b/services/tables/src/main/java/com/linkedin/openhouse/tables/services/optimizer/OptimizerStatsConfig.java @@ -0,0 +1,36 @@ +package com.linkedin.openhouse.tables.services.optimizer; + +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.http.client.reactive.ReactorClientHttpConnector; +import org.springframework.web.reactive.function.client.WebClient; +import reactor.netty.http.client.HttpClient; + +// Wiring for the post-commit Tables-to-Optimizer stats push. +// +// This configuration is active only when optimizer.stats.enabled=true. When the flag is false, +// no WebClient bean is constructed, and the dispatcher sees no optimizer-stats operation. 
+@Configuration +@EnableConfigurationProperties(OptimizerStatsProperties.class) +@ConditionalOnProperty(prefix = "optimizer.stats", name = "enabled", havingValue = "true") +public class OptimizerStatsConfig { + + // Dedicated WebClient for the optimizer stats endpoint. + // + // The per-attempt timeout is applied on the Reactor chain in OptimizerStatsPostCommitOperation, + // and the outer per-op timeout is applied by PostCommitDispatcher. Neither timeout is + // configured on the underlying Netty client. + // + // This arrangement ensures that any timeout always emerges as a standard + // java.util.concurrent.TimeoutException rather than a Netty ReadTimeoutException, which keeps + // the dispatcher's outcome classification simple. + @Bean("optimizerStatsWebClient") + public WebClient optimizerStatsWebClient(OptimizerStatsProperties properties) { + return WebClient.builder() + .baseUrl(properties.getBaseUri()) + .clientConnector(new ReactorClientHttpConnector(HttpClient.create())) + .build(); + } +} diff --git a/services/tables/src/main/java/com/linkedin/openhouse/tables/services/optimizer/OptimizerStatsPostCommitOperation.java b/services/tables/src/main/java/com/linkedin/openhouse/tables/services/optimizer/OptimizerStatsPostCommitOperation.java new file mode 100644 index 000000000..756da2d10 --- /dev/null +++ b/services/tables/src/main/java/com/linkedin/openhouse/tables/services/optimizer/OptimizerStatsPostCommitOperation.java @@ -0,0 +1,168 @@ +package com.linkedin.openhouse.tables.services.optimizer; + +import com.linkedin.openhouse.tables.model.CurrentSnapshotInfo; +import com.linkedin.openhouse.tables.model.TableDto; +import com.linkedin.openhouse.tables.services.postcommit.PostCommitOperation; +import java.time.Duration; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.TimeoutException; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Qualifier; +import 
org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.stereotype.Component; +import org.springframework.web.reactive.function.client.WebClient; +import org.springframework.web.reactive.function.client.WebClientRequestException; +import org.springframework.web.reactive.function.client.WebClientResponseException; +import reactor.core.publisher.Mono; +import reactor.util.retry.Retry; +import reactor.util.retry.RetrySpec; + +// A PostCommitOperation that PUTs a snapshot-stats record to the optimizer's per-table stats +// endpoint. +// +// prepare() returns a Mono that completes on HTTP 2xx and signals an error otherwise. The +// dispatcher owns the timeout, the subscription, error swallowing, and metric emission. The +// returned value is Optional.empty() when the table is not opted in via OPT_IN_PROPERTY or has +// no current snapshot. +// +// Internal retry is bounded by OptimizerStatsProperties.maxAttempts and fires only on retryable +// errors: network failures, a TimeoutException, or HTTP 408 / 429 / 5xx responses. The +// dispatcher's per-op timeout is the hard ceiling on the whole chain. +// +// The bean is wired only when optimizer.stats.enabled=true. PATH_TEMPLATE is intentionally +// duplicated from TableStatsController.TABLE_PATH_TEMPLATE so the tables service does not take a +// compile-time dependency on the optimizer jar. Keep both copies in sync. +@Slf4j +@Component +@ConditionalOnProperty(prefix = "optimizer.stats", name = "enabled", havingValue = "true") +public class OptimizerStatsPostCommitOperation implements PostCommitOperation { + + // Metric tag value for the "op" tag. + static final String OP_NAME = "optimizer_stats"; + + // Per-call URL path. Intentionally duplicated from TableStatsController.TABLE_PATH_TEMPLATE so + // that we avoid a compile-time dependency on the optimizer jar. Keep both copies in sync. 
+ static final String PATH_TEMPLATE = "/v1/optimizer/stats/{tableUuid}"; + + // Table-property key that opts a table in for the post-commit stats push. + static final String OPT_IN_PROPERTY = "maintenance.optimizer.stats.enabled"; + + // Iceberg snapshot-summary keys we read. All values are decimal-string longs. + static final String SUMMARY_TOTAL_DATA_FILES = "total-data-files"; + static final String SUMMARY_TOTAL_FILES_SIZE = "total-files-size"; + static final String SUMMARY_ADDED_DATA_FILES = "added-data-files"; + static final String SUMMARY_DELETED_DATA_FILES = "deleted-data-files"; + static final String SUMMARY_ADDED_FILES_SIZE = "added-files-size"; + static final String SUMMARY_REMOVED_FILES_SIZE = "removed-files-size"; + + private final WebClient webClient; + private final OptimizerStatsProperties properties; + + public OptimizerStatsPostCommitOperation( + @Qualifier("optimizerStatsWebClient") WebClient webClient, + OptimizerStatsProperties properties) { + this.webClient = webClient; + this.properties = properties; + } + + @Override + public String name() { + return OP_NAME; + } + + @Override + public Optional> prepare(TableDto savedDto) { + if (!isOptedIn(savedDto)) { + return Optional.empty(); + } + Optional snapshot = savedDto.getCurrentSnapshot(); + if (!snapshot.isPresent()) { + return Optional.empty(); + } + + OptimizerStatsRequest body = buildRequest(savedDto, snapshot.get()); + String tableUuid = savedDto.getTableUUID(); + + RetrySpec retrySpec = + Retry.max(Math.max(0, properties.getMaxAttempts() - 1)).filter(this::isRetryable); + + Mono chain = + webClient + .put() + .uri(PATH_TEMPLATE, tableUuid) + .bodyValue(body) + .retrieve() + .toBodilessEntity() + .timeout(Duration.ofMillis(properties.getPerAttemptTimeoutMs())) + .retryWhen(retrySpec.onRetryExhaustedThrow((spec, signal) -> signal.failure())) + .then(); + return Optional.of(chain); + } + + // Returns true when the table's properties contain the literal opt-in value "true" for the + // 
configured OPT_IN_PROPERTY. + private boolean isOptedIn(TableDto saved) { + Map props = saved.getTableProperties(); + return props != null && "true".equals(props.get(OPT_IN_PROPERTY)); + } + + // Builds the wire body. Missing summary keys default to 0L. + OptimizerStatsRequest buildRequest(TableDto saved, CurrentSnapshotInfo snapshot) { + Map summary = snapshot.getSummary(); + OptimizerStatsRequest.Snapshot snapshotPayload = + OptimizerStatsRequest.Snapshot.builder() + .snapshotId(snapshot.getSnapshotId()) + .tableVersion(saved.getTableVersion()) + .tableLocation(saved.getTableLocation()) + .tableSizeBytes(longOrZero(summary, SUMMARY_TOTAL_FILES_SIZE)) + .numCurrentFiles(longOrZero(summary, SUMMARY_TOTAL_DATA_FILES)) + .build(); + OptimizerStatsRequest.Delta delta = + OptimizerStatsRequest.Delta.builder() + .numFilesAdded(longOrZero(summary, SUMMARY_ADDED_DATA_FILES)) + .numFilesDeleted(longOrZero(summary, SUMMARY_DELETED_DATA_FILES)) + .addedSizeBytes(longOrZero(summary, SUMMARY_ADDED_FILES_SIZE)) + .deletedSizeBytes(longOrZero(summary, SUMMARY_REMOVED_FILES_SIZE)) + .build(); + return OptimizerStatsRequest.builder() + .databaseName(saved.getDatabaseId()) + .tableName(saved.getTableId()) + .stats(OptimizerStatsRequest.Stats.builder().snapshot(snapshotPayload).delta(delta).build()) + .tableProperties(saved.getTableProperties()) + .build(); + } + + // Returns true for errors that a later attempt could plausibly succeed against: a per-attempt + // timeout, a network-level failure, or an HTTP 5xx / 408 / 429 response. + // + // Other 4xx responses are client errors that retries cannot fix, so we fail fast on them. 
+ boolean isRetryable(Throwable e) { + if (e instanceof TimeoutException) { + return true; + } + if (e instanceof WebClientRequestException) { + return true; + } + if (e instanceof WebClientResponseException) { + int code = ((WebClientResponseException) e).getStatusCode().value(); + return code == 408 || code == 429 || (code >= 500 && code < 600); + } + return false; + } + + private static long longOrZero(Map m, String key) { + if (m == null) { + return 0L; + } + String v = m.get(key); + if (v == null) { + return 0L; + } + try { + return Long.parseLong(v); + } catch (NumberFormatException nfe) { + return 0L; + } + } +} diff --git a/services/tables/src/main/java/com/linkedin/openhouse/tables/services/optimizer/OptimizerStatsProperties.java b/services/tables/src/main/java/com/linkedin/openhouse/tables/services/optimizer/OptimizerStatsProperties.java new file mode 100644 index 000000000..877ce3423 --- /dev/null +++ b/services/tables/src/main/java/com/linkedin/openhouse/tables/services/optimizer/OptimizerStatsProperties.java @@ -0,0 +1,35 @@ +package com.linkedin.openhouse.tables.services.optimizer; + +import lombok.Getter; +import lombok.Setter; +import org.springframework.boot.context.properties.ConfigurationProperties; + +// Configuration for the post-commit stats push from the Tables Service to the Optimizer's stats +// endpoint. Property prefix is optimizer.stats. +// +// When enabled is false (the default), no client bean is constructed, and the operation is +// absent from the dispatcher. Environments that want the feed turn it on and set baseUri. +@ConfigurationProperties("optimizer.stats") +@Getter +@Setter +public class OptimizerStatsProperties { + + // Master switch. When false, no HTTP push is attempted and no client bean is wired. + private boolean enabled = false; + + // Base URI of the Optimizer service. The path /v1/optimizer/stats/{tableUuid} is appended at + // call time. No default; required when enabled is true. 
+ private String baseUri; + + // Per-attempt HTTP timeout in milliseconds. + // + // Bounds each individual attempt so retries can fit inside the dispatcher's outer per-op budget + // (tables.postcommit.per-op-timeout-ms, default 3000 ms). Default is 1000 ms. + private long perAttemptTimeoutMs = 1000L; + + // Total attempt count (the initial try plus retries). Default is 3. + // + // Retries fire only on retryable errors: network failures, a timeout, or an HTTP 408 / 429 / + // 5xx response. Other 4xx responses fail fast without retry. + private int maxAttempts = 3; +} diff --git a/services/tables/src/main/java/com/linkedin/openhouse/tables/services/optimizer/OptimizerStatsRequest.java b/services/tables/src/main/java/com/linkedin/openhouse/tables/services/optimizer/OptimizerStatsRequest.java new file mode 100644 index 000000000..2cb2b2873 --- /dev/null +++ b/services/tables/src/main/java/com/linkedin/openhouse/tables/services/optimizer/OptimizerStatsRequest.java @@ -0,0 +1,67 @@ +package com.linkedin.openhouse.tables.services.optimizer; + +import com.fasterxml.jackson.annotation.JsonInclude; +import java.util.Map; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +// Wire body for PUT /v1/optimizer/stats/{tableUuid}. +// +// This mirrors the optimizer's UpsertTableStatsRequest field-for-field. The type is duplicated +// here so that the tables service does not take a compile-time dependency on the optimizer jar. +// Keep both copies in sync. 
+@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +@JsonInclude(JsonInclude.Include.NON_NULL) +public class OptimizerStatsRequest { + + private String databaseName; + private String tableName; + private Stats stats; + private Map tableProperties; + + @Data + @Builder + @NoArgsConstructor + @AllArgsConstructor + @JsonInclude(JsonInclude.Include.NON_NULL) + public static class Stats { + private Snapshot snapshot; + private Delta delta; + } + + // Point-in-time snapshot metrics. Maps to the optimizer's + // TableStatsPayload.SnapshotMetricsDto. + @Data + @Builder + @NoArgsConstructor + @AllArgsConstructor + @JsonInclude(JsonInclude.Include.NON_NULL) + public static class Snapshot { + // Iceberg snapshot ID. Sent so the optimizer can use it as an idempotency token on upsert and + // reject out-of-order replays. + private Long snapshotId; + + private String tableVersion; + private String tableLocation; + private Long tableSizeBytes; + private Long numCurrentFiles; + } + + // Per-commit incremental counters. Maps to the optimizer's TableStatsPayload.CommitDeltaDto. 
+ @Data + @Builder + @NoArgsConstructor + @AllArgsConstructor + @JsonInclude(JsonInclude.Include.NON_NULL) + public static class Delta { + private Long numFilesAdded; + private Long numFilesDeleted; + private Long addedSizeBytes; + private Long deletedSizeBytes; + } +} diff --git a/services/tables/src/main/java/com/linkedin/openhouse/tables/services/postcommit/PostCommitDispatcher.java b/services/tables/src/main/java/com/linkedin/openhouse/tables/services/postcommit/PostCommitDispatcher.java new file mode 100644 index 000000000..5a04e2df7 --- /dev/null +++ b/services/tables/src/main/java/com/linkedin/openhouse/tables/services/postcommit/PostCommitDispatcher.java @@ -0,0 +1,136 @@ +package com.linkedin.openhouse.tables.services.postcommit; + +import com.linkedin.openhouse.common.metrics.MetricsConstant; +import com.linkedin.openhouse.tables.model.TableDto; +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.Timer; +import java.time.Duration; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.TimeoutException; +import lombok.extern.slf4j.Slf4j; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.stereotype.Component; +import org.springframework.web.reactive.function.client.WebClientRequestException; +import org.springframework.web.reactive.function.client.WebClientResponseException; +import reactor.core.publisher.Mono; + +// Runs PostCommitOperations after a successful Iceberg commit. +// +// Each operation receives a wall-clock timeout from tables.postcommit.per-op-timeout-ms. Any +// error the operation signals is recorded as a metric and a log line, then swallowed. Dispatch +// is fire-and-forget, so the commit thread is never blocked on operation work. +// +// Operations describe payload and endpoint only. 
The timeout, error swallowing, subscription,
+// and metric emission all live here, so the contract across operations stays uniform.
+//
+// The bean is constructed only when tables.postcommit.enabled=true.
+@Slf4j
+@Component
+@EnableConfigurationProperties(PostCommitProperties.class)
+@ConditionalOnProperty(prefix = "tables.postcommit", name = "enabled", havingValue = "true")
+public class PostCommitDispatcher {
+
+  private final List<PostCommitOperation> operations;
+  private final PostCommitProperties properties;
+  private final MeterRegistry meterRegistry;
+
+  // Stores the three collaborators as given; no copies are made.
+  public PostCommitDispatcher(
+      List<PostCommitOperation> operations,
+      PostCommitProperties properties,
+      MeterRegistry meterRegistry) {
+    this.operations = operations;
+    this.properties = properties;
+    this.meterRegistry = meterRegistry;
+  }
+
+  // Dispatches all registered operations for savedDto.
+  //
+  // Returns immediately on the calling thread. Each operation runs on its underlying reactive
+  // scheduler. A RuntimeException raised while decorating or subscribing one operation is logged
+  // and dropped here, so it reaches neither the caller nor the dispatch of later operations.
+  public void dispatch(TableDto savedDto) {
+    for (PostCommitOperation op : operations) {
+      try {
+        decorate(op, savedDto).ifPresent(Mono::subscribe);
+      } catch (RuntimeException e) {
+        // decorate() guards only the prepare() call. Other synchronous failures (for example a
+        // prepare() that returns null instead of an Optional, or a name() that throws) end up
+        // here. The op is identified by class because name() itself may be what threw.
+        log.warn(
+            "Post-commit op {} could not be dispatched: {}",
+            op.getClass().getName(),
+            e.toString());
+      }
+    }
+  }
+
+  // Returns the fully-decorated Mono for op without subscribing to it.
+  //
+  // The decoration applies the per-op timeout, records the success or error metric, and swallows
+  // any error. The duration sample is taken when the returned Mono is subscribed, not when this
+  // method is called. When the operation does not apply (or its prepare() throws synchronously),
+  // this method emits the "skipped" or "prepare_threw" metric and returns Optional.empty().
+  //
+  // Package-private so that tests can .block() on the chain rather than poll for metric emission
+  // after a fire-and-forget subscription.
+  Optional<Mono<Void>> decorate(PostCommitOperation op, TableDto savedDto) {
+    Optional<Mono<Void>> work;
+    try {
+      work = op.prepare(savedDto);
+    } catch (RuntimeException e) {
+      // Defensive: a prepare() that throws synchronously must not break dispatch of later ops.
+      meterRegistry
+          .counter(
+              MetricsConstant.POSTCOMMIT_OP_FAILED, "op", op.name(), "outcome", "prepare_threw")
+          .increment();
+      log.warn("Post-commit op {} prepare() threw {}", op.name(), e.toString());
+      return Optional.empty();
+    }
+    if (!work.isPresent()) {
+      meterRegistry.counter(MetricsConstant.POSTCOMMIT_OP_SKIPPED, "op", op.name()).increment();
+      return Optional.empty();
+    }
+    Mono<Void> prepared = work.get();
+    // Mono.defer postpones the timer start until subscription, so the recorded duration covers
+    // the execution of the operation only and every subscription gets its own sample.
+    Mono<Void> decorated = Mono.defer(() -> runWithMetrics(op, prepared));
+    return Optional.of(decorated);
+  }
+
+  // Starts the duration sample, applies the per-op timeout to prepared, records the duration
+  // timer (and, on error, the failure counter), and turns any error into an empty completion.
+  private Mono<Void> runWithMetrics(PostCommitOperation op, Mono<Void> prepared) {
+    Timer.Sample sample = Timer.start(meterRegistry);
+    return prepared
+        .timeout(Duration.ofMillis(properties.getPerOpTimeoutMs()))
+        .doOnSuccess(ignored -> stopTimer(sample, op, "success"))
+        .onErrorResume(
+            e -> {
+              String outcome = classifyOutcome(e);
+              stopTimer(sample, op, outcome);
+              meterRegistry
+                  .counter(
+                      MetricsConstant.POSTCOMMIT_OP_FAILED, "op", op.name(), "outcome", outcome)
+                  .increment();
+              log.warn("Post-commit op {} failed ({}): {}", op.name(), outcome, e.toString());
+              return Mono.empty();
+            });
+  }
+
+  // Stops sample against the duration timer tagged with the op name and the given outcome.
+  private void stopTimer(Timer.Sample sample, PostCommitOperation op, String outcome) {
+    sample.stop(
+        meterRegistry.timer(
+            MetricsConstant.POSTCOMMIT_OP_DURATION, "op", op.name(), "outcome", outcome));
+  }
+
+  // Maps a terminal error to a small set of outcome tags. The classifier lives here so that all
+  // operations share the same taxonomy. A WebClientResponseException whose status is outside
+  // 400-599, and any other error type, is reported as "unknown_error".
+  private static String classifyOutcome(Throwable e) {
+    if (e instanceof TimeoutException) {
+      return "timeout";
+    }
+    if (e instanceof WebClientRequestException) {
+      return "network_error";
+    }
+    if (e instanceof WebClientResponseException) {
+      int code = ((WebClientResponseException) e).getStatusCode().value();
+      if (code >= 500 && code < 600) {
+        return "server_error";
+      }
+      if (code >= 400 && code < 500) {
+        return "client_error";
+      }
+    }
+    return "unknown_error";
+  }
+}
diff --git a/services/tables/src/main/java/com/linkedin/openhouse/tables/services/postcommit/PostCommitOperation.java b/services/tables/src/main/java/com/linkedin/openhouse/tables/services/postcommit/PostCommitOperation.java
new file mode 100644
index 000000000..6cf99cd7e
--- /dev/null
+++ b/services/tables/src/main/java/com/linkedin/openhouse/tables/services/postcommit/PostCommitOperation.java
@@ -0,0 +1,26 @@
+package com.linkedin.openhouse.tables.services.postcommit;
+
+import com.linkedin.openhouse.tables.model.TableDto;
+import java.util.Optional;
+import reactor.core.publisher.Mono;
+
+// A best-effort, bounded, asynchronous action that the Tables Service runs after a successful
+// Iceberg commit.
+//
+// Operations are invoked by PostCommitDispatcher, which owns the per-op timeout, the
+// subscription, error swallowing, and metric emission. An error signaled from prepare() is
+// recorded and dropped, so it never affects commit correctness.
+//
+// Implementations describe what to push and where. They may apply internal retries within a
+// bounded budget. They must not subscribe themselves or apply an outer timeout, because the
+// dispatcher already owns both concerns.
+public interface PostCommitOperation {
+
+  // Returns a short, stable identifier for this operation. Used as the "op" metric tag.
+  String name();
+
+  // Builds the work to run for savedDto, or returns Optional.empty() when the operation does not
+  // apply to this commit (for example, when the table is not opted in or has no committed
+  // snapshot). The dispatcher records an empty return as a "skipped" metric. Never return null.
+  Optional<Mono<Void>> prepare(TableDto savedDto);
+}
diff --git a/services/tables/src/main/java/com/linkedin/openhouse/tables/services/postcommit/PostCommitProperties.java b/services/tables/src/main/java/com/linkedin/openhouse/tables/services/postcommit/PostCommitProperties.java
new file mode 100644
index 000000000..98e8b02eb
--- /dev/null
+++ b/services/tables/src/main/java/com/linkedin/openhouse/tables/services/postcommit/PostCommitProperties.java
@@ -0,0 +1,27 @@
+package com.linkedin.openhouse.tables.services.postcommit;
+
+import lombok.Getter;
+import lombok.Setter;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+
+// Configuration for the Tables Service post-commit operation framework. Property prefix is
+// tables.postcommit.
+//
+// When enabled is false (the default), the dispatcher bean is not constructed, and any
+// registered PostCommitOperation beans are never invoked.
+@ConfigurationProperties("tables.postcommit")
+@Getter
+@Setter
+public class PostCommitProperties {
+
+  // Master switch. When false, no operations are dispatched on commit.
+  private boolean enabled = false;
+
+  // Wall-clock ceiling that the dispatcher applies to each operation's prepared Mono.
+  //
+  // This bounds resource occupancy (connections, threads) for a misbehaving operation. It does
+  // not block the commit thread, because operations run on the underlying reactive scheduler.
+  //
+  // Default is 3000 ms.
+  private long perOpTimeoutMs = 3000L;
+}
diff --git a/services/tables/src/main/resources/application.properties b/services/tables/src/main/resources/application.properties
index 4a6c82a89..24f5554c5 100644
--- a/services/tables/src/main/resources/application.properties
+++ b/services/tables/src/main/resources/application.properties
@@ -23,4 +23,16 @@ management.metrics.distribution.maximum-expected-value.catalog_metadata_retrieva
 management.metrics.distribution.maximum-expected-value.catalog_metadata_update_latency=600s
 management.metrics.distribution.maximum-expected-value.http.server.requests=600s
 server.shutdown=graceful
-spring.lifecycle.timeout-per-shutdown-phase=60s
\ No newline at end of file
+spring.lifecycle.timeout-per-shutdown-phase=60s
+
+# Post-commit operation framework. Disabled by default; flip per OH instance.
+# Outer per-op wall-clock budget is enforced by PostCommitDispatcher.
+tables.postcommit.enabled=${TABLES_POSTCOMMIT_ENABLED:false}
+tables.postcommit.per-op-timeout-ms=${TABLES_POSTCOMMIT_PER_OP_TIMEOUT_MS:3000}
+
+# Optimizer stats post-commit operation (one impl of PostCommitOperation).
+# Both flags must be true for the push to be wired and dispatched.
+optimizer.stats.enabled=${OPTIMIZER_STATS_ENABLED:false}
+optimizer.stats.base-uri=${OPTIMIZER_STATS_BASE_URI:}
+optimizer.stats.per-attempt-timeout-ms=${OPTIMIZER_STATS_PER_ATTEMPT_TIMEOUT_MS:1000}
+optimizer.stats.max-attempts=${OPTIMIZER_STATS_MAX_ATTEMPTS:3}
\ No newline at end of file
diff --git a/services/tables/src/test/java/com/linkedin/openhouse/tables/mock/service/IcebergSnapshotsServiceTest.java b/services/tables/src/test/java/com/linkedin/openhouse/tables/mock/service/IcebergSnapshotsServiceTest.java
index 44467b705..648ae1871 100644
--- a/services/tables/src/test/java/com/linkedin/openhouse/tables/mock/service/IcebergSnapshotsServiceTest.java
+++ b/services/tables/src/test/java/com/linkedin/openhouse/tables/mock/service/IcebergSnapshotsServiceTest.java
@@ -11,11 +11,14 @@
 import com.linkedin.openhouse.tables.api.spec.v0.request.components.Policies;
 import com.linkedin.openhouse.tables.dto.mapper.TablesMapper;
 import com.linkedin.openhouse.tables.dto.mapper.TablesMapperImpl;
+import com.linkedin.openhouse.tables.model.CurrentSnapshotInfo;
 import com.linkedin.openhouse.tables.model.TableDto;
 import com.linkedin.openhouse.tables.model.TableDtoPrimaryKey;
 import com.linkedin.openhouse.tables.repository.OpenHouseInternalRepository;
 import com.linkedin.openhouse.tables.services.IcebergSnapshotsService;
+import com.linkedin.openhouse.tables.services.postcommit.PostCommitDispatcher;
 import com.linkedin.openhouse.tables.utils.TableUUIDGenerator;
+import java.util.Collections;
 import java.util.Optional;
 import java.util.UUID;
 import org.apache.iceberg.exceptions.BadRequestException;
@@ -45,6 +48,13 @@ public class IcebergSnapshotsServiceTest {

   @MockBean private TableUUIDGenerator tableUUIDGenerator;

+  /**
+   * Forces the optional post-commit dispatcher into the service so that we can verify it is invoked
+   * after a successful save. Production wiring is conditional on {@code
+   * tables.postcommit.enabled=true}; this {@code @MockBean} bypasses that condition.
+   */
+  @MockBean private PostCommitDispatcher postCommitDispatcher;
+
   private OpenHouseInternalRepository mockRepository;

   @Captor ArgumentCaptor tableDtoArgumentCaptor;
@@ -90,6 +100,32 @@ public void testTableCreated() {
     verifyCalls(key, TEST_TABLE_CREATOR, requestBody.getCreateUpdateTableRequestBody());
   }

+  @Test
+  public void testPostCommitDispatcherInvokedAfterSuccessfulCommit() {
+    final IcebergSnapshotsRequestBody requestBody =
+        TEST_ICEBERG_SNAPSHOTS_INITIAL_VERSION_REQUEST_BODY;
+    final String dbId = requestBody.getCreateUpdateTableRequestBody().getDatabaseId();
+    final String tableId = requestBody.getCreateUpdateTableRequestBody().getTableId();
+    final TableDtoPrimaryKey key =
+        TableDtoPrimaryKey.builder().databaseId(dbId).tableId(tableId).build();
+    // The DTO that the mocked repository returns from save(); it carries a current snapshot.
+    final CurrentSnapshotInfo snapshot =
+        CurrentSnapshotInfo.builder()
+            .snapshotId(42L)
+            .summary(Collections.singletonMap("total-data-files", "7"))
+            .build();
+    final TableDto savedDto =
+        TableDto.builder().databaseId(dbId).tableId(tableId).currentSnapshot(snapshot).build();
+
+    // The repository reports no existing table for the key and returns savedDto from save().
+    Mockito.when(tableUUIDGenerator.generateUUID(Mockito.any(IcebergSnapshotsRequestBody.class)))
+        .thenReturn(UUID.randomUUID());
+    Mockito.when(mockRepository.findById(key)).thenReturn(Optional.empty());
+    Mockito.when(mockRepository.save(Mockito.any(TableDto.class))).thenReturn(savedDto);
+
+    service.putIcebergSnapshots(dbId, tableId, requestBody, TEST_TABLE_CREATOR);
+
+    // The dispatcher must receive exactly the DTO returned by save(), exactly once.
+    Mockito.verify(postCommitDispatcher, Mockito.times(1)).dispatch(savedDto);
+  }
+
   @Test
   public void testPutTableExceptionHandling() {
     final IcebergSnapshotsRequestBody requestBody =
diff --git a/services/tables/src/test/java/com/linkedin/openhouse/tables/model/TableDtoMappingTest.java b/services/tables/src/test/java/com/linkedin/openhouse/tables/model/TableDtoMappingTest.java
index e3e19a7d8..53fb633e3 100644
--- a/services/tables/src/test/java/com/linkedin/openhouse/tables/model/TableDtoMappingTest.java
+++
b/services/tables/src/test/java/com/linkedin/openhouse/tables/model/TableDtoMappingTest.java
@@ -33,7 +33,8 @@ public class TableDtoMappingTest {
           "jsonSnapshots",
           "snapshotRefs",
           "policies",
-          "tableType");
+          "tableType",
+          "currentSnapshot");

   /** Making all fields making it to map is expected, and all expected field are making it there. */
   @Test
diff --git a/services/tables/src/test/java/com/linkedin/openhouse/tables/services/optimizer/OptimizerStatsPostCommitOperationTest.java b/services/tables/src/test/java/com/linkedin/openhouse/tables/services/optimizer/OptimizerStatsPostCommitOperationTest.java
new file mode 100644
index 000000000..417db2abf
--- /dev/null
+++ b/services/tables/src/test/java/com/linkedin/openhouse/tables/services/optimizer/OptimizerStatsPostCommitOperationTest.java
@@ -0,0 +1,211 @@
+package com.linkedin.openhouse.tables.services.optimizer;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.linkedin.openhouse.tables.model.CurrentSnapshotInfo;
+import com.linkedin.openhouse.tables.model.TableDto;
+import java.io.IOException;
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import okhttp3.mockwebserver.MockResponse;
+import okhttp3.mockwebserver.MockWebServer;
+import okhttp3.mockwebserver.RecordedRequest;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.springframework.http.client.reactive.ReactorClientHttpConnector;
+import org.springframework.web.reactive.function.client.WebClient;
+import org.springframework.web.reactive.function.client.WebClientResponseException;
+import reactor.core.publisher.Mono;
+import reactor.netty.http.client.HttpClient;
+
+// Exercises OptimizerStatsPostCommitOperation.prepare() against a local MockWebServer, calling
+// the operation directly rather than through PostCommitDispatcher.
+class OptimizerStatsPostCommitOperationTest {
+
+  private static final String TABLE_UUID = "uuid-1";
+  private static final String DB = "db1";
+  private static final String TABLE = "tbl1";
+  private static final long SNAPSHOT_ID = 9876543210L;
+  // Upper bound passed to every block() call in this class.
+  private static final Duration BLOCK_MAX = Duration.ofSeconds(5);
+
+  private MockWebServer server;
+  private OptimizerStatsProperties properties;
+  private OptimizerStatsPostCommitOperation op;
+  private final ObjectMapper mapper = new ObjectMapper();
+
+  // Starts a fresh MockWebServer for each test and builds the operation with a WebClient whose
+  // base URL is that server's root URL.
+  @BeforeEach
+  void setUp() throws IOException {
+    server = new MockWebServer();
+    server.start();
+    properties = new OptimizerStatsProperties();
+    properties.setEnabled(true);
+    properties.setBaseUri(server.url("/").toString());
+    properties.setPerAttemptTimeoutMs(1000L);
+    properties.setMaxAttempts(3);
+    op = new OptimizerStatsPostCommitOperation(buildWebClient(), properties);
+  }
+
+  @AfterEach
+  void tearDown() throws IOException {
+    server.shutdown();
+  }
+
+  // Builds a WebClient on a default Reactor Netty HttpClient, using properties.getBaseUri().
+  private WebClient buildWebClient() {
+    return WebClient.builder()
+        .baseUrl(properties.getBaseUri())
+        .clientConnector(new ReactorClientHttpConnector(HttpClient.create()))
+        .build();
+  }
+
+  @Test
+  void name_isStableTagValue() {
+    assertThat(op.name()).isEqualTo("optimizer_stats");
+  }
+
+  // Checks the HTTP method, the path, and every field of the JSON body that the operation sends
+  // for an opted-in table with a snapshot.
+  @Test
+  void prepare_optedIn_emitsRequestWithFullPayload() throws Exception {
+    server.enqueue(new MockResponse().setResponseCode(200));
+
+    TableDto saved =
+        optedInBuilder()
+            .tableVersion("v3")
+            .tableLocation("/data/tables/db1/tbl1")
+            .currentSnapshot(snapshot())
+            .build();
+
+    // NOTE(review): the type arguments of Optional/Mono/Map/Class appear to have been lost in
+    // this copy of the diff (e.g. "Optional>"); restore them from the original source file.
+    Optional> work = op.prepare(saved);
+    assertThat(work).isPresent();
+    work.get().block(BLOCK_MAX);
+
+    RecordedRequest req = server.takeRequest(2, TimeUnit.SECONDS);
+    assertThat(req).isNotNull();
+    assertThat(req.getMethod()).isEqualTo("PUT");
+    assertThat(req.getPath()).isEqualTo("/v1/optimizer/stats/" + TABLE_UUID);
+
+    @SuppressWarnings("unchecked")
+    Map body = mapper.readValue(req.getBody().readUtf8(), Map.class);
+    assertThat(body).containsEntry("databaseName", DB).containsEntry("tableName", TABLE);
+    @SuppressWarnings("unchecked")
+    Map stats = (Map) body.get("stats");
+    @SuppressWarnings("unchecked")
+    Map snapshot = (Map) stats.get("snapshot");
+    @SuppressWarnings("unchecked")
+    Map delta = (Map) stats.get("delta");
+    // The expected numbers are the values put into the snapshot summary by summary() below.
+    assertThat(snapshot)
+        .containsEntry("snapshotId", SNAPSHOT_ID)
+        .containsEntry("tableVersion", "v3")
+        .containsEntry("tableLocation", "/data/tables/db1/tbl1")
+        .containsEntry("tableSizeBytes", 4096)
+        .containsEntry("numCurrentFiles", 12);
+    assertThat(delta)
+        .containsEntry("numFilesAdded", 5)
+        .containsEntry("numFilesDeleted", 2)
+        .containsEntry("addedSizeBytes", 2048)
+        .containsEntry("deletedSizeBytes", 1024);
+  }
+
+  @Test
+  void prepare_notOptedIn_returnsEmpty() {
+    TableDto saved = builderWithoutOptIn().currentSnapshot(snapshot()).build();
+    assertThat(op.prepare(saved)).isEmpty();
+    assertThat(server.getRequestCount()).isZero();
+  }
+
+  @Test
+  void prepare_noSnapshot_returnsEmpty() {
+    TableDto saved = optedInBuilder().currentSnapshot(null).build();
+    assertThat(op.prepare(saved)).isEmpty();
+    assertThat(server.getRequestCount()).isZero();
+  }
+
+  // The server answers 503 and then 200; two recorded requests show that one retry happened.
+  @Test
+  void prepare_5xxThenSuccess_retriesAndCompletes() {
+    server.enqueue(new MockResponse().setResponseCode(503));
+    server.enqueue(new MockResponse().setResponseCode(200));
+
+    TableDto saved = optedInBuilder().currentSnapshot(snapshot()).build();
+    op.prepare(saved).orElseThrow(AssertionError::new).block(BLOCK_MAX);
+
+    assertThat(server.getRequestCount()).isEqualTo(2);
+  }
+
+  @Test
+  void prepare_allAttemptsFail_propagatesUnderlyingError() {
+    // NOTE(review): op was built in setUp() with this same properties instance, so this relies on
+    // the operation reading maxAttempts after construction. Confirm against the implementation.
+    properties.setMaxAttempts(2);
+    server.enqueue(new MockResponse().setResponseCode(503));
+    server.enqueue(new MockResponse().setResponseCode(503));
+
+    TableDto saved = optedInBuilder().currentSnapshot(snapshot()).build();
+    Mono chain = op.prepare(saved).orElseThrow(AssertionError::new);
+
+    assertThatChainErrors(chain, WebClientResponseException.class);
+    assertThat(server.getRequestCount()).isEqualTo(2);
+  }
+
+  // A single 400 is enqueued; exactly one recorded request shows that the 400 was not retried.
+  @Test
+  void prepare_4xxNonRetryable_doesNotRetry() {
+    server.enqueue(new MockResponse().setResponseCode(400));
+
+    TableDto saved = optedInBuilder().currentSnapshot(snapshot()).build();
+    Mono chain = op.prepare(saved).orElseThrow(AssertionError::new);
+
+    assertThatChainErrors(chain, WebClientResponseException.class);
+    assertThat(server.getRequestCount()).isEqualTo(1);
+  }
+
+  // ---- helpers ----
+
+  // Blocks on chain and asserts that it fails with a root cause of errorType. The AssertionError
+  // thrown when the chain completes normally is an Error, so the catch below does not swallow it.
+  private void assertThatChainErrors(Mono chain, Class errorType) {
+    try {
+      chain.block(BLOCK_MAX);
+      throw new AssertionError("expected chain to signal an error of type " + errorType);
+    } catch (RuntimeException e) {
+      Throwable cause = unwrap(e);
+      assertThat(cause).isInstanceOf(errorType);
+    }
+  }
+
+  // Follows getCause() links from t to the root cause; stops if a cause refers to itself.
+  private static Throwable unwrap(Throwable t) {
+    Throwable cur = t;
+    while (cur.getCause() != null && cur.getCause() != cur) {
+      cur = cur.getCause();
+    }
+    return cur;
+  }
+
+  private static CurrentSnapshotInfo snapshot() {
+    return CurrentSnapshotInfo.builder().snapshotId(SNAPSHOT_ID).summary(summary()).build();
+  }
+
+  // Snapshot summary entries whose values are the numbers asserted in
+  // prepare_optedIn_emitsRequestWithFullPayload.
+  private static Map summary() {
+    Map s = new HashMap<>();
+    s.put("total-data-files", "12");
+    s.put("total-files-size", "4096");
+    s.put("added-data-files", "5");
+    s.put("deleted-data-files", "2");
+    s.put("added-files-size", "2048");
+    s.put("removed-files-size", "1024");
+    return s;
+  }
+
+  // Builder for a table whose properties set OPT_IN_PROPERTY to "true".
+  private static TableDto.TableDtoBuilder optedInBuilder() {
+    Map props = new HashMap<>();
+    props.put(OptimizerStatsPostCommitOperation.OPT_IN_PROPERTY, "true");
+    return TableDto.builder()
+        .tableUUID(TABLE_UUID)
+        .databaseId(DB)
+        .tableId(TABLE)
+        .tableProperties(props);
+  }
+
+  // Builder for the same table with an empty table-properties map.
+  private static TableDto.TableDtoBuilder builderWithoutOptIn() {
+    return TableDto.builder()
+        .tableUUID(TABLE_UUID)
+        .databaseId(DB)
+        .tableId(TABLE)
+        .tableProperties(new HashMap<>());
+  }
+}
diff --git a/services/tables/src/test/java/com/linkedin/openhouse/tables/services/postcommit/PostCommitDispatcherTest.java b/services/tables/src/test/java/com/linkedin/openhouse/tables/services/postcommit/PostCommitDispatcherTest.java
new file mode
100644
index 000000000..14b6f2dde
--- /dev/null
+++ b/services/tables/src/test/java/com/linkedin/openhouse/tables/services/postcommit/PostCommitDispatcherTest.java
@@ -0,0 +1,177 @@
+package com.linkedin.openhouse.tables.services.postcommit;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import com.linkedin.openhouse.common.metrics.MetricsConstant;
+import com.linkedin.openhouse.tables.model.TableDto;
+import io.micrometer.core.instrument.MeterRegistry;
+import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import reactor.core.publisher.Mono;
+
+// Unit tests for PostCommitDispatcher. They use a SimpleMeterRegistry and the hand-written
+// operations defined at the bottom of this class.
+class PostCommitDispatcherTest {
+
+  // Upper bound passed to every block() call in this class.
+  private static final Duration BLOCK_MAX = Duration.ofSeconds(5);
+
+  private MeterRegistry meterRegistry;
+  private PostCommitProperties properties;
+  private final TableDto savedDto = TableDto.builder().databaseId("db").tableId("t").build();
+
+  // Gives every test a fresh registry and properties with a 500 ms per-op timeout.
+  @BeforeEach
+  void setUp() {
+    meterRegistry = new SimpleMeterRegistry();
+    properties = new PostCommitProperties();
+    properties.setEnabled(true);
+    properties.setPerOpTimeoutMs(500L);
+  }
+
+  // Builds a dispatcher over ops that uses this test's properties and meter registry.
+  private PostCommitDispatcher dispatcherWith(PostCommitOperation... ops) {
+    return new PostCommitDispatcher(Arrays.asList(ops), properties, meterRegistry);
+  }
+
+  @Test
+  void dispatch_invokesEveryRegisteredOperationsPrepare() {
+    AtomicInteger aCount = new AtomicInteger();
+    AtomicInteger bCount = new AtomicInteger();
+    dispatcherWith(new EmptyOp("a", aCount), new EmptyOp("b", bCount)).dispatch(savedDto);
+
+    assertThat(aCount.get()).isEqualTo(1);
+    assertThat(bCount.get()).isEqualTo(1);
+  }
+
+  @Test
+  void decorate_emptyPrepare_incrementsSkippedAndReturnsEmpty() {
+    PostCommitOperation op = new EmptyOp("opx", new AtomicInteger());
+
+    // NOTE(review): the type arguments of Optional/Mono/Supplier appear to have been lost in this
+    // copy of the diff (e.g. "Optional>"); restore them from the original source file.
+    Optional> decorated = dispatcherWith(op).decorate(op, savedDto);
+
+    assertThat(decorated).isEmpty();
+    assertThat(meterRegistry.counter(MetricsConstant.POSTCOMMIT_OP_SKIPPED, "op", "opx").count())
+        .isEqualTo(1.0);
+  }
+
+  @Test
+  void decorate_successfulWork_recordsDurationWithSuccessOutcome() {
+    PostCommitOperation op = new SimpleOp("ok", Mono::empty);
+
+    dispatcherWith(op).decorate(op, savedDto).orElseThrow(AssertionError::new).block(BLOCK_MAX);
+
+    assertThat(
+            meterRegistry
+                .timer(MetricsConstant.POSTCOMMIT_OP_DURATION, "op", "ok", "outcome", "success")
+                .count())
+        .isEqualTo(1L);
+  }
+
+  @Test
+  void decorate_workSignalsError_incrementsFailedWithClassifiedOutcomeAndSwallowsError() {
+    RuntimeException boom = new RuntimeException("boom");
+    PostCommitOperation op = new SimpleOp("broke", () -> Mono.error(boom));
+
+    // Dispatcher swallows the error — block() returns normally.
+    dispatcherWith(op).decorate(op, savedDto).orElseThrow(AssertionError::new).block(BLOCK_MAX);
+
+    assertThat(
+            meterRegistry
+                .counter(
+                    MetricsConstant.POSTCOMMIT_OP_FAILED, "op", "broke", "outcome", "unknown_error")
+                .count())
+        .isEqualTo(1.0);
+  }
+
+  // Goes through dispatch() rather than decorate(), so it also checks that the second operation
+  // is still prepared after the first one's prepare() throws.
+  @Test
+  void decorate_prepareThrowsSynchronously_incrementsFailedAndDispatchContinuesToLaterOps() {
+    AtomicInteger laterCount = new AtomicInteger();
+    PostCommitOperation thrower =
+        new PostCommitOperation() {
+          @Override
+          public String name() {
+            return "thrower";
+          }
+
+          @Override
+          public Optional> prepare(TableDto dto) {
+            throw new IllegalStateException("nope");
+          }
+        };
+    PostCommitOperation later = new EmptyOp("later", laterCount);
+
+    dispatcherWith(thrower, later).dispatch(savedDto);
+
+    assertThat(
+            meterRegistry
+                .counter(
+                    MetricsConstant.POSTCOMMIT_OP_FAILED,
+                    "op",
+                    "thrower",
+                    "outcome",
+                    "prepare_threw")
+                .count())
+        .isEqualTo(1.0);
+    assertThat(laterCount.get())
+        .as("later operations must still run after a synchronous prepare() throw")
+        .isEqualTo(1);
+  }
+
+  // Mono.never() never completes, so only the 50 ms dispatcher timeout can end the chain.
+  @Test
+  void decorate_workExceedsPerOpTimeout_incrementsFailedWithTimeoutOutcome() {
+    properties.setPerOpTimeoutMs(50L);
+    PostCommitOperation op = new SimpleOp("slow", Mono::never);
+
+    dispatcherWith(op).decorate(op, savedDto).orElseThrow(AssertionError::new).block(BLOCK_MAX);
+
+    assertThat(
+            meterRegistry
+                .counter(MetricsConstant.POSTCOMMIT_OP_FAILED, "op", "slow", "outcome", "timeout")
+                .count())
+        .isEqualTo(1.0);
+  }
+
+  // ---- helpers ----
+
+  // Operation that never applies: prepare() counts the call and returns Optional.empty().
+  private static class EmptyOp implements PostCommitOperation {
+    private final String name;
+    private final AtomicInteger calls;
+
+    EmptyOp(String name, AtomicInteger calls) {
+      this.name = name;
+      this.calls = calls;
+    }
+
+    @Override
+    public String name() {
+      return name;
+    }
+
+    @Override
+    public Optional> prepare(TableDto dto) {
+      calls.incrementAndGet();
+      return Optional.empty();
+    }
+  }
+
+  // Operation that always applies: prepare() returns whatever the supplier produces.
+  private static class SimpleOp implements PostCommitOperation {
+    private final String name;
+    private final java.util.function.Supplier> work;
+
+    SimpleOp(String name, java.util.function.Supplier> work) {
+      this.name = name;
+      this.work = work;
+    }
+
+    @Override
+    public String name() {
+      return name;
+    }
+
+    @Override
+    public Optional> prepare(TableDto dto) {
+      return Optional.of(work.get());
+    }
+  }
+}