Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@
import com.linkedin.openhouse.tables.audit.model.OperationStatus;
import com.linkedin.openhouse.tables.audit.model.OperationType;
import com.linkedin.openhouse.tables.audit.model.TableAuditEvent;
import com.linkedin.openhouse.tables.config.InternalCatalogProperties;
import java.time.Instant;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -44,6 +46,8 @@ public class TableAuditAspect {

@Autowired private AuditHandler<TableAuditEvent> tableAuditHandler;

@Autowired private InternalCatalogProperties internalCatalogProperties;

/**
* Install the Around advice for getTable() method in OpenHouseTablesApiHandler.
*
Expand Down Expand Up @@ -378,50 +382,65 @@ protected ApiResponse<GetTableResponseBody> auditPutIcebergSnapshots(
.tableName(tableId)
.operationType(operationType);
extractSnapshotInfo(icebergSnapshotRequestBody, eventBuilder);
TableAuditEvent event = eventBuilder.build();
try {
result = (ApiResponse<GetTableResponseBody>) point.proceed();
// Read tableProperties from the response, not the request body: OpenHouse mutates
// properties server-side during commit (e.g. openhouse.tableVersion,
// openhouse.lastModifiedTime), and the audit event should reflect the committed state.
TableAuditEvent event =
eventBuilder
.tableProperties(filterTableProperties(result.getResponseBody().getTableProperties()))
.build();
buildAndSendEvent(
event, OperationStatus.SUCCESS, result.getResponseBody().getTableLocation());
} catch (Throwable t) {
buildAndSendEvent(event, OperationStatus.FAILED, null);
// On failure there is no committed state to read from, so tableProperties stays null.
buildAndSendEvent(eventBuilder.build(), OperationStatus.FAILED, null);
throw t;
}
return result;
}

/**
* Extracts snapshot ID and timestamp of the main branch from the request body. The snapshotRefs
* map contains branch name to JSON-serialized SnapshotRef. We read the main branch's snapshot-id
* (this is what Iceberg treats as current-snapshot-id — see TableMetadata.Builder.setRef()) and
* then find the matching snapshot in jsonSnapshots to get its timestamp-ms.
* Extracts snapshot ID, timestamp, and branch ref name from the request body.
*
* <p>branchRefName is the ref whose snapshot-id matches the last snapshot in jsonSnapshots.
* Iceberg appends snapshots chronologically, so the last one is always the newly-committed
* snapshot, and the ref pointing to it is the branch that was written.
*
* <p>Leaves both fields null if the main branch ref is absent (e.g. branch-only commits where
* main didn't advance, or non-commit operations) or if the matching snapshot can't be found.
* <p>currentSnapshotId and currentSnapshotTimestampMs track the main branch ref for backwards
* compatibility. They are null when main is absent from snapshotRefs.
*/
private void extractSnapshotInfo(
IcebergSnapshotsRequestBody requestBody,
TableAuditEvent.TableAuditEventBuilder eventBuilder) {
try {
Map<String, String> snapshotRefs = requestBody.getSnapshotRefs();
if (snapshotRefs == null) {
List<String> jsonSnapshots = requestBody.getJsonSnapshots();
if (snapshotRefs == null || jsonSnapshots == null || jsonSnapshots.isEmpty()) {
return;
}

// The last snapshot in the list is the one being committed (Iceberg appends chronologically).
// Find which ref points to it — that is the branch being written.
long lastSnapshotId =
SnapshotParser.fromJson(jsonSnapshots.get(jsonSnapshots.size() - 1)).snapshotId();
for (Map.Entry<String, String> entry : snapshotRefs.entrySet()) {
if (SnapshotRefParser.fromJson(entry.getValue()).snapshotId() == lastSnapshotId) {
eventBuilder.branchRefName(entry.getKey());
break;
}
}

// Extract snapshot ID and timestamp for main branch (backwards-compatible).
// Iterate jsonSnapshots in reverse: main's snapshot is typically the most recent.
String mainRefJson = snapshotRefs.get(SnapshotRef.MAIN_BRANCH);
if (mainRefJson == null) {
return;
}
long mainSnapshotId = SnapshotRefParser.fromJson(mainRefJson).snapshotId();
eventBuilder.currentSnapshotId(mainSnapshotId);

// Find the matching snapshot in jsonSnapshots to get its timestamp-ms. Iterate in reverse
// because Iceberg appends snapshots chronologically and main's snapshot is typically the
// most recent. Skip snapshots whose JSON doesn't contain the target id as a cheap
// pre-filter before invoking the JSON parser.
List<String> jsonSnapshots = requestBody.getJsonSnapshots();
if (jsonSnapshots == null) {
return;
}
String mainSnapshotIdStr = Long.toString(mainSnapshotId);
for (int i = jsonSnapshots.size() - 1; i >= 0; i--) {
String snapshotJson = jsonSnapshots.get(i);
Expand Down Expand Up @@ -529,6 +548,30 @@ protected ApiResponse<Void> auditUpdateDatabaseAclPolicies(
return result;
}

/**
 * Picks the allowlisted entries out of a table's committed properties so they can be attached to
 * an audit event. The allowlist is read from {@code
 * internalCatalogProperties.getAudit().getTablePropertiesAllowlist()} (configuration key {@code
 * cluster.iceberg.tables.audit.table-properties-allowlist}).
 *
 * <p>The method does one lookup in {@code source} per allowlist entry instead of scanning {@code
 * source}, so the work grows with the allowlist rather than with the number of table properties.
 * Keys that are missing from {@code source}, or mapped to {@code null}, are left out.
 *
 * @param source the table properties to filter; may be {@code null}
 * @return a new map holding only the allowlisted entries present in {@code source}, or {@code
 *     null} when {@code source} or the allowlist is null/empty or when no allowlisted key
 *     matched, so that callers can omit the field altogether
 */
private Map<String, String> filterTableProperties(Map<String, String> source) {
  boolean sourceHasEntries = source != null && !source.isEmpty();
  if (!sourceHasEntries) {
    return null;
  }

  // The allowlist is consulted only after the source check, so an empty source never touches
  // the configuration object.
  List<String> allowedKeys = internalCatalogProperties.getAudit().getTablePropertiesAllowlist();
  boolean allowlistHasEntries = allowedKeys != null && !allowedKeys.isEmpty();
  if (!allowlistHasEntries) {
    return null;
  }

  Map<String, String> retained = new HashMap<>();
  allowedKeys.forEach(
      allowedKey -> {
        String committedValue = source.get(allowedKey);
        if (committedValue != null) {
          retained.put(allowedKey, committedValue);
        }
      });

  if (retained.isEmpty()) {
    return null;
  }
  return retained;
}

private void buildAndSendEvent(
TableAuditEvent event, OperationStatus status, String currentTableRoot) {
TableAuditEvent completeEvent =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.linkedin.openhouse.common.audit.model.BaseAuditEvent;
import java.time.Instant;
import java.util.Map;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.EqualsAndHashCode;
Expand Down Expand Up @@ -40,4 +41,8 @@ public class TableAuditEvent extends BaseAuditEvent {
private Long currentSnapshotId;

private Long currentSnapshotTimestampMs;

private String branchRefName;

private Map<String, String> tableProperties;
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package com.linkedin.openhouse.tables.config;

import java.time.Duration;
import java.util.Collections;
import java.util.List;
import lombok.Getter;
import lombok.Setter;
import org.springframework.boot.context.properties.ConfigurationProperties;
Expand All @@ -13,11 +15,19 @@ public class InternalCatalogProperties {

private MetadataCache metadataCache = new MetadataCache();

private Audit audit = new Audit();

/**
 * Settings holder for the metadata cache, stored in the enclosing class's {@code metadataCache}
 * field. Lombok generates the getters and setters. No field declares a default, so each one is
 * {@code null} until it is set.
 */
@Getter
@Setter
public static class MetadataCache {
// NOTE(review): presumably switches the metadata cache on or off; how a null value is treated
// is not visible here — confirm against the code that reads it.
private Boolean enabled;
// NOTE(review): presumably the time-to-live applied to cached entries — confirm.
private Duration ttl;
// NOTE(review): presumably the upper bound on the cache's total weight — confirm.
private DataSize maxWeight;
}

/**
 * Audit-related settings, stored in the enclosing class's {@code audit} field. Lombok generates
 * the getter and setter.
 */
@Getter
@Setter
public static class Audit {
// Table property keys that may be copied onto table audit events. The default is an empty list;
// TableAuditAspect#filterTableProperties returns null for a null or empty allowlist, so no
// table properties are attached to audit events unless keys are configured. The audit test
// sets entries through cluster.iceberg.tables.audit.table-properties-allowlist[N].
private List<String> tablePropertiesAllowlist = Collections.emptyList();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,20 @@ public ApiResponse<GetTableResponseBody> putIcebergSnapshots(
.responseBody(RequestConstants.TEST_GET_TABLE_RESPONSE_BODY)
.build();
case "d200":
// Echo the request's table properties on the response so the audit aspect can read
// committed state from result.getResponseBody().getTableProperties().
return ApiResponse.<GetTableResponseBody>builder()
.httpStatus(HttpStatus.OK)
.responseBody(RequestConstants.TEST_GET_TABLE_RESPONSE_BODY)
.responseBody(
RequestConstants.TEST_GET_TABLE_RESPONSE_BODY
.toBuilder()
.tableProperties(
icebergSnapshotRequestBody.getCreateUpdateTableRequestBody() == null
? null
: icebergSnapshotRequestBody
.getCreateUpdateTableRequestBody()
.getTableProperties())
.build())
.build();
case "d400":
throw new RequestValidationFailureException();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,18 @@
import org.springframework.http.MediaType;
import org.springframework.security.test.context.support.WithMockUser;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.TestPropertySource;
import org.springframework.test.web.servlet.MockMvc;
import org.springframework.test.web.servlet.request.MockMvcRequestBuilders;

@SpringBootTest
@AutoConfigureMockMvc
@ContextConfiguration
@TestPropertySource(
properties = {
"cluster.iceberg.tables.audit.table-properties-allowlist[0]=openhouse.watermark",
"cluster.iceberg.tables.audit.table-properties-allowlist[1]=openhouse.tableType"
})
@WithMockUser(username = "testUser")
public class IcebergSnapshotsApiHandlerAuditTest {
@Autowired private MockMvc mvc;
Expand Down Expand Up @@ -107,11 +113,65 @@ public void testPutIcebergSnapshotsFailedPathStillHasSnapshotInfo() throws Excep
assertEquals(1669126937912L, actualEvent.getCurrentSnapshotTimestampMs().longValue());
}

/**
 * Sends the shared snapshots fixture to the {@code d200} database and checks that the captured
 * audit event reports {@code "main"} as the branch ref name.
 */
@Test
public void testPutIcebergSnapshotsMainCommitSetsBranchRefNameToMain() throws Exception {
  String snapshotsUrl =
      String.format(
          CURRENT_MAJOR_VERSION_PREFIX + "/databases/d200/tables/tb1/iceberg/v2/snapshots");

  // NOTE(review): relies on the shared fixture's newest snapshot being referenced by the main
  // ref — confirm against RequestConstants.
  mvc.perform(
      MockMvcRequestBuilders.put(snapshotsUrl)
          .accept(MediaType.APPLICATION_JSON)
          .contentType(MediaType.APPLICATION_JSON)
          .content(RequestConstants.TEST_ICEBERG_SNAPSHOTS_REQUEST_BODY.toJson()));

  Mockito.verify(tableAuditHandler, atLeastOnce()).audit(argCaptor.capture());
  TableAuditEvent capturedEvent = argCaptor.getValue();
  assertEquals("main", capturedEvent.getBranchRefName());
}

/**
 * Commits a single new snapshot that only the {@code feature} ref points to, while {@code main}
 * still references a snapshot that is not part of the request. Expects the audit event to name
 * {@code feature} as the branch, to carry main's snapshot id, and to carry no timestamp.
 */
@Test
public void testPutIcebergSnapshotsNamedBranchCommitSetsBranchRefName() throws Exception {
  // The only snapshot in the request: id 999, the one the feature branch now points to.
  String featureSnapshotJson =
      String.join(
          "\n",
          "{",
          " \"snapshot-id\" : 999,",
          " \"timestamp-ms\" : 5000,",
          " \"summary\" : {\"operation\": \"append\"},",
          " \"manifest-list\" : \"/tmp/feature.avro\",",
          " \"schema-id\" : 0",
          "}");

  Map<String, String> branchRefs = new HashMap<>();
  // main keeps pointing at snapshot 100, which is absent from jsonSnapshots.
  branchRefs.put("main", "{\"snapshot-id\":100,\"type\":\"branch\"}");
  // feature points at the newly committed snapshot.
  branchRefs.put("feature", "{\"snapshot-id\":999,\"type\":\"branch\"}");

  IcebergSnapshotsRequestBody namedBranchRequest =
      IcebergSnapshotsRequestBody.builder()
          .baseTableVersion("v1")
          .jsonSnapshots(Collections.singletonList(featureSnapshotJson))
          .snapshotRefs(branchRefs)
          .createUpdateTableRequestBody(RequestConstants.TEST_CREATE_TABLE_REQUEST_BODY)
          .build();

  String snapshotsUrl =
      String.format(
          CURRENT_MAJOR_VERSION_PREFIX + "/databases/d200/tables/tb1/iceberg/v2/snapshots");
  mvc.perform(
      MockMvcRequestBuilders.put(snapshotsUrl)
          .accept(MediaType.APPLICATION_JSON)
          .contentType(MediaType.APPLICATION_JSON)
          .content(namedBranchRequest.toJson()));

  Mockito.verify(tableAuditHandler, atLeastOnce()).audit(argCaptor.capture());
  TableAuditEvent capturedEvent = argCaptor.getValue();
  assertEquals("feature", capturedEvent.getBranchRefName());
  // main did not move: its old snapshot id is reported, and since that snapshot is not among
  // jsonSnapshots there is no timestamp to report.
  assertEquals(100L, capturedEvent.getCurrentSnapshotId().longValue());
  assertNull(capturedEvent.getCurrentSnapshotTimestampMs());
}

@Test
public void testPutIcebergSnapshotsBranchOnlyCommitLeavesSnapshotInfoNull() throws Exception {
// Simulate a branch-only commit where main is absent from snapshotRefs.
// In this case the main branch ref doesn't exist, so currentSnapshotId /
// currentSnapshotTimestampMs should be null.
// Simulate a branch-only commit where main is absent from snapshotRefs entirely.
// currentSnapshotId / currentSnapshotTimestampMs are null (no main), but branchRefName
// is still populated from the ref that received the new snapshot.
IcebergSnapshotsRequestBody branchOnlyRequestBody =
IcebergSnapshotsRequestBody.builder()
.baseTableVersion("v1")
Expand All @@ -132,6 +192,7 @@ public void testPutIcebergSnapshotsBranchOnlyCommitLeavesSnapshotInfoNull() thro
.content(branchOnlyRequestBody.toJson()));
Mockito.verify(tableAuditHandler, atLeastOnce()).audit(argCaptor.capture());
TableAuditEvent actualEvent = argCaptor.getValue();
assertEquals("my_branch", actualEvent.getBranchRefName());
assertNull(actualEvent.getCurrentSnapshotId());
assertNull(actualEvent.getCurrentSnapshotTimestampMs());
}
Expand Down Expand Up @@ -182,6 +243,42 @@ public void testPutIcebergSnapshotsMainPointsToOlderSnapshot() throws Exception
TableAuditEvent actualEvent = argCaptor.getValue();
assertEquals(100L, actualEvent.getCurrentSnapshotId().longValue());
assertEquals(1000L, actualEvent.getCurrentSnapshotTimestampMs().longValue());
// The last snapshot (200) belongs to feature — that is the committed branch.
assertEquals("feature", actualEvent.getBranchRefName());
}

/**
 * Sends table properties containing two keys on the configured allowlist ({@code
 * openhouse.watermark}, {@code openhouse.tableType}) and one key that is not, then checks that
 * the audit event carries exactly the two allowlisted entries. The mock handler's {@code d200}
 * branch copies the request's table properties onto the response, which is where the audit
 * aspect reads them from.
 */
@Test
public void testPutIcebergSnapshotsFiltersTablePropertiesToAllowlist() throws Exception {
  Map<String, String> submittedProperties = new HashMap<>();
  submittedProperties.put("openhouse.watermark", "100");
  submittedProperties.put("openhouse.tableType", "PRIMARY_TABLE");
  // Not on the allowlist: must not show up on the audit event.
  submittedProperties.put("user.custom.key", "v");

  // Copy the shared fixture, replacing only the table properties.
  IcebergSnapshotsRequestBody template = RequestConstants.TEST_ICEBERG_SNAPSHOTS_REQUEST_BODY;
  IcebergSnapshotsRequestBody requestWithProperties =
      IcebergSnapshotsRequestBody.builder()
          .baseTableVersion(template.getBaseTableVersion())
          .jsonSnapshots(template.getJsonSnapshots())
          .snapshotRefs(template.getSnapshotRefs())
          .createUpdateTableRequestBody(
              template
                  .getCreateUpdateTableRequestBody()
                  .toBuilder()
                  .tableProperties(submittedProperties)
                  .build())
          .build();

  String snapshotsUrl =
      String.format(
          CURRENT_MAJOR_VERSION_PREFIX + "/databases/d200/tables/tb1/iceberg/v2/snapshots");
  mvc.perform(
      MockMvcRequestBuilders.put(snapshotsUrl)
          .accept(MediaType.APPLICATION_JSON)
          .contentType(MediaType.APPLICATION_JSON)
          .content(requestWithProperties.toJson()));

  Mockito.verify(tableAuditHandler, atLeastOnce()).audit(argCaptor.capture());
  TableAuditEvent capturedEvent = argCaptor.getValue();

  Map<String, String> allowlistedOnly = new HashMap<>();
  allowlistedOnly.put("openhouse.watermark", "100");
  allowlistedOnly.put("openhouse.tableType", "PRIMARY_TABLE");
  assertEquals(allowlistedOnly, capturedEvent.getTableProperties());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,7 @@ public final class TableAuditModelConstants {
.operationType(OperationType.COMMIT)
.currentSnapshotId(2151407017102313398L)
.currentSnapshotTimestampMs(1669126937912L)
.branchRefName("main")
.build();

public static final TableAuditEvent TABLE_AUDIT_EVENT_PUT_ICEBERG_SNAPSHOTS_FAILED =
Expand All @@ -237,6 +238,7 @@ public final class TableAuditModelConstants {
.operationType(OperationType.COMMIT)
.currentSnapshotId(2151407017102313398L)
.currentSnapshotTimestampMs(1669126937912L)
.branchRefName("main")
.build();

public static final TableAuditEvent TABLE_AUDIT_EVENT_PUT_ICEBERG_SNAPSHOTS_CTAS =
Expand All @@ -249,6 +251,7 @@ public final class TableAuditModelConstants {
.operationType(OperationType.STAGED_COMMIT)
.currentSnapshotId(2151407017102313398L)
.currentSnapshotTimestampMs(1669126937912L)
.branchRefName("main")
.build();

public static final TableAuditEvent TABLE_AUDIT_EVENT_GET_ALL_DATABASES_SUCCESS =
Expand Down