From e8f5e43af0920a7dae155b623e61d1c0e4f47a9b Mon Sep 17 00:00:00 2001 From: Jay Guo Date: Mon, 27 Apr 2026 17:16:41 -0400 Subject: [PATCH 1/2] fix: defer xloader job if resource multipart upload is pending --- ckanext/xloader/plugin.py | 7 ++++ ckanext/xloader/tests/test_plugin.py | 53 ++++++++++++++++++++++++++++ 2 files changed, 60 insertions(+) diff --git a/ckanext/xloader/plugin.py b/ckanext/xloader/plugin.py index 0d023553..b39d2ad8 100644 --- a/ckanext/xloader/plugin.py +++ b/ckanext/xloader/plugin.py @@ -185,6 +185,13 @@ def after_update(self, context, resource_dict): def _submit_to_xloader(self, resource_dict): context = {"ignore_auth": True, "defer_commit": True} + if toolkit.asbool(resource_dict.get("cloudstorage_multipart_pending", False)): + log.debug( + "Deferring xloading resource %s because a cloudstorage " + "multipart upload is still in progress.", + resource_dict.get("id"), + ) + return resource_format = resource_dict.get("format") if not XLoaderFormats.is_it_an_xloader_format(resource_format): log.debug( diff --git a/ckanext/xloader/tests/test_plugin.py b/ckanext/xloader/tests/test_plugin.py index f22dafbd..6550bc6c 100644 --- a/ckanext/xloader/tests/test_plugin.py +++ b/ckanext/xloader/tests/test_plugin.py @@ -34,6 +34,59 @@ def test_submit_on_resource_create(self, monkeypatch): assert func.called + def test_skip_when_cloudstorage_multipart_pending(self, monkeypatch): + """xloader must not submit while a cloudstorage multipart upload is + still in flight. When the flag is later cleared (e.g. by + cloudstorage_finish_multipart calling resource_patch) xloader should + submit.""" + func = mock.Mock() + monkeypatch.setitem(_actions, "xloader_submit", func) + + dataset = factories.Dataset() + + resource = helpers.call_action( + "resource_create", + {}, + package_id=dataset["id"], + url="http://example.com/file.csv", + format="CSV", + cloudstorage_multipart_pending="True", + ) + + assert not func.called, ( + "xloader_submit should be skipped while " + "cloudstorage_multipart_pending=True" + ) + + helpers.call_action( + "resource_patch", + {}, + id=resource["id"], + cloudstorage_multipart_pending="False", + ) + + assert func.called, ( + "xloader_submit should run once the pending flag is cleared" + ) + + def test_submit_when_no_cloudstorage_multipart_pending(self, monkeypatch): + """Resources without the cloudstorage_multipart_pending flag must + behave as before (plain resource_update path is unaffected).""" + func = mock.Mock() + monkeypatch.setitem(_actions, "xloader_submit", func) + + dataset = factories.Dataset() + + helpers.call_action( + "resource_create", + {}, + package_id=dataset["id"], + url="http://example.com/file.csv", + format="CSV", + ) + + assert func.called + def test_submit_when_url_changes(self, monkeypatch): func = mock.Mock() monkeypatch.setitem(_actions, "xloader_submit", func) From e0a3e472cc35c43f6f4142d98eca9d1259c6b7d6 Mon Sep 17 00:00:00 2001 From: Jay Guo Date: Tue, 28 Apr 2026 17:03:47 -0400 Subject: [PATCH 2/2] fix: add ignore_pending argument for stuck pending flags --- .github/workflows/test.yml | 2 +- ckanext/xloader/action.py | 13 ++++++ ckanext/xloader/plugin.py | 7 ---- ckanext/xloader/schema.py | 1 + ckanext/xloader/tests/test_action.py | 48 ++++++++++++++++++++++ ckanext/xloader/tests/test_plugin.py | 60 ++++++++++++++-------------- 6 files changed, 93 insertions(+), 38 deletions(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 2d38b576..25f60c0b 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -88,7 +88,7 @@ jobs: sed -i -e 's/use = config:.*/use = config:\/srv\/app\/src\/ckan\/test-core.ini/' test.ini # Install unzip for SonarQube Scan apt-get update - apt-get install unzip -y + apt-get install -y unzip gnupg - name: Setup extension continue-on-error: ${{ matrix.experimental }} diff --git a/ckanext/xloader/action.py b/ckanext/xloader/action.py index c0f3f84f..ab0f1dc0 100644 --- a/ckanext/xloader/action.py +++ b/ckanext/xloader/action.py @@ -40,6 +40,10 @@ def xloader_submit(context, data_dict): :param ignore_hash: If set to True, the xloader will reload the file even if it haven't changed. (optional, default: False) :type ignore_hash: bool + :param ignore_pending: If set to True, submit even when + ``cloudstorage_multipart_pending`` is set (e.g. stuck flag after a + failed clear). (optional, default: False) + :type ignore_pending: bool Returns ``True`` if the job has been submitted and ``False`` if the job has not been submitted, i.e. when ckanext-xloader is not configured. @@ -61,6 +65,15 @@ def xloader_submit(context, data_dict): except p.toolkit.ObjectNotFound: return False + if p.toolkit.asbool(resource_dict.get('cloudstorage_multipart_pending', False)): + if not p.toolkit.asbool(data_dict.get("ignore_pending", False)): + log.debug( + "Deferring xloading resource %s because a cloudstorage " + "multipart upload is still in progress.", + resource_dict.get("id"), + ) + return False + for plugin in p.PluginImplementations(xloader_interfaces.IXloader): upload = plugin.can_upload(res_id) if not upload: diff --git a/ckanext/xloader/plugin.py b/ckanext/xloader/plugin.py index b39d2ad8..0d023553 100644 --- a/ckanext/xloader/plugin.py +++ b/ckanext/xloader/plugin.py @@ -185,13 +185,6 @@ def after_update(self, context, resource_dict): def _submit_to_xloader(self, resource_dict): context = {"ignore_auth": True, "defer_commit": True} - if toolkit.asbool(resource_dict.get("cloudstorage_multipart_pending", False)): - log.debug( - "Deferring xloading resource %s because a cloudstorage " - "multipart upload is still in progress.", - resource_dict.get("id"), - ) - return resource_format = resource_dict.get("format") if not XLoaderFormats.is_it_an_xloader_format(resource_format): log.debug( diff --git a/ckanext/xloader/schema.py b/ckanext/xloader/schema.py index c0e8d938..2b865a73 100644 --- a/ckanext/xloader/schema.py +++ b/ckanext/xloader/schema.py @@ -29,6 +29,7 @@ def xloader_submit_schema(): 'id': [ignore_missing], 'set_url_type': [ignore_missing, boolean_validator], 'ignore_hash': [ignore_missing, boolean_validator], + 'ignore_pending': [ignore_missing, boolean_validator], '__junk': [empty], '__before': [dsschema.rename('id', 'resource_id')] } diff --git a/ckanext/xloader/tests/test_action.py b/ckanext/xloader/tests/test_action.py index 8b0e2729..d4c9fcfc 100644 --- a/ckanext/xloader/tests/test_action.py +++ b/ckanext/xloader/tests/test_action.py @@ -31,6 +31,54 @@ def test_submit(self): ) assert 1 == enqueue_mock.call_count + def test_submit_defers_when_cloudstorage_multipart_pending(self): + user = factories.User() + # Invalid format so resource_create does not enqueue via plugin + res = factories.Resource(user=user, format="aaa") + with mock.patch( + "ckanext.xloader.action.enqueue_job", + return_value=mock.MagicMock(id=123), + ) as enqueue_mock: + helpers.call_action( + "resource_patch", + {"user": user["name"]}, + id=res["id"], + format="csv", + cloudstorage_multipart_pending="True", + ) + assert enqueue_mock.call_count == 0 + out = helpers.call_action( + "xloader_submit", + context=dict(user=user["name"]), + resource_id=res["id"], + ) + assert out is False + assert enqueue_mock.call_count == 0 + + def test_submit_with_ignore_pending_when_multipart_pending(self): + user = factories.User() + res = factories.Resource(user=user, format="aaa") + with mock.patch( + "ckanext.xloader.action.enqueue_job", + return_value=mock.MagicMock(id=123), + ) as enqueue_mock: + helpers.call_action( + "resource_patch", + {"user": user["name"]}, + id=res["id"], + format="csv", + cloudstorage_multipart_pending="True", + ) + assert enqueue_mock.call_count == 0 + out = helpers.call_action( + "xloader_submit", + context=dict(user=user["name"]), + resource_id=res["id"], + ignore_pending=True, + ) + assert out is True + assert enqueue_mock.call_count == 1 + def test_submit_to_custom_queue_without_auth(self): # check that xloader_submit doesn't allow regular users to change queues user = factories.User() diff --git a/ckanext/xloader/tests/test_plugin.py b/ckanext/xloader/tests/test_plugin.py index 6550bc6c..f8abf3f3 100644 --- a/ckanext/xloader/tests/test_plugin.py +++ b/ckanext/xloader/tests/test_plugin.py @@ -34,40 +34,40 @@ def test_submit_on_resource_create(self, monkeypatch): assert func.called - def test_skip_when_cloudstorage_multipart_pending(self, monkeypatch): + def test_skip_when_cloudstorage_multipart_pending(self): """xloader must not submit while a cloudstorage multipart upload is still in flight. When the flag is later cleared (e.g. by cloudstorage_finish_multipart calling resource_patch) xloader should submit.""" - func = mock.Mock() - monkeypatch.setitem(_actions, "xloader_submit", func) - - dataset = factories.Dataset() - - resource = helpers.call_action( - "resource_create", - {}, - package_id=dataset["id"], - url="http://example.com/file.csv", - format="CSV", - cloudstorage_multipart_pending="True", - ) - - assert not func.called, ( - "xloader_submit should be skipped while " - "cloudstorage_multipart_pending=True" - ) - - helpers.call_action( - "resource_patch", - {}, - id=resource["id"], - cloudstorage_multipart_pending="False", - ) - - assert func.called, ( - "xloader_submit should run once the pending flag is cleared" - ) + with mock.patch( + "ckanext.xloader.action.enqueue_job", + return_value=mock.MagicMock(id=123), + ) as enqueue_mock: + dataset = factories.Dataset() + + resource = helpers.call_action( + "resource_create", + {}, + package_id=dataset["id"], + url="http://example.com/file.csv", + format="CSV", + cloudstorage_multipart_pending="True", + ) + + assert enqueue_mock.call_count == 0, ( + "should not enqueue while cloudstorage_multipart_pending=True" + ) + + helpers.call_action( + "resource_patch", + {}, + id=resource["id"], + cloudstorage_multipart_pending="False", + ) + + assert enqueue_mock.call_count == 1, ( + "xloader should enqueue once the pending flag is cleared" + ) def test_submit_when_no_cloudstorage_multipart_pending(self, monkeypatch): """Resources without the cloudstorage_multipart_pending flag must