Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ jobs:
sed -i -e 's/use = config:.*/use = config:\/srv\/app\/src\/ckan\/test-core.ini/' test.ini
# Install unzip for SonarQube Scan
apt-get update
apt-get install unzip -y
apt-get install -y unzip gnupg

- name: Setup extension
continue-on-error: ${{ matrix.experimental }}
Expand Down
13 changes: 13 additions & 0 deletions ckanext/xloader/action.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ def xloader_submit(context, data_dict):
:param ignore_hash: If set to True, the xloader will reload the file
even if it haven't changed. (optional, default: False)
:type ignore_hash: bool
:param ignore_pending: If set to True, submit even when
``cloudstorage_multipart_pending`` is set (e.g. stuck flag after a
failed clear). (optional, default: False)
:type ignore_pending: bool

Returns ``True`` if the job has been submitted and ``False`` if the job
has not been submitted, i.e. when ckanext-xloader is not configured.
Expand All @@ -61,6 +65,15 @@ def xloader_submit(context, data_dict):
except p.toolkit.ObjectNotFound:
return False

if p.toolkit.asbool(resource_dict.get('cloudstorage_multipart_pending', False)):
if not p.toolkit.asbool(data_dict.get("ignore_pending", False)):
log.debug(
"Deferring xloading resource %s because a cloudstorage "
"multipart upload is still in progress.",
resource_dict.get("id"),
)
return False

for plugin in p.PluginImplementations(xloader_interfaces.IXloader):
upload = plugin.can_upload(res_id)
if not upload:
Expand Down
1 change: 1 addition & 0 deletions ckanext/xloader/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ def xloader_submit_schema():
'id': [ignore_missing],
'set_url_type': [ignore_missing, boolean_validator],
'ignore_hash': [ignore_missing, boolean_validator],
'ignore_pending': [ignore_missing, boolean_validator],
'__junk': [empty],
'__before': [dsschema.rename('id', 'resource_id')]
}
Expand Down
48 changes: 48 additions & 0 deletions ckanext/xloader/tests/test_action.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,54 @@ def test_submit(self):
)
assert 1 == enqueue_mock.call_count

def test_submit_defers_when_cloudstorage_multipart_pending(self):
user = factories.User()
# Invalid format so resource_create does not enqueue via plugin
res = factories.Resource(user=user, format="aaa")
with mock.patch(
"ckanext.xloader.action.enqueue_job",
return_value=mock.MagicMock(id=123),
) as enqueue_mock:
helpers.call_action(
"resource_patch",
{"user": user["name"]},
id=res["id"],
format="csv",
cloudstorage_multipart_pending="True",
)
assert enqueue_mock.call_count == 0
out = helpers.call_action(
"xloader_submit",
context=dict(user=user["name"]),
resource_id=res["id"],
)
assert out is False
assert enqueue_mock.call_count == 0

def test_submit_with_ignore_pending_when_multipart_pending(self):
user = factories.User()
res = factories.Resource(user=user, format="aaa")
with mock.patch(
"ckanext.xloader.action.enqueue_job",
return_value=mock.MagicMock(id=123),
) as enqueue_mock:
helpers.call_action(
"resource_patch",
{"user": user["name"]},
id=res["id"],
format="csv",
cloudstorage_multipart_pending="True",
)
assert enqueue_mock.call_count == 0
out = helpers.call_action(
"xloader_submit",
context=dict(user=user["name"]),
resource_id=res["id"],
ignore_pending=True,
)
assert out is True
assert enqueue_mock.call_count == 1

def test_submit_to_custom_queue_without_auth(self):
# check that xloader_submit doesn't allow regular users to change queues
user = factories.User()
Expand Down
53 changes: 53 additions & 0 deletions ckanext/xloader/tests/test_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,59 @@ def test_submit_on_resource_create(self, monkeypatch):

assert func.called

def test_skip_when_cloudstorage_multipart_pending(self):
"""xloader must not submit while a cloudstorage multipart upload is
still in flight. When the flag is later cleared (e.g. by
cloudstorage_finish_multipart calling resource_patch) xloader should
submit."""
with mock.patch(
"ckanext.xloader.action.enqueue_job",
return_value=mock.MagicMock(id=123),
) as enqueue_mock:
dataset = factories.Dataset()

resource = helpers.call_action(
"resource_create",
{},
package_id=dataset["id"],
url="http://example.com/file.csv",
format="CSV",
cloudstorage_multipart_pending="True",
)

assert enqueue_mock.call_count == 0, (
"should not enqueue while cloudstorage_multipart_pending=True"
)

helpers.call_action(
"resource_patch",
{},
id=resource["id"],
cloudstorage_multipart_pending="False",
)

assert enqueue_mock.call_count == 1, (
"xloader should enqueue once the pending flag is cleared"
)

def test_submit_when_no_cloudstorage_multipart_pending(self, monkeypatch):
"""Resources without the cloudstorage_multipart_pending flag must
behave as before (plain resource_update path is unaffected)."""
func = mock.Mock()
monkeypatch.setitem(_actions, "xloader_submit", func)

dataset = factories.Dataset()

helpers.call_action(
"resource_create",
{},
package_id=dataset["id"],
url="http://example.com/file.csv",
format="CSV",
)

assert func.called

def test_submit_when_url_changes(self, monkeypatch):
func = mock.Mock()
monkeypatch.setitem(_actions, "xloader_submit", func)
Expand Down
Loading