diff --git a/coriolis/providers/backup_writers.py b/coriolis/providers/backup_writers.py index 3aa0300c..ccb99d2c 100644 --- a/coriolis/providers/backup_writers.py +++ b/coriolis/providers/backup_writers.py @@ -31,6 +31,11 @@ cfg.BoolOpt('compress_transfers', default=True, help='Use compression if possible during disk transfers'), + cfg.IntOpt('disk_checksum_timeout', + default=3600, + help='Maximum number of seconds to wait for a disk checksum ' + 'job to complete on the backup writer. Larger disks may ' + 'require a higher value.'), ] CONF.register_opts(opts) _CORIOLIS_HTTP_WRITER_CMD = "coriolis-writer" @@ -66,6 +71,10 @@ 15: "ERR_OUT_OF_BOUDS", } +_CHECKSUM_JOB_POLL_INTERVAL = 15 # seconds between writer checksum job polls +_CHECKSUM_JOB_FINISHED = "finished" +_CHECKSUM_JOB_FAILED = "failed" + def _disable_lvm2_lvmetad(ssh): """Disables lvm2-lvmetad service. This service is responsible @@ -193,6 +202,10 @@ def write(self, data): def close(self): pass + def get_disk_checksum(self, algorithm): + """Returns the destination disk checksum, or None if unsupported.""" + return None + class BaseBackupWriter(with_metaclass(abc.ABCMeta)): @abc.abstractmethod @@ -735,6 +748,103 @@ def _wait_for_queues(self): LOG.info("Waiting for unfinished transfers to complete") time.sleep(0.5) + def _create_checksum_job(self, algorithm, start_offset=0, end_offset=0): + """Creates a full-disk checksum job on the writer. + + The device must already be acquired. + + :param algorithm: Checksumming algorithm to use. + :param start_offset: Checksumming starts from this offset. + :param end_offset: Checksumming stops at this offset. If it is 0, + the end of the disk is considered instead. + :returns: job ID string. + """ + self._ensure_session() + uri = "%s/checksumJob" % self._uri + headers = {"X-Client-Token": self._id} + body = { + "start_offset": start_offset, + "end_offset": end_offset, + "checksum_algorithm": algorithm, + } + + resp = self._session.post( + uri, headers=headers, json=body, + timeout=CONF.default_requests_timeout) + resp.raise_for_status() + + return resp.json()["job_id"] + + def _get_checksum_job_status(self, job_id): + """Returns the current status of a writer checksum job.""" + self._ensure_session() + uri = "%s/checksumJob/%s" % (self._uri, job_id) + + resp = self._session.get( + uri, timeout=CONF.default_requests_timeout) + resp.raise_for_status() + + return resp.json() + + def _delete_checksum_job(self, job_id): + """Deletes a writer checksum job.""" + self._ensure_session() + uri = "%s/checksumJob/%s" % (self._uri, job_id) + + resp = self._session.delete( + uri, timeout=CONF.default_requests_timeout) + resp.raise_for_status() + + def get_disk_checksum(self, algorithm, start_offset=0, end_offset=0): + """Computes and returns the checksum of the entire destination disk. + + Must be called while the device is acquired (inside open() context). + Flushes all pending writes before starting the checksum job. + + :param algorithm: Checksumming algorithm to use. + :param start_offset: Checksumming starts from this offset. + :param end_offset: Checksumming stops at this offset. If it is 0, + the end of the disk is considered instead. + :returns: dict with 'checksum' and 'algorithm' keys. + """ + self._wait_for_queues() + if self._exception: + raise exception.CoriolisException( + "Cannot checksum disk '%s', pending write error: %s" % ( + self._path, self._exception)) + + timeout = CONF.disk_checksum_timeout + deadline = time.monotonic() + timeout + job_id = self._create_checksum_job(algorithm) + try: + while True: + status = self._get_checksum_job_status(job_id) + execution_status = status.get("execution_status") + if execution_status == _CHECKSUM_JOB_FINISHED: + return { + "checksum": status["checksum_value"], + "algorithm": status["checksum_algorithm"], + } + + if execution_status == _CHECKSUM_JOB_FAILED: + raise exception.CoriolisException( + "Checksum job failed for disk '%s': %s" % ( + self._path, status.get("error_message", ""))) + + if time.monotonic() >= deadline: + raise exception.CoriolisException( + "Timed out waiting for checksum job for disk '%s' " + "after %d seconds." % (self._path, timeout)) + + time.sleep(_CHECKSUM_JOB_POLL_INTERVAL) + finally: + try: + self._delete_checksum_job(job_id) + except Exception: + LOG.warning( + "Failed to delete checksum job %s for disk %s", + job_id, self._path) + def close(self): self._closing = True self._wait_for_queues() diff --git a/coriolis/providers/replicator.py b/coriolis/providers/replicator.py index b4df88ff..88f0b5d2 100644 --- a/coriolis/providers/replicator.py +++ b/coriolis/providers/replicator.py @@ -204,6 +204,22 @@ def get_disk_size(self, disk): info.raise_for_status() return int(info.headers["Content-Length"]) + @utils.retry_on_error() + def get_disk_checksum(self, device): + """Returns the total checksum of the given disk. + + :raises HTTPError: with HTTP 409 status if checksumming has not + completed yet. + :returns: dict with 'checksum' and 'algorithm' keys. + """ + uri = "%s/api/v1/dev/%s/checksum" % (self._base_uri, device) + + resp = self._cli.get( + uri, timeout=CONF.replicator.default_requests_timeout) + resp.raise_for_status() + + return resp.json() + @utils.retry_on_error() def download_chunk(self, disk, chunk): diskUri = self.raw_disk_uri(disk) @@ -768,7 +784,44 @@ def _find_vol_state(self, name, state): return vol return None - def replicate_disks(self, source_volumes_info, backup_writer): + def _verify_disk_checksum(self, dev_name, destination): + """Compares source and destination checksums for a transferred disk. + + Must be called while the device is still acquired on the writer side. + + :raises CoriolisException: if the checksum algorithms do not match, or + if the checksums do not match. + """ + self._event_manager.progress_update( + "Verifying disk integrity for /dev/%s" % dev_name) + source = self._cli.get_disk_checksum(dev_name) + end_offset = self._cli.get_disk_size(dev_name) + writer = destination.get_disk_checksum( + source["algorithm"], end_offset=end_offset) + if writer is None: + self._event_manager.progress_update( + "Disk integrity check skipped for /dev/%s " + "(writer does not support checksums)" % dev_name) + return + + if source["algorithm"] != writer["algorithm"]: + raise exception.CoriolisException( + "Checksum algorithm mismatch for disk '%s': " + "source=%s, destination=%s" % ( + dev_name, source["algorithm"], writer["algorithm"])) + + if source["checksum"] != writer["checksum"]: + raise exception.CoriolisException( + "Checksum mismatch for disk '%s': " + "source=%s, destination=%s" % ( + dev_name, source["checksum"], writer["checksum"])) + + self._event_manager.progress_update( + "Disk integrity verified for /dev/%s (checksum: %s)" % ( + dev_name, source["checksum"])) + + def replicate_disks( + self, source_volumes_info, backup_writer, verify_checksum=False): """ Fetch the block diff and send it to the backup_writer. If the target_is_zeroed parameter is set to True, on initial @@ -845,6 +898,10 @@ def replicate_disks(self, source_volumes_info, backup_writer): total += 1 self._event_manager.set_percentage_step( perc_step, total) + + if verify_checksum: + self._verify_disk_checksum(devName, destination) + dst_vol["replica_state"] = state_for_vol self._repl_state = curr_state diff --git a/coriolis/resources/bin/replicator b/coriolis/resources/bin/replicator index 7af42a65..3189f0db 100755 Binary files a/coriolis/resources/bin/replicator and b/coriolis/resources/bin/replicator differ diff --git a/coriolis/tests/providers/test_backup_writers.py b/coriolis/tests/providers/test_backup_writers.py index 073b7d19..f21a7309 100644 --- a/coriolis/tests/providers/test_backup_writers.py +++ b/coriolis/tests/providers/test_backup_writers.py @@ -1061,6 +1061,153 @@ def test_close_with_exception(self, mock_release, mock_wait_for_queues): level=logging.ERROR): self.assertRaises(exception.CoriolisException, self.writer.close) + @mock.patch.object(backup_writers.HTTPBackupWriterImpl, '_ensure_session') + @mock.patch.object(backup_writers.HTTPBackupWriterImpl, '_uri', + new_callable=mock.PropertyMock) + @mock.patch.object(backup_writers, 'CONF') + def test__create_checksum_job(self, mock_conf, mock_uri, + mock_ensure_session): + self.writer._set_info(self.info) + mock_uri.return_value = "https://host:port/api/v2/device/b64path" + self.writer._session = mock.MagicMock() + mock_resp = mock.MagicMock() + mock_resp.json.return_value = {"job_id": "test-job-id"} + self.writer._session.post.return_value = mock_resp + + result = self.writer._create_checksum_job("sha256") + + self.assertEqual("test-job-id", result) + self.writer._session.post.assert_called_once_with( + "https://host:port/api/v2/device/b64path/checksumJob", + headers={"X-Client-Token": self.info["id"]}, + json={"start_offset": 0, "end_offset": 0, + "checksum_algorithm": "sha256"}, + timeout=mock_conf.default_requests_timeout) + mock_resp.raise_for_status.assert_called_once() + + @mock.patch.object(backup_writers.HTTPBackupWriterImpl, '_ensure_session') + @mock.patch.object(backup_writers.HTTPBackupWriterImpl, '_uri', + new_callable=mock.PropertyMock) + @mock.patch.object(backup_writers, 'CONF') + def test__get_checksum_job_status(self, mock_conf, mock_uri, + mock_ensure_session): + self.writer._set_info(self.info) + mock_uri.return_value = "https://host:port/api/v2/device/b64path" + self.writer._session = mock.MagicMock() + mock_resp = mock.MagicMock() + self.writer._session.get.return_value = mock_resp + + result = self.writer._get_checksum_job_status("test-job-id") + + self.assertEqual(result, mock_resp.json.return_value) + self.writer._session.get.assert_called_once_with( + "https://host:port/api/v2/device/b64path/checksumJob/test-job-id", + timeout=mock_conf.default_requests_timeout) + mock_resp.raise_for_status.assert_called_once() + + @mock.patch.object(backup_writers.HTTPBackupWriterImpl, '_ensure_session') + @mock.patch.object(backup_writers.HTTPBackupWriterImpl, '_uri', + new_callable=mock.PropertyMock) + @mock.patch.object(backup_writers, 'CONF') + def test__delete_checksum_job(self, mock_conf, mock_uri, + mock_ensure_session): + self.writer._set_info(self.info) + mock_uri.return_value = "https://host:port/api/v2/device/b64path" + self.writer._session = mock.MagicMock() + mock_resp = mock.MagicMock() + self.writer._session.delete.return_value = mock_resp + + self.writer._delete_checksum_job("test-job-id") + + self.writer._session.delete.assert_called_once_with( + "https://host:port/api/v2/device/b64path/checksumJob/test-job-id", + timeout=mock_conf.default_requests_timeout) + mock_resp.raise_for_status.assert_called_once() + + @mock.patch.object(backup_writers.HTTPBackupWriterImpl, + '_delete_checksum_job') + @mock.patch.object(backup_writers.HTTPBackupWriterImpl, + '_get_checksum_job_status') + @mock.patch.object(backup_writers.HTTPBackupWriterImpl, + '_create_checksum_job') + @mock.patch.object(backup_writers.HTTPBackupWriterImpl, '_wait_for_queues') + def test_get_disk_checksum(self, mock_wait, mock_create, mock_status, + mock_delete): + self.writer._set_info(self.info) + mock_create.return_value = "test-job-id" + mock_status.return_value = { + "execution_status": "finished", + "checksum_value": "abc123", + "checksum_algorithm": "sha256", + } + + result = self.writer.get_disk_checksum("sha256") + + self.assertEqual({"checksum": "abc123", "algorithm": "sha256"}, result) + mock_wait.assert_called_once() + mock_create.assert_called_once_with("sha256") + mock_delete.assert_called_once_with("test-job-id") + + @mock.patch.object(backup_writers.HTTPBackupWriterImpl, + '_delete_checksum_job') + @mock.patch.object(backup_writers.HTTPBackupWriterImpl, + '_get_checksum_job_status') + @mock.patch.object(backup_writers.HTTPBackupWriterImpl, + '_create_checksum_job') + @mock.patch.object(backup_writers.HTTPBackupWriterImpl, '_wait_for_queues') + def test_get_disk_checksum_job_failed(self, mock_wait, mock_create, + mock_status, mock_delete): + self.writer._set_info(self.info) + mock_create.return_value = "test-job-id" + mock_status.return_value = { + "execution_status": "failed", + "error_message": "disk error", + } + + self.assertRaises( + exception.CoriolisException, + self.writer.get_disk_checksum, "sha256") + mock_delete.assert_called_once_with("test-job-id") + + @mock.patch.object(backup_writers.HTTPBackupWriterImpl, + '_delete_checksum_job') + @mock.patch.object(backup_writers.HTTPBackupWriterImpl, + '_create_checksum_job') + @mock.patch.object(backup_writers.HTTPBackupWriterImpl, '_wait_for_queues') + def test_get_disk_checksum_write_error(self, mock_wait, mock_create, + mock_delete): + self.writer._set_info(self.info) + self.writer._exception = exception.CoriolisException("write failed") + + self.assertRaises( + exception.CoriolisException, + self.writer.get_disk_checksum, "sha256") + mock_create.assert_not_called() + mock_delete.assert_not_called() + + @mock.patch('coriolis.providers.backup_writers.time.sleep') + @mock.patch('coriolis.providers.backup_writers.time.monotonic') + @mock.patch.object(backup_writers.HTTPBackupWriterImpl, + '_delete_checksum_job') + @mock.patch.object(backup_writers.HTTPBackupWriterImpl, + '_get_checksum_job_status') + @mock.patch.object(backup_writers.HTTPBackupWriterImpl, + '_create_checksum_job') + @mock.patch.object(backup_writers.HTTPBackupWriterImpl, '_wait_for_queues') + def test_get_disk_checksum_timeout(self, mock_wait, mock_create, + mock_status, mock_delete, + mock_monotonic, mock_sleep): + self.writer._set_info(self.info) + mock_create.return_value = "test-job-id" + mock_status.return_value = {"execution_status": "running"} + # First call sets the deadline; second call (after the poll) exceeds it + mock_monotonic.side_effect = [0, 3601] + + self.assertRaises( + exception.CoriolisException, + self.writer.get_disk_checksum, "sha256") + mock_delete.assert_called_once_with("test-job-id") + class HTTPBackupWriterBootstrapperTestcase(test_base.CoriolisBaseTestCase): """Test suite for the Coriolis HTTPBackupWriterBootstrapper class.""" diff --git a/coriolis/tests/providers/test_replicator.py b/coriolis/tests/providers/test_replicator.py index 77db4e60..919a5776 100644 --- a/coriolis/tests/providers/test_replicator.py +++ b/coriolis/tests/providers/test_replicator.py @@ -312,6 +312,22 @@ def test_download_chunk_no_compression(self): ) self.mock_response.raise_for_status.assert_called_once() + def test_get_disk_checksum(self): + self.client._cli.get.return_value = self.mock_response + + original_get_disk_checksum = testutils.get_wrapped_function( + self.client.get_disk_checksum) + + result = original_get_disk_checksum(self.client, self.device) + + self.assertEqual(result, self.mock_response.json.return_value) + self.client._cli.get.assert_called_once_with( + "https://%s:%s/api/v1/dev/%s/checksum" % ( + self.ip, self.port, self.device), + timeout=replicator_module.CONF.replicator.default_requests_timeout, + ) + self.mock_response.raise_for_status.assert_called_once() + class ReplicatorTestCase(test_base.CoriolisBaseTestCase): """Test suite for the Coriolis Replicator class.""" @@ -1090,8 +1106,82 @@ def test__find_vol_state_not_found(self): self.assertIsNone(result) + def test__verify_disk_checksum(self): + checksum = {"checksum": "abc123", "algorithm": "sha256"} + self.replicator._cli.get_disk_checksum.return_value = checksum + mock_destination = mock.MagicMock() + mock_destination.get_disk_checksum.return_value = checksum + + self.replicator._verify_disk_checksum("sdb", mock_destination) + + self.replicator._cli.get_disk_checksum.assert_called_once_with("sdb") + mock_get_disk_size = self.replicator._cli.get_disk_size + mock_get_disk_size.assert_called_once_with("sdb") + mock_destination.get_disk_checksum.assert_called_once_with( + "sha256", end_offset=mock_get_disk_size.return_value) + + def test__verify_disk_checksum_value_mismatch(self): + self.replicator._cli.get_disk_checksum.return_value = { + "checksum": "abc123", "algorithm": "sha256"} + mock_destination = mock.MagicMock() + mock_destination.get_disk_checksum.return_value = { + "checksum": "different", "algorithm": "sha256"} + + self.assertRaises( + exception.CoriolisException, + self.replicator._verify_disk_checksum, + "sdb", mock_destination) + + def test__verify_disk_checksum_algorithm_mismatch(self): + self.replicator._cli.get_disk_checksum.return_value = { + "checksum": "abc123", "algorithm": "sha256"} + mock_destination = mock.MagicMock() + mock_destination.get_disk_checksum.return_value = { + "checksum": "abc123", "algorithm": "xxhash"} + + self.assertRaises( + exception.CoriolisException, + self.replicator._verify_disk_checksum, + "sdb", mock_destination) + + def test__verify_disk_checksum_not_supported(self): + self.replicator._cli.get_disk_checksum.return_value = { + "checksum": "abc123", "algorithm": "sha256"} + mock_destination = mock.MagicMock() + mock_destination.get_disk_checksum.return_value = None + + self.replicator._verify_disk_checksum("sdb", mock_destination) + + self.replicator._cli.get_disk_checksum.assert_called_once_with("sdb") + mock_get_disk_size = self.replicator._cli.get_disk_size + mock_get_disk_size.assert_called_once_with("sdb") + mock_destination.get_disk_checksum.assert_called_once_with( + "sha256", end_offset=mock_get_disk_size.return_value) + + @mock.patch.object(replicator_module.Replicator, '_verify_disk_checksum') + @mock.patch.object(replicator_module, 'Client') + def test_replicate_disks_calls_verify_checksum( + self, mock_Client, mock_verify): + self.replicator._cli = mock_Client.return_value + self.replicator._cli.get_changes.return_value = [ + {'length': 100, 'offset': 0}] + self.replicator._volumes_info = [ + {"disk_id": "test_disk", "disk_path": "/dev/sdb"}] + source_volumes_info = [ + {"disk_id": "test_disk", "disk_path": "/dev/sdb"}] + self.replicator._repl_state = ['non-empty'] + mock_destination = mock.MagicMock(spec=['seek', 'write']) + self.backup_writer.open.return_value.__enter__.return_value = ( + mock_destination) + + self.replicator.replicate_disks( + source_volumes_info, self.backup_writer, True) + + mock_verify.assert_called_once_with("sdb", mock_destination) + + @mock.patch.object(replicator_module.Replicator, '_verify_disk_checksum') @mock.patch.object(replicator_module, 'Client') - def test_replicate_disks(self, mock_Client): + def test_replicate_disks(self, mock_Client, mock_verify): self.replicator._cli = mock_Client.return_value self.replicator._cli.get_changes.return_value = [ {'length': 100, 'offset': 0}, {'length': 200, 'offset': 100}] @@ -1127,10 +1217,11 @@ def test_replicate_disks_with_nonexistent_volume(self): self.replicator.replicate_disks, source_volumes_info, self.backup_writer) + @mock.patch.object(replicator_module.Replicator, '_verify_disk_checksum') @mock.patch.object(replicator_module.Replicator, '_find_vol_state') @mock.patch.object(replicator_module, 'Client') def test_replicate_disks_initial_sync(self, mock_Client, - mock_find_vol_state): + mock_find_vol_state, mock_verify): self.replicator._cli = mock_Client.return_value self.replicator._cli.get_changes.return_value = [