Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@
import com.linkedin.openhouse.tables.audit.model.OperationStatus;
import com.linkedin.openhouse.tables.audit.model.OperationType;
import com.linkedin.openhouse.tables.audit.model.TableAuditEvent;
import com.linkedin.openhouse.tables.config.InternalCatalogProperties;
import java.time.Instant;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -44,6 +46,8 @@ public class TableAuditAspect {

@Autowired private AuditHandler<TableAuditEvent> tableAuditHandler;

@Autowired private InternalCatalogProperties internalCatalogProperties;

/**
* Install the Around advice for getTable() method in OpenHouseTablesApiHandler.
*
Expand Down Expand Up @@ -378,13 +382,20 @@ protected ApiResponse<GetTableResponseBody> auditPutIcebergSnapshots(
.tableName(tableId)
.operationType(operationType);
extractSnapshotInfo(icebergSnapshotRequestBody, eventBuilder);
TableAuditEvent event = eventBuilder.build();
try {
result = (ApiResponse<GetTableResponseBody>) point.proceed();
// Read tableProperties from the response, not the request body: OpenHouse mutates
// properties server-side during commit (e.g. openhouse.tableVersion,
// openhouse.lastModifiedTime), and the audit event should reflect the committed state.
TableAuditEvent event =
eventBuilder
.tableProperties(filterTableProperties(result.getResponseBody().getTableProperties()))
.build();
buildAndSendEvent(
event, OperationStatus.SUCCESS, result.getResponseBody().getTableLocation());
} catch (Throwable t) {
buildAndSendEvent(event, OperationStatus.FAILED, null);
// On failure there is no committed state to read from, so tableProperties stays null.
buildAndSendEvent(eventBuilder.build(), OperationStatus.FAILED, null);
throw t;
}
return result;
Expand Down Expand Up @@ -529,6 +540,30 @@ protected ApiResponse<Void> auditUpdateDatabaseAclPolicies(
return result;
}

/**
* Narrows the committed table properties down to the configured allowlist ({@code
* cluster.iceberg.tables.audit.table-properties-allowlist}). Returns {@code null} when there is
* nothing to emit so downstream audit handlers can skip the field entirely. Iterates the
* allowlist rather than the source so cost is O(|allowlist|) regardless of source size.
*/
private Map<String, String> filterTableProperties(Map<String, String> source) {
Copy link
Copy Markdown
Member

@abhisheknath2011 abhisheknath2011 May 26, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we exclude some system level operation properties such as dlo specific which can be very large? (cc: @jiang95-dev )?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We also ran into OOM issues earlier while logging large events.

if (source == null || source.isEmpty()) {
return null;
}
List<String> allowlist = internalCatalogProperties.getAudit().getTablePropertiesAllowlist();
if (allowlist == null || allowlist.isEmpty()) {
return null;
}
Map<String, String> filtered = new HashMap<>();
for (String key : allowlist) {
String value = source.get(key);
if (value != null) {
filtered.put(key, value);
}
}
return filtered.isEmpty() ? null : filtered;
}

private void buildAndSendEvent(
TableAuditEvent event, OperationStatus status, String currentTableRoot) {
TableAuditEvent completeEvent =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.linkedin.openhouse.common.audit.model.BaseAuditEvent;
import java.time.Instant;
import java.util.Map;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.EqualsAndHashCode;
Expand Down Expand Up @@ -40,4 +41,6 @@ public class TableAuditEvent extends BaseAuditEvent {
private Long currentSnapshotId;

private Long currentSnapshotTimestampMs;

private Map<String, String> tableProperties;
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package com.linkedin.openhouse.tables.config;

import java.time.Duration;
import java.util.Collections;
import java.util.List;
import lombok.Getter;
import lombok.Setter;
import org.springframework.boot.context.properties.ConfigurationProperties;
Expand All @@ -13,11 +15,19 @@ public class InternalCatalogProperties {

private MetadataCache metadataCache = new MetadataCache();

private Audit audit = new Audit();

@Getter
@Setter
public static class MetadataCache {
private Boolean enabled;
private Duration ttl;
private DataSize maxWeight;
}

@Getter
@Setter
public static class Audit {
private List<String> tablePropertiesAllowlist = Collections.emptyList();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,20 @@ public ApiResponse<GetTableResponseBody> putIcebergSnapshots(
.responseBody(RequestConstants.TEST_GET_TABLE_RESPONSE_BODY)
.build();
case "d200":
// Echo the request's table properties on the response so the audit aspect can read
// committed state from result.getResponseBody().getTableProperties().
return ApiResponse.<GetTableResponseBody>builder()
.httpStatus(HttpStatus.OK)
.responseBody(RequestConstants.TEST_GET_TABLE_RESPONSE_BODY)
.responseBody(
RequestConstants.TEST_GET_TABLE_RESPONSE_BODY
.toBuilder()
.tableProperties(
icebergSnapshotRequestBody.getCreateUpdateTableRequestBody() == null
? null
: icebergSnapshotRequestBody
.getCreateUpdateTableRequestBody()
.getTableProperties())
.build())
.build();
case "d400":
throw new RequestValidationFailureException();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,18 @@
import org.springframework.http.MediaType;
import org.springframework.security.test.context.support.WithMockUser;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.TestPropertySource;
import org.springframework.test.web.servlet.MockMvc;
import org.springframework.test.web.servlet.request.MockMvcRequestBuilders;

@SpringBootTest
@AutoConfigureMockMvc
@ContextConfiguration
@TestPropertySource(
properties = {
"cluster.iceberg.tables.audit.table-properties-allowlist[0]=openhouse.watermark",
"cluster.iceberg.tables.audit.table-properties-allowlist[1]=openhouse.tableType"
})
@WithMockUser(username = "testUser")
public class IcebergSnapshotsApiHandlerAuditTest {
@Autowired private MockMvc mvc;
Expand Down Expand Up @@ -184,6 +190,40 @@ public void testPutIcebergSnapshotsMainPointsToOlderSnapshot() throws Exception
assertEquals(1000L, actualEvent.getCurrentSnapshotTimestampMs().longValue());
}

@Test
public void testPutIcebergSnapshotsFiltersTablePropertiesToAllowlist() throws Exception {
Map<String, String> requestProperties = new HashMap<>();
requestProperties.put("openhouse.watermark", "100");
requestProperties.put("openhouse.tableType", "PRIMARY_TABLE");
requestProperties.put("user.custom.key", "v");
IcebergSnapshotsRequestBody base = RequestConstants.TEST_ICEBERG_SNAPSHOTS_REQUEST_BODY;
IcebergSnapshotsRequestBody requestBody =
IcebergSnapshotsRequestBody.builder()
.baseTableVersion(base.getBaseTableVersion())
.jsonSnapshots(base.getJsonSnapshots())
.snapshotRefs(base.getSnapshotRefs())
.createUpdateTableRequestBody(
base.getCreateUpdateTableRequestBody()
.toBuilder()
.tableProperties(requestProperties)
.build())
.build();
mvc.perform(
MockMvcRequestBuilders.put(
String.format(
CURRENT_MAJOR_VERSION_PREFIX
+ "/databases/d200/tables/tb1/iceberg/v2/snapshots"))
.accept(MediaType.APPLICATION_JSON)
.contentType(MediaType.APPLICATION_JSON)
.content(requestBody.toJson()));
Mockito.verify(tableAuditHandler, atLeastOnce()).audit(argCaptor.capture());
TableAuditEvent actualEvent = argCaptor.getValue();
Map<String, String> expected = new HashMap<>();
expected.put("openhouse.watermark", "100");
expected.put("openhouse.tableType", "PRIMARY_TABLE");
assertEquals(expected, actualEvent.getTableProperties());
}

@Test
public void testCTASCommitPhase() throws Exception {
mvc.perform(
Expand Down