Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
173 changes: 148 additions & 25 deletions src/dlt_iceberg/destination_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import pyarrow as pa
import pyarrow.parquet as pq
from dlt.common import pendulum
from dlt.common.configuration import configspec
from dlt.common.destination import DestinationCapabilitiesContext, Destination
from dlt.common.destination.client import (
Expand All @@ -33,6 +34,7 @@
NoSuchTableError,
NoSuchNamespaceError,
)
from pyiceberg.expressions import EqualTo

from .schema_converter import convert_dlt_to_iceberg_schema
from .partition_builder import build_partition_spec
Expand Down Expand Up @@ -520,43 +522,164 @@ def complete_load(self, load_id: str) -> None:
Called once by dlt after all individual file jobs complete successfully.
Reads from module-level global state since dlt creates multiple client instances.
"""
pending_files: Dict[str, List[Tuple[TTableSchema, str, pa.Table]]] = {}
with _PENDING_FILES_LOCK:
if load_id not in _PENDING_FILES or not _PENDING_FILES[load_id]:
logger.info(f"No files to commit for load {load_id}")
return

# Copy data and clear immediately (under lock)
pending_files = dict(_PENDING_FILES[load_id])
del _PENDING_FILES[load_id]
if load_id in _PENDING_FILES and _PENDING_FILES[load_id]:
# Copy data and clear immediately (under lock)
pending_files = dict(_PENDING_FILES[load_id])
del _PENDING_FILES[load_id]

catalog = self._get_catalog()
namespace = self.config.namespace

total_files = sum(len(files) for files in pending_files.values())
logger.info(
f"Committing {total_files} files across "
f"{len(pending_files)} tables for load {load_id}"
)
if pending_files:
total_files = sum(len(files) for files in pending_files.values())
logger.info(
f"Committing {total_files} files across "
f"{len(pending_files)} tables for load {load_id}"
)

# Process each table
for table_name, file_data in pending_files.items():
identifier = f"{namespace}.{table_name}"
# Process each table
for table_name, file_data in pending_files.items():
identifier = f"{namespace}.{table_name}"

try:
self._commit_table_files(
catalog=catalog,
identifier=identifier,
table_name=table_name,
file_data=file_data,
)
except Exception as e:
logger.error(
f"Failed to commit files for table {identifier}: {e}",
exc_info=True,
)
raise
else:
logger.info(f"No files to commit for load {load_id}")

# dlt expects complete_load to persist a record in _dlt_loads.
# Without this, _dlt_loads is never materialized for this destination.
self._store_completed_load(catalog, load_id)
logger.info(f"Load {load_id} completed successfully")

def _store_completed_load(self, catalog, load_id: str) -> None:
"""Persist a load completion row in the internal _dlt_loads table."""
loads_table_name = self.schema.loads_table_name
identifier = f"{self.config.namespace}.{loads_table_name}"

load_row = pa.table(
{
"load_id": [load_id],
"schema_name": [self.schema.name],
"status": [0],
"inserted_at": [pendulum.now()],
"schema_version_hash": [self.schema.version_hash],
}
)

for attempt in range(self.config.max_retries):
try:
self._commit_table_files(
catalog=catalog,
identifier=identifier,
table_name=table_name,
file_data=file_data,
loads_table = self._get_or_create_loads_table(
catalog, identifier, loads_table_name, load_row
)

# Idempotency: if this load_id was already recorded, do nothing.
if self._load_record_exists(loads_table, load_id):
logger.info(f"Load {load_id} already recorded in {identifier}")
return

expected_schema = schema_to_pyarrow(loads_table.schema())
casted_row = cast_table_safe(load_row, expected_schema, strict=False)
loads_table.append(casted_row)
return

except Exception as e:
logger.error(
f"Failed to commit files for table {identifier}: {e}",
exc_info=True,
retryable = is_retryable_error(e)

# Ambiguous write handling: append may have succeeded but client saw an error.
# Read-after-error: if record exists now, treat as success.
if self._load_record_exists_in_catalog(catalog, identifier, load_id):
logger.warning(
f"Encountered error while recording load {load_id} but record exists; "
f"treating as success: {e}"
)
return

log_error_with_context(
e,
operation=f"record load in {identifier}",
table_name=loads_table_name,
attempt=attempt + 1,
max_attempts=self.config.max_retries,
include_traceback=not retryable,
)
raise

logger.info(f"Load {load_id} completed successfully")
if not retryable:
error_msg = get_user_friendly_error_message(
e, f"record load metadata in table {identifier}"
)
raise RuntimeError(error_msg) from e

if attempt >= self.config.max_retries - 1:
error_msg = get_user_friendly_error_message(
e,
f"record load metadata in table {identifier} after "
f"{self.config.max_retries} attempts",
)
raise RuntimeError(error_msg) from e

sleep_time = self.config.retry_backoff_base ** attempt
logger.info(
f"Retrying load metadata write after {sleep_time}s "
f"(attempt {attempt + 2}/{self.config.max_retries})"
)
time.sleep(sleep_time)

def _get_or_create_loads_table(
self,
catalog,
identifier: str,
loads_table_name: str,
load_row: pa.Table,
):
"""Load the internal _dlt_loads table, creating it if needed."""
try:
return catalog.load_table(identifier)
except NoSuchTableError:
from pyiceberg.partitioning import PartitionSpec

loads_schema = convert_dlt_to_iceberg_schema(
self.schema.get_table(loads_table_name),
load_row,
)
create_kwargs = {
"identifier": identifier,
"schema": loads_schema,
"partition_spec": PartitionSpec(),
}
table_location = self._get_table_location(loads_table_name)
if table_location:
create_kwargs["location"] = table_location

catalog.create_table(
**create_kwargs
)
Comment on lines +666 to +668
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Honor custom table location when creating _dlt_loads

When _dlt_loads is created on first use, the code bypasses _get_table_location and always relies on catalog defaults, unlike normal table creation; with table_location_layout configured, this creates internal metadata in a different path (and can fail in environments where only the configured layout path is writable). The same location-resolution logic used for data tables should be applied here.

Useful? React with 👍 / 👎.

return catalog.load_table(identifier)

def _load_record_exists(self, loads_table, load_id: str) -> bool:
"""Check whether a load_id already exists in _dlt_loads."""
existing_rows = loads_table.scan(row_filter=EqualTo("load_id", load_id)).to_arrow()
return len(existing_rows) > 0

def _load_record_exists_in_catalog(self, catalog, identifier: str, load_id: str) -> bool:
"""Best-effort catalog-level existence check for ambiguous write outcomes."""
try:
loads_table = catalog.load_table(identifier)
return self._load_record_exists(loads_table, load_id)
except Exception:
return False

def _get_merge_strategy(self, table_schema: TTableSchema) -> str:
"""Extract merge strategy from table schema.
Expand Down
6 changes: 6 additions & 0 deletions tests/test_capabilities.py
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,12 @@ def events():
assert "test_location" in location
assert "events" in location

loads_table = catalog.load_table("test_location._dlt_loads")
loads_location = loads_table.location()
assert "custom_path" in loads_location
assert "test_location" in loads_location
assert "_dlt_loads" in loads_location

finally:
shutil.rmtree(temp_dir, ignore_errors=True)

Expand Down
55 changes: 55 additions & 0 deletions tests/test_class_based_atomic.py
Original file line number Diff line number Diff line change
Expand Up @@ -293,5 +293,60 @@ def generate_batch_2():
shutil.rmtree(temp_dir, ignore_errors=True)


def test_class_based_creates_dlt_loads_table():
"""Ensure complete_load persists load records in _dlt_loads."""
temp_dir = tempfile.mkdtemp()
warehouse_path = f"{temp_dir}/warehouse"
catalog_path = f"{temp_dir}/catalog.db"

try:
from dlt_iceberg import iceberg_rest
from pyiceberg.catalog import load_catalog

@dlt.resource(name="events", write_disposition="append")
def batch_1():
for i in range(1, 3):
yield {"event_id": i, "value": i * 10}

@dlt.resource(name="events", write_disposition="append")
def batch_2():
for i in range(3, 5):
yield {"event_id": i, "value": i * 10}

pipeline = dlt.pipeline(
pipeline_name="test_dlt_loads",
destination=iceberg_rest(
catalog_uri=f"sqlite:///{catalog_path}",
warehouse=f"file://{warehouse_path}",
namespace="test_ns",
),
dataset_name="test_dataset",
)

load_info_1 = pipeline.run(batch_1())
load_info_2 = pipeline.run(batch_2())

assert not load_info_1.has_failed_jobs
assert not load_info_2.has_failed_jobs

expected_load_ids = {load_info_1.loads_ids[0], load_info_2.loads_ids[0]}

catalog = load_catalog(
"dlt_catalog",
type="sql",
uri=f"sqlite:///{catalog_path}",
warehouse=f"file://{warehouse_path}",
)
loads_table = catalog.load_table("test_ns._dlt_loads")
loads_rows = loads_table.scan().to_arrow()

assert len(loads_rows) == 2
assert set(loads_rows.column("load_id").to_pylist()) == expected_load_ids
assert set(loads_rows.column("status").to_pylist()) == {0}

finally:
shutil.rmtree(temp_dir, ignore_errors=True)


if __name__ == "__main__":
pytest.main([__file__, "-v", "-s"])
Loading