Complete technical documentation for DataGenFlow developers.
```
lib/
  blocks/
    builtin/                      # Stable blocks (text_generator, structured_generator, validators, metrics)
    custom/                       # Experimental blocks
    base.py                       # BaseBlock interface
    config.py                     # BlockConfigSchema (schema extraction)
    registry.py                   # Auto-discovery engine
  entities/                       # Pydantic models
    block_execution_context.py    # BlockExecutionContext
    pipeline.py                   # ExecutionResult, Constraints, Usage
    api.py, database.py, job.py, record.py, llm_config.py
  templates/                      # Pipeline templates (YAML)
  errors.py                       # Custom exception classes
  workflow.py                     # Pipeline execution with tracing
  storage.py                      # Database operations (aiosqlite)
  template_renderer.py            # Jinja2 template rendering
  llm_config.py                   # LLMConfigManager
  constants.py                    # Constants (RECORD_UPDATABLE_FIELDS)
frontend/
  src/
    pages/
      Pipelines.tsx               # Visual pipeline builder and manager
      Generator.tsx               # Dataset generation with progress tracking
      Review.tsx                  # Review records with execution traces
      Settings.tsx                # LLM configuration
    components/
      pipeline-editor/            # ReactFlow-based visual editor
tests/
  conftest.py                     # Test configuration and fixtures
  blocks/                         # Block unit tests
  test_api.py                     # API endpoint tests
  test_workflow.py                # Pipeline execution tests
  test_storage.py                 # Database operations tests
  test_errors.py                  # Error handling tests
```
Block System
- Blocks are the fundamental building units
- Each block declares inputs and outputs
- Blocks execute asynchronously and return dictionaries
- Output validation enforced at runtime
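The contract above can be sketched with a toy block. This is a minimal illustration, not DataGenFlow's actual `BaseBlock` (see lib/blocks/base.py); `ToyBlock` and `run_validated` are hypothetical names showing declared outputs being enforced at runtime:

```python
import asyncio
from typing import Any


class ToyBlock:
    """Hypothetical block following the documented contract:
    declared inputs/outputs, async execute, dict return."""

    inputs: list[str] = ["text"]
    outputs: list[str] = ["length"]

    async def execute(self, state: dict[str, Any]) -> dict[str, Any]:
        return {"length": len(state["text"])}


async def run_validated(block: ToyBlock, state: dict[str, Any]) -> dict[str, Any]:
    # Runtime output validation: returned keys must match declared outputs
    result = await block.execute(state)
    if set(result) != set(block.outputs):
        raise ValueError(f"undeclared outputs: {set(result) - set(block.outputs)}")
    return result


result = asyncio.run(run_validated(ToyBlock(), {"text": "hello"}))  # {'length': 5}
```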
Pipeline Execution
- Sequential execution with accumulated state
- Full trace capture (inputs, outputs, timing, errors)
- Correlation IDs for request tracking
- Graceful error handling with context
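The execution model above can be sketched in a few lines. This is an illustrative reduction, not the implementation in lib/workflow.py; `run_pipeline`, `upper`, and `length` are hypothetical:

```python
import asyncio
import time
import uuid
from typing import Any, Awaitable, Callable

BlockFn = Callable[[dict[str, Any]], Awaitable[dict[str, Any]]]


async def run_pipeline(blocks: list[BlockFn], seed: dict[str, Any]):
    """Sequential execution: each block reads the accumulated state,
    its outputs are merged back in, and every step is traced."""
    trace_id = str(uuid.uuid4())  # correlation ID for this run
    state = dict(seed)
    trace = []
    for block in blocks:
        start = time.perf_counter()
        output = await block(state)
        state.update(output)  # accumulate
        trace.append({
            "block": block.__name__,
            "output": output,
            "accumulated_state": dict(state),
            "execution_time": time.perf_counter() - start,
        })
    return state, trace, trace_id


async def upper(state: dict[str, Any]) -> dict[str, Any]:
    return {"upper": state["text"].upper()}


async def length(state: dict[str, Any]) -> dict[str, Any]:
    return {"length": len(state["upper"])}


state, trace, trace_id = asyncio.run(run_pipeline([upper, length], {"text": "hi"}))
```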
Storage Layer
- SQLite with aiosqlite for async operations
- Separate tables: pipelines, jobs, records
- Automatic schema migrations
- Foreign key constraints for data integrity
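The foreign-key guarantee is worth seeing in action. The sketch below uses the stdlib `sqlite3` module for illustration (the real layer uses aiosqlite, and the actual schema lives in lib/storage.py; these table definitions are simplified stand-ins). Note that SQLite requires `PRAGMA foreign_keys = ON` per connection:

```python
import sqlite3

# Simplified stand-in schema; real tables are defined in lib/storage.py
conn = sqlite3.connect(":memory:")
conn.execute("PRAGMA foreign_keys = ON")  # FKs are off by default in SQLite
conn.execute("CREATE TABLE pipelines (id INTEGER PRIMARY KEY, name TEXT)")
conn.execute("""
    CREATE TABLE jobs (
        id INTEGER PRIMARY KEY,
        pipeline_id INTEGER NOT NULL REFERENCES pipelines(id)
    )
""")
conn.execute("INSERT INTO pipelines (id, name) VALUES (1, 'demo')")
conn.execute("INSERT INTO jobs (id, pipeline_id) VALUES (1, 1)")  # valid parent

try:
    # Orphaned job: pipeline 99 does not exist
    conn.execute("INSERT INTO jobs (id, pipeline_id) VALUES (2, 99)")
    fk_enforced = False
except sqlite3.IntegrityError:
    fk_enforced = True  # constraint rejected the orphan row
```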
- Python 3.10+
- Node.js 20+ and Yarn
- uv (Python package manager)
```shell
# Clone repository
git clone <repository-url>
cd DataGenFlow

# Install dependencies
make dev

# Run tests
make test
```

```shell
# Start development servers (hot reload)
make run-dev

# Run separately
make dev-backend           # Backend only (port 8000)
make dev-ui                # Frontend only (port 5173)

# Code quality
make lint                  # Backend linting (ruff)
make typecheck             # Type checking (mypy)
make format                # Format code (ruff)
make lint-frontend         # Frontend linting (ESLint)
make typecheck-frontend    # TypeScript type checking

# Production build
make build-ui              # Build frontend
make run                   # Run production server
```

```shell
# All tests (93 tests)
make test

# Specific test suites
uv run pytest tests/blocks/ -v
uv run pytest tests/test_api.py -v
uv run pytest tests/test_workflow.py -v

# With coverage
uv run pytest --cov=lib --cov=app tests/
```

Tests use a separate in-memory database that is automatically created and cleaned up after each test session.
```python
import pytest


@pytest.mark.asyncio
async def test_pipeline_execution():
    """Test pipeline execution with trace"""
    pipeline = Pipeline(...)
    result, trace, trace_id = await pipeline.execute({"input": "test"})
    assert "output" in result
    assert len(trace) > 0
    assert trace_id is not None
```

List all blocks
```
GET /api/blocks
```

Response:

```json
[
  {
    "type": "TextGenerator",
    "name": "LLM Generator",
    "description": "Generate text using LLM",
    "inputs": ["system", "user"],
    "outputs": ["assistant"],
    "config": {
      "temperature": "float",
      "max_tokens": "int"
    }
  }
]
```

Create pipeline
```
POST /api/pipelines
Content-Type: application/json

{
  "name": "My Pipeline",
  "blocks": [
    {
      "type": "TextGenerator",
      "config": {"temperature": 0.7}
    }
  ]
}
```

Execute pipeline

```
POST /api/pipelines/{id}/execute
Content-Type: application/json

{"text": "input data"}
```

Response:
```json
{
  "result": {"output": "..."},
  "trace": [
    {
      "block_type": "TextGenerator",
      "input": {...},
      "output": {...},
      "accumulated_state": {...},
      "execution_time": 1.234
    }
  ],
  "trace_id": "uuid"
}
```

Start generation job

```
POST /api/generate
Content-Type: multipart/form-data

file=@seeds.json
pipeline_id=1
```

Get job status

```
GET /api/jobs/{job_id}
```

Response:
```json
{
  "id": 1,
  "pipeline_id": 1,
  "status": "running",
  "total_seeds": 100,
  "records_generated": 45,
  "records_failed": 2
}
```

List records

```
GET /api/records?status=pending&limit=100&job_id=1
```

Update record

```
PUT /api/records/{id}
Content-Type: application/json

{
  "status": "accepted",
  "output": "updated text"
}
```

Export records

```
GET /api/export?status=accepted&job_id=1
GET /api/export/download?status=accepted
```

DataGenFlow provides a simple debugging workflow using VS Code:
- Create your pipeline using the visual editor
- Note the pipeline ID from the UI (click "Debug Instructions" to expand)
- Open `debug_pipeline.py` and set `PIPELINE_ID` to your pipeline ID and `SEED_DATA` to your test input
- Set breakpoints in your custom blocks
- Press F5 in VS Code and select "Debug Pipeline"
The debugger will stop at your breakpoints, allowing you to inspect variables, step through code, and debug your custom block logic.
Tips:
- Use the "Copy" button next to pipeline ID in the UI
- Edit seed data directly in debug_pipeline.py for fast iteration
- The pipeline executes exactly as it would in production
- No need to rebuild frontend - pipelines persist in database
Enable detailed logging in `.env`:

```
DEBUG=true
```

Features:
- Correlation IDs in all logs
- Per-block execution timing
- Full input/output state logging
- Stack traces with context
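One way correlation IDs end up in every log line is a stdlib `logging.Filter` that stamps each record. This is a hedged sketch of the pattern, not DataGenFlow's actual logging setup; `CorrelationFilter` is a hypothetical name:

```python
import logging


class CorrelationFilter(logging.Filter):
    """Stamp every log record with a correlation (trace) ID."""

    def __init__(self, trace_id: str):
        super().__init__()
        self.trace_id = trace_id

    def filter(self, record: logging.LogRecord) -> bool:
        record.trace_id = self.trace_id
        return True  # never drop records, only annotate them


logger = logging.getLogger("pipeline")
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter(
    "%(asctime)s [%(levelname)s] [%(trace_id)s] %(message)s"
))
logger.addHandler(handler)
logger.addFilter(CorrelationFilter("a1b2c3d4"))
logger.setLevel(logging.DEBUG)

logger.info("Pipeline 'Data Gen' started (3 blocks)")
```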
Example logs:

```
2025-10-14 10:15:32 [INFO] [a1b2c3d4] Pipeline 'Data Gen' started (3 blocks)
2025-10-14 10:15:32 [DEBUG] [a1b2c3d4] Block 1/3: TextGenerator executing
2025-10-14 10:15:35 [DEBUG] [a1b2c3d4] TextGenerator completed (3.124s)
2025-10-14 10:15:35 [INFO] [a1b2c3d4] Pipeline completed successfully
```

Custom Exceptions
- `BlockNotFoundError`: Block type not registered
- `BlockExecutionError`: Runtime execution failure
- `ValidationError`: Output validation failure
All exceptions include:
- Structured error message
- Context dictionary with details
- HTTP-appropriate status codes
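A sketch of what such an exception might look like. The `BlockNotFoundError` name comes from lib/errors.py, but this shape (message, context dict, status code) is an assumption for illustration and may differ from the real implementation:

```python
from typing import Any


class BlockNotFoundError(Exception):
    """Hypothetical shape of a structured exception: message,
    context dict, and an HTTP-appropriate status code."""

    status_code = 404

    def __init__(self, message: str, context: dict[str, Any]):
        super().__init__(message)
        self.context = context


try:
    raise BlockNotFoundError(
        "Block 'InvalidBlock' not found",
        {"block_type": "InvalidBlock", "available_blocks": ["TextGenerator"]},
    )
except BlockNotFoundError as exc:
    # Shape mirrors the error response format below
    payload = {"error": str(exc), "detail": exc.context}
```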
Error Response Format:

```json
{
  "error": "Block 'InvalidBlock' not found",
  "detail": {
    "block_type": "InvalidBlock",
    "available_blocks": ["TextGenerator", "ValidatorBlock", ...]
  }
}
```

Mypy Configuration (pyproject.toml):

```toml
[tool.mypy]
python_version = "3.10"
strict = true
warn_return_any = true

[[tool.mypy.overrides]]
module = "tests.*"
disable_error_code = ["no-untyped-def"]
```

Run type checking:
```shell
make typecheck             # Backend
make typecheck-frontend    # Frontend
```

Ruff (Backend)

```shell
make lint      # Check
make format    # Fix
```

ESLint (Frontend)

```shell
make lint-frontend    # Check
```

- Backend: Follow PEP 8, enforced by ruff
- Frontend: Prettier + ESLint
- Line length: 100 characters
- Imports: Sorted automatically
- Type hints: Required for all public APIs
```python
from typing import Any

from lib.blocks.base import BaseBlock
from lib.entities.block_execution_context import BlockExecutionContext


class MyBlock(BaseBlock):
    # Required class attributes
    name: str = "My Block"
    description: str = "What this block does"
    category: str = "general"  # generators, validators, metrics, seeders, general
    inputs: list[str] = ["input_field"]
    outputs: list[str] = ["output_field"]

    # Optional: Get config schema for UI
    def get_config_schema(self) -> dict[str, Any]:
        return {
            "my_param": {
                "type": "string",
                "default": "default_value",
                "description": "Parameter description",
            }
        }

    # Required: Execute logic
    async def execute(self, context: BlockExecutionContext) -> dict[str, Any]:
        # Access config
        param = self.config.get("my_param", "default")
        # Access input from accumulated state
        input_value = context.get_state("input_field")
        # Your logic here
        result = process(input_value, param)
        # Return only declared outputs
        return {"output_field": result}
```

- Create file in `user_blocks/` or `lib/blocks/custom/`
- Inherit from `BaseBlock`
- Restart server
- Block automatically appears in UI
Registry scans:
- `lib/blocks/builtin/` - Stable blocks
- `lib/blocks/custom/` - Experimental blocks
- `user_blocks/` - User-created blocks
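Auto-discovery typically boils down to finding concrete `BaseBlock` subclasses. The sketch below shows one possible mechanism over a module namespace; the real registry in lib/blocks/registry.py walks the directories above with importlib and may work differently. `discover_blocks` and the toy classes are hypothetical:

```python
import inspect
from typing import Any


# Toy stand-in; the real interface lives in lib/blocks/base.py
class BaseBlock:
    name: str = ""


def discover_blocks(namespace: dict[str, Any]) -> dict[str, type]:
    """Collect every concrete BaseBlock subclass in a namespace,
    keyed by class name (the block 'type' shown in the API)."""
    registry: dict[str, type] = {}
    for obj in namespace.values():
        if inspect.isclass(obj) and issubclass(obj, BaseBlock) and obj is not BaseBlock:
            registry[obj.__name__] = obj
    return registry


class TextGenerator(BaseBlock):
    name = "LLM Generator"


class Validator(BaseBlock):
    name = "Validator"


blocks = discover_blocks(globals())
```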
```python
import pytest

from lib.entities.block_execution_context import BlockExecutionContext
from your_block import MyBlock


@pytest.mark.asyncio
async def test_my_block():
    block = MyBlock(config={"my_param": "test"})
    context = BlockExecutionContext(
        trace_id="test",
        pipeline_id=1,
        accumulated_state={"input_field": "test data"},
    )
    result = await block.execute(context)
    assert "output_field" in result
    assert result["output_field"] == expected_value
```

- Use `LIMIT` and `OFFSET` for large result sets
- Records API supports pagination
- Indexes on frequently queried fields (`status`, `created_at`)
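The pagination advice can be sketched against a throwaway table. This uses stdlib `sqlite3` and a simplified schema for illustration; the real records table and queries live in lib/storage.py:

```python
import sqlite3

# Illustrative only: LIMIT/OFFSET pagination over an indexed status column
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE records (id INTEGER PRIMARY KEY, status TEXT)")
conn.executemany(
    "INSERT INTO records (status) VALUES (?)",
    [("pending",)] * 250,
)
conn.execute("CREATE INDEX idx_records_status ON records(status)")

page_size, page = 100, 2  # third page, zero-based
rows = conn.execute(
    "SELECT id FROM records WHERE status = ? ORDER BY id LIMIT ? OFFSET ?",
    ("pending", page_size, page * page_size),
).fetchall()  # 250 rows total, so the third page holds the last 50
```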
- Blocks execute sequentially (one at a time)
- Use `asyncio` for I/O-bound operations
- Trace overhead is minimal (~1-2ms per block)
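Even though blocks run one at a time, a single block can overlap its own I/O-bound calls with `asyncio.gather`. A minimal sketch; `fetch` stands in for any awaitable I/O call (HTTP request, DB query, LLM call):

```python
import asyncio


async def fetch(i: int) -> int:
    # Stand-in for an I/O-bound call (HTTP request, DB query, LLM call)
    await asyncio.sleep(0.01)
    return i * 2


async def main() -> list[int]:
    # Inside a single block, independent I/O calls can run concurrently;
    # gather preserves argument order in its results
    return list(await asyncio.gather(*(fetch(i) for i in range(5))))


results = asyncio.run(main())
```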
- ReactFlow handles large pipelines efficiently
- Record review uses windowed scrolling
- API calls are debounced where appropriate
```shell
# Build frontend
make build-ui

# Set production environment
DEBUG=false
LLM_API_KEY=your-production-key

# Run with production server
make run

# Or use systemd/supervisor/docker
uv run uvicorn app:app --host 0.0.0.0 --port 8000 --workers 4
```

```dockerfile
FROM python:3.10-slim
WORKDIR /app
COPY . .
RUN pip install uv && uv sync
CMD ["uv", "run", "uvicorn", "app:app", "--host", "0.0.0.0", "--port", "8000"]
```

See CONTRIBUTING for guidelines on:
- Code style and conventions
- PR title format
- Review process
- Testing requirements