Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions .coveragerc
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
[run]
branch = True
source =
ckanext/reactor

[report]
show_missing = True
skip_covered = False
fail_under = 90
exclude_lines =
pragma: no cover
if __name__ == .__main__.:
7 changes: 7 additions & 0 deletions .github/CODEOWNERS
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# RESOURCES
# - https://git-scm.com/docs/gitignore#_pattern_format
# - https://help.github.com/en/github/creating-cloning-and-archiving-repositories/about-code-owners
# ----------------------------------------------------------------
###

* @OpenGov-OpenData/CKAN
67 changes: 67 additions & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
name: Tests
on: [pull_request]
env:
CODE_COVERAGE_THRESHOLD_REQUIRED: 90
jobs:
lint:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v5
- uses: actions/setup-python@v5
with:
python-version: '3.13'
- name: Install requirements
run: pip install flake8 pycodestyle
- name: Check syntax
run: flake8 . --count --select=E901,E999,F821,F822,F823 --show-source --statistics --exclude ckan
- name: Run flake8
run: flake8 . --count --max-line-length=127 --statistics --exclude ckan

test:
needs: lint
strategy:
matrix:
include:
- ckan-version: "2.9"
ckan-image: "ckan/ckan-dev:2.9-py3.9"
fail-fast: false

name: CKAN ${{ matrix.ckan-version }}
runs-on: ubuntu-latest
container:
image: ${{ matrix.ckan-image }}
options: --user root
services:
solr:
image: ckan/ckan-solr:${{ matrix.ckan-version }}-solr9
postgres:
image: ckan/ckan-postgres-dev:${{ matrix.ckan-version }}
env:
POSTGRES_USER: postgres
POSTGRES_PASSWORD: postgres
POSTGRES_DB: postgres
options: --health-cmd pg_isready --health-interval 10s --health-timeout 5s --health-retries 5
redis:
image: redis:3
env:
CKAN_SQLALCHEMY_URL: postgresql://ckan_default:pass@postgres/ckan_test
CKAN_DATASTORE_WRITE_URL: postgresql://datastore_write:pass@postgres/datastore_test
CKAN_DATASTORE_READ_URL: postgresql://datastore_read:pass@postgres/datastore_test
CKAN_SOLR_URL: http://solr:8983/solr/ckan
CKAN_REDIS_URL: redis://redis:6379/1

steps:
- uses: actions/checkout@v5
- name: Install requirements (common)
run: |
pip install -r requirements.txt
pip install -r dev-requirements.txt
pip install -e .
# Replace default path to CKAN core config file with the one on the container
sed -i -e 's/use = config:.*/use = config:\/srv\/app\/src\/ckan\/test-core.ini/' test.ini
- name: Setup extension (CKAN >= 2.9)
run: |
ckan -c test.ini db init
- name: Run tests
run: |
pytest --cov=ckanext.consumer --ckan-ini=test.ini --disable-warnings --cov-fail-under=${CODE_COVERAGE_THRESHOLD_REQUIRED} ckanext/tests/
661 changes: 661 additions & 0 deletions LICENSE

Large diffs are not rendered by default.

53 changes: 53 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
# ckanext-consumer

**Event-driven architecture extension for CKAN.**

`ckanext-consumer` allows your CKAN instance to react to external events. It listens to a Kafka queue, consumes CloudEvents (or standard JSON), and dispatches them to registered Python functions in your other CKAN extensions.

## 🏗 Architecture

1. **Consumer** acts as the worker/consumer process (`ckan consumer consume`).
2. It discovers other plugins implementing the `IConsumer` interface.
3. It routes incoming Kafka messages to the correct plugin based on the **Topic Name**.

## 🔧 Installation

1. **Install the extension:**
```bash
pip install -e .
```

2. **Install dependencies:**
```bash
pip install -r requirements.txt
```

3. **Enable the plugin** in your `ckan.ini`:
```ini
ckan.plugins = ... consumer
```

## ⚙️ Configuration

Add the following settings to your `ckan.ini` file.

### Basic Configuration
ckan.consumer.kafka.bootstrap.servers = your-kafka-broker:9092
ckan.consumer.kafka.group_id = ckan_consumer_prod
ckan.consumer.kafka.client.id = ckan_instance_1

### Security & Authentication (SASL/SSL)
# Protocol: PLAINTEXT, SASL_PLAINTEXT, SASL_SSL, SSL
ckan.consumer.kafka.security.protocol = SASL_SSL

# Mechanism: PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, GSSAPI (Kerberos)
ckan.consumer.kafka.sasl.mechanisms = PLAIN

# Credentials
ckan.consumer.kafka.sasl.username = your_username
ckan.consumer.kafka.sasl.password = your_password

### Tuning & Reliability
# Best practice for higher availability in librdkafka clients
ckan.consumer.kafka.session.timeout.ms = 45000
ckan.consumer.kafka.auto.offset.reset = earliest
7 changes: 7 additions & 0 deletions ckanext/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# This file serves as a namespace package
try:
import pkg_resources
pkg_resources.declare_namespace(__name__)
except ImportError:
import pkgutil
__path__ = pkgutil.extend_path(__path__, __name__)
Empty file added ckanext/consumer/__init__.py
Empty file.
134 changes: 134 additions & 0 deletions ckanext/consumer/consumer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
import logging
import sys
import json
from confluent_kafka import Consumer, KafkaError

log = logging.getLogger(__name__)


def get_kafka_config(ckan_config):
"""
Extracts and validates Kafka configuration from CKAN .ini file.
Supports SASL/SSL authentication and client tuning.
"""
# 1. Base Required Configuration
bootstrap_servers = ckan_config.get('ckan.consumer.kafka.bootstrap.servers')
group_id = ckan_config.get('ckan.consumer.kafka.group_id')

if not all([bootstrap_servers, group_id]):
log.error(
"Missing required config. Set 'ckan.consumer.kafka.bootstrap.servers' and "
"'ckan.consumer.kafka.group_id' in ckan.ini"
)
sys.exit(1)

# Initialize config with defaults
conf = {
'bootstrap.servers': bootstrap_servers,
'group.id': group_id,
'enable.auto.commit': True,
'auto.offset.reset': ckan_config.get('ckan.consumer.kafka.auto.offset.reset', 'earliest'),
}

# 2. Extended Configuration (Security & Tuning)
# Map CKAN .ini keys to librdkafka properties
config_mapping = {
# Identification
'ckan.consumer.kafka.client.id': 'client.id',

# Security Protocol (PLAINTEXT, SASL_SSL, etc.)
'ckan.consumer.kafka.security.protocol': 'security.protocol',

# SASL Auth
'ckan.consumer.kafka.sasl.mechanisms': 'sasl.mechanisms',
'ckan.consumer.kafka.sasl.username': 'sasl.username',
'ckan.consumer.kafka.sasl.password': 'sasl.password',

# Connection Tuning
'ckan.consumer.kafka.session.timeout.ms': 'session.timeout.ms',
'ckan.consumer.kafka.socket.timeout.ms': 'socket.timeout.ms',
}

# 3. Apply optional settings if they exist in .ini
for ckan_key, kafka_key in config_mapping.items():
value = ckan_config.get(ckan_key)
if value:
conf[kafka_key] = value

return conf


def process_message(msg, handlers):
"""
Routes the message to the correct handler based on the topic.
Implements Resilient Error Handling strategy.
"""
topic = msg.topic()
msg_value = msg.value()

handler = handlers.get(topic)

if not handler:
# Avoid log spam if subscribing to patterns, but useful for debugging
log.debug(f"No handler registered for topic '{topic}'. Skipping.")
return

try:
if msg_value:
# Assumes JSON payload (CloudEvents)
data = json.loads(msg_value.decode('utf-8'))
else:
data = {}

log.info(f"⚡ Processing event from topic: {topic}")
handler(data)

except json.JSONDecodeError:
log.error(f"❌ JSON Decode Error in topic '{topic}'")
except Exception as e:
log.error(f"❌ Error executing handler for topic '{topic}': {e}", exc_info=True)


def run_consumer(ckan_config, topic_handlers):
"""
Main infinite loop for the Kafka consumer.
"""
conf = get_kafka_config(ckan_config)

# Initialize Consumer
try:
consumer = Consumer(conf)
except Exception as e:
log.error(f"Failed to initialize Kafka Consumer: {e}")
return

topics_to_subscribe = list(topic_handlers.keys())

if not topics_to_subscribe:
log.error("No topic handlers registered. Exiting.")
return

try:
consumer.subscribe(topics_to_subscribe)
log.info(f"⚛️ Consumer started as client: {conf.get('client.id', 'unknown')}")
log.info(f"🎧 Listening on topics: {topics_to_subscribe}")

while True:
# Poll with 1.0s timeout
msg = consumer.poll(1.0)

if msg is None:
continue
if msg.error():
if msg.error().code() == KafkaError._PARTITION_EOF:
continue
else:
log.error(f"Kafka Protocol Error: {msg.error()}")
continue

process_message(msg, topic_handlers)

except KeyboardInterrupt:
log.info("Consumer stopped by user.")
finally:
consumer.close()
26 changes: 26 additions & 0 deletions ckanext/consumer/interfaces.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
from ckan.plugins.interfaces import Interface


class IConsumer(Interface):
"""Interface for registering event handlers from other extensions.

Any plugin implementing this interface can subscribe to Kafka topics
handled by the consumer.
"""

def get_event_handlers(self):
"""
Returns a dictionary mapping topic names to handler functions.

The handler function must accept a single argument: the data payload (dict).

Example:
return {
'com.opengov.users.create': self.handle_user_create,
'com.opengov.datasets.update': self.handle_dataset_update
}

Returns:
dict: { 'topic_name': callable_function }
"""
return {}
54 changes: 54 additions & 0 deletions ckanext/consumer/plugin.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
import click
import ckan.plugins as plugins
import ckan.plugins.toolkit as toolkit

from ckanext.consumer.interfaces import IConsumer
from ckanext.consumer.consumer import run_consumer


class ConsumerPlugin(plugins.SingletonPlugin):
"""CKAN Consumer: Event-driven infrastructure plugin.

1. Registers the 'ckan consumer consume' CLI command.
2. Collects event handlers from other plugins via IConsumer.
"""

plugins.implements(plugins.IClick)

def get_commands(self):
return [consumer]


@click.group()
def consumer():
"""Commands for the Consumer event system."""


@consumer.command()
def consume():
"""Starts the Kafka consumer worker process."""
click.echo("Initializing Consumer environment...")

topic_handlers = {}

# --- Dynamic Handler Registration ---
# Iterate over all active plugins that implement IConsumer
for plugin in plugins.PluginImplementations(IConsumer):
try:
handlers = plugin.get_event_handlers()
if handlers:
click.echo(f" > Loaded handlers from extension '{plugin.name}': {list(handlers.keys())}")
topic_handlers.update(handlers)
except Exception as e:
click.echo(f" ! Error loading handlers from {plugin.name}: {e}")

if not topic_handlers:
click.echo("⚠️ No handlers registered. The worker will start but will be idle.")
else:
click.echo(f"✅ Total topics monitored: {len(topic_handlers)}")

# Load CKAN config
conf = toolkit.config

# Start the consumer loop (blocking)
run_consumer(conf, topic_handlers)
Loading