Schema evolution error for _dlt_pipeline_state table #9

@mbarbaric-work

Hello, I'm trying to load data into Apache Iceberg tables using Lakekeeper as the catalog, and every run fails with this error:

<class 'dlt_iceberg.schema_evolution.SchemaEvolutionError'>
Schema evolution validation failed:
  - Unsafe type change for column 'created_at': timestamp → timestamptz. Only safe promotions are allowed (int→long, float→double, decimal widening).

I'm not sure whether I misconfigured something or whether this is a bug, since the table in question (_dlt_pipeline_state) is created automatically by dlt.

import dlt
from dlt.sources.sql_database import sql_database
from dlt_iceberg import iceberg_rest


iceberg_destination = iceberg_rest(
    catalog_uri="https://my-lakekeeper/catalog",
    warehouse='minio-aos',
    namespace="iceberg-db",
    credential="credentials:credentials",
    oauth2_server_uri="https:/url/protocol/openid-connect/token",
    scope="lakekeeper",
    strict_casting=False,
)


def run_replace_load():
    source = sql_database(
        credentials=dlt.secrets["sources.postgres.credentials"],
        schema="mbarbaric_dev",
        table_names=["bd_poc_tbl_tran_bin_map_hist", "dlt_big_table"],
    )

    pipeline = dlt.pipeline(
        pipeline_name="postgres_to_iceberg_replace",
        destination=iceberg_destination,
        dataset_name="test_replace",
    )

    load_info = pipeline.run(
        source,
        write_disposition="replace",
    )

    print("Load completed!")
    print(f"Load info: {load_info}")
    print(f"Loaded packages: {load_info.load_packages}")

    return load_info


if __name__ == "__main__":
    run_replace_load()

(venv) mbarbaric@mbarbaric dlt_test % ./venv/bin/python test_full_load_replace.py
Non-retryable error encountered: Schema evolution validation failed:
  - Unsafe type change for column 'created_at': timestamp → timestamptz. Only safe promotions are allowed (int→long, float→double, decimal widening). (error_type=SchemaEvolutionError, error_category=unknown_error, retryable=False, operation=commit files to iceberg-db._dlt_pipeline_state, table_name=_dlt_pipeline_state)
Stack trace:
Traceback (most recent call last):
  File "/Users/mbarbaric/Documents/Interni projekti/LakekeeperUC/dlt_test/venv/lib/python3.12/site-packages/dlt_iceberg/destination_client.py", line 794, in _commit_table_files
    schema_evolved = evolve_schema_if_needed(
                     ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/mbarbaric/Documents/Interni projekti/LakekeeperUC/dlt_test/venv/lib/python3.12/site-packages/dlt_iceberg/schema_evolution.py", line 256, in evolve_schema_if_needed
    validate_schema_changes(added_fields, type_changes, dropped_fields, allow_column_drops)
  File "/Users/mbarbaric/Documents/Interni projekti/LakekeeperUC/dlt_test/venv/lib/python3.12/site-packages/dlt_iceberg/schema_evolution.py", line 164, in validate_schema_changes
    raise SchemaEvolutionError(
dlt_iceberg.schema_evolution.SchemaEvolutionError: Schema evolution validation failed:
  - Unsafe type change for column 'created_at': timestamp → timestamptz. Only safe promotions are allowed (int→long, float→double, decimal widening).

Failed to commit files for table iceberg-db._dlt_pipeline_state: Schema evolution validation failed:
  - Unsafe type change for column 'created_at': timestamp → timestamptz. Only safe promotions are allowed (int→long, float→double, decimal widening).
Traceback (most recent call last):
  File "/Users/mbarbaric/Documents/Interni projekti/LakekeeperUC/dlt_test/venv/lib/python3.12/site-packages/dlt_iceberg/destination_client.py", line 546, in complete_load
    self._commit_table_files(
  File "/Users/mbarbaric/Documents/Interni projekti/LakekeeperUC/dlt_test/venv/lib/python3.12/site-packages/dlt_iceberg/destination_client.py", line 794, in _commit_table_files
    schema_evolved = evolve_schema_if_needed(
                     ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/mbarbaric/Documents/Interni projekti/LakekeeperUC/dlt_test/venv/lib/python3.12/site-packages/dlt_iceberg/schema_evolution.py", line 256, in evolve_schema_if_needed
    validate_schema_changes(added_fields, type_changes, dropped_fields, allow_column_drops)
  File "/Users/mbarbaric/Documents/Interni projekti/LakekeeperUC/dlt_test/venv/lib/python3.12/site-packages/dlt_iceberg/schema_evolution.py", line 164, in validate_schema_changes
    raise SchemaEvolutionError(
dlt_iceberg.schema_evolution.SchemaEvolutionError: Schema evolution validation failed:
  - Unsafe type change for column 'created_at': timestamp → timestamptz. Only safe promotions are allowed (int→long, float→double, decimal widening).
Traceback (most recent call last):
  File "/Users/mbarbaric/Documents/Interni projekti/LakekeeperUC/dlt_test/venv/lib/python3.12/site-packages/dlt/pipeline/pipeline.py", line 616, in load
    runner.run_pool(load_step.config, load_step)
  File "/Users/mbarbaric/Documents/Interni projekti/LakekeeperUC/dlt_test/venv/lib/python3.12/site-packages/dlt/common/runners/pool_runner.py", line 208, in run_pool
    while _run_func():
          ^^^^^^^^^^^
  File "/Users/mbarbaric/Documents/Interni projekti/LakekeeperUC/dlt_test/venv/lib/python3.12/site-packages/dlt/common/runners/pool_runner.py", line 201, in _run_func
    run_metrics = run_f.run(cast(TExecutor, pool))
                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/mbarbaric/Documents/Interni projekti/LakekeeperUC/dlt_test/venv/lib/python3.12/site-packages/dlt/load/load.py", line 696, in run
    self.load_single_package(load_id, schema)
  File "/Users/mbarbaric/Documents/Interni projekti/LakekeeperUC/dlt_test/venv/lib/python3.12/site-packages/dlt/load/load.py", line 659, in load_single_package
    self.complete_package(load_id, schema, aborted=False)
  File "/Users/mbarbaric/Documents/Interni projekti/LakekeeperUC/dlt_test/venv/lib/python3.12/site-packages/dlt/load/load.py", line 512, in complete_package
    job_client.complete_load(load_id)
  File "/Users/mbarbaric/Documents/Interni projekti/LakekeeperUC/dlt_test/venv/lib/python3.12/site-packages/dlt_iceberg/destination_client.py", line 546, in complete_load
    self._commit_table_files(
  File "/Users/mbarbaric/Documents/Interni projekti/LakekeeperUC/dlt_test/venv/lib/python3.12/site-packages/dlt_iceberg/destination_client.py", line 794, in _commit_table_files
    schema_evolved = evolve_schema_if_needed(
                     ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/mbarbaric/Documents/Interni projekti/LakekeeperUC/dlt_test/venv/lib/python3.12/site-packages/dlt_iceberg/schema_evolution.py", line 256, in evolve_schema_if_needed
    validate_schema_changes(added_fields, type_changes, dropped_fields, allow_column_drops)
  File "/Users/mbarbaric/Documents/Interni projekti/LakekeeperUC/dlt_test/venv/lib/python3.12/site-packages/dlt_iceberg/schema_evolution.py", line 164, in validate_schema_changes
    raise SchemaEvolutionError(
dlt_iceberg.schema_evolution.SchemaEvolutionError: Schema evolution validation failed:
  - Unsafe type change for column 'created_at': timestamp → timestamptz. Only safe promotions are allowed (int→long, float→double, decimal widening).

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/Users/mbarbaric/Documents/Interni projekti/LakekeeperUC/dlt_test/test_full_load_replace.py", line 50, in <module>
    run_replace_load()
  File "/Users/mbarbaric/Documents/Interni projekti/LakekeeperUC/dlt_test/test_full_load_replace.py", line 37, in run_replace_load
    load_info = pipeline.run(
                ^^^^^^^^^^^^^
  File "/Users/mbarbaric/Documents/Interni projekti/LakekeeperUC/dlt_test/venv/lib/python3.12/site-packages/dlt/pipeline/pipeline.py", line 224, in _wrap
    step_info = f(self, *args, **kwargs)
                ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/mbarbaric/Documents/Interni projekti/LakekeeperUC/dlt_test/venv/lib/python3.12/site-packages/dlt/pipeline/pipeline.py", line 272, in _wrap
    return f(self, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/mbarbaric/Documents/Interni projekti/LakekeeperUC/dlt_test/venv/lib/python3.12/site-packages/dlt/pipeline/pipeline.py", line 761, in run
    return self.load(destination, dataset_name, credentials=credentials)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/mbarbaric/Documents/Interni projekti/LakekeeperUC/dlt_test/venv/lib/python3.12/site-packages/dlt/pipeline/pipeline.py", line 224, in _wrap
    step_info = f(self, *args, **kwargs)
                ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/mbarbaric/Documents/Interni projekti/LakekeeperUC/dlt_test/venv/lib/python3.12/site-packages/dlt/pipeline/pipeline.py", line 164, in _wrap
    return f(self, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/mbarbaric/Documents/Interni projekti/LakekeeperUC/dlt_test/venv/lib/python3.12/site-packages/dlt/pipeline/pipeline.py", line 272, in _wrap
    return f(self, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/mbarbaric/Documents/Interni projekti/LakekeeperUC/dlt_test/venv/lib/python3.12/site-packages/dlt/pipeline/pipeline.py", line 626, in load
    raise PipelineStepFailed(self, "load", err_load_id, l_ex, step_info) from l_ex
dlt.pipeline.exceptions.PipelineStepFailed: Pipeline execution failed at `step=load` when processing package with `load_id=1769520236.331732` with exception:

<class 'dlt_iceberg.schema_evolution.SchemaEvolutionError'>
Schema evolution validation failed:
  - Unsafe type change for column 'created_at': timestamp → timestamptz. Only safe promotions are allowed (int→long, float→double, decimal widening).

Pending packages are left in the pipeline and will be re-tried on the next pipeline run. If you pass new data to extract to next run, it will be ignored. Run `dlt pipeline postgres_to_iceberg_replace info` for more information or `dlt pipeline postgres_to_iceberg_replace drop-pending-packages` to drop pending packages.
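
For context, the rule quoted in the error message can be illustrated with a small sketch. This is not dlt_iceberg's actual code: the function name `is_safe_promotion`, the allow-list, and the string type names are assumptions made for illustration, based only on the promotions the message lists (int→long, float→double, decimal widening). It shows why timestamp → timestamptz is rejected under such a rule.

```python
import re

# Hypothetical allow-list mirroring the promotions named in the error message.
SAFE_PRIMITIVE_PROMOTIONS = {("int", "long"), ("float", "double")}

# Matches type strings such as "decimal(10, 2)".
_DECIMAL = re.compile(r"decimal\((\d+),\s*(\d+)\)")


def is_safe_promotion(old: str, new: str) -> bool:
    """Return True if changing a column type from `old` to `new` is allowed."""
    if old == new:
        return True
    if (old, new) in SAFE_PRIMITIVE_PROMOTIONS:
        return True
    old_dec, new_dec = _DECIMAL.fullmatch(old), _DECIMAL.fullmatch(new)
    if old_dec and new_dec:
        old_precision, old_scale = map(int, old_dec.groups())
        new_precision, new_scale = map(int, new_dec.groups())
        # Widening: the scale must stay the same and the precision may only grow.
        return new_scale == old_scale and new_precision >= old_precision
    # Everything else, including timestamp -> timestamptz, is rejected.
    return False


print(is_safe_promotion("int", "long"))
print(is_safe_promotion("timestamp", "timestamptz"))
```

If the validation works roughly like this, the failure is about the existing `created_at` column and the incoming data disagreeing on timezone awareness, not about a widening that could be applied automatically.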
