From b10cc4544ec566c281bd983a76050748f6c3eaf4 Mon Sep 17 00:00:00 2001 From: James Wang Date: Sun, 24 May 2026 16:35:32 -0700 Subject: [PATCH 1/5] add table properties to audit event --- .../tables/audit/TableAuditAspect.java | 8 ++++- .../tables/audit/model/TableAuditEvent.java | 3 ++ .../config/InternalCatalogProperties.java | 10 ++++++ .../IcebergSnapshotsApiHandlerAuditTest.java | 31 +++++++++++++++++++ 4 files changed, 51 insertions(+), 1 deletion(-) diff --git a/services/tables/src/main/java/com/linkedin/openhouse/tables/audit/TableAuditAspect.java b/services/tables/src/main/java/com/linkedin/openhouse/tables/audit/TableAuditAspect.java index d63aa224e..a960ee063 100644 --- a/services/tables/src/main/java/com/linkedin/openhouse/tables/audit/TableAuditAspect.java +++ b/services/tables/src/main/java/com/linkedin/openhouse/tables/audit/TableAuditAspect.java @@ -371,12 +371,18 @@ protected ApiResponse auditPutIcebergSnapshots( } else { operationType = OperationType.COMMIT; } + CreateUpdateTableRequestBody createUpdateTableRequestBody = + icebergSnapshotRequestBody.getCreateUpdateTableRequestBody(); TableAuditEvent.TableAuditEventBuilder eventBuilder = TableAuditEvent.builder() .eventTimestamp(Instant.now()) .databaseName(databaseId) .tableName(tableId) - .operationType(operationType); + .operationType(operationType) + .tableProperties( + createUpdateTableRequestBody == null + ? null + : createUpdateTableRequestBody.getTableProperties()); extractSnapshotInfo(icebergSnapshotRequestBody, eventBuilder); TableAuditEvent event = eventBuilder.build(); try { diff --git a/services/tables/src/main/java/com/linkedin/openhouse/tables/audit/model/TableAuditEvent.java b/services/tables/src/main/java/com/linkedin/openhouse/tables/audit/model/TableAuditEvent.java index 8d797b625..27a1eef37 100644 --- a/services/tables/src/main/java/com/linkedin/openhouse/tables/audit/model/TableAuditEvent.java +++ b/services/tables/src/main/java/com/linkedin/openhouse/tables/audit/model/TableAuditEvent.java @@ -2,6 +2,7 @@ import com.linkedin.openhouse.common.audit.model.BaseAuditEvent; import java.time.Instant; +import java.util.Map; import lombok.AllArgsConstructor; import lombok.Builder; import lombok.EqualsAndHashCode; @@ -40,4 +41,6 @@ public class TableAuditEvent extends BaseAuditEvent { private Long currentSnapshotId; private Long currentSnapshotTimestampMs; + + private Map tableProperties; } diff --git a/services/tables/src/main/java/com/linkedin/openhouse/tables/config/InternalCatalogProperties.java b/services/tables/src/main/java/com/linkedin/openhouse/tables/config/InternalCatalogProperties.java index 0b080edda..899792f4a 100644 --- a/services/tables/src/main/java/com/linkedin/openhouse/tables/config/InternalCatalogProperties.java +++ b/services/tables/src/main/java/com/linkedin/openhouse/tables/config/InternalCatalogProperties.java @@ -1,6 +1,8 @@ package com.linkedin.openhouse.tables.config; import java.time.Duration; +import java.util.Collections; +import java.util.List; import lombok.Getter; import lombok.Setter; import org.springframework.boot.context.properties.ConfigurationProperties; @@ -13,6 +15,8 @@ public class InternalCatalogProperties { private MetadataCache metadataCache = new MetadataCache(); + private Audit audit = new Audit(); + @Getter @Setter public static class MetadataCache { @@ -20,4 +24,10 @@ public static class MetadataCache { private Duration ttl; private DataSize maxWeight; } + + @Getter + @Setter + public static class Audit { + private List tablePropertiesAllowlist = Collections.emptyList(); + } } diff --git a/services/tables/src/test/java/com/linkedin/openhouse/tables/mock/audit/IcebergSnapshotsApiHandlerAuditTest.java b/services/tables/src/test/java/com/linkedin/openhouse/tables/mock/audit/IcebergSnapshotsApiHandlerAuditTest.java index 0ae65c793..bfed41ec9 100644 --- a/services/tables/src/test/java/com/linkedin/openhouse/tables/mock/audit/IcebergSnapshotsApiHandlerAuditTest.java +++ b/services/tables/src/test/java/com/linkedin/openhouse/tables/mock/audit/IcebergSnapshotsApiHandlerAuditTest.java @@ -184,6 +184,37 @@ public void testPutIcebergSnapshotsMainPointsToOlderSnapshot() throws Exception assertEquals(1000L, actualEvent.getCurrentSnapshotTimestampMs().longValue()); } + @Test + public void testPutIcebergSnapshotsCarriesTableProperties() throws Exception { + Map properties = new HashMap<>(); + properties.put("openhouse.watermark", "100"); + properties.put("openhouse.tableType", "PRIMARY_TABLE"); + properties.put("user.custom.key", "v"); + IcebergSnapshotsRequestBody base = RequestConstants.TEST_ICEBERG_SNAPSHOTS_REQUEST_BODY; + IcebergSnapshotsRequestBody requestBody = + IcebergSnapshotsRequestBody.builder() + .baseTableVersion(base.getBaseTableVersion()) + .jsonSnapshots(base.getJsonSnapshots()) + .snapshotRefs(base.getSnapshotRefs()) + .createUpdateTableRequestBody( + base.getCreateUpdateTableRequestBody() + .toBuilder() + .tableProperties(properties) + .build()) + .build(); + mvc.perform( + MockMvcRequestBuilders.put( + String.format( + CURRENT_MAJOR_VERSION_PREFIX + + "/databases/d200/tables/tb1/iceberg/v2/snapshots")) + .accept(MediaType.APPLICATION_JSON) + .contentType(MediaType.APPLICATION_JSON) + .content(requestBody.toJson())); + Mockito.verify(tableAuditHandler, atLeastOnce()).audit(argCaptor.capture()); + TableAuditEvent actualEvent = argCaptor.getValue(); + assertEquals(properties, actualEvent.getTableProperties()); + } + @Test public void testCTASCommitPhase() throws Exception { mvc.perform( From 3f98e7d82f9be9ca90556c9022ace4ddf4eb6b33 Mon Sep 17 00:00:00 2001 From: James Wang Date: Tue, 26 May 2026 12:06:22 -0700 Subject: [PATCH 2/5] add filterTableProperties and read tableProperties from response --- .../groovy/openhouse.maven-publish.gradle | 2 +- .../tables/audit/TableAuditAspect.java | 47 +++++++++++++++---- .../mock/MockIcebergSnapshotApiHandler.java | 13 ++++- .../IcebergSnapshotsApiHandlerAuditTest.java | 23 ++++++--- 4 files changed, 67 insertions(+), 18 deletions(-) diff --git a/buildSrc/src/main/groovy/openhouse.maven-publish.gradle b/buildSrc/src/main/groovy/openhouse.maven-publish.gradle index d6c3f40d2..45f7b3b3d 100644 --- a/buildSrc/src/main/groovy/openhouse.maven-publish.gradle +++ b/buildSrc/src/main/groovy/openhouse.maven-publish.gradle @@ -13,7 +13,7 @@ task javadocJar(type: Jar) { } tasks.withType(GenerateModuleMetadata) { - enabled = false + enabled = project.version.toString().endsWith('-SNAPSHOT') } [jar, sourcesJar, javadocJar].each { task -> diff --git a/services/tables/src/main/java/com/linkedin/openhouse/tables/audit/TableAuditAspect.java b/services/tables/src/main/java/com/linkedin/openhouse/tables/audit/TableAuditAspect.java index a960ee063..8a5d9d79b 100644 --- a/services/tables/src/main/java/com/linkedin/openhouse/tables/audit/TableAuditAspect.java +++ b/services/tables/src/main/java/com/linkedin/openhouse/tables/audit/TableAuditAspect.java @@ -17,7 +17,9 @@ import com.linkedin.openhouse.tables.audit.model.OperationStatus; import com.linkedin.openhouse.tables.audit.model.OperationType; import com.linkedin.openhouse.tables.audit.model.TableAuditEvent; +import com.linkedin.openhouse.tables.config.InternalCatalogProperties; import java.time.Instant; +import java.util.HashMap; import java.util.List; import java.util.Map; import lombok.extern.slf4j.Slf4j; @@ -44,6 +46,8 @@ public class TableAuditAspect { @Autowired private AuditHandler tableAuditHandler; + @Autowired private InternalCatalogProperties internalCatalogProperties; + /** * Install the Around advice for getTable() method in OpenHouseTablesApiHandler. * @@ -371,26 +375,27 @@ protected ApiResponse auditPutIcebergSnapshots( } else { operationType = OperationType.COMMIT; } - CreateUpdateTableRequestBody createUpdateTableRequestBody = - icebergSnapshotRequestBody.getCreateUpdateTableRequestBody(); TableAuditEvent.TableAuditEventBuilder eventBuilder = TableAuditEvent.builder() .eventTimestamp(Instant.now()) .databaseName(databaseId) .tableName(tableId) - .operationType(operationType) - .tableProperties( - createUpdateTableRequestBody == null - ? null - : createUpdateTableRequestBody.getTableProperties()); + .operationType(operationType); extractSnapshotInfo(icebergSnapshotRequestBody, eventBuilder); - TableAuditEvent event = eventBuilder.build(); try { result = (ApiResponse) point.proceed(); + // Read tableProperties from the response, not the request body: OpenHouse mutates + // properties server-side during commit (e.g. openhouse.tableVersion, + // openhouse.lastModifiedTime), and the audit event should reflect the committed state. + TableAuditEvent event = + eventBuilder + .tableProperties(filterTableProperties(result.getResponseBody().getTableProperties())) + .build(); buildAndSendEvent( event, OperationStatus.SUCCESS, result.getResponseBody().getTableLocation()); } catch (Throwable t) { - buildAndSendEvent(event, OperationStatus.FAILED, null); + // On failure there is no committed state to read from, so tableProperties stays null. + buildAndSendEvent(eventBuilder.build(), OperationStatus.FAILED, null); throw t; } return result; @@ -535,6 +540,30 @@ protected ApiResponse auditUpdateDatabaseAclPolicies( return result; } + /** + * Narrows the request-body table properties down to the configured allowlist ({@code + * cluster.iceberg.tables.audit.table-properties-allowlist}). Returns {@code null} when there is + * nothing to emit so downstream audit handlers can skip the field entirely. Iterates the + * allowlist rather than the source so cost is O(|allowlist|) regardless of source size. + */ + private Map filterTableProperties(Map source) { + if (source == null || source.isEmpty()) { + return null; + } + List allowlist = internalCatalogProperties.getAudit().getTablePropertiesAllowlist(); + if (allowlist == null || allowlist.isEmpty()) { + return null; + } + Map filtered = new HashMap<>(); + for (String key : allowlist) { + String value = source.get(key); + if (value != null) { + filtered.put(key, value); + } + } + return filtered.isEmpty() ? null : filtered; + } + private void buildAndSendEvent( TableAuditEvent event, OperationStatus status, String currentTableRoot) { TableAuditEvent completeEvent = diff --git a/services/tables/src/test/java/com/linkedin/openhouse/tables/mock/MockIcebergSnapshotApiHandler.java b/services/tables/src/test/java/com/linkedin/openhouse/tables/mock/MockIcebergSnapshotApiHandler.java index 3695d678c..4ee788e80 100644 --- a/services/tables/src/test/java/com/linkedin/openhouse/tables/mock/MockIcebergSnapshotApiHandler.java +++ b/services/tables/src/test/java/com/linkedin/openhouse/tables/mock/MockIcebergSnapshotApiHandler.java @@ -26,9 +26,20 @@ public ApiResponse putIcebergSnapshots( .responseBody(RequestConstants.TEST_GET_TABLE_RESPONSE_BODY) .build(); case "d200": + // Echo the request's table properties on the response so the audit aspect can read + // committed state from result.getResponseBody().getTableProperties(). return ApiResponse.builder() .httpStatus(HttpStatus.OK) - .responseBody(RequestConstants.TEST_GET_TABLE_RESPONSE_BODY) + .responseBody( + RequestConstants.TEST_GET_TABLE_RESPONSE_BODY + .toBuilder() + .tableProperties( + icebergSnapshotRequestBody.getCreateUpdateTableRequestBody() == null + ? null + : icebergSnapshotRequestBody + .getCreateUpdateTableRequestBody() + .getTableProperties()) + .build()) .build(); case "d400": throw new RequestValidationFailureException(); diff --git a/services/tables/src/test/java/com/linkedin/openhouse/tables/mock/audit/IcebergSnapshotsApiHandlerAuditTest.java b/services/tables/src/test/java/com/linkedin/openhouse/tables/mock/audit/IcebergSnapshotsApiHandlerAuditTest.java index bfed41ec9..4f0f77a11 100644 --- a/services/tables/src/test/java/com/linkedin/openhouse/tables/mock/audit/IcebergSnapshotsApiHandlerAuditTest.java +++ b/services/tables/src/test/java/com/linkedin/openhouse/tables/mock/audit/IcebergSnapshotsApiHandlerAuditTest.java @@ -25,12 +25,18 @@ import org.springframework.http.MediaType; import org.springframework.security.test.context.support.WithMockUser; import org.springframework.test.context.ContextConfiguration; +import org.springframework.test.context.TestPropertySource; import org.springframework.test.web.servlet.MockMvc; import org.springframework.test.web.servlet.request.MockMvcRequestBuilders; @SpringBootTest @AutoConfigureMockMvc @ContextConfiguration +@TestPropertySource( + properties = { + "cluster.iceberg.tables.audit.table-properties-allowlist[0]=openhouse.watermark", + "cluster.iceberg.tables.audit.table-properties-allowlist[1]=openhouse.tableType" + }) @WithMockUser(username = "testUser") public class IcebergSnapshotsApiHandlerAuditTest { @Autowired private MockMvc mvc; @@ -185,11 +191,11 @@ public void testPutIcebergSnapshotsMainPointsToOlderSnapshot() throws Exception } @Test - public void testPutIcebergSnapshotsCarriesTableProperties() throws Exception { - Map properties = new HashMap<>(); - properties.put("openhouse.watermark", "100"); - properties.put("openhouse.tableType", "PRIMARY_TABLE"); - properties.put("user.custom.key", "v"); + public void testPutIcebergSnapshotsFiltersTablePropertiesToAllowlist() throws Exception { + Map requestProperties = new HashMap<>(); + requestProperties.put("openhouse.watermark", "100"); + requestProperties.put("openhouse.tableType", "PRIMARY_TABLE"); + requestProperties.put("user.custom.key", "v"); IcebergSnapshotsRequestBody base = RequestConstants.TEST_ICEBERG_SNAPSHOTS_REQUEST_BODY; IcebergSnapshotsRequestBody requestBody = IcebergSnapshotsRequestBody.builder() @@ -199,7 +205,7 @@ public void testPutIcebergSnapshotsCarriesTableProperties() throws Exception { .createUpdateTableRequestBody( base.getCreateUpdateTableRequestBody() .toBuilder() - .tableProperties(properties) + .tableProperties(requestProperties) .build()) .build(); mvc.perform( @@ -212,7 +218,10 @@ public void testPutIcebergSnapshotsCarriesTableProperties() throws Exception { .content(requestBody.toJson())); Mockito.verify(tableAuditHandler, atLeastOnce()).audit(argCaptor.capture()); TableAuditEvent actualEvent = argCaptor.getValue(); - assertEquals(properties, actualEvent.getTableProperties()); + Map expected = new HashMap<>(); + expected.put("openhouse.watermark", "100"); + expected.put("openhouse.tableType", "PRIMARY_TABLE"); + assertEquals(expected, actualEvent.getTableProperties()); } @Test From f94d384c9f4346ab03c37b968bd8c9441b805e72 Mon Sep 17 00:00:00 2001 From: James Wang Date: Tue, 26 May 2026 15:01:04 -0700 Subject: [PATCH 3/5] add table-property-value-max-length --- .../groovy/openhouse.maven-publish.gradle | 2 +- .../tables/audit/TableAuditAspect.java | 27 ++++++++++---- .../config/InternalCatalogProperties.java | 8 ++++ .../IcebergSnapshotsApiHandlerAuditTest.java | 37 ++++++++++++++++++- 4 files changed, 65 insertions(+), 9 deletions(-) diff --git a/buildSrc/src/main/groovy/openhouse.maven-publish.gradle b/buildSrc/src/main/groovy/openhouse.maven-publish.gradle index 45f7b3b3d..d6c3f40d2 100644 --- a/buildSrc/src/main/groovy/openhouse.maven-publish.gradle +++ b/buildSrc/src/main/groovy/openhouse.maven-publish.gradle @@ -13,7 +13,7 @@ task javadocJar(type: Jar) { } tasks.withType(GenerateModuleMetadata) { - enabled = project.version.toString().endsWith('-SNAPSHOT') + enabled = false } [jar, sourcesJar, javadocJar].each { task -> diff --git a/services/tables/src/main/java/com/linkedin/openhouse/tables/audit/TableAuditAspect.java b/services/tables/src/main/java/com/linkedin/openhouse/tables/audit/TableAuditAspect.java index 8a5d9d79b..f2a9dbf54 100644 --- a/services/tables/src/main/java/com/linkedin/openhouse/tables/audit/TableAuditAspect.java +++ b/services/tables/src/main/java/com/linkedin/openhouse/tables/audit/TableAuditAspect.java @@ -541,25 +541,38 @@ protected ApiResponse auditUpdateDatabaseAclPolicies( } /** - * Narrows the request-body table properties down to the configured allowlist ({@code - * cluster.iceberg.tables.audit.table-properties-allowlist}). Returns {@code null} when there is - * nothing to emit so downstream audit handlers can skip the field entirely. Iterates the - * allowlist rather than the source so cost is O(|allowlist|) regardless of source size. + * Narrows the committed table properties down to the configured allowlist ({@code + * cluster.iceberg.tables.audit.table-properties-allowlist}) and drops any value longer than + * {@code cluster.iceberg.tables.audit.table-property-value-max-length} characters. Returns {@code + * null} when there is nothing to emit so downstream audit handlers can skip the field entirely. + * Iterates the allowlist rather than the source so cost is O(|allowlist|) regardless of source + * size. */ private Map filterTableProperties(Map source) { if (source == null || source.isEmpty()) { return null; } - List allowlist = internalCatalogProperties.getAudit().getTablePropertiesAllowlist(); + InternalCatalogProperties.Audit auditConfig = internalCatalogProperties.getAudit(); + List allowlist = auditConfig.getTablePropertiesAllowlist(); if (allowlist == null || allowlist.isEmpty()) { return null; } + int maxLength = auditConfig.getTablePropertyValueMaxLength(); Map filtered = new HashMap<>(); for (String key : allowlist) { String value = source.get(key); - if (value != null) { - filtered.put(key, value); + if (value == null) { + continue; } + if (maxLength > 0 && value.length() > maxLength) { + log.warn( + "Dropping table-property '{}' from audit event: value length {} exceeds cap {}", + key, + value.length(), + maxLength); + continue; + } + filtered.put(key, value); } return filtered.isEmpty() ? null : filtered; } diff --git a/services/tables/src/main/java/com/linkedin/openhouse/tables/config/InternalCatalogProperties.java b/services/tables/src/main/java/com/linkedin/openhouse/tables/config/InternalCatalogProperties.java index 899792f4a..28aaa48ec 100644 --- a/services/tables/src/main/java/com/linkedin/openhouse/tables/config/InternalCatalogProperties.java +++ b/services/tables/src/main/java/com/linkedin/openhouse/tables/config/InternalCatalogProperties.java @@ -29,5 +29,13 @@ public static class MetadataCache { @Setter public static class Audit { private List tablePropertiesAllowlist = Collections.emptyList(); + + /** + * Maximum character length of a single allowlisted property value. Values exceeding this are + * dropped from the audit event (with a warning log) to prevent oversized events from blowing + * the Kafka producer's max.request.size budget or OOM-ing downstream consumers. {@code 0} + * disables the cap (current/legacy behavior). + */ + private int tablePropertyValueMaxLength = 0; } } diff --git a/services/tables/src/test/java/com/linkedin/openhouse/tables/mock/audit/IcebergSnapshotsApiHandlerAuditTest.java b/services/tables/src/test/java/com/linkedin/openhouse/tables/mock/audit/IcebergSnapshotsApiHandlerAuditTest.java index 4f0f77a11..ddb14f8f9 100644 --- a/services/tables/src/test/java/com/linkedin/openhouse/tables/mock/audit/IcebergSnapshotsApiHandlerAuditTest.java +++ b/services/tables/src/test/java/com/linkedin/openhouse/tables/mock/audit/IcebergSnapshotsApiHandlerAuditTest.java @@ -35,7 +35,8 @@ @TestPropertySource( properties = { "cluster.iceberg.tables.audit.table-properties-allowlist[0]=openhouse.watermark", - "cluster.iceberg.tables.audit.table-properties-allowlist[1]=openhouse.tableType" + "cluster.iceberg.tables.audit.table-properties-allowlist[1]=openhouse.tableType", + "cluster.iceberg.tables.audit.table-property-value-max-length=20" }) @WithMockUser(username = "testUser") public class IcebergSnapshotsApiHandlerAuditTest { @@ -224,6 +225,40 @@ public void testPutIcebergSnapshotsFiltersTablePropertiesToAllowlist() throws Ex assertEquals(expected, actualEvent.getTableProperties()); } + @Test + public void testPutIcebergSnapshotsDropsOversizedTableProperty() throws Exception { + // Cap is 20 chars (set at class level). "openhouse.watermark"="100" stays; + // "openhouse.tableType" with a 30+ char value is dropped. + Map requestProperties = new HashMap<>(); + requestProperties.put("openhouse.watermark", "100"); + requestProperties.put( + "openhouse.tableType", "THIS_VALUE_IS_DEFINITELY_LONGER_THAN_TWENTY_CHARS"); + IcebergSnapshotsRequestBody base = RequestConstants.TEST_ICEBERG_SNAPSHOTS_REQUEST_BODY; + IcebergSnapshotsRequestBody requestBody = + IcebergSnapshotsRequestBody.builder() + .baseTableVersion(base.getBaseTableVersion()) + .jsonSnapshots(base.getJsonSnapshots()) + .snapshotRefs(base.getSnapshotRefs()) + .createUpdateTableRequestBody( + base.getCreateUpdateTableRequestBody() + .toBuilder() + .tableProperties(requestProperties) + .build()) + .build(); + mvc.perform( + MockMvcRequestBuilders.put( + String.format( + CURRENT_MAJOR_VERSION_PREFIX + + "/databases/d200/tables/tb1/iceberg/v2/snapshots")) + .accept(MediaType.APPLICATION_JSON) + .contentType(MediaType.APPLICATION_JSON) + .content(requestBody.toJson())); + Mockito.verify(tableAuditHandler, atLeastOnce()).audit(argCaptor.capture()); + TableAuditEvent actualEvent = argCaptor.getValue(); + assertEquals( + Collections.singletonMap("openhouse.watermark", "100"), actualEvent.getTableProperties()); + } + @Test public void testCTASCommitPhase() throws Exception { mvc.perform( From 0fc323bd52e2e5325e2a0a5da7bd07b85c16af56 Mon Sep 17 00:00:00 2001 From: James Wang Date: Tue, 26 May 2026 18:13:58 -0700 Subject: [PATCH 4/5] remove table-property-value-max-length --- .../tables/audit/TableAuditAspect.java | 25 +++---------- .../config/InternalCatalogProperties.java | 8 ---- .../IcebergSnapshotsApiHandlerAuditTest.java | 37 +------------------ 3 files changed, 7 insertions(+), 63 deletions(-) diff --git a/services/tables/src/main/java/com/linkedin/openhouse/tables/audit/TableAuditAspect.java b/services/tables/src/main/java/com/linkedin/openhouse/tables/audit/TableAuditAspect.java index f2a9dbf54..7f75ce009 100644 --- a/services/tables/src/main/java/com/linkedin/openhouse/tables/audit/TableAuditAspect.java +++ b/services/tables/src/main/java/com/linkedin/openhouse/tables/audit/TableAuditAspect.java @@ -542,37 +542,24 @@ protected ApiResponse auditUpdateDatabaseAclPolicies( /** * Narrows the committed table properties down to the configured allowlist ({@code - * cluster.iceberg.tables.audit.table-properties-allowlist}) and drops any value longer than - * {@code cluster.iceberg.tables.audit.table-property-value-max-length} characters. Returns {@code - * null} when there is nothing to emit so downstream audit handlers can skip the field entirely. - * Iterates the allowlist rather than the source so cost is O(|allowlist|) regardless of source - * size. + * cluster.iceberg.tables.audit.table-properties-allowlist}). Returns {@code null} when there is + * nothing to emit so downstream audit handlers can skip the field entirely. Iterates the + * allowlist rather than the source so cost is O(|allowlist|) regardless of source size. */ private Map filterTableProperties(Map source) { if (source == null || source.isEmpty()) { return null; } - InternalCatalogProperties.Audit auditConfig = internalCatalogProperties.getAudit(); - List allowlist = auditConfig.getTablePropertiesAllowlist(); + List allowlist = internalCatalogProperties.getAudit().getTablePropertiesAllowlist(); if (allowlist == null || allowlist.isEmpty()) { return null; } - int maxLength = auditConfig.getTablePropertyValueMaxLength(); Map filtered = new HashMap<>(); for (String key : allowlist) { String value = source.get(key); - if (value == null) { - continue; + if (value != null) { + filtered.put(key, value); } - if (maxLength > 0 && value.length() > maxLength) { - log.warn( - "Dropping table-property '{}' from audit event: value length {} exceeds cap {}", - key, - value.length(), - maxLength); - continue; - } - filtered.put(key, value); } return filtered.isEmpty() ? null : filtered; } diff --git a/services/tables/src/main/java/com/linkedin/openhouse/tables/config/InternalCatalogProperties.java b/services/tables/src/main/java/com/linkedin/openhouse/tables/config/InternalCatalogProperties.java index 28aaa48ec..899792f4a 100644 --- a/services/tables/src/main/java/com/linkedin/openhouse/tables/config/InternalCatalogProperties.java +++ b/services/tables/src/main/java/com/linkedin/openhouse/tables/config/InternalCatalogProperties.java @@ -29,13 +29,5 @@ public static class MetadataCache { @Setter public static class Audit { private List tablePropertiesAllowlist = Collections.emptyList(); - - /** - * Maximum character length of a single allowlisted property value. Values exceeding this are - * dropped from the audit event (with a warning log) to prevent oversized events from blowing - * the Kafka producer's max.request.size budget or OOM-ing downstream consumers. {@code 0} - * disables the cap (current/legacy behavior). - */ - private int tablePropertyValueMaxLength = 0; } } diff --git a/services/tables/src/test/java/com/linkedin/openhouse/tables/mock/audit/IcebergSnapshotsApiHandlerAuditTest.java b/services/tables/src/test/java/com/linkedin/openhouse/tables/mock/audit/IcebergSnapshotsApiHandlerAuditTest.java index ddb14f8f9..4f0f77a11 100644 --- a/services/tables/src/test/java/com/linkedin/openhouse/tables/mock/audit/IcebergSnapshotsApiHandlerAuditTest.java +++ b/services/tables/src/test/java/com/linkedin/openhouse/tables/mock/audit/IcebergSnapshotsApiHandlerAuditTest.java @@ -35,8 +35,7 @@ @TestPropertySource( properties = { "cluster.iceberg.tables.audit.table-properties-allowlist[0]=openhouse.watermark", - "cluster.iceberg.tables.audit.table-properties-allowlist[1]=openhouse.tableType", - "cluster.iceberg.tables.audit.table-property-value-max-length=20" + "cluster.iceberg.tables.audit.table-properties-allowlist[1]=openhouse.tableType" }) @WithMockUser(username = "testUser") public class IcebergSnapshotsApiHandlerAuditTest { @@ -225,40 +224,6 @@ public void testPutIcebergSnapshotsFiltersTablePropertiesToAllowlist() throws Ex assertEquals(expected, actualEvent.getTableProperties()); } - @Test - public void testPutIcebergSnapshotsDropsOversizedTableProperty() throws Exception { - // Cap is 20 chars (set at class level). "openhouse.watermark"="100" stays; - // "openhouse.tableType" with a 30+ char value is dropped. - Map requestProperties = new HashMap<>(); - requestProperties.put("openhouse.watermark", "100"); - requestProperties.put( - "openhouse.tableType", "THIS_VALUE_IS_DEFINITELY_LONGER_THAN_TWENTY_CHARS"); - IcebergSnapshotsRequestBody base = RequestConstants.TEST_ICEBERG_SNAPSHOTS_REQUEST_BODY; - IcebergSnapshotsRequestBody requestBody = - IcebergSnapshotsRequestBody.builder() - .baseTableVersion(base.getBaseTableVersion()) - .jsonSnapshots(base.getJsonSnapshots()) - .snapshotRefs(base.getSnapshotRefs()) - .createUpdateTableRequestBody( - base.getCreateUpdateTableRequestBody() - .toBuilder() - .tableProperties(requestProperties) - .build()) - .build(); - mvc.perform( - MockMvcRequestBuilders.put( - String.format( - CURRENT_MAJOR_VERSION_PREFIX - + "/databases/d200/tables/tb1/iceberg/v2/snapshots")) - .accept(MediaType.APPLICATION_JSON) - .contentType(MediaType.APPLICATION_JSON) - .content(requestBody.toJson())); - Mockito.verify(tableAuditHandler, atLeastOnce()).audit(argCaptor.capture()); - TableAuditEvent actualEvent = argCaptor.getValue(); - assertEquals( - Collections.singletonMap("openhouse.watermark", "100"), actualEvent.getTableProperties()); - } - @Test public void testCTASCommitPhase() throws Exception { mvc.perform( From 189ff882d7d63f020958bd5800168294a1922503 Mon Sep 17 00:00:00 2001 From: Christian Bush Date: Thu, 28 May 2026 11:33:53 -0700 Subject: [PATCH 5/5] Add branchRefName to TableAuditEvent for Git for Data observability Table operations against named Iceberg branches are not currently observable: the existing TableAuditEvent records currentSnapshotId and currentSnapshotTimestampMs but has no signal for which branch ref was written. Add branchRefName to TableAuditEvent, populated in extractSnapshotInfo() from the existing snapshotRefs and jsonSnapshots fields on the request body. The committed branch is identified by matching the last snapshot in jsonSnapshots (Iceberg always appends new snapshots chronologically) to the ref that points to it in snapshotRefs. This correctly handles main branch commits, named branch commits where main did not advance, and the case where no main ref is present at all. This is safe to rely on: every request through putIcebergSnapshots has a non-empty jsonSnapshots (the repository layer gates snapshotRefs processing on jsonSnapshots being present), so the last-snapshot invariant holds for all reachable code paths. --- .../tables/audit/TableAuditAspect.java | 38 ++++++----- .../tables/audit/model/TableAuditEvent.java | 2 + .../IcebergSnapshotsApiHandlerAuditTest.java | 63 ++++++++++++++++++- .../model/TableAuditModelConstants.java | 3 + 4 files changed, 88 insertions(+), 18 deletions(-) diff --git a/services/tables/src/main/java/com/linkedin/openhouse/tables/audit/TableAuditAspect.java b/services/tables/src/main/java/com/linkedin/openhouse/tables/audit/TableAuditAspect.java index 7f75ce009..03e47806c 100644 --- a/services/tables/src/main/java/com/linkedin/openhouse/tables/audit/TableAuditAspect.java +++ b/services/tables/src/main/java/com/linkedin/openhouse/tables/audit/TableAuditAspect.java @@ -402,22 +402,38 @@ protected ApiResponse auditPutIcebergSnapshots( } /** - * Extracts snapshot ID and timestamp of the main branch from the request body. The snapshotRefs - * map contains branch name to JSON-serialized SnapshotRef. We read the main branch's snapshot-id - * (this is what Iceberg treats as current-snapshot-id — see TableMetadata.Builder.setRef()) and - * then find the matching snapshot in jsonSnapshots to get its timestamp-ms. + * Extracts snapshot ID, timestamp, and branch ref name from the request body. * - *

Leaves both fields null if the main branch ref is absent (e.g. branch-only commits where - * main didn't advance, or non-commit operations) or if the matching snapshot can't be found. + *

branchRefName is the ref whose snapshot-id matches the last snapshot in jsonSnapshots. + * Iceberg appends snapshots chronologically, so the last one is always the newly-committed + * snapshot, and the ref pointing to it is the branch that was written. + * + *

currentSnapshotId and currentSnapshotTimestampMs track the main branch ref for backwards + * compatibility. They are null when main is absent from snapshotRefs. */ private void extractSnapshotInfo( IcebergSnapshotsRequestBody requestBody, TableAuditEvent.TableAuditEventBuilder eventBuilder) { try { Map snapshotRefs = requestBody.getSnapshotRefs(); - if (snapshotRefs == null) { + List jsonSnapshots = requestBody.getJsonSnapshots(); + if (snapshotRefs == null || jsonSnapshots == null || jsonSnapshots.isEmpty()) { return; } + + // The last snapshot in the list is the one being committed (Iceberg appends chronologically). + // Find which ref points to it — that is the branch being written. + long lastSnapshotId = + SnapshotParser.fromJson(jsonSnapshots.get(jsonSnapshots.size() - 1)).snapshotId(); + for (Map.Entry entry : snapshotRefs.entrySet()) { + if (SnapshotRefParser.fromJson(entry.getValue()).snapshotId() == lastSnapshotId) { + eventBuilder.branchRefName(entry.getKey()); + break; + } + } + + // Extract snapshot ID and timestamp for main branch (backwards-compatible). + // Iterate jsonSnapshots in reverse: main's snapshot is typically the most recent. String mainRefJson = snapshotRefs.get(SnapshotRef.MAIN_BRANCH); if (mainRefJson == null) { return; @@ -425,14 +441,6 @@ private void extractSnapshotInfo( long mainSnapshotId = SnapshotRefParser.fromJson(mainRefJson).snapshotId(); eventBuilder.currentSnapshotId(mainSnapshotId); - // Find the matching snapshot in jsonSnapshots to get its timestamp-ms. Iterate in reverse - // because Iceberg appends snapshots chronologically and main's snapshot is typically the - // most recent. Skip snapshots whose JSON doesn't contain the target id as a cheap - // pre-filter before invoking the JSON parser. - List jsonSnapshots = requestBody.getJsonSnapshots(); - if (jsonSnapshots == null) { - return; - } String mainSnapshotIdStr = Long.toString(mainSnapshotId); for (int i = jsonSnapshots.size() - 1; i >= 0; i--) { String snapshotJson = jsonSnapshots.get(i); diff --git a/services/tables/src/main/java/com/linkedin/openhouse/tables/audit/model/TableAuditEvent.java b/services/tables/src/main/java/com/linkedin/openhouse/tables/audit/model/TableAuditEvent.java index 27a1eef37..49a2d980d 100644 --- a/services/tables/src/main/java/com/linkedin/openhouse/tables/audit/model/TableAuditEvent.java +++ b/services/tables/src/main/java/com/linkedin/openhouse/tables/audit/model/TableAuditEvent.java @@ -42,5 +42,7 @@ public class TableAuditEvent extends BaseAuditEvent { private Long currentSnapshotTimestampMs; + private String branchRefName; + private Map tableProperties; } diff --git a/services/tables/src/test/java/com/linkedin/openhouse/tables/mock/audit/IcebergSnapshotsApiHandlerAuditTest.java b/services/tables/src/test/java/com/linkedin/openhouse/tables/mock/audit/IcebergSnapshotsApiHandlerAuditTest.java index 4f0f77a11..0eadb8f60 100644 --- a/services/tables/src/test/java/com/linkedin/openhouse/tables/mock/audit/IcebergSnapshotsApiHandlerAuditTest.java +++ b/services/tables/src/test/java/com/linkedin/openhouse/tables/mock/audit/IcebergSnapshotsApiHandlerAuditTest.java @@ -113,11 +113,65 @@ public void testPutIcebergSnapshotsFailedPathStillHasSnapshotInfo() throws Excep assertEquals(1669126937912L, actualEvent.getCurrentSnapshotTimestampMs().longValue()); } + @Test + public void testPutIcebergSnapshotsMainCommitSetsBranchRefNameToMain() throws Exception { + mvc.perform( + MockMvcRequestBuilders.put( + String.format( + CURRENT_MAJOR_VERSION_PREFIX + + "/databases/d200/tables/tb1/iceberg/v2/snapshots")) + .accept(MediaType.APPLICATION_JSON) + .contentType(MediaType.APPLICATION_JSON) + .content(RequestConstants.TEST_ICEBERG_SNAPSHOTS_REQUEST_BODY.toJson())); + Mockito.verify(tableAuditHandler, atLeastOnce()).audit(argCaptor.capture()); + assertEquals("main", argCaptor.getValue().getBranchRefName()); + } + + @Test + public void testPutIcebergSnapshotsNamedBranchCommitSetsBranchRefName() throws Exception { + // Realistic named-branch commit: main ref exists but its snapshot is NOT in jsonSnapshots + // (main didn't advance). Only the feature branch got a new snapshot. + String newSnapshotJson = + "{\n" + + " \"snapshot-id\" : 999,\n" + + " \"timestamp-ms\" : 5000,\n" + + " \"summary\" : {\"operation\": \"append\"},\n" + + " \"manifest-list\" : \"/tmp/feature.avro\",\n" + + " \"schema-id\" : 0\n" + + "}"; + Map refs = new HashMap<>(); + refs.put("main", "{\"snapshot-id\":100,\"type\":\"branch\"}"); // main stayed at old snapshot + refs.put("feature", "{\"snapshot-id\":999,\"type\":\"branch\"}"); // feature got new snapshot + + IcebergSnapshotsRequestBody requestBody = + IcebergSnapshotsRequestBody.builder() + .baseTableVersion("v1") + .jsonSnapshots(Collections.singletonList(newSnapshotJson)) + .snapshotRefs(refs) + .createUpdateTableRequestBody(RequestConstants.TEST_CREATE_TABLE_REQUEST_BODY) + .build(); + + mvc.perform( + MockMvcRequestBuilders.put( + String.format( + CURRENT_MAJOR_VERSION_PREFIX + + "/databases/d200/tables/tb1/iceberg/v2/snapshots")) + .accept(MediaType.APPLICATION_JSON) + .contentType(MediaType.APPLICATION_JSON) + .content(requestBody.toJson())); + Mockito.verify(tableAuditHandler, atLeastOnce()).audit(argCaptor.capture()); + TableAuditEvent actualEvent = argCaptor.getValue(); + assertEquals("feature", actualEvent.getBranchRefName()); + // main didn't advance, so currentSnapshotId is main's old snapshot and timestamp is null + assertEquals(100L, actualEvent.getCurrentSnapshotId().longValue()); + assertNull(actualEvent.getCurrentSnapshotTimestampMs()); + } + @Test public void testPutIcebergSnapshotsBranchOnlyCommitLeavesSnapshotInfoNull() throws Exception { - // Simulate a branch-only commit where main is absent from snapshotRefs. - // In this case the main branch ref doesn't exist, so currentSnapshotId / - // currentSnapshotTimestampMs should be null. + // Simulate a branch-only commit where main is absent from snapshotRefs entirely. + // currentSnapshotId / currentSnapshotTimestampMs are null (no main), but branchRefName + // is still populated from the ref that received the new snapshot. IcebergSnapshotsRequestBody branchOnlyRequestBody = IcebergSnapshotsRequestBody.builder() .baseTableVersion("v1") @@ -138,6 +192,7 @@ public void testPutIcebergSnapshotsBranchOnlyCommitLeavesSnapshotInfoNull() thro .content(branchOnlyRequestBody.toJson())); Mockito.verify(tableAuditHandler, atLeastOnce()).audit(argCaptor.capture()); TableAuditEvent actualEvent = argCaptor.getValue(); + assertEquals("my_branch", actualEvent.getBranchRefName()); assertNull(actualEvent.getCurrentSnapshotId()); assertNull(actualEvent.getCurrentSnapshotTimestampMs()); } @@ -188,6 +243,8 @@ public void testPutIcebergSnapshotsMainPointsToOlderSnapshot() throws Exception TableAuditEvent actualEvent = argCaptor.getValue(); assertEquals(100L, actualEvent.getCurrentSnapshotId().longValue()); assertEquals(1000L, actualEvent.getCurrentSnapshotTimestampMs().longValue()); + // The last snapshot (200) belongs to feature — that is the committed branch. + assertEquals("feature", actualEvent.getBranchRefName()); } @Test diff --git a/services/tables/src/test/java/com/linkedin/openhouse/tables/model/TableAuditModelConstants.java b/services/tables/src/test/java/com/linkedin/openhouse/tables/model/TableAuditModelConstants.java index b7486f617..1a9f5eab1 100644 --- a/services/tables/src/test/java/com/linkedin/openhouse/tables/model/TableAuditModelConstants.java +++ b/services/tables/src/test/java/com/linkedin/openhouse/tables/model/TableAuditModelConstants.java @@ -225,6 +225,7 @@ public final class TableAuditModelConstants { .operationType(OperationType.COMMIT) .currentSnapshotId(2151407017102313398L) .currentSnapshotTimestampMs(1669126937912L) + .branchRefName("main") .build(); public static final TableAuditEvent TABLE_AUDIT_EVENT_PUT_ICEBERG_SNAPSHOTS_FAILED = @@ -237,6 +238,7 @@ public final class TableAuditModelConstants { .operationType(OperationType.COMMIT) .currentSnapshotId(2151407017102313398L) .currentSnapshotTimestampMs(1669126937912L) + .branchRefName("main") .build(); public static final TableAuditEvent TABLE_AUDIT_EVENT_PUT_ICEBERG_SNAPSHOTS_CTAS = @@ -249,6 +251,7 @@ public final class TableAuditModelConstants { .operationType(OperationType.STAGED_COMMIT) .currentSnapshotId(2151407017102313398L) .currentSnapshotTimestampMs(1669126937912L) + .branchRefName("main") .build(); public static final TableAuditEvent TABLE_AUDIT_EVENT_GET_ALL_DATABASES_SUCCESS =