Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 110 additions & 0 deletions coriolis/providers/backup_writers.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@
cfg.BoolOpt('compress_transfers',
default=True,
help='Use compression if possible during disk transfers'),
cfg.IntOpt('disk_checksum_timeout',
default=3600,
help='Maximum number of seconds to wait for a disk checksum '
'job to complete on the backup writer. Larger disks may '
'require a higher value.'),
]
CONF.register_opts(opts)
_CORIOLIS_HTTP_WRITER_CMD = "coriolis-writer"
Expand Down Expand Up @@ -66,6 +71,10 @@
15: "ERR_OUT_OF_BOUDS",
}

_CHECKSUM_JOB_POLL_INTERVAL = 15  # seconds between writer checksum job polls
# Terminal execution statuses reported by the writer for checksum jobs;
# any other status value is treated as "still running" by the poll loop.
_CHECKSUM_JOB_FINISHED = "finished"
_CHECKSUM_JOB_FAILED = "failed"


def _disable_lvm2_lvmetad(ssh):
"""Disables lvm2-lvmetad service. This service is responsible
Expand Down Expand Up @@ -193,6 +202,10 @@ def write(self, data):
def close(self):
    """Closes the writer. No resources to release in this implementation."""
    pass

def get_disk_checksum(self, algorithm, start_offset=0, end_offset=0):
    """Returns the destination disk checksum, or None if unsupported.

    Writer implementations which support checksumming override this
    method. The offset parameters are accepted (and ignored) here so
    that callers which pass them -- e.g. the replicator's disk
    verification, which supplies ``end_offset`` -- do not raise a
    TypeError when the writer in use does not support checksums.

    :param algorithm: Checksumming algorithm to use.
    :param start_offset: Checksumming starts from this offset.
    :param end_offset: Checksumming stops at this offset. If it is 0,
        the end of the disk is considered instead.
    :returns: None, signalling checksums are unsupported.
    """
    return None


class BaseBackupWriter(with_metaclass(abc.ABCMeta)):
@abc.abstractmethod
Expand Down Expand Up @@ -735,6 +748,103 @@ def _wait_for_queues(self):
LOG.info("Waiting for unfinished transfers to complete")
time.sleep(0.5)

def _create_checksum_job(self, algorithm, start_offset=0, end_offset=0):
    """Starts a checksum job for the acquired device on the writer.

    The device must already be acquired.

    :param algorithm: Checksumming algorithm to use.
    :param start_offset: Checksumming starts from this offset.
    :param end_offset: Checksumming stops at this offset. If it is 0,
        the end of the disk is considered instead.
    :returns: job ID string.
    """
    self._ensure_session()

    payload = {
        "start_offset": start_offset,
        "end_offset": end_offset,
        "checksum_algorithm": algorithm,
    }
    response = self._session.post(
        "%s/checksumJob" % self._uri,
        headers={"X-Client-Token": self._id},
        json=payload,
        timeout=CONF.default_requests_timeout)
    response.raise_for_status()

    return response.json()["job_id"]

def _get_checksum_job_status(self, job_id):
    """Fetches the current state of a writer checksum job.

    :param job_id: ID of the checksum job to query.
    :returns: the job status payload as decoded from the writer's
        JSON response.
    """
    self._ensure_session()

    response = self._session.get(
        "%s/checksumJob/%s" % (self._uri, job_id),
        timeout=CONF.default_requests_timeout)
    response.raise_for_status()

    return response.json()

def _delete_checksum_job(self, job_id):
    """Removes the given checksum job from the writer.

    :param job_id: ID of the checksum job to delete.
    """
    self._ensure_session()

    response = self._session.delete(
        "%s/checksumJob/%s" % (self._uri, job_id),
        timeout=CONF.default_requests_timeout)
    response.raise_for_status()

def get_disk_checksum(self, algorithm, start_offset=0, end_offset=0):
    """Computes and returns the checksum of the destination disk.

    Must be called while the device is acquired (inside open() context).
    Flushes all pending writes before starting the checksum job, then
    polls the writer until the job finishes, fails, or the configured
    'disk_checksum_timeout' expires. The job is always deleted from the
    writer afterwards (best-effort).

    :param algorithm: Checksumming algorithm to use.
    :param start_offset: Checksumming starts from this offset.
    :param end_offset: Checksumming stops at this offset. If it is 0,
        the end of the disk is considered instead.
    :returns: dict with 'checksum' and 'algorithm' keys.
    :raises CoriolisException: if there is a pending write error, the
        job fails on the writer, or the timeout is hit.
    """
    self._wait_for_queues()
    if self._exception:
        raise exception.CoriolisException(
            "Cannot checksum disk '%s', pending write error: %s" % (
                self._path, self._exception))

    timeout = CONF.disk_checksum_timeout
    deadline = time.monotonic() + timeout
    # NOTE: the offsets must be forwarded to the writer job, otherwise
    # the whole destination disk would always be checksummed, which can
    # spuriously mismatch the source when the destination is larger.
    job_id = self._create_checksum_job(
        algorithm, start_offset=start_offset, end_offset=end_offset)
    try:
        while True:
            status = self._get_checksum_job_status(job_id)
            execution_status = status.get("execution_status")
            if execution_status == _CHECKSUM_JOB_FINISHED:
                return {
                    "checksum": status["checksum_value"],
                    "algorithm": status["checksum_algorithm"],
                }

            if execution_status == _CHECKSUM_JOB_FAILED:
                raise exception.CoriolisException(
                    "Checksum job failed for disk '%s': %s" % (
                        self._path, status.get("error_message", "")))

            if time.monotonic() >= deadline:
                raise exception.CoriolisException(
                    "Timed out waiting for checksum job for disk '%s' "
                    "after %d seconds." % (self._path, timeout))

            time.sleep(_CHECKSUM_JOB_POLL_INTERVAL)
    finally:
        try:
            self._delete_checksum_job(job_id)
        except Exception:
            # Best-effort cleanup; include the traceback for debugging.
            LOG.warning(
                "Failed to delete checksum job %s for disk %s",
                job_id, self._path, exc_info=True)

def close(self):
self._closing = True
self._wait_for_queues()
Expand Down
59 changes: 58 additions & 1 deletion coriolis/providers/replicator.py
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,22 @@ def get_disk_size(self, disk):
info.raise_for_status()
return int(info.headers["Content-Length"])

@utils.retry_on_error()
def get_disk_checksum(self, device):
    """Returns the total checksum of the given disk.

    :param device: name of the source device to fetch the checksum for.
    :raises HTTPError: with HTTP 409 status if checksumming has not
        completed yet.
    :returns: dict with 'checksum' and 'algorithm' keys.
    """
    checksum_uri = "%s/api/v1/dev/%s/checksum" % (self._base_uri, device)

    response = self._cli.get(
        checksum_uri, timeout=CONF.replicator.default_requests_timeout)
    response.raise_for_status()

    return response.json()

@utils.retry_on_error()
def download_chunk(self, disk, chunk):
diskUri = self.raw_disk_uri(disk)
Expand Down Expand Up @@ -768,7 +784,44 @@ def _find_vol_state(self, name, state):
return vol
return None

def replicate_disks(self, source_volumes_info, backup_writer):
def _verify_disk_checksum(self, dev_name, destination):
    """Compares source and destination checksums for a transferred disk.

    Must be called while the device is still acquired on the writer side.

    :param dev_name: name of the source device to verify.
    :param destination: backup writer handle for the destination disk.
    :raises CoriolisException: if the checksum algorithms do not match, or
        if the checksums do not match.
    """
    self._event_manager.progress_update(
        "Verifying disk integrity for /dev/%s" % dev_name)

    src_checksum = self._cli.get_disk_checksum(dev_name)
    # Limit the destination checksum to the source disk's size, as the
    # destination disk may be larger.
    disk_size = self._cli.get_disk_size(dev_name)
    dst_checksum = destination.get_disk_checksum(
        src_checksum["algorithm"], end_offset=disk_size)

    if dst_checksum is None:
        # Writers which do not implement checksumming return None.
        self._event_manager.progress_update(
            "Disk integrity check skipped for /dev/%s "
            "(writer does not support checksums)" % dev_name)
        return

    if src_checksum["algorithm"] != dst_checksum["algorithm"]:
        raise exception.CoriolisException(
            "Checksum algorithm mismatch for disk '%s': "
            "source=%s, destination=%s" % (
                dev_name, src_checksum["algorithm"],
                dst_checksum["algorithm"]))

    if src_checksum["checksum"] != dst_checksum["checksum"]:
        raise exception.CoriolisException(
            "Checksum mismatch for disk '%s': "
            "source=%s, destination=%s" % (
                dev_name, src_checksum["checksum"],
                dst_checksum["checksum"]))

    self._event_manager.progress_update(
        "Disk integrity verified for /dev/%s (checksum: %s)" % (
            dev_name, src_checksum["checksum"]))

def replicate_disks(
self, source_volumes_info, backup_writer, verify_checksum=False):
"""
Fetch the block diff and send it to the backup_writer.
If the target_is_zeroed parameter is set to True, on initial
Expand Down Expand Up @@ -845,6 +898,10 @@ def replicate_disks(self, source_volumes_info, backup_writer):
total += 1
self._event_manager.set_percentage_step(
perc_step, total)

if verify_checksum:
self._verify_disk_checksum(devName, destination)

dst_vol["replica_state"] = state_for_vol

self._repl_state = curr_state
Expand Down
Binary file modified coriolis/resources/bin/replicator
Binary file not shown.
147 changes: 147 additions & 0 deletions coriolis/tests/providers/test_backup_writers.py
Original file line number Diff line number Diff line change
Expand Up @@ -1061,6 +1061,153 @@ def test_close_with_exception(self, mock_release, mock_wait_for_queues):
level=logging.ERROR):
self.assertRaises(exception.CoriolisException, self.writer.close)

@mock.patch.object(backup_writers.HTTPBackupWriterImpl, '_ensure_session')
@mock.patch.object(backup_writers.HTTPBackupWriterImpl, '_uri',
                   new_callable=mock.PropertyMock)
@mock.patch.object(backup_writers, 'CONF')
def test__create_checksum_job(self, mock_conf, mock_uri,
                              mock_ensure_session):
    # Checks that the checksum job POST targets the device's
    # '/checksumJob' URI with the client-token header and default
    # (0, 0) offsets, and that the job ID is taken from the response.
    self.writer._set_info(self.info)
    mock_uri.return_value = "https://host:port/api/v2/device/b64path"
    self.writer._session = mock.MagicMock()
    mock_resp = mock.MagicMock()
    mock_resp.json.return_value = {"job_id": "test-job-id"}
    self.writer._session.post.return_value = mock_resp

    result = self.writer._create_checksum_job("sha256")

    self.assertEqual("test-job-id", result)
    self.writer._session.post.assert_called_once_with(
        "https://host:port/api/v2/device/b64path/checksumJob",
        headers={"X-Client-Token": self.info["id"]},
        json={"start_offset": 0, "end_offset": 0,
              "checksum_algorithm": "sha256"},
        timeout=mock_conf.default_requests_timeout)
    mock_resp.raise_for_status.assert_called_once()

@mock.patch.object(backup_writers.HTTPBackupWriterImpl, '_ensure_session')
@mock.patch.object(backup_writers.HTTPBackupWriterImpl, '_uri',
                   new_callable=mock.PropertyMock)
@mock.patch.object(backup_writers, 'CONF')
def test__get_checksum_job_status(self, mock_conf, mock_uri,
                                  mock_ensure_session):
    # Checks that the status GET targets the per-job URI and that the
    # decoded JSON payload is returned unmodified.
    self.writer._set_info(self.info)
    mock_uri.return_value = "https://host:port/api/v2/device/b64path"
    self.writer._session = mock.MagicMock()
    mock_resp = mock.MagicMock()
    self.writer._session.get.return_value = mock_resp

    result = self.writer._get_checksum_job_status("test-job-id")

    self.assertEqual(result, mock_resp.json.return_value)
    self.writer._session.get.assert_called_once_with(
        "https://host:port/api/v2/device/b64path/checksumJob/test-job-id",
        timeout=mock_conf.default_requests_timeout)
    mock_resp.raise_for_status.assert_called_once()

@mock.patch.object(backup_writers.HTTPBackupWriterImpl, '_ensure_session')
@mock.patch.object(backup_writers.HTTPBackupWriterImpl, '_uri',
                   new_callable=mock.PropertyMock)
@mock.patch.object(backup_writers, 'CONF')
def test__delete_checksum_job(self, mock_conf, mock_uri,
                              mock_ensure_session):
    # Checks that the DELETE targets the per-job URI and that HTTP
    # errors would be raised via raise_for_status.
    self.writer._set_info(self.info)
    mock_uri.return_value = "https://host:port/api/v2/device/b64path"
    self.writer._session = mock.MagicMock()
    mock_resp = mock.MagicMock()
    self.writer._session.delete.return_value = mock_resp

    self.writer._delete_checksum_job("test-job-id")

    self.writer._session.delete.assert_called_once_with(
        "https://host:port/api/v2/device/b64path/checksumJob/test-job-id",
        timeout=mock_conf.default_requests_timeout)
    mock_resp.raise_for_status.assert_called_once()

@mock.patch.object(backup_writers.HTTPBackupWriterImpl,
                   '_delete_checksum_job')
@mock.patch.object(backup_writers.HTTPBackupWriterImpl,
                   '_get_checksum_job_status')
@mock.patch.object(backup_writers.HTTPBackupWriterImpl,
                   '_create_checksum_job')
@mock.patch.object(backup_writers.HTTPBackupWriterImpl, '_wait_for_queues')
def test_get_disk_checksum(self, mock_wait, mock_create, mock_status,
                           mock_delete):
    # Happy path: a 'finished' job status yields the checksum dict, and
    # the job is deleted on the way out.
    self.writer._set_info(self.info)
    mock_create.return_value = "test-job-id"
    mock_status.return_value = {
        "execution_status": "finished",
        "checksum_value": "abc123",
        "checksum_algorithm": "sha256",
    }

    result = self.writer.get_disk_checksum("sha256")

    self.assertEqual({"checksum": "abc123", "algorithm": "sha256"}, result)
    mock_wait.assert_called_once()
    # NOTE(review): this pins `_create_checksum_job` being called
    # WITHOUT the start/end offsets; if `get_disk_checksum` is changed
    # to forward its offset parameters to the job, update this
    # assertion accordingly.
    mock_create.assert_called_once_with("sha256")
    mock_delete.assert_called_once_with("test-job-id")

@mock.patch.object(backup_writers.HTTPBackupWriterImpl,
                   '_delete_checksum_job')
@mock.patch.object(backup_writers.HTTPBackupWriterImpl,
                   '_get_checksum_job_status')
@mock.patch.object(backup_writers.HTTPBackupWriterImpl,
                   '_create_checksum_job')
@mock.patch.object(backup_writers.HTTPBackupWriterImpl, '_wait_for_queues')
def test_get_disk_checksum_job_failed(self, mock_wait, mock_create,
                                      mock_status, mock_delete):
    # A 'failed' job status raises CoriolisException, and the job is
    # still deleted despite the error (finally-block cleanup).
    self.writer._set_info(self.info)
    mock_create.return_value = "test-job-id"
    mock_status.return_value = {
        "execution_status": "failed",
        "error_message": "disk error",
    }

    self.assertRaises(
        exception.CoriolisException,
        self.writer.get_disk_checksum, "sha256")
    mock_delete.assert_called_once_with("test-job-id")

@mock.patch.object(backup_writers.HTTPBackupWriterImpl,
                   '_delete_checksum_job')
@mock.patch.object(backup_writers.HTTPBackupWriterImpl,
                   '_create_checksum_job')
@mock.patch.object(backup_writers.HTTPBackupWriterImpl, '_wait_for_queues')
def test_get_disk_checksum_write_error(self, mock_wait, mock_create,
                                       mock_delete):
    # A pending write error aborts the checksum before any job is
    # created on the writer (so there is nothing to delete either).
    self.writer._set_info(self.info)
    self.writer._exception = exception.CoriolisException("write failed")

    self.assertRaises(
        exception.CoriolisException,
        self.writer.get_disk_checksum, "sha256")
    mock_create.assert_not_called()
    mock_delete.assert_not_called()

@mock.patch('coriolis.providers.backup_writers.time.sleep')
@mock.patch('coriolis.providers.backup_writers.time.monotonic')
@mock.patch.object(backup_writers.HTTPBackupWriterImpl,
                   '_delete_checksum_job')
@mock.patch.object(backup_writers.HTTPBackupWriterImpl,
                   '_get_checksum_job_status')
@mock.patch.object(backup_writers.HTTPBackupWriterImpl,
                   '_create_checksum_job')
@mock.patch.object(backup_writers.HTTPBackupWriterImpl, '_wait_for_queues')
def test_get_disk_checksum_timeout(self, mock_wait, mock_create,
                                   mock_status, mock_delete,
                                   mock_monotonic, mock_sleep):
    # A job stuck in 'running' past the (default 3600s) deadline raises
    # CoriolisException; the job is still deleted on the way out.
    self.writer._set_info(self.info)
    mock_create.return_value = "test-job-id"
    mock_status.return_value = {"execution_status": "running"}
    # First call sets the deadline; second call (after the poll) exceeds it
    mock_monotonic.side_effect = [0, 3601]

    self.assertRaises(
        exception.CoriolisException,
        self.writer.get_disk_checksum, "sha256")
    mock_delete.assert_called_once_with("test-job-id")


class HTTPBackupWriterBootstrapperTestcase(test_base.CoriolisBaseTestCase):
"""Test suite for the Coriolis HTTPBackupWriterBootstrapper class."""
Expand Down
Loading
Loading