This demo application illustrates the queue-of-work pattern for scalable API ingestion, as described in the accompanying blog post.
The demo fetches data from the free JSONPlaceholder API (https://jsonplaceholder.typicode.com/) and demonstrates:
- Breaking large workloads into small, independent work items
- Enqueueing work items to a queue
- Processing work items with registered processors
- Fault tolerance through automatic retry
- Poison message handling for persistently failing items
- Parallel processing simulation
```
┌─────────────┐
│  Ingestor   │ → Creates work items and enqueues them
└──────┬──────┘
       │
       ▼
┌─────────────┐
│    Queue    │ → In-memory queue (simulates Azure Storage Queue)
└──────┬──────┘
       │
       ▼
┌─────────────┐
│  Processor  │ → Dequeues and dispatches to registered processors
└──────┬──────┘
       │
       ▼
┌─────────────┐
│   Output    │ → Saves results to JSON files
└─────────────┘
```
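The flow above can be sketched end to end in a few lines. The names `WorkItem`, `FetchPostsWorkItem`, and `InMemoryWorkQueue` follow the demo; the bodies here are simplified assumptions, not the demo's actual implementation:

```python
from collections import deque
from dataclasses import dataclass


@dataclass
class WorkItem:
    """Base class for all units of work (simplified sketch)."""


@dataclass
class FetchPostsWorkItem(WorkItem):
    start_id: int
    end_id: int


class InMemoryWorkQueue:
    """Minimal stand-in for Azure Storage Queue: FIFO enqueue/dequeue."""

    def __init__(self):
        self._items = deque()

    def enqueue(self, item):
        self._items.append(item)

    def dequeue(self):
        return self._items.popleft() if self._items else None


# Ingestor: break the workload into small, independent items.
queue = InMemoryWorkQueue()
for start in range(1, 101, 10):
    queue.enqueue(FetchPostsWorkItem(start_id=start, end_id=start + 9))

# Processor: drain the queue and handle each item.
results = []
while (item := queue.dequeue()) is not None:
    results.append((item.start_id, item.end_id))

print(len(results))  # 10 batches
```

Because each item is independent, any number of processors could drain the same queue concurrently; that is what makes the pattern scale.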
- Open this folder in VS Code
- When prompted, click "Reopen in Container" (or run "Remote-Containers: Reopen in Container" from command palette)
- Once the container is built, open a terminal and run:
```shell
python src/main.py
```

Requirements:
- Python 3.10 or higher
```shell
# Install dependencies
pip install -r requirements.txt

# Run the demo
python src/main.py
```
Enqueue Phase: Creates work items to fetch:
- All users (1 work item)
- All posts in batches (10 work items, 10 posts each)
- All comments in batches (50 work items, 10 comments each)
Process Phase: Dequeues and processes each work item:
- Calls the JSONPlaceholder API
- Saves results to the `output/` directory
- Handles errors and retries automatically
- Moves persistently failing items to the poison queue

Results: Check the `output/` directory for:
- `users.json` - all users
- `posts_*.json` - post batches
- `comments_*.json` - comment batches
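The batch counts above follow directly from JSONPlaceholder's dataset sizes (100 posts, 500 comments). A helper like the following could produce the ID ranges; the function name is illustrative, not taken from the demo:

```python
def batch_ranges(total, batch_size):
    """Yield inclusive (start_id, end_id) ranges covering IDs 1..total."""
    for start in range(1, total + 1, batch_size):
        yield (start, min(start + batch_size - 1, total))


post_batches = list(batch_ranges(100, 10))     # 10 work items
comment_batches = list(batch_ranges(500, 10))  # 50 work items

print(len(post_batches), len(comment_batches))  # 10 50
```

Together with the single users item, that yields 1 + 10 + 50 = 61 work items on the queue.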
Strongly-typed dataclasses representing units of work:
```python
@dataclass
class FetchUsersWorkItem(WorkItem):
    pass

@dataclass
class FetchPostsWorkItem(WorkItem):
    start_id: int
    end_id: int
```

Functions decorated to handle specific work item types:

```python
@work_item_processor(FetchUsersWorkItem)
def process_fetch_users(work_item, logger):
    # Fetch and save users
    ...
```

Simulates Azure Storage Queue with in-memory collections:
- Automatic retry on failure
- Poison queue for persistent failures
- Visibility timeout simulation
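The retry and poison-queue behavior can be sketched as follows. This is a simplified assumption of how the demo's queue might work, not its actual code; visibility-timeout handling is omitted for brevity:

```python
from collections import deque


class InMemoryWorkQueue:
    """Sketch of retry + poison-queue semantics (visibility timeout omitted)."""

    MAX_DEQUEUES = 3  # items failing this many times go poison

    def __init__(self):
        self._items = deque()  # each entry: (item, dequeue_count)
        self.poison = []       # persistently failing items

    def enqueue(self, item, dequeue_count=0):
        self._items.append((item, dequeue_count))

    def dequeue(self):
        return self._items.popleft() if self._items else None

    def report_failure(self, item, dequeue_count):
        # Re-enqueue for another attempt, or give up and park it.
        if dequeue_count + 1 >= self.MAX_DEQUEUES:
            self.poison.append(item)
        else:
            self.enqueue(item, dequeue_count + 1)


# Simulate an item whose processing always fails:
q = InMemoryWorkQueue()
q.enqueue("bad-item")
while (entry := q.dequeue()) is not None:
    item, count = entry
    q.report_failure(item, count)  # pretend processing raised

print(q.poison)  # ['bad-item']
```

The poison queue keeps one bad item from blocking the pipeline: the rest of the workload drains normally while the failure is preserved for inspection.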
Routes work items to their registered processors based on type.
Main processing loop that dequeues and dispatches work items.
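Type-based dispatch of this kind is typically a registry dict populated by the decorator. A minimal sketch, assuming the demo's `work_item_processor` decorator works roughly like this:

```python
from dataclasses import dataclass

PROCESSORS = {}  # work item type -> handler function


def work_item_processor(work_item_type):
    """Decorator that registers a handler for one work item type."""
    def register(fn):
        PROCESSORS[work_item_type] = fn
        return fn
    return register


@dataclass
class FetchUsersWorkItem:
    pass


@work_item_processor(FetchUsersWorkItem)
def process_fetch_users(work_item, logger):
    logger.append("fetched users")  # placeholder for the real API call


def dispatch(work_item, logger):
    # Route the item to its registered processor based on its type.
    handler = PROCESSORS[type(work_item)]
    handler(work_item, logger)


log = []
dispatch(FetchUsersWorkItem(), log)
print(log)  # ['fetched users']
```

Adding a new work item type then only requires a new dataclass and one decorated function; the loop itself never changes.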
To see the retry and poison queue mechanisms:
- Edit `src/processors/api_processors.py` and add a conditional failure (e.g., fail on certain IDs)
- Run the demo
- Check the console output to see retries
- Items that fail 3 or more times move to the poison queue
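A conditional failure of the kind described might look like this; the specific ID check is an example for the experiment, not code from the demo:

```python
class FetchPostsWorkItem:
    def __init__(self, start_id, end_id):
        self.start_id = start_id
        self.end_id = end_id


def process_fetch_posts(work_item):
    # Deliberately fail one batch to exercise retry and poison handling.
    if work_item.start_id == 41:  # arbitrary ID chosen for the experiment
        raise RuntimeError("simulated failure for batch starting at 41")
    return f"posts {work_item.start_id}-{work_item.end_id}"


ok = process_fetch_posts(FetchPostsWorkItem(1, 10))
try:
    process_fetch_posts(FetchPostsWorkItem(41, 50))
    failed = False
except RuntimeError:
    failed = True

print(ok, failed)
```

Run the demo with a change like this and you should see the doomed batch retried until it lands in the poison queue, while every other batch completes.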
Try these exercises:
- Add a new work item type: Fetch albums or photos from JSONPlaceholder
- Implement parallel processing: Use `multiprocessing` or `threading` to run multiple queue processors
- Add Azure Storage Queue: Replace `InMemoryWorkQueue` with a real Azure Storage Queue
- Add data transformation: Process the fetched data before saving
- Implement incremental loading: Track processed IDs and skip already-fetched items
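For the parallel-processing exercise, one possible starting point uses the standard library's thread-safe `queue.Queue` with a pool of worker threads (a sketch, not the exercise's required solution; the doubling step stands in for real processing):

```python
import queue
import threading

work = queue.Queue()  # thread-safe stand-in for the work queue
for i in range(20):
    work.put(i)

results = []
lock = threading.Lock()


def worker():
    # Drain the shared queue until it is empty, then exit.
    while True:
        try:
            item = work.get_nowait()
        except queue.Empty:
            return
        with lock:
            results.append(item * 2)  # placeholder "processing"
        work.task_done()


threads = [threading.Thread(target=worker) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(len(results))  # 20
```

Because the work items are independent, no coordination beyond the queue itself is needed; this is the same property that lets the production version scale out to multiple container instances.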
This demo simplifies some aspects for clarity:
| Aspect | Demo | Production |
|---|---|---|
| Queue | In-memory | Azure Storage Queue |
| Workers | Single process | Multiple container instances |
| Persistence | JSON files | Data Lake (Delta Lake) |
| Monitoring | Console logs | OpenTelemetry + Application Insights |
| Scaling | Manual | Azure Container Apps auto-scaling |
| Max retries | 3 | 5+ |
| Visibility timeout | 30s | 300s (5 min) |
See the accompanying blog post for a detailed explanation of the pattern, its benefits, and real-world usage in production data pipelines.