Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
137 changes: 35 additions & 102 deletions packages/phlo-delta/src/phlo_delta/schema_migrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,15 @@

import pyarrow as pa

from phlo.capabilities.schema import default_classify_change, worst_classification
from phlo.capabilities.specs import NormalizedSchema, SchemaChange, SchemaMigrationPlan
from phlo.capabilities.specs import FieldSpec, NormalizedSchema, SchemaChange, SchemaMigrationPlan
from phlo.hooks.emitters import SchemaMigrationEventContext, SchemaMigrationEventEmitter
from phlo.logging import get_logger
from phlo.schema_migration.planning import (
SchemaMigrationInstructions,
SchemaPlanningPolicy,
classify_schema_change,
plan_schema_migration,
)
from phlo_delta.tables import _default_storage_options, _resolve_table_uri

logger = get_logger(__name__)
Expand All @@ -41,14 +46,10 @@
}
# Mapping of PyArrow data types to canonical dtype strings.

_WIDEN_PAIRS: set[tuple[str, str]] = {
("int32", "int64"),
("float32", "float64"),
("int32", "float64"),
("int64", "float64"),
("date", "timestamptz"),
}
# Set of valid type widening pairs for schema evolution.
DELTA_SCHEMA_POLICY = SchemaPlanningPolicy(
change_classifications={"drop": "warning", "rename": "safe"},
recommendations={"drop": "Dropped columns are recoverable via Delta Lake time travel."},
)
# Delta-specific schema planning policy, passed to both classify_schema_change
# and plan_schema_migration in this module. It maps the "drop" change type to
# the "warning" classification and "rename" to "safe", and supplies the
# recommendation text associated with "drop" changes.
# NOTE(review): change types not listed here presumably fall back to the
# defaults in phlo.schema_migration.planning - confirm against that module.


def _arrow_type_to_dtype(arrow_type: pa.DataType) -> str:
Expand Down Expand Up @@ -186,13 +187,15 @@ def classify_change(self, change_type: str, **details: Any) -> str:
# Returns: "safe"

"""
if change_type == "rename":
return "safe"
if change_type == "drop":
return "warning"
return default_classify_change(change_type, **details)

def diff_schema(self, *, table_name: str, desired: NormalizedSchema) -> SchemaMigrationPlan:
return classify_schema_change(change_type, policy=DELTA_SCHEMA_POLICY, **details)

def diff_schema(
self,
*,
table_name: str,
desired: NormalizedSchema,
instructions: SchemaMigrationInstructions | None = None,
) -> SchemaMigrationPlan:
"""Compare *desired* schema against current Delta table schema.

Analyzes differences between the desired schema and the existing
Expand Down Expand Up @@ -226,102 +229,32 @@ def diff_schema(self, *, table_name: str, desired: NormalizedSchema) -> SchemaMi
dt = DeltaTable(table_uri, storage_options=opts)
current_schema = cast(Any, dt.schema()).to_pyarrow()

current_fields: dict[str, tuple[str, bool]] = {}
current_fields: list[FieldSpec] = []
for field in current_schema:
current_fields[field.name] = (_arrow_type_to_dtype(field.type), field.nullable)

desired_fields: dict[str, tuple[str, bool]] = {}
for f in desired.fields:
desired_fields[f.name] = (f.dtype, f.nullable)

changes: list[SchemaChange] = []

for name, (dtype, nullable) in desired_fields.items():
if name not in current_fields:
cls = self.classify_change("add", nullable=nullable, has_default=False)
changes.append(
SchemaChange(
field_name=name,
change_type="add",
new_value=dtype,
classification=cls,
)
)

for name in current_fields:
if name not in desired_fields:
cls = self.classify_change("drop")
changes.append(
SchemaChange(
field_name=name,
change_type="drop",
old_value=current_fields[name][0],
classification=cls,
)
)

for name in current_fields.keys() & desired_fields.keys():
cur_dtype, cur_nullable = current_fields[name]
des_dtype, des_nullable = desired_fields[name]

if cur_dtype != des_dtype:
if (cur_dtype, des_dtype) in _WIDEN_PAIRS:
change_type = "widen_type"
else:
change_type = "narrow_type"
cls = self.classify_change(change_type)
changes.append(
SchemaChange(
field_name=name,
change_type=change_type,
old_value=cur_dtype,
new_value=des_dtype,
classification=cls,
)
current_fields.append(
FieldSpec(
name=field.name,
dtype=_arrow_type_to_dtype(field.type),
nullable=field.nullable,
)
)

if cur_nullable != des_nullable:
if des_nullable and not cur_nullable:
change_type = "nullability_relaxed"
else:
change_type = "nullability_tightened"
cls = self.classify_change(change_type)
changes.append(
SchemaChange(
field_name=name,
change_type=change_type,
old_value=str(cur_nullable),
new_value=str(des_nullable),
classification=cls,
)
)

classifications = [c.classification for c in changes]
overall = worst_classification(classifications)
requires_approval = overall == "breaking"

recommendations: list[str] = []
if requires_approval:
recommendations.append("Breaking changes detected — requires explicit approval.")
if any(c.change_type == "drop" for c in changes):
recommendations.append("Dropped columns are recoverable via Delta Lake time travel.")

plan = SchemaMigrationPlan(
plan = plan_schema_migration(
table_name=table_name,
changes=changes,
classification=overall,
recommendations=recommendations,
requires_approval=requires_approval,
current=NormalizedSchema(fields=current_fields),
desired=desired,
policy=DELTA_SCHEMA_POLICY,
instructions=instructions,
)

emitter = SchemaMigrationEventEmitter(
SchemaMigrationEventContext(table_name=table_name, tags={"backend": "delta"})
)
emitter.emit(
status="planned",
classification=overall,
change_count=len(changes),
changes=[asdict(c) for c in changes],
classification=plan.classification,
change_count=len(plan.changes),
changes=[asdict(c) for c in plan.changes],
)

return plan
Expand Down
141 changes: 36 additions & 105 deletions packages/phlo-iceberg/src/phlo_iceberg/schema_migrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,15 @@

from dataclasses import asdict

from phlo.capabilities.schema import default_classify_change, worst_classification
from phlo.capabilities.specs import NormalizedSchema, SchemaChange, SchemaMigrationPlan
from phlo.capabilities.specs import FieldSpec, NormalizedSchema, SchemaChange, SchemaMigrationPlan
from phlo.hooks.emitters import SchemaMigrationEventContext, SchemaMigrationEventEmitter
from phlo.logging import get_logger
from phlo.schema_migration.planning import (
SchemaMigrationInstructions,
SchemaPlanningPolicy,
classify_schema_change,
plan_schema_migration,
)
from phlo_iceberg.catalog import get_catalog
from phlo_iceberg.settings import get_settings

Expand All @@ -93,14 +98,6 @@
DecimalType: "decimal",
}

_WIDEN_PAIRS: set[tuple[str, str]] = {
("int32", "int64"),
("float32", "float64"),
("int32", "float64"),
("int64", "float64"),
("date", "timestamptz"),
}

_SYSTEM_METADATA_FIELDS = {
"_dlt_load_id",
"_dlt_id",
Expand All @@ -110,6 +107,11 @@
"_phlo_run_id",
}

# Iceberg-specific schema planning policy, passed to both classify_schema_change
# and plan_schema_migration in this module. It maps the "drop" change type to
# the "warning" classification and "rename" to "safe", and supplies the
# recommendation text associated with "drop" changes.
# NOTE(review): change types not listed here presumably fall back to the
# defaults in phlo.schema_migration.planning - confirm against that module.
ICEBERG_SCHEMA_POLICY = SchemaPlanningPolicy(
change_classifications={"drop": "warning", "rename": "safe"},
recommendations={"drop": "Dropped columns are recoverable via Iceberg snapshot rollback."},
)


def _iceberg_type_to_dtype(iceberg_type: IcebergType) -> str:
"""Map a PyIceberg type instance to a canonical dtype string."""
Expand Down Expand Up @@ -240,13 +242,15 @@ def classify_change(self, change_type: str, **details: Any) -> str:
assert migrator.classify_change("add", nullable=False, has_default=False) == "breaking"

"""
if change_type == "rename":
return "safe"
if change_type == "drop":
return "warning"
return default_classify_change(change_type, **details)

def diff_schema(self, *, table_name: str, desired: NormalizedSchema) -> SchemaMigrationPlan:
return classify_schema_change(change_type, policy=ICEBERG_SCHEMA_POLICY, **details)

def diff_schema(
self,
*,
table_name: str,
desired: NormalizedSchema,
instructions: SchemaMigrationInstructions | None = None,
) -> SchemaMigrationPlan:
"""Compare desired schema against current table schema.

Detects all differences between the desired schema and the current
Expand Down Expand Up @@ -301,107 +305,34 @@ def diff_schema(self, *, table_name: str, desired: NormalizedSchema) -> SchemaMi
table = catalog.load_table(table_name)
current_schema = table.schema()

current_fields: dict[str, tuple[str, bool]] = {}
current_fields: list[FieldSpec] = []
for f in current_schema.fields:
if f.name in _SYSTEM_METADATA_FIELDS:
continue
current_fields[f.name] = (_iceberg_type_to_dtype(f.field_type), f.required is False)

desired_fields: dict[str, tuple[str, bool]] = {}
for f in desired.fields:
desired_fields[f.name] = (f.dtype, f.nullable)

changes: list[SchemaChange] = []

# Added fields
for name, (dtype, nullable) in desired_fields.items():
if name not in current_fields:
cls = self.classify_change("add", nullable=nullable, has_default=False)
changes.append(
SchemaChange(
field_name=name,
change_type="add",
new_value=dtype,
classification=cls,
)
)

# Dropped fields
for name in current_fields:
if name not in desired_fields:
cls = self.classify_change("drop")
changes.append(
SchemaChange(
field_name=name,
change_type="drop",
old_value=current_fields[name][0],
classification=cls,
)
current_fields.append(
FieldSpec(
name=f.name,
dtype=_iceberg_type_to_dtype(f.field_type),
nullable=f.required is False,
)
)

# Type and nullability changes on common fields
for name in current_fields.keys() & desired_fields.keys():
cur_dtype, cur_nullable = current_fields[name]
des_dtype, des_nullable = desired_fields[name]

if cur_dtype != des_dtype:
if (cur_dtype, des_dtype) in _WIDEN_PAIRS:
change_type = "widen_type"
else:
change_type = "narrow_type"
cls = self.classify_change(change_type)
changes.append(
SchemaChange(
field_name=name,
change_type=change_type,
old_value=cur_dtype,
new_value=des_dtype,
classification=cls,
)
)

if cur_nullable != des_nullable:
if des_nullable and not cur_nullable:
change_type = "nullability_relaxed"
else:
change_type = "nullability_tightened"
cls = self.classify_change(change_type)
changes.append(
SchemaChange(
field_name=name,
change_type=change_type,
old_value=str(cur_nullable),
new_value=str(des_nullable),
classification=cls,
)
)

classifications = [c.classification for c in changes]
overall = worst_classification(classifications)
requires_approval = overall == "breaking"

recommendations: list[str] = []
if requires_approval:
recommendations.append("Breaking changes detected — requires explicit approval.")
if any(c.change_type == "drop" for c in changes):
recommendations.append("Dropped columns are recoverable via Iceberg snapshot rollback.")

plan = SchemaMigrationPlan(
plan = plan_schema_migration(
table_name=table_name,
changes=changes,
classification=overall,
recommendations=recommendations,
requires_approval=requires_approval,
current=NormalizedSchema(fields=current_fields),
desired=desired,
policy=ICEBERG_SCHEMA_POLICY,
instructions=instructions,
)

emitter = SchemaMigrationEventEmitter(
SchemaMigrationEventContext(table_name=table_name, tags={"backend": "iceberg"})
)
emitter.emit(
status="planned",
classification=overall,
change_count=len(changes),
changes=[asdict(c) for c in changes],
classification=plan.classification,
change_count=len(plan.changes),
changes=[asdict(c) for c in plan.changes],
)

return plan
Expand Down
Loading
Loading