Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,10 @@ Resource-specific overrides follow the pattern `MIGRATION_{RESOURCE}_FETCH_LIMIT
| — | `--config`, `-c` | *(none)* | Path to YAML/JSON configuration file |
| `MIGRATION_COPY_ATTACHMENTS` | — | `false` | Copy Braintrust-managed attachments between orgs |
| `MIGRATION_ATTACHMENT_MAX_BYTES` | — | `52428800` (50MB) | Maximum attachment size to copy |
| `MIGRATION_ACL_MAP_USERS` | `--acl-map-users` | `false` | Map ACL `user_id` by matching source/destination users on email |
| `MIGRATION_ACL_AUTO_INVITE_USERS` | `--acl-auto-invite-users` | `false` | If ACL user mapping is enabled, invite missing users and retry mapping |
| `MIGRATION_GROUP_MAP_USERS` | `--group-map-users` | `false` | Map group `member_users` by matching source/destination users on email |
| `MIGRATION_GROUP_AUTO_INVITE_USERS` | `--group-auto-invite-users` | `false` | If group user mapping is enabled, invite missing users and retry mapping |

---

Expand Down Expand Up @@ -358,6 +362,10 @@ After all projects finish, **ACLs** run once as a global post-project phase so A
- **ACLs**: Migrated in a final post-project global phase after resource ID mappings are established. ACLs targeting users or unsupported object types are skipped with explicit reasons.
- Optional user ACL mapping: set `MIGRATION_ACL_MAP_USERS=true` to map `user_id` ACLs by matching source/destination users on email.
- Optional auto-invite fallback: set `MIGRATION_ACL_AUTO_INVITE_USERS=true` (with user mapping enabled) to invite missing users to the destination org and retry ACL user mapping.
- **Groups**: Group definitions and `member_groups` inheritance are migrated by default.
- Optional group member user mapping: set `MIGRATION_GROUP_MAP_USERS=true` to map `member_users` by matching source/destination users on email.
- Optional auto-invite fallback: set `MIGRATION_GROUP_AUTO_INVITE_USERS=true` (with group mapping enabled) to invite missing users to destination and retry membership mapping.
- Caveat: group member user mapping is applied in the create-group payload. If a destination group already exists and is returned unchanged by create, this flow does not currently issue a follow-up patch to reconcile `member_users`.
- **Agents and users**: Not supported for migration (users are org-specific; agents are not present in the codebase).

### Progress Monitoring
Expand Down
30 changes: 30 additions & 0 deletions braintrust_migrate/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,26 @@ def migrate(
envvar="MIGRATION_ACL_AUTO_INVITE_USERS",
),
] = None,
group_map_users: Annotated[
bool | None,
typer.Option(
"--group-map-users/--no-group-map-users",
help=(
"When migrating groups, map member_users by matching source and destination users on email."
),
envvar="MIGRATION_GROUP_MAP_USERS",
),
] = None,
group_auto_invite_users: Annotated[
bool | None,
typer.Option(
"--group-auto-invite-users/--no-group-auto-invite-users",
help=(
"When group user mapping is enabled, invite missing users into destination org and retry mapping."
),
envvar="MIGRATION_GROUP_AUTO_INVITE_USERS",
),
] = None,
) -> None:
"""Migrate resources from source to destination Braintrust organization.

Expand Down Expand Up @@ -233,6 +253,8 @@ def migrate(
created_before,
acl_map_users,
acl_auto_invite_users,
group_map_users,
group_auto_invite_users,
)
)

Expand All @@ -252,6 +274,8 @@ async def _migrate_main(
created_before: str | None,
acl_map_users: bool | None,
acl_auto_invite_users: bool | None,
group_map_users: bool | None,
group_auto_invite_users: bool | None,
) -> None:
"""Async implementation of the migrate command."""
setup_logging(log_level, log_format)
Expand Down Expand Up @@ -319,6 +343,10 @@ async def _migrate_main(
config.migration.acl_map_users = acl_map_users
if acl_auto_invite_users is not None:
config.migration.acl_auto_invite_users = acl_auto_invite_users
if group_map_users is not None:
config.migration.group_map_users = group_map_users
if group_auto_invite_users is not None:
config.migration.group_auto_invite_users = group_auto_invite_users

logger.info(
"Starting migration",
Expand All @@ -334,6 +362,8 @@ async def _migrate_main(
created_before=config.migration.created_before,
acl_map_users=config.migration.acl_map_users,
acl_auto_invite_users=config.migration.acl_auto_invite_users,
group_map_users=config.migration.group_map_users,
group_auto_invite_users=config.migration.group_auto_invite_users,
resume_run_dir=str(resume_run_dir) if resume_run_dir is not None else None,
inferred_project=inferred_project,
)
Expand Down
30 changes: 30 additions & 0 deletions braintrust_migrate/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,14 @@ class MigrationConfig(BaseModel):
default=False,
description="If acl_map_users is enabled and a user is missing in destination, invite them to destination org before creating user ACL.",
)
group_map_users: bool = Field(
default=False,
description="Attempt to map group member_users entries between source and destination users by email.",
)
group_auto_invite_users: bool = Field(
default=False,
description="If group_map_users is enabled and a user is missing in destination, invite them to destination org before adding group membership.",
)

@field_validator("created_after")
def validate_created_after(cls, v: str | None) -> str | None:
Expand All @@ -246,6 +254,10 @@ def validate_acl_user_mapping_flags(self) -> "MigrationConfig":
raise ValueError(
"acl_auto_invite_users requires acl_map_users to also be enabled"
)
if self.group_auto_invite_users and not self.group_map_users:
raise ValueError(
"group_auto_invite_users requires group_map_users to also be enabled"
)
return self


Expand Down Expand Up @@ -444,6 +456,22 @@ def _get_bool(specific_key: str, unified_key: str, default: str) -> bool:
"y",
"on",
}
group_map_users = os.getenv("MIGRATION_GROUP_MAP_USERS", "false").lower() in {
"1",
"true",
"yes",
"y",
"on",
}
group_auto_invite_users = os.getenv(
"MIGRATION_GROUP_AUTO_INVITE_USERS", "false"
).lower() in {
"1",
"true",
"yes",
"y",
"on",
}

# Logging settings
log_level = os.getenv("LOG_LEVEL", "INFO")
Expand Down Expand Up @@ -487,6 +515,8 @@ def _get_bool(specific_key: str, unified_key: str, default: str) -> bool:
attachment_max_bytes=attachment_max_bytes,
acl_map_users=acl_map_users,
acl_auto_invite_users=acl_auto_invite_users,
group_map_users=group_map_users,
group_auto_invite_users=group_auto_invite_users,
),
logging=LoggingConfig(
level=log_level,
Expand Down
207 changes: 195 additions & 12 deletions braintrust_migrate/resources/groups.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
"""Group migrator for Braintrust migration tool."""

import asyncio

from braintrust_migrate.resources.base import ResourceMigrator


Expand All @@ -10,17 +12,172 @@ class GroupMigrator(ResourceMigrator[dict]):
Groups can inherit from other groups via the member_groups field,
so parent groups must be migrated before child groups.

Note: member_users are excluded from migration since users are
organization-specific and cannot be migrated between organizations.
Note: member_users can be migrated only when explicit user mapping is enabled.
By default, member_users are skipped since users are organization-specific.

Uses raw API requests instead of SDK to avoid model dependencies.
"""

def __init__(self, source_client, dest_client, checkpoint_dir, batch_size: int = 100):
super().__init__(source_client, dest_client, checkpoint_dir, batch_size=batch_size)
self._source_user_email_cache: dict[str, str | None] = {}
self._dest_user_id_by_email_cache: dict[str, str | None] = {}
self._invited_user_emails: set[str] = set()

@property
def resource_name(self) -> str:
"""Human-readable name for this resource type."""
return "Groups"

def _group_user_mapping_enabled(self) -> bool:
"""Return whether group member user mapping is enabled."""
cfg = getattr(self.source_client, "migration_config", None) or getattr(
self.dest_client, "migration_config", None
)
value = getattr(cfg, "group_map_users", False)
if isinstance(value, bool):
return value
if isinstance(value, str):
return value.strip().lower() in {"1", "true", "yes", "y", "on"}
return False

def _group_auto_invite_enabled(self) -> bool:
"""Return whether group member auto-invite fallback is enabled."""
cfg = getattr(self.source_client, "migration_config", None) or getattr(
self.dest_client, "migration_config", None
)
value = getattr(cfg, "group_auto_invite_users", False)
if isinstance(value, bool):
return value
if isinstance(value, str):
return value.strip().lower() in {"1", "true", "yes", "y", "on"}
return False

async def _get_source_user_email(self, source_user_id: str) -> str | None:
"""Get source user email by user ID."""
if source_user_id in self._source_user_email_cache:
return self._source_user_email_cache[source_user_id]

try:
response = await self.source_client.with_retry(
"get_source_user_for_group_member",
lambda uid=source_user_id: self.source_client.raw_request(
"GET",
f"/v1/user/{uid}",
),
)
email = response.get("email") if isinstance(response, dict) else None
email = email.strip().lower() if isinstance(email, str) and email.strip() else None
self._source_user_email_cache[source_user_id] = email
return email
except Exception:
self._source_user_email_cache[source_user_id] = None
return None

async def _find_dest_user_id_by_email(
self, email: str, *, force_refresh: bool = False
) -> str | None:
"""Find destination user ID by email."""
normalized_email = email.strip().lower()
if force_refresh:
self._dest_user_id_by_email_cache.pop(normalized_email, None)
if normalized_email in self._dest_user_id_by_email_cache:
return self._dest_user_id_by_email_cache[normalized_email]

try:
response = await self.dest_client.with_retry(
"list_dest_users_by_email_for_group_members",
lambda e=normalized_email: self.dest_client.raw_request(
"GET",
"/v1/user",
params={"email": e, "limit": 100},
),
)

if isinstance(response, dict):
objects = response.get("objects", [])
elif isinstance(response, list):
objects = response
else:
objects = []

dest_user_id = None
for user in objects:
if not isinstance(user, dict):
continue
user_email = user.get("email")
user_id = user.get("id")
if (
isinstance(user_email, str)
and user_email.strip().lower() == normalized_email
and isinstance(user_id, str)
and user_id
):
dest_user_id = user_id
break

self._dest_user_id_by_email_cache[normalized_email] = dest_user_id
return dest_user_id
except Exception:
self._dest_user_id_by_email_cache[normalized_email] = None
return None

async def _invite_user_to_dest_org(self, email: str) -> bool:
"""Invite a user to destination org via organization members API."""
normalized_email = email.strip().lower()
if normalized_email in self._invited_user_emails:
return True

try:
await self.dest_client.with_retry(
"invite_user_to_dest_org_for_group_members",
lambda e=normalized_email: self.dest_client.raw_request(
"PATCH",
"/v1/organization/members",
json={
"invite_users": {
"emails": [e],
"send_invite_emails": False,
}
},
),
)
self._invited_user_emails.add(normalized_email)
self._dest_user_id_by_email_cache.pop(normalized_email, None)
return True
except Exception:
return False

async def _resolve_group_member_user_id(self, source_user_id: str) -> str | None:
"""Resolve group member user_id by source/destination email matching."""
existing_mapping = self.state.id_mapping.get(source_user_id)
if existing_mapping:
return existing_mapping

source_email = await self._get_source_user_email(source_user_id)
if not source_email:
return None

dest_user_id = await self._find_dest_user_id_by_email(source_email)
if not dest_user_id and self._group_auto_invite_enabled():
invited = await self._invite_user_to_dest_org(source_email)
if invited:
post_invite_attempts = 4
for attempt in range(post_invite_attempts):
dest_user_id = await self._find_dest_user_id_by_email(
source_email,
force_refresh=True,
)
if dest_user_id:
break
if attempt < post_invite_attempts - 1:
await asyncio.sleep(0.5 * (attempt + 1))

if dest_user_id:
self.state.id_mapping[source_user_id] = dest_user_id
return dest_user_id
return None

async def get_dependencies(self, resource: dict) -> list[str]:
"""Get list of group IDs that this group depends on.

Expand Down Expand Up @@ -116,18 +273,44 @@ async def migrate_resource(self, resource: dict) -> str:
if resolved_member_groups:
create_params["member_groups"] = resolved_member_groups

# Note: member_users are intentionally excluded from migration
# since users are organization-specific and cannot be migrated
member_users = resource.get("member_users")
if member_users:
self._logger.info(
"Skipping member_users migration - users are organization-specific",
group_id=resource.get("id"),
group_name=resource.get("name"),
user_count=len(member_users),
)
# Remove member_users from create_params to avoid trying to migrate them
create_params.pop("member_users", None)
if self._group_user_mapping_enabled():
resolved_member_users: list[str] = []
unresolved_source_user_ids: list[str] = []
seen_dest_user_ids: set[str] = set()
for source_user_id in member_users:
if not isinstance(source_user_id, str) or not source_user_id:
continue
mapped_user_id = await self._resolve_group_member_user_id(source_user_id)
if mapped_user_id:
if mapped_user_id not in seen_dest_user_ids:
resolved_member_users.append(mapped_user_id)
seen_dest_user_ids.add(mapped_user_id)
else:
unresolved_source_user_ids.append(source_user_id)

if resolved_member_users:
create_params["member_users"] = resolved_member_users
else:
create_params.pop("member_users", None)

if unresolved_source_user_ids:
self._logger.warning(
"Skipping unresolved group member users",
group_id=resource.get("id"),
group_name=resource.get("name"),
unresolved_count=len(unresolved_source_user_ids),
)
else:
self._logger.info(
"Skipping member_users migration - users are organization-specific",
group_id=resource.get("id"),
group_name=resource.get("name"),
user_count=len(member_users),
)
# Remove member_users from create_params to avoid trying to migrate them
create_params.pop("member_users", None)

# Create group using raw API
response = await self.dest_client.with_retry(
Expand Down
Loading