diff --git a/.flake8 b/.flake8 index 32068ca7..973dec8d 100644 --- a/.flake8 +++ b/.flake8 @@ -17,4 +17,6 @@ max-line-length=127 # List ignore rules one per line. ignore = + C901 + E501 W503 diff --git a/.github/workflows/publish.yml b/.github/workflows/publish.yml index ef7612bf..75966354 100644 --- a/.github/workflows/publish.yml +++ b/.github/workflows/publish.yml @@ -31,77 +31,32 @@ on: jobs: - lint: - if: github.repository == 'ckan/ckanext-xloader' + + validateVersion: runs-on: ubuntu-latest + if: github.repository == 'ckan/ckanext-xloader' steps: - uses: actions/checkout@v4 + - uses: actions/setup-python@v5 with: python-version: '3.10' - - name: Install requirements - run: pip install flake8 pycodestyle - - name: Check syntax - run: flake8 . --count --select=E901,E999,F821,F822,F823 --show-source --statistics --extend-exclude ckan - test: - needs: lint - strategy: - matrix: - include: #ckan-image see https://github.com/ckan/ckan-docker-base, ckan-version controls other image tags - - ckan-version: "2.11" - ckan-image: "2.11-py3.10" - - ckan-version: "2.10" - ckan-image: "2.10-py3.10" - - ckan-version: "2.9" - ckan-image: "2.9-py3.9" - #- ckan-version: "master" Publish does not care about master - # ckan-image: "master" - fail-fast: false - - name: CKAN ${{ matrix.ckan-version }} - runs-on: ubuntu-latest - container: - image: ckan/ckan-dev:${{ matrix.ckan-image }} - options: --user root - services: - solr: - image: ckan/ckan-solr:${{ matrix.ckan-version }}-solr9 - postgres: - image: ckan/ckan-postgres-dev:${{ matrix.ckan-version }} - env: - POSTGRES_USER: postgres - POSTGRES_PASSWORD: postgres - POSTGRES_DB: postgres - ports: - - 5432:5432 - options: --health-cmd pg_isready --health-interval 10s --health-timeout 5s --health-retries 5 - redis: - image: redis:3 - env: - CKAN_SQLALCHEMY_URL: postgresql://ckan_default:pass@postgres/ckan_test - CKAN_DATASTORE_WRITE_URL: postgresql://datastore_write:pass@postgres/datastore_test - CKAN_DATASTORE_READ_URL: postgresql://datastore_read:pass@postgres/datastore_test - CKAN_SOLR_URL: http://solr:8983/solr/ckan - CKAN_REDIS_URL: redis://redis:6379/1 + - name: Validate tag version + if: ${{ startsWith(github.ref, 'refs/tags') }} + run: | + TAG_VALUE=${GITHUB_REF/refs\/tags\//} + PYTHON_VERSION=$(grep -E '\bversion\s?=\s?"[^"]+"' pyproject.toml | awk -F '"' '{print $2}') + echo "Tag version is [$TAG_VALUE], Python version is [$PYTHON_VERSION]" + if [ "$TAG_VALUE" != "$PYTHON_VERSION" ]; then + echo "Version mismatch; tag version is [$TAG_VALUE] but Python version is [$PYTHON_VERSION]" >> $GITHUB_STEP_SUMMARY + exit 1 + fi - steps: - - uses: actions/checkout@v4 - - if: ${{ matrix.ckan-version == 2.9 }} - run: pip install "setuptools>=44.1.0,<71" - - name: Install requirements - run: | - pip install -r requirements.txt - pip install -r dev-requirements.txt - pip install -e . - pip install -U requests[security] - # Replace default path to CKAN core config file with the one on the container - sed -i -e 's/use = config:.*/use = config:\/srv\/app\/src\/ckan\/test-core.ini/' test.ini - - name: Setup extension (CKAN >= 2.9) - run: | - ckan -c test.ini db init - - name: Run tests - run: pytest --ckan-ini=test.ini --cov=ckanext.xloader --disable-warnings ckanext/xloader/tests + test: + needs: validateVersion + name: Test + uses: ./.github/workflows/test.yml # Call the reusable workflow publishSkipped: if: github.repository != 'ckan/ckanext-xloader' @@ -138,10 +93,10 @@ jobs: echo "reponame=${reponame}" >> $GITHUB_OUTPUT if [ "$env.ENVIRONMENT" == "testpypi" ]; then - url="https://test.pypi.com/p/$reponame" + url="https://test.pypi.org/project/$reponame/$TAG_VALUE/" echo "environment=${env.ENVIRONMENT}" >> $GITHUB_OUTPUT else - url="https://pypi.com/p/$reponame" + url="https://pypi.org/project/$reponame/$TAG_VALUE/" echo "environment=pypi" >> $GITHUB_OUTPUT fi @@ -150,16 +105,6 @@ jobs: - name: Checkout repository uses: actions/checkout@v4 - - name: Validate tag version - if: ${{ startsWith(github.ref, 'refs/tags') }} - run: | - PYTHON_VERSION=$(grep -E "\bversion='[^']+'" setup.py | awk -F "'" '{print $2}') - echo "Tag version is [${{ steps.version.outputs.version }}], Python version is [$PYTHON_VERSION]" - if [ "${{ steps.version.outputs.version }}" != "$PYTHON_VERSION" ]; then - echo "Version mismatch; tag version is [${{ steps.version.outputs.version }}] but Python version is [$PYTHON_VERSION]" >> $GITHUB_STEP_SUMMARY - exit 1 - fi - - name: Build package ${{ steps.version.outputs.reponame }} @ ${{ steps.version.outputs.version }} run: | pip install build diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 2d38b576..640c10ea 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -5,6 +5,7 @@ on: pull_request: branches: - master + workflow_call: jobs: lint: @@ -30,19 +31,15 @@ jobs: experimental: false - ckan-version: "2.10" ckan-image: "2.10-py3.10" - solr-version: "9" - experimental: false - - ckan-version: "2.9" - ckan-image: "2.9-py3.9" - solr-version: "8" experimental: false + solr-version: "9" - ckan-version: "master" ckan-image: "master" solr-version: "9" experimental: true # master is unstable, good to know if we are compatible or not fail-fast: false - name: CKAN ${{ matrix.ckan-version }} + name: ${{ matrix.experimental && '**Fail_Ignored** ' || '' }} CKAN ${{ matrix.ckan-version }} runs-on: ubuntu-latest container: image: ckan/ckan-dev:${{ matrix.ckan-image }} @@ -60,7 +57,7 @@ jobs: - 5432:5432 options: --health-cmd pg_isready --health-interval 10s --health-timeout 5s --health-retries 5 redis: - image: redis:3 + image: redis:7 env: CKAN_SQLALCHEMY_URL: postgresql://ckan_default:pass@postgres/ckan_test CKAN_DATASTORE_WRITE_URL: postgresql://datastore_write:pass@postgres/datastore_test @@ -72,12 +69,7 @@ jobs: - uses: actions/checkout@v4 continue-on-error: ${{ matrix.experimental }} - - name: Pin setuptools for ckan 2.9 only - if: ${{ matrix.ckan-version == 2.9 }} - run: pip install "setuptools>=44.1.0,<71" - continue-on-error: ${{ matrix.experimental }} - - - name: Install requirements + - name: ${{ matrix.experimental && '**Fail_Ignored** ' || '' }} Install requirements continue-on-error: ${{ matrix.experimental }} run: | pip install -r requirements.txt @@ -90,16 +82,20 @@ jobs: apt-get update apt-get install unzip -y - - name: Setup extension + - name: ${{ matrix.experimental && '**Fail_Ignored** ' || '' }} Setup extension continue-on-error: ${{ matrix.experimental }} run: | ckan -c test.ini db init + ckan -c test.ini user add ckan_admin email=ckan_admin@localhost password="AbCdEf12345!@#%" + ckan -c test.ini sysadmin add ckan_admin + ckan config-tool test.ini "ckanext.xloader.api_token=$(ckan -c test.ini user token add ckan_admin xloader | tail -n 1 | tr -d '\t')" + ckan -c test.ini user list - - name: Run tests + - name: ${{ matrix.experimental && '**Fail_Ignored** ' || '' }} Run tests continue-on-error: ${{ matrix.experimental }} run: pytest --ckan-ini=test.ini --cov=ckanext.xloader --disable-warnings ckanext/xloader/tests --junit-xml=/tmp/artifacts/junit/results.xml - - name: Test Summary + - name: ${{ matrix.experimental && '**Fail_Ignored** ' || '' }} Test Summary uses: test-summary/action@v2 continue-on-error: ${{ matrix.experimental }} with: @@ -108,7 +104,7 @@ jobs: - name: SonarQube Scan uses: sonarsource/sonarqube-scan-action@master - with: + with: args: > -Dsonar.projectKey=ckanext-xloader -Dsonar.sources=ckanext/xloader diff --git a/CHANGELOG b/CHANGELOG index 26bce2d0..30d92178 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,14 +1,67 @@ -1.1.2 2024-10-25 +CHANGELOG +========= +See: https://github.com/ckan/ckanext-xloader/releases if this file has drifted. + +2.0.1 2025-03-04 +================ + +## Fix + +* #244 Static webassets not included in package +* #245 support apitoken_header_name in 2.11.x. +* #241 loading R/W datasource resources via api (not hardcoded) + +2.0.0 2024-12-10 ================ -Feat: +## Major +Dropped CKAN 2.9.x and Python2. + + +## Feat: +* Adds Strip White Space fields to the Data Dictionary (defualts to `True` for each field). + This will strip surrounding white space from data values prior to inserting them into the database. +* Adds support for ckanext-validation. Config `ckanext.xloader.validation.requires_successful_report` + controls whether a resource requires a successful validation report to be XLoadered. + By default, a resource would also require a Validation Schema, which can be turned off with + `ckanext.xloader.validation.enforce_schema`. * Frontend Status Badges by @JVickery-TBS in https://github.com/ckan/ckanext-xloader/pull/224 -Fix: + +## Fix: +* Properly handle REDIS queue timeouts to close/delete any temporary files. * Fix automated PyPI publishing by @ThrawnCA in https://github.com/ckan/ckanext-xloader/pull/231 +## What's Changed +* Update README, migrate it to Markdown by @amercader in https://github.com/ckan/ckanext-xloader/pull/235 +* chore: switch to pyproject.toml by @duttonw in https://github.com/ckan/ckanext-xloader/pull/236 +* Validation Extension Support (Squashed) by @JVickery-TBS in https://github.com/ckan/ckanext-xloader/pull/237 +* Strip White Space from Cell Values (Squashed) by @JVickery-TBS in https://github.com/ckan/ckanext-xloader/pull/238 +* RQ Job Timeout Handling (Squashed) by @JVickery-TBS in https://github.com/ckan/ckanext-xloader/pull/239 +* SQLAlchemy v2 support by @smotornyuk in https://github.com/ckan/ckanext-xloader/pull/225 + +**Full Changelog**: https://github.com/ckan/ckanext-xloader/compare/1.2.0...2.0.0 + +1.2.0 2024-11-21 +================ + +## What's Changed +* Fix PyPI publishing by @ThrawnCA in https://github.com/ckan/ckanext-xloader/pull/233 +* Enhancement/Bugfix: Downstream qld-gov-au fix's by @duttonw in https://github.com/ckan/ckanext-xloader/pull/232 + * feat: @JVickery-TBS work on validation integration (await successful validation prior to doing datastore work via 'IPipeValidation' + * fix: handle gracefully if tabulator load fails by trying 'direct load' + * fix: Excel blank header row bug + * fix: Datastore truncate, restart identity so numbering restarts from 0 again (when imported data has same columns and types + * fix: parital fix on DB deadlock by adding timeouts on DDL events + * test: test_simple_large_file, test_with_blanks, test_with_empty_lines, test_with_extra_blank_cells + * test: test_require_validation, test_enforce_validation_schema + * chore: min version requirements for cve's, + * requests>=2.32.0 + * urllib3>=2.2.2 + * zipp>=3.19.1 + +**Full Changelog**: https://github.com/ckan/ckanext-xloader/compare/1.1.2...1.2.0 -**Full Changelog**: https://github.com/ckan/ckanext-xloader/compare/1.1.1...1.1.2 1.1.1 2024-10-16 ================ @@ -21,7 +74,7 @@ Fix: 1.1.0 2024-10-16 ================ -Fixes: +Fixes: * feat: Add pypi cicd publish via github action via environment controls by @duttonw in https://github.com/ckan/ckanext-xloader/pull/228 @@ -32,7 +85,7 @@ Fixes: ================ -Fixes: +Fixes: * add README note about running on separate server, #191 by @ThrawnCA in https://github.com/ckan/ckanext-xloader/pull/192 * Use IDomainObjectModification Implementation by @JVickery-TBS in https://github.com/ckan/ckanext-xloader/pull/198 diff --git a/MANIFEST.in b/MANIFEST.in index 4aeb22fa..ab418699 100644 --- a/MANIFEST.in +++ b/MANIFEST.in @@ -1,6 +1,9 @@ include *requirements*.txt include CHANGELOG include LICENSE -include README.rst +include README.md include ckanext/xloader/config_declaration.yaml recursive-include ckanext/xloader/templates *.html +recursive-include ckanext/xloader/webassets *.css +recursive-include ckanext/xloader/webassets *.yml +recursive-include ckanext/xloader/webassets *.js \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 00000000..5e091308 --- /dev/null +++ b/README.md @@ -0,0 +1,621 @@ +# XLoader - ckanext-xloader + +[![Tests](https://github.com/ckan/ckanext-xloader/workflows/Tests/badge.svg?branch=master)](https://github.com/ckan/ckanext-xloader/actions) +[![Latest Version](https://img.shields.io/pypi/v/ckanext-xloader.svg)](https://pypi.org/project/ckanext-xloader/) +[![Supported Python versions](https://img.shields.io/pypi/pyversions/ckanext-xloader.svg)](https://pypi.org/project/ckanext-xloader/) +[![Development Status](https://img.shields.io/pypi/status/ckanext-xloader.svg)](https://pypi.org/project/ckanext-xloader/) +[![License](https://img.shields.io/pypi/l/ckanext-xloader.svg)](https://pypi.org/project/ckanext-xloader/) + + +Loads CSV (and similar) data into CKAN's DataStore. Designed as a +replacement for DataPusher because it offers ten times the speed and +more robustness (hence the name, derived from "Express Loader") + +**OpenGov Inc.** has sponsored this development, with the aim of +benefitting open data infrastructure worldwide. + +* [Key differences from DataPusher](#key-differences-from-datapusher) +* [Requirements](#requirements) +* [Installation](#installation) +* [Config settings](#config-settings) +* [Developer installation](#developer-installation) +* [Upgrading from DataPusher](#upgrading-from-datapusher) +* [Command-line interface](#command-line-interface) + * [Jobs and workers](#jobs-and-workers) +* [Troubleshooting](#troubleshooting) +* [Running the Tests](#running-the-tests) +* [Releasing a New Version of XLoader](#releasing-a-new-version-of-xloader) + +## Key differences from DataPusher + +### Speed of loading + +DataPusher - parses CSV rows, converts to detected column types, +converts the data to a JSON string, calls datastore_create for each +batch of rows, which reformats the data into an INSERT statement string, +which is passed to PostgreSQL. + +XLoader - pipes the CSV file directly into PostgreSQL using COPY. + +In [tests](https://github.com/ckan/ckanext-xloader/issues/25), XLoader +is over ten times faster than DataPusher. + +### Robustness + +DataPusher - one cause of failure was when casting cells to a guessed +type. The type of a column was decided by looking at the values of only +the first few rows. So if a column is mainly numeric or dates, but a +string (like "N/A") comes later on, then this will cause the load to +error at that point, leaving it half-loaded into DataStore. + +XLoader - loads all the cells as text, before allowing the admin to +convert columns to the types they want (using the Data Dictionary +feature). In future it could do automatic detection and conversion. + +### Simpler queueing tech + +DataPusher - job queue is done by ckan-service-provider which is +bespoke, complicated and stores jobs in its own database (sqlite by +default). + +XLoader - job queue is done by RQ, which is simpler, is backed by Redis, +allows access to the CKAN model and is CKAN's default queue technology. +You can also debug jobs easily using pdb. Job results are stored in +Sqlite by default, and for production simply specify CKAN's database in +the config and it's held there - easy. + +(The other obvious candidate is Celery, but we don't need its +heavyweight architecture and its jobs are not debuggable with pdb.) + +### Separate web server + +DataPusher - has the complication that the queue jobs are done by a +separate (Flask) web app, apart from CKAN. This was the design because +the job requires intensive processing to convert every line of the data +into JSON. However it means more complicated code as info needs to be +passed between the services in http requests, more for the user to +set-up and manage - another app config, another apache config, separate +log files. + +XLoader - the job runs in a worker process, in the same app as CKAN, so +can access the CKAN config, db and logging directly and avoids many HTTP +calls. This simplification makes sense because the xloader job doesn't +need to do much processing - mainly it is streaming the CSV file from +disk into PostgreSQL. + +It is still entirely possible to run the XLoader worker on a separate +server, if that is desired. The worker needs the following: + +- A copy of CKAN installed in the same Python virtualenv (but not + running). +- A copy of the CKAN config file. +- Access to the Redis instance that the running CKAN app uses to store + jobs. +- Access to the database. + +You can then run it via ckan jobs worker as below. + +### Caveat - column types + +Note: With XLoader, all columns are stored in DataStore's database as +'text' type (whereas DataPusher did some rudimentary type guessing - +see 'Robustness' above). However once a resource is xloaded, an admin +can use the resource's Data Dictionary tab to change these types to +numeric or datestamp and re-load the file. When migrating from +DataPusher to XLoader you can preserve the types of existing resources +by using the `migrate_types` command. + +There is scope to add functionality for automatically guessing column +type -offers to contribute this are welcomed. + +## Requirements + +Compatibility with core CKAN versions: + + | CKAN version | Compatibility | + | -------------- |-------------------------------------------------------| + | 2.7 | no longer supported (last supported version: 0.12.2) | + | 2.8 | no longer supported (last supported version: 0.12.2) | + | 2.9 | no longer supported (last supported version: 1.2.x) | + | 2.10 | yes | + | 2.11 | yes | + +## Installation + +To install XLoader: + +1. Activate your CKAN virtual environment, for example: + + . /usr/lib/ckan/default/bin/activate + +2. Install the ckanext-xloader Python package into your virtual + environment: + + pip install ckanext-xloader + +3. Install dependencies: + + pip install -r https://raw.githubusercontent.com/ckan/ckanext-xloader/master/requirements.txt + pip install -U requests[security] + +4. Add `xloader` to the `ckan.plugins` setting in your CKAN config file + (by default the config file is located at + `/etc/ckan/default/production.ini`). + + You should also remove `datapusher` if it is in the list, to avoid + them both trying to load resources into the DataStore. + + Ensure `datastore` is also listed, to enable CKAN DataStore. + +5. Starting CKAN 2.10 you will need to set an API Token to be able to + execute jobs against the server: + + ckanext.xloader.api_token = + ckan config-tool test.ini "ckanext.xloader.api_token=$(ckan -c test.ini user token add ckan_admin xloader | tail -n 1 | tr -d '\t')" + +6. If it is a production server, you'll want to store jobs info in a + more robust database than the default sqlite file. It can happily + use the main CKAN postgres db by adding this line to the config, but + with the same value as you have for `sqlalchemy.url`: + + ckanext.xloader.jobs_db.uri = postgresql://ckan_default:pass@localhost/ckan_default + + (This step can be skipped when just developing or testing.) + +7. Restart CKAN. For example if you've deployed CKAN with Apache on + Ubuntu: + + sudo service apache2 reload + +8. Run the worker: + + ckan -c /etc/ckan/default/ckan.ini jobs worker + +## Config settings + +Configuration: + + +This plugin supports the [`ckan.download_proxy`](https://docs.ckan.org/en/latest/maintaining/configuration.html#ckan-download-proxy) setting, +to use a proxy server when downloading files. This setting is shared +with other plugins that download resource files, such as +ckanext-archiver. Eg: + + ckan.download_proxy = + +You may also wish to configure the database to use your preferred date +input style on COPY. For example, to make +[PostgreSQL]() +expect European (day-first) dates, you could add to `postgresql.conf`: + + datestyle=ISO,DMY + +All configurations below are defined in the +[config_declaration.yaml](ckanext/xloader/config_declaration.yaml) file. + + +#### ckanext.xloader.jobs_db.uri + +Example: + +``` +postgresql://ckan_default:pass@localhost/ckan_default + +``` + + +Default value: `sqlite:////tmp/xloader_jobs.db` + +The connection string for the jobs database used by XLoader. The +default of an sqlite file is fine for development. For production use a +Postgresql database. + + +#### ckanext.xloader.api_token + +Example: + +``` +ckanext.xloader.api_token = eyJ0eXAiOiJKV1QiLCJh.eyJqdGkiOiJ0M2VNUFlQWFg0VU.8QgV8em4RA +``` + +Default value: none + +It's mandatory starting from CKAN 2.10. You can get one +running the command `ckan user token add {USER_NAME} xloader -q` + + +#### ckanext.xloader.formats + +Example: + +``` +ckanext.xloader.formats = csv application/csv xls application/vnd.ms-excel +``` + +Default value: none + +The formats that are accepted. If the value of the resource.format is +anything else then it won't be 'xloadered' to DataStore (and will therefore +only be available to users in the form of the original download/link). +Case insensitive. Defaults are listed in utils.py. + + +#### ckanext.xloader.max_content_length + +Example: + +``` +ckanext.xloader.max_content_length = 100000 +``` + +Default value: `1000000000` + +The maximum file size that XLoader will attempt to load. + + +#### ckanext.xloader.use_type_guessing + +Default value: `False` + +By default, xloader will first try to add tabular data to the DataStore +with a direct PostgreSQL COPY. This is relatively fast, but does not +guess column types. If this fails, xloader falls back to a method more +like DataPusher's behaviour. This has the advantage that the column types +are guessed. However it is more error prone and far slower. +To always skip the direct PostgreSQL COPY and use type guessing, set +this option to True. + + +#### ckanext.xloader.strict_type_guessing + +Default value: `True` + +Use with ckanext.xloader.use_type_guessing to set strict true or false +for type guessing. If set to False, the types will always fallback to string type. + +Strict means that a type will not be guessed if parsing fails for a single cell in the column. + + +#### ckanext.xloader.max_type_guessing_length + +Example: + +``` +ckanext.xloader.max_type_guessing_length = 100000 +``` + +Default value: `0` + +The maximum file size that will be passed to Tabulator if the +use_type_guessing flag is enabled. Larger files will use COPY even if +the flag is set. Defaults to 1/10 of the maximum content length. + + +#### ckanext.xloader.parse_dates_dayfirst + +Default value: `False` + +Whether ambiguous dates should be parsed day first. Defaults to False. +If set to True, dates like '01.02.2022' will be parsed as day = 01, +month = 02. +NB: isoformat dates like '2022-01-02' will be parsed as YYYY-MM-DD, and +this option will not override that. +See [dateutil docs](https://dateutil.readthedocs.io/en/stable/parser.html#dateutil.parser.parse) +for more details. + + +#### ckanext.xloader.parse_dates_yearfirst + +Default value: `False` + +Whether ambiguous dates should be parsed year first. Defaults to False. +If set to True, dates like '01.02.03' will be parsed as year = 2001, +month = 02, day = 03. See [dateutil docs](https://dateutil.readthedocs.io/en/stable/parser.html#dateutil.parser.parse) +for more details. + + +#### ckanext.xloader.job_timeout + +Example: + +``` +ckanext.xloader.job_timeout = 3600 +``` + +Default value: `3600` + +The maximum time for the loading of a resource before it is aborted. +Give an amount in seconds. Default is 60 minutes + + +#### ckanext.xloader.ignore_hash + +Default value: `False` + +Ignore the file hash when submitting to the DataStore, if set to True +resources are always submitted (if their format matches), if set to +False (default), resources are only submitted if their hash has changed. + + +#### ckanext.xloader.max_excerpt_lines + +Example: + +``` +ckanext.xloader.max_excerpt_lines = 100 +``` + +Default value: `0` + +When loading a file that is bigger than `max_content_length`, xloader can +still try and load some of the file, which is useful to display a +preview. Set this option to the desired number of lines/rows that it +loads in this case. +If the file-type is supported (CSV, TSV) an excerpt with the number of +`max_excerpt_lines` lines will be submitted while the `max_content_length` +is not exceeded. +If set to 0 (default) files that exceed the `max_content_length` will +not be loaded into the datastore. + + +#### ckanext.xloader.ssl_verify + +Example: + +``` +ckanext.xloader.ssl_verify = True +``` + +Default value: `True` + +Requests verifies SSL certificates for HTTPS requests. Setting verify to +False should only be enabled during local development or testing. Default +to True. + + +#### ckanext.xloader.clean_datastore_tables + +Example: + +``` +ckanext.xloader.clean_datastore_tables = True +``` + +Default value: `False` + +Enqueue jobs to remove Datastore tables from Resources that have a format +that is not in ckanext.xloader.formats after a Resource is updated. + + +#### ckanext.xloader.show_badges + +Default value: `True` + +Controls whether or not the status badges display in the front end. + + +#### ckanext.xloader.debug_badges + +Example: + +``` +ckanext.xloader.debug_badges = True +``` + +Default value: `False` + +Controls whether or not the status badges display all of the statuses. By default, +the badges will display "pending", "running", and "error". With debug_badges enabled, +they will also display "complete", "active", "inactive", and "unknown". + +#### ckanext.xloader.validation.requires_successful_report + +Supports: __ckanext-validation__ + +Example: + +``` +ckanext.xloader.validation.requires_successful_report = True +``` + +Default value: `False` + +Controls whether or not a resource requires a successful validation report from the ckanext-validation plugin in order to be XLoadered. + +#### ckanext.xloader.validation.enforce_schema + +Supports: __ckanext-validation__ + +Example: + +``` +ckanext.xloader.validation.enforce_schema = False +``` + +Default value: `True` + +Controls whether or not a resource requires a Validation Schema to be present from the ckanext-validation plugin to be XLoadered. + +#### ckanext.xloader.site_url +Provide an alternate site URL for the xloader_submit action. +This is useful, for example, when the site is running within a docker network. + +Note: This setting will not alter path. i.e ckan.root_path + +Example: + +``` +ckanext.xloader.site_url = http://ckan-dev:5000 +``` + +##### ckanext.xloader.site_url_ignore_path_regex +Provide the ability to ignore paths which can't be mapped to alternative site URL for resource access. +This is useful, for example, when the site is running within a docker network and the cdn front door has +Blob storage mapped to another path on the same domain. + +Example: + +``` +ckanext.xloader.site_url_ignore_path_regex = "(/PathToS3HostOriginIWantToGoDirectTo|/anotherPath)" +``` + +## Data Dictionary Fields + +#### strip_extra_white + +This plugin adds the `Strip Extra Leading and Trailing White Space` field to Data Dictionary fields. This controls whether or not to trim whitespace from data values prior to inserting into the database. Default for each field is `True` (it will trim whitespace). + +## Developer installation + +To install XLoader for development, activate your CKAN virtualenv and in +the directory up from your local ckan repo: + + git clone https://github.com/ckan/ckanext-xloader.git + cd ckanext-xloader + pip install -e . + pip install -r requirements.txt + pip install -r dev-requirements.txt + +## Upgrading from DataPusher + +To upgrade from DataPusher to XLoader: + +1. Install XLoader as above, including running the xloader worker. + +2. (Optional) For existing datasets that have been datapushed to + datastore, freeze the column types (in the data dictionaries), so + that XLoader doesn't change them back to string on next xload: + + ckan -c /etc/ckan/default/ckan.ini migrate_types + +3. If you've not already, change the enabled plugin in your config - + on the `ckan.plugins` line replace `datapusher` with `xloader`. + +4. (Optional) If you wish, you can disable the direct loading and + continue to just use tabulator - for more about this see the docs on + config option: `ckanext.xloader.use_type_guessing` + +5. Stop the datapusher worker: + + sudo a2dissite datapusher + +6. Restart CKAN: + + sudo service apache2 reload + sudo service nginx reload + +## Command-line interface + +You can submit single or multiple resources to be xloaded using the +command-line interface. + +e.g. : + + ckan -c /etc/ckan/default/ckan.ini xloader submit + +For debugging you can try xloading it synchronously (which does the load +directly, rather than asking the worker to do it) with the `-s` option: + + ckan -c /etc/ckan/default/ckan.ini xloader submit -s + +See the status of jobs: + + ckan -c /etc/ckan/default/ckan.ini xloader status + +Submit all datasets' resources to the DataStore: + + ckan -c /etc/ckan/default/ckan.ini xloader submit all + +Re-submit all the resources already in the DataStore (Ignores any +resources that have not been stored in DataStore e.g. because they are +not tabular): + + ckan -c /etc/ckan/default/ckan.ini xloader submit all-existing + +**Full list of XLoader CLI commands**: + + ckan -c /etc/ckan/default/ckan.ini xloader --help + +### Jobs and workers + +Main docs for managing jobs: + +https://docs.ckan.org/en/latest/maintaining/background-tasks.html#managing-background-jobs + +Main docs for running and managing workers are here: + +https://docs.ckan.org/en/latest/maintaining/background-tasks.html#running-background-jobs + +Useful commands: + +Clear (delete) all outstanding jobs: + + ckan -c /etc/ckan/default/ckan.ini jobs clear [QUEUES] + +If having trouble with the worker process, restarting it can help: + + sudo supervisorctl restart ckan-worker:* + +## Troubleshooting + +**KeyError: "Action 'datastore_search' not found"** + +You need to enable the [datastore]{.title-ref} plugin in your CKAN +config. See 'Installation' section above to do this and restart the +worker. + +**ProgrammingError: (ProgrammingError) relation "\_table_metadata" +does not exist** + +Your DataStore permissions have not been set-up - see: +https://docs.ckan.org/en/latest/maintaining/datastore.html#set-permissions + +## Running the Tests + +The first time, your test datastore database needs the trigger applied: + + sudo -u postgres psql datastore_test -f full_text_function.sql + +To run the tests, do: + + pytest ckan-ini=test.ini ckanext/xloader/tests + +## Releasing a New Version of XLoader + +XLoader is available on PyPI as +. + +To publish a new version to PyPI follow these steps: + +1. Update the version number in the `setup.py` file. See [PEP + 440](http://legacy.python.org/dev/peps/pep-0440/#public-version-identifiers) + for how to choose version numbers. + +2. Update the CHANGELOG. + +3. Make sure you have the latest version of necessary packages: + + pip install --upgrade setuptools wheel twine + +4. Create source and binary distributions of the new version: + + python setup.py sdist bdist_wheel && twine check dist/* + + Fix any errors you get. + +5. Upload the source distribution to PyPI: + + twine upload dist/* + +6. Commit any outstanding changes: + + git commit -a + git push + +7. Tag the new release of the project on GitHub with the version number + from the `setup.py` file. For example if the version number in + `setup.py` is 0.0.1 then do: + + git tag 0.0.1 + git push --tags diff --git a/README.rst b/README.rst deleted file mode 100644 index 4ec2e11b..00000000 --- a/README.rst +++ /dev/null @@ -1,380 +0,0 @@ -.. You should enable this project on travis-ci.org and coveralls.io to make - these badges work. The necessary Travis and Coverage config files have been - generated for you. - -.. image:: https://travis-ci.org/ckan/ckanext-xloader.svg?branch=master - :target: https://travis-ci.org/ckan/ckanext-xloader - -.. image:: https://img.shields.io/pypi/v/ckanext-xloader.svg - :target: https://pypi.org/project/ckanext-xloader/ - :alt: Latest Version - -.. image:: https://img.shields.io/pypi/pyversions/ckanext-xloader.svg - :target: https://pypi.org/project/ckanext-xloader/ - :alt: Supported Python versions - -.. image:: https://img.shields.io/pypi/status/ckanext-xloader.svg - :target: https://pypi.org/project/ckanext-xloader/ - :alt: Development Status - -.. image:: https://img.shields.io/pypi/l/ckanext-xloader.svg - :target: https://pypi.org/project/ckanext-xloader/ - :alt: License - -========================= -XLoader - ckanext-xloader -========================= - -Loads CSV (and similar) data into CKAN's DataStore. Designed as a replacement -for DataPusher because it offers ten times the speed and more robustness -(hence the name, derived from "Express Loader") - -**OpenGov Inc.** has sponsored this development, with the aim of benefitting -open data infrastructure worldwide. - -------------------------------- -Key differences from DataPusher -------------------------------- - -Speed of loading ----------------- - -DataPusher - parses CSV rows, converts to detected column types, converts the -data to a JSON string, calls datastore_create for each batch of rows, which -reformats the data into an INSERT statement string, which is passed to -PostgreSQL. - -XLoader - pipes the CSV file directly into PostgreSQL using COPY. - -In `tests `_, XLoader -is over ten times faster than DataPusher. - -Robustness ----------- - -DataPusher - one cause of failure was when casting cells to a guessed type. The -type of a column was decided by looking at the values of only the first few -rows. So if a column is mainly numeric or dates, but a string (like "N/A") -comes later on, then this will cause the load to error at that point, leaving -it half-loaded into DataStore. - -XLoader - loads all the cells as text, before allowing the admin to -convert columns to the types they want (using the Data Dictionary feature). In -future it could do automatic detection and conversion. - -Simpler queueing tech ---------------------- - -DataPusher - job queue is done by ckan-service-provider which is bespoke, -complicated and stores jobs in its own database (sqlite by default). - -XLoader - job queue is done by RQ, which is simpler, is backed by Redis, allows -access to the CKAN model and is CKAN's default queue technology. -You can also debug jobs easily using pdb. Job results are stored in -Sqlite by default, and for production simply specify CKAN's database in the -config and it's held there - easy. - -(The other obvious candidate is Celery, but we don't need its heavyweight -architecture and its jobs are not debuggable with pdb.) - -Separate web server -------------------- - -DataPusher - has the complication that the queue jobs are done by a separate -(Flask) web app, apart from CKAN. This was the design because the job requires -intensive processing to convert every line of the data into JSON. However it -means more complicated code as info needs to be passed between the services in -http requests, more for the user to set-up and manage - another app config, -another apache config, separate log files. - -XLoader - the job runs in a worker process, in the same app as CKAN, so -can access the CKAN config, db and logging directly and avoids many HTTP calls. -This simplification makes sense because the xloader job doesn't need to do much -processing - mainly it is streaming the CSV file from disk into PostgreSQL. - -It is still entirely possible to run the XLoader worker on a separate server, -if that is desired. The worker needs the following: - -- A copy of CKAN installed in the same Python virtualenv (but not running). -- A copy of the CKAN config file. -- Access to the Redis instance that the running CKAN app uses to store jobs. -- Access to the database. - -You can then run it via `ckan jobs worker` as below. - -Caveat - column types ---------------------- - -Note: With XLoader, all columns are stored in DataStore's database as 'text' -type (whereas DataPusher did some rudimentary type guessing - see 'Robustness' -above). However once a resource is xloaded, an admin can use the resource's -Data Dictionary tab to change these types to numeric or -datestamp and re-load the file. When migrating from DataPusher to XLoader you -can preserve the types of existing resources by using the ``migrate_types`` -command. - -There is scope to add functionality for automatically guessing column type - -offers to contribute this are welcomed. - - ------------- -Requirements ------------- - -Compatibility with core CKAN versions: - -=============== ============= -CKAN version Compatibility -=============== ============= -2.7 no longer supported (last supported version: 0.12.2) -2.8 no longer supported (last supported version: 0.12.2) -2.9 yes (Python3) (last supported version for Python 2.7: 0.12.2)), Must: ``pip install "setuptools>=44.1.0,<71"`` -2.10 yes -2.11 yes -=============== ============= - ------------- -Installation ------------- - -To install XLoader: - -1. Activate your CKAN virtual environment, for example:: - - . /usr/lib/ckan/default/bin/activate - -2. Install the ckanext-xloader Python package into your virtual environment:: - - pip install ckanext-xloader - -3. Install dependencies:: - - pip install -r https://raw.githubusercontent.com/ckan/ckanext-xloader/master/requirements.txt - pip install -U requests[security] - -4. Add ``xloader`` to the ``ckan.plugins`` setting in your CKAN - config file (by default the config file is located at - ``/etc/ckan/default/production.ini``). - - You should also remove ``datapusher`` if it is in the list, to avoid them - both trying to load resources into the DataStore. - - Ensure ``datastore`` is also listed, to enable CKAN DataStore. - -5. Starting CKAN 2.10 you will need to set an API Token to be able to - execute jobs against the server:: - - ckanext.xloader.api_token = - -6. If it is a production server, you'll want to store jobs info in a more - robust database than the default sqlite file. It can happily use the main - CKAN postgres db by adding this line to the config, but with the same value - as you have for ``sqlalchemy.url``:: - - ckanext.xloader.jobs_db.uri = postgresql://ckan_default:pass@localhost/ckan_default - - (This step can be skipped when just developing or testing.) - -7. Restart CKAN. For example if you've deployed CKAN with Apache on Ubuntu:: - - sudo service apache2 reload - -8. Run the worker:: - - ckan -c /etc/ckan/default/ckan.ini jobs worker - - ---------------- -Config settings ---------------- - -Configuration: - -See the extension's `config_declaration.yaml `_ file. - -This plugin also supports the `ckan.download_proxy` setting, to use a proxy server when downloading files. -This setting is shared with other plugins that download resource files, such as ckanext-archiver. Eg: - - ckan.download_proxy = http://my-proxy:1234/ - -You may also wish to configure the database to use your preferred date input style on COPY. -For example, to make [PostgreSQL](https://www.postgresql.org/docs/current/runtime-config-client.html#RUNTIME-CONFIG-CLIENT-FORMAT) -expect European (day-first) dates, you could add to ``postgresql.conf``: - - datestyle=ISO,DMY - -External Database credentials for datastore - - ``ckanext.xloader.jobs_db.uri = postgresql://ckan_default:pass@localhost/ckan_default`` - -API Key requires for xloader interaction CKAN 2.10 onwards, to generate ``TOKEN=ckan -c /etc/ckan/default/production.ini user token add $ACCOUNT xloader | tail -1 | tr -d '[:space:]')`` - - ``ckanext.xloader.api_token = `` - -Badge notification on what xloader is doing - - ``ckanext.xloader.show_badges = True|False (default True)`` - - ``ckanext.xloader.debug_badges = True|False (default False)`` - ------------------------- -Developer installation ------------------------- - -To install XLoader for development, activate your CKAN virtualenv and -in the directory up from your local ckan repo:: - - git clone https://github.com/ckan/ckanext-xloader.git - cd ckanext-xloader - pip install -e . - pip install -r requirements.txt - pip install -r dev-requirements.txt - - -------------------------- -Upgrading from DataPusher -------------------------- - -To upgrade from DataPusher to XLoader: - -1. Install XLoader as above, including running the xloader worker. - -2. (Optional) For existing datasets that have been datapushed to datastore, freeze the column types (in the data dictionaries), so that XLoader doesn't change them back to string on next xload:: - - ckan -c /etc/ckan/default/ckan.ini migrate_types - -3. If you've not already, change the enabled plugin in your config - on the - ``ckan.plugins`` line replace ``datapusher`` with ``xloader``. - -4. (Optional) If you wish, you can disable the direct loading and continue to - just use tabulator - for more about this see the docs on config option: - ``ckanext.xloader.use_type_guessing`` - -5. Stop the datapusher worker:: - - sudo a2dissite datapusher - -6. Restart CKAN:: - - sudo service apache2 reload - sudo service nginx reload - ----------------------- -Command-line interface ----------------------- - -You can submit single or multiple resources to be xloaded using the -command-line interface. - -e.g. :: - - ckan -c /etc/ckan/default/ckan.ini xloader submit - -For debugging you can try xloading it synchronously (which does the load -directly, rather than asking the worker to do it) with the ``-s`` option:: - - ckan -c /etc/ckan/default/ckan.ini xloader submit -s - -See the status of jobs:: - - ckan -c /etc/ckan/default/ckan.ini xloader status - -Submit all datasets' resources to the DataStore:: - - ckan -c /etc/ckan/default/ckan.ini xloader submit all - -Re-submit all the resources already in the DataStore (Ignores any resources -that have not been stored in DataStore e.g. because they are not tabular):: - - ckan -c /etc/ckan/default/ckan.ini xloader submit all-existing - - -**Full list of XLoader CLI commands**:: - - ckan -c /etc/ckan/default/ckan.ini xloader --help - - -Jobs and workers ----------------- - -Main docs for managing jobs: - -Main docs for running and managing workers are here: https://docs.ckan.org/en/latest/maintaining/background-tasks.html#running-background-jobs - -Useful commands: - -Clear (delete) all outstanding jobs:: - - ckan -c /etc/ckan/default/ckan.ini jobs clear [QUEUES] - -If having trouble with the worker process, restarting it can help:: - - sudo supervisorctl restart ckan-worker:* - ---------------- -Troubleshooting ---------------- - -**KeyError: "Action 'datastore_search' not found"** - -You need to enable the `datastore` plugin in your CKAN config. See -'Installation' section above to do this and restart the worker. - -**ProgrammingError: (ProgrammingError) relation "_table_metadata" does not -exist** - -Your DataStore permissions have not been set-up - see: - - ------------------ -Running the Tests ------------------ - -The first time, your test datastore database needs the trigger applied:: - - sudo -u postgres psql datastore_test -f full_text_function.sql - -To run the tests, do:: - - pytest ckan-ini=test.ini ckanext/xloader/tests - - ----------------------------------- -Releasing a New Version of XLoader ----------------------------------- - -XLoader is available on PyPI as https://pypi.org/project/ckanext-xloader. - -To publish a new version to PyPI follow these steps: - -1. Update the version number in the ``setup.py`` file. - See `PEP 440 `_ - for how to choose version numbers. - -2. Update the CHANGELOG. - -3. Make sure you have the latest version of necessary packages:: - - pip install --upgrade setuptools wheel twine - -4. Create source and binary distributions of the new version:: - - python setup.py sdist bdist_wheel && twine check dist/* - - Fix any errors you get. - -5. Upload the source distribution to PyPI:: - - twine upload dist/* - -6. Commit any outstanding changes:: - - git commit -a - git push - -7. Tag the new release of the project on GitHub with the version number from - the ``setup.py`` file. For example if the version number in ``setup.py`` is - 0.0.1 then do:: - - git tag 0.0.1 - git push --tags diff --git a/ckanext/xloader/action.py b/ckanext/xloader/action.py index c0f3f84f..108d7cc9 100644 --- a/ckanext/xloader/action.py +++ b/ckanext/xloader/action.py @@ -47,12 +47,20 @@ def xloader_submit(context, data_dict): :rtype: bool ''' p.toolkit.check_access('xloader_submit', context, data_dict) + api_key = utils.get_xloader_user_apitoken() custom_queue = data_dict.pop('queue', rq_jobs.DEFAULT_QUEUE_NAME) schema = context.get('schema', ckanext.xloader.schema.xloader_submit_schema()) data_dict, errors = _validate(data_dict, schema, context) if errors: raise p.toolkit.ValidationError(errors) + p.toolkit.check_access('xloader_submit', context, data_dict) + + # If sync is set to True, the xloader callback will be executed right + # away, instead of a job being enqueued. It will also delete any existing jobs + # for the given resource. This is only controlled by sysadmins or the system. + sync = data_dict.pop('sync', False) + res_id = data_dict['resource_id'] try: resource_dict = p.toolkit.get_action('resource_show')(context, { @@ -140,7 +148,7 @@ def xloader_submit(context, data_dict): qualified=True ) data = { - 'api_key': utils.get_xloader_user_apitoken(), + 'api_key': api_key, 'job_type': 'xloader_to_datastore', 'result_url': callback_url, 'metadata': { @@ -166,15 +174,20 @@ def xloader_submit(context, data_dict): job = enqueue_job( jobs.xloader_data_into_datastore, [data], queue=custom_queue, title="xloader_submit: package: {} resource: {}".format(resource_dict.get('package_id'), res_id), - rq_kwargs=dict(timeout=timeout) + rq_kwargs=dict(timeout=timeout, at_front=sync) ) except Exception: - log.exception('Unable to enqueued xloader res_id=%s', res_id) + if sync: + log.exception('Unable to xloader res_id=%s', res_id) + else: + log.exception('Unable to enqueue xloader res_id=%s', res_id) return False log.debug('Enqueued xloader job=%s res_id=%s', job.id, res_id) - value = json.dumps({'job_id': job.id}) + if sync: + log.debug('Pushed xloader sync mode job=%s res_id=%s to front of queue', job.id, res_id) + task['value'] = value task['state'] = 'pending' task['last_updated'] = str(datetime.datetime.utcnow()) diff --git a/ckanext/xloader/command.py b/ckanext/xloader/command.py index 4c0f2d2f..9937c910 100644 --- a/ckanext/xloader/command.py +++ b/ckanext/xloader/command.py @@ -5,7 +5,7 @@ import ckan.plugins.toolkit as tk from ckanext.xloader.jobs import xloader_data_into_datastore_ -from ckanext.xloader.utils import XLoaderFormats +from ckanext.xloader.utils import XLoaderFormats, get_xloader_user_apitoken class XloaderCmd: @@ -117,7 +117,7 @@ def _submit_resource(self, resource, user, indent=0, sync=False, queue=None): data_dict['ckan_url'] = tk.config.get('ckan.site_url') input_dict = { 'metadata': data_dict, - 'api_key': 'TODO' + 'api_key': get_xloader_user_apitoken() } logger = logging.getLogger('ckanext.xloader.cli') xloader_data_into_datastore_(input_dict, None, logger) diff --git a/ckanext/xloader/config_declaration.yaml b/ckanext/xloader/config_declaration.yaml index 9487999d..4b529839 100644 --- a/ckanext/xloader/config_declaration.yaml +++ b/ckanext/xloader/config_declaration.yaml @@ -2,6 +2,22 @@ version: 1 groups: - annotation: ckanext-xloader settings options: + - key: ckanext.xloader.site_url + example: http://ckan-dev:5000 + default: + description: | + Provide an alternate site URL for the xloader_submit action. + This is useful, for example, when the site is running within a docker network. + Note: This setting will not alter path. i.e ckan.root_path + required: false + - key: ckanext.xloader.site_url_ignore_path_regex + example: "(/PathToS3HostOriginIWantToGoDirectTo|/anotherPath)" + default: + description: | + Provide the ability to ignore paths which can't be mapped to alternative site URL for resource access. + This is useful, for example, when the site is running within a docker network and the cdn front door has + Blob storage mapped to another path on the same domain. + required: false - key: ckanext.xloader.jobs_db.uri default: sqlite:////tmp/xloader_jobs.db description: | @@ -14,9 +30,9 @@ groups: example: eyJ0eXAiOiJKV1QiLCJh.eyJqdGkiOiJ0M2VNUFlQWFg0VU.8QgV8em4RA description: | Uses a specific API token for the xloader_submit action instead of the - apikey of the site_user. Will be mandatory when dropping support for - CKAN 2.9. - required: false + apikey of the site_user. + default: 'NOT_SET' + required: true - key: ckanext.xloader.formats example: csv application/csv xls application/vnd.ms-excel description: | @@ -141,8 +157,10 @@ groups: example: False description: | Resources are expected to have a Validation Schema, or use the default ones if not. + If this option is set to `False`, Resources that do not have a Validation Schema will be treated like they do not require Validation. + See https://github.com/frictionlessdata/ckanext-validation?tab=readme-ov-file#data-schema for more details. - key: ckanext.xloader.clean_datastore_tables @@ -169,5 +187,14 @@ groups: they will also display "complete", "active", "inactive", and "unknown". type: bool required: false - - + - key: ckanext.xloader.search_update_chunks + default: 100000 + example: True + description: | + The number of rows to process in each batch when populating the full-text + search index. Chunked processing prevents database timeouts and memory + exhaustion when indexing very large datasets (4GB+ files with millions of rows). + Smaller values reduce memory usage but increase processing time. Larger values + improve performance but may cause timeouts on very large tables. + type: int + required: false diff --git a/ckanext/xloader/db.py b/ckanext/xloader/db.py index a93eb0d8..6bb25d94 100644 --- a/ckanext/xloader/db.py +++ b/ckanext/xloader/db.py @@ -39,8 +39,8 @@ def init(config, echo=False): global ENGINE, _METADATA, JOBS_TABLE, METADATA_TABLE, LOGS_TABLE db_uri = config.get('ckanext.xloader.jobs_db.uri', 'sqlite:////tmp/xloader_jobs.db') - ENGINE = sqlalchemy.create_engine(db_uri, echo=echo, convert_unicode=True) - _METADATA = sqlalchemy.MetaData(ENGINE) + ENGINE = sqlalchemy.create_engine(db_uri, echo=echo) + _METADATA = sqlalchemy.MetaData() JOBS_TABLE = _init_jobs_table() METADATA_TABLE = _init_metadata_table() LOGS_TABLE = _init_logs_table() @@ -111,8 +111,10 @@ def get_job(job_id): if job_id: job_id = six.text_type(job_id) - result = ENGINE.execute( - JOBS_TABLE.select().where(JOBS_TABLE.c.job_id == job_id)).first() + with ENGINE.connect() as conn: + result = conn.execute( + JOBS_TABLE.select().where(JOBS_TABLE.c.job_id == job_id) + ).first() if not result: return None @@ -298,10 +300,11 @@ def _update_job(job_id, job_dict): if "data" in job_dict: job_dict["data"] = six.text_type(job_dict["data"]) - ENGINE.execute( - JOBS_TABLE.update() - .where(JOBS_TABLE.c.job_id == job_id) - .values(**job_dict)) + with ENGINE.begin() as conn: + conn.execute( + JOBS_TABLE.update() + .where(JOBS_TABLE.c.job_id == job_id) + .values(**job_dict)) def mark_job_as_completed(job_id, data=None): @@ -443,9 +446,10 @@ def _get_metadata(job_id): # warnings. job_id = six.text_type(job_id) - results = ENGINE.execute( - METADATA_TABLE.select().where( - METADATA_TABLE.c.job_id == job_id)).fetchall() + with ENGINE.connect() as conn: + results = conn.execute( + METADATA_TABLE.select().where( + METADATA_TABLE.c.job_id == job_id)).fetchall() metadata = {} for row in results: value = row['value'] @@ -461,8 +465,9 @@ def _get_logs(job_id): # warnings. job_id = six.text_type(job_id) - results = ENGINE.execute( - LOGS_TABLE.select().where(LOGS_TABLE.c.job_id == job_id)).fetchall() + with ENGINE.connect() as conn: + results = conn.execute( + LOGS_TABLE.select().where(LOGS_TABLE.c.job_id == job_id)).fetchall() results = [dict(result) for result in results] diff --git a/ckanext/xloader/jobs.py b/ckanext/xloader/jobs.py index 40505931..9bb0de9b 100644 --- a/ckanext/xloader/jobs.py +++ b/ckanext/xloader/jobs.py @@ -15,19 +15,18 @@ from six.moves.urllib.parse import urlsplit import requests from rq import get_current_job +from rq.timeouts import JobTimeoutException import sqlalchemy as sa from ckan import model -from ckan.plugins.toolkit import get_action, asbool, enqueue_job, ObjectNotFound, config +from ckan.plugins.toolkit import get_action, asbool, enqueue_job, ObjectNotFound, config, h from . import db, loader from .job_exceptions import JobError, HTTPError, DataTooBigError, FileCouldNotBeLoadedError -from .utils import datastore_resource_exists, set_resource_metadata +from .utils import datastore_resource_exists, set_resource_metadata, modify_input_url -try: - from ckan.lib.api_token import get_user_from_token -except ImportError: - get_user_from_token = None + +from ckan.lib.api_token import get_user_from_token log = logging.getLogger(__name__) @@ -51,6 +50,7 @@ # Retries can only occur in cases where the datastore entry exists, # so use the standard timeout RETRIED_JOB_TIMEOUT = config.get('ckanext.xloader.job_timeout', '3600') +APITOKEN_HEADER_NAME = config.get('apitoken_header_name', 'Authorization') # input = { @@ -177,8 +177,14 @@ def xloader_data_into_datastore_(input, job_dict, logger): logger.info('Express Load starting: %s', resource_ckan_url) # check if the resource url_type is a datastore - if resource.get('url_type') == 'datastore': - logger.info('Ignoring resource - url_type=datastore - dump files are ' + if hasattr(h, "datastore_rw_resource_url_types"): + datastore_rw_resource_url_types = h.datastore_rw_resource_url_types() + else: + # fallback for 2.10.x or older. + datastore_rw_resource_url_types = ['datastore'] + + if resource.get('url_type') in datastore_rw_resource_url_types: + logger.info('Ignoring resource - R/W DataStore resources are ' 'managed with the Datastore API') return @@ -269,6 +275,13 @@ def tabulator_load(): logger.warning('Load using COPY failed: %s', e) logger.info('Trying again with tabulator') tabulator_load() + except JobTimeoutException: + try: + tmp_file.close() + except FileNotFoundError: + pass + logger.warning('Job timed out after %ss', RETRIED_JOB_TIMEOUT) + raise JobError('Job timed out after {}s'.format(RETRIED_JOB_TIMEOUT)) except FileCouldNotBeLoadedError as e: logger.warning('Loading excerpt for this format not supported.') logger.error('Loading file raised an error: %s', e) @@ -292,8 +305,9 @@ def _download_resource_data(resource, data, api_key, logger): data['datastore_contains_all_records_of_source_file'] = False which will be saved to the resource later on. ''' + # update base url (for possible local loopback) + url = modify_input_url(resource.get('url')) # check scheme - url = resource.get('url') url_parts = urlsplit(url) scheme = url_parts.scheme if scheme not in ('http', 'https', 'ftp'): @@ -319,7 +333,7 @@ def _download_resource_data(resource, data, api_key, logger): if resource.get('url_type') == 'upload': # If this is an uploaded file to CKAN, authenticate the request, # otherwise we won't get file from private resources - headers['Authorization'] = api_key + headers[APITOKEN_HEADER_NAME] = api_key # Add a constantly changing parameter to bypass URL caching. # If we're running XLoader, then either the resource has @@ -374,6 +388,7 @@ def _download_resource_data(resource, data, api_key, logger): response.close() data['datastore_contains_all_records_of_source_file'] = False except requests.exceptions.HTTPError as error: + tmp_file.close() # status code error logger.debug('HTTP error: %s', error) raise HTTPError( @@ -385,6 +400,7 @@ def _download_resource_data(resource, data, api_key, logger): raise JobError('Connection timed out after {}s'.format( DOWNLOAD_TIMEOUT)) except requests.exceptions.RequestException as e: + tmp_file.close() try: err_message = str(e.reason) except AttributeError: @@ -393,6 +409,10 @@ def _download_resource_data(resource, data, api_key, logger): raise HTTPError( message=err_message, status_code=None, request_url=url, response=None) + except JobTimeoutException: + tmp_file.close() + logger.warning('Job timed out after %ss', RETRIED_JOB_TIMEOUT) + raise JobError('Job timed out after {}s'.format(RETRIED_JOB_TIMEOUT)) logger.info('Downloaded ok - %s', printable_file_size(length)) file_hash = m.hexdigest() @@ -465,12 +485,12 @@ def callback_xloader_hook(result_url, api_key, job_dict): if ':' in api_key: header, key = api_key.split(':') else: - header, key = 'Authorization', api_key + header, key = APITOKEN_HEADER_NAME, api_key headers[header] = key try: result = requests.post( - result_url, + modify_input_url(result_url), # modify with local config data=json.dumps(job_dict, cls=DatetimeJsonEncoder), verify=SSL_VERIFY, headers=headers) @@ -513,19 +533,9 @@ def update_resource(resource, patch_only=False): def _get_user_from_key(api_key_or_token): """ Gets the user using the API Token or API Key. - - This method provides backwards compatibility for CKAN 2.9 that - supported both methods and previous CKAN versions supporting - only API Keys. """ - user = None - if get_user_from_token: - user = get_user_from_token(api_key_or_token) - if not user: - user = model.Session.query(model.User).filter_by( - apikey=api_key_or_token - ).first() - return user + return get_user_from_token(api_key_or_token) + def get_resource_and_dataset(resource_id, api_key): diff --git a/ckanext/xloader/loader.py b/ckanext/xloader/loader.py index d803c0ae..82f56f19 100644 --- a/ckanext/xloader/loader.py +++ b/ckanext/xloader/loader.py @@ -14,6 +14,7 @@ from six.moves import zip from tabulator import config as tabulator_config, EncodingError, Stream, TabulatorException from unidecode import unidecode +import sqlalchemy as sa import ckan.plugins as p @@ -118,8 +119,8 @@ def _clear_datastore_resource(resource_id): ''' engine = get_write_engine() with engine.begin() as conn: - conn.execute("SET LOCAL lock_timeout = '15s'") - conn.execute('TRUNCATE TABLE "{}" RESTART IDENTITY'.format(resource_id)) + conn.execute(sa.text("SET LOCAL lock_timeout = '15s'")) + conn.execute(sa.text('TRUNCATE TABLE "{}" RESTART IDENTITY'.format(resource_id))) def load_csv(csv_filepath, resource_id, mimetype='text/csv', logger=None): @@ -171,17 +172,6 @@ def load_csv(csv_filepath, resource_id, mimetype='text/csv', logger=None): logger.info('Ensuring character coding is UTF8') f_write = tempfile.NamedTemporaryFile(suffix=file_format, delete=False) try: - save_args = {'target': f_write.name, 'format': 'csv', 'encoding': 'utf-8', 'delimiter': delimiter} - try: - with UnknownEncodingStream(csv_filepath, file_format, decoding_result, - skip_rows=skip_rows) as stream: - stream.save(**save_args) - except (EncodingError, UnicodeDecodeError): - with Stream(csv_filepath, format=file_format, encoding=SINGLE_BYTE_ENCODING, - skip_rows=skip_rows) as stream: - stream.save(**save_args) - csv_filepath = f_write.name - # datastore db connection engine = get_write_engine() @@ -189,10 +179,16 @@ def load_csv(csv_filepath, resource_id, mimetype='text/csv', logger=None): existing = datastore_resource_exists(resource_id) existing_info = {} if existing: - existing_fields = existing.get('fields', []) + if p.toolkit.check_ckan_version(min_version='2.11'): + ds_info = p.toolkit.get_action('datastore_info')({'ignore_auth': True}, {'id': resource_id}) + existing_fields = ds_info.get('fields', []) + else: + existing_fields = existing.get('fields', []) existing_info = dict((f['id'], f['info']) for f in existing_fields if 'info' in f) + existing_fields_by_headers = dict((f['id'], f) + for f in existing_fields) # Column types are either set (overridden) in the Data Dictionary page # or default to text type (which is robust) @@ -207,6 +203,8 @@ def load_csv(csv_filepath, resource_id, mimetype='text/csv', logger=None): for f in fields: if f['id'] in existing_info: f['info'] = existing_info[f['id']] + f['strip_extra_white'] = existing_info[f['id']].get('strip_extra_white') if 'strip_extra_white' in existing_info[f['id']] \ + else existing_fields_by_headers[f['id']].get('strip_extra_white', True) ''' Delete or truncate existing datastore table before proceeding, @@ -223,11 +221,36 @@ def load_csv(csv_filepath, resource_id, mimetype='text/csv', logger=None): else: fields = [ {'id': header_name, - 'type': 'text'} + 'type': 'text', + 'strip_extra_white': True} for header_name in headers] is_data_dict_populated = _is_data_dict_populated(fields, logger) logger.info('Fields: %s', fields) + def _make_whitespace_stripping_iter(super_iter): + def strip_white_space_iter(): + for row in super_iter(): + if len(row) == len(fields): + for _index, _cell in enumerate(row): + # only strip white space if strip_extra_white is True + if fields[_index].get('strip_extra_white', True) and isinstance(_cell, str): + row[_index] = _cell.strip() + yield row + return strip_white_space_iter + + save_args = {'target': f_write.name, 'format': 'csv', 'encoding': 'utf-8', 'delimiter': delimiter} + try: + with UnknownEncodingStream(csv_filepath, file_format, decoding_result, + skip_rows=skip_rows) as stream: + stream.iter = _make_whitespace_stripping_iter(stream.iter) + stream.save(**save_args) + except (EncodingError, UnicodeDecodeError): + with Stream(csv_filepath, format=file_format, encoding=SINGLE_BYTE_ENCODING, + skip_rows=skip_rows) as stream: + stream.iter = _make_whitespace_stripping_iter(stream.iter) + stream.save(**save_args) + csv_filepath = f_write.name + # Create table from ckan import model context = {'model': model, 'ignore_auth': True} @@ -253,12 +276,16 @@ def load_csv(csv_filepath, resource_id, mimetype='text/csv', logger=None): except Exception as e: raise LoaderError('Could not create the database table: {}' .format(e)) - connection = context['connection'] = engine.connect() - # datstore_active is switched on by datastore_create - TODO temporarily - # disable it until the load is complete - _disable_fulltext_trigger(connection, resource_id) - _drop_indexes(context, data_dict, False) + # datastore_active is switched on by datastore_create + # TODO temporarily disable it until the load is complete + + with engine.begin() as conn: + _disable_fulltext_trigger(conn, resource_id) + + with engine.begin() as conn: + context['connection'] = conn + _drop_indexes(context, data_dict, False) logger.info('Copying to database...') @@ -276,9 +303,8 @@ def load_csv(csv_filepath, resource_id, mimetype='text/csv', logger=None): # 4. COPY FROM STDIN - not quite as fast as COPY from a file, but avoids # the superuser issue. <-- picked - raw_connection = engine.raw_connection() - try: - cur = raw_connection.cursor() + with engine.begin() as conn: + cur = conn.connection.cursor() try: with open(csv_filepath, 'rb') as f: # can't use :param for table name because params are only @@ -308,15 +334,14 @@ def load_csv(csv_filepath, resource_id, mimetype='text/csv', logger=None): finally: cur.close() - finally: - raw_connection.commit() finally: os.remove(csv_filepath) # i.e. the tempfile logger.info('...copying done') logger.info('Creating search index...') - _populate_fulltext(connection, resource_id, fields=fields) + with engine.begin() as conn: + _populate_fulltext(conn, resource_id, fields=fields, logger=logger) logger.info('...search index created') return fields, is_data_dict_populated @@ -393,10 +418,16 @@ def load_table(table_filepath, resource_id, mimetype='text/csv', logger=None): existing = datastore_resource_exists(resource_id) existing_info = None if existing: - existing_fields = existing.get('fields', []) + if p.toolkit.check_ckan_version(min_version='2.11'): + ds_info = p.toolkit.get_action('datastore_info')({'ignore_auth': True}, {'id': resource_id}) + existing_fields = ds_info.get('fields', []) + else: + existing_fields = existing.get('fields', []) existing_info = dict( (f['id'], f['info']) for f in existing_fields if 'info' in f) + existing_fields_by_headers = dict((f['id'], f) + for f in existing_fields) # Some headers might have been converted from strings to floats and such. headers = encode_headers(headers) @@ -410,6 +441,7 @@ def load_table(table_filepath, resource_id, mimetype='text/csv', logger=None): strict_guessing = p.toolkit.asbool( config.get('ckanext.xloader.strict_type_guessing', True)) types = type_guess(stream.sample[1:], types=TYPES, strict=strict_guessing) + fields = [] # override with types user requested if existing_info: @@ -420,6 +452,12 @@ def load_table(table_filepath, resource_id, mimetype='text/csv', logger=None): 'timestamp': datetime.datetime, }.get(existing_info.get(h, {}).get('type_override'), t) for t, h in zip(types, headers)] + for h in headers: + fields.append(existing_fields_by_headers.get(h, {})) + else: + # default strip_extra_white + for h in headers: + fields.append({'strip_extra_white': True}) # Strip leading and trailing whitespace, then truncate to maximum length, # then strip again in case the truncation exposed a space. @@ -429,7 +467,7 @@ def load_table(table_filepath, resource_id, mimetype='text/csv', logger=None): if header and header.strip() ] header_count = len(headers) - type_converter = TypeConverter(types=types) + type_converter = TypeConverter(types=types, fields=fields) with UnknownEncodingStream(table_filepath, file_format, decoding_result, skip_rows=skip_rows, @@ -461,10 +499,16 @@ def row_iterator(): for h in headers_dicts: if h['id'] in existing_info: h['info'] = existing_info[h['id']] + h['strip_extra_white'] = existing_info[h['id']].get('strip_extra_white') if 'strip_extra_white' in existing_info[h['id']] \ + else existing_fields_by_headers[h['id']].get('strip_extra_white', True) # create columns with types user requested type_override = existing_info[h['id']].get('type_override') if type_override in list(_TYPE_MAPPING.values()): h['type'] = type_override + else: + # default strip_extra_white + for h in headers_dicts: + h['strip_extra_white'] = True # preserve any types that we have sniffed unless told otherwise _save_type_overrides(headers_dicts) @@ -596,9 +640,9 @@ def fulltext_function_exists(connection): https://github.com/ckan/ckan/pull/3786 or otherwise it is checked on startup of this plugin. ''' - res = connection.execute(''' + res = connection.execute(sa.text(''' select * from pg_proc where proname = 'populate_full_text_trigger'; - ''') + ''')) return bool(res.rowcount) @@ -607,53 +651,107 @@ def fulltext_trigger_exists(connection, resource_id): This will only be the case if your CKAN is new enough to have: https://github.com/ckan/ckan/pull/3786 ''' - res = connection.execute(''' + res = connection.execute(sa.text(''' SELECT pg_trigger.tgname FROM pg_class JOIN pg_trigger ON pg_class.oid=pg_trigger.tgrelid WHERE pg_class.relname={table} AND pg_trigger.tgname='zfulltext'; '''.format( - table=literal_string(resource_id))) + table=literal_string(resource_id)))) return bool(res.rowcount) def _disable_fulltext_trigger(connection, resource_id): - connection.execute('ALTER TABLE {table} DISABLE TRIGGER zfulltext;' - .format(table=identifier(resource_id))) + connection.execute(sa.text('ALTER TABLE {table} DISABLE TRIGGER zfulltext;' + .format(table=identifier(resource_id, True)))) def _enable_fulltext_trigger(connection, resource_id): - connection.execute('ALTER TABLE {table} ENABLE TRIGGER zfulltext;' - .format(table=identifier(resource_id))) - - -def _populate_fulltext(connection, resource_id, fields): - '''Populates the _full_text column. i.e. the same as datastore_run_triggers - but it runs in 1/9 of the time. - - The downside is that it reimplements the code that calculates the text to - index, breaking DRY. And its annoying to pass in the column names. - - fields: list of dicts giving the each column's 'id' (name) and 'type' - (text/numeric/timestamp) + connection.execute(sa.text( + 'ALTER TABLE {table} ENABLE TRIGGER zfulltext;' + .format(table=identifier(resource_id, True)))) + + +def _get_rows_count_of_resource(connection, table): + count_query = ''' SELECT count(_id) from {table} '''.format(table=table) + results = connection.execute(count_query) + rows_count = int(results.first()[0]) + return rows_count + + +def _populate_fulltext(connection, resource_id, fields, logger): + '''Populates the _full_text column for full-text search functionality. + + This function creates a PostgreSQL tsvector (text search vector) for each row + by concatenating all non-system columns. It's equivalent to datastore_run_triggers + but runs approximately 9x faster by using direct SQL updates. + + To handle very large datasets (e.g., 4GB+ files with millions of rows), the update + operation is partitioned into chunks to prevent: + - Database statement timeouts + - Memory exhaustion + - Lock contention that could block other operations + - Transaction log overflow + + The chunking mechanism processes rows in batches based on their _id values, + with chunk size configurable via 'ckanext.xloader.search_update_chunks' + (default: 100,000 rows per chunk). + + Args: + connection: Database connection object + resource_id (str): The datastore table identifier + fields (list): List of dicts with column 'id' (name) and 'type' + (text/numeric/timestamp) + logger: Logger instance for progress tracking + + Note: + This reimplements CKAN's text indexing logic for performance, + breaking DRY principle but providing significant speed improvements. ''' - sql = \ - u''' - UPDATE {table} - SET _full_text = to_tsvector({cols}); - '''.format( - # coalesce copes with blank cells - table=identifier(resource_id), - cols=" || ' ' || ".join( - 'coalesce({}, \'\')'.format( - identifier(field['id']) - + ('::text' if field['type'] != 'text' else '') - ) - for field in fields - if not field['id'].startswith('_') - ) - ) - connection.execute(sql) + try: + # Get total row count to determine chunking strategy + rows_count = _get_rows_count_of_resource(connection, identifier(resource_id)) + except Exception as e: + rows_count = '' + logger.info("Failed to get resource rows count: {} ".format(str(e))) + raise + + if rows_count: + # Configure chunk size - prevents timeouts and memory issues on large datasets + # Default 100,000 rows per chunk balances performance vs. resource usage + chunks = int(config.get('ckanext.xloader.search_update_chunks', 100000)) + + # Process table in chunks using _id range queries + # This approach ensures consistent chunk sizes and allows resuming if interrupted + for start in range(0, rows_count, chunks): + try: + # Build SQL to update _full_text column with concatenated searchable content + sql = \ + ''' + UPDATE {table} + SET _full_text = to_tsvector({cols}) WHERE _id BETWEEN {first} and {end}; + '''.format( + table=identifier(resource_id), + # Concatenate all user columns (excluding system columns starting with '_') + # coalesce() handles NULL values by converting them to empty strings + cols=" || ' ' || ".join( + 'coalesce({}, \'\')'.format( + identifier(field['id']) + + ('::text' if field['type'] != 'text' else '') # Cast non-text types + ) + for field in fields + if not field['id'].startswith('_') # Skip system columns like _id, _full_text + ), + first=start, + end=start + chunks + ) + connection.execute(sql) + logger.info("Indexed rows {first} to {end} of {total}".format( + first=start, end=min(start + chunks, rows_count), total=rows_count)) + except Exception as e: + # Log chunk-specific errors but continue processing remaining chunks + logger.error("Failed to index rows {first}-{end}: {error}".format( + first=start, end=start + chunks, error=str(e))) def calculate_record_count(resource_id, logger): @@ -665,15 +763,18 @@ def calculate_record_count(resource_id, logger): logger.info('Calculating record count (running ANALYZE on the table)') engine = get_write_engine() conn = engine.connect() - conn.execute("ANALYZE \"{resource_id}\";" - .format(resource_id=resource_id)) + conn.execute(sa.text("ANALYZE \"{resource_id}\";" + .format(resource_id=resource_id))) -def identifier(s): +def identifier(s, escape_binds=False): # "%" needs to be escaped, otherwise connection.execute thinks it is for # substituting a bind parameter - return u'"' + s.replace(u'"', u'""').replace(u'\0', '').replace('%', '%%')\ - + u'"' + escaped = s.replace(u'"', u'""').replace(u'\0', '') + if escape_binds: + escaped = escaped.replace('%', '%%') + + return u'"' + escaped + u'"' def literal_string(s): diff --git a/ckanext/xloader/parser.py b/ckanext/xloader/parser.py index 11e756cd..d57b7313 100644 --- a/ckanext/xloader/parser.py +++ b/ckanext/xloader/parser.py @@ -18,8 +18,9 @@ class TypeConverter: as desired. """ - def __init__(self, types=None): + def __init__(self, types=None, fields=None): self.types = types + self.fields = fields def convert_types(self, extended_rows): """ Try converting cells to numbers or timestamps if applicable. @@ -31,7 +32,16 @@ def convert_types(self, extended_rows): for cell_index, cell_value in enumerate(row): if cell_value is None: row[cell_index] = '' + if self.fields: + # only strip white space if strip_extra_white is True + if self.fields[cell_index].get('info', {}).get('strip_extra_white', True) and isinstance(cell_value, six.text_type): + cell_value = cell_value.strip() + row[cell_index] = cell_value.strip() if not cell_value: + # load_csv parody: empty of string type should be None + if self.types and self.types[cell_index] == six.text_type: + cell_value = None + row[cell_index] = None continue cell_type = self.types[cell_index] if self.types else None if cell_type in [Decimal, None]: diff --git a/ckanext/xloader/plugin.py b/ckanext/xloader/plugin.py index 0d023553..5eb70153 100644 --- a/ckanext/xloader/plugin.py +++ b/ckanext/xloader/plugin.py @@ -17,13 +17,13 @@ except ImportError: HAS_IPIPE_VALIDATION = False -try: - config_declarations = toolkit.blanket.config_declarations -except AttributeError: - # CKAN 2.9 does not have config_declarations. - # Remove when dropping support. - def config_declarations(cls): - return cls +config_declarations = toolkit.blanket.config_declarations + +if toolkit.check_ckan_version(min_version='2.11'): + from ckanext.datastore.interfaces import IDataDictionaryForm + has_idata_dictionary_form = True +else: + has_idata_dictionary_form = False log = logging.getLogger(__name__) @@ -39,6 +39,8 @@ class xloaderPlugin(plugins.SingletonPlugin): plugins.implements(plugins.IResourceController, inherit=True) plugins.implements(plugins.IClick) plugins.implements(plugins.IBlueprint) + if has_idata_dictionary_form: + plugins.implements(IDataDictionaryForm, inherit=True) if HAS_IPIPE_VALIDATION: plugins.implements(IPipeValidation) @@ -68,14 +70,6 @@ def configure(self, config_): else: self.ignore_hash = False - for config_option in ("ckan.site_url",): - if not config_.get(config_option): - raise Exception( - "Config option `{0}` must be set to use ckanext-xloader.".format( - config_option - ) - ) - # IPipeValidation def receive_validation_report(self, validation_report): @@ -83,9 +77,9 @@ def receive_validation_report(self, validation_report): res_dict = toolkit.get_action('resource_show')({'ignore_auth': True}, {'id': validation_report.get('resource_id')}) if (toolkit.asbool(toolkit.config.get('ckanext.xloader.validation.enforce_schema', True)) - or res_dict.get('schema', None)) and validation_report.get('status') != 'success': - # A schema is present, or required to be present - return + or res_dict.get('schema', None)) and validation_report.get('status') != 'success': + # A schema is present, or required to be present + return # if validation is running in async mode, it is running from the redis workers. # thus we need to do sync=True to have Xloader put the job at the front of the queue. sync = toolkit.asbool(toolkit.config.get(u'ckanext.validation.run_on_update_async', True)) @@ -183,7 +177,7 @@ def before_show(self, resource_dict): def after_update(self, context, resource_dict): self.after_resource_update(context, resource_dict) - def _submit_to_xloader(self, resource_dict): + def _submit_to_xloader(self, resource_dict, sync=False): context = {"ignore_auth": True, "defer_commit": True} resource_format = resource_dict.get("format") if not XLoaderFormats.is_it_an_xloader_format(resource_format): @@ -203,14 +197,20 @@ def _submit_to_xloader(self, resource_dict): return try: - log.debug( - "Submitting resource %s to be xloadered", resource_dict["id"] - ) + if sync: + log.debug( + "xloadering resource %s in sync mode", resource_dict["id"] + ) + else: + log.debug( + "Submitting resource %s to be xloadered", resource_dict["id"] + ) toolkit.get_action("xloader_submit")( context, { "resource_id": resource_dict["id"], "ignore_hash": self.ignore_hash, + "sync": sync, }, ) except toolkit.ValidationError as e: @@ -246,6 +246,23 @@ def get_helpers(self): "xloader_badge": xloader_helpers.xloader_badge, } + # IDataDictionaryForm + + def update_datastore_create_schema(self, schema): + default = toolkit.get_validator('default') + boolean_validator = toolkit.get_validator('boolean_validator') + to_datastore_plugin_data = toolkit.get_validator('to_datastore_plugin_data') + schema['fields']['strip_extra_white'] = [default(True), boolean_validator, to_datastore_plugin_data('xloader')] + return schema + + def update_datastore_info_field(self, field, plugin_data): + # expose all our non-secret plugin data in the field + field.update(plugin_data.get('xloader', {})) + # CKAN version parody + if '_info' in plugin_data: + field.update({'info': plugin_data['_info']}) + return field + def _should_remove_unsupported_resource_from_datastore(res_dict): if not toolkit.asbool(toolkit.config.get('ckanext.xloader.clean_datastore_tables', False)): diff --git a/ckanext/xloader/schema.py b/ckanext/xloader/schema.py index c0e8d938..dcedfd5f 100644 --- a/ckanext/xloader/schema.py +++ b/ckanext/xloader/schema.py @@ -16,11 +16,8 @@ boolean_validator = get_validator('boolean_validator') int_validator = get_validator('int_validator') OneOf = get_validator('OneOf') - -if p.toolkit.check_ckan_version('2.9'): - unicode_safe = get_validator('unicode_safe') -else: - unicode_safe = str +ignore_not_sysadmin = get_validator('ignore_not_sysadmin') +unicode_safe = get_validator('unicode_safe') def xloader_submit_schema(): @@ -29,6 +26,7 @@ def xloader_submit_schema(): 'id': [ignore_missing], 'set_url_type': [ignore_missing, boolean_validator], 'ignore_hash': [ignore_missing, boolean_validator], + 'sync': [ignore_missing, boolean_validator, ignore_not_sysadmin], '__junk': [empty], '__before': [dsschema.rename('id', 'resource_id')] } diff --git a/ckanext/xloader/templates/datastore/snippets/dictionary_form.html b/ckanext/xloader/templates/datastore/snippets/dictionary_form.html new file mode 100644 index 00000000..6c2f576c --- /dev/null +++ b/ckanext/xloader/templates/datastore/snippets/dictionary_form.html @@ -0,0 +1,17 @@ +{% ckan_extends %} +{% import 'macros/form.html' as form %} + +{% block additional_fields %} + {{ super() }} + {% if h.check_ckan_version(min_version='2.11') %} + {% set field_prefix = 'fields__' %} + {% else %} + {% set field_prefix = 'info__' %} + {% endif %} + {% set selected_value = field.get('info', {}).get('strip_extra_white', field.get('strip_extra_white', true)) %} + {{ form.select(field_prefix ~ position ~ '__strip_extra_white', + label=_('Strip Extra Leading and Trailing White Space'), options=[ + {'text': _('Yes'), 'value': true}, + {'text': _('No'), 'value': false}, + ], selected=selected_value) }} +{% endblock %} diff --git a/ckanext/xloader/tests/samples/boston_311_sample.csv b/ckanext/xloader/tests/samples/boston_311_sample.csv index 83e0d5f2..e3a7e5be 100644 --- a/ckanext/xloader/tests/samples/boston_311_sample.csv +++ b/ckanext/xloader/tests/samples/boston_311_sample.csv @@ -1,4 +1,4 @@ -CASE_ENQUIRY_ID,open_dt,target_dt,closed_dt,OnTime_Status,CASE_STATUS,CLOSURE_REASON,CASE_TITLE,SUBJECT,REASON,TYPE,QUEUE,Department,SubmittedPhoto,ClosedPhoto,Location,Fire_district,pwd_district,city_council_district,police_district,neighborhood,neighborhood_services_district,ward,precinct,LOCATION_STREET_NAME,LOCATION_ZIPCODE,Latitude,Longitude,Source -101002153891,2017-07-06 23:38:43,2017-07-21 08:30:00,,ONTIME,Open, ,Street Light Outages,Public Works Department,Street Lights,Street Light Outages,PWDx_Street Light Outages,PWDx,,,480 Harvard St Dorchester MA 02124,8,07,4,B3,Greater Mattapan,9,Ward 14,1411,480 Harvard St,02124,42.288,-71.0927,Citizens Connect App -101002153890,2017-07-06 23:29:13,2017-09-11 08:30:00,,ONTIME,Open, ,Graffiti Removal,Property Management,Graffiti,Graffiti Removal,PROP_GRAF_GraffitiRemoval,PROP, https://mayors24.cityofboston.gov/media/boston/report/photos/595f0000048560f46d94b9fa/report.jpg,,522 Saratoga St East Boston MA 02128,1,09,1,A7,East Boston,1,Ward 1,0110,522 Saratoga St,02128,42.3807,-71.0259,Citizens Connect App -101002153889,2017-07-06 23:24:20,2017-09-11 08:30:00,,ONTIME,Open, ,Graffiti Removal,Property Management,Graffiti,Graffiti Removal,PROP_GRAF_GraffitiRemoval,PROP, https://mayors24.cityofboston.gov/media/boston/report/photos/595efedb048560f46d94b9ef/report.jpg,,965 Bennington St East Boston MA 02128,1,09,1,A7,East Boston,1,Ward 1,0112,965 Bennington St,02128,42.386,-71.008,Citizens Connect App +CASE_ENQUIRY_ID,open_dt,target_dt,closed_dt,OnTime_Status,CASE_STATUS,CLOSURE_REASON,CASE_TITLE,SUBJECT,REASON,TYPE,QUEUE,Department,SubmittedPhoto,ClosedPhoto,Location,Fire_district,pwd_district,city_council_district,police_district,neighborhood,neighborhood_services_district,ward,precinct,LOCATION_STREET_NAME,LOCATION_ZIPCODE,Latitude,Longitude,Source +101002153891,2017-07-06 23:38:43,2017-07-21 08:30:00,,ONTIME,Open, ,Street Light Outages,Public Works Department ,Street Lights,Street Light Outages,PWDx_Street Light Outages,PWDx,,,480 Harvard St Dorchester MA 02124,8,07,4,B3,Greater Mattapan,9,Ward 14,1411,480 Harvard St,02124,42.288,-71.0927,Citizens Connect App +101002153890,2017-07-06 23:29:13,2017-09-11 08:30:00,,ONTIME,Open, ,Graffiti Removal,Property Management,Graffiti,Graffiti Removal,PROP_GRAF_GraffitiRemoval,PROP, https://mayors24.cityofboston.gov/media/boston/report/photos/595f0000048560f46d94b9fa/report.jpg,,522 Saratoga St East Boston MA 02128,1,09,1,A7,East Boston,1,Ward 1,0110,522 Saratoga St,02128,42.3807,-71.0259,Citizens Connect App +101002153889,2017-07-06 23:24:20,2017-09-11 08:30:00,,ONTIME,Open, ,Graffiti Removal,Property Management,Graffiti,Graffiti Removal,PROP_GRAF_GraffitiRemoval,PROP, https://mayors24.cityofboston.gov/media/boston/report/photos/595efedb048560f46d94b9ef/report.jpg,,965 Bennington St East Boston MA 02128,1,09,1,A7,East Boston,1,Ward 1,0112,965 Bennington St,02128,42.386,-71.008,Citizens Connect App diff --git a/ckanext/xloader/tests/test_action.py b/ckanext/xloader/tests/test_action.py index 8b0e2729..4497ce98 100644 --- a/ckanext/xloader/tests/test_action.py +++ b/ckanext/xloader/tests/test_action.py @@ -1,4 +1,5 @@ import pytest +from ckan.plugins import toolkit try: from unittest import mock except ImportError: @@ -117,10 +118,24 @@ def test_status(self): assert status["status"] == "pending" - def test_xloader_user_api_token_defaults_to_site_user_apikey(self): - api_token = get_xloader_user_apitoken() - site_user = helpers.call_action("get_site_user") - assert api_token == site_user["apikey"] + + def test_xloader_user_api_token_from_config(self): + sysadmin = factories.SysadminWithToken() + apikey = sysadmin["token"] + with mock.patch.dict(toolkit.config, {'ckanext.xloader.api_token': apikey}): + api_token = get_xloader_user_apitoken() + assert api_token == apikey + + @pytest.mark.ckan_config("ckanext.xloader.api_token", "NOT_SET") + def test_xloader_user_api_token_from_config_should_throw_exceptio_when_not_set(self): + + hasNotThrownException = True + try: + get_xloader_user_apitoken() + except Exception: + hasNotThrownException = False + + assert not hasNotThrownException @pytest.mark.ckan_config("ckanext.xloader.api_token", "random-api-token") def test_xloader_user_api_token(self): diff --git a/ckanext/xloader/tests/test_jobs.py b/ckanext/xloader/tests/test_jobs.py index e819dad9..f23e7821 100644 --- a/ckanext/xloader/tests/test_jobs.py +++ b/ckanext/xloader/tests/test_jobs.py @@ -1,5 +1,6 @@ import pytest import io +import os from datetime import datetime @@ -16,6 +17,7 @@ _TEST_FILE_CONTENT = "x, y\n1,2\n2,4\n3,6\n4,8\n5,10" +_TEST_LARGE_FILE_CONTENT = "\n1,2\n2,4\n3,6\n4,8\n5,10" def get_response(download_url, headers): @@ -34,15 +36,22 @@ def get_large_response(download_url, headers): return resp +def get_large_data_response(download_url, headers): + """Mock jobs.get_response() method.""" + resp = Response() + f_content = _TEST_FILE_CONTENT + (_TEST_LARGE_FILE_CONTENT * 500000) + resp.raw = io.BytesIO(f_content.encode()) + resp.headers = headers + return resp + + +def _get_temp_files(dir='/tmp'): + return [os.path.join(dir, f) for f in os.listdir(dir) if os.path.isfile(os.path.join(dir, f))] + + @pytest.fixture def apikey(): - if toolkit.check_ckan_version(min_version="2.10"): - sysadmin = factories.SysadminWithToken() - else: - # To provide support with CKAN 2.9 - sysadmin = factories.Sysadmin() - sysadmin["token"] = get_xloader_user_apitoken() - + sysadmin = factories.SysadminWithToken() return sysadmin["token"] @@ -74,6 +83,8 @@ def data(create_with_upload, apikey): @pytest.mark.usefixtures("clean_db", "with_plugins") +@pytest.mark.ckan_config("ckanext.xloader.job_timeout", 2) +@pytest.mark.ckan_config("ckan.jobs.timeout", 2) class TestXLoaderJobs(helpers.FunctionalRQTestBase): def test_xloader_data_into_datastore(self, cli, data): @@ -81,7 +92,7 @@ def test_xloader_data_into_datastore(self, cli, data): with mock.patch("ckanext.xloader.jobs.get_response", get_response): stdout = cli.invoke(ckan, ["jobs", "worker", "--burst"]).output assert "File hash: d44fa65eda3675e11710682fdb5f1648" in stdout - assert "Fields: [{'id': 'x', 'type': 'text'}, {'id': 'y', 'type': 'text'}]" in stdout + assert "Fields: [{'id': 'x', 'type': 'text', 'strip_extra_white': True}, {'id': 'y', 'type': 'text', 'strip_extra_white': True}]" in stdout assert "Copying to database..." in stdout assert "Creating search index..." in stdout assert "Express Load completed" in stdout @@ -89,6 +100,63 @@ def test_xloader_data_into_datastore(self, cli, data): resource = helpers.call_action("resource_show", id=data["metadata"]["resource_id"]) assert resource["datastore_contains_all_records_of_source_file"] + # Set the ckanext.xloader.site_url in the config + @pytest.mark.ckan_config("ckanext.xloader.site_url", 'http://xloader-site-url') + def test_download_resource_data_with_ckanext_xloader_site_url(self, cli, data): + + data['metadata']['original_url'] = 'http://xloader-site-url/resource.csv' + self.enqueue(jobs.xloader_data_into_datastore, [data]) + with mock.patch("ckanext.xloader.jobs.get_response", get_response): + stdout = cli.invoke(ckan, ["jobs", "worker", "--burst"]).output + assert "Express Load completed" in stdout + + resource = helpers.call_action("resource_show", id=data["metadata"]["resource_id"]) + assert resource["datastore_contains_all_records_of_source_file"] + + @pytest.mark.ckan_config("ckanext.site_url", 'http://ckan-site-url') + def test_download_resource_data_with_ckan_site_url(self, cli, data): + data['metadata']['original_url'] = 'http://ckan-site-url/resource.csv' + self.enqueue(jobs.xloader_data_into_datastore, [data]) + with mock.patch("ckanext.xloader.jobs.get_response", get_response): + stdout = cli.invoke(ckan, ["jobs", "worker", "--burst"]).output + assert "Express Load completed" in stdout + + resource = helpers.call_action("resource_show", id=data["metadata"]["resource_id"]) + assert resource["datastore_contains_all_records_of_source_file"] + + @pytest.mark.ckan_config("ckanext.site_url", 'http://ckan-site-url') + def test_download_resource_data_with_different_original_url(self, cli, data): + data['metadata']['original_url'] = 'http://external-site-url/resource.csv' + self.enqueue(jobs.xloader_data_into_datastore, [data]) + with mock.patch("ckanext.xloader.jobs.get_response", get_response): + stdout = cli.invoke(ckan, ["jobs", "worker", "--burst"]).output + assert "Express Load completed" in stdout + + resource = helpers.call_action("resource_show", id=data["metadata"]["resource_id"]) + assert resource["datastore_contains_all_records_of_source_file"] + + @pytest.mark.ckan_config("ckanext.xloader.site_url", 'http://xloader-site-url') + def test_callback_xloader_hook_with_ckanext_xloader_site_url(self, cli, data): + data['result_url'] = 'http://xloader-site-url/api/3/action/xloader_hook' + self.enqueue(jobs.xloader_data_into_datastore, [data]) + with mock.patch("ckanext.xloader.jobs.get_response", get_response): + stdout = cli.invoke(ckan, ["jobs", "worker", "--burst"]).output + assert "Express Load completed" in stdout + + resource = helpers.call_action("resource_show", id=data["metadata"]["resource_id"]) + assert resource["datastore_contains_all_records_of_source_file"] + + @pytest.mark.ckan_config("ckanext.site_url", 'http://ckan-site-url') + def test_callback_xloader_hook_with_ckan_site_url(self, cli, data): + data['result_url'] = 'http://ckan-site-url/api/3/action/xloader_hook' + self.enqueue(jobs.xloader_data_into_datastore, [data]) + with mock.patch("ckanext.xloader.jobs.get_response", get_response): + stdout = cli.invoke(ckan, ["jobs", "worker", "--burst"]).output + assert "Express Load completed" in stdout + + resource = helpers.call_action("resource_show", id=data["metadata"]["resource_id"]) + assert resource["datastore_contains_all_records_of_source_file"] + def test_xloader_ignore_hash(self, cli, data): self.enqueue(jobs.xloader_data_into_datastore, [data]) with mock.patch("ckanext.xloader.jobs.get_response", get_response): @@ -123,6 +191,16 @@ def test_data_max_excerpt_lines_config(self, cli, data): resource = helpers.call_action("resource_show", id=data["metadata"]["resource_id"]) assert resource["datastore_contains_all_records_of_source_file"] is False + def test_data_with_rq_job_timeout(self, cli, data): + file_suffix = 'multiplication_2.csv' + self.enqueue(jobs.xloader_data_into_datastore, [data], rq_kwargs=dict(timeout=2)) + with mock.patch("ckanext.xloader.jobs.get_response", get_large_data_response): + stdout = cli.invoke(ckan, ["jobs", "worker", "--burst"]).output + assert "Job timed out after" in stdout + for f in _get_temp_files(): + # make sure that the tmp file has been closed/deleted in job timeout exception handling + assert file_suffix not in f + @pytest.mark.usefixtures("clean_db") class TestSetResourceMetadata(object): diff --git a/ckanext/xloader/tests/test_loader.py b/ckanext/xloader/tests/test_loader.py index 99bd8292..caff02f7 100644 --- a/ckanext/xloader/tests/test_loader.py +++ b/ckanext/xloader/tests/test_loader.py @@ -4,6 +4,7 @@ import os import pytest import six +import sqlalchemy as sa import sqlalchemy.orm as orm import datetime import logging @@ -47,17 +48,16 @@ def _get_records( c = Session.connection() if exclude_full_text_column: cols = self._get_column_names(Session, table_name) - cols = ", ".join( - loader.identifier(col) for col in cols if col != "_full_text" - ) + cols = [ + sa.column(col) for col in cols if col != "_full_text" + ] else: - cols = "*" - sql = 'SELECT {cols} FROM "{table_name}"'.format( - cols=cols, table_name=table_name - ) + cols = [sa.text("*")] + stmt = sa.select(*cols).select_from(sa.table(table_name)) + if limit is not None: - sql += " LIMIT {}".format(limit) - results = c.execute(sql) + stmt = stmt.limit(limit) + results = c.execute(stmt) return results.fetchall() def _get_column_names(self, Session, table_name): @@ -71,7 +71,7 @@ def _get_column_names(self, Session, table_name): ORDER BY ordinal_position; """.format(table_name) ) - results = c.execute(sql) + results = c.execute(sa.text(sql)) records = results.fetchall() return [r[0] for r in records] @@ -85,7 +85,7 @@ def _get_column_types(self, Session, table_name): ORDER BY ordinal_position; """.format(table_name) ) - results = c.execute(sql) + results = c.execute(sa.text(sql)) records = results.fetchall() return [r[0] for r in records] @@ -102,6 +102,20 @@ def test_simple(self, Session): logger=logger, ) assert is_data_dict_populated == False + records = self._get_records(Session, resource_id) + print(self._get_column_names(Session, resource_id)) + assert self._get_column_names(Session, resource_id) == [ + u"_id", + u"_full_text", + u"date", + u"temperature", + u"place", + ] + print(self._get_column_types(Session, resource_id)) + assert self._get_column_types(Session, resource_id) == [ + u"int4", + u"tsvector", + ] + [u"text"] * (len(records[0]) - 1) assert self._get_records( Session, resource_id, limit=1, exclude_full_text_column=False ) == [ @@ -113,7 +127,8 @@ def test_simple(self, Session): u"Galway", ) ] - assert self._get_records(Session, resource_id) == [ + print(records) + assert records == [ (1, u"2011-01-01", u"1", u"Galway"), (2, u"2011-01-02", u"-1", u"Galway"), (3, u"2011-01-03", u"0", u"Galway"), @@ -121,20 +136,6 @@ def test_simple(self, Session): (5, None, None, u"Berkeley"), (6, u"2011-01-03", u"5", None), ] - assert self._get_column_names(Session, resource_id) == [ - u"_id", - u"_full_text", - u"date", - u"temperature", - u"place", - ] - assert self._get_column_types(Session, resource_id) == [ - u"int4", - u"tsvector", - u"text", - u"text", - u"text", - ] def test_simple_with_indexing(self, Session): csv_filepath = get_sample_filepath("simple.csv") @@ -218,6 +219,45 @@ def test_boston_311(self, Session): ) records = self._get_records(Session, resource_id) + print(self._get_column_names(Session, resource_id)) + assert self._get_column_names(Session, resource_id) == [ + u"_id", + u"_full_text", + u"CASE_ENQUIRY_ID", + u"open_dt", + u"target_dt", + u"closed_dt", + u"OnTime_Status", + u"CASE_STATUS", + u"CLOSURE_REASON", + u"CASE_TITLE", + u"SUBJECT", + u"REASON", + u"TYPE", + u"QUEUE", + u"Department", + u"SubmittedPhoto", + u"ClosedPhoto", + u"Location", + u"Fire_district", + u"pwd_district", + u"city_council_district", + u"police_district", + u"neighborhood", + u"neighborhood_services_district", + u"ward", + u"precinct", + u"LOCATION_STREET_NAME", + u"LOCATION_ZIPCODE", + u"Latitude", + u"Longitude", + u"Source", + ] # noqa + print(self._get_column_types(Session, resource_id)) + assert self._get_column_types(Session, resource_id) == [ + u"int4", + u"tsvector", + ] + [u"text"] * (len(records[0]) - 1) print(records) assert records == [ ( @@ -228,9 +268,9 @@ def test_boston_311(self, Session): None, u"ONTIME", u"Open", - u" ", + None, # " " transforms to None u"Street Light Outages", - u"Public Works Department", + u"Public Works Department", # " " trailing whitespace gets trimmed u"Street Lights", u"Street Light Outages", u"PWDx_Street Light Outages", @@ -260,14 +300,14 @@ def test_boston_311(self, Session): None, u"ONTIME", u"Open", - u" ", + None, # " " transforms to None u"Graffiti Removal", u"Property Management", u"Graffiti", u"Graffiti Removal", u"PROP_GRAF_GraffitiRemoval", u"PROP", - u" https://mayors24.cityofboston.gov/media/boston/report/photos/595f0000048560f46d94b9fa/report.jpg", + u"https://mayors24.cityofboston.gov/media/boston/report/photos/595f0000048560f46d94b9fa/report.jpg", # strip white spaces None, u"522 Saratoga St East Boston MA 02128", u"1", @@ -292,14 +332,14 @@ def test_boston_311(self, Session): None, u"ONTIME", u"Open", - u" ", + None, # " " transforms to None u"Graffiti Removal", u"Property Management", u"Graffiti", u"Graffiti Removal", u"PROP_GRAF_GraffitiRemoval", u"PROP", - u" https://mayors24.cityofboston.gov/media/boston/report/photos/595efedb048560f46d94b9ef/report.jpg", + u"https://mayors24.cityofboston.gov/media/boston/report/photos/595efedb048560f46d94b9ef/report.jpg", # strip white spaces None, u"965 Bennington St East Boston MA 02128", u"1", @@ -317,45 +357,6 @@ def test_boston_311(self, Session): u"Citizens Connect App", ), ] # noqa - print(self._get_column_names(Session, resource_id)) - assert self._get_column_names(Session, resource_id) == [ - u"_id", - u"_full_text", - u"CASE_ENQUIRY_ID", - u"open_dt", - u"target_dt", - u"closed_dt", - u"OnTime_Status", - u"CASE_STATUS", - u"CLOSURE_REASON", - u"CASE_TITLE", - u"SUBJECT", - u"REASON", - u"TYPE", - u"QUEUE", - u"Department", - u"SubmittedPhoto", - u"ClosedPhoto", - u"Location", - u"Fire_district", - u"pwd_district", - u"city_council_district", - u"police_district", - u"neighborhood", - u"neighborhood_services_district", - u"ward", - u"precinct", - u"LOCATION_STREET_NAME", - u"LOCATION_ZIPCODE", - u"Latitude", - u"Longitude", - u"Source", - ] # noqa - print(self._get_column_types(Session, resource_id)) - assert self._get_column_types(Session, resource_id) == [ - u"int4", - u"tsvector", - ] + [u"text"] * (len(records[0]) - 1) def test_brazilian(self, Session): csv_filepath = get_sample_filepath("brazilian_sample.csv") @@ -369,105 +370,6 @@ def test_brazilian(self, Session): ) records = self._get_records(Session, resource_id) - print(records) - assert records[0] == ( - 1, - u"01/01/1996 12:00:00 AM", - u"1100015", - u"ALTA FLORESTA D'OESTE", - u"RO", - None, - u"128", - u"0", - u"8", - u"119", - u"1", - u"0", - u"3613", - u"3051", - u"130", - u"7", - u"121", - u"3716", - u"3078", - u"127", - u"7", - None, - None, - None, - None, - u"6794", - u"5036", - u"1758", - None, - None, - None, - None, - None, - None, - u"337", - u"0.26112759", - u"0.17210683", - u"0.43323442", - u"0.13353115", - u"24.833692447908199", - None, - None, - u"22.704964", - u"67.080006197818605", - u"65.144188573097907", - u"74.672390253375497", - u"16.7913561569619", - u"19.4894563570641", - u"8.649237411458509", - u"7.60165422117368", - u"11.1540090366186", - u"17.263407056738099", - u"8.5269823", - u"9.2213373", - u"5.3085136", - u"52.472769803217503", - None, - None, - None, - None, - None, - None, - u"25.0011414302354", - u"22.830887000000001", - u"66.8150490097632", - u"64.893674212235595", - u"74.288246611754104", - u"17.0725384713319", - u"19.8404105332814", - u"8.856561911292371", - u"7.74275834336647", - u"11.357671741889", - u"17.9410577459881", - u"8.3696527", - u"8.9979973", - u"5.0570836", - u"53.286314230720798", - None, - None, - None, - None, - None, - u"122988", - None, - u"10.155015000000001", - u"14.826086999999999", - u"11.671533", - u"9.072917", - None, - None, - None, - None, - None, - None, - None, - None, - ) # noqa print(self._get_column_names(Session, resource_id)) assert self._get_column_names(Session, resource_id) == [ u"_id", @@ -573,6 +475,105 @@ def test_brazilian(self, Session): u"int4", u"tsvector", ] + [u"text"] * (len(records[0]) - 1) + print(records) + assert records[0] == ( + 1, + u"01/01/1996 12:00:00 AM", + u"1100015", + u"ALTA FLORESTA D'OESTE", + u"RO", + None, + u"128", + u"0", + u"8", + u"119", + u"1", + u"0", + u"3613", + u"3051", + u"130", + u"7", + u"121", + u"3716", + u"3078", + u"127", + u"7", + None, + None, + None, + None, + u"6794", + u"5036", + u"1758", + None, + None, + None, + None, + None, + None, + u"337", + u"0.26112759", + u"0.17210683", + u"0.43323442", + u"0.13353115", + u"24.833692447908199", + None, + None, + u"22.704964", + u"67.080006197818605", + u"65.144188573097907", + u"74.672390253375497", + u"16.7913561569619", + u"19.4894563570641", + u"8.649237411458509", + u"7.60165422117368", + u"11.1540090366186", + u"17.263407056738099", + u"8.5269823", + u"9.2213373", + u"5.3085136", + u"52.472769803217503", + None, + None, + None, + None, + None, + None, + u"25.0011414302354", + u"22.830887000000001", + u"66.8150490097632", + u"64.893674212235595", + u"74.288246611754104", + u"17.0725384713319", + u"19.8404105332814", + u"8.856561911292371", + u"7.74275834336647", + u"11.357671741889", + u"17.9410577459881", + u"8.3696527", + u"8.9979973", + u"5.0570836", + u"53.286314230720798", + None, + None, + None, + None, + None, + u"122988", + None, + u"10.155015000000001", + u"14.826086999999999", + u"11.671533", + u"9.072917", + None, + None, + None, + None, + None, + None, + None, + None, + ) # noqa def test_german(self, Session): csv_filepath = get_sample_filepath("german_sample.csv") @@ -586,20 +587,6 @@ def test_german(self, Session): ) records = self._get_records(Session, resource_id) - print(records) - assert records[0] == ( - 1, - u"Zürich", - u"68260", - u"65444", - u"62646", - u"6503", - u"28800", - u"1173", - u"6891", - u"24221", - u"672", - ) print(self._get_column_names(Session, resource_id)) assert self._get_column_names(Session, resource_id) == [ u"_id", @@ -620,6 +607,20 @@ def test_german(self, Session): u"int4", u"tsvector", ] + [u"text"] * (len(records[0]) - 1) + print(records) + assert records[0] == ( + 1, + u"Zürich", + u"68260", + u"65444", + u"62646", + u"6503", + u"28800", + u"1173", + u"6891", + u"24221", + u"672", + ) def test_with_blanks(self, Session): csv_filepath = get_sample_filepath("sample_with_blanks.csv") @@ -700,7 +701,6 @@ def test_reload(self, Session): logger=logger, ) - assert len(self._get_records(Session, resource_id)) == 6 assert self._get_column_names(Session, resource_id) == [ u"_id", u"_full_text", @@ -715,6 +715,7 @@ def test_reload(self, Session): u"text", u"text", ] + assert len(self._get_records(Session, resource_id)) == 6 @pytest.mark.skipif( not p.toolkit.check_ckan_version(min_version="2.7"), @@ -754,7 +755,6 @@ def test_reload_with_overridden_types(self, Session): ) assert is_data_dict_populated == True - assert len(self._get_records(Session, resource_id)) == 6 assert self._get_column_names(Session, resource_id) == [ u"_id", u"_full_text", @@ -769,6 +769,7 @@ def test_reload_with_overridden_types(self, Session): u"numeric", u"text", ] + assert len(self._get_records(Session, resource_id)) == 6 # check that rows with nulls are indexed correctly records = self._get_records( @@ -818,6 +819,181 @@ def test_column_names(self, Session): u"Galway", ) + def test_load_with_no_strip_white(self, Session): + csv_filepath = get_sample_filepath("boston_311_sample.csv") + resource = factories.Resource() + resource_id = resource['id'] + loader.load_csv( + csv_filepath, + resource_id=resource_id, + mimetype="text/csv", + logger=logger, + ) + + # Change strip_extra_white, as it would be done by Data Dictionary + rec = p.toolkit.get_action("datastore_search")( + None, {"resource_id": resource_id, "limit": 0} + ) + fields = [f for f in rec["fields"] if not f["id"].startswith("_")] + for field in fields: + field["info"] = {"strip_extra_white": False} # <=2.10 + field["strip_extra_white"] = False # >=2.11 + p.toolkit.get_action("datastore_create")( + {"ignore_auth": True}, + {"resource_id": resource_id, "force": True, "fields": fields}, + ) + + # Load it again with new strip_extra_white + fields = loader.load_csv( + csv_filepath, + resource_id=resource_id, + mimetype="text/csv", + logger=logger, + ) + loader.create_column_indexes( + fields=fields, resource_id=resource_id, logger=logger + ) + + records = self._get_records(Session, resource_id) + print(self._get_column_names(Session, resource_id)) + assert self._get_column_names(Session, resource_id) == [ + u"_id", + u"_full_text", + u"CASE_ENQUIRY_ID", + u"open_dt", + u"target_dt", + u"closed_dt", + u"OnTime_Status", + u"CASE_STATUS", + u"CLOSURE_REASON", + u"CASE_TITLE", + u"SUBJECT", + u"REASON", + u"TYPE", + u"QUEUE", + u"Department", + u"SubmittedPhoto", + u"ClosedPhoto", + u"Location", + u"Fire_district", + u"pwd_district", + u"city_council_district", + u"police_district", + u"neighborhood", + u"neighborhood_services_district", + u"ward", + u"precinct", + u"LOCATION_STREET_NAME", + u"LOCATION_ZIPCODE", + u"Latitude", + u"Longitude", + u"Source", + ] # noqa + print(self._get_column_types(Session, resource_id)) + assert self._get_column_types(Session, resource_id) == [ + u"int4", + u"tsvector", + ] + [u"text"] * (len(records[0]) - 1) + print(records) + assert records == [ + ( + 1, # ds auto increment id / primary key + u"101002153891", + u"2017-07-06 23:38:43", + u"2017-07-21 08:30:00", + None, + u"ONTIME", + u"Open", + u" ", # no strip_extra_white + u"Street Light Outages", + u"Public Works Department ", # no strip_extra_white + u"Street Lights", + u"Street Light Outages", + u"PWDx_Street Light Outages", + u"PWDx", + None, + None, + u"480 Harvard St Dorchester MA 02124", + u"8", + u"07", + u"4", + u"B3", + u"Greater Mattapan", + u"9", + u"Ward 14", + u"1411", + u"480 Harvard St", + u"02124", + u"42.288", + u"-71.0927", + u"Citizens Connect App", + ), # noqa + ( + 2, # ds auto increment id / primary key + u"101002153890", + u"2017-07-06 23:29:13", + u"2017-09-11 08:30:00", + None, + u"ONTIME", + u"Open", + u" ", # no strip_extra_white + u"Graffiti Removal", + u"Property Management", + u"Graffiti", + u"Graffiti Removal", + u"PROP_GRAF_GraffitiRemoval", + u"PROP", + u" https://mayors24.cityofboston.gov/media/boston/report/photos/595f0000048560f46d94b9fa/report.jpg", # no strip_extra_white + None, + u"522 Saratoga St East Boston MA 02128", + u"1", + u"09", + u"1", + u"A7", + u"East Boston", + u"1", + u"Ward 1", + u"0110", + u"522 Saratoga St", + u"02128", + u"42.3807", + u"-71.0259", + u"Citizens Connect App", + ), # noqa + ( + 3, # ds auto increment id / primary key + u"101002153889", + u"2017-07-06 23:24:20", + u"2017-09-11 08:30:00", + None, + u"ONTIME", + u"Open", + u" ", # no strip_extra_white + u"Graffiti Removal", + u"Property Management", + u"Graffiti", + u"Graffiti Removal", + u"PROP_GRAF_GraffitiRemoval", + u"PROP", + u" https://mayors24.cityofboston.gov/media/boston/report/photos/595efedb048560f46d94b9ef/report.jpg", # no strip_extra_white + None, + u"965 Bennington St East Boston MA 02128", + u"1", + u"09", + u"1", + u"A7", + u"East Boston", + u"1", + u"Ward 1", + u"0112", + u"965 Bennington St", + u"02128", + u"42.386", + u"-71.008", + u"Citizens Connect App", + ), + ] # noqa + class TestLoadUnhandledTypes(TestLoadBase): def test_kml(self): @@ -921,6 +1097,20 @@ def test_simple(self, Session): # "'-01':4,5 '00':6,7,8 '1':1 '2011':3 'galway':2" # "'-01':2,3 '00':5,6 '1':7 '2011':1 'galway':8 't00':4" + assert self._get_column_names(Session, resource_id) == [ + u"_id", + u"_full_text", + u"date", + u"temperature", + u"place", + ] + assert self._get_column_types(Session, resource_id) == [ + u"int4", + u"tsvector", + u"timestamp", + u"numeric", + u"text", + ] assert self._get_records(Session, resource_id) == [ (1, datetime.datetime(2011, 1, 1, 0, 0), Decimal("1"), u"Galway",), ( @@ -989,24 +1179,6 @@ def test_simple_large_file(self, Session): u"text", ] - def test_simple_large_file(self, Session): - csv_filepath = get_sample_filepath("simple-large.csv") - resource = factories.Resource() - resource_id = resource['id'] - is_data_dict_populated = loader.load_table( - csv_filepath, - resource_id=resource_id, - mimetype="text/csv", - logger=logger, - ) - assert is_data_dict_populated == True - assert self._get_column_types(Session, resource_id) == [ - u"int4", - u"tsvector", - u"numeric", - u"text", - ] - def test_with_mixed_types(self, Session): csv_filepath = get_sample_filepath("mixed_numeric_string_sample.csv") resource = factories.Resource() @@ -1093,6 +1265,74 @@ def test_boston_311(self, Session): ) records = self._get_records(Session, resource_id) + print(self._get_column_names(Session, resource_id)) + assert self._get_column_names(Session, resource_id) == [ + u"_id", # int4 + u"_full_text", # tsvector + u"CASE_ENQUIRY_ID", # numeric + u"open_dt", # timestamp + u"target_dt", # timestamp + u"closed_dt", # text + u"OnTime_Status", # text + u"CASE_STATUS", # text + u"CLOSURE_REASON", # text + u"CASE_TITLE", # text + u"SUBJECT", # text + u"REASON", # text + u"TYPE", # text + u"QUEUE", # text + u"Department", # text + u"SubmittedPhoto", # text + u"ClosedPhoto", # text + u"Location", # text + u"Fire_district", # numeric + u"pwd_district", # numeric + u"city_council_district", # numeric + u"police_district", # text + u"neighborhood", # text + u"neighborhood_services_district", # numeric + u"ward", # text + u"precinct", # numeric + u"LOCATION_STREET_NAME", # text + u"LOCATION_ZIPCODE", # numeric + u"Latitude", # numeric + u"Longitude", # numeric + u"Source", # text + ] # noqa + print(self._get_column_types(Session, resource_id)) + assert self._get_column_types(Session, resource_id) == [ + u"int4", # _id + u"tsvector", # _full_text + u"numeric", # CASE_ENQUIRY_ID + u"timestamp", # open_dt + u"timestamp", # target_dt + u"text", # closed_dt + u"text", # OnTime_Status + u"text", # CASE_STATUS + u"text", # CLOSURE_REASON + u"text", # CASE_TITLE + u"text", # SUBJECT + u"text", # REASON + u"text", # TYPE + u"text", # QUEUE + u"text", # Department + u"text", # SubmittedPhoto + u"text", # ClosedPhoto + u"text", # Location + u"numeric", # Fire_district + u"numeric", # pwd_district + u"numeric", # city_council_district + u"text", # police_district + u"text", # neighborhood + u"numeric", # neighborhood_services_district + u"text", # ward + u"numeric", # precinct + u"text", # LOCATION_STREET_NAME + u"numeric", # LOCATION_ZIPCODE + u"numeric", # Latitude + u"numeric", # Longitude + u"text", # Source + ] # noqa print(records) assert is_data_dict_populated == True assert records == [ @@ -1101,18 +1341,18 @@ def test_boston_311(self, Session): Decimal("101002153891"), datetime.datetime(2017, 7, 6, 23, 38, 43), datetime.datetime(2017, 7, 21, 8, 30), - u"", + None, u"ONTIME", u"Open", - u" ", + None, # " " transforms to None u"Street Light Outages", - u"Public Works Department", + u"Public Works Department", # " " trailing whitespace gets trimmed u"Street Lights", u"Street Light Outages", u"PWDx_Street Light Outages", u"PWDx", - u"", - u"", + None, + None, u"480 Harvard St Dorchester MA 02124", Decimal("8"), Decimal("7"), @@ -1133,18 +1373,18 @@ def test_boston_311(self, Session): Decimal("101002153890"), datetime.datetime(2017, 7, 6, 23, 29, 13), datetime.datetime(2017, 9, 11, 8, 30), - u"", + None, u"ONTIME", u"Open", - u" ", + None, # " " transforms to None u"Graffiti Removal", u"Property Management", u"Graffiti", u"Graffiti Removal", u"PROP_GRAF_GraffitiRemoval", u"PROP", - u" https://mayors24.cityofboston.gov/media/boston/report/photos/595f0000048560f46d94b9fa/report.jpg", - u"", + u"https://mayors24.cityofboston.gov/media/boston/report/photos/595f0000048560f46d94b9fa/report.jpg", # strip white spaces + None, u"522 Saratoga St East Boston MA 02128", Decimal("1"), Decimal("9"), @@ -1165,18 +1405,18 @@ def test_boston_311(self, Session): Decimal("101002153889"), datetime.datetime(2017, 7, 6, 23, 24, 20), datetime.datetime(2017, 9, 11, 8, 30), - u"", + None, u"ONTIME", u"Open", - u" ", + None, # " " transforms to None u"Graffiti Removal", u"Property Management", u"Graffiti", u"Graffiti Removal", u"PROP_GRAF_GraffitiRemoval", u"PROP", - u" https://mayors24.cityofboston.gov/media/boston/report/photos/595efedb048560f46d94b9ef/report.jpg", - u"", + u"https://mayors24.cityofboston.gov/media/boston/report/photos/595efedb048560f46d94b9ef/report.jpg", # strip white spaces + None, u"965 Bennington St East Boston MA 02128", Decimal("1"), Decimal("9"), @@ -1193,74 +1433,6 @@ def test_boston_311(self, Session): u"Citizens Connect App", ), ] # noqa - print(self._get_column_names(Session, resource_id)) - assert self._get_column_names(Session, resource_id) == [ - u"_id", - u"_full_text", - u"CASE_ENQUIRY_ID", - u"open_dt", - u"target_dt", - u"closed_dt", - u"OnTime_Status", - u"CASE_STATUS", - u"CLOSURE_REASON", - u"CASE_TITLE", - u"SUBJECT", - u"REASON", - u"TYPE", - u"QUEUE", - u"Department", - u"SubmittedPhoto", - u"ClosedPhoto", - u"Location", - u"Fire_district", - u"pwd_district", - u"city_council_district", - u"police_district", - u"neighborhood", - u"neighborhood_services_district", - u"ward", - u"precinct", - u"LOCATION_STREET_NAME", - u"LOCATION_ZIPCODE", - u"Latitude", - u"Longitude", - u"Source", - ] # noqa - print(self._get_column_types(Session, resource_id)) - assert self._get_column_types(Session, resource_id) == [ - u"int4", - u"tsvector", - u"numeric", - u"timestamp", - u"timestamp", - u"text", - u"text", - u"text", - u"text", - u"text", - u"text", - u"text", - u"text", - u"text", - u"text", - u"text", - u"text", - u"text", - u"numeric", - u"numeric", - u"numeric", - u"text", - u"text", - u"numeric", - u"text", - u"numeric", - u"text", - u"numeric", - u"numeric", - u"numeric", - u"text", - ] # noqa def test_no_entries(self): csv_filepath = get_sample_filepath("no_entries.csv") @@ -1372,3 +1544,209 @@ def test_preserving_time_ranges(self, Session): (3, "Barcaldine", 4725, Decimal("-23.55327901"), Decimal("145.289156"), "9:00-12:30", "13:30-16:30", datetime.datetime(2018, 7, 20)) ] + + def test_load_with_no_strip_white(self, Session): + csv_filepath = get_sample_filepath("boston_311_sample.csv") + resource = factories.Resource() + resource_id = resource['id'] + loader.load_table( + csv_filepath, + resource_id=resource_id, + mimetype="csv", + logger=logger, + ) + + # Change strip_extra_white, as it would be done by Data Dictionary + rec = p.toolkit.get_action("datastore_search")( + None, {"resource_id": resource_id, "limit": 0} + ) + fields = [f for f in rec["fields"] if not f["id"].startswith("_")] + for field in fields: + if "info" not in field: + field["info"] = {} + field["info"]["strip_extra_white"] = False # <=2.10 + field["strip_extra_white"] = False # >=2.11 + p.toolkit.get_action("datastore_create")( + {"ignore_auth": True}, + {"resource_id": resource_id, "force": True, "fields": fields}, + ) + + # Load it again with new strip_extra_white + fields = loader.load_table( + csv_filepath, + resource_id=resource_id, + mimetype="csv", + logger=logger, + ) + loader.create_column_indexes( + fields=fields, resource_id=resource_id, logger=logger + ) + + records = self._get_records(Session, resource_id) + print(self._get_column_names(Session, resource_id)) + assert self._get_column_names(Session, resource_id) == [ + u"_id", # int4 + u"_full_text", # tsvector + u"CASE_ENQUIRY_ID", # numeric + u"open_dt", # timestamp + u"target_dt", # timestamp + u"closed_dt", # text + u"OnTime_Status", # text + u"CASE_STATUS", # text + u"CLOSURE_REASON", # text + u"CASE_TITLE", # text + u"SUBJECT", # text + u"REASON", # text + u"TYPE", # text + u"QUEUE", # text + u"Department", # text + u"SubmittedPhoto", # text + u"ClosedPhoto", # text + u"Location", # text + u"Fire_district", # numeric + u"pwd_district", # numeric + u"city_council_district", # numeric + u"police_district", # text + u"neighborhood", # text + u"neighborhood_services_district", # numeric + u"ward", # text + u"precinct", # numeric + u"LOCATION_STREET_NAME", # text + u"LOCATION_ZIPCODE", # numeric + u"Latitude", # numeric + u"Longitude", # numeric + u"Source", # text + ] # noqa + print(self._get_column_types(Session, resource_id)) + assert self._get_column_types(Session, resource_id) == [ + u"int4", # _id + u"tsvector", # _full_text + u"numeric", # CASE_ENQUIRY_ID + u"timestamp", # open_dt + u"timestamp", # target_dt + u"text", # closed_dt + u"text", # OnTime_Status + u"text", # CASE_STATUS + u"text", # CLOSURE_REASON + u"text", # CASE_TITLE + u"text", # SUBJECT + u"text", # REASON + u"text", # TYPE + u"text", # QUEUE + u"text", # Department + u"text", # SubmittedPhoto + u"text", # ClosedPhoto + u"text", # Location + u"numeric", # Fire_district + u"numeric", # pwd_district + u"numeric", # city_council_district + u"text", # police_district + u"text", # neighborhood + u"numeric", # neighborhood_services_district + u"text", # ward + u"numeric", # precinct + u"text", # LOCATION_STREET_NAME + u"numeric", # LOCATION_ZIPCODE + u"numeric", # Latitude + u"numeric", # Longitude + u"text", # Source + ] # noqa + print(records) + assert records == [ + ( + 1, # ds auto increment id / primary key + Decimal("101002153891"), + datetime.datetime(2017, 7, 6, 23, 38, 43), + datetime.datetime(2017, 7, 21, 8, 30), + None, + u"ONTIME", + u"Open", + u" ", # no strip_extra_white + u"Street Light Outages", + u"Public Works Department ", # no strip_extra_white + u"Street Lights", + u"Street Light Outages", + u"PWDx_Street Light Outages", + u"PWDx", + None, + None, + u"480 Harvard St Dorchester MA 02124", + Decimal("8"), + Decimal("7"), + Decimal("4"), + u"B3", + u"Greater Mattapan", + Decimal("9"), + u"Ward 14", + Decimal("1411"), + u"480 Harvard St", + Decimal("2124"), + Decimal("42.288"), + Decimal("-71.0927"), + u"Citizens Connect App", + ), # noqa + ( + 2, # ds auto increment id / primary key + Decimal("101002153890"), + datetime.datetime(2017, 7, 6, 23, 29, 13), + datetime.datetime(2017, 9, 11, 8, 30), + None, + u"ONTIME", + u"Open", + u" ", # no strip_extra_white + u"Graffiti Removal", + u"Property Management", + u"Graffiti", + u"Graffiti Removal", + u"PROP_GRAF_GraffitiRemoval", + u"PROP", + u" https://mayors24.cityofboston.gov/media/boston/report/photos/595f0000048560f46d94b9fa/report.jpg", # no strip_extra_white + None, + u"522 Saratoga St East Boston MA 02128", + Decimal("1"), + Decimal("9"), + Decimal("1"), + u"A7", + u"East Boston", + Decimal("1"), + u"Ward 1", + Decimal("110"), + u"522 Saratoga St", + Decimal("2128"), + Decimal("42.3807"), + Decimal("-71.0259"), + u"Citizens Connect App", + ), # noqa + ( + 3, # ds auto increment id / primary key + Decimal("101002153889"), + datetime.datetime(2017, 7, 6, 23, 24, 20), + datetime.datetime(2017, 9, 11, 8, 30), + None, + u"ONTIME", + u"Open", + u" ", # no strip_extra_white + u"Graffiti Removal", + u"Property Management", + u"Graffiti", + u"Graffiti Removal", + u"PROP_GRAF_GraffitiRemoval", + u"PROP", + u" https://mayors24.cityofboston.gov/media/boston/report/photos/595efedb048560f46d94b9ef/report.jpg", # no strip_extra_white + None, + u"965 Bennington St East Boston MA 02128", + Decimal("1"), + Decimal("9"), + Decimal("1"), + u"A7", + u"East Boston", + Decimal("1"), + u"Ward 1", + Decimal("112"), + u"965 Bennington St", + Decimal("2128"), + Decimal("42.386"), + Decimal("-71.008"), + u"Citizens Connect App", + ), + ] # noqa diff --git a/ckanext/xloader/tests/test_plugin.py b/ckanext/xloader/tests/test_plugin.py index f22dafbd..f6a0590f 100644 --- a/ckanext/xloader/tests/test_plugin.py +++ b/ckanext/xloader/tests/test_plugin.py @@ -81,7 +81,7 @@ def test_require_validation(self, monkeypatch): # TODO: test IPipeValidation assert not func.called # because of the validation_status not being `success` - func.called = None # reset + func.called = None # reset helpers.call_action( "resource_update", @@ -118,7 +118,7 @@ def test_enforce_validation_schema(self, monkeypatch): # TODO: test IPipeValidation assert not func.called # because of the schema being empty - func.called = None # reset + func.called = None # reset helpers.call_action( "resource_update", @@ -132,7 +132,7 @@ def test_enforce_validation_schema(self, monkeypatch): # TODO: test IPipeValidation assert not func.called # because of the validation_status not being `success` and there is a schema - func.called = None # reset + func.called = None # reset helpers.call_action( "resource_update", diff --git a/ckanext/xloader/tests/test_utils.py b/ckanext/xloader/tests/test_utils.py new file mode 100644 index 00000000..52886851 --- /dev/null +++ b/ckanext/xloader/tests/test_utils.py @@ -0,0 +1,70 @@ +import pytest +from unittest.mock import patch +from ckan.plugins import toolkit +from ckanext.xloader import utils + +def test_private_modify_url_no_change(): + url = "https://ckan.example.com/dataset" + assert utils._modify_url(url, "https://ckan.example.com") == url + + +@pytest.mark.parametrize("result_url, ckan_url, expected", [ + ("https://example.com/resource/123", "https://ckan.example.org", "https://ckan.example.org/resource/123"), + ("https://example.com/resource/123", "http://127.0.0.1:3001", "http://127.0.0.1:3001/resource/123"), + ("https://example.com/resource/123", "http://127.0.0.1:3001/pathnotadded", "http://127.0.0.1:3001/resource/123"), + ("https://ckan.example.org/resource/123", "https://ckan.example.org", "https://ckan.example.org/resource/123"), + ("http://old-ckan.com/resource/456", "http://new-ckan.com", "http://new-ckan.com/resource/456"), + ("https://sub.example.com/path", "https://ckan.example.com", "https://ckan.example.com/path"), + ("ftp://fileserver.com/file", "https://ckan.example.com", "ftp://fileserver.com/file"), ##should never happen + ("https://ckan.example.org/resource/789", "https://xloader.example.org", "https://xloader.example.org/resource/789"), + ("https://ckan.example.org/dataset/data", "https://xloader.example.org", "https://xloader.example.org/dataset/data"), + ("https://ckan.example.org/resource/123?foo=bar", "https://xloader.example.org", "https://xloader.example.org/resource/123?foo=bar"), + ("https://ckan.example.org/dataset/456#section", "https://xloader.example.org", "https://xloader.example.org/dataset/456#section"), + ("https://ckan.example.org/resource/123?param=value&other=123", "https://xloader.example.org", "https://xloader.example.org/resource/123?param=value&other=123"), + ("https://ckan.example.org/resource/partial#fragment", "https://xloader.example.org", "https://xloader.example.org/resource/partial#fragment"), + ("https://ckan.example.org/path/to/data?key=value#section", "https://xloader.example.org", "https://xloader.example.org/path/to/data?key=value#section"), + ("", "", ""), + ("", "http://127.0.0.1:5000", ""), + (None, None, None), + (None, "http://127.0.0.1:5000", None), +]) +def test_private_modify_url(result_url, ckan_url, expected): + assert utils._modify_url(result_url, ckan_url) == expected + + +@pytest.mark.parametrize("input_url, ckan_site_url, xloader_site_url, is_altered, expected", [ + ("https://ckan.example.org/resource/789", "https://ckan.example.org", "https://xloader.example.org", True, "https://xloader.example.org/resource/789"), + ("https://ckan.example.org/resource/789", "https://ckan.example.org", "http://127.0.0.1:3012", True, "http://127.0.0.1:3012/resource/789"), + ("https://ckan.example.org/dataset/data", "https://ckan.example.org", "https://xloader.example.org", True, "https://xloader.example.org/dataset/data"), + ("https://ckan.example.org/resource/123?foo=bar", "https://ckan.example.org", "https://xloader.example.org", True, "https://xloader.example.org/resource/123?foo=bar"), + ("https://ckan.example.org/dataset/456#section", "https://ckan.example.org", "https://xloader.example.org", True, "https://xloader.example.org/dataset/456#section"), + ("https://other-site.com/resource/999", "https://ckan.example.org", "https://xloader.example.org", False, ""), + ("https://ckan.example.org/resource/123?param=value&other=123", "https://ckan.example.org", "https://xloader.example.org", True, "https://xloader.example.org/resource/123?param=value&other=123"), + ("https://ckan.example.org/resource/partial#fragment", "https://ckan.example.org", "https://xloader.example.org", True, "https://xloader.example.org/resource/partial#fragment"), + ("https://ckan.example.org/path/to/data?key=value#section", "https://ckan.example.org", "https://xloader.example.org", True, "https://xloader.example.org/path/to/data?key=value#section"), + ("https://ckan.example.org/path/to/data?key=value#section", "https://ckan.example.org", "http://localhost:3000", True, "http://localhost:3000/path/to/data?key=value#section"), + ("https://ckan.example.org/blackListedPathToS3HostOrigin?key=value#section", "https://ckan.example.org", "https://xloader.example.org", False, ""), + ("ftp://ckan.example.org/dataset/456#section", "https://ckan.example.org", "https://xloader.example.org", False, ""), + ("https://ckan.example.org/dataset/456#section", "https://ckan.example.org", "", False, ""), + ("", "http://127.0.0.1:5000", None, False, ""), + ("", "http://127.0.0.1:5000", "", False, ""), + (None, "http://127.0.0.1:5000", None, False, ""), + (None, "http://127.0.0.1:5000", "", False, ""), +]) +def test_modify_input_url(input_url, ckan_site_url, xloader_site_url, is_altered, expected): + with patch.dict(toolkit.config, + {"ckan.site_url": ckan_site_url, + "ckanext.xloader.site_url": xloader_site_url, + "ckanext.xloader.site_url_ignore_path_regex": "(/blackListedPathToS3HostOrigin|/anotherpath)"}): + response = utils.modify_input_url(input_url) + if is_altered: + assert response == expected + else: + assert response == input_url + + + +def test_modify_input_url_no_xloader_site(): + url = "https://ckan.example.org/dataset" + with patch.dict(toolkit.config, {"ckan.site_url": "https://ckan.example.org", "ckanext.xloader.site_url": None}): + assert utils.modify_input_url(url) == url diff --git a/ckanext/xloader/utils.py b/ckanext/xloader/utils.py index 7c386f53..c4f86f30 100644 --- a/ckanext/xloader/utils.py +++ b/ckanext/xloader/utils.py @@ -2,6 +2,7 @@ import json import datetime +import re from six import text_type as str, binary_type @@ -20,6 +21,8 @@ log = getLogger(__name__) +from urllib.parse import urlunparse, urlparse + # resource.formats accepted by ckanext-xloader. Must be lowercase here. DEFAULT_FORMATS = [ "csv", @@ -33,8 +36,6 @@ "application/vnd.oasis.opendocument.spreadsheet", ] -from .job_exceptions import JobError - class XLoaderFormats(object): formats = None @@ -61,8 +62,10 @@ def awaiting_validation(res_dict): """ Checks the existence of a logic action from the ckanext-validation plugin, thus supporting any extending of the Validation Plugin class. + Checks ckanext.xloader.validation.requires_successful_report config option value. + Checks ckanext.xloader.validation.enforce_schema config option value. Then checks the Resource's validation_status. """ @@ -173,12 +176,73 @@ def get_xloader_user_apitoken(): method returns the api_token set in the config file and defaults to the site_user. """ - api_token = p.toolkit.config.get('ckanext.xloader.api_token', None) - if api_token: + api_token = p.toolkit.config.get('ckanext.xloader.api_token') + if api_token and api_token != 'NOT_SET': return api_token + raise p.toolkit.ValidationError({u'ckanext.xloader.api_token': u'NOT_SET, please provide valid api token'}) + + + +def _modify_url(input_url: str, base_url: str) -> str: + """ Modifies the input URL with base_url provided. - site_user = p.toolkit.get_action('get_site_user')({'ignore_auth': True}, {}) - return site_user["apikey"] + Args: + input_url (str): The original URL to potentially modify + base_url (str): The base URL to compare/replace against + Returns: + str: The modified URL with replaced scheme and netloc + """ + parsed_input_url = urlparse(input_url) + parsed_base_url = urlparse(base_url) + # Do not modify non-HTTP(S) URLs (e.g., ftp://) + if parsed_input_url.scheme not in ("http", "https"): + return input_url + # replace scheme: "http/https" and netloc:"//:@:/" + new_url = urlunparse( + (parsed_base_url.scheme, + parsed_base_url.netloc, + parsed_input_url.path, + parsed_input_url.params, + parsed_input_url.query, + parsed_input_url.fragment)) + return new_url + + +def modify_input_url(input_url: str) -> str: + """Returns a potentially modified CKAN URL. + + This function takes a possible CKAN URL and potentially modifies its base URL while preserving the path, + query parameters, and fragments. The modification occurs only if three conditions are met: + 1. The base URL of the input matches the configured CKAN site URL (ckan.site_url). + 2. A `ckanext.xloader.site_url` is configured in the settings. + 3. A `ckanext.xloader.site_url_ignore_path_regex` if configured in the settings and does not match. + + Args: + input_url (str): The original CKAN URL to potentially modify + Returns: + str: Either the modified URL with new base URL from xloader_site_url, + or the original URL if conditions aren't met + """ + + xloader_site_url = config.get('ckanext.xloader.site_url') + if not xloader_site_url: + return input_url + + parsed_input_url = urlparse(input_url) + input_base_url = f"{parsed_input_url.scheme}://{parsed_input_url.netloc}" + parsed_ckan_site_url = urlparse(config.get('ckan.site_url')) + ckan_base_url = f"{parsed_ckan_site_url.scheme}://{parsed_ckan_site_url.netloc}" + + xloader_ignore_regex = config.get('ckanext.xloader.site_url_ignore_path_regex') + + #Don't alter non-matching base url's. + if input_base_url != ckan_base_url: + return input_url + #And not any urls on the ignore regex + elif xloader_ignore_regex and re.search(xloader_ignore_regex, input_url): + return input_url + + return _modify_url(input_url, xloader_site_url) def set_resource_metadata(update_dict): @@ -279,7 +343,7 @@ def type_guess(rows, types=TYPES, strict=False): at_least_one_value = [] for ri, row in enumerate(rows): diff = len(row) - len(guesses) - for _ in range(diff): + for _i in range(diff): typesdict = {} for type in types: typesdict[type] = 0 diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 00000000..11521ef0 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,145 @@ +[build-system] +requires = [ "setuptools",] +build-backend = "setuptools.build_meta" + +[project] +name = "ckanext-xloader" +version = "2.1.0" +description = "Express Loader - quickly load data into CKAN DataStore" +classifiers = [ "Development Status :: 5 - Production/Stable", + "License :: OSI Approved :: GNU Affero General Public License v3 or later (AGPLv3+)", + "Programming Language :: Python :: 3.7", + "Programming Language :: Python :: 3.8", + "Programming Language :: Python :: 3.9", + "Programming Language :: Python :: 3.10",] +keywords = [ "CKAN", "extension", "datastore",] +dependencies = [ "typing_extensions",] +authors = [ + {name = "ThrawnCA", email = "carl.antuar@smartservice.qld.gov.au"}, + {name = "Jesse Vickery (JVickery-TBS)", email = "jesse.vickery@tbs-sct.gc.ca"}, + {name = "Adrià Mercader (amercader)", email = "amercadero@gmail.com"}, + {name = "David Read (davidread)"}, + {name = "Brett Jones (kowh-ai)", email = "datashades@linkdigital.com.au"}, + {name = "Patricio Del Boca (pdelboca)"}, + {name = "William Dutton (duttonw)", email = "william.dutton@qld.gov.au"}, +# {name = "", email = ""}, +] +maintainers = [ + {name = "Adrià Mercader (amercader)", email = "amercadero@gmail.com"}, + {name = "William Dutton (duttonw)", email = "william.dutton@qld.gov.au"}, + {name = "Ian Ward (wardi)"}, + {name = "Brett Jones (kowh-ai)", email = "datashades@linkdigital.com.au"}, +] + +[project.readme] +file = "README.md" +content-type = "text/markdown" + +[project.license] +text = "AGPL" + +[project.urls] +Homepage = "https://github.com/ckan/ckanext-xloader" + +[project.optional-dependencies] +test = [ "pytest-factoryboy",] + +[project.entry-points."ckan.plugins"] +xloader = "ckanext.xloader.plugin:xloaderPlugin" + +[project.entry-points."babel.extractors"] +ckan = "ckan.lib.extract:extract_ckan" + +[tool.setuptools.packages] +find = {} + +[tool.black] +line-length = 79 +preview = true + +[tool.isort] +known_ckan = "ckan" +known_ckanext = "ckanext" +known_self = "ckanext.xloader" +sections = "FUTURE,STDLIB,FIRSTPARTY,THIRDPARTY,CKAN,CKANEXT,SELF,LOCALFOLDER" + +[tool.pytest.ini_options] +addopts = "--ckan-ini test.ini" +filterwarnings = [ + "ignore::sqlalchemy.exc.SADeprecationWarning", + "ignore::sqlalchemy.exc.SAWarning", + "ignore::DeprecationWarning", +] + +[tool.pyright] +pythonVersion = "3.7" +include = ["ckanext"] +exclude = [ + "**/test*", + "**/migration", +] +strict = [] + +strictParameterNoneValue = true # type must be Optional if default value is None + +# Check the meaning of rules here +# https://github.com/microsoft/pyright/blob/main/docs/configuration.md +reportFunctionMemberAccess = true # non-standard member accesses for functions +reportMissingImports = true +reportMissingModuleSource = true +reportMissingTypeStubs = false +reportImportCycles = true +reportUnusedImport = true +reportUnusedClass = true +reportUnusedFunction = true +reportUnusedVariable = true +reportDuplicateImport = true +reportOptionalSubscript = true +reportOptionalMemberAccess = true +reportOptionalCall = true +reportOptionalIterable = true +reportOptionalContextManager = true +reportOptionalOperand = true +reportTypedDictNotRequiredAccess = false # We are using Context in a way that conflicts with this check +reportConstantRedefinition = true +reportIncompatibleMethodOverride = true +reportIncompatibleVariableOverride = true +reportOverlappingOverload = true +reportUntypedFunctionDecorator = false +reportUnknownParameterType = true +reportUnknownArgumentType = false +reportUnknownLambdaType = false +reportUnknownMemberType = false +reportMissingTypeArgument = true +reportInvalidTypeVarUse = true +reportCallInDefaultInitializer = true +reportUnknownVariableType = true +reportUntypedBaseClass = true +reportUnnecessaryIsInstance = true +reportUnnecessaryCast = true +reportUnnecessaryComparison = true +reportAssertAlwaysTrue = true +reportSelfClsParameterName = true +reportUnusedCallResult = false # allow function calls for side-effect only (like logic.check_acces) +useLibraryCodeForTypes = true +reportGeneralTypeIssues = true +reportPropertyTypeMismatch = true +reportWildcardImportFromLibrary = true +reportUntypedClassDecorator = false # authenticator relies on repoze.who class-decorator +reportUntypedNamedTuple = true +reportPrivateUsage = true +reportPrivateImportUsage = true +reportInconsistentConstructor = true +reportMissingSuperCall = false +reportUninitializedInstanceVariable = true +reportInvalidStringEscapeSequence = true +reportMissingParameterType = true +reportImplicitStringConcatenation = false +reportUndefinedVariable = true +reportUnboundVariable = true +reportInvalidStubStatement = true +reportIncompleteStub = true +reportUnsupportedDunderAll = true +reportUnusedCoroutine = true +reportUnnecessaryTypeIgnoreComment = true +reportMatchNotExhaustive = true \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index 484f4d2e..e5fe8b9a 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,4 @@ -ckantoolkit +ckantoolkit>=0.0.4 requests>=2.31.0 six>=1.12.0 tabulator==1.53.5 diff --git a/setup.cfg b/setup.cfg index 5e2fb6fd..168d5b0b 100644 --- a/setup.cfg +++ b/setup.cfg @@ -19,11 +19,3 @@ previous = true domain = ckanext-xloader directory = ckanext/xloader/i18n statistics = true - -[tool:pytest] - -filterwarnings = - ignore::sqlalchemy.exc.SADeprecationWarning - ignore::sqlalchemy.exc.SAWarning - ignore::DeprecationWarning -addopts = --pdbcls=IPython.terminal.debugger:TerminalPdb \ No newline at end of file diff --git a/setup.py b/setup.py index 84075718..cb65266c 100644 --- a/setup.py +++ b/setup.py @@ -1,91 +1,8 @@ # -*- coding: utf-8 -*- -from setuptools import setup, find_packages # Always prefer setuptools over distutils -from codecs import open # To use a consistent encoding -from os import path +from setuptools import setup -here = path.abspath(path.dirname(__file__)) - -# Get the long description from the relevant file -with open(path.join(here, 'README.rst'), encoding='utf-8') as f: - long_description = f.read() setup( - name='''ckanext-xloader''', - - # Versions should comply with PEP440. For a discussion on single-sourcing - # the version across setup.py and the project code, see - # http://packaging.python.org/en/latest/tutorial.html#version - version='1.2.0', - - description='Express Loader - quickly load data into CKAN DataStore''', - long_description=long_description, - long_description_content_type='text/x-rst', - - # The project's main homepage. - url='https://github.com/ckan/ckanext-xloader', - - # Author details - author='''David Read''', - author_email='''david.read@hackneyworkshop.com''', - - # Choose your license - license='AGPL', - - # See https://pypi.python.org/pypi?%3Aaction=list_classifiers - classifiers=[ - 'Development Status :: 5 - Production/Stable', - - # Pick your license as you wish (should match "license" above) - 'License :: OSI Approved :: GNU Affero General Public License v3 or later (AGPLv3+)', - - # Specify the Python versions you support here. - 'Programming Language :: Python :: 3.7', - 'Programming Language :: Python :: 3.8', - 'Programming Language :: Python :: 3.9', - 'Programming Language :: Python :: 3.10', - ], - - - # What does your project relate to? - keywords='''CKAN extension datastore''', - - # You can just specify the packages manually here if your project is - # simple. Or you can use find_packages(). - packages=find_packages(exclude=['contrib', 'docs', 'tests*']), - namespace_packages=['ckanext'], - - install_requires=[ - # CKAN extensions should not list dependencies here, but in a separate - # ``requirements.txt`` file. - # - # http://docs.ckan.org/en/latest/extensions/best-practices.html#add-third-party-libraries-to-requirements-txt - ], - - # If there are data files included in your packages that need to be - # installed, specify them here. If using Python 2.6 or less, then these - # have to be included in MANIFEST.in as well. - include_package_data=True, - package_data={ - }, - - # Although 'package_data' is the preferred approach, in some case you may - # need to place data files outside of your packages. - # see http://docs.python.org/3.4/distutils/setupscript.html#installing-additional-files - # In this case, 'data_file' will be installed into '/my_data' - data_files=[], - - # To provide executable scripts, use entry points in preference to the - # "scripts" keyword. Entry points provide cross-platform support and allow - # pip to create the appropriate form of executable for the target platform. - entry_points=''' - [ckan.plugins] - xloader=ckanext.xloader.plugin:xloaderPlugin - - [babel.extractors] - ckan = ckan.lib.extract:extract_ckan - - ''', - # If you are changing from the default layout of your extension, you may # have to change the message extractors, you can read more about babel # message extraction at diff --git a/test.ini b/test.ini index 7bfab684..c02827bd 100644 --- a/test.ini +++ b/test.ini @@ -34,13 +34,13 @@ handlers = console [logger_ckan] qualname = ckan -handlers = +handlers = console level = INFO [logger_ckanext_xloader] qualname = ckanext.xloader -handlers = -level = DEBUG +handlers = console +level = WARN [logger_sqlalchemy] handlers =