README.md: 197 changes (148 additions, 49 deletions)
@@ -6,11 +6,11 @@ A [dlt](https://dlthub.com/) destination for [Apache Iceberg](https://iceberg.ap

- **Atomic Multi-File Commits**: Multiple Parquet files committed as a single Iceberg snapshot per table
- **REST Catalog Support**: Works with Nessie, Polaris, AWS Glue, Unity Catalog
- **Partitioning**: Full support for Iceberg partition transforms (temporal, bucket, truncate, identity)
- **Authentication**: OAuth2, Bearer token, AWS SigV4
- **Write Dispositions**: Append, replace, merge (upsert)
- **Partitioning**: Full support for Iceberg partition transforms via `iceberg_adapter()`
- **Merge Strategies**: Delete-insert and upsert with hard delete support
- **DuckDB Integration**: Query loaded data via `pipeline.dataset()`
- **Schema Evolution**: Automatic schema updates when adding columns
- **Retry Logic**: Exponential backoff for transient failures
- **Authentication**: OAuth2, Bearer token, AWS SigV4

## Installation

@@ -53,6 +53,22 @@ pipeline = dlt.pipeline(

```python
pipeline.run(generate_events())
```
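
A minimal end-to-end sketch of the same flow; the import path, catalog settings, and resource body below are illustrative assumptions rather than the project's exact Quick Start code:

```python
import dlt
from dlt_iceberg import iceberg_rest  # import path assumed from this package's public API

@dlt.resource(name="events", write_disposition="append")
def generate_events():
    # Illustrative rows; replace with a real source
    yield {"event_date": "2024-01-01", "user_id": 123, "value": 42}

pipeline = dlt.pipeline(
    pipeline_name="iceberg_quickstart",
    destination=iceberg_rest(
        catalog_uri="sqlite:///catalog.db",   # local SQLite catalog, as under Catalog Examples
        warehouse="file:///tmp/warehouse",    # placeholder warehouse location
        namespace="my_namespace",
    ),
    dataset_name="events",
)

pipeline.run(generate_events())
```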

### Query Loaded Data

```python
# Query data via DuckDB
dataset = pipeline.dataset()

# Access as dataframe
df = dataset["events"].df()

# Run SQL queries
result = dataset.query("SELECT * FROM events WHERE value > 50").fetchall()

# Get Arrow table
arrow_table = dataset["events"].arrow()
```

### Merge/Upsert

@@ -97,10 +113,17 @@

```python
iceberg_rest(
retry_backoff_base=2.0, # Exponential backoff multiplier
merge_batch_size=500000, # Rows per batch for merge operations
    strict_casting=False,            # When True, fail on potential data loss

# Table management
table_location_layout=None, # Custom table location pattern
register_new_tables=False, # Register tables found in storage
hard_delete_column="_dlt_deleted_at", # Column for hard deletes
)
```

### Nessie (Docker)
### Catalog Examples

#### Nessie (Docker)

```python
iceberg_rest(
```

@@ -115,7 +138,17 @@

Start services: `docker compose up -d`
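
As a rough sketch of the shape such a configuration takes (the endpoint, port, warehouse path, and credentials below are assumptions about a local Nessie + MinIO setup, not this project's verified defaults):

```python
iceberg_rest(
    catalog_uri="http://localhost:19120/iceberg",  # assumed local Nessie REST endpoint and port
    warehouse="s3://warehouse/",                   # assumed MinIO bucket
    namespace="my_namespace",
    # MinIO/S3 credentials and endpoint settings would also be needed;
    # their parameter names are not shown in this excerpt.
)
```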

### AWS Glue
#### Local SQLite Catalog

```python
iceberg_rest(
catalog_uri="sqlite:///catalog.db",
warehouse="file:///path/to/warehouse",
namespace="my_namespace",
)
```

#### AWS Glue

```python
iceberg_rest(
```

@@ -129,7 +162,7 @@

AWS credentials are supplied via environment variables.
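
For example, the standard AWS SDK variables can be set before the pipeline runs (values are placeholders):

```python
import os

# Standard AWS SDK environment variables picked up by boto3 (placeholder values)
os.environ["AWS_ACCESS_KEY_ID"] = "AKIA..."
os.environ["AWS_SECRET_ACCESS_KEY"] = "..."
os.environ["AWS_DEFAULT_REGION"] = "us-east-1"
```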

### Polaris
#### Polaris

```python
iceberg_rest(
```

@@ -141,7 +174,7 @@

### Unity Catalog
#### Unity Catalog

```python
iceberg_rest(
```

@@ -154,7 +187,59 @@

## Partitioning

Mark columns for partitioning using dlt column hints:
### Using iceberg_adapter (Recommended)

The `iceberg_adapter` function provides a clean API for configuring Iceberg partitioning:

```python
from dlt_iceberg import iceberg_adapter, iceberg_partition

@dlt.resource(name="events")
def events():
yield {"event_date": "2024-01-01", "user_id": 123, "region": "US"}

# Single partition
adapted = iceberg_adapter(events, partition="region")

# Multiple partitions with transforms
adapted = iceberg_adapter(
events,
partition=[
iceberg_partition.month("event_date"),
iceberg_partition.bucket(10, "user_id"),
"region", # identity partition
]
)

pipeline.run(adapted)
```

### Partition Transforms

```python
# Temporal transforms (for timestamp/date columns)
iceberg_partition.year("created_at")
iceberg_partition.month("created_at")
iceberg_partition.day("created_at")
iceberg_partition.hour("created_at")

# Identity (no transformation)
iceberg_partition.identity("region")

# Bucket (hash into N buckets)
iceberg_partition.bucket(10, "user_id")

# Truncate (truncate to width)
iceberg_partition.truncate(4, "email")

# Custom partition field names
iceberg_partition.month("created_at", "event_month")
iceberg_partition.bucket(8, "user_id", "user_bucket")
```

### Using Column Hints

You can also use dlt column hints for partitioning:

```python
@dlt.resource(
@@ -163,36 +248,19 @@
"event_date": {
"data_type": "date",
"partition": True,
"partition_transform": "day", # Optional: year, month, day, hour
},
"region": {
"data_type": "text",
"partition": True, # Uses identity transform for strings
"partition_transform": "day",
},
"user_id": {
"data_type": "bigint",
"partition": True,
"partition_transform": "bucket[10]", # Hash into 10 buckets
"partition_transform": "bucket[10]",
}
}
)
def events():
...
```

### Available Transforms

- **Temporal**: `year`, `month`, `day`, `hour` (for timestamp/date columns)
- **Identity**: No transformation (default for string/integer)
- **Bucket**: `bucket[N]` - Hash-based partitioning into N buckets
- **Truncate**: `truncate[N]` - Truncate strings/integers to N width

### Default Behavior

If `partition_transform` is not specified:
- Timestamp/date columns default to `month`
- String/integer columns default to `identity`

## Write Dispositions

### Append
Expand All @@ -208,11 +276,39 @@ write_disposition="replace"
Truncates the table and inserts new data.
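
A minimal sketch of how a resource might opt into this disposition using a standard dlt hint (the resource name and fields are illustrative):

```python
import dlt

@dlt.resource(name="daily_snapshot", write_disposition="replace")
def daily_snapshot():
    # Each run replaces the table contents with the latest rows
    yield {"snapshot_date": "2024-01-01", "total": 100}
```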

### Merge

#### Delete-Insert Strategy (Default)
```python
write_disposition="merge"
primary_key="user_id"
@dlt.resource(
write_disposition={"disposition": "merge", "strategy": "delete-insert"},
primary_key="user_id"
)
```
Deletes matching rows, then inserts new data in a single atomic transaction.

#### Upsert Strategy
```python
@dlt.resource(
write_disposition={"disposition": "merge", "strategy": "upsert"},
primary_key="user_id"
)
```
Updates existing rows, inserts new rows.

#### Hard Deletes

Mark rows for deletion by setting the `_dlt_deleted_at` column:

```python
@dlt.resource(
write_disposition={"disposition": "merge", "strategy": "delete-insert"},
primary_key="user_id"
)
def users_with_deletes():
from datetime import datetime
yield {"user_id": 1, "name": "alice", "_dlt_deleted_at": None} # Keep
yield {"user_id": 2, "name": "bob", "_dlt_deleted_at": datetime.now()} # Delete
```
Updates existing rows by primary key, inserts new rows.

## Development

@@ -225,37 +321,40 @@ docker compose up -d

```bash
# Run all tests
uv run pytest tests/ -v

# Run only unit tests
uv run pytest tests/ -v -m "not integration"
# Run only unit tests (no Docker required)
uv run pytest tests/ --ignore=tests/nessie -v

# Run only integration tests
uv run pytest tests/ -v -m integration
# Run Nessie integration tests
uv run pytest tests/nessie/ -v
```

### Project Structure

```
dlt-iceberg/
├── src/dlt_iceberg/
│ ├── __init__.py # Public API
│ ├── destination_client.py # Class-based destination (atomic commits)
│ ├── destination.py # Function-based destination (legacy)
│ ├── adapter.py # iceberg_adapter() for partitioning
│ ├── sql_client.py # DuckDB integration for dataset()
│ ├── schema_converter.py # dlt → Iceberg schema conversion
│ ├── schema_casting.py # Arrow table casting
│ ├── schema_evolution.py # Schema updates
│ ├── partition_builder.py # Partition specs
│ └── error_handling.py # Retry logic
├── tests/
│ ├── test_destination_rest_catalog.py # Integration tests (Docker)
│ ├── test_class_based_atomic.py # Atomic commit tests
│ ├── test_adapter.py # iceberg_adapter tests
│ ├── test_capabilities.py # Hard delete, partition names tests
│ ├── test_dataset.py # DuckDB integration tests
│ ├── test_merge_disposition.py
│ ├── test_schema_evolution.py
│ └── ...
├── examples/
│ ├── incremental_load.py # CSV incremental loading
│ ├── merge_load.py # CSV merge/upsert
│ └── data/ # Sample CSV files
└── docker-compose.yml # Nessie + MinIO for testing
```

## How It Works
pyproject.toml: 1 change (1 addition, 0 deletions)
@@ -23,6 +23,7 @@ classifiers = [
dependencies = [
"boto3>=1.40.50",
"dlt>=1.17.1",
"duckdb>=1.4.3",
"pandas>=2.3.3",
"pyarrow>=21.0.0",
"pydantic<2.11",