diff --git a/README.md b/README.md index 3a61a1b..cd4166a 100644 --- a/README.md +++ b/README.md @@ -166,11 +166,12 @@ These settings control BTQL-based streaming for high-volume resources. |---------------------|----------|---------|-------------| | `MIGRATION_EVENTS_FETCH_LIMIT` | — | `1000` | BTQL fetch page size (rows per query) | | `MIGRATION_EVENTS_FETCH_GROUP_SIZE` | — | `25` | Number of experiment or dataset ids to group into one BTQL event stream | +| `MIGRATION_EVENTS_FLUSH_MAX_ROWS` | — | `5000` | Buffered flush threshold shared by logs, experiment events, and dataset events | | `MIGRATION_EVENTS_USE_SEEN_DB` | — | `true` | Use SQLite store for deduplication | | `MIGRATION_LOGS_FETCH_LIMIT` | `--logs-fetch-limit` | *(inherits)* | Override fetch limit for logs only | -| `MIGRATION_LOGS_INSERT_BATCH_SIZE` | `--logs-insert-batch-size` | `200` | Max rows per pre-SDK logs insert chunk before enqueueing to the SDK writer | +| `MIGRATION_LOGS_INSERT_BATCH_SIZE` | `--logs-insert-batch-size` | `5000` | Deprecated alias for `MIGRATION_EVENTS_FLUSH_MAX_ROWS` | -Resource-specific overrides follow the pattern `MIGRATION_{RESOURCE}_FETCH_LIMIT` and `MIGRATION_{RESOURCE}_USE_SEEN_DB` where `{RESOURCE}` is `LOGS`, `EXPERIMENT_EVENTS`, or `DATASET_EVENTS`. Logs additionally support `MIGRATION_LOGS_INSERT_BATCH_SIZE` and `MIGRATION_LOGS_USE_VERSION_SNAPSHOT`. +Resource-specific overrides follow the pattern `MIGRATION_{RESOURCE}_FETCH_LIMIT` and `MIGRATION_{RESOURCE}_USE_SEEN_DB` where `{RESOURCE}` is `LOGS`, `EXPERIMENT_EVENTS`, or `DATASET_EVENTS`. Logs additionally support `MIGRATION_LOGS_USE_VERSION_SNAPSHOT`. The older `MIGRATION_LOGS_INSERT_BATCH_SIZE` name is still accepted as a compatibility alias for the shared flush threshold. #### Insert Request Sizing diff --git a/braintrust_migrate/cli.py b/braintrust_migrate/cli.py index fbf1c24..9141b94 100644 --- a/braintrust_migrate/cli.py +++ b/braintrust_migrate/cli.py @@ -157,7 +157,11 @@ def migrate( int | None, typer.Option( "--logs-insert-batch-size", - help="Insert batch size for streaming logs (number of events per insert call)", + help=( + "Deprecated alias for the shared streaming flush threshold " + "(rows buffered before flush across logs, experiment events, " + "and dataset events)" + ), envvar="MIGRATION_LOGS_INSERT_BATCH_SIZE", ), ] = None, @@ -334,6 +338,7 @@ async def _migrate_main( if logs_fetch_limit is not None: config.migration.logs_fetch_limit = logs_fetch_limit if logs_insert_batch_size is not None: + config.migration.events_flush_max_rows = logs_insert_batch_size config.migration.logs_insert_batch_size = logs_insert_batch_size if created_after is not None: config.migration.created_after = canonicalize_created_after(created_after) @@ -357,6 +362,7 @@ async def _migrate_main( state_dir=str(config.state_dir), dry_run=dry_run, logs_fetch_limit=config.migration.logs_fetch_limit, + events_flush_max_rows=config.migration.events_flush_max_rows, logs_insert_batch_size=config.migration.logs_insert_batch_size, created_after=config.migration.created_after, created_before=config.migration.created_before, diff --git a/braintrust_migrate/config.py b/braintrust_migrate/config.py index ada99fc..6bbebaf 100644 --- a/braintrust_migrate/config.py +++ b/braintrust_migrate/config.py @@ -144,11 +144,24 @@ class MigrationConfig(BaseModel): le=1_000, description="Fetch page size for streaming logs migration via BTQL (limit is in rows/spans)", ) + events_flush_max_rows: int = Field( + default=5_000, + ge=1, + le=50_000, + description=( + "Buffered flush threshold for streaming logs, experiment events, and " + "dataset events. Rows may be fetched across multiple BTQL pages before " + "one flush is committed." + ), + ) logs_insert_batch_size: int = Field( - default=200, + default=5_000, ge=1, - le=1_000, - description="Insert batch size for streaming logs migration (number of events per insert call)", + le=50_000, + description=( + "Deprecated compatibility alias for events_flush_max_rows. Prefer " + "MIGRATION_EVENTS_FLUSH_MAX_ROWS." + ), ) logs_use_version_snapshot: bool = Field( default=True, @@ -370,6 +383,7 @@ def from_env(cls) -> "Config": # MIGRATION_EVENTS_FETCH_LIMIT=1000 # MIGRATION_EVENTS_USE_SEEN_DB=true # MIGRATION_EVENTS_FETCH_GROUP_SIZE=25 + # MIGRATION_EVENTS_FLUSH_MAX_ROWS=5000 # # Resource-specific overrides (optional): # MIGRATION_LOGS_FETCH_LIMIT, MIGRATION_EXPERIMENT_EVENTS_FETCH_LIMIT, etc. @@ -384,14 +398,16 @@ def _get_bool(specific_key: str, unified_key: str, default: str) -> bool: events_fetch_group_size = int( os.getenv("MIGRATION_EVENTS_FETCH_GROUP_SIZE", "25") ) + events_flush_max_rows = int( + os.getenv("MIGRATION_EVENTS_FLUSH_MAX_ROWS") + or os.getenv("MIGRATION_LOGS_INSERT_BATCH_SIZE", "5000") + ) # Logs logs_fetch_limit = _get_int( "MIGRATION_LOGS_FETCH_LIMIT", "MIGRATION_EVENTS_FETCH_LIMIT", "1000" ) - logs_insert_batch_size = int( - os.getenv("MIGRATION_LOGS_INSERT_BATCH_SIZE", "200") - ) + logs_insert_batch_size = events_flush_max_rows logs_use_version_snapshot = os.getenv( "MIGRATION_LOGS_USE_VERSION_SNAPSHOT", "true" ).lower() in {"1", "true", "yes", "y", "on"} @@ -499,6 +515,7 @@ def _get_bool(specific_key: str, unified_key: str, default: str) -> bool: insert_max_request_bytes=insert_max_request_bytes, insert_request_headroom_ratio=insert_request_headroom_ratio, logs_fetch_limit=logs_fetch_limit, + events_flush_max_rows=events_flush_max_rows, logs_insert_batch_size=logs_insert_batch_size, logs_use_version_snapshot=logs_use_version_snapshot, logs_use_seen_db=logs_use_seen_db, diff --git a/braintrust_migrate/resources/datasets.py b/braintrust_migrate/resources/datasets.py index 916c726..661a901 100644 --- a/braintrust_migrate/resources/datasets.py +++ b/braintrust_migrate/resources/datasets.py @@ -22,6 +22,7 @@ EventsStreamState, SeenIdsDB, build_btql_sorted_page_query, + coerce_int_config, stream_btql_sorted_events_buffered, ) @@ -30,21 +31,6 @@ # HTTP status codes HTTP_STATUS_REQUEST_ENTITY_TOO_LARGE = 413 - -def _coerce_int_config( - cfg: Any, attr_name: str, default: int, *, minimum: int | None = None -) -> int: - value = getattr(cfg, attr_name, default) - if not isinstance(value, int): - try: - value = int(value) - except Exception: - value = default - if minimum is not None and value < minimum: - return default - return value - - class DatasetMigrator(ResourceMigrator[dict]): """Migrator for Braintrust datasets. @@ -102,9 +88,14 @@ def __init__( self._insert_max_bytes: int | None = int(max_req * headroom) except Exception: self._insert_max_bytes = None - self._sdk_flush_max_rows = int(self.SDK_FLUSH_MAX_ROWS) + self._sdk_flush_max_rows = coerce_int_config( + cfg, + "events_flush_max_rows", + self.SDK_FLUSH_MAX_ROWS, + minimum=1, + ) self._sdk_flush_max_bytes = int(self.SDK_FLUSH_MAX_BYTES) - self._event_fetch_group_size = _coerce_int_config( + self._event_fetch_group_size = coerce_int_config( cfg, "events_fetch_group_size", self.DEFAULT_EVENT_FETCH_GROUP_SIZE, @@ -803,6 +794,35 @@ async def _fetch_group(n: int) -> dict[str, Any]: "next_cursor": None, } ), + "on_insert": lambda info, _p=progress: _p( + { + "resource": "dataset_events", + "phase": "insert", + "source_dataset_ids": source_dataset_ids, + "dest_dataset_ids": list(source_to_dest_dataset_ids.values()), + "page_num": None, + "page_events": None, + "inserted_last": info.get("inserted_last"), + "inserted_bytes_last": info.get("inserted_bytes_last"), + "insert_seconds": info.get("insert_seconds"), + "flush_rows": info.get("flush_rows"), + "flush_buffer_bytes": info.get("flush_buffer_bytes"), + "fetched_total": state.fetched_events, + "inserted_total": state.inserted_events, + "inserted_bytes_total": state.inserted_bytes, + "skipped_deleted_total": state.skipped_deleted, + "skipped_seen_total": state.skipped_seen, + "attachments_copied_total": state.attachments_copied, + "pending_buffered_rows": 0, + "pending_buffered_bytes": 0, + "cursor": ( + (state.btql_min_pagination_key[:16] + "…") + if isinstance(state.btql_min_pagination_key, str) + else None + ), + "next_cursor": None, + } + ), "on_done": lambda info, _p=progress: _p( { "resource": "dataset_events", diff --git a/braintrust_migrate/resources/experiments.py b/braintrust_migrate/resources/experiments.py index 3c09a2f..333094c 100644 --- a/braintrust_migrate/resources/experiments.py +++ b/braintrust_migrate/resources/experiments.py @@ -21,27 +21,13 @@ EventsStreamState, SeenIdsDB, build_btql_sorted_page_query, + coerce_int_config, stream_btql_sorted_events_buffered, ) # HTTP status codes HTTP_STATUS_REQUEST_ENTITY_TOO_LARGE = 413 - -def _coerce_int_config( - cfg: Any, attr_name: str, default: int, *, minimum: int | None = None -) -> int: - value = getattr(cfg, attr_name, default) - if not isinstance(value, int): - try: - value = int(value) - except Exception: - value = default - if minimum is not None and value < minimum: - return default - return value - - class ExperimentMigrator(ResourceMigrator[dict]): """Migrator for Braintrust experiments. @@ -54,7 +40,7 @@ class ExperimentMigrator(ResourceMigrator[dict]): Uses raw API requests instead of SDK to avoid model dependencies. """ - SDK_FLUSH_MAX_ROWS: ClassVar[int] = 1_000 + SDK_FLUSH_MAX_ROWS: ClassVar[int] = 5_000 SDK_FLUSH_MAX_BYTES: ClassVar[int] = 25 * 1024 * 1024 DEFAULT_EVENT_FETCH_GROUP_SIZE: ClassVar[int] = 25 @@ -99,9 +85,14 @@ def __init__( self._insert_max_bytes: int | None = int(max_req * headroom) except Exception: self._insert_max_bytes = None - self._sdk_flush_max_rows = int(self.SDK_FLUSH_MAX_ROWS) + self._sdk_flush_max_rows = coerce_int_config( + cfg, + "events_flush_max_rows", + self.SDK_FLUSH_MAX_ROWS, + minimum=1, + ) self._sdk_flush_max_bytes = int(self.SDK_FLUSH_MAX_BYTES) - self._event_fetch_group_size = _coerce_int_config( + self._event_fetch_group_size = coerce_int_config( cfg, "events_fetch_group_size", self.DEFAULT_EVENT_FETCH_GROUP_SIZE, @@ -887,6 +878,41 @@ async def _on_single_413(event: dict[str, Any], err: Exception) -> None: "next_cursor": None, } ), + "on_insert": lambda info, _p=progress: _p( + { + "resource": "experiment_events", + "phase": "insert", + "source_experiment_ids": source_experiment_ids, + "dest_experiment_ids": list( + source_to_dest_experiment_ids.values() + ), + "page_num": None, + "page_events": None, + "inserted_last": info.get("inserted_last"), + "inserted_bytes_last": info.get( + "inserted_bytes_last" + ), + "insert_seconds": info.get("insert_seconds"), + "flush_rows": info.get("flush_rows"), + "flush_buffer_bytes": info.get( + "flush_buffer_bytes" + ), + "fetched_total": state.fetched_events, + "inserted_total": state.inserted_events, + "inserted_bytes_total": state.inserted_bytes, + "skipped_deleted_total": state.skipped_deleted, + "skipped_seen_total": state.skipped_seen, + "attachments_copied_total": state.attachments_copied, + "pending_buffered_rows": 0, + "pending_buffered_bytes": 0, + "cursor": ( + (state.btql_min_pagination_key[:16] + "…") + if isinstance(state.btql_min_pagination_key, str) + else None + ), + "next_cursor": None, + } + ), "on_done": lambda info, _p=progress: _p( { "resource": "experiment_events", diff --git a/braintrust_migrate/resources/logs.py b/braintrust_migrate/resources/logs.py index fad64cf..6bcb1df 100644 --- a/braintrust_migrate/resources/logs.py +++ b/braintrust_migrate/resources/logs.py @@ -32,6 +32,7 @@ from braintrust_migrate.streaming_utils import ( SeenIdsDB, build_btql_sorted_page_query, + coerce_int_config, ) logger = structlog.get_logger(__name__) @@ -39,7 +40,6 @@ # HTTP status codes HTTP_STATUS_REQUEST_ENTITY_TOO_LARGE = 413 - @dataclass(slots=True) class _LogsStreamingState: """Checkpoint state for streaming logs migration (small + restartable).""" @@ -156,7 +156,15 @@ def __init__( self._logger = logger.bind(migrator=self.__class__.__name__) self._sdk_logs_writer: SDKProjectLogsWriter | None = None - self._sdk_flush_max_rows = int(self.SDK_FLUSH_MAX_ROWS) + cfg = getattr(self.dest_client, "migration_config", None) or getattr( + self.source_client, "migration_config", None + ) + self._sdk_flush_max_rows = coerce_int_config( + cfg, + "events_flush_max_rows", + self.SDK_FLUSH_MAX_ROWS, + minimum=1, + ) self._sdk_flush_max_bytes = int(self.SDK_FLUSH_MAX_BYTES) self._stream_state_path = self.checkpoint_dir / "logs_streaming_state.json" @@ -188,9 +196,6 @@ def __init__( ) # Byte-aware insert batching config (best-effort; falls back to count-only if missing). - cfg = getattr(self.dest_client, "migration_config", None) or getattr( - self.source_client, "migration_config", None - ) try: max_req = int(getattr(cfg, "insert_max_request_bytes", 6 * 1024 * 1024)) headroom = float(getattr(cfg, "insert_request_headroom_ratio", 0.5)) diff --git a/braintrust_migrate/streaming_utils.py b/braintrust_migrate/streaming_utils.py index 0df3c2a..ba3d6d5 100644 --- a/braintrust_migrate/streaming_utils.py +++ b/braintrust_migrate/streaming_utils.py @@ -102,6 +102,30 @@ def mark_seen(self, ids: list[str]) -> None: self._conn.commit() +def coerce_int_config( + cfg: Any, + attr_name: str, + default: int, + *, + minimum: int | None = None, +) -> int: + """Best-effort int coercion for migration config values. + + This is intentionally tolerant of mocks and unset attributes because many + unit tests construct lightweight client doubles without real config models. + """ + + value = getattr(cfg, attr_name, default) + if not isinstance(value, int): + try: + value = int(value) + except Exception: + value = default + if minimum is not None and value < minimum: + return default + return value + + def build_btql_sorted_page_query( *, from_expr: str, @@ -304,9 +328,8 @@ async def _flush_pending() -> None: "page_events": len(page_events), "configured_fetch_limit": int(page_limit), "fetched_total": state.fetched_events + pending_fetched_events, - "inserted_total": state.inserted_events + pending_inserted_events, - "inserted_bytes_total": state.inserted_bytes - + pending_inserted_bytes, + "inserted_total": state.inserted_events, + "inserted_bytes_total": state.inserted_bytes, "skipped_deleted_total": state.skipped_deleted + pending_skipped_deleted, "skipped_seen_total": state.skipped_seen + pending_skipped_seen, @@ -406,9 +429,8 @@ async def _flush_pending() -> None: "page_num": page_num, "page_events": len(page_events), "fetched_total": state.fetched_events + pending_fetched_events, - "inserted_total": state.inserted_events + pending_inserted_events, - "inserted_bytes_total": state.inserted_bytes - + pending_inserted_bytes, + "inserted_total": state.inserted_events, + "inserted_bytes_total": state.inserted_bytes, "skipped_deleted_total": state.skipped_deleted + pending_skipped_deleted, "skipped_seen_total": state.skipped_seen + pending_skipped_seen, diff --git a/tests/unit/test_config.py b/tests/unit/test_config.py index b28bea5..367a56c 100644 --- a/tests/unit/test_config.py +++ b/tests/unit/test_config.py @@ -148,3 +148,25 @@ def test_valid_env_config(self, monkeypatch): assert config.migration.group_auto_invite_users is True assert config.migration.events_fetch_group_size == 17 assert config.logging.level == "DEBUG" + + def test_unified_events_flush_max_rows_from_env(self, monkeypatch): + """Test shared streaming flush threshold env var.""" + monkeypatch.setenv("BT_SOURCE_API_KEY", "source-test-key") + monkeypatch.setenv("BT_DEST_API_KEY", "dest-test-key") + monkeypatch.setenv("MIGRATION_EVENTS_FLUSH_MAX_ROWS", "4321") + + config = Config.from_env() + + assert config.migration.events_flush_max_rows == 4321 + assert config.migration.logs_insert_batch_size == 4321 + + def test_legacy_logs_insert_batch_size_alias_still_works(self, monkeypatch): + """Test legacy logs-only env var maps to shared flush threshold.""" + monkeypatch.setenv("BT_SOURCE_API_KEY", "source-test-key") + monkeypatch.setenv("BT_DEST_API_KEY", "dest-test-key") + monkeypatch.setenv("MIGRATION_LOGS_INSERT_BATCH_SIZE", "3456") + + config = Config.from_env() + + assert config.migration.events_flush_max_rows == 3456 + assert config.migration.logs_insert_batch_size == 3456 diff --git a/tests/unit/test_logs_streaming_migrator.py b/tests/unit/test_logs_streaming_migrator.py index 877cf5a..a262b9e 100644 --- a/tests/unit/test_logs_streaming_migrator.py +++ b/tests/unit/test_logs_streaming_migrator.py @@ -2,6 +2,7 @@ from pathlib import Path from typing import Any +from unittest.mock import Mock import pytest @@ -130,3 +131,20 @@ async def test_logs_migrator_skips_duplicate_ids_across_pages(tmp_path: Path) -> assert (tmp_path / "logs_streaming_state.json").exists() finally: logs_module.SDKProjectLogsWriter = original_writer + + +def test_logs_migrator_defaults_flush_rows_when_config_is_a_mock(tmp_path: Path) -> None: + source = Mock() + source.migration_config = Mock() + dest = Mock() + dest.migration_config = Mock() + + migrator = LogsMigrator( + source, # type: ignore[arg-type] + dest, # type: ignore[arg-type] + tmp_path, + page_limit=1, + insert_batch_size=2, + ) + + assert migrator._sdk_flush_max_rows == LogsMigrator.SDK_FLUSH_MAX_ROWS diff --git a/tests/unit/test_streaming_utf8_and_rewrite_batching.py b/tests/unit/test_streaming_utf8_and_rewrite_batching.py index 95a2114..a4b64af 100644 --- a/tests/unit/test_streaming_utf8_and_rewrite_batching.py +++ b/tests/unit/test_streaming_utf8_and_rewrite_batching.py @@ -67,3 +67,62 @@ async def insert_events(batch: list[dict[str, Any]]) -> None: assert state.attachments_copied == 2 assert len(insert_updates) == 1 assert insert_updates[0]["flush_buffer_bytes"] > max_bytes + + +@pytest.mark.asyncio +async def test_page_progress_reports_committed_rows_only() -> None: + page_calls = 0 + page_updates: list[dict[str, Any]] = [] + insert_updates: list[dict[str, Any]] = [] + state = EventsStreamState() + + async def fetch_page(_limit: int) -> dict[str, Any]: + nonlocal page_calls + page_calls += 1 + if page_calls == 1: + return { + "events": [ + {"id": "a", "_pagination_key": "p1"}, + {"id": "b", "_pagination_key": "p2"}, + ], + "btql_last_pagination_key": "p2", + } + if page_calls == 2: + return { + "events": [ + {"id": "c", "_pagination_key": "p3"}, + ], + "btql_last_pagination_key": "p3", + } + return {"events": [], "btql_last_pagination_key": None} + + async def insert_events(_batch: list[dict[str, Any]]) -> None: + return None + + await stream_btql_sorted_events_buffered( + fetch_page=fetch_page, + page_limit=100, + state=state, + save_state=lambda: None, + page_event_filter=None, + event_to_insert=lambda event: {"id": event["id"]}, + seen_db=None, + rewrite_event_in_place=None, + insert_events=insert_events, + flush_max_rows=3, + flush_max_bytes=10_000_000, + is_http_413=lambda _exc: False, + on_single_413=None, + hooks={ + "on_page": page_updates.append, + "on_insert": insert_updates.append, + }, + ) + + assert len(page_updates) >= 2 + assert page_updates[0]["inserted_total"] == 0 + assert page_updates[0]["pending_buffered_rows"] == 2 + assert page_updates[1]["inserted_total"] == 3 + assert page_updates[1]["pending_buffered_rows"] == 0 + assert len(insert_updates) == 1 + assert insert_updates[0]["inserted_last"] == 3