Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 17 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,20 @@ The format is based on Keep a Changelog and this project follows Semantic Versio

### Added

- Initial public migration tooling baseline with CLI, resource migrators, and tests.
- Initial public release of the Braintrust migration CLI with orchestrated resource migrators and test coverage.
- Support for migrating experiments, datasets, logs, ACLs, and experiment tags between Braintrust environments.
- Opt-in group member user mapping via email matching during ACL migration.
- Pagination support for resource listing, `created_before` filtering, and release/tag-based publishing workflows.
- Opt-in live smoke and E2E validation coverage for concurrency-sensitive migration paths.

### Changed

- Refactored migration internals to use the current API surface instead of the older Braintrust API SDK.
- Improved migration resilience around temporary-directory creation, client/config plumbing, and concurrent resource orchestration.
- Hardened CI and release automation with versioned release workflow guidance and pinned GitHub Actions.

### Fixed

- Dry-run project discovery is now read-only and no longer creates destination projects.
- File output now consistently uses UTF-8 encoding.
- Restored DAG scheduler and orchestrator compatibility helpers expected by the test suite.
12 changes: 6 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
# Braintrust Migration Tool

> **⚠️ WARNING: Large-scale migrations (especially logs/experiments) can be extremely expensive and operationally risky. This tool includes streaming + resumable migration for high-volume event streams, but TB-scale migrations have not been fully soak-tested in production-like conditions. Use with caution and test on a subset first.**
> **⚠️ WARNING: Large-scale migrations can still be expensive and operationally risky, but high-volume event resources are now streamed and resumable. Logs, experiment events, and dataset events use BTQL sorted pagination with checkpointed resume and SDK-backed `logs3` writes. TB-scale migrations have not been fully soak-tested in production-like conditions, so test on a subset first.**

A Python CLI & library for migrating Braintrust organizations with maximum fidelity, using direct HTTP requests (via `httpx`) against the Braintrust REST API.

## Overview

This tool provides migration capabilities for Braintrust organizations, handling everything from AI provider credentials to project-level data. **It is best suited for small-scale migrations, such as moving POC/test data to a new deployment.**
This tool provides migration capabilities for Braintrust organizations, handling everything from AI provider credentials to project-level data. It works well for small and medium migrations, and it now has dedicated streaming paths for high-volume logs, experiment events, and dataset events.

- **Organization administrators** migrating between environments (dev → staging → prod)
- **Teams** consolidating multiple organizations
Expand All @@ -19,7 +19,7 @@ This tool provides migration capabilities for Braintrust organizations, handling
- **Dependency Resolution**: Handles resource dependencies (e.g., functions referenced by prompts, datasets referenced by experiments)
- **Organization vs Project Scope**: Org-level resources are migrated once, project-level resources per project
- **Real-time Progress**: Live progress indicators and detailed migration reports
- **High-volume Streaming**: Logs, experiment events, and dataset events are migrated via BTQL sorted pagination (by `_pagination_key`) with bounded insert batches
- **High-volume Streaming**: Logs, experiment events, and dataset events are migrated via BTQL sorted pagination (by `_pagination_key`) with checkpointed resume and SDK-backed `logs3` writes
- **Resume + Idempotency**: Per-resource/per-experiment checkpoints + a SQLite "seen ids" store enable safe resume and help avoid duplicate inserts/overwrites
- **Rate Limit Resilience**: Automatic LIMIT backoff on 500/504 errors (retries with progressively smaller page sizes: 1000 → 500 → 250 → ...)

Expand Down Expand Up @@ -170,12 +170,12 @@ These settings control BTQL-based streaming for high-volume resources.
| Environment Variable | CLI Flag | Default | Description |
|---------------------|----------|---------|-------------|
| `MIGRATION_EVENTS_FETCH_LIMIT` | — | `1000` | BTQL fetch page size (rows per query) |
| `MIGRATION_EVENTS_INSERT_BATCH_SIZE` | — | `200` | Events per insert API call |
| `MIGRATION_EVENTS_FETCH_GROUP_SIZE` | — | `25` | Number of experiment or dataset ids to group into one BTQL event stream |
| `MIGRATION_EVENTS_USE_SEEN_DB` | — | `true` | Use SQLite store for deduplication |
| `MIGRATION_LOGS_FETCH_LIMIT` | `--logs-fetch-limit` | *(inherits)* | Override fetch limit for logs only |
| `MIGRATION_LOGS_INSERT_BATCH_SIZE` | `--logs-insert-batch-size` | *(inherits)* | Override insert batch size for logs only |`
| `MIGRATION_LOGS_INSERT_BATCH_SIZE` | `--logs-insert-batch-size` | `200` | Max rows per pre-SDK logs insert chunk before enqueueing to the SDK writer |

Resource-specific overrides follow the pattern `MIGRATION_{RESOURCE}_FETCH_LIMIT`, `MIGRATION_{RESOURCE}_INSERT_BATCH_SIZE`, `MIGRATION_{RESOURCE}_USE_SEEN_DB` where `{RESOURCE}` is `LOGS`, `EXPERIMENT_EVENTS`, or `DATASET_EVENTS`.
Resource-specific overrides follow the pattern `MIGRATION_{RESOURCE}_FETCH_LIMIT` and `MIGRATION_{RESOURCE}_USE_SEEN_DB` where `{RESOURCE}` is `LOGS`, `EXPERIMENT_EVENTS`, or `DATASET_EVENTS`. Logs additionally support `MIGRATION_LOGS_INSERT_BATCH_SIZE` and `MIGRATION_LOGS_USE_VERSION_SNAPSHOT`.

#### Insert Request Sizing

Expand Down
2 changes: 2 additions & 0 deletions RELEASING.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ git push origin vX.Y.Z
- publish to PyPI
- create a GitHub Release

Create the tag only after the release commit is on `main`, so the tag points at an immutable revision whose `pyproject.toml` version and `CHANGELOG.md` entry already match `X.Y.Z`.

## Hotfixes

1. Branch from latest release tag: `hotfix/X.Y.Z+1`
Expand Down
10 changes: 4 additions & 6 deletions braintrust_migrate/batching.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
"""Helpers for batching inserts by both count and approximate payload size."""
"""Helpers for batching inserts by both count and payload size."""

from __future__ import annotations

Expand All @@ -7,11 +7,10 @@
from typing import Any


# We deliberately use the *non-compact* json encoding here to slightly
# over-estimate payload sizes vs compact separators. This works well with a
# headroom ratio to avoid flirting with gateway limits.
# Measure the actual UTF-8 payload bytes that will be sent over the wire.
# This keeps batching aligned with gateway limits even for non-ASCII content.
def approx_json_bytes(obj: Any) -> int:
return len(json.dumps(obj, ensure_ascii=False))
return len(json.dumps(obj, ensure_ascii=False).encode("utf-8"))


_EMPTY_EVENTS_WRAPPER_BYTES = approx_json_bytes({"events": []})
Expand All @@ -25,7 +24,6 @@ def approx_events_insert_payload_bytes(
"""Approximate the JSON payload bytes for `{"events": events}`.

This uses a fast additive estimate: wrapper overhead + sum(event bytes) + commas.
It intentionally overestimates in common cases.
"""
if not events:
return _EMPTY_EVENTS_WRAPPER_BYTES
Expand Down
11 changes: 11 additions & 0 deletions braintrust_migrate/btql.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from __future__ import annotations

from collections.abc import Callable
from importlib.metadata import PackageNotFoundError, version
from typing import Any, cast

import httpx
Expand All @@ -19,6 +20,13 @@
from braintrust_migrate.client import BraintrustClient


def _client_version() -> str:
try:
return version("braintrust-migrate")
except PackageNotFoundError:
return "dev"


def btql_quote(s: str) -> str:
"""Escape a string for inclusion in a single-quoted BTQL/SQL literal."""
return s.replace("\\", "\\\\").replace("'", "\\'")
Expand Down Expand Up @@ -98,9 +106,12 @@ async def _do_btql(*, op: str, query_text: str) -> Any:
# All modern deployments are Brainstore-backed; do not attempt
# a Postgres fallback.
"use_brainstore": True,
"client_version": _client_version(),
"query_timeout_seconds": int(timeout_seconds),
},
timeout=timeout_seconds,
),
non_retryable_statuses={500, 504},
)

async def _fetch_one_limit(n: int) -> Any:
Expand Down
17 changes: 14 additions & 3 deletions braintrust_migrate/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -817,6 +817,8 @@ def hook(update: dict[str, Any], *, _label: str = label) -> None:
inserted_last = update.get("inserted_last")
inserted_bytes_last = update.get("inserted_bytes_last")
insert_seconds = update.get("insert_seconds")
pending_buffered_rows = update.get("pending_buffered_rows")
pending_buffered_bytes = update.get("pending_buffered_bytes")

gb_part = ""
if isinstance(inserted_bytes, int):
Expand Down Expand Up @@ -844,25 +846,34 @@ def hook(update: dict[str, Any], *, _label: str = label) -> None:

# Per-resource context
page_part = f" page={page_num}" if page_num is not None else ""
pending_part = ""
if (
isinstance(pending_buffered_rows, int)
and pending_buffered_rows > 0
):
pending_part = f" buffered={pending_buffered_rows}"
if isinstance(pending_buffered_bytes, int):
pending_gb = pending_buffered_bytes / 1_000_000_000
pending_part += f" pending_gb={pending_gb:.3f}"

if update.get("resource") == "experiment_events":
desc = (
f"{_label} ({_project_name}):{page_part}"
f" fetched={fetched} inserted={inserted}"
f"{gb_part}{batch_rate_part}"
f"{gb_part}{pending_part}{batch_rate_part}"
)
elif update.get("resource") == "dataset_events":
desc = (
f"{_label} ({_project_name}):{page_part}"
f" fetched={fetched} inserted={inserted}"
f"{gb_part}{batch_rate_part}"
f"{gb_part}{pending_part}{batch_rate_part}"
)
else:
# logs
desc = (
f"{_label} ({_project_name}):{page_part}"
f" fetched={fetched} inserted={inserted}"
f"{gb_part}{batch_rate_part}"
f"{gb_part}{pending_part}{batch_rate_part}"
)

if phase == "done":
Expand Down
13 changes: 12 additions & 1 deletion braintrust_migrate/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -406,7 +406,13 @@ async def check_brainstore_enabled(self) -> bool:
self._logger.warning("Could not determine Brainstore status", error=str(e))
return False

async def with_retry(self, operation_name: str, coro_func):
async def with_retry(
self,
operation_name: str,
coro_func,
*,
non_retryable_statuses: set[int] | None = None,
):
"""Execute a coroutine function with adaptive retry logic.

Args:
Expand Down Expand Up @@ -457,6 +463,11 @@ def _classify_exception(
if isinstance(exc, httpx.HTTPStatusError) and exc.response is not None:
status = int(exc.response.status_code)
retry_after = _parse_retry_after_seconds(exc.response)
if (
non_retryable_statuses is not None
and status in non_retryable_statuses
):
return False, status, retry_after
if status == HTTP_STATUS_TOO_MANY_REQUESTS:
return True, status, retry_after
if status in {408, 409, 425, 500, 502, 503, 504}:
Expand Down
74 changes: 20 additions & 54 deletions braintrust_migrate/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,20 +186,19 @@ class MigrationConfig(BaseModel):
le=10_000,
description="Fetch page size for streaming experiment events via BTQL (limit is in rows/spans)",
)
experiment_events_insert_batch_size: int = Field(
default=200,
ge=1,
le=10_000,
description="Insert batch size for streaming experiment events (number of events per insert call)",
)
experiment_events_use_version_snapshot: bool = Field(
default=True,
description="Pin a stable snapshot version for streaming experiment event migration",
)
experiment_events_use_seen_db: bool = Field(
default=True,
description="Use a SQLite seen-id store to prevent older versions overwriting newer ones during experiment pagination",
)
events_fetch_group_size: int = Field(
default=25,
ge=1,
le=1_000,
description=(
"Number of dataset or experiment ids to group into a single BTQL event stream. "
"Used for grouped dataset/experiment event fetches."
),
)

# Dataset event migration tuning
dataset_events_fetch_limit: int = Field(
Expand All @@ -208,16 +207,6 @@ class MigrationConfig(BaseModel):
le=10_000,
description="Fetch page size for streaming dataset events via BTQL (limit is in rows/spans)",
)
dataset_events_insert_batch_size: int = Field(
default=200,
ge=1,
le=10_000,
description="Insert batch size for streaming dataset events (number of events per insert call)",
)
dataset_events_use_version_snapshot: bool = Field(
default=True,
description="Pin a stable snapshot version for streaming dataset event migration",
)
dataset_events_use_seen_db: bool = Field(
default=True,
description="Use a SQLite seen-id store to prevent older versions overwriting newer ones during dataset pagination",
Expand Down Expand Up @@ -379,8 +368,8 @@ def from_env(cls) -> "Config":
#
# Unified:
# MIGRATION_EVENTS_FETCH_LIMIT=1000
# MIGRATION_EVENTS_INSERT_BATCH_SIZE=200
# MIGRATION_EVENTS_USE_SEEN_DB=true
# MIGRATION_EVENTS_FETCH_GROUP_SIZE=25
#
# Resource-specific overrides (optional):
# MIGRATION_LOGS_FETCH_LIMIT, MIGRATION_EXPERIMENT_EVENTS_FETCH_LIMIT, etc.
Expand All @@ -392,20 +381,20 @@ def _get_bool(specific_key: str, unified_key: str, default: str) -> bool:
val = os.getenv(specific_key) or os.getenv(unified_key, default)
return val.lower() in {"1", "true", "yes", "y", "on"}

events_fetch_group_size = int(
os.getenv("MIGRATION_EVENTS_FETCH_GROUP_SIZE", "25")
)

# Logs
logs_fetch_limit = _get_int(
"MIGRATION_LOGS_FETCH_LIMIT", "MIGRATION_EVENTS_FETCH_LIMIT", "1000"
)
logs_insert_batch_size = _get_int(
"MIGRATION_LOGS_INSERT_BATCH_SIZE",
"MIGRATION_EVENTS_INSERT_BATCH_SIZE",
"200",
)
logs_use_version_snapshot = _get_bool(
"MIGRATION_LOGS_USE_VERSION_SNAPSHOT",
"MIGRATION_EVENTS_USE_VERSION_SNAPSHOT",
"true",
logs_insert_batch_size = int(
os.getenv("MIGRATION_LOGS_INSERT_BATCH_SIZE", "200")
)
logs_use_version_snapshot = os.getenv(
"MIGRATION_LOGS_USE_VERSION_SNAPSHOT", "true"
).lower() in {"1", "true", "yes", "y", "on"}
logs_use_seen_db = _get_bool(
"MIGRATION_LOGS_USE_SEEN_DB", "MIGRATION_EVENTS_USE_SEEN_DB", "true"
)
Expand All @@ -420,16 +409,6 @@ def _get_bool(specific_key: str, unified_key: str, default: str) -> bool:
"MIGRATION_EVENTS_FETCH_LIMIT",
"1000",
)
experiment_events_insert_batch_size = _get_int(
"MIGRATION_EXPERIMENT_EVENTS_INSERT_BATCH_SIZE",
"MIGRATION_EVENTS_INSERT_BATCH_SIZE",
"200",
)
experiment_events_use_version_snapshot = _get_bool(
"MIGRATION_EXPERIMENT_EVENTS_USE_VERSION_SNAPSHOT",
"MIGRATION_EVENTS_USE_VERSION_SNAPSHOT",
"true",
)
experiment_events_use_seen_db = _get_bool(
"MIGRATION_EXPERIMENT_EVENTS_USE_SEEN_DB",
"MIGRATION_EVENTS_USE_SEEN_DB",
Expand All @@ -442,16 +421,6 @@ def _get_bool(specific_key: str, unified_key: str, default: str) -> bool:
"MIGRATION_EVENTS_FETCH_LIMIT",
"1000",
)
dataset_events_insert_batch_size = _get_int(
"MIGRATION_DATASET_EVENTS_INSERT_BATCH_SIZE",
"MIGRATION_EVENTS_INSERT_BATCH_SIZE",
"200",
)
dataset_events_use_version_snapshot = _get_bool(
"MIGRATION_DATASET_EVENTS_USE_VERSION_SNAPSHOT",
"MIGRATION_EVENTS_USE_VERSION_SNAPSHOT",
"true",
)
dataset_events_use_seen_db = _get_bool(
"MIGRATION_DATASET_EVENTS_USE_SEEN_DB",
"MIGRATION_EVENTS_USE_SEEN_DB",
Expand Down Expand Up @@ -536,12 +505,9 @@ def _get_bool(specific_key: str, unified_key: str, default: str) -> bool:
created_after=created_after,
created_before=created_before,
experiment_events_fetch_limit=experiment_events_fetch_limit,
experiment_events_insert_batch_size=experiment_events_insert_batch_size,
experiment_events_use_version_snapshot=experiment_events_use_version_snapshot,
experiment_events_use_seen_db=experiment_events_use_seen_db,
events_fetch_group_size=events_fetch_group_size,
dataset_events_fetch_limit=dataset_events_fetch_limit,
dataset_events_insert_batch_size=dataset_events_insert_batch_size,
dataset_events_use_version_snapshot=dataset_events_use_version_snapshot,
dataset_events_use_seen_db=dataset_events_use_seen_db,
copy_attachments=copy_attachments,
attachment_max_bytes=attachment_max_bytes,
Expand Down
54 changes: 0 additions & 54 deletions braintrust_migrate/insert_bisect.py

This file was deleted.

Loading
Loading