OH case-insensitive writes via OHSparkCatalog and OHWriteSchemaNormalizationRule #586
Open: pandaamit91 wants to merge 1 commit into linkedin:main from pandaamit91:ampanda/oh-case-insensitive-writes
235 changes: 189 additions & 46 deletions
...rk-itest/src/test/java/com/linkedin/openhouse/spark/catalogtest/CatalogOperationTest.java
Large diffs are not rendered by default.
65 changes: 65 additions & 0 deletions
....1/openhouse-spark-runtime/src/main/java/com/linkedin/openhouse/spark/OHSparkCatalog.java
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,65 @@ | ||
| package com.linkedin.openhouse.spark; | ||
| | ||
| import java.util.Collections; | ||
| import java.util.HashSet; | ||
| import java.util.Set; | ||
| import org.apache.iceberg.spark.SparkCatalog; | ||
| import org.apache.iceberg.spark.source.SparkTable; | ||
| import org.apache.spark.sql.catalyst.analysis.NoSuchTableException; | ||
| import org.apache.spark.sql.connector.catalog.Identifier; | ||
| import org.apache.spark.sql.connector.catalog.TableCapability; | ||
| | ||
| /** | ||
| * Spark catalog wrapper for OpenHouse that extends {@link SparkCatalog} and annotates every loaded | ||
| * table with {@link TableCapability#ACCEPT_ANY_SCHEMA}. | ||
| * | ||
| * <p>Why {@code ACCEPT_ANY_SCHEMA}? Spark's {@code ResolveOutputRelation} analyzer rule uses the | ||
| * session resolver (case-sensitive when {@code spark.sql.caseSensitive=true}). If a client | ||
| * DataFrame has column {@code "id"} but the OH table stores {@code "ID"}, the write command never | ||
| * resolves — Spark throws "Cannot find data for output column 'ID'" before OH's own server-side | ||
| * case-insensitive schema validation runs. | ||
| * | ||
| * <p>Advertising {@code ACCEPT_ANY_SCHEMA} causes {@code DataSourceV2Relation.skipSchemaResolution} | ||
| * to return {@code true}, which makes {@code V2WriteCommand.outputResolved} return {@code true} | ||
| * immediately. {@code ResolveOutputRelation} therefore skips OH write commands, allowing {@link | ||
| * OHWriteSchemaNormalizationRule} (a post-hoc resolution rule) to insert the necessary | ||
| * column-renaming {@code Project} before execution. | ||
| * | ||
| * <p>For reads and DDL the capability has no effect; it is only consulted during write analysis. | ||
| * | ||
| * <p>Configuration: | ||
| * | ||
| * <pre> | ||
| * spark.sql.catalog.openhouse=com.linkedin.openhouse.spark.OHSparkCatalog | ||
| * spark.sql.catalog.openhouse.catalog-impl=com.linkedin.openhouse.spark.OpenHouseCatalog | ||
| * </pre> | ||
| */ | ||
| public class OHSparkCatalog extends SparkCatalog { | ||
| | ||
| @Override | ||
| public SparkTable loadTable(Identifier ident) throws NoSuchTableException { | ||
| SparkTable original = super.loadTable(ident); | ||
| return withAcceptAnySchema(original); | ||
| } | ||
| | ||
| /** | ||
| * Wraps a {@link SparkTable} in an anonymous subclass that adds {@link | ||
| * TableCapability#ACCEPT_ANY_SCHEMA} to the table's capabilities. | ||
| * | ||
| * <p>The anonymous subclass is a new {@link SparkTable} built from the original's underlying | ||
| * Iceberg {@code Table}; every method other than {@code capabilities()} is inherited from | ||
| * {@link SparkTable} and operates on that same Iceberg table. {@code snapshotId=null} and | ||
| * {@code refreshEagerly=false} are the correct defaults for a standard (non-time-travel) table | ||
| * load; the original table's Iceberg {@code Table} object is passed unchanged so all reads and | ||
| * writes continue to use the real table state. | ||
| */ | ||
| private SparkTable withAcceptAnySchema(SparkTable original) { | ||
| return new SparkTable(original.table(), null /* snapshotId */, false /* refreshEagerly */) { | ||
| @Override | ||
| public Set<TableCapability> capabilities() { | ||
| Set<TableCapability> caps = new HashSet<>(original.capabilities()); | ||
| caps.add(TableCapability.ACCEPT_ANY_SCHEMA); | ||
| return Collections.unmodifiableSet(caps); | ||
| } | ||
| }; | ||
| } | ||
| } | ||
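
The `capabilities()` override above follows a copy-then-extend pattern: copy the wrapped table's capability set, add one entry, and return a read-only view. Below is a minimal standalone sketch of that pattern. It assumes a hypothetical `Capability` enum as a stand-in for Spark's `TableCapability`; the class and method names are illustrative and not part of this PR.

```java
import java.util.Collections;
import java.util.EnumSet;
import java.util.Set;

/** Illustrative only: Capability is a hypothetical stand-in for Spark's TableCapability. */
final class CapabilitySketch {

  enum Capability { BATCH_READ, BATCH_WRITE, ACCEPT_ANY_SCHEMA }

  /** Returns a read-only copy of {@code original} that also contains ACCEPT_ANY_SCHEMA. */
  static Set<Capability> withAcceptAnySchema(Set<Capability> original) {
    // Copy first so the wrapped table's own set is never mutated.
    Set<Capability> caps = EnumSet.noneOf(Capability.class);
    caps.addAll(original);
    caps.add(Capability.ACCEPT_ANY_SCHEMA);
    return Collections.unmodifiableSet(caps);
  }

  public static void main(String[] args) {
    Set<Capability> original = EnumSet.of(Capability.BATCH_READ, Capability.BATCH_WRITE);
    Set<Capability> extended = withAcceptAnySchema(original);
    System.out.println(original); // the input set is left untouched
    System.out.println(extended); // additionally contains ACCEPT_ANY_SCHEMA
  }
}
```

Copying before adding matters because the set returned by the wrapped table may be shared or immutable; mutating it in place could either throw or leak the extra capability to other callers.
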
163 changes: 163 additions & 0 deletions
...c/main/scala/com/linkedin/openhouse/spark/extensions/OHWriteSchemaNormalizationRule.scala
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,163 @@ | ||
| package com.linkedin.openhouse.spark.extensions | ||
| | ||
| import org.apache.spark.sql.SparkSession | ||
| import org.apache.spark.sql.catalyst.expressions.{Alias, Cast} | ||
| import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, V2WriteCommand} | ||
| import org.apache.spark.sql.catalyst.rules.Rule | ||
| import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation | ||
| | ||
| /** | ||
| * Post-hoc resolution rule that replicates the column-name and type normalization that Spark's | ||
| * {@code ResolveOutputRelation} would have applied to OpenHouse write commands, compensating for | ||
| * the fact that OH tables advertise {@link org.apache.spark.sql.connector.catalog.TableCapability#ACCEPT_ANY_SCHEMA} | ||
| * (via {@link OHSparkCatalog}) which causes {@code ResolveOutputRelation} to skip them entirely. | ||
| * | ||
| * <p>Why {@code ACCEPT_ANY_SCHEMA}? Spark's {@code ResolveOutputRelation} throws at analysis time | ||
| * when {@code caseSensitive=true} and a client DataFrame column (e.g. {@code "id"}) does not match | ||
| * the OH table column name exactly ({@code "ID"}). Advertising {@code ACCEPT_ANY_SCHEMA} prevents | ||
| * the throw. This rule then runs as a {@code Post-Hoc Resolution} rule and does the work that | ||
| * {@code ResolveOutputRelation} would have done: wrapping the source query in a {@code Project} | ||
| * that renames (and if necessary casts) each source column to the stored OH casing and type. | ||
| * | ||
| * <p>The rule handles both write modes: | ||
| * <ul> | ||
| * <li><b>By-name writes</b> ({@code isByName=true}, e.g. {@code df.writeTo().append()}): each | ||
| * source column is matched to the target column whose name it equals case-insensitively. | ||
| * Tables with case-duplicate columns are skipped (ambiguous target).</li> | ||
| * <li><b>By-position writes</b> ({@code isByName=false}, e.g. {@code INSERT INTO … VALUES …}): | ||
| * source and target columns are zipped positionally and each source column is renamed (and | ||
| * if the types differ, cast) to match the target. This replicates the {@code Alias} + | ||
| * {@code Cast} that {@code ResolveOutputRelation} would have inserted.</li> | ||
| * </ul> | ||
| * | ||
| * <p>In both modes, if source and target already match in name and type the rule returns the plan | ||
| * unchanged. If the column count differs the rule is a no-op (the mismatch is left for Iceberg or | ||
| * the OH server to report). | ||
| */ | ||
| class OHWriteSchemaNormalizationRule(spark: SparkSession) extends Rule[LogicalPlan] { | ||
| | ||
| override def apply(plan: LogicalPlan): LogicalPlan = { | ||
| plan.transformDown { | ||
| case write: V2WriteCommand | ||
| if write.table.resolved && write.query.resolved && isOHWrite(write) => | ||
| normalizeColumnNames(write).getOrElse(write) | ||
| } | ||
| } | ||
| | ||
| private def isOHWrite(write: V2WriteCommand): Boolean = { | ||
| write.table match { | ||
| case rel: DataSourceV2Relation => isOHRelation(rel) | ||
| case _ => false | ||
| } | ||
| } | ||
| | ||
| private def normalizeColumnNames(write: V2WriteCommand): Option[V2WriteCommand] = { | ||
| val ohRelation = write.table match { | ||
| case rel: DataSourceV2Relation => rel | ||
| case _ => return None | ||
| } | ||
| | ||
| val targetCols = ohRelation.output | ||
| val sourceCols = write.query.output | ||
| | ||
| // If column counts differ, leave it to Iceberg / the OH server to report the mismatch. | ||
| if (sourceCols.size != targetCols.size) return None | ||
| | ||
| val projections = | ||
| if (write.isByName) projectByName(sourceCols, targetCols) | ||
| else projectByPosition(sourceCols, targetCols) | ||
| | ||
| projections match { | ||
| case None => None | ||
| case Some(exprs) => Some(write.withNewQuery(Project(exprs, write.query))) | ||
| } | ||
| } | ||
| | ||
| /** | ||
| * By-name mode: mirror the name resolution that {@code ResolveOutputRelation} performs for | ||
| * by-name writes by producing a projection in <em>target column order</em> that renames each | ||
| * source column to the stored OH casing. No {@code Cast} is inserted in this mode; only names | ||
| * and column order are normalized. This also handles the case where the source DataFrame has | ||
| * columns in a different order than the stored schema (e.g. when the source is built from a bean | ||
| * whose fields are introspected alphabetically). | ||
| * | ||
| * <p>Tables with case-duplicate columns are skipped (the target is ambiguous). | ||
| */ | ||
| private def projectByName( | ||
| sourceCols: Seq[org.apache.spark.sql.catalyst.expressions.Attribute], | ||
| targetCols: Seq[org.apache.spark.sql.catalyst.expressions.Attribute]) | ||
| : Option[Seq[org.apache.spark.sql.catalyst.expressions.NamedExpression]] = { | ||
| | ||
| // Case-duplicate target: skip normalization to avoid silently misdirecting the write. | ||
| val targetGrouped = targetCols.groupBy(_.name.toLowerCase) | ||
| if (targetGrouped.values.exists(_.size > 1)) return None | ||
| | ||
| // Case-duplicate source: skip to avoid ambiguous lookup. | ||
| val srcGrouped = sourceCols.groupBy(_.name.toLowerCase) | ||
| if (srcGrouped.values.exists(_.size > 1)) return None | ||
| val srcByLower: Map[String, org.apache.spark.sql.catalyst.expressions.Attribute] = | ||
| srcGrouped.map { case (lower, attrs) => lower -> attrs.head } | ||
| | ||
| // Produce expressions in TARGET column order (replicating ResolveOutputRelation). | ||
| // For each target column find the matching source column by case-insensitive name. | ||
| val exprs: Seq[org.apache.spark.sql.catalyst.expressions.NamedExpression] = | ||
| targetCols.map { tgt => | ||
| srcByLower.get(tgt.name.toLowerCase) match { | ||
| case Some(src) if src.name == tgt.name => src // correct casing, keep as-is | ||
| case Some(src) => Alias(src, tgt.name)() // rename to stored casing | ||
| case None => return None // unmatched column | ||
| } | ||
| } | ||
| | ||
| // No-op if the result is identical to the source (same column order, same names). | ||
| val unchanged = exprs.zip(sourceCols).forall { | ||
| case (expr: org.apache.spark.sql.catalyst.expressions.Attribute, src) => | ||
| expr.exprId == src.exprId | ||
| case _ => false | ||
| } | ||
| if (unchanged) None else Some(exprs) | ||
| } | ||
| | ||
| /** | ||
| * By-position mode (e.g. {@code INSERT INTO … VALUES …}): zip source and target by position. | ||
| * For each pair, replicate what {@code ResolveOutputRelation} would have done: | ||
| * <ul> | ||
| * <li>If names and types already match, keep the source attribute as-is.</li> | ||
| * <li>Otherwise, wrap the source in {@code Alias(Cast(src, targetType), targetName)} to | ||
| * rename the column and coerce the type to the stored schema.</li> | ||
| * </ul> | ||
| */ | ||
| private def projectByPosition( | ||
| sourceCols: Seq[org.apache.spark.sql.catalyst.expressions.Attribute], | ||
| targetCols: Seq[org.apache.spark.sql.catalyst.expressions.Attribute]) | ||
| : Option[Seq[org.apache.spark.sql.catalyst.expressions.NamedExpression]] = { | ||
| | ||
| val pairsNeedingChange = sourceCols.zip(targetCols).filter { | ||
| case (src, tgt) => | ||
| src.name != tgt.name || | ||
| src.dataType != tgt.dataType || | ||
| src.metadata != tgt.metadata | ||
| } | ||
| if (pairsNeedingChange.isEmpty) return None | ||
| | ||
| val exprs = sourceCols.zip(targetCols).map { | ||
| case (src, tgt) | ||
| if src.name == tgt.name && src.dataType == tgt.dataType && src.metadata == tgt.metadata => | ||
| src | ||
| case (src, tgt) => | ||
| val castExpr = if (src.dataType == tgt.dataType) src | ||
| else Cast(src, tgt.dataType, Option(spark.conf.get("spark.sql.session.timeZone"))) | ||
| Alias(castExpr, tgt.name)(explicitMetadata = Some(tgt.metadata)) | ||
| } | ||
| Some(exprs) | ||
| } | ||
| | ||
| private def isOHRelation(rel: DataSourceV2Relation): Boolean = { | ||
| rel.catalog match { | ||
| case Some(c) => | ||
| val key = s"spark.sql.catalog.${c.name()}.catalog-impl" | ||
| spark.conf.getOption(key).exists(_.toLowerCase.contains("openhouse")) | ||
| case None => | ||
| false | ||
| } | ||
| } | ||
| } |
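
The by-name matching in `projectByName` can be illustrated without Catalyst types. The sketch below is a simplified stand-in, not the PR's implementation: it models columns as plain strings, and `ByNameMatchSketch` and `matchByName` are hypothetical names. It also uses `Locale.ROOT` for lowercasing, which is a choice of this sketch (the rule above calls `toLowerCase` without a locale), and it omits the `Alias` construction and the "unchanged" short-circuit.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

/** Illustrative only: models columns as plain strings instead of Catalyst attributes. */
final class ByNameMatchSketch {

  /**
   * For each target column, finds the source column with the same name ignoring case.
   * Returns the matched source names in target order, or empty when the column counts
   * differ, either side has case-duplicate names, or a target column has no match.
   */
  static Optional<List<String>> matchByName(List<String> source, List<String> target) {
    if (source.size() != target.size()) {
      return Optional.empty(); // count mismatch: leave the plan untouched
    }
    Map<String, String> sourceByLower = new HashMap<>();
    for (String s : source) {
      if (sourceByLower.put(s.toLowerCase(Locale.ROOT), s) != null) {
        return Optional.empty(); // case-duplicate source, e.g. "id" and "ID"
      }
    }
    Set<String> seenTargets = new HashSet<>();
    List<String> matched = new ArrayList<>();
    for (String t : target) {
      String lower = t.toLowerCase(Locale.ROOT);
      if (!seenTargets.add(lower)) {
        return Optional.empty(); // case-duplicate target: ambiguous destination
      }
      String src = sourceByLower.get(lower);
      if (src == null) {
        return Optional.empty(); // target column with no source counterpart
      }
      matched.add(src);
    }
    return Optional.of(matched);
  }

  public static void main(String[] args) {
    // Source columns are lower-case and in a different order than the stored schema.
    System.out.println(matchByName(Arrays.asList("name", "id"), Arrays.asList("ID", "Name")));
    // Case-duplicate source columns make the lookup ambiguous.
    System.out.println(matchByName(Arrays.asList("id", "ID"), Arrays.asList("ID", "x")));
  }
}
```

In the real rule, each matched pair whose names differ would become an `Alias` carrying the target name, and pairs that already match would pass through as-is.
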
60 changes: 60 additions & 0 deletions
....5/openhouse-spark-runtime/src/main/java/com/linkedin/openhouse/spark/OHSparkCatalog.java
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,60 @@ | ||
| package com.linkedin.openhouse.spark; | ||
| | ||
| import java.util.Collections; | ||
| import java.util.HashSet; | ||
| import java.util.Set; | ||
| import org.apache.iceberg.spark.SparkCatalog; | ||
| import org.apache.iceberg.spark.source.SparkTable; | ||
| import org.apache.spark.sql.catalyst.analysis.NoSuchTableException; | ||
| import org.apache.spark.sql.connector.catalog.Identifier; | ||
| import org.apache.spark.sql.connector.catalog.Table; | ||
| import org.apache.spark.sql.connector.catalog.TableCapability; | ||
| | ||
| /** | ||
| * OpenHouse catalog extension for Spark 3.5 / Iceberg 1.5. Overrides {@link | ||
| * SparkCatalog#loadTable(Identifier)} to advertise {@link TableCapability#ACCEPT_ANY_SCHEMA} on | ||
| * every OpenHouse table. This prevents Spark's {@code ResolveOutputRelation} from throwing at | ||
| * analysis time when {@code caseSensitive=true} and the source column casing differs from the | ||
| * stored column name. The companion rule {@link | ||
| * com.linkedin.openhouse.spark.extensions.OHWriteSchemaNormalizationRule} runs as a post-hoc | ||
| * resolution rule and applies the necessary column renaming / casting that {@code | ||
| * ResolveOutputRelation} would otherwise have done. | ||
| */ | ||
| public class OHSparkCatalog extends SparkCatalog { | ||
| | ||
| @Override | ||
| public Table loadTable(Identifier ident) throws NoSuchTableException { | ||
| Table original = super.loadTable(ident); | ||
| if (original instanceof SparkTable) { | ||
| return withAcceptAnySchema((SparkTable) original); | ||
| } | ||
| return original; | ||
| } | ||
| | ||
| private SparkTable withAcceptAnySchema(SparkTable original) { | ||
| // SparkTable carries a branch field (set when loading branch-qualified identifiers like | ||
| // "table.branch_feature_a"). We must use the SparkTable(Table, String, boolean) constructor | ||
| // when a branch is present so that newWriteBuilder() targets the correct branch. | ||
| // Using SparkTable(Table, Long, boolean) with snapshotId=null would silently drop the branch | ||
| // and cause all branch writes to land on the main table instead. | ||
| String branch = original.branch(); | ||
| if (branch != null) { | ||
| return new SparkTable(original.table(), branch, false /* refreshEagerly */) { | ||
| @Override | ||
| public Set<TableCapability> capabilities() { | ||
| Set<TableCapability> caps = new HashSet<>(original.capabilities()); | ||
| caps.add(TableCapability.ACCEPT_ANY_SCHEMA); | ||
| return Collections.unmodifiableSet(caps); | ||
| } | ||
| }; | ||
| } | ||
| return new SparkTable(original.table(), original.snapshotId(), false /* refreshEagerly */) { | ||
| @Override | ||
| public Set<TableCapability> capabilities() { | ||
| Set<TableCapability> caps = new HashSet<>(original.capabilities()); | ||
| caps.add(TableCapability.ACCEPT_ANY_SCHEMA); | ||
| return Collections.unmodifiableSet(caps); | ||
| } | ||
| }; | ||
| } | ||
| } |
I think we already have the V2 SparkCatalog interface implemented in the internal fork, or we did and needed to deramp it because of an unrelated failure.
We would need to coordinate the two sets of changes or make them compatible.