diff --git a/services/tables/src/main/java/com/linkedin/openhouse/tables/audit/TableAuditAspect.java b/services/tables/src/main/java/com/linkedin/openhouse/tables/audit/TableAuditAspect.java index d63aa224e..468944195 100644 --- a/services/tables/src/main/java/com/linkedin/openhouse/tables/audit/TableAuditAspect.java +++ b/services/tables/src/main/java/com/linkedin/openhouse/tables/audit/TableAuditAspect.java @@ -17,9 +17,15 @@ import com.linkedin.openhouse.tables.audit.model.OperationStatus; import com.linkedin.openhouse.tables.audit.model.OperationType; import com.linkedin.openhouse.tables.audit.model.TableAuditEvent; +import com.linkedin.openhouse.tables.config.InternalCatalogProperties; import java.time.Instant; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.regex.Pattern; +import java.util.regex.PatternSyntaxException; import lombok.extern.slf4j.Slf4j; import org.apache.iceberg.Snapshot; import org.apache.iceberg.SnapshotParser; @@ -44,6 +50,15 @@ public class TableAuditAspect { @Autowired private AuditHandler tableAuditHandler; + @Autowired private InternalCatalogProperties internalCatalogProperties; + + /** + * Cached compiled allowlist patterns. Built lazily on first use from {@code + * cluster.iceberg.tables.audit.table-properties-allowlist}. The aspect is a singleton and the + * config is bound once at startup, so a one-shot lazy init is sufficient. + */ + private volatile List allowlistPatterns; + /** * Install the Around advice for getTable() method in OpenHouseTablesApiHandler. 
* @@ -378,13 +393,20 @@ protected ApiResponse auditPutIcebergSnapshots( .tableName(tableId) .operationType(operationType); extractSnapshotInfo(icebergSnapshotRequestBody, eventBuilder); - TableAuditEvent event = eventBuilder.build(); try { result = (ApiResponse) point.proceed(); + // Read tableProperties from the response, not the request body: OpenHouse mutates + // properties server-side during commit (e.g. openhouse.tableVersion, + // openhouse.lastModifiedTime), and the audit event should reflect the committed state. + TableAuditEvent event = + eventBuilder + .tableProperties(filterTableProperties(result.getResponseBody().getTableProperties())) + .build(); buildAndSendEvent( event, OperationStatus.SUCCESS, result.getResponseBody().getTableLocation()); } catch (Throwable t) { - buildAndSendEvent(event, OperationStatus.FAILED, null); + // On failure there is no committed state to read from, so tableProperties stays null. + buildAndSendEvent(eventBuilder.build(), OperationStatus.FAILED, null); throw t; } return result; @@ -529,6 +551,60 @@ protected ApiResponse auditUpdateDatabaseAclPolicies( return result; } + /** + * Narrows the committed table properties down to the configured allowlist ({@code + * cluster.iceberg.tables.audit.table-properties-allowlist}). Allowlist entries are Java regular + * expressions matched against the property key (full match, via {@link Pattern#matches}). Invalid + * patterns are logged and skipped — they never block audit emission. Returns {@code null} when + * there is nothing to emit so downstream audit handlers can skip the field entirely. 
+   */
+  private Map<String, String> filterTableProperties(Map<String, String> source) {
+    if (source == null || source.isEmpty()) {
+      return null;
+    }
+    List<Pattern> patterns = compiledAllowlistPatterns();
+    if (patterns.isEmpty()) {
+      return null;
+    }
+    Map<String, String> filtered = new HashMap<>();
+    for (Map.Entry<String, String> entry : source.entrySet()) {
+      String key = entry.getKey();
+      for (Pattern pattern : patterns) {
+        if (pattern.matcher(key).matches()) {
+          filtered.put(key, entry.getValue());
+          break;
+        }
+      }
+    }
+    return filtered.isEmpty() ? null : filtered;
+  }
+
+  /**
+   * Compiles the configured allowlist into {@link Pattern}s once and caches the unmodifiable
+   * result. Returns an empty list when the allowlist is unset or every pattern is invalid.
+   */
+  private List<Pattern> compiledAllowlistPatterns() {
+    List<Pattern> cached = allowlistPatterns;
+    if (cached != null) {
+      return cached;
+    }
+    List<String> allowlist = internalCatalogProperties.getAudit().getTablePropertiesAllowlist();
+    if (allowlist == null || allowlist.isEmpty()) {
+      allowlistPatterns = Collections.emptyList();
+      return allowlistPatterns;
+    }
+    List<Pattern> compiled = new ArrayList<>(allowlist.size());
+    for (String regex : allowlist) {
+      try {
+        compiled.add(Pattern.compile(regex));
+      } catch (PatternSyntaxException e) {
+        log.warn("Skipping invalid table-property allowlist regex '{}': {}", regex, e.getMessage());
+      }
+    }
+    allowlistPatterns = Collections.unmodifiableList(compiled);
+    return allowlistPatterns;
+  }
+
   private void buildAndSendEvent(
       TableAuditEvent event, OperationStatus status, String currentTableRoot) {
     TableAuditEvent completeEvent =
diff --git a/services/tables/src/main/java/com/linkedin/openhouse/tables/audit/model/TableAuditEvent.java b/services/tables/src/main/java/com/linkedin/openhouse/tables/audit/model/TableAuditEvent.java
index 8d797b625..27a1eef37 100644
--- a/services/tables/src/main/java/com/linkedin/openhouse/tables/audit/model/TableAuditEvent.java
+++ b/services/tables/src/main/java/com/linkedin/openhouse/tables/audit/model/TableAuditEvent.java
@@ -2,6 +2,7 @@ import com.linkedin.openhouse.common.audit.model.BaseAuditEvent; import java.time.Instant; +import java.util.Map; import lombok.AllArgsConstructor; import lombok.Builder; import lombok.EqualsAndHashCode; @@ -40,4 +41,6 @@ public class TableAuditEvent extends BaseAuditEvent { private Long currentSnapshotId; private Long currentSnapshotTimestampMs; + + private Map tableProperties; } diff --git a/services/tables/src/main/java/com/linkedin/openhouse/tables/config/InternalCatalogProperties.java b/services/tables/src/main/java/com/linkedin/openhouse/tables/config/InternalCatalogProperties.java index 0b080edda..d79295236 100644 --- a/services/tables/src/main/java/com/linkedin/openhouse/tables/config/InternalCatalogProperties.java +++ b/services/tables/src/main/java/com/linkedin/openhouse/tables/config/InternalCatalogProperties.java @@ -1,6 +1,8 @@ package com.linkedin.openhouse.tables.config; import java.time.Duration; +import java.util.Collections; +import java.util.List; import lombok.Getter; import lombok.Setter; import org.springframework.boot.context.properties.ConfigurationProperties; @@ -13,6 +15,8 @@ public class InternalCatalogProperties { private MetadataCache metadataCache = new MetadataCache(); + private Audit audit = new Audit(); + @Getter @Setter public static class MetadataCache { @@ -20,4 +24,24 @@ public static class MetadataCache { private Duration ttl; private DataSize maxWeight; } + + @Getter + @Setter + public static class Audit { + /** + * Allowlist of property-key regular expressions. A committed table property is included on the + * TableAuditEvent if its key fully matches at least one pattern (Java regex semantics, {@link + * java.util.regex.Pattern#matches}). Default empty = nothing emitted. + * + *

<p>Examples: + * + * <ul> + * <li>{@code completionWatermark} matches exactly the {@code completionWatermark} key. + * <li>{@code openhouse\..*} matches every property whose key starts with {@code openhouse.}. + * </ul> + * + * <p>
Invalid patterns are logged and skipped — they do not block audit emission. + */ + private List tablePropertiesAllowlist = Collections.emptyList(); + } } diff --git a/services/tables/src/test/java/com/linkedin/openhouse/tables/mock/MockIcebergSnapshotApiHandler.java b/services/tables/src/test/java/com/linkedin/openhouse/tables/mock/MockIcebergSnapshotApiHandler.java index 3695d678c..4ee788e80 100644 --- a/services/tables/src/test/java/com/linkedin/openhouse/tables/mock/MockIcebergSnapshotApiHandler.java +++ b/services/tables/src/test/java/com/linkedin/openhouse/tables/mock/MockIcebergSnapshotApiHandler.java @@ -26,9 +26,20 @@ public ApiResponse putIcebergSnapshots( .responseBody(RequestConstants.TEST_GET_TABLE_RESPONSE_BODY) .build(); case "d200": + // Echo the request's table properties on the response so the audit aspect can read + // committed state from result.getResponseBody().getTableProperties(). return ApiResponse.builder() .httpStatus(HttpStatus.OK) - .responseBody(RequestConstants.TEST_GET_TABLE_RESPONSE_BODY) + .responseBody( + RequestConstants.TEST_GET_TABLE_RESPONSE_BODY + .toBuilder() + .tableProperties( + icebergSnapshotRequestBody.getCreateUpdateTableRequestBody() == null + ? 
null + : icebergSnapshotRequestBody + .getCreateUpdateTableRequestBody() + .getTableProperties()) + .build()) .build(); case "d400": throw new RequestValidationFailureException(); diff --git a/services/tables/src/test/java/com/linkedin/openhouse/tables/mock/audit/IcebergSnapshotsApiHandlerAuditTest.java b/services/tables/src/test/java/com/linkedin/openhouse/tables/mock/audit/IcebergSnapshotsApiHandlerAuditTest.java index 0ae65c793..13d6a90ba 100644 --- a/services/tables/src/test/java/com/linkedin/openhouse/tables/mock/audit/IcebergSnapshotsApiHandlerAuditTest.java +++ b/services/tables/src/test/java/com/linkedin/openhouse/tables/mock/audit/IcebergSnapshotsApiHandlerAuditTest.java @@ -25,12 +25,20 @@ import org.springframework.http.MediaType; import org.springframework.security.test.context.support.WithMockUser; import org.springframework.test.context.ContextConfiguration; +import org.springframework.test.context.TestPropertySource; import org.springframework.test.web.servlet.MockMvc; import org.springframework.test.web.servlet.request.MockMvcRequestBuilders; @SpringBootTest @AutoConfigureMockMvc @ContextConfiguration +@TestPropertySource( + properties = { + // Regex matching every property whose key begins with "openhouse." — four backslashes in + // the Java source survive Properties.load() unescaping as a single backslash in the bound + // value, which the regex engine then reads as literal-dot. 
+ "cluster.iceberg.tables.audit.table-properties-allowlist[0]=openhouse\\\\..*" + }) @WithMockUser(username = "testUser") public class IcebergSnapshotsApiHandlerAuditTest { @Autowired private MockMvc mvc; @@ -184,6 +192,43 @@ public void testPutIcebergSnapshotsMainPointsToOlderSnapshot() throws Exception assertEquals(1000L, actualEvent.getCurrentSnapshotTimestampMs().longValue()); } + @Test + public void testPutIcebergSnapshotsFiltersTablePropertiesByRegexAllowlist() throws Exception { + Map requestProperties = new HashMap<>(); + requestProperties.put("openhouse.watermark", "100"); + requestProperties.put("openhouse.tableType", "PRIMARY_TABLE"); + requestProperties.put("openhouse.replication.config", "{\"target\":\"war\"}"); + requestProperties.put("user.custom.key", "v"); + requestProperties.put("openhousewatermark", "should-not-match"); + IcebergSnapshotsRequestBody base = RequestConstants.TEST_ICEBERG_SNAPSHOTS_REQUEST_BODY; + IcebergSnapshotsRequestBody requestBody = + IcebergSnapshotsRequestBody.builder() + .baseTableVersion(base.getBaseTableVersion()) + .jsonSnapshots(base.getJsonSnapshots()) + .snapshotRefs(base.getSnapshotRefs()) + .createUpdateTableRequestBody( + base.getCreateUpdateTableRequestBody() + .toBuilder() + .tableProperties(requestProperties) + .build()) + .build(); + mvc.perform( + MockMvcRequestBuilders.put( + String.format( + CURRENT_MAJOR_VERSION_PREFIX + + "/databases/d200/tables/tb1/iceberg/v2/snapshots")) + .accept(MediaType.APPLICATION_JSON) + .contentType(MediaType.APPLICATION_JSON) + .content(requestBody.toJson())); + Mockito.verify(tableAuditHandler, atLeastOnce()).audit(argCaptor.capture()); + TableAuditEvent actualEvent = argCaptor.getValue(); + Map expected = new HashMap<>(); + expected.put("openhouse.watermark", "100"); + expected.put("openhouse.tableType", "PRIMARY_TABLE"); + expected.put("openhouse.replication.config", "{\"target\":\"war\"}"); + assertEquals(expected, actualEvent.getTableProperties()); + } + @Test public void 
testCTASCommitPhase() throws Exception { mvc.perform(