diff --git a/coriolis/api-refs/api_samples/transfer/openstack-transfer-create-resp.json b/coriolis/api-refs/api_samples/transfer/openstack-transfer-create-resp.json index 5baf549a..96ccfd03 100644 --- a/coriolis/api-refs/api_samples/transfer/openstack-transfer-create-resp.json +++ b/coriolis/api-refs/api_samples/transfer/openstack-transfer-create-resp.json @@ -49,6 +49,7 @@ } }, "executions": [], - "scenario": "replica" + "scenario": "replica", + "clustered": false } } diff --git a/coriolis/api-refs/api_samples/transfer/openstack-transfer-get-resp.json b/coriolis/api-refs/api_samples/transfer/openstack-transfer-get-resp.json index 55749ec6..5e48ef5b 100644 --- a/coriolis/api-refs/api_samples/transfer/openstack-transfer-get-resp.json +++ b/coriolis/api-refs/api_samples/transfer/openstack-transfer-get-resp.json @@ -42,6 +42,7 @@ "origin_minion_pool_id": null, "destination_minion_pool_id": null, "instance_osmorphing_minion_pool_mappings": {}, + "clustered": false, "executions": [ { "created_at": "2019-07-11T10:06:47.000000", diff --git a/coriolis/api-refs/api_samples/transfer/transfer-list-resp.json b/coriolis/api-refs/api_samples/transfer/transfer-list-resp.json index 14c909b3..c94b6164 100644 --- a/coriolis/api-refs/api_samples/transfer/transfer-list-resp.json +++ b/coriolis/api-refs/api_samples/transfer/transfer-list-resp.json @@ -74,7 +74,8 @@ "instances": {} }, "id": "0460aa4d-6b16-4c98-bd56-27ee186e4a22", - "scenario": "replica" + "scenario": "replica", + "clustered": false } ] } diff --git a/coriolis/api-refs/api_samples/transfer/transfer-update-resp.json b/coriolis/api-refs/api_samples/transfer/transfer-update-resp.json index 8bcdbbf0..8332a19f 100644 --- a/coriolis/api-refs/api_samples/transfer/transfer-update-resp.json +++ b/coriolis/api-refs/api_samples/transfer/transfer-update-resp.json @@ -133,7 +133,8 @@ "ubuntu-xenial": "echo 'anything you need'" } }, - "scenario": "replica" + "scenario": "replica", + "clustered": false } } } diff --git a/coriolis/api-refs/source/parameters.yaml b/coriolis/api-refs/source/parameters.yaml index 9fef155a..35d5b34c 100644 --- a/coriolis/api-refs/source/parameters.yaml +++ b/coriolis/api-refs/source/parameters.yaml @@ -123,6 +123,15 @@ connection_info_schema: in: body type: object required: false +clustered: + description: | + Present on transfer responses. ``true`` when more than one instance is + listed (multi-instance scheduling: sync barriers and shared-disk + coordination). Set by the server at creation from ``instances``; not + accepted on create. + in: body + type: boolean + required: false deployment_cancel: description: | Object containing information about the type of deployment cancellation. diff --git a/coriolis/api-refs/source/transfer.inc b/coriolis/api-refs/source/transfer.inc index fa17913b..e9c86d21 100644 --- a/coriolis/api-refs/source/transfer.inc +++ b/coriolis/api-refs/source/transfer.inc @@ -51,6 +51,7 @@ Response - instance_osmorphing_minion_pool_mappings : instance_osmorphing_minion_pool_mappings - user_scripts : user_scripts - scenario: scenario_type + - clustered : clustered **Example of Transfer List Response** @@ -111,6 +112,7 @@ Response - instance_osmorphing_minion_pool_mappings : instance_osmorphing_minion_pool_mappings - user_scripts : user_scripts - scenario: scenario_type + - clustered : clustered **Example of Transfer Show Response** @@ -183,6 +185,7 @@ Response - instance_osmorphing_minion_pool_mappings : instance_osmorphing_minion_pool_mappings - user_scripts : user_scripts - scenario: scenario_type + - clustered : clustered **Example of Transfer Create Response** diff --git a/coriolis/conductor/rpc/server.py b/coriolis/conductor/rpc/server.py index fa70e524..eca5dc30 100644 --- a/coriolis/conductor/rpc/server.py +++ b/coriolis/conductor/rpc/server.py @@ -1294,6 +1294,7 @@ def create_instances_transfer(self, ctxt, transfer_scenario, network_map, storage_mappings, notes=None, user_scripts=None, clone_disks=True, skip_os_morphing=False): + clustered = len(instances) > 1 supported_scenarios = [ constants.TRANSFER_SCENARIO_REPLICA, constants.TRANSFER_SCENARIO_LIVE_MIGRATION] @@ -1330,6 +1331,7 @@ def create_instances_transfer(self, ctxt, transfer_scenario, transfer.user_scripts = user_scripts or {} transfer.clone_disks = clone_disks transfer.skip_os_morphing = skip_os_morphing + transfer.clustered = clustered self._check_minion_pools_for_action(ctxt, transfer) @@ -1738,6 +1740,7 @@ def deploy_transfer_instances( deployment.user_scripts = user_scripts deployment.clone_disks = clone_disks deployment.skip_os_morphing = skip_os_morphing + deployment.clustered = bool(getattr(transfer, 'clustered', False)) deployment.deployer_id = wait_for_execution deployment.trust_id = trust_id deployment.last_execution_status = init_status @@ -2091,7 +2094,8 @@ def _cancel_tasks_execution( "of the parent tasks execution.")) elif task.status in ( constants.TASK_STATUS_PENDING, - constants.TASK_STATUS_STARTING): + constants.TASK_STATUS_STARTING, + constants.TASK_STATUS_SYNCING): # any PENDING/STARTING tasks means that they did not have a # host assigned to them yet, and presuming the host does not # start executing the task until it marks itself as the runner, @@ -2185,6 +2189,30 @@ def _cancel_tasks_execution( "No new tasks were started for execution '%s' following " "state advancement after cancellation.", execution.id) + def _abort_peer_sync_barrier_tasks_on_error( + self, ctxt, execution, errored_task, error_message): + """Mark peer tasks stuck in SYNCING on the same barrier as failed.""" + if errored_task.task_type not in constants.TASK_TYPES_TO_SYNC: + return + action = db_api.get_action( + ctxt, execution.action_id, include_task_info=True) + if not bool(getattr(action, "clustered", False)): + return + execution = db_api.get_tasks_execution(ctxt, execution.id) + for peer in execution.tasks: + if peer.id == errored_task.id: + continue + if peer.status != constants.TASK_STATUS_SYNCING: + continue + if peer.task_type != errored_task.task_type: + continue + db_api.set_task_status( + ctxt, peer.id, constants.TASK_STATUS_ERROR, + exception_details=( + "Aborted: peer task '%s' failed during clustered " + "execution. Original error: %s" % ( + errored_task.id, error_message))) + def _update_reservation_fulfillment_for_execution(self, ctxt, execution): """ Updates the reservation fulfillment status for the parent transfer action of the given execution based on its type. @@ -2782,6 +2810,357 @@ def _update_volumes_info_for_deployment_parent_transfer( self._update_transfer_volumes_info( ctxt, transfer_id, instance, updated_task_info) + def _clustered_shared_disk_export_maps(self, action): + """Build per-instance export disk maps and shared cluster identities. + + Returns ``(instance_disk_maps, shared_identities)`` where each + ``instance_disk_maps[instance_id]`` maps ``cluster_disk_identity`` + to that disk's export ``id`` from ``export_info``. + """ + identity_counts = {} + explicitly_shareable = {} + instance_disk_maps = {} + for instance_id in action.instances: + export_info = action.info.get(instance_id, {}).get( + "export_info", {}) + disks = export_info.get("devices", {}).get("disks", []) + disk_map = {} + for disk in disks: + ident = utils.cluster_disk_identity(disk or {}) + if not ident: + continue + disk_map[ident] = disk.get("id") + identity_counts[ident] = identity_counts.get(ident, 0) + 1 + if disk.get("shareable"): + explicitly_shareable[ident] = True + instance_disk_maps[instance_id] = disk_map + + shared_identities = { + ident for ident, count in identity_counts.items() + if count > 1 or explicitly_shareable.get(ident, False)} + return instance_disk_maps, shared_identities + + def _clustered_shared_disk_owners( + self, instance_disk_maps, shared_identities, instance_order): + """Pick one owner instance per shared disk identity. + + The owner is the first entry in ``instance_order`` (transfer instance + list order) that has that disk in ``instance_disk_maps``. + """ + owners = {} + for ident in shared_identities: + for inst in instance_order: + if ident in instance_disk_maps.get(inst, {}): + owners[ident] = inst + break + return owners + + def _promote_clustered_shared_disk_shareable_in_export_info( + self, ctxt, execution): + """Promote shareable on export disks when clustered and shared.""" + action = db_api.get_action( + ctxt, execution.action_id, include_task_info=True) + if not getattr(action, "clustered", False): + return + if not action.instances: + return + + _, shared_idents = self._clustered_shared_disk_export_maps(action) + if not shared_idents: + return + + for instance_id in action.instances: + export_info = action.info.get(instance_id, {}).get("export_info") + if not export_info: + continue + disks = export_info.get("devices", {}).get("disks") + if not disks: + continue + updated = False + for disk in disks: + ident = utils.cluster_disk_identity(disk or {}) + if ident in shared_idents and not disk.get("shareable"): + disk["shareable"] = True + updated = True + if updated: + LOG.info( + "Promoted shareable=True for clustered shared disk(s) in " + "export_info for instance '%s' (action '%s').", + instance_id, execution.action_id) + db_api.update_transfer_action_info_for_instance( + ctxt, execution.action_id, instance_id, + {"export_info": export_info}) + + def _setup_shared_disk_volumes_info(self, ctxt, execution): + """Configure volumes_info for shared disks before parallel replicate. + + After the DEPLOY_TRANSFER_DISKS sync barrier, non-owner instances get + ``replicate_disk_data=False`` and inherit the owner's volume entry; + ``shareable`` is set where needed and volumes are ordered for attach. + """ + action = db_api.get_action( + ctxt, execution.action_id, include_task_info=True) + + instance_disk_maps, shared_identities = ( + self._clustered_shared_disk_export_maps(action)) + if not shared_identities: + return + + owners = self._clustered_shared_disk_owners( + instance_disk_maps, shared_identities, action.instances) + + owner_volumes_cache = {} + for instance_id in action.instances: + disk_map = instance_disk_maps.get(instance_id, {}) + shared_idents_for_inst = [ + i for i in disk_map if i in shared_identities] + if not shared_idents_for_inst: + continue + + is_owner_for_all = all( + owners.get(i) == instance_id for i in shared_idents_for_inst) + if is_owner_for_all: + continue + + volumes_info = action.info.get(instance_id, {}).get( + "volumes_info", []) + if not volumes_info: + continue + + updated = False + for ident in shared_idents_for_inst: + owner_id = owners.get(ident) + if owner_id == instance_id or not owner_id: + continue + + waiter_disk_id = disk_map[ident] + owner_disk_id = instance_disk_maps.get( + owner_id, {}).get(ident) + if not owner_disk_id: + continue + + if owner_id not in owner_volumes_cache: + owner_volumes_cache[owner_id] = { + utils.cluster_disk_identity(v): v + for v in action.info.get(owner_id, {}).get( + "volumes_info", []) + if utils.cluster_disk_identity(v)} + owner_volume = owner_volumes_cache[owner_id].get(ident) + if not owner_volume: + continue + + for vol in volumes_info: + if utils.cluster_disk_identity(vol) != ident: + continue + inherited = copy.deepcopy(owner_volume) + inherited["disk_id"] = waiter_disk_id + inherited[ + constants.VOLUME_INFO_REPLICATE_DISK_DATA] = False + inherited["shareable"] = True + vol.update(inherited) + updated = True + break + + if updated: + LOG.info( + "Pre-set shared disk volumes_info for instance '%s' " + "to skip replication of shared disks owned by other " + "instances.", instance_id) + db_api.update_transfer_action_info_for_instance( + ctxt, execution.action_id, instance_id, + {"volumes_info": volumes_info}) + + for owner_id in set(owners.values()): + owner_volumes = action.info.get(owner_id, {}).get( + "volumes_info", []) + owner_updated = False + for vol in owner_volumes: + vol_key = utils.cluster_disk_identity(vol) + if not vol_key or owners.get(vol_key) != owner_id: + continue + if not vol.get("shareable"): + vol["shareable"] = True + owner_updated = True + if owner_updated: + db_api.update_transfer_action_info_for_instance( + ctxt, execution.action_id, owner_id, + {"volumes_info": owner_volumes}) + + for instance_id in action.instances: + disk_map = instance_disk_maps.get(instance_id, {}) + volumes_info = action.info.get(instance_id, {}).get( + "volumes_info", []) + if not volumes_info: + continue + promoted = False + for vol in volumes_info: + vol_key = utils.cluster_disk_identity(vol) + if not vol_key or vol_key not in shared_identities: + continue + if vol_key not in disk_map: + continue + if not vol.get("shareable"): + vol["shareable"] = True + promoted = True + if promoted: + LOG.info( + "Ensured shareable=True on shared-disk volumes for " + "instance '%s' before parallel minion attach.", + instance_id) + db_api.update_transfer_action_info_for_instance( + ctxt, execution.action_id, instance_id, + {"volumes_info": volumes_info}) + + action = db_api.get_action( + ctxt, execution.action_id, include_task_info=True) + for instance_id in action.instances: + volumes_info = action.info.get(instance_id, {}).get( + "volumes_info", []) + if not volumes_info: + continue + sorted_volumes = sorted( + volumes_info, + key=lambda v: ( + 0 + if ( + v.get("shareable") + or not v.get( + constants.VOLUME_INFO_REPLICATE_DISK_DATA, True)) + else 1)) + if sorted_volumes != volumes_info: + LOG.info( + "Reordered volumes_info for attach (shareable first) for " + "instance '%s'.", instance_id) + db_api.update_transfer_action_info_for_instance( + ctxt, execution.action_id, instance_id, + {"volumes_info": sorted_volumes}) + + def _sync_shared_disk_replicate_metadata(self, ctxt, execution, action): + """Copy owner change_id onto waiter volumes after REPLICATE barrier.""" + instance_disk_maps, shared_identities = ( + self._clustered_shared_disk_export_maps(action)) + if not shared_identities: + return + + owners = self._clustered_shared_disk_owners( + instance_disk_maps, shared_identities, action.instances) + + owner_volumes_by_disk = {} + for oid in set(owners.values()): + owner_volumes_by_disk[oid] = { + utils.cluster_disk_identity(v): v + for v in action.info.get(oid, {}).get("volumes_info", []) + if utils.cluster_disk_identity(v)} + + for instance_id in action.instances: + volumes_info = action.info.get(instance_id, {}).get( + "volumes_info", []) + if not volumes_info: + continue + disk_map = instance_disk_maps.get(instance_id, {}) + updated = False + for vol in volumes_info: + if vol.get(constants.VOLUME_INFO_REPLICATE_DISK_DATA, True): + continue + ident = utils.cluster_disk_identity(vol) + if not ident or ident not in shared_identities: + continue + if ident not in disk_map: + continue + owner_id = owners.get(ident) + if not owner_id or owner_id == instance_id: + continue + owner_disk_id = instance_disk_maps.get(owner_id, {}).get(ident) + if not owner_disk_id: + continue + owner_vol = owner_volumes_by_disk.get(owner_id, {}).get(ident) + if not owner_vol: + continue + if "change_id" in owner_vol and ( + vol.get("change_id") != owner_vol["change_id"]): + vol["change_id"] = owner_vol["change_id"] + updated = True + if updated: + LOG.info( + "Synced shared-disk change_id from owner for instance " + "'%s' after clustered REPLICATE_DISKS barrier.", + instance_id) + db_api.update_transfer_action_info_for_instance( + ctxt, execution.action_id, instance_id, + {"volumes_info": volumes_info}) + + def _handle_task_sync_barrier(self, ctxt, task, execution, action): + """Implements a cross-instance sync barrier for clustered actions. + + If the completed task's type is in TASK_TYPES_TO_SYNC and the + action is clustered, the task is set to SYNCING instead of + remaining COMPLETED. Once all instances' tasks of the same type + reach SYNCING, _handle_synced_tasks runs the type-specific sync + logic, then all tasks are set to COMPLETED and their execution + states are advanced. + + Returns True if the barrier was activated (caller should return + early), False otherwise. + """ + if task.task_type not in constants.TASK_TYPES_TO_SYNC: + return False + if not bool(getattr(action, "clustered", False)): + return False + + db_api.set_task_status( + ctxt, task.id, constants.TASK_STATUS_SYNCING) + + peer_tasks = [ + t for t in execution.tasks + if t.task_type == task.task_type and t.id != task.id] + all_syncing = all( + t.status == constants.TASK_STATUS_SYNCING for t in peer_tasks) + if not all_syncing: + LOG.info( + "Task '%s' (type '%s') for instance '%s' is now SYNCING. " + "Waiting for peer tasks of other instances to reach SYNCING.", + task.id, task.task_type, task.instance) + return True + + LOG.info( + "All tasks of type '%s' across all instances have reached " + "SYNCING for execution '%s'. Running sync handler.", + task.task_type, execution.id) + + action = db_api.get_action( + ctxt, execution.action_id, include_task_info=True) + self._handle_synced_tasks(ctxt, task.task_type, execution, action) + + synced_tasks = [task] + peer_tasks + for synced_task in synced_tasks: + db_api.set_task_status( + ctxt, synced_task.id, constants.TASK_STATUS_COMPLETED) + + for synced_task in synced_tasks: + self._handle_post_task_actions( + ctxt, synced_task, execution, + action.info.get(synced_task.instance, {})) + self._advance_execution_state( + ctxt, execution, instance=synced_task.instance, requery=True) + + return True + + def _handle_synced_tasks(self, ctxt, task_type, execution, action): + """Runs type-specific logic after all instances reach a sync barrier. + + Called once all tasks of a given type across all instances have + reached SYNCING. The handler can inspect and modify action info + (e.g. volumes_info) for all instances. + """ + if task_type == constants.TASK_TYPE_GET_INSTANCE_INFO: + self._promote_clustered_shared_disk_shareable_in_export_info( + ctxt, execution) + elif task_type == constants.TASK_TYPE_DEPLOY_TRANSFER_DISKS: + self._setup_shared_disk_volumes_info(ctxt, execution) + elif task_type == constants.TASK_TYPE_REPLICATE_DISKS: + self._sync_shared_disk_replicate_metadata( + ctxt, execution, action) + def _handle_post_task_actions(self, ctxt, task, execution, task_info): task_type = task.task_type @@ -3098,6 +3477,11 @@ def task_completed(self, ctxt, task_id, task_result): # NOTE: refresh the execution just in case: execution = db_api.get_tasks_execution(ctxt, task.execution_id) + + if self._handle_task_sync_barrier( + ctxt, task, execution, action): + return + self._handle_post_task_actions( ctxt, task, execution, updated_task_info) @@ -3328,8 +3712,12 @@ def set_task_error(self, ctxt, task_id, exception_details): "Some tasks are running in parallel with the " "OSMorphing task, a debug setup cannot be safely " "achieved. Proceeding with cleanup tasks as usual.") + self._abort_peer_sync_barrier_tasks_on_error( + ctxt, execution, task, exception_details) self._cancel_tasks_execution(ctxt, execution) else: + self._abort_peer_sync_barrier_tasks_on_error( + ctxt, execution, task, exception_details) self._cancel_tasks_execution(ctxt, execution) @task_synchronized diff --git a/coriolis/constants.py b/coriolis/constants.py index 71e55343..dc00a9d3 100644 --- a/coriolis/constants.py +++ b/coriolis/constants.py @@ -50,11 +50,13 @@ TASK_STATUS_CANCELED_FROM_DEADLOCK = "STRANDED_AFTER_DEADLOCK" TASK_STATUS_ON_ERROR_ONLY = "EXECUTE_ON_ERROR_ONLY" TASK_STATUS_FAILED_TO_SCHEDULE = "FAILED_TO_SCHEDULE" +TASK_STATUS_SYNCING = "SYNCING" ACTIVE_TASK_STATUSES = [ TASK_STATUS_PENDING, TASK_STATUS_STARTING, TASK_STATUS_RUNNING, + TASK_STATUS_SYNCING, TASK_STATUS_CANCELLING, TASK_STATUS_CANCELLING_AFTER_COMPLETION ] @@ -161,6 +163,11 @@ TASK_TYPE_POWER_ON_DESTINATION_MINION = "POWER_ON_DESTINATION_MINION" TASK_TYPE_POWER_OFF_DESTINATION_MINION = "POWER_OFF_DESTINATION_MINION" +TASK_TYPES_TO_SYNC = [ + TASK_TYPE_GET_INSTANCE_INFO, + TASK_TYPE_DEPLOY_TRANSFER_DISKS, + TASK_TYPE_REPLICATE_DISKS, +] MINION_POOL_OPERATIONS_TASKS = [ TASK_TYPE_VALIDATE_SOURCE_MINION_POOL_OPTIONS, @@ -218,6 +225,7 @@ DISK_FORMAT_QCOW2 = 'qcow2' DISK_FORMAT_VHD = 'vhd' DISK_FORMAT_VHDX = 'vhdx' +VOLUME_INFO_REPLICATE_DISK_DATA = "replicate_disk_data" DISK_ALLOCATION_TYPE_STATIC = "static" DISK_ALLOCATION_TYPE_DYNAMIC = "dynamic" diff --git a/coriolis/db/sqlalchemy/migrate_repo/versions/024_add_clustered_to_base_transfer_action.py b/coriolis/db/sqlalchemy/migrate_repo/versions/024_add_clustered_to_base_transfer_action.py new file mode 100644 index 00000000..9e834e85 --- /dev/null +++ b/coriolis/db/sqlalchemy/migrate_repo/versions/024_add_clustered_to_base_transfer_action.py @@ -0,0 +1,20 @@ +# Copyright 2026 Cloudbase Solutions Srl +# All Rights Reserved. + +import sqlalchemy + + +def upgrade(migrate_engine): + meta = sqlalchemy.MetaData() + meta.bind = migrate_engine + + base_transfer = sqlalchemy.Table( + 'base_transfer_action', meta, autoload=True) + if 'clustered' in base_transfer.c: + return + # server_default so existing rows get a value when the column is added + # (MySQL stores booleans as TINYINT). + clustered = sqlalchemy.Column( + 'clustered', sqlalchemy.Boolean, nullable=False, + server_default=sqlalchemy.text('0')) + base_transfer.create_column(clustered) diff --git a/coriolis/db/sqlalchemy/models.py b/coriolis/db/sqlalchemy/models.py index d4377999..c379689a 100644 --- a/coriolis/db/sqlalchemy/models.py +++ b/coriolis/db/sqlalchemy/models.py @@ -285,6 +285,10 @@ class BaseTransferAction(BASE, models.TimestampMixin, models.ModelBase, sqlalchemy.Boolean, nullable=False, default=True) skip_os_morphing = sqlalchemy.Column( sqlalchemy.Boolean, nullable=False, default=False) + # Multi-instance transfer: enables cross-instance sync barriers and + # shared-disk handling. Must be set on INSERT (MySQL NOT NULL). + clustered = sqlalchemy.Column( + sqlalchemy.Boolean, nullable=False, default=False) __mapper_args__ = { 'polymorphic_identity': 'base_transfer_action', @@ -320,6 +324,7 @@ def to_dict(self, include_task_info=True, include_executions=True): "user_scripts": self.user_scripts, "clone_disks": self.clone_disks, "skip_os_morphing": self.skip_os_morphing, + "clustered": bool(self.clustered), } if include_executions: for ex in self.executions: diff --git a/coriolis/schemas/disk_sync_resources_info_schema.json b/coriolis/schemas/disk_sync_resources_info_schema.json index 4a86c375..e3597738 100644 --- a/coriolis/schemas/disk_sync_resources_info_schema.json +++ b/coriolis/schemas/disk_sync_resources_info_schema.json @@ -18,7 +18,7 @@ "description": "String device path (ex: /dev/sdc) from within the temporary minion VM where the disk was attached." } }, - "required": ["disk_id", "volume_dev"], + "required": ["disk_id"], "additionalProperties": true } } diff --git a/coriolis/schemas/vm_export_info_schema.json b/coriolis/schemas/vm_export_info_schema.json index 05f3017e..b9506df1 100644 --- a/coriolis/schemas/vm_export_info_schema.json +++ b/coriolis/schemas/vm_export_info_schema.json @@ -118,9 +118,14 @@ "type": "string", "description": "The allocation scheme for the given disk (static = thick; dynamic = thin)", "enum": ["static", "dynamic"] + }, + "shareable": { + "type": "boolean", + "description": "Whether the disk is shared (multi-writer) and can be attached to multiple VMs simultaneously." } }, "required": [ + "id", "size_bytes" ] } diff --git a/coriolis/tasks/minion_pool_tasks.py b/coriolis/tasks/minion_pool_tasks.py index 75e24571..42431ad9 100644 --- a/coriolis/tasks/minion_pool_tasks.py +++ b/coriolis/tasks/minion_pool_tasks.py @@ -8,6 +8,7 @@ from coriolis import exception from coriolis.providers import factory as providers_factory from coriolis.tasks import base +from coriolis import utils LOG = logging.getLogger(__name__) @@ -477,6 +478,16 @@ class _BaseAttachVolumesToTransferMinionTask( def _get_volumes_info_from_task_info(cls, task_info): return task_info["volumes_info"] + def _run( + self, ctxt, instance, origin, destination, task_info, + event_handler): + export_info = task_info.get("export_info") + if export_info: + utils.apply_export_disk_shareable_metadata_to_volumes_info( + export_info, self._get_volumes_info_from_task_info(task_info)) + return super(_BaseAttachVolumesToTransferMinionTask, self)._run( + ctxt, instance, origin, destination, task_info, event_handler) + @classmethod def get_required_task_info_properties(cls): fields = super( diff --git a/coriolis/tasks/replica_tasks.py b/coriolis/tasks/replica_tasks.py index 0407e8bc..c5dfd22b 100644 --- a/coriolis/tasks/replica_tasks.py +++ b/coriolis/tasks/replica_tasks.py @@ -244,12 +244,27 @@ def _run(self, ctxt, instance, origin, destination, task_info, source_environment = task_info['source_environment'] source_resources = task_info.get('source_resources', {}) - volumes_info = provider.replicate_disks( - ctxt, connection_info, source_environment, instance, - source_resources, migr_source_conn_info, migr_target_conn_info, - volumes_info, incremental) - schemas.validate_value( - volumes_info, schemas.CORIOLIS_VOLUMES_INFO_SCHEMA) + volumes_to_replicate = [ + vol for vol in volumes_info + if vol.get(constants.VOLUME_INFO_REPLICATE_DISK_DATA, True)] + pre_replicated_volumes = [ + vol for vol in volumes_info + if not vol.get(constants.VOLUME_INFO_REPLICATE_DISK_DATA, True)] + + if volumes_to_replicate: + replicated_volumes = provider.replicate_disks( + ctxt, connection_info, source_environment, instance, + source_resources, migr_source_conn_info, migr_target_conn_info, + volumes_to_replicate, incremental) + schemas.validate_value( + replicated_volumes, schemas.CORIOLIS_VOLUMES_INFO_SCHEMA) + else: + LOG.info( + "No disks marked for replication for instance '%s'. " + "Using pre-provisioned volumes_info.", instance) + replicated_volumes = [] + + volumes_info = pre_replicated_volumes + replicated_volumes volumes_info = _check_ensure_volumes_info_ordering( export_info, volumes_info) @@ -572,6 +587,10 @@ def _run(self, ctxt, instance, origin, destination, task_info, connection_info = base.get_connection_info(ctxt, destination) volumes_info = _get_volumes_info(task_info) + # NOTE: parallel minion attach may run before conductor merges + # shareable. + utils.apply_export_disk_shareable_metadata_to_volumes_info( + export_info, volumes_info) replica_resources_info = provider.deploy_replica_target_resources( ctxt, connection_info, target_environment, volumes_info) diff --git a/coriolis/tests/db/sqlalchemy/test_models.py b/coriolis/tests/db/sqlalchemy/test_models.py index 7b0c7610..3bd4c761 100644 --- a/coriolis/tests/db/sqlalchemy/test_models.py +++ b/coriolis/tests/db/sqlalchemy/test_models.py @@ -283,6 +283,7 @@ def test_to_dict(self): transfer.info = mock.sentinel.info transfer.clone_disks = True transfer.skip_os_morphing = False + transfer.clustered = False expected_result = { "base_id": mock.sentinel.base_id, "user_id": mock.sentinel.user_id, @@ -314,6 +315,7 @@ def test_to_dict(self): "info": mock.sentinel.info, "clone_disks": True, "skip_os_morphing": False, + "clustered": False, } result = transfer.to_dict() diff --git a/coriolis/tests/tasks/test_replica_tasks.py b/coriolis/tests/tasks/test_replica_tasks.py index 1e3207d3..dcfc955a 100644 --- a/coriolis/tests/tasks/test_replica_tasks.py +++ b/coriolis/tests/tasks/test_replica_tasks.py @@ -145,6 +145,8 @@ def test__run(self, mock_unmarshal, mock_check_vol_info, mock_get_vol_info, task_info.get.side_effect = [ task_info['incremental'], task_info['source_resources']] prov_fun = mock_get_provider.return_value.replicate_disks + mock_get_vol_info.return_value = [{"disk_id": "disk_id1"}] + prov_fun.return_value = [{"disk_id": "disk_id1"}] expected_result = {"volumes_info": mock_check_vol_info.return_value} expected_validation_calls = [ mock.call.mock_validate_value( diff --git a/coriolis/utils.py b/coriolis/utils.py index 1bbef33e..a34ab3fd 100644 --- a/coriolis/utils.py +++ b/coriolis/utils.py @@ -196,6 +196,54 @@ def _exec_retry(*args, **kwargs): return _retry_on_error +def normalized_volume_disk_path_key(disk_id): + """Lowercase/stripped path/string key for comparing disk identifiers.""" + if disk_id is None: + return None + s = str(disk_id).strip().lower() + return s if s else None + + +def cluster_disk_identity(disk_id_or_obj): + """Return a stable key for matching the same disk across cluster nodes. + + If ``disk_id_or_obj`` is a mapping with ``cluster_disk_identity`` (set by + the source export provider, e.g. VMware vSphere for VMDK paths), that value + is normalized (strip + lowercase). Otherwise the key is derived from + ``disk_id`` or ``id`` on the mapping, or from a plain string, using the + same generic normalization only. + """ + if isinstance(disk_id_or_obj, dict): + pre = disk_id_or_obj.get("cluster_disk_identity") + if pre is not None and str(pre).strip(): + return normalized_volume_disk_path_key(pre) + disk_id = disk_id_or_obj.get("disk_id") + if disk_id is None: + disk_id = disk_id_or_obj.get("id") + return normalized_volume_disk_path_key(disk_id) + return normalized_volume_disk_path_key(disk_id_or_obj) + + +def apply_export_disk_shareable_metadata_to_volumes_info( + export_info, volumes_info): + """Propagate shareable from export_info disks to volumes_info entries.""" + if not export_info or not volumes_info: + return + disks = export_info.get("devices", {}).get("disks") or [] + share_idents = set() + for d in disks: + if d.get("shareable"): + cid = cluster_disk_identity(d) + if cid: + share_idents.add(cid) + if not share_idents: + return + for vol in volumes_info: + cid = cluster_disk_identity(vol) + if cid and cid in share_idents: + vol["shareable"] = True + + def get_udev_net_rules(net_ifaces_info): content = "" for name, mac_address in net_ifaces_info.items():