Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,15 @@
import com.linkedin.openhouse.tables.audit.model.OperationStatus;
import com.linkedin.openhouse.tables.audit.model.OperationType;
import com.linkedin.openhouse.tables.audit.model.TableAuditEvent;
import com.linkedin.openhouse.tables.config.InternalCatalogProperties;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;
import lombok.extern.slf4j.Slf4j;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.SnapshotParser;
Expand All @@ -44,6 +50,15 @@ public class TableAuditAspect {

@Autowired private AuditHandler<TableAuditEvent> tableAuditHandler;

@Autowired private InternalCatalogProperties internalCatalogProperties;

/**
* Cached compiled allowlist patterns. Built lazily on first use from {@code
* cluster.iceberg.tables.audit.table-properties-allowlist}. The aspect is a singleton and the
* config is bound once at startup, so a one-shot lazy init is sufficient.
*/
private volatile List<Pattern> allowlistPatterns;

/**
* Install the Around advice for getTable() method in OpenHouseTablesApiHandler.
*
Expand Down Expand Up @@ -378,13 +393,20 @@ protected ApiResponse<GetTableResponseBody> auditPutIcebergSnapshots(
.tableName(tableId)
.operationType(operationType);
extractSnapshotInfo(icebergSnapshotRequestBody, eventBuilder);
TableAuditEvent event = eventBuilder.build();
try {
result = (ApiResponse<GetTableResponseBody>) point.proceed();
// Read tableProperties from the response, not the request body: OpenHouse mutates
// properties server-side during commit (e.g. openhouse.tableVersion,
// openhouse.lastModifiedTime), and the audit event should reflect the committed state.
TableAuditEvent event =
eventBuilder
.tableProperties(filterTableProperties(result.getResponseBody().getTableProperties()))
.build();
buildAndSendEvent(
event, OperationStatus.SUCCESS, result.getResponseBody().getTableLocation());
} catch (Throwable t) {
buildAndSendEvent(event, OperationStatus.FAILED, null);
// On failure there is no committed state to read from, so tableProperties stays null.
buildAndSendEvent(eventBuilder.build(), OperationStatus.FAILED, null);
throw t;
}
return result;
Expand Down Expand Up @@ -529,6 +551,60 @@ protected ApiResponse<Void> auditUpdateDatabaseAclPolicies(
return result;
}

/**
* Narrows the committed table properties down to the configured allowlist ({@code
* cluster.iceberg.tables.audit.table-properties-allowlist}). Allowlist entries are Java regular
* expressions matched against the property key (full match, via {@link Pattern#matches}). Invalid
* patterns are logged and skipped — they never block audit emission. Returns {@code null} when
* there is nothing to emit so downstream audit handlers can skip the field entirely.
*/
private Map<String, String> filterTableProperties(Map<String, String> source) {
if (source == null || source.isEmpty()) {
return null;
}
List<Pattern> patterns = compiledAllowlistPatterns();
if (patterns.isEmpty()) {
return null;
}
Map<String, String> filtered = new HashMap<>();
for (Map.Entry<String, String> entry : source.entrySet()) {
String key = entry.getKey();
for (Pattern pattern : patterns) {
if (pattern.matcher(key).matches()) {
filtered.put(key, entry.getValue());
break;
}
}
}
return filtered.isEmpty() ? null : filtered;
}

/**
* Compiles the configured allowlist into {@link Pattern}s once and caches the result. Returns
* {@link Collections#emptyList()} when the allowlist is unset or every pattern is invalid.
*/
private List<Pattern> compiledAllowlistPatterns() {
List<Pattern> cached = allowlistPatterns;
if (cached != null) {
return cached;
}
List<String> allowlist = internalCatalogProperties.getAudit().getTablePropertiesAllowlist();
if (allowlist == null || allowlist.isEmpty()) {
allowlistPatterns = Collections.emptyList();
return allowlistPatterns;
}
List<Pattern> compiled = new ArrayList<>(allowlist.size());
for (String regex : allowlist) {
try {
compiled.add(Pattern.compile(regex));
} catch (PatternSyntaxException e) {
log.warn("Skipping invalid table-property allowlist regex '{}': {}", regex, e.getMessage());
}
}
allowlistPatterns = Collections.unmodifiableList(compiled);
return allowlistPatterns;
}

private void buildAndSendEvent(
TableAuditEvent event, OperationStatus status, String currentTableRoot) {
TableAuditEvent completeEvent =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.linkedin.openhouse.common.audit.model.BaseAuditEvent;
import java.time.Instant;
import java.util.Map;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.EqualsAndHashCode;
Expand Down Expand Up @@ -40,4 +41,6 @@ public class TableAuditEvent extends BaseAuditEvent {
private Long currentSnapshotId;

private Long currentSnapshotTimestampMs;

private Map<String, String> tableProperties;
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package com.linkedin.openhouse.tables.config;

import java.time.Duration;
import java.util.Collections;
import java.util.List;
import lombok.Getter;
import lombok.Setter;
import org.springframework.boot.context.properties.ConfigurationProperties;
Expand All @@ -13,11 +15,33 @@ public class InternalCatalogProperties {

private MetadataCache metadataCache = new MetadataCache();

private Audit audit = new Audit();

@Getter
@Setter
public static class MetadataCache {
private Boolean enabled;
private Duration ttl;
private DataSize maxWeight;
}

@Getter
@Setter
public static class Audit {
/**
* Allowlist of property-key regular expressions. A committed table property is included on the
* TableAuditEvent if its key fully matches at least one pattern (Java regex semantics, {@link
* java.util.regex.Pattern#matches}). Default empty = nothing emitted.
*
* <p>Examples:
*
* <ul>
* <li>{@code completionWatermark} matches exactly the {@code completionWatermark} key.
* <li>{@code openhouse\..*} matches every property whose key starts with {@code openhouse.}.
* </ul>
*
* <p>Invalid patterns are logged and skipped — they do not block audit emission.
*/
private List<String> tablePropertiesAllowlist = Collections.emptyList();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,20 @@ public ApiResponse<GetTableResponseBody> putIcebergSnapshots(
.responseBody(RequestConstants.TEST_GET_TABLE_RESPONSE_BODY)
.build();
case "d200":
// Echo the request's table properties on the response so the audit aspect can read
// committed state from result.getResponseBody().getTableProperties().
return ApiResponse.<GetTableResponseBody>builder()
.httpStatus(HttpStatus.OK)
.responseBody(RequestConstants.TEST_GET_TABLE_RESPONSE_BODY)
.responseBody(
RequestConstants.TEST_GET_TABLE_RESPONSE_BODY
.toBuilder()
.tableProperties(
icebergSnapshotRequestBody.getCreateUpdateTableRequestBody() == null
? null
: icebergSnapshotRequestBody
.getCreateUpdateTableRequestBody()
.getTableProperties())
.build())
.build();
case "d400":
throw new RequestValidationFailureException();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,20 @@
import org.springframework.http.MediaType;
import org.springframework.security.test.context.support.WithMockUser;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.TestPropertySource;
import org.springframework.test.web.servlet.MockMvc;
import org.springframework.test.web.servlet.request.MockMvcRequestBuilders;

@SpringBootTest
@AutoConfigureMockMvc
@ContextConfiguration
@TestPropertySource(
properties = {
// Regex matching every property whose key begins with "openhouse." — four backslashes in
// the Java source survive Properties.load() unescaping as a single backslash in the bound
// value, which the regex engine then reads as literal-dot.
"cluster.iceberg.tables.audit.table-properties-allowlist[0]=openhouse\\\\..*"
})
@WithMockUser(username = "testUser")
public class IcebergSnapshotsApiHandlerAuditTest {
@Autowired private MockMvc mvc;
Expand Down Expand Up @@ -184,6 +192,43 @@ public void testPutIcebergSnapshotsMainPointsToOlderSnapshot() throws Exception
assertEquals(1000L, actualEvent.getCurrentSnapshotTimestampMs().longValue());
}

@Test
public void testPutIcebergSnapshotsFiltersTablePropertiesByRegexAllowlist() throws Exception {
Map<String, String> requestProperties = new HashMap<>();
requestProperties.put("openhouse.watermark", "100");
requestProperties.put("openhouse.tableType", "PRIMARY_TABLE");
requestProperties.put("openhouse.replication.config", "{\"target\":\"war\"}");
requestProperties.put("user.custom.key", "v");
requestProperties.put("openhousewatermark", "should-not-match");
IcebergSnapshotsRequestBody base = RequestConstants.TEST_ICEBERG_SNAPSHOTS_REQUEST_BODY;
IcebergSnapshotsRequestBody requestBody =
IcebergSnapshotsRequestBody.builder()
.baseTableVersion(base.getBaseTableVersion())
.jsonSnapshots(base.getJsonSnapshots())
.snapshotRefs(base.getSnapshotRefs())
.createUpdateTableRequestBody(
base.getCreateUpdateTableRequestBody()
.toBuilder()
.tableProperties(requestProperties)
.build())
.build();
mvc.perform(
MockMvcRequestBuilders.put(
String.format(
CURRENT_MAJOR_VERSION_PREFIX
+ "/databases/d200/tables/tb1/iceberg/v2/snapshots"))
.accept(MediaType.APPLICATION_JSON)
.contentType(MediaType.APPLICATION_JSON)
.content(requestBody.toJson()));
Mockito.verify(tableAuditHandler, atLeastOnce()).audit(argCaptor.capture());
TableAuditEvent actualEvent = argCaptor.getValue();
Map<String, String> expected = new HashMap<>();
expected.put("openhouse.watermark", "100");
expected.put("openhouse.tableType", "PRIMARY_TABLE");
expected.put("openhouse.replication.config", "{\"target\":\"war\"}");
assertEquals(expected, actualEvent.getTableProperties());
}

@Test
public void testCTASCommitPhase() throws Exception {
mvc.perform(
Expand Down