diff --git a/services/tables/src/main/java/com/linkedin/openhouse/tables/audit/TableAuditAspect.java b/services/tables/src/main/java/com/linkedin/openhouse/tables/audit/TableAuditAspect.java index d63aa224e..03e47806c 100644 --- a/services/tables/src/main/java/com/linkedin/openhouse/tables/audit/TableAuditAspect.java +++ b/services/tables/src/main/java/com/linkedin/openhouse/tables/audit/TableAuditAspect.java @@ -17,7 +17,9 @@ import com.linkedin.openhouse.tables.audit.model.OperationStatus; import com.linkedin.openhouse.tables.audit.model.OperationType; import com.linkedin.openhouse.tables.audit.model.TableAuditEvent; +import com.linkedin.openhouse.tables.config.InternalCatalogProperties; import java.time.Instant; +import java.util.HashMap; import java.util.List; import java.util.Map; import lombok.extern.slf4j.Slf4j; @@ -44,6 +46,8 @@ public class TableAuditAspect { @Autowired private AuditHandler tableAuditHandler; + @Autowired private InternalCatalogProperties internalCatalogProperties; + /** * Install the Around advice for getTable() method in OpenHouseTablesApiHandler. * @@ -378,35 +382,58 @@ protected ApiResponse auditPutIcebergSnapshots( .tableName(tableId) .operationType(operationType); extractSnapshotInfo(icebergSnapshotRequestBody, eventBuilder); - TableAuditEvent event = eventBuilder.build(); try { result = (ApiResponse) point.proceed(); + // Read tableProperties from the response, not the request body: OpenHouse mutates + // properties server-side during commit (e.g. openhouse.tableVersion, + // openhouse.lastModifiedTime), and the audit event should reflect the committed state. + TableAuditEvent event = + eventBuilder + .tableProperties(filterTableProperties(result.getResponseBody().getTableProperties())) + .build(); buildAndSendEvent( event, OperationStatus.SUCCESS, result.getResponseBody().getTableLocation()); } catch (Throwable t) { - buildAndSendEvent(event, OperationStatus.FAILED, null); + // On failure there is no committed state to read from, so tableProperties stays null. + buildAndSendEvent(eventBuilder.build(), OperationStatus.FAILED, null); throw t; } return result; } /** - * Extracts snapshot ID and timestamp of the main branch from the request body. The snapshotRefs - * map contains branch name to JSON-serialized SnapshotRef. We read the main branch's snapshot-id - * (this is what Iceberg treats as current-snapshot-id — see TableMetadata.Builder.setRef()) and - * then find the matching snapshot in jsonSnapshots to get its timestamp-ms. + * Extracts snapshot ID, timestamp, and branch ref name from the request body. + * + *

branchRefName is the ref whose snapshot-id matches the last snapshot in jsonSnapshots. + * Iceberg appends snapshots chronologically, so the last one is always the newly-committed + * snapshot, and the ref pointing to it is the branch that was written. * - *

Leaves both fields null if the main branch ref is absent (e.g. branch-only commits where - * main didn't advance, or non-commit operations) or if the matching snapshot can't be found. + *

currentSnapshotId and currentSnapshotTimestampMs track the main branch ref for backwards + * compatibility. They are null when main is absent from snapshotRefs. */ private void extractSnapshotInfo( IcebergSnapshotsRequestBody requestBody, TableAuditEvent.TableAuditEventBuilder eventBuilder) { try { Map snapshotRefs = requestBody.getSnapshotRefs(); - if (snapshotRefs == null) { + List jsonSnapshots = requestBody.getJsonSnapshots(); + if (snapshotRefs == null || jsonSnapshots == null || jsonSnapshots.isEmpty()) { return; } + + // The last snapshot in the list is the one being committed (Iceberg appends chronologically). + // Find which ref points to it — that is the branch being written. + long lastSnapshotId = + SnapshotParser.fromJson(jsonSnapshots.get(jsonSnapshots.size() - 1)).snapshotId(); + for (Map.Entry entry : snapshotRefs.entrySet()) { + if (SnapshotRefParser.fromJson(entry.getValue()).snapshotId() == lastSnapshotId) { + eventBuilder.branchRefName(entry.getKey()); + break; + } + } + + // Extract snapshot ID and timestamp for main branch (backwards-compatible). + // Iterate jsonSnapshots in reverse: main's snapshot is typically the most recent. String mainRefJson = snapshotRefs.get(SnapshotRef.MAIN_BRANCH); if (mainRefJson == null) { return; @@ -414,14 +441,6 @@ private void extractSnapshotInfo( long mainSnapshotId = SnapshotRefParser.fromJson(mainRefJson).snapshotId(); eventBuilder.currentSnapshotId(mainSnapshotId); - // Find the matching snapshot in jsonSnapshots to get its timestamp-ms. Iterate in reverse - // because Iceberg appends snapshots chronologically and main's snapshot is typically the - // most recent. Skip snapshots whose JSON doesn't contain the target id as a cheap - // pre-filter before invoking the JSON parser. - List jsonSnapshots = requestBody.getJsonSnapshots(); - if (jsonSnapshots == null) { - return; - } String mainSnapshotIdStr = Long.toString(mainSnapshotId); for (int i = jsonSnapshots.size() - 1; i >= 0; i--) { String snapshotJson = jsonSnapshots.get(i); @@ -529,6 +548,30 @@ protected ApiResponse auditUpdateDatabaseAclPolicies( return result; } + /** + * Narrows the committed table properties down to the configured allowlist ({@code + * cluster.iceberg.tables.audit.table-properties-allowlist}). Returns {@code null} when there is + * nothing to emit so downstream audit handlers can skip the field entirely. Iterates the + * allowlist rather than the source so cost is O(|allowlist|) regardless of source size. + */ + private Map filterTableProperties(Map source) { + if (source == null || source.isEmpty()) { + return null; + } + List allowlist = internalCatalogProperties.getAudit().getTablePropertiesAllowlist(); + if (allowlist == null || allowlist.isEmpty()) { + return null; + } + Map filtered = new HashMap<>(); + for (String key : allowlist) { + String value = source.get(key); + if (value != null) { + filtered.put(key, value); + } + } + return filtered.isEmpty() ? null : filtered; + } + private void buildAndSendEvent( TableAuditEvent event, OperationStatus status, String currentTableRoot) { TableAuditEvent completeEvent = diff --git a/services/tables/src/main/java/com/linkedin/openhouse/tables/audit/model/TableAuditEvent.java b/services/tables/src/main/java/com/linkedin/openhouse/tables/audit/model/TableAuditEvent.java index 8d797b625..49a2d980d 100644 --- a/services/tables/src/main/java/com/linkedin/openhouse/tables/audit/model/TableAuditEvent.java +++ b/services/tables/src/main/java/com/linkedin/openhouse/tables/audit/model/TableAuditEvent.java @@ -2,6 +2,7 @@ import com.linkedin.openhouse.common.audit.model.BaseAuditEvent; import java.time.Instant; +import java.util.Map; import lombok.AllArgsConstructor; import lombok.Builder; import lombok.EqualsAndHashCode; @@ -40,4 +41,8 @@ public class TableAuditEvent extends BaseAuditEvent { private Long currentSnapshotId; private Long currentSnapshotTimestampMs; + + private String branchRefName; + + private Map tableProperties; } diff --git a/services/tables/src/main/java/com/linkedin/openhouse/tables/config/InternalCatalogProperties.java b/services/tables/src/main/java/com/linkedin/openhouse/tables/config/InternalCatalogProperties.java index 0b080edda..899792f4a 100644 --- a/services/tables/src/main/java/com/linkedin/openhouse/tables/config/InternalCatalogProperties.java +++ b/services/tables/src/main/java/com/linkedin/openhouse/tables/config/InternalCatalogProperties.java @@ -1,6 +1,8 @@ package com.linkedin.openhouse.tables.config; import java.time.Duration; +import java.util.Collections; +import java.util.List; import lombok.Getter; import lombok.Setter; import org.springframework.boot.context.properties.ConfigurationProperties; @@ -13,6 +15,8 @@ public class InternalCatalogProperties { private MetadataCache metadataCache = new MetadataCache(); + private Audit audit = new Audit(); + @Getter @Setter public static class MetadataCache { @@ -20,4 +24,10 @@ public static class MetadataCache { private Duration ttl; private DataSize maxWeight; } + + @Getter + @Setter + public static class Audit { + private List tablePropertiesAllowlist = Collections.emptyList(); + } } diff --git a/services/tables/src/test/java/com/linkedin/openhouse/tables/mock/MockIcebergSnapshotApiHandler.java b/services/tables/src/test/java/com/linkedin/openhouse/tables/mock/MockIcebergSnapshotApiHandler.java index 3695d678c..4ee788e80 100644 --- a/services/tables/src/test/java/com/linkedin/openhouse/tables/mock/MockIcebergSnapshotApiHandler.java +++ b/services/tables/src/test/java/com/linkedin/openhouse/tables/mock/MockIcebergSnapshotApiHandler.java @@ -26,9 +26,20 @@ public ApiResponse putIcebergSnapshots( .responseBody(RequestConstants.TEST_GET_TABLE_RESPONSE_BODY) .build(); case "d200": + // Echo the request's table properties on the response so the audit aspect can read + // committed state from result.getResponseBody().getTableProperties(). return ApiResponse.builder() .httpStatus(HttpStatus.OK) - .responseBody(RequestConstants.TEST_GET_TABLE_RESPONSE_BODY) + .responseBody( + RequestConstants.TEST_GET_TABLE_RESPONSE_BODY + .toBuilder() + .tableProperties( + icebergSnapshotRequestBody.getCreateUpdateTableRequestBody() == null + ? null + : icebergSnapshotRequestBody + .getCreateUpdateTableRequestBody() + .getTableProperties()) + .build()) .build(); case "d400": throw new RequestValidationFailureException(); diff --git a/services/tables/src/test/java/com/linkedin/openhouse/tables/mock/audit/IcebergSnapshotsApiHandlerAuditTest.java b/services/tables/src/test/java/com/linkedin/openhouse/tables/mock/audit/IcebergSnapshotsApiHandlerAuditTest.java index 0ae65c793..0eadb8f60 100644 --- a/services/tables/src/test/java/com/linkedin/openhouse/tables/mock/audit/IcebergSnapshotsApiHandlerAuditTest.java +++ b/services/tables/src/test/java/com/linkedin/openhouse/tables/mock/audit/IcebergSnapshotsApiHandlerAuditTest.java @@ -25,12 +25,18 @@ import org.springframework.http.MediaType; import org.springframework.security.test.context.support.WithMockUser; import org.springframework.test.context.ContextConfiguration; +import org.springframework.test.context.TestPropertySource; import org.springframework.test.web.servlet.MockMvc; import org.springframework.test.web.servlet.request.MockMvcRequestBuilders; @SpringBootTest @AutoConfigureMockMvc @ContextConfiguration +@TestPropertySource( + properties = { + "cluster.iceberg.tables.audit.table-properties-allowlist[0]=openhouse.watermark", + "cluster.iceberg.tables.audit.table-properties-allowlist[1]=openhouse.tableType" + }) @WithMockUser(username = "testUser") public class IcebergSnapshotsApiHandlerAuditTest { @Autowired private MockMvc mvc; @@ -107,11 +113,65 @@ public void testPutIcebergSnapshotsFailedPathStillHasSnapshotInfo() throws Excep assertEquals(1669126937912L, actualEvent.getCurrentSnapshotTimestampMs().longValue()); } + @Test + public void testPutIcebergSnapshotsMainCommitSetsBranchRefNameToMain() throws Exception { + mvc.perform( + MockMvcRequestBuilders.put( + String.format( + CURRENT_MAJOR_VERSION_PREFIX + + "/databases/d200/tables/tb1/iceberg/v2/snapshots")) + .accept(MediaType.APPLICATION_JSON) + .contentType(MediaType.APPLICATION_JSON) + .content(RequestConstants.TEST_ICEBERG_SNAPSHOTS_REQUEST_BODY.toJson())); + Mockito.verify(tableAuditHandler, atLeastOnce()).audit(argCaptor.capture()); + assertEquals("main", argCaptor.getValue().getBranchRefName()); + } + + @Test + public void testPutIcebergSnapshotsNamedBranchCommitSetsBranchRefName() throws Exception { + // Realistic named-branch commit: main ref exists but its snapshot is NOT in jsonSnapshots + // (main didn't advance). Only the feature branch got a new snapshot. + String newSnapshotJson = + "{\n" + + " \"snapshot-id\" : 999,\n" + + " \"timestamp-ms\" : 5000,\n" + + " \"summary\" : {\"operation\": \"append\"},\n" + + " \"manifest-list\" : \"/tmp/feature.avro\",\n" + + " \"schema-id\" : 0\n" + + "}"; + Map refs = new HashMap<>(); + refs.put("main", "{\"snapshot-id\":100,\"type\":\"branch\"}"); // main stayed at old snapshot + refs.put("feature", "{\"snapshot-id\":999,\"type\":\"branch\"}"); // feature got new snapshot + + IcebergSnapshotsRequestBody requestBody = + IcebergSnapshotsRequestBody.builder() + .baseTableVersion("v1") + .jsonSnapshots(Collections.singletonList(newSnapshotJson)) + .snapshotRefs(refs) + .createUpdateTableRequestBody(RequestConstants.TEST_CREATE_TABLE_REQUEST_BODY) + .build(); + + mvc.perform( + MockMvcRequestBuilders.put( + String.format( + CURRENT_MAJOR_VERSION_PREFIX + + "/databases/d200/tables/tb1/iceberg/v2/snapshots")) + .accept(MediaType.APPLICATION_JSON) + .contentType(MediaType.APPLICATION_JSON) + .content(requestBody.toJson())); + Mockito.verify(tableAuditHandler, atLeastOnce()).audit(argCaptor.capture()); + TableAuditEvent actualEvent = argCaptor.getValue(); + assertEquals("feature", actualEvent.getBranchRefName()); + // main didn't advance, so currentSnapshotId is main's old snapshot and timestamp is null + assertEquals(100L, actualEvent.getCurrentSnapshotId().longValue()); + assertNull(actualEvent.getCurrentSnapshotTimestampMs()); + } + @Test public void testPutIcebergSnapshotsBranchOnlyCommitLeavesSnapshotInfoNull() throws Exception { - // Simulate a branch-only commit where main is absent from snapshotRefs. - // In this case the main branch ref doesn't exist, so currentSnapshotId / - // currentSnapshotTimestampMs should be null. + // Simulate a branch-only commit where main is absent from snapshotRefs entirely. + // currentSnapshotId / currentSnapshotTimestampMs are null (no main), but branchRefName + // is still populated from the ref that received the new snapshot. IcebergSnapshotsRequestBody branchOnlyRequestBody = IcebergSnapshotsRequestBody.builder() .baseTableVersion("v1") @@ -132,6 +192,7 @@ public void testPutIcebergSnapshotsBranchOnlyCommitLeavesSnapshotInfoNull() thro .content(branchOnlyRequestBody.toJson())); Mockito.verify(tableAuditHandler, atLeastOnce()).audit(argCaptor.capture()); TableAuditEvent actualEvent = argCaptor.getValue(); + assertEquals("my_branch", actualEvent.getBranchRefName()); assertNull(actualEvent.getCurrentSnapshotId()); assertNull(actualEvent.getCurrentSnapshotTimestampMs()); } @@ -182,6 +243,42 @@ public void testPutIcebergSnapshotsMainPointsToOlderSnapshot() throws Exception TableAuditEvent actualEvent = argCaptor.getValue(); assertEquals(100L, actualEvent.getCurrentSnapshotId().longValue()); assertEquals(1000L, actualEvent.getCurrentSnapshotTimestampMs().longValue()); + // The last snapshot (200) belongs to feature — that is the committed branch. + assertEquals("feature", actualEvent.getBranchRefName()); + } + + @Test + public void testPutIcebergSnapshotsFiltersTablePropertiesToAllowlist() throws Exception { + Map requestProperties = new HashMap<>(); + requestProperties.put("openhouse.watermark", "100"); + requestProperties.put("openhouse.tableType", "PRIMARY_TABLE"); + requestProperties.put("user.custom.key", "v"); + IcebergSnapshotsRequestBody base = RequestConstants.TEST_ICEBERG_SNAPSHOTS_REQUEST_BODY; + IcebergSnapshotsRequestBody requestBody = + IcebergSnapshotsRequestBody.builder() + .baseTableVersion(base.getBaseTableVersion()) + .jsonSnapshots(base.getJsonSnapshots()) + .snapshotRefs(base.getSnapshotRefs()) + .createUpdateTableRequestBody( + base.getCreateUpdateTableRequestBody() + .toBuilder() + .tableProperties(requestProperties) + .build()) + .build(); + mvc.perform( + MockMvcRequestBuilders.put( + String.format( + CURRENT_MAJOR_VERSION_PREFIX + + "/databases/d200/tables/tb1/iceberg/v2/snapshots")) + .accept(MediaType.APPLICATION_JSON) + .contentType(MediaType.APPLICATION_JSON) + .content(requestBody.toJson())); + Mockito.verify(tableAuditHandler, atLeastOnce()).audit(argCaptor.capture()); + TableAuditEvent actualEvent = argCaptor.getValue(); + Map expected = new HashMap<>(); + expected.put("openhouse.watermark", "100"); + expected.put("openhouse.tableType", "PRIMARY_TABLE"); + assertEquals(expected, actualEvent.getTableProperties()); } @Test diff --git a/services/tables/src/test/java/com/linkedin/openhouse/tables/model/TableAuditModelConstants.java b/services/tables/src/test/java/com/linkedin/openhouse/tables/model/TableAuditModelConstants.java index b7486f617..1a9f5eab1 100644 --- a/services/tables/src/test/java/com/linkedin/openhouse/tables/model/TableAuditModelConstants.java +++ b/services/tables/src/test/java/com/linkedin/openhouse/tables/model/TableAuditModelConstants.java @@ -225,6 +225,7 @@ public final class TableAuditModelConstants { .operationType(OperationType.COMMIT) .currentSnapshotId(2151407017102313398L) .currentSnapshotTimestampMs(1669126937912L) + .branchRefName("main") .build(); public static final TableAuditEvent TABLE_AUDIT_EVENT_PUT_ICEBERG_SNAPSHOTS_FAILED = @@ -237,6 +238,7 @@ public final class TableAuditModelConstants { .operationType(OperationType.COMMIT) .currentSnapshotId(2151407017102313398L) .currentSnapshotTimestampMs(1669126937912L) + .branchRefName("main") .build(); public static final TableAuditEvent TABLE_AUDIT_EVENT_PUT_ICEBERG_SNAPSHOTS_CTAS = @@ -249,6 +251,7 @@ public final class TableAuditModelConstants { .operationType(OperationType.STAGED_COMMIT) .currentSnapshotId(2151407017102313398L) .currentSnapshotTimestampMs(1669126937912L) + .branchRefName("main") .build(); public static final TableAuditEvent TABLE_AUDIT_EVENT_GET_ALL_DATABASES_SUCCESS =