From b10cc4544ec566c281bd983a76050748f6c3eaf4 Mon Sep 17 00:00:00 2001 From: James Wang Date: Sun, 24 May 2026 16:35:32 -0700 Subject: [PATCH 1/5] add table properties to audit event --- .../tables/audit/TableAuditAspect.java | 8 ++++- .../tables/audit/model/TableAuditEvent.java | 3 ++ .../config/InternalCatalogProperties.java | 10 ++++++ .../IcebergSnapshotsApiHandlerAuditTest.java | 31 +++++++++++++++++++ 4 files changed, 51 insertions(+), 1 deletion(-) diff --git a/services/tables/src/main/java/com/linkedin/openhouse/tables/audit/TableAuditAspect.java b/services/tables/src/main/java/com/linkedin/openhouse/tables/audit/TableAuditAspect.java index d63aa224e..a960ee063 100644 --- a/services/tables/src/main/java/com/linkedin/openhouse/tables/audit/TableAuditAspect.java +++ b/services/tables/src/main/java/com/linkedin/openhouse/tables/audit/TableAuditAspect.java @@ -371,12 +371,18 @@ protected ApiResponse auditPutIcebergSnapshots( } else { operationType = OperationType.COMMIT; } + CreateUpdateTableRequestBody createUpdateTableRequestBody = + icebergSnapshotRequestBody.getCreateUpdateTableRequestBody(); TableAuditEvent.TableAuditEventBuilder eventBuilder = TableAuditEvent.builder() .eventTimestamp(Instant.now()) .databaseName(databaseId) .tableName(tableId) - .operationType(operationType); + .operationType(operationType) + .tableProperties( + createUpdateTableRequestBody == null + ? null + : createUpdateTableRequestBody.getTableProperties()); extractSnapshotInfo(icebergSnapshotRequestBody, eventBuilder); TableAuditEvent event = eventBuilder.build(); try { diff --git a/services/tables/src/main/java/com/linkedin/openhouse/tables/audit/model/TableAuditEvent.java b/services/tables/src/main/java/com/linkedin/openhouse/tables/audit/model/TableAuditEvent.java index 8d797b625..27a1eef37 100644 --- a/services/tables/src/main/java/com/linkedin/openhouse/tables/audit/model/TableAuditEvent.java +++ b/services/tables/src/main/java/com/linkedin/openhouse/tables/audit/model/TableAuditEvent.java @@ -2,6 +2,7 @@ import com.linkedin.openhouse.common.audit.model.BaseAuditEvent; import java.time.Instant; +import java.util.Map; import lombok.AllArgsConstructor; import lombok.Builder; import lombok.EqualsAndHashCode; @@ -40,4 +41,6 @@ public class TableAuditEvent extends BaseAuditEvent { private Long currentSnapshotId; private Long currentSnapshotTimestampMs; + + private Map tableProperties; } diff --git a/services/tables/src/main/java/com/linkedin/openhouse/tables/config/InternalCatalogProperties.java b/services/tables/src/main/java/com/linkedin/openhouse/tables/config/InternalCatalogProperties.java index 0b080edda..899792f4a 100644 --- a/services/tables/src/main/java/com/linkedin/openhouse/tables/config/InternalCatalogProperties.java +++ b/services/tables/src/main/java/com/linkedin/openhouse/tables/config/InternalCatalogProperties.java @@ -1,6 +1,8 @@ package com.linkedin.openhouse.tables.config; import java.time.Duration; +import java.util.Collections; +import java.util.List; import lombok.Getter; import lombok.Setter; import org.springframework.boot.context.properties.ConfigurationProperties; @@ -13,6 +15,8 @@ public class InternalCatalogProperties { private MetadataCache metadataCache = new MetadataCache(); + private Audit audit = new Audit(); + @Getter @Setter public static class MetadataCache { @@ -20,4 +24,10 @@ public static class MetadataCache { private Duration ttl; private DataSize maxWeight; } + + @Getter + @Setter + public static class Audit { + private List tablePropertiesAllowlist = Collections.emptyList(); + } } diff --git a/services/tables/src/test/java/com/linkedin/openhouse/tables/mock/audit/IcebergSnapshotsApiHandlerAuditTest.java b/services/tables/src/test/java/com/linkedin/openhouse/tables/mock/audit/IcebergSnapshotsApiHandlerAuditTest.java index 0ae65c793..bfed41ec9 100644 --- a/services/tables/src/test/java/com/linkedin/openhouse/tables/mock/audit/IcebergSnapshotsApiHandlerAuditTest.java +++ b/services/tables/src/test/java/com/linkedin/openhouse/tables/mock/audit/IcebergSnapshotsApiHandlerAuditTest.java @@ -184,6 +184,37 @@ public void testPutIcebergSnapshotsMainPointsToOlderSnapshot() throws Exception assertEquals(1000L, actualEvent.getCurrentSnapshotTimestampMs().longValue()); } + @Test + public void testPutIcebergSnapshotsCarriesTableProperties() throws Exception { + Map properties = new HashMap<>(); + properties.put("openhouse.watermark", "100"); + properties.put("openhouse.tableType", "PRIMARY_TABLE"); + properties.put("user.custom.key", "v"); + IcebergSnapshotsRequestBody base = RequestConstants.TEST_ICEBERG_SNAPSHOTS_REQUEST_BODY; + IcebergSnapshotsRequestBody requestBody = + IcebergSnapshotsRequestBody.builder() + .baseTableVersion(base.getBaseTableVersion()) + .jsonSnapshots(base.getJsonSnapshots()) + .snapshotRefs(base.getSnapshotRefs()) + .createUpdateTableRequestBody( + base.getCreateUpdateTableRequestBody() + .toBuilder() + .tableProperties(properties) + .build()) + .build(); + mvc.perform( + MockMvcRequestBuilders.put( + String.format( + CURRENT_MAJOR_VERSION_PREFIX + + "/databases/d200/tables/tb1/iceberg/v2/snapshots")) + .accept(MediaType.APPLICATION_JSON) + .contentType(MediaType.APPLICATION_JSON) + .content(requestBody.toJson())); + Mockito.verify(tableAuditHandler, atLeastOnce()).audit(argCaptor.capture()); + TableAuditEvent actualEvent = argCaptor.getValue(); + assertEquals(properties, actualEvent.getTableProperties()); + } + @Test public void testCTASCommitPhase() throws Exception { mvc.perform( From 3f98e7d82f9be9ca90556c9022ace4ddf4eb6b33 Mon Sep 17 00:00:00 2001 From: James Wang Date: Tue, 26 May 2026 12:06:22 -0700 Subject: [PATCH 2/5] add filterTableProperties and read tableProperties from response --- .../groovy/openhouse.maven-publish.gradle | 2 +- .../tables/audit/TableAuditAspect.java | 47 +++++++++++++++---- .../mock/MockIcebergSnapshotApiHandler.java | 13 ++++- .../IcebergSnapshotsApiHandlerAuditTest.java | 23 ++++++--- 4 files changed, 67 insertions(+), 18 deletions(-) diff --git a/buildSrc/src/main/groovy/openhouse.maven-publish.gradle b/buildSrc/src/main/groovy/openhouse.maven-publish.gradle index d6c3f40d2..45f7b3b3d 100644 --- a/buildSrc/src/main/groovy/openhouse.maven-publish.gradle +++ b/buildSrc/src/main/groovy/openhouse.maven-publish.gradle @@ -13,7 +13,7 @@ task javadocJar(type: Jar) { } tasks.withType(GenerateModuleMetadata) { - enabled = false + enabled = project.version.toString().endsWith('-SNAPSHOT') } [jar, sourcesJar, javadocJar].each { task -> diff --git a/services/tables/src/main/java/com/linkedin/openhouse/tables/audit/TableAuditAspect.java b/services/tables/src/main/java/com/linkedin/openhouse/tables/audit/TableAuditAspect.java index a960ee063..8a5d9d79b 100644 --- a/services/tables/src/main/java/com/linkedin/openhouse/tables/audit/TableAuditAspect.java +++ b/services/tables/src/main/java/com/linkedin/openhouse/tables/audit/TableAuditAspect.java @@ -17,7 +17,9 @@ import com.linkedin.openhouse.tables.audit.model.OperationStatus; import com.linkedin.openhouse.tables.audit.model.OperationType; import com.linkedin.openhouse.tables.audit.model.TableAuditEvent; +import com.linkedin.openhouse.tables.config.InternalCatalogProperties; import java.time.Instant; +import java.util.HashMap; import java.util.List; import java.util.Map; import lombok.extern.slf4j.Slf4j; @@ -44,6 +46,8 @@ public class TableAuditAspect { @Autowired private AuditHandler tableAuditHandler; + @Autowired private InternalCatalogProperties internalCatalogProperties; + /** * Install the Around advice for getTable() method in OpenHouseTablesApiHandler. * @@ -371,26 +375,27 @@ protected ApiResponse auditPutIcebergSnapshots( } else { operationType = OperationType.COMMIT; } - CreateUpdateTableRequestBody createUpdateTableRequestBody = - icebergSnapshotRequestBody.getCreateUpdateTableRequestBody(); TableAuditEvent.TableAuditEventBuilder eventBuilder = TableAuditEvent.builder() .eventTimestamp(Instant.now()) .databaseName(databaseId) .tableName(tableId) - .operationType(operationType) - .tableProperties( - createUpdateTableRequestBody == null - ? null - : createUpdateTableRequestBody.getTableProperties()); + .operationType(operationType); extractSnapshotInfo(icebergSnapshotRequestBody, eventBuilder); - TableAuditEvent event = eventBuilder.build(); try { result = (ApiResponse) point.proceed(); + // Read tableProperties from the response, not the request body: OpenHouse mutates + // properties server-side during commit (e.g. openhouse.tableVersion, + // openhouse.lastModifiedTime), and the audit event should reflect the committed state. + TableAuditEvent event = + eventBuilder + .tableProperties(filterTableProperties(result.getResponseBody().getTableProperties())) + .build(); buildAndSendEvent( event, OperationStatus.SUCCESS, result.getResponseBody().getTableLocation()); } catch (Throwable t) { - buildAndSendEvent(event, OperationStatus.FAILED, null); + // On failure there is no committed state to read from, so tableProperties stays null. + buildAndSendEvent(eventBuilder.build(), OperationStatus.FAILED, null); throw t; } return result; @@ -535,6 +540,30 @@ protected ApiResponse auditUpdateDatabaseAclPolicies( return result; } + /** + * Narrows the request-body table properties down to the configured allowlist ({@code + * cluster.iceberg.tables.audit.table-properties-allowlist}). Returns {@code null} when there is + * nothing to emit so downstream audit handlers can skip the field entirely. Iterates the + * allowlist rather than the source so cost is O(|allowlist|) regardless of source size. + */ + private Map filterTableProperties(Map source) { + if (source == null || source.isEmpty()) { + return null; + } + List allowlist = internalCatalogProperties.getAudit().getTablePropertiesAllowlist(); + if (allowlist == null || allowlist.isEmpty()) { + return null; + } + Map filtered = new HashMap<>(); + for (String key : allowlist) { + String value = source.get(key); + if (value != null) { + filtered.put(key, value); + } + } + return filtered.isEmpty() ? null : filtered; + } + private void buildAndSendEvent( TableAuditEvent event, OperationStatus status, String currentTableRoot) { TableAuditEvent completeEvent = diff --git a/services/tables/src/test/java/com/linkedin/openhouse/tables/mock/MockIcebergSnapshotApiHandler.java b/services/tables/src/test/java/com/linkedin/openhouse/tables/mock/MockIcebergSnapshotApiHandler.java index 3695d678c..4ee788e80 100644 --- a/services/tables/src/test/java/com/linkedin/openhouse/tables/mock/MockIcebergSnapshotApiHandler.java +++ b/services/tables/src/test/java/com/linkedin/openhouse/tables/mock/MockIcebergSnapshotApiHandler.java @@ -26,9 +26,20 @@ public ApiResponse putIcebergSnapshots( .responseBody(RequestConstants.TEST_GET_TABLE_RESPONSE_BODY) .build(); case "d200": + // Echo the request's table properties on the response so the audit aspect can read + // committed state from result.getResponseBody().getTableProperties(). return ApiResponse.builder() .httpStatus(HttpStatus.OK) - .responseBody(RequestConstants.TEST_GET_TABLE_RESPONSE_BODY) + .responseBody( + RequestConstants.TEST_GET_TABLE_RESPONSE_BODY + .toBuilder() + .tableProperties( + icebergSnapshotRequestBody.getCreateUpdateTableRequestBody() == null + ? null + : icebergSnapshotRequestBody + .getCreateUpdateTableRequestBody() + .getTableProperties()) + .build()) .build(); case "d400": throw new RequestValidationFailureException(); diff --git a/services/tables/src/test/java/com/linkedin/openhouse/tables/mock/audit/IcebergSnapshotsApiHandlerAuditTest.java b/services/tables/src/test/java/com/linkedin/openhouse/tables/mock/audit/IcebergSnapshotsApiHandlerAuditTest.java index bfed41ec9..4f0f77a11 100644 --- a/services/tables/src/test/java/com/linkedin/openhouse/tables/mock/audit/IcebergSnapshotsApiHandlerAuditTest.java +++ b/services/tables/src/test/java/com/linkedin/openhouse/tables/mock/audit/IcebergSnapshotsApiHandlerAuditTest.java @@ -25,12 +25,18 @@ import org.springframework.http.MediaType; import org.springframework.security.test.context.support.WithMockUser; import org.springframework.test.context.ContextConfiguration; +import org.springframework.test.context.TestPropertySource; import org.springframework.test.web.servlet.MockMvc; import org.springframework.test.web.servlet.request.MockMvcRequestBuilders; @SpringBootTest @AutoConfigureMockMvc @ContextConfiguration +@TestPropertySource( + properties = { + "cluster.iceberg.tables.audit.table-properties-allowlist[0]=openhouse.watermark", + "cluster.iceberg.tables.audit.table-properties-allowlist[1]=openhouse.tableType" + }) @WithMockUser(username = "testUser") public class IcebergSnapshotsApiHandlerAuditTest { @Autowired private MockMvc mvc; @@ -185,11 +191,11 @@ public void testPutIcebergSnapshotsMainPointsToOlderSnapshot() throws Exception } @Test - public void testPutIcebergSnapshotsCarriesTableProperties() throws Exception { - Map properties = new HashMap<>(); - properties.put("openhouse.watermark", "100"); - properties.put("openhouse.tableType", "PRIMARY_TABLE"); - properties.put("user.custom.key", "v"); + public void testPutIcebergSnapshotsFiltersTablePropertiesToAllowlist() throws Exception { + Map requestProperties = new HashMap<>(); + requestProperties.put("openhouse.watermark", "100"); + requestProperties.put("openhouse.tableType", "PRIMARY_TABLE"); + requestProperties.put("user.custom.key", "v"); IcebergSnapshotsRequestBody base = RequestConstants.TEST_ICEBERG_SNAPSHOTS_REQUEST_BODY; IcebergSnapshotsRequestBody requestBody = IcebergSnapshotsRequestBody.builder() @@ -199,7 +205,7 @@ public void testPutIcebergSnapshotsCarriesTableProperties() throws Exception { .createUpdateTableRequestBody( base.getCreateUpdateTableRequestBody() .toBuilder() - .tableProperties(properties) + .tableProperties(requestProperties) .build()) .build(); mvc.perform( @@ -212,7 +218,10 @@ public void testPutIcebergSnapshotsCarriesTableProperties() throws Exception { .content(requestBody.toJson())); Mockito.verify(tableAuditHandler, atLeastOnce()).audit(argCaptor.capture()); TableAuditEvent actualEvent = argCaptor.getValue(); - assertEquals(properties, actualEvent.getTableProperties()); + Map expected = new HashMap<>(); + expected.put("openhouse.watermark", "100"); + expected.put("openhouse.tableType", "PRIMARY_TABLE"); + assertEquals(expected, actualEvent.getTableProperties()); } @Test From f94d384c9f4346ab03c37b968bd8c9441b805e72 Mon Sep 17 00:00:00 2001 From: James Wang Date: Tue, 26 May 2026 15:01:04 -0700 Subject: [PATCH 3/5] add table-property-value-max-length --- .../groovy/openhouse.maven-publish.gradle | 2 +- .../tables/audit/TableAuditAspect.java | 27 ++++++++++---- .../config/InternalCatalogProperties.java | 8 ++++ .../IcebergSnapshotsApiHandlerAuditTest.java | 37 ++++++++++++++++++- 4 files changed, 65 insertions(+), 9 deletions(-) diff --git a/buildSrc/src/main/groovy/openhouse.maven-publish.gradle b/buildSrc/src/main/groovy/openhouse.maven-publish.gradle index 45f7b3b3d..d6c3f40d2 100644 --- a/buildSrc/src/main/groovy/openhouse.maven-publish.gradle +++ b/buildSrc/src/main/groovy/openhouse.maven-publish.gradle @@ -13,7 +13,7 @@ task javadocJar(type: Jar) { } tasks.withType(GenerateModuleMetadata) { - enabled = project.version.toString().endsWith('-SNAPSHOT') + enabled = false } [jar, sourcesJar, javadocJar].each { task -> diff --git a/services/tables/src/main/java/com/linkedin/openhouse/tables/audit/TableAuditAspect.java b/services/tables/src/main/java/com/linkedin/openhouse/tables/audit/TableAuditAspect.java index 8a5d9d79b..f2a9dbf54 100644 --- a/services/tables/src/main/java/com/linkedin/openhouse/tables/audit/TableAuditAspect.java +++ b/services/tables/src/main/java/com/linkedin/openhouse/tables/audit/TableAuditAspect.java @@ -541,25 +541,38 @@ protected ApiResponse auditUpdateDatabaseAclPolicies( } /** - * Narrows the request-body table properties down to the configured allowlist ({@code - * cluster.iceberg.tables.audit.table-properties-allowlist}). Returns {@code null} when there is - * nothing to emit so downstream audit handlers can skip the field entirely. Iterates the - * allowlist rather than the source so cost is O(|allowlist|) regardless of source size. + * Narrows the committed table properties down to the configured allowlist ({@code + * cluster.iceberg.tables.audit.table-properties-allowlist}) and drops any value longer than + * {@code cluster.iceberg.tables.audit.table-property-value-max-length} characters. Returns {@code + * null} when there is nothing to emit so downstream audit handlers can skip the field entirely. + * Iterates the allowlist rather than the source so cost is O(|allowlist|) regardless of source + * size. */ private Map filterTableProperties(Map source) { if (source == null || source.isEmpty()) { return null; } - List allowlist = internalCatalogProperties.getAudit().getTablePropertiesAllowlist(); + InternalCatalogProperties.Audit auditConfig = internalCatalogProperties.getAudit(); + List allowlist = auditConfig.getTablePropertiesAllowlist(); if (allowlist == null || allowlist.isEmpty()) { return null; } + int maxLength = auditConfig.getTablePropertyValueMaxLength(); Map filtered = new HashMap<>(); for (String key : allowlist) { String value = source.get(key); - if (value != null) { - filtered.put(key, value); + if (value == null) { + continue; } + if (maxLength > 0 && value.length() > maxLength) { + log.warn( + "Dropping table-property '{}' from audit event: value length {} exceeds cap {}", + key, + value.length(), + maxLength); + continue; + } + filtered.put(key, value); } return filtered.isEmpty() ? null : filtered; } diff --git a/services/tables/src/main/java/com/linkedin/openhouse/tables/config/InternalCatalogProperties.java b/services/tables/src/main/java/com/linkedin/openhouse/tables/config/InternalCatalogProperties.java index 899792f4a..28aaa48ec 100644 --- a/services/tables/src/main/java/com/linkedin/openhouse/tables/config/InternalCatalogProperties.java +++ b/services/tables/src/main/java/com/linkedin/openhouse/tables/config/InternalCatalogProperties.java @@ -29,5 +29,13 @@ public static class MetadataCache { @Setter public static class Audit { private List tablePropertiesAllowlist = Collections.emptyList(); + + /** + * Maximum character length of a single allowlisted property value. Values exceeding this are + * dropped from the audit event (with a warning log) to prevent oversized events from blowing + * the Kafka producer's max.request.size budget or OOM-ing downstream consumers. {@code 0} + * disables the cap (current/legacy behavior). + */ + private int tablePropertyValueMaxLength = 0; } } diff --git a/services/tables/src/test/java/com/linkedin/openhouse/tables/mock/audit/IcebergSnapshotsApiHandlerAuditTest.java b/services/tables/src/test/java/com/linkedin/openhouse/tables/mock/audit/IcebergSnapshotsApiHandlerAuditTest.java index 4f0f77a11..ddb14f8f9 100644 --- a/services/tables/src/test/java/com/linkedin/openhouse/tables/mock/audit/IcebergSnapshotsApiHandlerAuditTest.java +++ b/services/tables/src/test/java/com/linkedin/openhouse/tables/mock/audit/IcebergSnapshotsApiHandlerAuditTest.java @@ -35,7 +35,8 @@ @TestPropertySource( properties = { "cluster.iceberg.tables.audit.table-properties-allowlist[0]=openhouse.watermark", - "cluster.iceberg.tables.audit.table-properties-allowlist[1]=openhouse.tableType" + "cluster.iceberg.tables.audit.table-properties-allowlist[1]=openhouse.tableType", + "cluster.iceberg.tables.audit.table-property-value-max-length=20" }) @WithMockUser(username = "testUser") public class IcebergSnapshotsApiHandlerAuditTest { @@ -224,6 +225,40 @@ public void testPutIcebergSnapshotsFiltersTablePropertiesToAllowlist() throws Ex assertEquals(expected, actualEvent.getTableProperties()); } + @Test + public void testPutIcebergSnapshotsDropsOversizedTableProperty() throws Exception { + // Cap is 20 chars (set at class level). "openhouse.watermark"="100" stays; + // "openhouse.tableType" with a 30+ char value is dropped. + Map requestProperties = new HashMap<>(); + requestProperties.put("openhouse.watermark", "100"); + requestProperties.put( + "openhouse.tableType", "THIS_VALUE_IS_DEFINITELY_LONGER_THAN_TWENTY_CHARS"); + IcebergSnapshotsRequestBody base = RequestConstants.TEST_ICEBERG_SNAPSHOTS_REQUEST_BODY; + IcebergSnapshotsRequestBody requestBody = + IcebergSnapshotsRequestBody.builder() + .baseTableVersion(base.getBaseTableVersion()) + .jsonSnapshots(base.getJsonSnapshots()) + .snapshotRefs(base.getSnapshotRefs()) + .createUpdateTableRequestBody( + base.getCreateUpdateTableRequestBody() + .toBuilder() + .tableProperties(requestProperties) + .build()) + .build(); + mvc.perform( + MockMvcRequestBuilders.put( + String.format( + CURRENT_MAJOR_VERSION_PREFIX + + "/databases/d200/tables/tb1/iceberg/v2/snapshots")) + .accept(MediaType.APPLICATION_JSON) + .contentType(MediaType.APPLICATION_JSON) + .content(requestBody.toJson())); + Mockito.verify(tableAuditHandler, atLeastOnce()).audit(argCaptor.capture()); + TableAuditEvent actualEvent = argCaptor.getValue(); + assertEquals( + Collections.singletonMap("openhouse.watermark", "100"), actualEvent.getTableProperties()); + } + @Test public void testCTASCommitPhase() throws Exception { mvc.perform( From 0fc323bd52e2e5325e2a0a5da7bd07b85c16af56 Mon Sep 17 00:00:00 2001 From: James Wang Date: Tue, 26 May 2026 18:13:58 -0700 Subject: [PATCH 4/5] remove table-property-value-max-length --- .../tables/audit/TableAuditAspect.java | 25 +++---------- .../config/InternalCatalogProperties.java | 8 ---- .../IcebergSnapshotsApiHandlerAuditTest.java | 37 +------------------ 3 files changed, 7 insertions(+), 63 deletions(-) diff --git a/services/tables/src/main/java/com/linkedin/openhouse/tables/audit/TableAuditAspect.java b/services/tables/src/main/java/com/linkedin/openhouse/tables/audit/TableAuditAspect.java index f2a9dbf54..7f75ce009 100644 --- a/services/tables/src/main/java/com/linkedin/openhouse/tables/audit/TableAuditAspect.java +++ b/services/tables/src/main/java/com/linkedin/openhouse/tables/audit/TableAuditAspect.java @@ -542,37 +542,24 @@ protected ApiResponse auditUpdateDatabaseAclPolicies( /** * Narrows the committed table properties down to the configured allowlist ({@code - * cluster.iceberg.tables.audit.table-properties-allowlist}) and drops any value longer than - * {@code cluster.iceberg.tables.audit.table-property-value-max-length} characters. Returns {@code - * null} when there is nothing to emit so downstream audit handlers can skip the field entirely. - * Iterates the allowlist rather than the source so cost is O(|allowlist|) regardless of source - * size. + * cluster.iceberg.tables.audit.table-properties-allowlist}). Returns {@code null} when there is + * nothing to emit so downstream audit handlers can skip the field entirely. Iterates the + * allowlist rather than the source so cost is O(|allowlist|) regardless of source size. */ private Map filterTableProperties(Map source) { if (source == null || source.isEmpty()) { return null; } - InternalCatalogProperties.Audit auditConfig = internalCatalogProperties.getAudit(); - List allowlist = auditConfig.getTablePropertiesAllowlist(); + List allowlist = internalCatalogProperties.getAudit().getTablePropertiesAllowlist(); if (allowlist == null || allowlist.isEmpty()) { return null; } - int maxLength = auditConfig.getTablePropertyValueMaxLength(); Map filtered = new HashMap<>(); for (String key : allowlist) { String value = source.get(key); - if (value == null) { - continue; + if (value != null) { + filtered.put(key, value); } - if (maxLength > 0 && value.length() > maxLength) { - log.warn( - "Dropping table-property '{}' from audit event: value length {} exceeds cap {}", - key, - value.length(), - maxLength); - continue; - } - filtered.put(key, value); } return filtered.isEmpty() ? null : filtered; } diff --git a/services/tables/src/main/java/com/linkedin/openhouse/tables/config/InternalCatalogProperties.java b/services/tables/src/main/java/com/linkedin/openhouse/tables/config/InternalCatalogProperties.java index 28aaa48ec..899792f4a 100644 --- a/services/tables/src/main/java/com/linkedin/openhouse/tables/config/InternalCatalogProperties.java +++ b/services/tables/src/main/java/com/linkedin/openhouse/tables/config/InternalCatalogProperties.java @@ -29,13 +29,5 @@ public static class MetadataCache { @Setter public static class Audit { private List tablePropertiesAllowlist = Collections.emptyList(); - - /** - * Maximum character length of a single allowlisted property value. Values exceeding this are - * dropped from the audit event (with a warning log) to prevent oversized events from blowing - * the Kafka producer's max.request.size budget or OOM-ing downstream consumers. {@code 0} - * disables the cap (current/legacy behavior). - */ - private int tablePropertyValueMaxLength = 0; } } diff --git a/services/tables/src/test/java/com/linkedin/openhouse/tables/mock/audit/IcebergSnapshotsApiHandlerAuditTest.java b/services/tables/src/test/java/com/linkedin/openhouse/tables/mock/audit/IcebergSnapshotsApiHandlerAuditTest.java index ddb14f8f9..4f0f77a11 100644 --- a/services/tables/src/test/java/com/linkedin/openhouse/tables/mock/audit/IcebergSnapshotsApiHandlerAuditTest.java +++ b/services/tables/src/test/java/com/linkedin/openhouse/tables/mock/audit/IcebergSnapshotsApiHandlerAuditTest.java @@ -35,8 +35,7 @@ @TestPropertySource( properties = { "cluster.iceberg.tables.audit.table-properties-allowlist[0]=openhouse.watermark", - "cluster.iceberg.tables.audit.table-properties-allowlist[1]=openhouse.tableType", - "cluster.iceberg.tables.audit.table-property-value-max-length=20" + "cluster.iceberg.tables.audit.table-properties-allowlist[1]=openhouse.tableType" }) @WithMockUser(username = "testUser") public class IcebergSnapshotsApiHandlerAuditTest { @@ -225,40 +224,6 @@ public void testPutIcebergSnapshotsFiltersTablePropertiesToAllowlist() throws Ex assertEquals(expected, actualEvent.getTableProperties()); } - @Test - public void testPutIcebergSnapshotsDropsOversizedTableProperty() throws Exception { - // Cap is 20 chars (set at class level). "openhouse.watermark"="100" stays; - // "openhouse.tableType" with a 30+ char value is dropped. - Map requestProperties = new HashMap<>(); - requestProperties.put("openhouse.watermark", "100"); - requestProperties.put( - "openhouse.tableType", "THIS_VALUE_IS_DEFINITELY_LONGER_THAN_TWENTY_CHARS"); - IcebergSnapshotsRequestBody base = RequestConstants.TEST_ICEBERG_SNAPSHOTS_REQUEST_BODY; - IcebergSnapshotsRequestBody requestBody = - IcebergSnapshotsRequestBody.builder() - .baseTableVersion(base.getBaseTableVersion()) - .jsonSnapshots(base.getJsonSnapshots()) - .snapshotRefs(base.getSnapshotRefs()) - .createUpdateTableRequestBody( - base.getCreateUpdateTableRequestBody() - .toBuilder() - .tableProperties(requestProperties) - .build()) - .build(); - mvc.perform( - MockMvcRequestBuilders.put( - String.format( - CURRENT_MAJOR_VERSION_PREFIX - + "/databases/d200/tables/tb1/iceberg/v2/snapshots")) - .accept(MediaType.APPLICATION_JSON) - .contentType(MediaType.APPLICATION_JSON) - .content(requestBody.toJson())); - Mockito.verify(tableAuditHandler, atLeastOnce()).audit(argCaptor.capture()); - TableAuditEvent actualEvent = argCaptor.getValue(); - assertEquals( - Collections.singletonMap("openhouse.watermark", "100"), actualEvent.getTableProperties()); - } - @Test public void testCTASCommitPhase() throws Exception { mvc.perform( From 3008476e3af055203989c4a51774f8f557f9875a Mon Sep 17 00:00:00 2001 From: James Wang Date: Fri, 29 May 2026 14:15:18 -0700 Subject: [PATCH 5/5] match table property allowlist entries as regex patterns --- .../tables/audit/TableAuditAspect.java | 59 ++++++++++++++++--- .../config/InternalCatalogProperties.java | 14 +++++ .../IcebergSnapshotsApiHandlerAuditTest.java | 11 +++- 3 files changed, 72 insertions(+), 12 deletions(-) diff --git a/services/tables/src/main/java/com/linkedin/openhouse/tables/audit/TableAuditAspect.java b/services/tables/src/main/java/com/linkedin/openhouse/tables/audit/TableAuditAspect.java index 7f75ce009..468944195 100644 --- a/services/tables/src/main/java/com/linkedin/openhouse/tables/audit/TableAuditAspect.java +++ b/services/tables/src/main/java/com/linkedin/openhouse/tables/audit/TableAuditAspect.java @@ -19,9 +19,13 @@ import com.linkedin.openhouse.tables.audit.model.TableAuditEvent; import com.linkedin.openhouse.tables.config.InternalCatalogProperties; import java.time.Instant; +import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.regex.Pattern; +import java.util.regex.PatternSyntaxException; import lombok.extern.slf4j.Slf4j; import org.apache.iceberg.Snapshot; import org.apache.iceberg.SnapshotParser; @@ -48,6 +52,13 @@ public class TableAuditAspect { @Autowired private InternalCatalogProperties internalCatalogProperties; + /** + * Cached compiled allowlist patterns. Built lazily on first use from {@code + * cluster.iceberg.tables.audit.table-properties-allowlist}. The aspect is a singleton and the + * config is bound once at startup, so a one-shot lazy init is sufficient. + */ + private volatile List allowlistPatterns; + /** * Install the Around advice for getTable() method in OpenHouseTablesApiHandler. * @@ -542,28 +553,58 @@ protected ApiResponse auditUpdateDatabaseAclPolicies( /** * Narrows the committed table properties down to the configured allowlist ({@code - * cluster.iceberg.tables.audit.table-properties-allowlist}). Returns {@code null} when there is - * nothing to emit so downstream audit handlers can skip the field entirely. Iterates the - * allowlist rather than the source so cost is O(|allowlist|) regardless of source size. + * cluster.iceberg.tables.audit.table-properties-allowlist}). Allowlist entries are Java regular + * expressions matched against the property key (full match, via {@link Pattern#matches}). Invalid + * patterns are logged and skipped — they never block audit emission. Returns {@code null} when + * there is nothing to emit so downstream audit handlers can skip the field entirely. */ private Map filterTableProperties(Map source) { if (source == null || source.isEmpty()) { return null; } - List allowlist = internalCatalogProperties.getAudit().getTablePropertiesAllowlist(); - if (allowlist == null || allowlist.isEmpty()) { + List patterns = compiledAllowlistPatterns(); + if (patterns.isEmpty()) { return null; } Map filtered = new HashMap<>(); - for (String key : allowlist) { - String value = source.get(key); - if (value != null) { - filtered.put(key, value); + for (Map.Entry entry : source.entrySet()) { + String key = entry.getKey(); + for (Pattern pattern : patterns) { + if (pattern.matcher(key).matches()) { + filtered.put(key, entry.getValue()); + break; + } } } return filtered.isEmpty() ? null : filtered; } + /** + * Compiles the configured allowlist into {@link Pattern}s once and caches the result. Returns + * {@link Collections#emptyList()} when the allowlist is unset or every pattern is invalid. + */ + private List compiledAllowlistPatterns() { + List cached = allowlistPatterns; + if (cached != null) { + return cached; + } + List allowlist = internalCatalogProperties.getAudit().getTablePropertiesAllowlist(); + if (allowlist == null || allowlist.isEmpty()) { + allowlistPatterns = Collections.emptyList(); + return allowlistPatterns; + } + List compiled = new ArrayList<>(allowlist.size()); + for (String regex : allowlist) { + try { + compiled.add(Pattern.compile(regex)); + } catch (PatternSyntaxException e) { + log.warn("Skipping invalid table-property allowlist regex '{}': {}", regex, e.getMessage()); + } + } + allowlistPatterns = Collections.unmodifiableList(compiled); + return allowlistPatterns; + } + private void buildAndSendEvent( TableAuditEvent event, OperationStatus status, String currentTableRoot) { TableAuditEvent completeEvent = diff --git a/services/tables/src/main/java/com/linkedin/openhouse/tables/config/InternalCatalogProperties.java b/services/tables/src/main/java/com/linkedin/openhouse/tables/config/InternalCatalogProperties.java index 899792f4a..d79295236 100644 --- a/services/tables/src/main/java/com/linkedin/openhouse/tables/config/InternalCatalogProperties.java +++ b/services/tables/src/main/java/com/linkedin/openhouse/tables/config/InternalCatalogProperties.java @@ -28,6 +28,20 @@ public static class MetadataCache { @Getter @Setter public static class Audit { + /** + * Allowlist of property-key regular expressions. A committed table property is included on the + * TableAuditEvent if its key fully matches at least one pattern (Java regex semantics, {@link + * java.util.regex.Pattern#matches}). Default empty = nothing emitted. + * + *

Examples: + * + *

    + *
  • {@code completionWatermark} matches exactly the {@code completionWatermark} key. + *
  • {@code openhouse\..*} matches every property whose key starts with {@code openhouse.}. + *
+ * + *

Invalid patterns are logged and skipped — they do not block audit emission. + */ private List tablePropertiesAllowlist = Collections.emptyList(); } } diff --git a/services/tables/src/test/java/com/linkedin/openhouse/tables/mock/audit/IcebergSnapshotsApiHandlerAuditTest.java b/services/tables/src/test/java/com/linkedin/openhouse/tables/mock/audit/IcebergSnapshotsApiHandlerAuditTest.java index 4f0f77a11..13d6a90ba 100644 --- a/services/tables/src/test/java/com/linkedin/openhouse/tables/mock/audit/IcebergSnapshotsApiHandlerAuditTest.java +++ b/services/tables/src/test/java/com/linkedin/openhouse/tables/mock/audit/IcebergSnapshotsApiHandlerAuditTest.java @@ -34,8 +34,10 @@ @ContextConfiguration @TestPropertySource( properties = { - "cluster.iceberg.tables.audit.table-properties-allowlist[0]=openhouse.watermark", - "cluster.iceberg.tables.audit.table-properties-allowlist[1]=openhouse.tableType" + // Regex matching every property whose key begins with "openhouse." — four backslashes in + // the Java source survive Properties.load() unescaping as a single backslash in the bound + // value, which the regex engine then reads as literal-dot. + "cluster.iceberg.tables.audit.table-properties-allowlist[0]=openhouse\\\\..*" }) @WithMockUser(username = "testUser") public class IcebergSnapshotsApiHandlerAuditTest { @@ -191,11 +193,13 @@ public void testPutIcebergSnapshotsMainPointsToOlderSnapshot() throws Exception } @Test - public void testPutIcebergSnapshotsFiltersTablePropertiesToAllowlist() throws Exception { + public void testPutIcebergSnapshotsFiltersTablePropertiesByRegexAllowlist() throws Exception { Map requestProperties = new HashMap<>(); requestProperties.put("openhouse.watermark", "100"); requestProperties.put("openhouse.tableType", "PRIMARY_TABLE"); + requestProperties.put("openhouse.replication.config", "{\"target\":\"war\"}"); requestProperties.put("user.custom.key", "v"); + requestProperties.put("openhousewatermark", "should-not-match"); IcebergSnapshotsRequestBody base = RequestConstants.TEST_ICEBERG_SNAPSHOTS_REQUEST_BODY; IcebergSnapshotsRequestBody requestBody = IcebergSnapshotsRequestBody.builder() @@ -221,6 +225,7 @@ public void testPutIcebergSnapshotsFiltersTablePropertiesToAllowlist() throws Ex Map expected = new HashMap<>(); expected.put("openhouse.watermark", "100"); expected.put("openhouse.tableType", "PRIMARY_TABLE"); + expected.put("openhouse.replication.config", "{\"target\":\"war\"}"); assertEquals(expected, actualEvent.getTableProperties()); }