Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -166,11 +166,12 @@ These settings control BTQL-based streaming for high-volume resources.
|---------------------|----------|---------|-------------|
| `MIGRATION_EVENTS_FETCH_LIMIT` | — | `1000` | BTQL fetch page size (rows per query) |
| `MIGRATION_EVENTS_FETCH_GROUP_SIZE` | — | `25` | Number of experiment or dataset ids to group into one BTQL event stream |
| `MIGRATION_EVENTS_FLUSH_MAX_ROWS` | — | `5000` | Buffered flush threshold shared by logs, experiment events, and dataset events |
| `MIGRATION_EVENTS_USE_SEEN_DB` | — | `true` | Use SQLite store for deduplication |
| `MIGRATION_LOGS_FETCH_LIMIT` | `--logs-fetch-limit` | *(inherits)* | Override fetch limit for logs only |
| `MIGRATION_LOGS_INSERT_BATCH_SIZE` | `--logs-insert-batch-size` | `200` | Max rows per pre-SDK logs insert chunk before enqueueing to the SDK writer |
| `MIGRATION_LOGS_INSERT_BATCH_SIZE` | `--logs-insert-batch-size` | `5000` | Deprecated alias for `MIGRATION_EVENTS_FLUSH_MAX_ROWS` |

Resource-specific overrides follow the pattern `MIGRATION_{RESOURCE}_FETCH_LIMIT` and `MIGRATION_{RESOURCE}_USE_SEEN_DB` where `{RESOURCE}` is `LOGS`, `EXPERIMENT_EVENTS`, or `DATASET_EVENTS`. Logs additionally support `MIGRATION_LOGS_INSERT_BATCH_SIZE` and `MIGRATION_LOGS_USE_VERSION_SNAPSHOT`.
Resource-specific overrides follow the pattern `MIGRATION_{RESOURCE}_FETCH_LIMIT` and `MIGRATION_{RESOURCE}_USE_SEEN_DB` where `{RESOURCE}` is `LOGS`, `EXPERIMENT_EVENTS`, or `DATASET_EVENTS`. Logs additionally support `MIGRATION_LOGS_USE_VERSION_SNAPSHOT`. The older `MIGRATION_LOGS_INSERT_BATCH_SIZE` name is still accepted as a compatibility alias for the shared flush threshold, `MIGRATION_EVENTS_FLUSH_MAX_ROWS`.

#### Insert Request Sizing

Expand Down
8 changes: 7 additions & 1 deletion braintrust_migrate/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,11 @@ def migrate(
int | None,
typer.Option(
"--logs-insert-batch-size",
help="Insert batch size for streaming logs (number of events per insert call)",
help=(
"Deprecated alias for the shared streaming flush threshold "
"(rows buffered before flush across logs, experiment events, "
"and dataset events)"
),
envvar="MIGRATION_LOGS_INSERT_BATCH_SIZE",
),
] = None,
Expand Down Expand Up @@ -334,6 +338,7 @@ async def _migrate_main(
if logs_fetch_limit is not None:
config.migration.logs_fetch_limit = logs_fetch_limit
if logs_insert_batch_size is not None:
config.migration.events_flush_max_rows = logs_insert_batch_size
config.migration.logs_insert_batch_size = logs_insert_batch_size
if created_after is not None:
config.migration.created_after = canonicalize_created_after(created_after)
Expand All @@ -357,6 +362,7 @@ async def _migrate_main(
state_dir=str(config.state_dir),
dry_run=dry_run,
logs_fetch_limit=config.migration.logs_fetch_limit,
events_flush_max_rows=config.migration.events_flush_max_rows,
logs_insert_batch_size=config.migration.logs_insert_batch_size,
created_after=config.migration.created_after,
created_before=config.migration.created_before,
Expand Down
29 changes: 23 additions & 6 deletions braintrust_migrate/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,11 +144,24 @@ class MigrationConfig(BaseModel):
le=1_000,
description="Fetch page size for streaming logs migration via BTQL (limit is in rows/spans)",
)
events_flush_max_rows: int = Field(
default=5_000,
ge=1,
le=50_000,
description=(
"Buffered flush threshold for streaming logs, experiment events, and "
"dataset events. Rows may be fetched across multiple BTQL pages before "
"one flush is committed."
),
)
logs_insert_batch_size: int = Field(
default=200,
default=5_000,
ge=1,
le=1_000,
description="Insert batch size for streaming logs migration (number of events per insert call)",
le=50_000,
description=(
"Deprecated compatibility alias for events_flush_max_rows. Prefer "
"MIGRATION_EVENTS_FLUSH_MAX_ROWS."
),
)
logs_use_version_snapshot: bool = Field(
default=True,
Expand Down Expand Up @@ -370,6 +383,7 @@ def from_env(cls) -> "Config":
# MIGRATION_EVENTS_FETCH_LIMIT=1000
# MIGRATION_EVENTS_USE_SEEN_DB=true
# MIGRATION_EVENTS_FETCH_GROUP_SIZE=25
# MIGRATION_EVENTS_FLUSH_MAX_ROWS=5000
#
# Resource-specific overrides (optional):
# MIGRATION_LOGS_FETCH_LIMIT, MIGRATION_EXPERIMENT_EVENTS_FETCH_LIMIT, etc.
Expand All @@ -384,14 +398,16 @@ def _get_bool(specific_key: str, unified_key: str, default: str) -> bool:
events_fetch_group_size = int(
os.getenv("MIGRATION_EVENTS_FETCH_GROUP_SIZE", "25")
)
events_flush_max_rows = int(
os.getenv("MIGRATION_EVENTS_FLUSH_MAX_ROWS")
or os.getenv("MIGRATION_LOGS_INSERT_BATCH_SIZE", "5000")
)

# Logs
logs_fetch_limit = _get_int(
"MIGRATION_LOGS_FETCH_LIMIT", "MIGRATION_EVENTS_FETCH_LIMIT", "1000"
)
logs_insert_batch_size = int(
os.getenv("MIGRATION_LOGS_INSERT_BATCH_SIZE", "200")
)
logs_insert_batch_size = events_flush_max_rows
logs_use_version_snapshot = os.getenv(
"MIGRATION_LOGS_USE_VERSION_SNAPSHOT", "true"
).lower() in {"1", "true", "yes", "y", "on"}
Expand Down Expand Up @@ -499,6 +515,7 @@ def _get_bool(specific_key: str, unified_key: str, default: str) -> bool:
insert_max_request_bytes=insert_max_request_bytes,
insert_request_headroom_ratio=insert_request_headroom_ratio,
logs_fetch_limit=logs_fetch_limit,
events_flush_max_rows=events_flush_max_rows,
logs_insert_batch_size=logs_insert_batch_size,
logs_use_version_snapshot=logs_use_version_snapshot,
logs_use_seen_db=logs_use_seen_db,
Expand Down
54 changes: 37 additions & 17 deletions braintrust_migrate/resources/datasets.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
EventsStreamState,
SeenIdsDB,
build_btql_sorted_page_query,
coerce_int_config,
stream_btql_sorted_events_buffered,
)

Expand All @@ -30,21 +31,6 @@
# HTTP status codes
HTTP_STATUS_REQUEST_ENTITY_TOO_LARGE = 413


def _coerce_int_config(
cfg: Any, attr_name: str, default: int, *, minimum: int | None = None
) -> int:
value = getattr(cfg, attr_name, default)
if not isinstance(value, int):
try:
value = int(value)
except Exception:
value = default
if minimum is not None and value < minimum:
return default
return value


class DatasetMigrator(ResourceMigrator[dict]):
"""Migrator for Braintrust datasets.

Expand Down Expand Up @@ -102,9 +88,14 @@ def __init__(
self._insert_max_bytes: int | None = int(max_req * headroom)
except Exception:
self._insert_max_bytes = None
self._sdk_flush_max_rows = int(self.SDK_FLUSH_MAX_ROWS)
self._sdk_flush_max_rows = coerce_int_config(
cfg,
"events_flush_max_rows",
self.SDK_FLUSH_MAX_ROWS,
minimum=1,
)
self._sdk_flush_max_bytes = int(self.SDK_FLUSH_MAX_BYTES)
self._event_fetch_group_size = _coerce_int_config(
self._event_fetch_group_size = coerce_int_config(
cfg,
"events_fetch_group_size",
self.DEFAULT_EVENT_FETCH_GROUP_SIZE,
Expand Down Expand Up @@ -803,6 +794,35 @@ async def _fetch_group(n: int) -> dict[str, Any]:
"next_cursor": None,
}
),
"on_insert": lambda info, _p=progress: _p(
{
"resource": "dataset_events",
"phase": "insert",
"source_dataset_ids": source_dataset_ids,
"dest_dataset_ids": list(source_to_dest_dataset_ids.values()),
"page_num": None,
"page_events": None,
"inserted_last": info.get("inserted_last"),
"inserted_bytes_last": info.get("inserted_bytes_last"),
"insert_seconds": info.get("insert_seconds"),
"flush_rows": info.get("flush_rows"),
"flush_buffer_bytes": info.get("flush_buffer_bytes"),
"fetched_total": state.fetched_events,
"inserted_total": state.inserted_events,
"inserted_bytes_total": state.inserted_bytes,
"skipped_deleted_total": state.skipped_deleted,
"skipped_seen_total": state.skipped_seen,
"attachments_copied_total": state.attachments_copied,
"pending_buffered_rows": 0,
"pending_buffered_bytes": 0,
"cursor": (
(state.btql_min_pagination_key[:16] + "…")
if isinstance(state.btql_min_pagination_key, str)
else None
),
"next_cursor": None,
}
),
"on_done": lambda info, _p=progress: _p(
{
"resource": "dataset_events",
Expand Down
62 changes: 44 additions & 18 deletions braintrust_migrate/resources/experiments.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,27 +21,13 @@
EventsStreamState,
SeenIdsDB,
build_btql_sorted_page_query,
coerce_int_config,
stream_btql_sorted_events_buffered,
)

# HTTP status codes
HTTP_STATUS_REQUEST_ENTITY_TOO_LARGE = 413


def _coerce_int_config(
cfg: Any, attr_name: str, default: int, *, minimum: int | None = None
) -> int:
value = getattr(cfg, attr_name, default)
if not isinstance(value, int):
try:
value = int(value)
except Exception:
value = default
if minimum is not None and value < minimum:
return default
return value


class ExperimentMigrator(ResourceMigrator[dict]):
"""Migrator for Braintrust experiments.

Expand All @@ -54,7 +40,7 @@ class ExperimentMigrator(ResourceMigrator[dict]):
Uses raw API requests instead of SDK to avoid model dependencies.
"""

SDK_FLUSH_MAX_ROWS: ClassVar[int] = 1_000
SDK_FLUSH_MAX_ROWS: ClassVar[int] = 5_000
SDK_FLUSH_MAX_BYTES: ClassVar[int] = 25 * 1024 * 1024
DEFAULT_EVENT_FETCH_GROUP_SIZE: ClassVar[int] = 25

Expand Down Expand Up @@ -99,9 +85,14 @@ def __init__(
self._insert_max_bytes: int | None = int(max_req * headroom)
except Exception:
self._insert_max_bytes = None
self._sdk_flush_max_rows = int(self.SDK_FLUSH_MAX_ROWS)
self._sdk_flush_max_rows = coerce_int_config(
cfg,
"events_flush_max_rows",
self.SDK_FLUSH_MAX_ROWS,
minimum=1,
)
self._sdk_flush_max_bytes = int(self.SDK_FLUSH_MAX_BYTES)
self._event_fetch_group_size = _coerce_int_config(
self._event_fetch_group_size = coerce_int_config(
cfg,
"events_fetch_group_size",
self.DEFAULT_EVENT_FETCH_GROUP_SIZE,
Expand Down Expand Up @@ -887,6 +878,41 @@ async def _on_single_413(event: dict[str, Any], err: Exception) -> None:
"next_cursor": None,
}
),
"on_insert": lambda info, _p=progress: _p(
{
"resource": "experiment_events",
"phase": "insert",
"source_experiment_ids": source_experiment_ids,
"dest_experiment_ids": list(
source_to_dest_experiment_ids.values()
),
"page_num": None,
"page_events": None,
"inserted_last": info.get("inserted_last"),
"inserted_bytes_last": info.get(
"inserted_bytes_last"
),
"insert_seconds": info.get("insert_seconds"),
"flush_rows": info.get("flush_rows"),
"flush_buffer_bytes": info.get(
"flush_buffer_bytes"
),
"fetched_total": state.fetched_events,
"inserted_total": state.inserted_events,
"inserted_bytes_total": state.inserted_bytes,
"skipped_deleted_total": state.skipped_deleted,
"skipped_seen_total": state.skipped_seen,
"attachments_copied_total": state.attachments_copied,
"pending_buffered_rows": 0,
"pending_buffered_bytes": 0,
"cursor": (
(state.btql_min_pagination_key[:16] + "…")
if isinstance(state.btql_min_pagination_key, str)
else None
),
"next_cursor": None,
}
),
"on_done": lambda info, _p=progress: _p(
{
"resource": "experiment_events",
Expand Down
15 changes: 10 additions & 5 deletions braintrust_migrate/resources/logs.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,14 @@
from braintrust_migrate.streaming_utils import (
SeenIdsDB,
build_btql_sorted_page_query,
coerce_int_config,
)

logger = structlog.get_logger(__name__)

# HTTP status codes
HTTP_STATUS_REQUEST_ENTITY_TOO_LARGE = 413


@dataclass(slots=True)
class _LogsStreamingState:
"""Checkpoint state for streaming logs migration (small + restartable)."""
Expand Down Expand Up @@ -156,7 +156,15 @@ def __init__(

self._logger = logger.bind(migrator=self.__class__.__name__)
self._sdk_logs_writer: SDKProjectLogsWriter | None = None
self._sdk_flush_max_rows = int(self.SDK_FLUSH_MAX_ROWS)
cfg = getattr(self.dest_client, "migration_config", None) or getattr(
self.source_client, "migration_config", None
)
self._sdk_flush_max_rows = coerce_int_config(
cfg,
"events_flush_max_rows",
self.SDK_FLUSH_MAX_ROWS,
minimum=1,
)
self._sdk_flush_max_bytes = int(self.SDK_FLUSH_MAX_BYTES)

self._stream_state_path = self.checkpoint_dir / "logs_streaming_state.json"
Expand Down Expand Up @@ -188,9 +196,6 @@ def __init__(
)

# Byte-aware insert batching config (best-effort; falls back to count-only if missing).
cfg = getattr(self.dest_client, "migration_config", None) or getattr(
self.source_client, "migration_config", None
)
try:
max_req = int(getattr(cfg, "insert_max_request_bytes", 6 * 1024 * 1024))
headroom = float(getattr(cfg, "insert_request_headroom_ratio", 0.5))
Expand Down
34 changes: 28 additions & 6 deletions braintrust_migrate/streaming_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,30 @@ def mark_seen(self, ids: list[str]) -> None:
self._conn.commit()


def coerce_int_config(
cfg: Any,
attr_name: str,
default: int,
*,
minimum: int | None = None,
) -> int:
"""Best-effort int coercion for migration config values.

This is intentionally tolerant of mocks and unset attributes because many
unit tests construct lightweight client doubles without real config models.
"""

value = getattr(cfg, attr_name, default)
if not isinstance(value, int):
try:
value = int(value)
except Exception:
value = default
if minimum is not None and value < minimum:
return default
return value


def build_btql_sorted_page_query(
*,
from_expr: str,
Expand Down Expand Up @@ -304,9 +328,8 @@ async def _flush_pending() -> None:
"page_events": len(page_events),
"configured_fetch_limit": int(page_limit),
"fetched_total": state.fetched_events + pending_fetched_events,
"inserted_total": state.inserted_events + pending_inserted_events,
"inserted_bytes_total": state.inserted_bytes
+ pending_inserted_bytes,
"inserted_total": state.inserted_events,
"inserted_bytes_total": state.inserted_bytes,
"skipped_deleted_total": state.skipped_deleted
+ pending_skipped_deleted,
"skipped_seen_total": state.skipped_seen + pending_skipped_seen,
Expand Down Expand Up @@ -406,9 +429,8 @@ async def _flush_pending() -> None:
"page_num": page_num,
"page_events": len(page_events),
"fetched_total": state.fetched_events + pending_fetched_events,
"inserted_total": state.inserted_events + pending_inserted_events,
"inserted_bytes_total": state.inserted_bytes
+ pending_inserted_bytes,
"inserted_total": state.inserted_events,
"inserted_bytes_total": state.inserted_bytes,
"skipped_deleted_total": state.skipped_deleted
+ pending_skipped_deleted,
"skipped_seen_total": state.skipped_seen + pending_skipped_seen,
Expand Down
Loading
Loading