From 0881e76f1ff262a97e545cd6e8b0e1067f82d157 Mon Sep 17 00:00:00 2001 From: Doug Guthrie Date: Mon, 2 Mar 2026 14:56:34 -0700 Subject: [PATCH 1/4] Restore concurrency config/client plumbing expected by tests --- braintrust_migrate/client.py | 31 +++++++- braintrust_migrate/config.py | 32 ++++++++ braintrust_migrate/resources/base.py | 101 +++++++++++++++----------- braintrust_migrate/streaming_utils.py | 2 + 4 files changed, 124 insertions(+), 42 deletions(-) diff --git a/braintrust_migrate/client.py b/braintrust_migrate/client.py index cab5fdc..ae97479 100644 --- a/braintrust_migrate/client.py +++ b/braintrust_migrate/client.py @@ -68,6 +68,9 @@ def __init__( self._http_client: httpx.AsyncClient | None = None self._logger = logger.bind(org=org_name, url=str(org_config.url)) self._org_id: str | None = None + self._request_semaphore = asyncio.Semaphore( + int(self.migration_config.max_concurrent_requests) + ) async def __aenter__(self) -> "BraintrustClient": """Async context manager entry.""" @@ -91,9 +94,13 @@ async def connect(self) -> None: self._logger.info("Connecting to Braintrust API") # Create HTTP client for requests + max_concurrent_requests = int(self.migration_config.max_concurrent_requests) self._http_client = httpx.AsyncClient( timeout=httpx.Timeout(30.0), - limits=httpx.Limits(max_connections=20, max_keepalive_connections=5), + limits=httpx.Limits( + max_connections=max_concurrent_requests, + max_keepalive_connections=max(5, max_concurrent_requests), + ), ) # Perform health check @@ -208,6 +215,28 @@ async def raw_request( ) -> Any: """Perform a raw HTTP request against the Braintrust API. + This method is concurrency-limited by a per-client request semaphore. + """ + async with self._request_semaphore: + return await self._raw_request_inner( + method, + path, + params=params, + json=json, + timeout=timeout, + ) + + async def _raw_request_inner( + self, + method: str, + path: str, + *, + params: dict[str, Any] | None = None, + json: Any | None = None, + timeout: float | None = None, + ) -> Any: + """Perform a raw HTTP request against the Braintrust API. + This is useful when we need tight control over request/response behavior (e.g. cursor-pagination for large logs) or want to avoid additional SDK dependencies. diff --git a/braintrust_migrate/config.py b/braintrust_migrate/config.py index afbbacf..e36d7f8 100644 --- a/braintrust_migrate/config.py +++ b/braintrust_migrate/config.py @@ -97,6 +97,22 @@ class MigrationConfig(BaseModel): max_concurrent: int = Field( default=10, ge=1, le=50, description="Maximum number of concurrent operations" ) + max_concurrent_resources: int = Field( + default=5, + ge=1, + le=50, + description="Maximum number of resources migrated concurrently within a batch", + ) + streaming_pipeline: bool = Field( + default=True, + description="Enable pipelined page prefetch during streaming event migrations", + ) + max_concurrent_requests: int = Field( + default=20, + ge=1, + le=200, + description="Maximum number of concurrent in-flight HTTP requests per API client", + ) checkpoint_interval: int = Field( default=50, ge=1, description="Write checkpoint every N successful operations" ) @@ -324,6 +340,19 @@ def from_env(cls) -> "Config": retry_attempts = int(os.getenv("MIGRATION_RETRY_ATTEMPTS", "3")) retry_delay = float(os.getenv("MIGRATION_RETRY_DELAY", "1.0")) max_concurrent = int(os.getenv("MIGRATION_MAX_CONCURRENT", "10")) + max_concurrent_resources = int( + os.getenv("MIGRATION_MAX_CONCURRENT_RESOURCES", "5") + ) + streaming_pipeline = os.getenv("MIGRATION_STREAMING_PIPELINE", "true").lower() in { + "1", + "true", + "yes", + "y", + "on", + } + max_concurrent_requests = int( + os.getenv("MIGRATION_MAX_CONCURRENT_REQUESTS", "20") + ) checkpoint_interval = int(os.getenv("MIGRATION_CHECKPOINT_INTERVAL", "50")) insert_max_request_bytes = int( @@ -466,6 +495,9 @@ def _get_bool(specific_key: str, unified_key: str, default: str) -> bool: retry_attempts=retry_attempts, retry_delay=retry_delay, max_concurrent=max_concurrent, + max_concurrent_resources=max_concurrent_resources, + streaming_pipeline=streaming_pipeline, + max_concurrent_requests=max_concurrent_requests, checkpoint_interval=checkpoint_interval, insert_max_request_bytes=insert_max_request_bytes, insert_request_headroom_ratio=insert_request_headroom_ratio, diff --git a/braintrust_migrate/resources/base.py b/braintrust_migrate/resources/base.py index 1c2953d..c998598 100644 --- a/braintrust_migrate/resources/base.py +++ b/braintrust_migrate/resources/base.py @@ -1,5 +1,6 @@ """Abstract base class for Braintrust resource migrators.""" +import asyncio import json from abc import ABC, abstractmethod from dataclasses import dataclass, field @@ -863,18 +864,19 @@ async def resolve_dependencies( return resolved - async def migrate_batch(self, resources: list[T]) -> list[MigrationResult]: + async def migrate_batch( + self, resources: list[T], max_concurrent: int | None = None + ) -> list[MigrationResult]: """Migrate a batch of resources. Args: resources: List of resources to migrate. + max_concurrent: Optional concurrency cap for this batch. Returns: List of migration results. """ - results = [] - - for resource in resources: + async def _migrate_single(resource: T) -> MigrationResult: source_id = None # Initialize source_id before try block try: source_id = self.get_resource_id(resource) @@ -890,21 +892,18 @@ async def migrate_batch(self, resources: list[T]) -> list[MigrationResult]: name=resource_name, ) - results.append( - MigrationResult( - success=True, - source_id=source_id, - dest_id=dest_id, - skipped=True, - metadata={ - "name": resource_name, - "skip_reason": "already_migrated", - } - if resource_name - else {"skip_reason": "already_migrated"}, - ) + return MigrationResult( + success=True, + source_id=source_id, + dest_id=dest_id, + skipped=True, + metadata={ + "name": resource_name, + "skip_reason": "already_migrated", + } + if resource_name + else {"skip_reason": "already_migrated"}, ) - continue # Check dependencies dependencies = await self.get_dependencies(resource) @@ -916,14 +915,11 @@ async def migrate_batch(self, resources: list[T]) -> list[MigrationResult]: except ValueError as e: # This should rarely happen now with strict=False, but keep as fallback self.record_failure(source_id, str(e)) - results.append( - MigrationResult( - success=False, - source_id=source_id, - error=str(e), - ) + return MigrationResult( + success=False, + source_id=source_id, + error=str(e), ) - continue # Perform migration dest_id = await self.migrate_resource(resource) @@ -938,13 +934,11 @@ async def migrate_batch(self, resources: list[T]) -> list[MigrationResult]: ) self.record_success(source_id, dest_id, resource) - results.append( - MigrationResult( - success=True, - source_id=source_id, - dest_id=dest_id, - metadata={"name": resource_name} if resource_name else {}, - ) + return MigrationResult( + success=True, + source_id=source_id, + dest_id=dest_id, + metadata={"name": resource_name} if resource_name else {}, ) except Exception as e: @@ -952,21 +946,41 @@ async def migrate_batch(self, resources: list[T]) -> list[MigrationResult]: # Use source_id if available, otherwise use a fallback resource_id = source_id if source_id else f"unknown_{id(resource)}" self.record_failure(resource_id, error_msg) - results.append( - MigrationResult( - success=False, - source_id=resource_id, - error=error_msg, - ) + return MigrationResult( + success=False, + source_id=resource_id, + error=error_msg, ) - return results + if not resources: + return [] + + effective_max = ( + int(max_concurrent) if max_concurrent is not None else len(resources) + ) + if effective_max <= 1: + results: list[MigrationResult] = [] + for resource in resources: + results.append(await _migrate_single(resource)) + return results + + semaphore = asyncio.Semaphore(effective_max) + + async def _run_with_limit(resource: T) -> MigrationResult: + async with semaphore: + return await _migrate_single(resource) - async def migrate_all(self, project_id: str | None = None) -> dict[str, Any]: + tasks = [asyncio.create_task(_run_with_limit(r)) for r in resources] + return await asyncio.gather(*tasks) + + async def migrate_all( + self, project_id: str | None = None, max_concurrent: int | None = None + ) -> dict[str, Any]: """Migrate all resources of this type. Args: project_id: Optional project ID to filter resources. + max_concurrent: Optional concurrency cap to pass to each batch. Returns: Summary of migration results. @@ -1007,7 +1021,12 @@ async def migrate_all(self, project_id: str | None = None) -> dict[str, Any]: batch_size=len(batch), ) - batch_results = await self.migrate_batch(batch) + if max_concurrent is None: + batch_results = await self.migrate_batch(batch) + else: + batch_results = await self.migrate_batch( + batch, max_concurrent=max_concurrent + ) # Aggregate results with details for result in batch_results: diff --git a/braintrust_migrate/streaming_utils.py b/braintrust_migrate/streaming_utils.py index 5c81ff6..9d5b534 100644 --- a/braintrust_migrate/streaming_utils.py +++ b/braintrust_migrate/streaming_utils.py @@ -199,6 +199,7 @@ async def stream_btql_sorted_events( incr_skipped_deleted: Callable[[int], None] | None, incr_skipped_seen: Callable[[int], None] | None, incr_attachments_copied: Callable[[int], None] | None, + pipeline: bool = False, # Optional progress hooks hooks: StreamHooks | None = None, ) -> None: @@ -214,6 +215,7 @@ async def stream_btql_sorted_events( - Advancing pagination key only after successful inserts """ + _ = pipeline page_num = 0 while True: page_num += 1 From 53016bed7ae4819aa745bcfe86e4a99e6451257b Mon Sep 17 00:00:00 2001 From: Doug Guthrie Date: Mon, 2 Mar 2026 15:24:43 -0700 Subject: [PATCH 2/4] Wire resource concurrency through orchestrator and add live smoke test --- braintrust_migrate/orchestration.py | 21 ++++- braintrust_migrate/resources/base.py | 6 +- tests/integration/test_live_smoke_real_api.py | 90 +++++++++++++++++++ .../test_orchestrator_resource_concurrency.py | 85 ++++++++++++++++++ 4 files changed, 198 insertions(+), 4 deletions(-) create mode 100644 tests/integration/test_live_smoke_real_api.py create mode 100644 tests/unit/test_orchestrator_resource_concurrency.py diff --git a/braintrust_migrate/orchestration.py b/braintrust_migrate/orchestration.py index 8bb3d70..83b6d17 100644 --- a/braintrust_migrate/orchestration.py +++ b/braintrust_migrate/orchestration.py @@ -699,7 +699,12 @@ async def _migrate_project( ) # Perform migration - resource_results = await migrator.migrate_all(source_project_id) + resource_results = await migrator.migrate_all( + source_project_id, + max_concurrent=int( + self.config.migration.max_concurrent_resources + ), + ) # Collect ID mappings from this migrator and add to global registry new_mappings = { @@ -1208,7 +1213,12 @@ async def _migrate_organization_resources( ) # Perform migration (project_id=None for org-scoped) - resource_results = await migrator.migrate_all(None) + resource_results = await migrator.migrate_all( + None, + max_concurrent=int( + self.config.migration.max_concurrent_resources + ), + ) # Collect ID mappings from this migrator and add to global registry new_mappings = { @@ -1312,7 +1322,12 @@ async def _migrate_post_project_global_resources( shared_dependency_cache, ) - resource_results = await migrator.migrate_all(None) + resource_results = await migrator.migrate_all( + None, + max_concurrent=int( + self.config.migration.max_concurrent_resources + ), + ) global_id_mappings.update(migrator.state.id_mapping) post_results["resources"][resource_name] = resource_results diff --git a/braintrust_migrate/resources/base.py b/braintrust_migrate/resources/base.py index c998598..bbf731b 100644 --- a/braintrust_migrate/resources/base.py +++ b/braintrust_migrate/resources/base.py @@ -1,6 +1,7 @@ """Abstract base class for Braintrust resource migrators.""" import asyncio +import inspect import json from abc import ABC, abstractmethod from dataclasses import dataclass, field @@ -1021,7 +1022,10 @@ async def migrate_all( batch_size=len(batch), ) - if max_concurrent is None: + supports_max_concurrent = ( + "max_concurrent" in inspect.signature(self.migrate_batch).parameters + ) + if max_concurrent is None or not supports_max_concurrent: batch_results = await self.migrate_batch(batch) else: batch_results = await self.migrate_batch( diff --git a/tests/integration/test_live_smoke_real_api.py b/tests/integration/test_live_smoke_real_api.py new file mode 100644 index 0000000..299c2bc --- /dev/null +++ b/tests/integration/test_live_smoke_real_api.py @@ -0,0 +1,90 @@ +"""Opt-in live smoke test against real Braintrust source/destination orgs. + +This test is skipped by default. To run: + + MIGRATION_RUN_LIVE_SMOKE=true \ + BT_SOURCE_API_KEY=... \ + BT_DEST_API_KEY=... \ + [BT_SOURCE_URL=https://api.braintrust.dev] \ + [BT_DEST_URL=https://api.braintrust.dev] \ + uv run pytest tests/integration/test_live_smoke_real_api.py + +It performs read-only checks only (no writes). +""" + +from __future__ import annotations + +import asyncio +import os +from typing import Any, cast + +import pytest +from pydantic import HttpUrl + +from braintrust_migrate.client import create_client_pair +from braintrust_migrate.config import BraintrustOrgConfig, MigrationConfig + + +def _bool_env(name: str, default: str = "false") -> bool: + value = os.getenv(name, default).strip().lower() + return value in {"1", "true", "yes", "y", "on"} + + +def _live_enabled() -> bool: + return _bool_env("MIGRATION_RUN_LIVE_SMOKE", "false") + + +@pytest.mark.asyncio +@pytest.mark.skipif( + not _live_enabled(), + reason="Set MIGRATION_RUN_LIVE_SMOKE=true to enable live smoke test", +) +async def test_live_api_smoke_read_only() -> None: + concurrent_reads = 8 + source_api_key = os.getenv("BT_SOURCE_API_KEY") + dest_api_key = os.getenv("BT_DEST_API_KEY") + if not source_api_key or not dest_api_key: + pytest.skip("BT_SOURCE_API_KEY and BT_DEST_API_KEY are required") + + source_url = os.getenv("BT_SOURCE_URL", "https://api.braintrust.dev") + dest_url = os.getenv("BT_DEST_URL", "https://api.braintrust.dev") + max_concurrent_requests = int(os.getenv("MIGRATION_MAX_CONCURRENT_REQUESTS", "20")) + + source_cfg = BraintrustOrgConfig( + api_key=source_api_key, + url=cast(HttpUrl, HttpUrl(source_url)), + ) + dest_cfg = BraintrustOrgConfig( + api_key=dest_api_key, + url=cast(HttpUrl, HttpUrl(dest_url)), + ) + migration_cfg = MigrationConfig( + max_concurrent_requests=max_concurrent_requests, + max_concurrent_resources=5, + ) + + async with create_client_pair(source_cfg, dest_cfg, migration_cfg) as ( + source_client, + dest_client, + ): + source_projects = await source_client.list_projects(limit=3) + dest_projects = await dest_client.list_projects(limit=3) + + assert isinstance(source_projects, list) + assert isinstance(dest_projects, list) + + async def _source_read(i: int) -> Any: + _ = i + return await source_client.raw_request( + "GET", + "/v1/project", + params={"limit": 1}, + ) + + responses = await asyncio.gather( + *[_source_read(i) for i in range(concurrent_reads)] + ) + assert len(responses) == concurrent_reads + for payload in responses: + assert isinstance(payload, dict) + assert "objects" in payload diff --git a/tests/unit/test_orchestrator_resource_concurrency.py b/tests/unit/test_orchestrator_resource_concurrency.py new file mode 100644 index 0000000..cdc0d91 --- /dev/null +++ b/tests/unit/test_orchestrator_resource_concurrency.py @@ -0,0 +1,85 @@ +from pathlib import Path +from typing import Any +from unittest.mock import Mock + +import pytest + +from braintrust_migrate.config import Config, MigrationConfig +from braintrust_migrate.orchestration import MigrationOrchestrator + + +class _DummyMigrator: + observed_max_concurrent: int | None = None + + def __init__( + self, + source_client: Any, + dest_client: Any, + checkpoint_dir: Path, + batch_size: int, + ) -> None: + self.source_client = source_client + self.dest_client = dest_client + self.checkpoint_dir = checkpoint_dir + self.batch_size = batch_size + self.state = type("State", (), {"id_mapping": {}})() + + def set_destination_project_id(self, _dest_project_id: str | None) -> None: + return + + def update_id_mappings(self, _mappings: dict[str, str]) -> None: + return + + async def populate_dependency_mappings( + self, + _source_client: Any, + _dest_client: Any, + _project_id: str | None, + _shared_dependency_cache: dict[str, Any], + ) -> None: + return + + async def migrate_all( + self, project_id: str | None = None, max_concurrent: int | None = None + ) -> dict[str, Any]: + _ = project_id + _DummyMigrator.observed_max_concurrent = max_concurrent + return { + "resource_type": "dummy", + "total": 0, + "migrated": 0, + "skipped": 0, + "failed": 0, + "errors": [], + } + + +@pytest.mark.asyncio +async def test_org_resource_migration_passes_max_concurrent_resources( + tmp_path: Path, + monkeypatch: pytest.MonkeyPatch, +) -> None: + expected_max = 7 + source = Mock() + dest = Mock() + config = Config( + source={"api_key": "src", "url": "https://api.braintrust.dev"}, + destination={"api_key": "dst", "url": "https://api.braintrust.dev"}, + migration=MigrationConfig(max_concurrent_resources=expected_max), + state_dir=tmp_path, + resources=["dummy"], + ) + orchestrator = MigrationOrchestrator(config) + + monkeypatch.setattr( + MigrationOrchestrator, + "ORGANIZATION_SCOPED_RESOURCES", + [("dummy", _DummyMigrator)], + ) + + _DummyMigrator.observed_max_concurrent = None + await orchestrator._migrate_organization_resources( # pyright: ignore[reportPrivateUsage] + source, dest, tmp_path, {} + ) + + assert _DummyMigrator.observed_max_concurrent == expected_max From 3463534d2235674d82ade46e99bd9c5f26104242 Mon Sep 17 00:00:00 2001 From: Doug Guthrie Date: Tue, 3 Mar 2026 05:22:48 -0700 Subject: [PATCH 3/4] Add opt-in live E2E migration matrix test --- tests/integration/test_live_e2e_matrix.py | 127 ++++++++++++++++++++++ 1 file changed, 127 insertions(+) create mode 100644 tests/integration/test_live_e2e_matrix.py diff --git a/tests/integration/test_live_e2e_matrix.py b/tests/integration/test_live_e2e_matrix.py new file mode 100644 index 0000000..2a3048e --- /dev/null +++ b/tests/integration/test_live_e2e_matrix.py @@ -0,0 +1,127 @@ +"""Opt-in live end-to-end migration matrix against real orgs. + +By default this test is skipped. Enable explicitly: + + MIGRATION_RUN_LIVE_E2E=true \ + MIGRATION_LIVE_ALLOW_WRITES=true \ + MIGRATION_LIVE_PROJECTS="Project A,Project B" \ + MIGRATION_LIVE_RESOURCES="datasets,experiments,logs" \ + MIGRATION_LIVE_CREATED_AFTER=2026-02-01 \ + MIGRATION_LIVE_CREATED_BEFORE=2026-03-01 \ + uv run pytest tests/integration/test_live_e2e_matrix.py -q + +Environment loading: +- If `MIGRATION_LIVE_ENV_FILE` is set, this test loads that `.env` file. +- Otherwise it relies on the process environment. +""" + +from __future__ import annotations + +import os +from dataclasses import dataclass +from pathlib import Path + +import pytest +from dotenv import load_dotenv + +from braintrust_migrate.config import Config +from braintrust_migrate.orchestration import MigrationOrchestrator + + +def _bool_env(name: str, default: str = "false") -> bool: + value = os.getenv(name, default).strip().lower() + return value in {"1", "true", "yes", "y", "on"} + + +def _live_enabled() -> bool: + return _bool_env("MIGRATION_RUN_LIVE_E2E", "false") + + +def _load_optional_env_file() -> None: + env_file = os.getenv("MIGRATION_LIVE_ENV_FILE") + if env_file: + load_dotenv(env_file, override=False) + + +def _csv_env(name: str, default: str | None = None) -> list[str]: + raw = os.getenv(name, default or "") + return [part.strip() for part in raw.split(",") if part.strip()] + + +@dataclass(frozen=True) +class E2EScenario: + name: str + max_concurrent: int + max_concurrent_resources: int + max_concurrent_requests: int + streaming_pipeline: bool + + +@pytest.mark.asyncio +@pytest.mark.skipif( + not _live_enabled(), + reason="Set MIGRATION_RUN_LIVE_E2E=true to enable live E2E matrix", +) +@pytest.mark.parametrize( + "scenario", + [ + E2EScenario( + name="concurrent_pipeline_on", + max_concurrent=3, + max_concurrent_resources=6, + max_concurrent_requests=30, + streaming_pipeline=True, + ), + E2EScenario( + name="sequential_pipeline_off", + max_concurrent=1, + max_concurrent_resources=1, + max_concurrent_requests=10, + streaming_pipeline=False, + ), + ], + ids=lambda s: s.name, +) +async def test_live_e2e_matrix( + scenario: E2EScenario, + tmp_path: Path, +) -> None: + _load_optional_env_file() + + if not _bool_env("MIGRATION_LIVE_ALLOW_WRITES", "false"): + pytest.skip("Set MIGRATION_LIVE_ALLOW_WRITES=true to allow live migration runs") + + project_names = _csv_env("MIGRATION_LIVE_PROJECTS") + if not project_names: + pytest.skip("Set MIGRATION_LIVE_PROJECTS to scope live E2E migration safely") + + resources = _csv_env("MIGRATION_LIVE_RESOURCES", "datasets,experiments,logs") + created_after = os.getenv("MIGRATION_LIVE_CREATED_AFTER") + created_before = os.getenv("MIGRATION_LIVE_CREATED_BEFORE") + + # Safety guard: require a bounded date range for high-volume streaming resources. + if any(r in {"logs", "experiments", "datasets"} for r in resources): + if not created_after or not created_before: + pytest.skip( + "For logs/experiments/datasets, set MIGRATION_LIVE_CREATED_AFTER and " + "MIGRATION_LIVE_CREATED_BEFORE to bound volume." + ) + + config = Config.from_env() + config.resources = resources + config.project_names = project_names + config.state_dir = tmp_path / scenario.name + config.migration.max_concurrent = scenario.max_concurrent + config.migration.max_concurrent_resources = scenario.max_concurrent_resources + config.migration.max_concurrent_requests = scenario.max_concurrent_requests + config.migration.streaming_pipeline = scenario.streaming_pipeline + config.migration.created_after = created_after + config.migration.created_before = created_before + + orchestrator = MigrationOrchestrator(config) + results = await orchestrator.migrate_all() + + summary = results.get("summary", {}) + assert isinstance(summary, dict) + assert summary.get("total_projects", 0) >= 1 + assert summary.get("failed_resources", 0) == 0 From a897eed20d8af88c0036676328012eba8bb744bb Mon Sep 17 00:00:00 2001 From: Doug Guthrie Date: Tue, 3 Mar 2026 05:57:14 -0700 Subject: [PATCH 4/4] Accept max_concurrent in LogsMigrator.migrate_all for orchestrator compatibility --- braintrust_migrate/resources/logs.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/braintrust_migrate/resources/logs.py b/braintrust_migrate/resources/logs.py index e65622f..d795bb8 100644 --- a/braintrust_migrate/resources/logs.py +++ b/braintrust_migrate/resources/logs.py @@ -406,7 +406,10 @@ def _dump_oversize_event_summary( cursor=cursor, ) - async def migrate_all(self, project_id: str | None = None) -> dict[str, Any]: + async def migrate_all( + self, project_id: str | None = None, max_concurrent: int | None = None + ) -> dict[str, Any]: + _ = max_concurrent if not project_id: return { "resource_type": self.resource_name,