From 240637eff54c045d08295a77a40a1d319338abf6 Mon Sep 17 00:00:00 2001 From: Nico Ritschel Date: Tue, 10 Feb 2026 18:47:55 -0800 Subject: [PATCH 1/3] fix: persist completed loads in _dlt_loads --- src/dlt_iceberg/destination_client.py | 96 +++++++++++++++++++-------- tests/test_class_based_atomic.py | 55 +++++++++++++++ 2 files changed, 123 insertions(+), 28 deletions(-) diff --git a/src/dlt_iceberg/destination_client.py b/src/dlt_iceberg/destination_client.py index 8f8919d..b72be48 100644 --- a/src/dlt_iceberg/destination_client.py +++ b/src/dlt_iceberg/destination_client.py @@ -15,6 +15,7 @@ import pyarrow as pa import pyarrow.parquet as pq +from dlt.common import pendulum from dlt.common.configuration import configspec from dlt.common.destination import DestinationCapabilitiesContext, Destination from dlt.common.destination.client import ( @@ -520,44 +521,83 @@ def complete_load(self, load_id: str) -> None: Called once by dlt after all individual file jobs complete successfully. Reads from module-level global state since dlt creates multiple client instances. """ + pending_files: Dict[str, List[Tuple[TTableSchema, str, pa.Table]]] = {} with _PENDING_FILES_LOCK: - if load_id not in _PENDING_FILES or not _PENDING_FILES[load_id]: - logger.info(f"No files to commit for load {load_id}") - return - - # Copy data and clear immediately (under lock) - pending_files = dict(_PENDING_FILES[load_id]) - del _PENDING_FILES[load_id] + if load_id in _PENDING_FILES and _PENDING_FILES[load_id]: + # Copy data and clear immediately (under lock) + pending_files = dict(_PENDING_FILES[load_id]) + del _PENDING_FILES[load_id] catalog = self._get_catalog() namespace = self.config.namespace - total_files = sum(len(files) for files in pending_files.values()) - logger.info( - f"Committing {total_files} files across " - f"{len(pending_files)} tables for load {load_id}" - ) + if pending_files: + total_files = sum(len(files) for files in pending_files.values()) + logger.info( + f"Committing {total_files} files across " + f"{len(pending_files)} tables for load {load_id}" + ) - # Process each table - for table_name, file_data in pending_files.items(): - identifier = f"{namespace}.{table_name}" + # Process each table + for table_name, file_data in pending_files.items(): + identifier = f"{namespace}.{table_name}" - try: - self._commit_table_files( - catalog=catalog, - identifier=identifier, - table_name=table_name, - file_data=file_data, - ) - except Exception as e: - logger.error( - f"Failed to commit files for table {identifier}: {e}", - exc_info=True, - ) - raise + try: + self._commit_table_files( + catalog=catalog, + identifier=identifier, + table_name=table_name, + file_data=file_data, + ) + except Exception as e: + logger.error( + f"Failed to commit files for table {identifier}: {e}", + exc_info=True, + ) + raise + else: + logger.info(f"No files to commit for load {load_id}") + # dlt expects complete_load to persist a record in _dlt_loads. + # Without this, _dlt_loads is never materialized for this destination. + self._store_completed_load(catalog, load_id) logger.info(f"Load {load_id} completed successfully") + def _store_completed_load(self, catalog, load_id: str) -> None: + """Persist a load completion row in the internal _dlt_loads table.""" + loads_table_name = self.schema.loads_table_name + identifier = f"{self.config.namespace}.{loads_table_name}" + + load_row = pa.table( + { + "load_id": [load_id], + "schema_name": [self.schema.name], + "status": [0], + "inserted_at": [pendulum.now()], + "schema_version_hash": [self.schema.version_hash], + } + ) + + try: + loads_table = catalog.load_table(identifier) + except NoSuchTableError: + from pyiceberg.partitioning import PartitionSpec + + loads_schema = convert_dlt_to_iceberg_schema( + self.schema.get_table(loads_table_name), + load_row, + ) + catalog.create_table( + identifier=identifier, + schema=loads_schema, + partition_spec=PartitionSpec(), + ) + loads_table = catalog.load_table(identifier) + + expected_schema = schema_to_pyarrow(loads_table.schema()) + casted_row = cast_table_safe(load_row, expected_schema, strict=False) + loads_table.append(casted_row) + def _get_merge_strategy(self, table_schema: TTableSchema) -> str: """Extract merge strategy from table schema. diff --git a/tests/test_class_based_atomic.py b/tests/test_class_based_atomic.py index 1c55b53..9de587c 100644 --- a/tests/test_class_based_atomic.py +++ b/tests/test_class_based_atomic.py @@ -293,5 +293,60 @@ def generate_batch_2(): shutil.rmtree(temp_dir, ignore_errors=True) +def test_class_based_creates_dlt_loads_table(): + """Ensure complete_load persists load records in _dlt_loads.""" + temp_dir = tempfile.mkdtemp() + warehouse_path = f"{temp_dir}/warehouse" + catalog_path = f"{temp_dir}/catalog.db" + + try: + from dlt_iceberg import iceberg_rest + from pyiceberg.catalog import load_catalog + + @dlt.resource(name="events", write_disposition="append") + def batch_1(): + for i in range(1, 3): + yield {"event_id": i, "value": i * 10} + + @dlt.resource(name="events", write_disposition="append") + def batch_2(): + for i in range(3, 5): + yield {"event_id": i, "value": i * 10} + + pipeline = dlt.pipeline( + pipeline_name="test_dlt_loads", + destination=iceberg_rest( + catalog_uri=f"sqlite:///{catalog_path}", + warehouse=f"file://{warehouse_path}", + namespace="test_ns", + ), + dataset_name="test_dataset", + ) + + load_info_1 = pipeline.run(batch_1()) + load_info_2 = pipeline.run(batch_2()) + + assert not load_info_1.has_failed_jobs + assert not load_info_2.has_failed_jobs + + expected_load_ids = {load_info_1.loads_ids[0], load_info_2.loads_ids[0]} + + catalog = load_catalog( + "dlt_catalog", + type="sql", + uri=f"sqlite:///{catalog_path}", + warehouse=f"file://{warehouse_path}", + ) + loads_table = catalog.load_table("test_ns._dlt_loads") + loads_rows = loads_table.scan().to_arrow() + + assert len(loads_rows) == 2 + assert set(loads_rows.column("load_id").to_pylist()) == expected_load_ids + assert set(loads_rows.column("status").to_pylist()) == {0} + + finally: + shutil.rmtree(temp_dir, ignore_errors=True) + + if __name__ == "__main__": pytest.main([__file__, "-v", "-s"]) From bd3b4496ff2b6189895160eac4de701a1a7055e1 Mon Sep 17 00:00:00 2001 From: Nico Ritschel Date: Tue, 10 Feb 2026 18:58:50 -0800 Subject: [PATCH 2/3] fix: make _dlt_loads write retryable and idempotent --- src/dlt_iceberg/destination_client.py | 86 +++++++++++++++++++++-- tests/test_load_metadata_resilience.py | 97 ++++++++++++++++++++++++++ 2 files changed, 178 insertions(+), 5 deletions(-) create mode 100644 tests/test_load_metadata_resilience.py diff --git a/src/dlt_iceberg/destination_client.py b/src/dlt_iceberg/destination_client.py index b72be48..d3c12ca 100644 --- a/src/dlt_iceberg/destination_client.py +++ b/src/dlt_iceberg/destination_client.py @@ -34,6 +34,7 @@ NoSuchTableError, NoSuchNamespaceError, ) +from pyiceberg.expressions import EqualTo from .schema_converter import convert_dlt_to_iceberg_schema from .partition_builder import build_partition_spec @@ -578,8 +579,74 @@ def _store_completed_load(self, catalog, load_id: str) -> None: } ) + for attempt in range(self.config.max_retries): + try: + loads_table = self._get_or_create_loads_table( + catalog, identifier, loads_table_name, load_row + ) + + # Idempotency: if this load_id was already recorded, do nothing. + if self._load_record_exists(loads_table, load_id): + logger.info(f"Load {load_id} already recorded in {identifier}") + return + + expected_schema = schema_to_pyarrow(loads_table.schema()) + casted_row = cast_table_safe(load_row, expected_schema, strict=False) + loads_table.append(casted_row) + return + + except Exception as e: + retryable = is_retryable_error(e) + + # Ambiguous write handling: append may have succeeded but client saw an error. + # Read-after-error: if record exists now, treat as success. + if self._load_record_exists_in_catalog(catalog, identifier, load_id): + logger.warning( + f"Encountered error while recording load {load_id} but record exists; " + f"treating as success: {e}" + ) + return + + log_error_with_context( + e, + operation=f"record load in {identifier}", + table_name=loads_table_name, + attempt=attempt + 1, + max_attempts=self.config.max_retries, + include_traceback=not retryable, + ) + + if not retryable: + error_msg = get_user_friendly_error_message( + e, f"record load metadata in table {identifier}" + ) + raise RuntimeError(error_msg) from e + + if attempt >= self.config.max_retries - 1: + error_msg = get_user_friendly_error_message( + e, + f"record load metadata in table {identifier} after " + f"{self.config.max_retries} attempts", + ) + raise RuntimeError(error_msg) from e + + sleep_time = self.config.retry_backoff_base ** attempt + logger.info( + f"Retrying load metadata write after {sleep_time}s " + f"(attempt {attempt + 2}/{self.config.max_retries})" + ) + time.sleep(sleep_time) + + def _get_or_create_loads_table( + self, + catalog, + identifier: str, + loads_table_name: str, + load_row: pa.Table, + ): + """Load the internal _dlt_loads table, creating it if needed.""" try: - loads_table = catalog.load_table(identifier) + return catalog.load_table(identifier) except NoSuchTableError: from pyiceberg.partitioning import PartitionSpec @@ -592,11 +659,20 @@ def _store_completed_load(self, catalog, load_id: str) -> None: schema=loads_schema, partition_spec=PartitionSpec(), ) - loads_table = catalog.load_table(identifier) + return catalog.load_table(identifier) - expected_schema = schema_to_pyarrow(loads_table.schema()) - casted_row = cast_table_safe(load_row, expected_schema, strict=False) - loads_table.append(casted_row) + def _load_record_exists(self, loads_table, load_id: str) -> bool: + """Check whether a load_id already exists in _dlt_loads.""" + existing_rows = loads_table.scan(row_filter=EqualTo("load_id", load_id)).to_arrow() + return len(existing_rows) > 0 + + def _load_record_exists_in_catalog(self, catalog, identifier: str, load_id: str) -> bool: + """Best-effort catalog-level existence check for ambiguous write outcomes.""" + try: + loads_table = catalog.load_table(identifier) + return self._load_record_exists(loads_table, load_id) + except Exception: + return False def _get_merge_strategy(self, table_schema: TTableSchema) -> str: """Extract merge strategy from table schema. diff --git a/tests/test_load_metadata_resilience.py b/tests/test_load_metadata_resilience.py new file mode 100644 index 0000000..7882100 --- /dev/null +++ b/tests/test_load_metadata_resilience.py @@ -0,0 +1,97 @@ +""" +Tests for _dlt_loads metadata write resilience in class-based destination. +""" + +from unittest.mock import Mock, patch + +import pyarrow as pa +from pyiceberg.exceptions import CommitFailedException + +from dlt.common.destination import DestinationCapabilitiesContext +from dlt.common.schema import Schema +from dlt_iceberg.destination_client import IcebergRestClient, IcebergRestConfiguration + + +def _make_client(max_retries: int = 3) -> IcebergRestClient: + schema = Schema("test_schema") + config = IcebergRestConfiguration( + catalog_uri="sqlite:///unused.db", + namespace="test_ns", + max_retries=max_retries, + retry_backoff_base=2.0, + ) + return IcebergRestClient(schema, config, DestinationCapabilitiesContext()) + + +def _empty_load_rows() -> pa.Table: + return pa.table({"load_id": pa.array([], type=pa.string())}) + + +def _existing_load_rows(load_id: str) -> pa.Table: + return pa.table({"load_id": pa.array([load_id], type=pa.string())}) + + +def test_store_completed_load_retries_transient_append_error() -> None: + client = _make_client(max_retries=3) + load_id = "load-1" + + catalog = Mock() + loads_table = Mock() + catalog.load_table.return_value = loads_table + loads_table.scan.return_value.to_arrow.side_effect = [ + _empty_load_rows(), # Initial idempotency check (attempt 1) + _empty_load_rows(), # Read-after-error ambiguity check (attempt 1) + _empty_load_rows(), # Initial idempotency check (attempt 2) + ] + loads_table.append.side_effect = [ + CommitFailedException("transient commit failure"), + None, + ] + + with ( + patch("dlt_iceberg.destination_client.schema_to_pyarrow", return_value=pa.schema([])), + patch("dlt_iceberg.destination_client.cast_table_safe", side_effect=lambda table, *_args, **_kwargs: table), + patch("dlt_iceberg.destination_client.time.sleep") as sleep_mock, + ): + client._store_completed_load(catalog, load_id) + + assert loads_table.append.call_count == 2 + sleep_mock.assert_called_once_with(1.0) + + +def test_store_completed_load_is_idempotent_by_load_id() -> None: + client = _make_client(max_retries=3) + load_id = "load-1" + + catalog = Mock() + loads_table = Mock() + catalog.load_table.return_value = loads_table + loads_table.scan.return_value.to_arrow.return_value = _existing_load_rows(load_id) + + client._store_completed_load(catalog, load_id) + + loads_table.append.assert_not_called() + + +def test_store_completed_load_handles_ambiguous_append_outcome() -> None: + client = _make_client(max_retries=3) + load_id = "load-1" + + catalog = Mock() + loads_table = Mock() + catalog.load_table.return_value = loads_table + loads_table.scan.return_value.to_arrow.side_effect = [ + _empty_load_rows(), # Initial idempotency check + _existing_load_rows(load_id), # Read-after-error ambiguity check + ] + loads_table.append.side_effect = [CommitFailedException("state unknown")] + + with ( + patch("dlt_iceberg.destination_client.schema_to_pyarrow", return_value=pa.schema([])), + patch("dlt_iceberg.destination_client.cast_table_safe", side_effect=lambda table, *_args, **_kwargs: table), + patch("dlt_iceberg.destination_client.time.sleep") as sleep_mock, + ): + client._store_completed_load(catalog, load_id) + + assert loads_table.append.call_count == 1 + sleep_mock.assert_not_called() From 00c8f5a4b839113eb68250de9bea44bee90158ed Mon Sep 17 00:00:00 2001 From: Nico Ritschel Date: Tue, 10 Feb 2026 19:04:12 -0800 Subject: [PATCH 3/3] fix: honor custom location layout for _dlt_loads --- src/dlt_iceberg/destination_client.py | 13 +++++++--- tests/test_capabilities.py | 6 +++++ tests/test_load_metadata_resilience.py | 36 +++++++++++++++++++++++++- 3 files changed, 51 insertions(+), 4 deletions(-) diff --git a/src/dlt_iceberg/destination_client.py b/src/dlt_iceberg/destination_client.py index d3c12ca..421133a 100644 --- a/src/dlt_iceberg/destination_client.py +++ b/src/dlt_iceberg/destination_client.py @@ -654,10 +654,17 @@ def _get_or_create_loads_table( self.schema.get_table(loads_table_name), load_row, ) + create_kwargs = { + "identifier": identifier, + "schema": loads_schema, + "partition_spec": PartitionSpec(), + } + table_location = self._get_table_location(loads_table_name) + if table_location: + create_kwargs["location"] = table_location + catalog.create_table( - identifier=identifier, - schema=loads_schema, - partition_spec=PartitionSpec(), + **create_kwargs ) return catalog.load_table(identifier) diff --git a/tests/test_capabilities.py b/tests/test_capabilities.py index 89ebe68..a34f6ed 100644 --- a/tests/test_capabilities.py +++ b/tests/test_capabilities.py @@ -269,6 +269,12 @@ def events(): assert "test_location" in location assert "events" in location + loads_table = catalog.load_table("test_location._dlt_loads") + loads_location = loads_table.location() + assert "custom_path" in loads_location + assert "test_location" in loads_location + assert "_dlt_loads" in loads_location + finally: shutil.rmtree(temp_dir, ignore_errors=True) diff --git a/tests/test_load_metadata_resilience.py b/tests/test_load_metadata_resilience.py index 7882100..6e1d5bc 100644 --- a/tests/test_load_metadata_resilience.py +++ b/tests/test_load_metadata_resilience.py @@ -5,7 +5,7 @@ from unittest.mock import Mock, patch import pyarrow as pa -from pyiceberg.exceptions import CommitFailedException +from pyiceberg.exceptions import CommitFailedException, NoSuchTableError from dlt.common.destination import DestinationCapabilitiesContext from dlt.common.schema import Schema @@ -95,3 +95,37 @@ def test_store_completed_load_handles_ambiguous_append_outcome() -> None: assert loads_table.append.call_count == 1 sleep_mock.assert_not_called() + + +def test_get_or_create_loads_table_honors_custom_location_layout() -> None: + client = _make_client(max_retries=3) + client.config.table_location_layout = "custom_path/{namespace}/{table_name}" + client.config.warehouse = "file:///warehouse_root" + + load_row = pa.table( + { + "load_id": ["load-1"], + "schema_name": ["test_schema"], + "status": [0], + "inserted_at": [None], + "schema_version_hash": ["abc"], + } + ) + loads_table_name = client.schema.loads_table_name + identifier = f"{client.config.namespace}.{loads_table_name}" + + catalog = Mock() + created_table = Mock() + catalog.load_table.side_effect = [NoSuchTableError("missing"), created_table] + + with patch("dlt_iceberg.destination_client.convert_dlt_to_iceberg_schema", return_value=Mock()): + result = client._get_or_create_loads_table( + catalog=catalog, + identifier=identifier, + loads_table_name=loads_table_name, + load_row=load_row, + ) + + assert result is created_table + create_kwargs = catalog.create_table.call_args.kwargs + assert create_kwargs["location"] == "file:///warehouse_root/custom_path/test_ns/_dlt_loads"