e : newMetadata.properties().entrySet()) {
+ if (e.getKey() != null && e.getValue() != null) {
+ tableProperties.put(e.getKey(), e.getValue());
+ }
+ }
+ }
+
+ return CreateUpdateTableRequestBody.builder()
+ .tableId(tableName)
+ .databaseId(namespace.level(0))
+ .clusterId(clusterProperties.getClusterName())
+ .schema(SchemaParser.toJson(newMetadata.schema(), false))
+ .sortOrder(SortOrderParser.toJson(newMetadata.sortOrder()))
+ .baseTableVersion(baseTableVersion == null ? "INITIAL_VERSION" : baseTableVersion)
+ .tableProperties(tableProperties)
+ .tableType(TableType.PRIMARY_TABLE)
+ // v0: timePartitioning / clustering / policies / newIntermediateSchemas left null
+ .build();
+ }
+}
diff --git a/services/tables/src/main/java/com/linkedin/openhouse/tables/rest/adapter/CreateTableRequestAdapter.java b/services/tables/src/main/java/com/linkedin/openhouse/tables/rest/adapter/CreateTableRequestAdapter.java
new file mode 100644
index 000000000..a1ac7139f
--- /dev/null
+++ b/services/tables/src/main/java/com/linkedin/openhouse/tables/rest/adapter/CreateTableRequestAdapter.java
@@ -0,0 +1,63 @@
+package com.linkedin.openhouse.tables.rest.adapter;
+
+import com.linkedin.openhouse.cluster.configs.ClusterProperties;
+import com.linkedin.openhouse.tables.api.spec.v0.request.CreateUpdateTableRequestBody;
+import com.linkedin.openhouse.tables.common.TableType;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import lombok.RequiredArgsConstructor;
+import org.apache.iceberg.SchemaParser;
+import org.apache.iceberg.SortOrder;
+import org.apache.iceberg.SortOrderParser;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.rest.requests.CreateTableRequest;
+import org.springframework.stereotype.Component;
+
+/**
+ * Adapter from Iceberg REST {@link CreateTableRequest} to OpenHouse {@link
+ * CreateUpdateTableRequestBody}.
+ *
+ * v0: partition-spec / clustering / policies are not propagated separately — OpenHouse stores
+ * the Iceberg PartitionSpec inside its own metadata file. Stage-create requests are rejected at the
+ * controller layer.
+ */
+@Component
+@RequiredArgsConstructor
+public class CreateTableRequestAdapter {
+
+ private final ClusterProperties clusterProperties;
+
+ public CreateUpdateTableRequestBody buildFromCreate(Namespace namespace, CreateTableRequest req) {
+ NamespaceUtilRest.requireDepthOne(namespace);
+
+ Map tableProperties = new HashMap<>();
+ Map incoming =
+ req.properties() == null ? Collections.emptyMap() : req.properties();
+ for (Map.Entry e : incoming.entrySet()) {
+ if (e.getKey() != null && e.getValue() != null) {
+ tableProperties.put(e.getKey(), e.getValue());
+ }
+ }
+
+ String sortOrderJson;
+ if (req.writeOrder() != null) {
+ sortOrderJson = SortOrderParser.toJson(req.writeOrder());
+ } else {
+ sortOrderJson = SortOrderParser.toJson(SortOrder.unsorted());
+ }
+
+ return CreateUpdateTableRequestBody.builder()
+ .tableId(req.name())
+ .databaseId(namespace.level(0))
+ .clusterId(clusterProperties.getClusterName())
+ .schema(SchemaParser.toJson(req.schema(), false))
+ .sortOrder(sortOrderJson)
+ .baseTableVersion("INITIAL_VERSION")
+ .tableProperties(tableProperties)
+ .tableType(TableType.PRIMARY_TABLE)
+ .stageCreate(false)
+ // v0: timePartitioning / clustering / policies / newIntermediateSchemas left null
+ .build();
+ }
+}
diff --git a/services/tables/src/main/java/com/linkedin/openhouse/tables/rest/adapter/IcebergRestJson.java b/services/tables/src/main/java/com/linkedin/openhouse/tables/rest/adapter/IcebergRestJson.java
new file mode 100644
index 000000000..8902a3e37
--- /dev/null
+++ b/services/tables/src/main/java/com/linkedin/openhouse/tables/rest/adapter/IcebergRestJson.java
@@ -0,0 +1,39 @@
+package com.linkedin.openhouse.tables.rest.adapter;
+
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.PropertyAccessor;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.PropertyNamingStrategies;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import org.apache.iceberg.rest.RESTSerializers;
+
+/**
+ * Project-local replacement for Iceberg's package-private {@code RESTObjectMapper}. Builds a
+ * Jackson {@link ObjectMapper} configured with Iceberg REST request/response (de)serializers via
+ * {@link RESTSerializers#registerAll(ObjectMapper)}. Use this for parsing inbound bodies and
+ * rendering outbound bodies on all {@code /iceberg/v1/*} endpoints.
+ */
+public final class IcebergRestJson {
+
+ private static final ObjectMapper MAPPER;
+
+ static {
+ ObjectMapper m = new ObjectMapper();
+ // Iceberg's REST response types (ListNamespacesResponse, LoadTableResponse, ...) have
+ // private fields and method-style accessors like `namespaces()` — not getters.
+ // Without FIELD visibility ANY, Jackson sees no properties and emits {}.
+ m.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY);
+ m.setPropertyNamingStrategy(PropertyNamingStrategies.KEBAB_CASE);
+ m.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false);
+ m.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);
+ m.setSerializationInclusion(com.fasterxml.jackson.annotation.JsonInclude.Include.NON_NULL);
+ RESTSerializers.registerAll(m);
+ MAPPER = m;
+ }
+
+ private IcebergRestJson() {}
+
+ public static ObjectMapper mapper() {
+ return MAPPER;
+ }
+}
diff --git a/services/tables/src/main/java/com/linkedin/openhouse/tables/rest/adapter/NamespaceUtilRest.java b/services/tables/src/main/java/com/linkedin/openhouse/tables/rest/adapter/NamespaceUtilRest.java
new file mode 100644
index 000000000..51d406708
--- /dev/null
+++ b/services/tables/src/main/java/com/linkedin/openhouse/tables/rest/adapter/NamespaceUtilRest.java
@@ -0,0 +1,52 @@
+package com.linkedin.openhouse.tables.rest.adapter;
+
+import com.linkedin.openhouse.common.exception.RequestValidationFailureException;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.rest.RESTUtil;
+
+/**
+ * Helpers for translating between Iceberg REST namespace path variables and {@link Namespace}
+ * objects. OpenHouse only supports single-level namespaces ("databases"); this helper enforces
+ * that.
+ */
+public final class NamespaceUtilRest {
+
+ private NamespaceUtilRest() {}
+
+ /** Decode an Iceberg REST path-variable (unit-separator delimited) into a {@link Namespace}. */
+ public static Namespace decode(String pathVariable) {
+ if (pathVariable == null || pathVariable.isEmpty()) {
+ throw new RequestValidationFailureException("namespace path variable is empty");
+ }
+ return RESTUtil.decodeNamespace(pathVariable);
+ }
+
+ /** Encode a {@link Namespace} back to a path-variable string. */
+ public static String encode(Namespace ns) {
+ return RESTUtil.encodeNamespace(ns);
+ }
+
+ /**
+ * Reject multi-level namespaces — OpenHouse uses a single database level. Empty namespaces are
+ * also rejected.
+ */
+ public static void requireDepthOne(Namespace ns) {
+ if (ns == null || ns.isEmpty()) {
+ throw new RequestValidationFailureException(
+ "namespace must have exactly one level; got empty namespace");
+ }
+ if (ns.length() > 1) {
+ throw new RequestValidationFailureException(
+ "multi-level namespaces are not supported; depth=" + ns.length());
+ }
+ }
+
+ /**
+ * Return the single database level. Caller must have run {@link #requireDepthOne(Namespace)}
+ * first; this method does so defensively as well.
+ */
+ public static String singleLevelDb(Namespace ns) {
+ requireDepthOne(ns);
+ return ns.level(0);
+ }
+}
diff --git a/services/tables/src/main/java/com/linkedin/openhouse/tables/rest/adapter/TableLoadAdapter.java b/services/tables/src/main/java/com/linkedin/openhouse/tables/rest/adapter/TableLoadAdapter.java
new file mode 100644
index 000000000..6ba874711
--- /dev/null
+++ b/services/tables/src/main/java/com/linkedin/openhouse/tables/rest/adapter/TableLoadAdapter.java
@@ -0,0 +1,39 @@
+package com.linkedin.openhouse.tables.rest.adapter;
+
+import com.linkedin.openhouse.internal.catalog.OpenHouseInternalCatalog;
+import lombok.RequiredArgsConstructor;
+import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.rest.responses.LoadTableResponse;
+import org.springframework.stereotype.Component;
+
+/**
+ * Loads an OpenHouse-backed Iceberg table via {@link OpenHouseInternalCatalog} and wraps it into an
+ * Iceberg REST {@link LoadTableResponse}. The metadata-location field of the response is filled
+ * from {@link TableMetadata#metadataFileLocation()} by the Iceberg builder.
+ */
+@Component
+@RequiredArgsConstructor
+public class TableLoadAdapter {
+
+ private final OpenHouseInternalCatalog openHouseInternalCatalog;
+
+ public LoadTableResponse buildLoadTableResponse(String databaseId, String tableId) {
+ TableIdentifier id = TableIdentifier.of(Namespace.of(databaseId), tableId);
+ Table table = openHouseInternalCatalog.loadTable(id);
+ BaseTable baseTable = (BaseTable) table;
+ TableMetadata metadata = baseTable.operations().current();
+ return LoadTableResponse.builder().withTableMetadata(metadata).build();
+ }
+
+ /** Convenience: also returns the underlying metadata, for callers that need both. */
+ public TableMetadata loadMetadata(String databaseId, String tableId) {
+ TableIdentifier id = TableIdentifier.of(Namespace.of(databaseId), tableId);
+ Table table = openHouseInternalCatalog.loadTable(id);
+ BaseTable baseTable = (BaseTable) table;
+ return baseTable.operations().current();
+ }
+}
diff --git a/services/tables/src/main/java/com/linkedin/openhouse/tables/rest/config/IcebergRestPaths.java b/services/tables/src/main/java/com/linkedin/openhouse/tables/rest/config/IcebergRestPaths.java
new file mode 100644
index 000000000..4c658305b
--- /dev/null
+++ b/services/tables/src/main/java/com/linkedin/openhouse/tables/rest/config/IcebergRestPaths.java
@@ -0,0 +1,10 @@
+package com.linkedin.openhouse.tables.rest.config;
+
+/** URL roots for the Iceberg REST Catalog adapter. */
+public final class IcebergRestPaths {
+ private IcebergRestPaths() {}
+
+ public static final String BASE = "/iceberg";
+ public static final String V1 = BASE + "/v1";
+ public static final String CONFIG = V1 + "/config";
+}
diff --git a/services/tables/src/main/java/com/linkedin/openhouse/tables/rest/controller/IcebergConfigController.java b/services/tables/src/main/java/com/linkedin/openhouse/tables/rest/controller/IcebergConfigController.java
new file mode 100644
index 000000000..b199811f2
--- /dev/null
+++ b/services/tables/src/main/java/com/linkedin/openhouse/tables/rest/controller/IcebergConfigController.java
@@ -0,0 +1,42 @@
+package com.linkedin.openhouse.tables.rest.controller;
+
+import com.linkedin.openhouse.tables.rest.adapter.IcebergRestJson;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.iceberg.rest.responses.ConfigResponse;
+import org.springframework.http.MediaType;
+import org.springframework.http.ResponseEntity;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.RequestParam;
+import org.springframework.web.bind.annotation.RestController;
+
+/**
+ * Iceberg REST Catalog {@code GET /v1/config} endpoint.
+ *
+ * Iceberg 1.5.2 {@link ConfigResponse.Builder} does not expose a {@code withEndpoints(...)}
+ * method (endpoint advertisement was added in newer Iceberg). Clients will probe each call instead
+ * and 404s for unsupported endpoints are tolerated by Spark/Flink REST catalog clients.
+ */
+@RestController
+@Slf4j
+public class IcebergConfigController {
+
+ @GetMapping(value = "/iceberg/v1/config", produces = MediaType.APPLICATION_JSON_VALUE)
+ public ResponseEntity getConfig(
+ @RequestParam(value = "warehouse", required = false) String warehouse) throws Exception {
+ // Echo the caller's warehouse name back as `overrides.prefix` so the Iceberg
+ // REST client routes every subsequent call through /v1/{prefix}/... — that is
+ // the URL shape this adapter's namespace/table controllers serve.
+ Map overrides = new HashMap<>();
+ overrides.put("prefix", warehouse == null || warehouse.isEmpty() ? "openhouse" : warehouse);
+ ConfigResponse response =
+ ConfigResponse.builder()
+ .withDefaults(Collections.emptyMap())
+ .withOverrides(overrides)
+ .build();
+ String json = IcebergRestJson.mapper().writeValueAsString(response);
+ return ResponseEntity.ok().contentType(MediaType.APPLICATION_JSON).body(json);
+ }
+}
diff --git a/services/tables/src/main/java/com/linkedin/openhouse/tables/rest/controller/IcebergNamespacesController.java b/services/tables/src/main/java/com/linkedin/openhouse/tables/rest/controller/IcebergNamespacesController.java
new file mode 100644
index 000000000..3b2fc3678
--- /dev/null
+++ b/services/tables/src/main/java/com/linkedin/openhouse/tables/rest/controller/IcebergNamespacesController.java
@@ -0,0 +1,165 @@
+package com.linkedin.openhouse.tables.rest.controller;
+
+import com.linkedin.openhouse.common.api.spec.ApiResponse;
+import com.linkedin.openhouse.common.exception.NoSuchEntityException;
+import com.linkedin.openhouse.tables.api.handler.DatabasesApiHandler;
+import com.linkedin.openhouse.tables.api.spec.v0.response.GetAllDatabasesResponseBody;
+import com.linkedin.openhouse.tables.api.spec.v0.response.GetDatabaseResponseBody;
+import com.linkedin.openhouse.tables.rest.adapter.IcebergRestJson;
+import com.linkedin.openhouse.tables.rest.adapter.NamespaceUtilRest;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.rest.requests.CreateNamespaceRequest;
+import org.apache.iceberg.rest.responses.CreateNamespaceResponse;
+import org.apache.iceberg.rest.responses.GetNamespaceResponse;
+import org.apache.iceberg.rest.responses.ListNamespacesResponse;
+import org.springframework.http.HttpStatus;
+import org.springframework.http.MediaType;
+import org.springframework.http.ResponseEntity;
+import org.springframework.web.bind.annotation.DeleteMapping;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.PathVariable;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RequestMethod;
+import org.springframework.web.bind.annotation.RestController;
+
+/**
+ * Iceberg REST Catalog namespace endpoints under {@code /iceberg/v1/{prefix}/namespaces/...}.
+ *
+ * OpenHouse namespaces are implicit — they spring into existence when their first table is
+ * created and stop existing when their last table is dropped. We surface them via {@link
+ * DatabasesApiHandler#getAllDatabases()} for listing and existence checks. Creation is a no-op
+ * success (we don't pre-create), and deletion is accepted as a no-op success because OpenHouse has
+ * no explicit drop-namespace operation.
+ *
+ *
Only single-level namespaces are supported; deeper paths return 400.
+ */
+@RestController
+@RequiredArgsConstructor
+@Slf4j
+@RequestMapping("/iceberg/v1/{prefix}")
+public class IcebergNamespacesController {
+
+ private final DatabasesApiHandler databasesApiHandler;
+
+ // ----------------------------------------------------------------------------------------------
+ // GET /namespaces
+ // ----------------------------------------------------------------------------------------------
+ @GetMapping(value = "/namespaces", produces = MediaType.APPLICATION_JSON_VALUE)
+ public ResponseEntity listNamespaces(@PathVariable("prefix") String prefix)
+ throws Exception {
+ ApiResponse resp = databasesApiHandler.getAllDatabases();
+ List dbs =
+ resp.getResponseBody() == null || resp.getResponseBody().getResults() == null
+ ? Collections.emptyList()
+ : resp.getResponseBody().getResults();
+
+ List namespaces = new ArrayList<>(dbs.size());
+ for (GetDatabaseResponseBody db : dbs) {
+ namespaces.add(Namespace.of(db.getDatabaseId()));
+ }
+ ListNamespacesResponse out = ListNamespacesResponse.builder().addAll(namespaces).build();
+ return ResponseEntity.ok()
+ .contentType(MediaType.APPLICATION_JSON)
+ .body(IcebergRestJson.mapper().writeValueAsString(out));
+ }
+
+ // ----------------------------------------------------------------------------------------------
+ // POST /namespaces — accept, but no-op (OpenHouse namespaces are auto-created on first table)
+ // ----------------------------------------------------------------------------------------------
+ @PostMapping(
+ value = "/namespaces",
+ consumes = MediaType.APPLICATION_JSON_VALUE,
+ produces = MediaType.APPLICATION_JSON_VALUE)
+ public ResponseEntity createNamespace(
+ @PathVariable("prefix") String prefix, @RequestBody String requestBody) throws Exception {
+ CreateNamespaceRequest req =
+ IcebergRestJson.mapper().readValue(requestBody, CreateNamespaceRequest.class);
+ Namespace ns = req.namespace();
+ NamespaceUtilRest.requireDepthOne(ns);
+
+ // OpenHouse doesn't pre-create namespaces; we return success and silently drop properties.
+ CreateNamespaceResponse out =
+ CreateNamespaceResponse.builder()
+ .withNamespace(ns)
+ .setProperties(Collections.emptyMap())
+ .build();
+ return ResponseEntity.ok()
+ .contentType(MediaType.APPLICATION_JSON)
+ .body(IcebergRestJson.mapper().writeValueAsString(out));
+ }
+
+ // ----------------------------------------------------------------------------------------------
+ // GET /namespaces/{namespace}
+ // ----------------------------------------------------------------------------------------------
+ @GetMapping(value = "/namespaces/{namespace}", produces = MediaType.APPLICATION_JSON_VALUE)
+ public ResponseEntity loadNamespace(
+ @PathVariable("prefix") String prefix, @PathVariable("namespace") String namespacePath)
+ throws Exception {
+ Namespace ns = NamespaceUtilRest.decode(namespacePath);
+ NamespaceUtilRest.requireDepthOne(ns);
+
+ if (!namespaceExists(ns)) {
+ throw new NoSuchEntityException("Namespace", ns.toString());
+ }
+
+ GetNamespaceResponse out =
+ GetNamespaceResponse.builder()
+ .withNamespace(ns)
+ .setProperties(Collections.emptyMap())
+ .build();
+ return ResponseEntity.ok()
+ .contentType(MediaType.APPLICATION_JSON)
+ .body(IcebergRestJson.mapper().writeValueAsString(out));
+ }
+
+ // ----------------------------------------------------------------------------------------------
+ // HEAD /namespaces/{namespace}
+ // ----------------------------------------------------------------------------------------------
+ @RequestMapping(value = "/namespaces/{namespace}", method = RequestMethod.HEAD)
+ public ResponseEntity namespaceExistsHead(
+ @PathVariable("prefix") String prefix, @PathVariable("namespace") String namespacePath) {
+ Namespace ns = NamespaceUtilRest.decode(namespacePath);
+ NamespaceUtilRest.requireDepthOne(ns);
+
+ if (!namespaceExists(ns)) {
+ // Let the exception handler render the 404 ErrorResponse for consistency.
+ throw new NoSuchEntityException("Namespace", ns.toString());
+ }
+ return ResponseEntity.status(HttpStatus.NO_CONTENT).build();
+ }
+
+ // ----------------------------------------------------------------------------------------------
+ // DELETE /namespaces/{namespace} — OpenHouse has no drop-namespace; accept as no-op.
+ // ----------------------------------------------------------------------------------------------
+ @DeleteMapping(value = "/namespaces/{namespace}")
+ public ResponseEntity dropNamespace(
+ @PathVariable("prefix") String prefix, @PathVariable("namespace") String namespacePath) {
+ Namespace ns = NamespaceUtilRest.decode(namespacePath);
+ NamespaceUtilRest.requireDepthOne(ns);
+ return ResponseEntity.status(HttpStatus.NO_CONTENT).build();
+ }
+
+ // ----------------------------------------------------------------------------------------------
+ // helpers
+ // ----------------------------------------------------------------------------------------------
+ private boolean namespaceExists(Namespace ns) {
+ ApiResponse resp = databasesApiHandler.getAllDatabases();
+ if (resp.getResponseBody() == null || resp.getResponseBody().getResults() == null) {
+ return false;
+ }
+ String dbId = ns.level(0);
+ for (GetDatabaseResponseBody db : resp.getResponseBody().getResults()) {
+ if (dbId.equals(db.getDatabaseId())) {
+ return true;
+ }
+ }
+ return false;
+ }
+}
diff --git a/services/tables/src/main/java/com/linkedin/openhouse/tables/rest/controller/IcebergRestExceptionHandler.java b/services/tables/src/main/java/com/linkedin/openhouse/tables/rest/controller/IcebergRestExceptionHandler.java
new file mode 100644
index 000000000..a75d9999c
--- /dev/null
+++ b/services/tables/src/main/java/com/linkedin/openhouse/tables/rest/controller/IcebergRestExceptionHandler.java
@@ -0,0 +1,166 @@
+package com.linkedin.openhouse.tables.rest.controller;
+
+import com.linkedin.openhouse.common.exception.AlreadyExistsException;
+import com.linkedin.openhouse.common.exception.EntityConcurrentModificationException;
+import com.linkedin.openhouse.common.exception.InvalidSchemaEvolutionException;
+import com.linkedin.openhouse.common.exception.InvalidTableMetadataException;
+import com.linkedin.openhouse.common.exception.NoSuchEntityException;
+import com.linkedin.openhouse.common.exception.NoSuchSoftDeletedUserTableException;
+import com.linkedin.openhouse.common.exception.NoSuchUserTableException;
+import com.linkedin.openhouse.common.exception.RequestValidationFailureException;
+import com.linkedin.openhouse.common.exception.UnsupportedClientOperationException;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.iceberg.exceptions.CommitFailedException;
+import org.apache.iceberg.rest.responses.ErrorResponse;
+import org.apache.iceberg.rest.responses.ErrorResponseParser;
+import org.springframework.http.HttpStatus;
+import org.springframework.http.MediaType;
+import org.springframework.http.ResponseEntity;
+import org.springframework.web.bind.annotation.ExceptionHandler;
+import org.springframework.web.bind.annotation.RestControllerAdvice;
+import org.springframework.web.context.request.WebRequest;
+
+/**
+ * Spring REST controller advice scoped to {@code com.linkedin.openhouse.tables.rest}: translates
+ * OpenHouse internal exceptions and Iceberg exceptions to Iceberg's wire-format {@link
+ * ErrorResponse} JSON ({@code { "error": { "message": "...", "type": "...", "code": N } } }).
+ *
+ * The {@code basePackages} attribute is critical — it scopes this advice ONLY to the new REST
+ * adapter controllers so we don't clobber the existing OpenHouse exception handler for the native
+ * {@code /v1/databases/...} surface.
+ */
+@RestControllerAdvice(basePackages = "com.linkedin.openhouse.tables.rest")
+@Slf4j
+public class IcebergRestExceptionHandler {
+
+ // --------------------------------------------------------------------------------------------
+ // OpenHouse common exceptions
+ // --------------------------------------------------------------------------------------------
+
+ @ExceptionHandler(NoSuchEntityException.class)
+ public ResponseEntity handleNoSuchEntity(NoSuchEntityException ex, WebRequest req) {
+ String type = isTablePath(req) ? "NoSuchTableException" : "NoSuchNamespaceException";
+ return render(HttpStatus.NOT_FOUND, type, safeMessage(ex));
+ }
+
+ @ExceptionHandler(NoSuchUserTableException.class)
+ public ResponseEntity handleNoSuchUserTable(NoSuchUserTableException ex) {
+ return render(HttpStatus.NOT_FOUND, "NoSuchTableException", safeMessage(ex));
+ }
+
+ @ExceptionHandler(NoSuchSoftDeletedUserTableException.class)
+ public ResponseEntity handleNoSuchSoftDeleted(NoSuchSoftDeletedUserTableException ex) {
+ return render(HttpStatus.NOT_FOUND, "NoSuchTableException", safeMessage(ex));
+ }
+
+ /**
+ * Catch-all for {@link java.util.NoSuchElementException} (parent of OpenHouse's
+ * NoSuchUserTableException). Some OpenHouse code paths throw the raw parent class.
+ */
+ @ExceptionHandler(java.util.NoSuchElementException.class)
+ public ResponseEntity handleNoSuchElement(
+ java.util.NoSuchElementException ex, WebRequest req) {
+ String type = isTablePath(req) ? "NoSuchTableException" : "NoSuchNamespaceException";
+ return render(HttpStatus.NOT_FOUND, type, safeMessage(ex));
+ }
+
+ @ExceptionHandler(AlreadyExistsException.class)
+ public ResponseEntity handleAlreadyExists(AlreadyExistsException ex) {
+ return render(HttpStatus.CONFLICT, "AlreadyExistsException", safeMessage(ex));
+ }
+
+ @ExceptionHandler(EntityConcurrentModificationException.class)
+ public ResponseEntity handleConcurrentModification(
+ EntityConcurrentModificationException ex) {
+ return render(HttpStatus.CONFLICT, "CommitFailedException", safeMessage(ex));
+ }
+
+ @ExceptionHandler(RequestValidationFailureException.class)
+ public ResponseEntity handleRequestValidation(RequestValidationFailureException ex) {
+ return render(HttpStatus.BAD_REQUEST, "BadRequestException", safeMessage(ex));
+ }
+
+ @ExceptionHandler(InvalidSchemaEvolutionException.class)
+ public ResponseEntity handleSchemaEvolution(InvalidSchemaEvolutionException ex) {
+ return render(HttpStatus.BAD_REQUEST, "BadRequestException", safeMessage(ex));
+ }
+
+ @ExceptionHandler(InvalidTableMetadataException.class)
+ public ResponseEntity handleInvalidTableMetadata(InvalidTableMetadataException ex) {
+ return render(HttpStatus.BAD_REQUEST, "BadRequestException", safeMessage(ex));
+ }
+
+ @ExceptionHandler(UnsupportedClientOperationException.class)
+ public ResponseEntity handleUnsupportedClient(UnsupportedClientOperationException ex) {
+ return render(HttpStatus.BAD_REQUEST, "BadRequestException", safeMessage(ex));
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Iceberg-native exceptions (in case a handler unwraps and rethrows raw)
+ // --------------------------------------------------------------------------------------------
+
+ @ExceptionHandler(CommitFailedException.class)
+ public ResponseEntity handleCommitFailed(CommitFailedException ex) {
+ return render(HttpStatus.CONFLICT, "CommitFailedException", safeMessage(ex));
+ }
+
+ @ExceptionHandler(org.apache.iceberg.exceptions.NoSuchTableException.class)
+ public ResponseEntity handleIcebergNoSuchTable(
+ org.apache.iceberg.exceptions.NoSuchTableException ex) {
+ return render(HttpStatus.NOT_FOUND, "NoSuchTableException", safeMessage(ex));
+ }
+
+ @ExceptionHandler(org.apache.iceberg.exceptions.NoSuchNamespaceException.class)
+ public ResponseEntity handleIcebergNoSuchNamespace(
+ org.apache.iceberg.exceptions.NoSuchNamespaceException ex) {
+ return render(HttpStatus.NOT_FOUND, "NoSuchNamespaceException", safeMessage(ex));
+ }
+
+ @ExceptionHandler(org.apache.iceberg.exceptions.AlreadyExistsException.class)
+ public ResponseEntity handleIcebergAlreadyExists(
+ org.apache.iceberg.exceptions.AlreadyExistsException ex) {
+ return render(HttpStatus.CONFLICT, "AlreadyExistsException", safeMessage(ex));
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Generic fallbacks
+ // --------------------------------------------------------------------------------------------
+
+ @ExceptionHandler(IllegalArgumentException.class)
+ public ResponseEntity handleIllegalArgument(IllegalArgumentException ex) {
+ return render(HttpStatus.BAD_REQUEST, "BadRequestException", safeMessage(ex));
+ }
+
+ @ExceptionHandler(Exception.class)
+ public ResponseEntity handleAny(Exception ex) {
+ log.warn("Unhandled exception in Iceberg REST adapter", ex);
+ return render(HttpStatus.INTERNAL_SERVER_ERROR, "InternalServerError", safeMessage(ex));
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // helpers
+ // --------------------------------------------------------------------------------------------
+
+ private static ResponseEntity render(HttpStatus status, String type, String message) {
+ ErrorResponse err =
+ ErrorResponse.builder()
+ .withMessage(message == null ? "" : message)
+ .withType(type)
+ .responseCode(status.value())
+ .build();
+ String json = ErrorResponseParser.toJson(err);
+ return ResponseEntity.status(status).contentType(MediaType.APPLICATION_JSON).body(json);
+ }
+
+ private static String safeMessage(Throwable t) {
+ return t.getMessage() == null ? t.getClass().getSimpleName() : t.getMessage();
+ }
+
+ private static boolean isTablePath(WebRequest req) {
+ if (req == null) {
+ return false;
+ }
+ String desc = req.getDescription(false);
+ return desc != null && desc.contains("/tables/");
+ }
+}
diff --git a/services/tables/src/main/java/com/linkedin/openhouse/tables/rest/controller/IcebergTablesController.java b/services/tables/src/main/java/com/linkedin/openhouse/tables/rest/controller/IcebergTablesController.java
new file mode 100644
index 000000000..0cd8d3805
--- /dev/null
+++ b/services/tables/src/main/java/com/linkedin/openhouse/tables/rest/controller/IcebergTablesController.java
@@ -0,0 +1,314 @@
+package com.linkedin.openhouse.tables.rest.controller;
+
+import static com.linkedin.openhouse.common.security.AuthenticationUtils.extractAuthenticatedUserPrincipal;
+
+import com.linkedin.openhouse.common.api.spec.ApiResponse;
+import com.linkedin.openhouse.tables.api.handler.IcebergSnapshotsApiHandler;
+import com.linkedin.openhouse.tables.api.handler.TablesApiHandler;
+import com.linkedin.openhouse.tables.api.spec.v0.request.CreateUpdateTableRequestBody;
+import com.linkedin.openhouse.tables.api.spec.v0.request.IcebergSnapshotsRequestBody;
+import com.linkedin.openhouse.tables.api.spec.v0.response.GetAllTablesResponseBody;
+import com.linkedin.openhouse.tables.api.spec.v0.response.GetTableResponseBody;
+import com.linkedin.openhouse.tables.rest.adapter.CommitAdapter;
+import com.linkedin.openhouse.tables.rest.adapter.CreateTableRequestAdapter;
+import com.linkedin.openhouse.tables.rest.adapter.IcebergRestJson;
+import com.linkedin.openhouse.tables.rest.adapter.NamespaceUtilRest;
+import com.linkedin.openhouse.tables.rest.adapter.TableLoadAdapter;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.iceberg.MetadataUpdate;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotParser;
+import org.apache.iceberg.SnapshotRef;
+import org.apache.iceberg.SnapshotRefParser;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.UpdateRequirement;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.exceptions.CommitFailedException;
+import org.apache.iceberg.rest.requests.CreateTableRequest;
+import org.apache.iceberg.rest.requests.UpdateTableRequest;
+import org.apache.iceberg.rest.responses.ListTablesResponse;
+import org.apache.iceberg.rest.responses.LoadTableResponse;
+import org.springframework.http.HttpStatus;
+import org.springframework.http.MediaType;
+import org.springframework.http.ResponseEntity;
+import org.springframework.web.bind.annotation.DeleteMapping;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.PathVariable;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RequestMethod;
+import org.springframework.web.bind.annotation.RequestParam;
+import org.springframework.web.bind.annotation.RestController;
+
+/**
+ * Iceberg REST Catalog table endpoints under {@code /iceberg/v1/{prefix}/namespaces/...}.
+ *
+ * Reads are delegated to {@link TablesApiHandler} for ACL + existence and then to {@link
+ * TableLoadAdapter} (backed by {@code OpenHouseInternalCatalog}) for the live Iceberg metadata.
+ *
+ *
Commits replay the inbound {@code MetadataUpdate}s on top of the current metadata, then route
+ * the result to either {@link IcebergSnapshotsApiHandler#putIcebergSnapshots} (when snapshots
+ * changed) or {@link TablesApiHandler#updateTable} (metadata-only commit) so OpenHouse's normal
+ * write path runs.
+ *
+ *
{@link org.apache.iceberg.rest.responses.LoadTableResponse} is used for both create and commit
+ * responses — Iceberg's commit response is structurally identical and {@code CommitTableResponse}
+ * does not exist in iceberg-core 1.5.2.
+ */
+@RestController
+@RequiredArgsConstructor
+@Slf4j
+@RequestMapping("/iceberg/v1/{prefix}")
+public class IcebergTablesController {
+
+ private final TablesApiHandler tablesApiHandler;
+ private final IcebergSnapshotsApiHandler icebergSnapshotsApiHandler;
+ private final CommitAdapter commitAdapter;
+ private final CreateTableRequestAdapter createTableRequestAdapter;
+ private final TableLoadAdapter tableLoadAdapter;
+
+ // ----------------------------------------------------------------------------------------------
+ // GET /namespaces/{namespace}/tables
+ // ----------------------------------------------------------------------------------------------
+ @GetMapping(value = "/namespaces/{namespace}/tables", produces = MediaType.APPLICATION_JSON_VALUE)
+ public ResponseEntity listTables(
+ @PathVariable("prefix") String prefix, @PathVariable("namespace") String namespacePath)
+ throws Exception {
+ Namespace ns = NamespaceUtilRest.decode(namespacePath);
+ String databaseId = NamespaceUtilRest.singleLevelDb(ns);
+
+ ApiResponse resp = tablesApiHandler.searchTables(databaseId);
+ List tables =
+ resp.getResponseBody() == null || resp.getResponseBody().getResults() == null
+ ? Collections.emptyList()
+ : resp.getResponseBody().getResults();
+
+ ListTablesResponse.Builder builder = ListTablesResponse.builder();
+ for (GetTableResponseBody t : tables) {
+ builder.add(
+ org.apache.iceberg.catalog.TableIdentifier.of(
+ Namespace.of(t.getDatabaseId()), t.getTableId()));
+ }
+ return ResponseEntity.ok()
+ .contentType(MediaType.APPLICATION_JSON)
+ .body(IcebergRestJson.mapper().writeValueAsString(builder.build()));
+ }
+
+ // ----------------------------------------------------------------------------------------------
+ // GET /namespaces/{namespace}/tables/{table}
+ // ----------------------------------------------------------------------------------------------
+ @GetMapping(
+ value = "/namespaces/{namespace}/tables/{table}",
+ produces = MediaType.APPLICATION_JSON_VALUE)
+ public ResponseEntity loadTable(
+ @PathVariable("prefix") String prefix,
+ @PathVariable("namespace") String namespacePath,
+ @PathVariable("table") String tableId)
+ throws Exception {
+ Namespace ns = NamespaceUtilRest.decode(namespacePath);
+ String databaseId = NamespaceUtilRest.singleLevelDb(ns);
+ String principal = extractAuthenticatedUserPrincipal();
+
+ // existence + ACL check via OpenHouse handler; throws NoSuchEntityException -> 404
+ tablesApiHandler.getTable(databaseId, tableId, principal);
+
+ LoadTableResponse response = tableLoadAdapter.buildLoadTableResponse(databaseId, tableId);
+ return ResponseEntity.ok()
+ .contentType(MediaType.APPLICATION_JSON)
+ .body(IcebergRestJson.mapper().writeValueAsString(response));
+ }
+
+ // ----------------------------------------------------------------------------------------------
+ // HEAD /namespaces/{namespace}/tables/{table}
+ // ----------------------------------------------------------------------------------------------
+ @RequestMapping(value = "/namespaces/{namespace}/tables/{table}", method = RequestMethod.HEAD)
+ public ResponseEntity tableExistsHead(
+ @PathVariable("prefix") String prefix,
+ @PathVariable("namespace") String namespacePath,
+ @PathVariable("table") String tableId) {
+ Namespace ns = NamespaceUtilRest.decode(namespacePath);
+ String databaseId = NamespaceUtilRest.singleLevelDb(ns);
+ String principal = extractAuthenticatedUserPrincipal();
+
+ // Throws NoSuchEntityException -> 404 via exception handler.
+ tablesApiHandler.getTable(databaseId, tableId, principal);
+ return ResponseEntity.status(HttpStatus.NO_CONTENT).build();
+ }
+
+ // ----------------------------------------------------------------------------------------------
+ // POST /namespaces/{namespace}/tables — create
+ // ----------------------------------------------------------------------------------------------
+ @PostMapping(
+ value = "/namespaces/{namespace}/tables",
+ consumes = MediaType.APPLICATION_JSON_VALUE,
+ produces = MediaType.APPLICATION_JSON_VALUE)
+ public ResponseEntity createTable(
+ @PathVariable("prefix") String prefix,
+ @PathVariable("namespace") String namespacePath,
+ @RequestBody String requestBody)
+ throws Exception {
+ Namespace ns = NamespaceUtilRest.decode(namespacePath);
+ String databaseId = NamespaceUtilRest.singleLevelDb(ns);
+ String principal = extractAuthenticatedUserPrincipal();
+
+ CreateTableRequest req =
+ IcebergRestJson.mapper().readValue(requestBody, CreateTableRequest.class);
+ if (req.stageCreate()) {
+ // v0 limitation: staged-create (CTAS) requires separate stage + commit handling.
+ return errorResponse(
+ HttpStatus.NOT_IMPLEMENTED,
+ "BadRequestException",
+ "stage-create (CTAS) is not supported by the OpenHouse Iceberg REST adapter v0");
+ }
+
+ CreateUpdateTableRequestBody body = createTableRequestAdapter.buildFromCreate(ns, req);
+ tablesApiHandler.createTable(databaseId, body, principal);
+
+ // Re-load the freshly-created table to produce a LoadTableResponse with current metadata.
+ LoadTableResponse response = tableLoadAdapter.buildLoadTableResponse(databaseId, req.name());
+ return ResponseEntity.ok()
+ .contentType(MediaType.APPLICATION_JSON)
+ .body(IcebergRestJson.mapper().writeValueAsString(response));
+ }
+
+ // ----------------------------------------------------------------------------------------------
+ // POST /namespaces/{namespace}/tables/{table} — commit (updateTable)
+ // ----------------------------------------------------------------------------------------------
+ @PostMapping(
+ value = "/namespaces/{namespace}/tables/{table}",
+ consumes = MediaType.APPLICATION_JSON_VALUE,
+ produces = MediaType.APPLICATION_JSON_VALUE)
+ public ResponseEntity commitTable(
+ @PathVariable("prefix") String prefix,
+ @PathVariable("namespace") String namespacePath,
+ @PathVariable("table") String tableId,
+ @RequestBody String requestBody)
+ throws Exception {
+ Namespace ns = NamespaceUtilRest.decode(namespacePath);
+ String databaseId = NamespaceUtilRest.singleLevelDb(ns);
+ String principal = extractAuthenticatedUserPrincipal();
+
+ UpdateTableRequest req =
+ IcebergRestJson.mapper().readValue(requestBody, UpdateTableRequest.class);
+
+ // 1. Load current metadata
+ TableMetadata currentMetadata = tableLoadAdapter.loadMetadata(databaseId, tableId);
+
+ // 2. Validate every requirement against current metadata (throws CommitFailedException -> 409)
+ if (req.requirements() != null) {
+ for (UpdateRequirement requirement : req.requirements()) {
+ requirement.validate(currentMetadata);
+ }
+ }
+
+ // 3. Replay updates to produce post-commit metadata
+ TableMetadata.Builder mdBuilder = TableMetadata.buildFrom(currentMetadata);
+ if (req.updates() != null) {
+ for (MetadataUpdate u : req.updates()) {
+ u.applyTo(mdBuilder);
+ }
+ }
+ TableMetadata newMetadata = mdBuilder.build();
+
+ // 4. Build the OpenHouse request body
+ String baseTableVersion = currentMetadata.metadataFileLocation();
+ CreateUpdateTableRequestBody body =
+ commitAdapter.buildCreateUpdateBody(ns, tableId, newMetadata, baseTableVersion);
+
+ // 5. Discriminate snapshot-changing vs metadata-only commits
+ boolean snapshotsChanged =
+ !safeEquals(currentMetadata.snapshots(), newMetadata.snapshots())
+ || !safeEquals(currentMetadata.refs(), newMetadata.refs());
+
+ if (snapshotsChanged) {
+ List jsonSnapshots = new ArrayList<>();
+ if (newMetadata.snapshots() != null) {
+ for (Snapshot s : newMetadata.snapshots()) {
+ jsonSnapshots.add(SnapshotParser.toJson(s));
+ }
+ }
+ Map jsonRefs = new LinkedHashMap<>();
+ if (newMetadata.refs() != null) {
+ for (Map.Entry e : newMetadata.refs().entrySet()) {
+ jsonRefs.put(e.getKey(), SnapshotRefParser.toJson(e.getValue()));
+ }
+ }
+ IcebergSnapshotsRequestBody snapBody =
+ IcebergSnapshotsRequestBody.builder()
+ .baseTableVersion(baseTableVersion)
+ .jsonSnapshots(jsonSnapshots)
+ .snapshotRefs(jsonRefs)
+ .createUpdateTableRequestBody(body)
+ .build();
+ icebergSnapshotsApiHandler.putIcebergSnapshots(databaseId, tableId, snapBody, principal);
+ } else {
+ tablesApiHandler.updateTable(databaseId, tableId, body, principal);
+ }
+
+ // 6. Re-load to produce the committed LoadTableResponse
+ LoadTableResponse response = tableLoadAdapter.buildLoadTableResponse(databaseId, tableId);
+ return ResponseEntity.ok()
+ .contentType(MediaType.APPLICATION_JSON)
+ .body(IcebergRestJson.mapper().writeValueAsString(response));
+ }
+
+ // ----------------------------------------------------------------------------------------------
+ // DELETE /namespaces/{namespace}/tables/{table} — drop
+ // ----------------------------------------------------------------------------------------------
+ @DeleteMapping(value = "/namespaces/{namespace}/tables/{table}")
+ public ResponseEntity dropTable(
+ @PathVariable("prefix") String prefix,
+ @PathVariable("namespace") String namespacePath,
+ @PathVariable("table") String tableId,
+ @RequestParam(value = "purgeRequested", required = false, defaultValue = "false")
+ boolean purgeRequested) {
+ Namespace ns = NamespaceUtilRest.decode(namespacePath);
+ String databaseId = NamespaceUtilRest.singleLevelDb(ns);
+ String principal = extractAuthenticatedUserPrincipal();
+
+ // v0: ignore purgeRequested — OpenHouse soft-deletes by default.
+ tablesApiHandler.deleteTable(databaseId, tableId, principal);
+ return ResponseEntity.status(HttpStatus.NO_CONTENT).build();
+ }
+
+ // ----------------------------------------------------------------------------------------------
+ // helpers
+ // ----------------------------------------------------------------------------------------------
+
+ /** Null-safe equality wrapper for {@code List#equals} / {@code Map#equals}. */
+ private static boolean safeEquals(Object a, Object b) {
+ if (a == null && b == null) {
+ return true;
+ }
+ if (a == null || b == null) {
+ return false;
+ }
+ return a.equals(b);
+ }
+
+ /** Render a minimal Iceberg ErrorResponse JSON inline (used for special-case 501 below). */
+ private ResponseEntity errorResponse(HttpStatus status, String type, String message)
+ throws Exception {
+ org.apache.iceberg.rest.responses.ErrorResponse err =
+ org.apache.iceberg.rest.responses.ErrorResponse.builder()
+ .withMessage(message)
+ .withType(type)
+ .responseCode(status.value())
+ .build();
+ String json = org.apache.iceberg.rest.responses.ErrorResponseParser.toJson(err);
+ return ResponseEntity.status(status).contentType(MediaType.APPLICATION_JSON).body(json);
+ }
+
+ // Avoid an unused import warning if CommitFailedException is referenced only in javadoc.
+ @SuppressWarnings("unused")
+ private static Class> unusedCommitFailed() {
+ return CommitFailedException.class;
+ }
+}