
Queue-of-Work Pattern Demo

This demo application illustrates the queue-of-work pattern for scalable API ingestion, as described in the accompanying blog post.

Overview

The demo fetches data from the free JSONPlaceholder API (https://jsonplaceholder.typicode.com/) and demonstrates:

  • Breaking large workloads into small, independent work items
  • Enqueueing work items to a queue
  • Processing work items with registered processors
  • Fault tolerance through automatic retry
  • Poison message handling for persistently failing items
  • Parallel processing simulation

Architecture

┌─────────────┐
│  Ingestor   │ → Creates work items and enqueues them
└──────┬──────┘
       │
       ▼
┌─────────────┐
│    Queue    │ → In-memory queue (simulates Azure Storage Queue)
└──────┬──────┘
       │
       ▼
┌─────────────┐
│  Processor  │ → Dequeues and dispatches to registered processors
└──────┬──────┘
       │
       ▼
┌─────────────┐
│   Output    │ → Saves results to JSON files
└─────────────┘

Quick Start

Using VS Code DevContainer (Recommended)

  1. Open this folder in VS Code
  2. When prompted, click "Reopen in Container" (or run "Remote-Containers: Reopen in Container" from the Command Palette)
  3. Once the container is built, open a terminal and run:
python src/main.py

Using Local Python

Requirements:

  • Python 3.10 or higher
# Install dependencies
pip install -r requirements.txt

# Run the demo
python src/main.py

What the Demo Does

  1. Enqueue Phase: Creates work items to fetch (the batch construction is sketched after this list):

    • All users (1 work item)
    • All posts in batches (10 work items, 10 posts each)
    • All comments in batches (50 work items, 10 comments each)
  2. Process Phase: Dequeues and processes each work item:

    • Calls the JSONPlaceholder API
    • Saves results to output/ directory
    • Handles errors and retries automatically
    • Moves persistently failing items to poison queue
  3. Results: Check the output/ directory for:

    • users.json - All users
    • posts_*.json - Post batches
    • comments_*.json - Comment batches
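
For concreteness, here is a minimal, self-contained sketch of how the post batches in the enqueue phase might be constructed. The FetchPostsWorkItem stand-in mirrors the dataclass shown under Key Components below; the queue here is Python's standard library Queue rather than the demo's own class.

from dataclasses import dataclass
from queue import Queue

# Illustrative stand-in for the demo's work item type (see Key Components).
@dataclass
class FetchPostsWorkItem:
    start_id: int
    end_id: int

work_queue = Queue()

# JSONPlaceholder has 100 posts: split into 10 batches of 10 posts each.
for start in range(1, 101, 10):
    work_queue.put(FetchPostsWorkItem(start_id=start, end_id=start + 9))

print(work_queue.qsize())  # -> 10 work items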

Key Components

Work Items (src/work_items/api_work_items.py)

Strongly typed dataclasses representing units of work:

from dataclasses import dataclass

@dataclass
class FetchUsersWorkItem(WorkItem):
    pass

@dataclass
class FetchPostsWorkItem(WorkItem):
    start_id: int
    end_id: int
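
The WorkItem base class itself is not shown in this README; below is a minimal sketch of what it might carry. The field names are assumptions, and kw_only (Python 3.10+) keeps these defaulted fields from clashing with subclass positional fields like start_id.

import uuid
from dataclasses import dataclass, field

# Hypothetical base class: an identifier plus a delivery counter that the
# queue can use for retry/poison decisions.
@dataclass(kw_only=True)
class WorkItem:
    item_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    dequeue_count: int = 0  # incremented each time the queue hands the item out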

Processors (src/processors/api_processors.py)

Functions decorated to handle specific work item types:

@work_item_processor(FetchUsersWorkItem)
def process_fetch_users(work_item, logger):
    # Fetch and save users
    ...
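
The decorator's actual implementation lives in the repo; one plausible shape is a registry keyed by work item type, as in this sketch (the PROCESSORS name is an assumption):

from typing import Callable

# Hypothetical registry: maps a work item type to its handler function.
PROCESSORS: dict[type, Callable] = {}

def work_item_processor(work_item_type: type):
    def register(func: Callable) -> Callable:
        PROCESSORS[work_item_type] = func  # record the handler for this type
        return func                        # leave the function usable as-is
    return register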

Queue (src/work_queue/in_memory_work_queue.py)

Simulates Azure Storage Queue with in-memory collections (a condensed sketch follows the list):

  • Automatic retry on failure
  • Poison queue for persistent failures
  • Visibility timeout simulation
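
Here is a condensed, self-contained sketch of those three behaviours, assuming Azure Storage Queue semantics: a dequeued message stays invisible for a timeout, reappears if never deleted, and is poisoned after too many deliveries. The names, the 3-dequeue limit, and the dequeue_count attribute (see the WorkItem sketch above) are illustrative, not the repo's exact code.

import time
from collections import deque

MAX_DEQUEUES = 3         # matches the demo's retry limit
VISIBILITY_TIMEOUT = 30  # seconds, matching the demo's setting

class InMemoryWorkQueue:
    def __init__(self):
        self._queue = deque()
        self._invisible = {}  # id(item) -> (item, visible_again_at)
        self.poison = []      # persistently failing items end up here

    def enqueue(self, item):
        self._queue.append(item)

    def dequeue(self):
        self._requeue_expired()
        while self._queue:
            item = self._queue.popleft()
            item.dequeue_count += 1
            if item.dequeue_count > MAX_DEQUEUES:
                self.poison.append(item)  # give up: poison the message
                continue
            # Hide the item; if delete() is never called, it becomes visible again.
            self._invisible[id(item)] = (item, time.monotonic() + VISIBILITY_TIMEOUT)
            return item
        return None

    def delete(self, item):
        # Acknowledge success so the item is not redelivered.
        self._invisible.pop(id(item), None)

    def _requeue_expired(self):
        now = time.monotonic()
        for key, (item, deadline) in list(self._invisible.items()):
            if deadline <= now:  # processor crashed or timed out: retry
                del self._invisible[key]
                self._queue.append(item)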

Dispatcher (src/work_queue/work_item_dispatcher.py)

Routes work items to their registered processors based on type.
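
With a registry like the one sketched earlier, dispatch reduces to a type lookup. This shape is an assumption, not the repo's exact code:

def dispatch(work_item, logger):
    processor = PROCESSORS.get(type(work_item))
    if processor is None:
        raise ValueError(f"No processor registered for {type(work_item).__name__}")
    processor(work_item, logger)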

Queue Processor (src/work_queue/queue_processor.py)

Main processing loop that dequeues and dispatches work items.
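
Putting the pieces together, the loop might look like this sketch: dequeue until the queue is drained, delete on success, and let anything that raises reappear for retry (names follow the sketches above):

def run(queue, logger):
    while (item := queue.dequeue()) is not None:
        try:
            dispatch(item, logger)
        except Exception:
            # Do not delete: the visibility timeout will expire and the
            # item will be redelivered, up to the poison threshold.
            logger.exception("Processing failed; item will be retried")
        else:
            queue.delete(item)  # success: acknowledge so it is not retried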

Simulating Failures

To see the retry and poison queue mechanisms in action:

  1. Edit src/processors/api_processors.py
  2. Add a conditional failure (e.g., fail on certain IDs; see the sketch after this list)
  3. Run the demo
  4. Check the console output to see retries
  5. Items that fail 3+ times move to the poison queue
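
For step 2, one illustrative way to inject a failure is shown below. The modulo condition is arbitrary, and process_fetch_posts stands in for whichever processor you edit:

@work_item_processor(FetchPostsWorkItem)
def process_fetch_posts(work_item, logger):
    if work_item.start_id % 30 == 1:  # deliberately fail some batches
        raise RuntimeError(f"Simulated failure for batch starting at {work_item.start_id}")
    ...  # the processor's normal fetch-and-save logic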

Extending the Demo

Try these exercises:

  1. Add a new work item type: Fetch albums or photos from JSONPlaceholder (a starting point is sketched after this list)
  2. Implement parallel processing: Use multiprocessing or threading to run multiple queue processors
  3. Add Azure Storage Queue: Replace InMemoryWorkQueue with real Azure Storage Queue
  4. Add data transformation: Process the fetched data before saving
  5. Implement incremental loading: Track processed IDs and skip already-fetched items
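
As a starting point for the first exercise, a new item and processor could follow the existing shape. The requests calls are real library APIs, but the decorator usage and base class mirror this README's sketches rather than the repo's exact code:

import json
from dataclasses import dataclass

import requests  # assumed to be in requirements.txt, since the demo calls an HTTP API

@dataclass
class FetchAlbumsWorkItem(WorkItem):
    pass

@work_item_processor(FetchAlbumsWorkItem)
def process_fetch_albums(work_item, logger):
    response = requests.get("https://jsonplaceholder.typicode.com/albums", timeout=10)
    response.raise_for_status()
    albums = response.json()
    with open("output/albums.json", "w") as f:  # output/ exists after a demo run
        json.dump(albums, f, indent=2)
    logger.info("Saved %d albums", len(albums))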

Production Differences

This demo simplifies some aspects for clarity:

Aspect               Demo             Production
Queue                In-memory        Azure Storage Queue
Workers              Single process   Multiple container instances
Persistence          JSON files       Data Lake (Delta Lake)
Monitoring           Console logs     OpenTelemetry + Application Insights
Scaling              Manual           Azure Container Apps auto-scaling
Max retries          3                5+
Visibility timeout   30s              300s (5 min)

Learn More

See the accompanying blog post for a detailed explanation of the pattern, its benefits, and real-world usage in production data pipelines.

About

Demo code to accompany the blog post "Scaling API Ingestion with the Queue-of-Work Pattern"
