Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package com.linkedin.openhouse.tables.rest.adapter;

import com.linkedin.openhouse.cluster.configs.ClusterProperties;
import com.linkedin.openhouse.tables.api.spec.v0.request.CreateUpdateTableRequestBody;
import com.linkedin.openhouse.tables.common.TableType;
import java.util.HashMap;
import java.util.Map;
import lombok.RequiredArgsConstructor;
import org.apache.iceberg.SchemaParser;
import org.apache.iceberg.SortOrderParser;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.catalog.Namespace;
import org.springframework.stereotype.Component;

/**
 * Translates the Iceberg {@link TableMetadata} that results from a REST commit (base metadata with
 * the MetadataUpdates replayed on top) into an OpenHouse {@link CreateUpdateTableRequestBody}.
 *
 * <p>v0 simplifications — the following request fields are never populated by this adapter:
 *
 * <ul>
 *   <li>timePartitioning / clustering — left null (OpenHouse stores the spec inside the metadata
 *       file directly; the REST adapter does not synthesize OpenHouse's extra hints)
 *   <li>newIntermediateSchemas — left null
 *   <li>policies — left null
 *   <li>tableType — always {@code PRIMARY_TABLE}; replicas are not writable via REST
 * </ul>
 */
@Component
@RequiredArgsConstructor
public class CommitAdapter {

  private final ClusterProperties clusterProperties;

  /**
   * Assembles the create/update request body for a single table commit.
   *
   * <p>Property entries whose key or value is {@code null} are dropped rather than forwarded.
   *
   * @param namespace OpenHouse database namespace (depth 1); validated through {@link
   *     NamespaceUtilRest#singleLevelDb(Namespace)}
   * @param tableName table name within the namespace
   * @param newMetadata the post-update Iceberg metadata
   * @param baseTableVersion {@code base.metadataFileLocation()} for updates, or {@code null} on
   *     create — in which case "INITIAL_VERSION" is used
   * @return the populated request body
   */
  public CreateUpdateTableRequestBody buildCreateUpdateBody(
      Namespace namespace, String tableName, TableMetadata newMetadata, String baseTableVersion) {
    // Validates the namespace depth up front and yields its only level.
    String databaseId = NamespaceUtilRest.singleLevelDb(namespace);

    Map<String, String> sanitizedProperties = new HashMap<>();
    Map<String, String> metadataProperties = newMetadata.properties();
    if (metadataProperties != null) {
      metadataProperties.forEach(
          (key, value) -> {
            if (key != null && value != null) {
              sanitizedProperties.put(key, value);
            }
          });
    }

    String clusterId = clusterProperties.getClusterName();
    String schemaJson = SchemaParser.toJson(newMetadata.schema(), false);
    String sortOrderJson = SortOrderParser.toJson(newMetadata.sortOrder());

    String effectiveBaseVersion = baseTableVersion;
    if (effectiveBaseVersion == null) {
      effectiveBaseVersion = "INITIAL_VERSION";
    }

    // v0: timePartitioning / clustering / policies / newIntermediateSchemas left null
    return CreateUpdateTableRequestBody.builder()
        .tableId(tableName)
        .databaseId(databaseId)
        .clusterId(clusterId)
        .schema(schemaJson)
        .sortOrder(sortOrderJson)
        .baseTableVersion(effectiveBaseVersion)
        .tableProperties(sanitizedProperties)
        .tableType(TableType.PRIMARY_TABLE)
        .build();
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package com.linkedin.openhouse.tables.rest.adapter;

import com.linkedin.openhouse.cluster.configs.ClusterProperties;
import com.linkedin.openhouse.tables.api.spec.v0.request.CreateUpdateTableRequestBody;
import com.linkedin.openhouse.tables.common.TableType;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import lombok.RequiredArgsConstructor;
import org.apache.iceberg.SchemaParser;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.SortOrderParser;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.rest.requests.CreateTableRequest;
import org.springframework.stereotype.Component;

/**
 * Converts an Iceberg REST {@link CreateTableRequest} into the OpenHouse {@link
 * CreateUpdateTableRequestBody} used for table creation.
 *
 * <p>v0: partition-spec / clustering / policies are not propagated separately — OpenHouse stores
 * the Iceberg PartitionSpec inside its own metadata file. Stage-create requests are rejected at the
 * controller layer, so this adapter always emits {@code stageCreate(false)}.
 */
@Component
@RequiredArgsConstructor
public class CreateTableRequestAdapter {

  private final ClusterProperties clusterProperties;

  /**
   * Builds the OpenHouse create request body for {@code req}.
   *
   * <p>Property entries with a {@code null} key or value are dropped. When the request carries no
   * write order, the unsorted order is serialized instead. The base table version is always
   * "INITIAL_VERSION".
   *
   * @param namespace OpenHouse database namespace (depth 1); validated through {@link
   *     NamespaceUtilRest#singleLevelDb(Namespace)}
   * @param req the inbound Iceberg REST create-table request
   * @return the populated request body
   */
  public CreateUpdateTableRequestBody buildFromCreate(Namespace namespace, CreateTableRequest req) {
    // Validates the namespace depth up front and yields its only level.
    String databaseId = NamespaceUtilRest.singleLevelDb(namespace);

    Map<String, String> requestedProperties = req.properties();
    if (requestedProperties == null) {
      requestedProperties = Collections.emptyMap();
    }
    Map<String, String> sanitizedProperties = new HashMap<>();
    requestedProperties.forEach(
        (key, value) -> {
          if (key != null && value != null) {
            sanitizedProperties.put(key, value);
          }
        });

    SortOrder requestedOrder = req.writeOrder();
    String sortOrderJson =
        SortOrderParser.toJson(requestedOrder != null ? requestedOrder : SortOrder.unsorted());

    // v0: timePartitioning / clustering / policies / newIntermediateSchemas left null
    return CreateUpdateTableRequestBody.builder()
        .tableId(req.name())
        .databaseId(databaseId)
        .clusterId(clusterProperties.getClusterName())
        .schema(SchemaParser.toJson(req.schema(), false))
        .sortOrder(sortOrderJson)
        .baseTableVersion("INITIAL_VERSION")
        .tableProperties(sanitizedProperties)
        .tableType(TableType.PRIMARY_TABLE)
        .stageCreate(false)
        .build();
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package com.linkedin.openhouse.tables.rest.adapter;

import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.PropertyNamingStrategies;
import com.fasterxml.jackson.databind.SerializationFeature;
import org.apache.iceberg.rest.RESTSerializers;

/**
* Project-local replacement for Iceberg's package-private {@code RESTObjectMapper}. Builds a
* Jackson {@link ObjectMapper} configured with Iceberg REST request/response (de)serializers via
* {@link RESTSerializers#registerAll(ObjectMapper)}. Use this for parsing inbound bodies and
* rendering outbound bodies on all {@code /iceberg/v1/*} endpoints.
*/
public final class IcebergRestJson {

private static final ObjectMapper MAPPER;

static {
ObjectMapper m = new ObjectMapper();
// Iceberg's REST response types (ListNamespacesResponse, LoadTableResponse, ...) have
// private fields and method-style accessors like `namespaces()` — not getters.
// Without FIELD visibility ANY, Jackson sees no properties and emits {}.
m.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY);
m.setPropertyNamingStrategy(PropertyNamingStrategies.KEBAB_CASE);
m.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false);
m.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);
m.setSerializationInclusion(com.fasterxml.jackson.annotation.JsonInclude.Include.NON_NULL);
RESTSerializers.registerAll(m);
MAPPER = m;
}

private IcebergRestJson() {}

public static ObjectMapper mapper() {
return MAPPER;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package com.linkedin.openhouse.tables.rest.adapter;

import com.linkedin.openhouse.common.exception.RequestValidationFailureException;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.rest.RESTUtil;

/**
 * Static helpers that convert between Iceberg REST namespace path variables and {@link Namespace}
 * objects. OpenHouse only supports single-level namespaces ("databases"); the validation methods
 * here enforce that restriction.
 */
public final class NamespaceUtilRest {

  private NamespaceUtilRest() {}

  /**
   * Decodes an Iceberg REST namespace path variable into a {@link Namespace} by delegating to
   * {@link RESTUtil#decodeNamespace(String)}.
   *
   * @param pathVariable the raw path variable
   * @return the decoded namespace
   * @throws RequestValidationFailureException if {@code pathVariable} is {@code null} or empty
   */
  public static Namespace decode(String pathVariable) {
    if (pathVariable != null && !pathVariable.isEmpty()) {
      return RESTUtil.decodeNamespace(pathVariable);
    }
    throw new RequestValidationFailureException("namespace path variable is empty");
  }

  /**
   * Encodes a {@link Namespace} back into its path-variable form by delegating to {@link
   * RESTUtil#encodeNamespace(Namespace)}.
   *
   * @param ns the namespace to encode
   * @return the path-variable string
   */
  public static String encode(Namespace ns) {
    return RESTUtil.encodeNamespace(ns);
  }

  /**
   * Verifies that {@code ns} has exactly one level. OpenHouse uses a single database level, so
   * both empty (or {@code null}) and multi-level namespaces are rejected.
   *
   * @param ns the namespace to check
   * @throws RequestValidationFailureException if {@code ns} is {@code null}, empty, or deeper than
   *     one level
   */
  public static void requireDepthOne(Namespace ns) {
    boolean missing = ns == null || ns.isEmpty();
    if (missing) {
      throw new RequestValidationFailureException(
          "namespace must have exactly one level; got empty namespace");
    }
    int depth = ns.length();
    if (depth > 1) {
      throw new RequestValidationFailureException(
          "multi-level namespaces are not supported; depth=" + depth);
    }
  }

  /**
   * Returns the single database level of {@code ns}. The depth check of {@link
   * #requireDepthOne(Namespace)} is applied here as well, so callers need not validate first.
   *
   * @param ns the namespace to read
   * @return the first (and only) level of the namespace
   * @throws RequestValidationFailureException if {@code ns} does not have exactly one level
   */
  public static String singleLevelDb(Namespace ns) {
    requireDepthOne(ns);
    String database = ns.level(0);
    return database;
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package com.linkedin.openhouse.tables.rest.adapter;

import com.linkedin.openhouse.internal.catalog.OpenHouseInternalCatalog;
import lombok.RequiredArgsConstructor;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.rest.responses.LoadTableResponse;
import org.springframework.stereotype.Component;

/**
 * Loads an OpenHouse-backed Iceberg table through {@link OpenHouseInternalCatalog} and exposes its
 * current metadata, either raw or wrapped in an Iceberg REST {@link LoadTableResponse}. The
 * metadata-location field of the response is filled from {@link
 * TableMetadata#metadataFileLocation()} by the Iceberg builder.
 */
@Component
@RequiredArgsConstructor
public class TableLoadAdapter {

  private final OpenHouseInternalCatalog openHouseInternalCatalog;

  /**
   * Loads the table and wraps its current metadata in a {@link LoadTableResponse}.
   *
   * @param databaseId the database (single-level namespace) containing the table
   * @param tableId the table name
   * @return a response built from the table's current metadata
   */
  public LoadTableResponse buildLoadTableResponse(String databaseId, String tableId) {
    TableMetadata current = currentMetadata(databaseId, tableId);
    return LoadTableResponse.builder().withTableMetadata(current).build();
  }

  /**
   * Convenience accessor for callers that need the underlying metadata itself.
   *
   * @param databaseId the database (single-level namespace) containing the table
   * @param tableId the table name
   * @return the table's current metadata
   */
  public TableMetadata loadMetadata(String databaseId, String tableId) {
    return currentMetadata(databaseId, tableId);
  }

  /**
   * Loads the table from the catalog and reads the current metadata from its operations. The
   * loaded table is cast to {@link BaseTable} unconditionally.
   */
  private TableMetadata currentMetadata(String databaseId, String tableId) {
    Namespace database = Namespace.of(databaseId);
    Table loaded = openHouseInternalCatalog.loadTable(TableIdentifier.of(database, tableId));
    return ((BaseTable) loaded).operations().current();
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package com.linkedin.openhouse.tables.rest.config;

/**
 * URL roots for the Iceberg REST Catalog adapter. Every value is a compile-time constant built
 * from {@link #BASE}.
 */
public final class IcebergRestPaths {

  /** Root under which all Iceberg REST endpoints live. */
  public static final String BASE = "/iceberg";

  /** Version-1 API root: {@link #BASE} followed by {@code /v1}. */
  public static final String V1 = BASE + "/v1";

  /** Path of the catalog configuration endpoint: {@link #V1} followed by {@code /config}. */
  public static final String CONFIG = V1 + "/config";

  /** Constants holder; not instantiable. */
  private IcebergRestPaths() {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package com.linkedin.openhouse.tables.rest.controller;

import com.linkedin.openhouse.tables.rest.adapter.IcebergRestJson;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import org.apache.iceberg.rest.responses.ConfigResponse;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

/**
 * Serves the Iceberg REST Catalog {@code GET /v1/config} endpoint.
 *
 * <p>Iceberg 1.5.2 {@link ConfigResponse.Builder} does not expose a {@code withEndpoints(...)}
 * method (endpoint advertisement was added in newer Iceberg). Clients will probe each call instead
 * and 404s for unsupported endpoints are tolerated by Spark/Flink REST catalog clients.
 */
@RestController
@Slf4j
public class IcebergConfigController {

  /**
   * Returns the catalog configuration as a JSON string rendered with {@link
   * IcebergRestJson#mapper()}.
   *
   * <p>The response carries no defaults and a single override, {@code prefix}, set to the
   * caller-supplied warehouse name, or to "openhouse" when none (or an empty one) is given.
   *
   * @param warehouse optional {@code warehouse} query parameter
   * @return a 200 response whose body is the serialized {@link ConfigResponse}
   * @throws Exception if serializing the response fails
   */
  // NOTE(review): the mapping literal duplicates IcebergRestPaths.CONFIG — consider reusing it.
  @GetMapping(value = "/iceberg/v1/config", produces = MediaType.APPLICATION_JSON_VALUE)
  public ResponseEntity<String> getConfig(
      @RequestParam(value = "warehouse", required = false) String warehouse) throws Exception {
    // Echo the caller's warehouse name back as `overrides.prefix` so the Iceberg
    // REST client routes every subsequent call through /v1/{prefix}/... — that is
    // the URL shape this adapter's namespace/table controllers serve.
    String prefix;
    if (warehouse != null && !warehouse.isEmpty()) {
      prefix = warehouse;
    } else {
      prefix = "openhouse";
    }
    Map<String, String> prefixOverride = new HashMap<>();
    prefixOverride.put("prefix", prefix);

    ConfigResponse configResponse =
        ConfigResponse.builder()
            .withDefaults(Collections.emptyMap())
            .withOverrides(prefixOverride)
            .build();

    String body = IcebergRestJson.mapper().writeValueAsString(configResponse);
    return ResponseEntity.ok().contentType(MediaType.APPLICATION_JSON).body(body);
  }
}
Loading