Transform natural language into production-ready ETL pipelines
Features • Architecture • Quick Start • Documentation • Examples
DataMorph is an AI-powered ETL system that converts natural language descriptions into fully functional, validated, self-healing data pipelines. Describe what you want in plain English, and DataMorph handles the rest.
"Join customers and orders tables on customer_id,
calculate total order amount per customer,
and filter customers with total orders greater than $1000"
↓ DataMorph transforms this into ↓
✅ Structured ETL specifications
✅ Production-ready PySpark code
✅ Executed AWS Glue jobs
✅ Validated data quality
✅ Self-healed errors automatically
```mermaid
graph TB
User[👤 User] -->|Natural Language| API[🌐 Flask API Gateway]
API -->|Trigger Workflow| Orch[🎯 Orchestrator Lambda]
Orch -->|Step 1| Specs[📋 Specs Generator]
Specs -->|AI Processing| Bedrock1[🤖 Claude Sonnet 4.5]
Bedrock1 -->|JSON Specs| S3_1[(📦 S3)]
Orch -->|Step 2| API
API -->|Query| RDS[(🗄️ PostgreSQL)]
Orch -->|Step 3| Glue[⚙️ Glue Executor]
Glue -->|Generate Code| Bedrock2[🤖 Claude Sonnet 4.5]
Bedrock2 -->|PySpark Code| S3_2[(📦 S3)]
Glue -->|Execute| GlueJob[🔧 AWS Glue Job]
GlueJob -->|Load Data| RDS
Orch -->|Step 4| API
Orch -->|Step 5| Valid[✅ Validator]
Valid -->|5-Phase Testing| Bedrock3[🤖 Claude Sonnet 4.5]
Bedrock3 -->|Test Results| S3_3[(📦 S3)]
Valid -->|If Failed| Remed[🔄 Remediator]
Remed -->|Fix Code| Bedrock4[🤖 Claude Sonnet 4.5]
Remed -->|Retry| Glue
Orch -->|All Steps| Logger[📝 Logger]
Logger -->|Store Logs| Dynamo[(💾 DynamoDB)]
style User fill:#e1f5ff
style API fill:#fff4e1
style Orch fill:#ffe1f5
style Specs fill:#e1ffe1
style Glue fill:#ffe1e1
style Valid fill:#e1e1ff
style Remed fill:#ffe1cc
style Logger fill:#f0f0f0
style Bedrock1 fill:#8A2BE2,color:#fff
style Bedrock2 fill:#8A2BE2,color:#fff
style Bedrock3 fill:#8A2BE2,color:#fff
style Bedrock4 fill:#8A2BE2,color:#fff
```
DataMorph uses a 7-agent architecture where each agent has a specific responsibility:
| Agent | Role | Key Technology | Typical Latency |
|---|---|---|---|
| 🌐 Flask API | HTTP interface & database proxy | Flask, PostgreSQL | 50-200ms |
| 🎯 Orchestrator | Central workflow coordinator | AWS Lambda | 90-180s |
| 📋 Specs Generator | NLP to ETL specs converter | Bedrock (Claude 4.5) | 5-10s |
| ⚙️ Glue Executor | Code generator & executor | Bedrock + AWS Glue | 70-135s |
| ✅ Validator | Hybrid validation system | Bedrock + SQL | 10-30s |
| 🔄 Remediator | Autonomous error correction | Bedrock + Lambda | 60-120s/iter |
| 📝 Logger | Centralized logging | DynamoDB | 50-100ms |
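To make the control flow concrete, here is a minimal sketch of how an Orchestrator Lambda can chain these agents with synchronous Lambda-to-Lambda invocations. The function names and payload shapes below are illustrative assumptions, not DataMorph's actual interface, and the database-query steps (Steps 2 and 4) are omitted for brevity:

```python
# Hypothetical sketch of the Orchestrator's agent-invocation loop.
# Function names and payload fields are assumptions, not DataMorph's
# actual interface.
import json
import boto3

lambda_client = boto3.client("lambda")

def invoke_agent(function_name: str, payload: dict) -> dict:
    """Synchronously invoke an agent Lambda and return its JSON response."""
    response = lambda_client.invoke(
        FunctionName=function_name,
        InvocationType="RequestResponse",
        Payload=json.dumps(payload).encode("utf-8"),
    )
    return json.loads(response["Payload"].read())

def orchestrate(run_id: str, prompt: str) -> dict:
    # Step 1: natural language -> structured ETL specs
    specs = invoke_agent("datamorph-specs-generator",
                         {"run_id": run_id, "prompt": prompt})
    # Step 3: generate PySpark code and run the Glue job
    execution = invoke_agent("datamorph-glue-executor",
                             {"run_id": run_id, "specs": specs})
    # Step 5: validate results; hand any failures to the Remediator
    validation = invoke_agent("datamorph-validator",
                              {"run_id": run_id, "execution": execution})
    if not validation.get("passed"):
        validation = invoke_agent("datamorph-remediator",
                                  {"run_id": run_id, "failures": validation})
    return validation
```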
- AWS Account with Bedrock access (Claude Sonnet 4.5)
- Python 3.11+
- AWS CLI configured
- PostgreSQL database (RDS)
```bash
# Clone the repository
git clone https://github.com/yourusername/datamorph.git
cd datamorph
# Install dependencies
pip install -r requirements.txt
# Configure AWS credentials
aws configure
# Set up configuration in AWS Secrets Manager
aws secretsmanager create-secret \
  --name datamorph/config \
  --secret-string file://config.json
```

```bash
# Deploy all Lambda functions
python deployment/deploy_all.py
# Deploy Flask API to EC2
python deployment/launch_ec2.py
# Create Glue connection
python deployment/create_glue_connection.py
```
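At runtime, the agents read this shared configuration back out of Secrets Manager. A minimal sketch of that lookup, assuming the `datamorph/config` secret created above holds a JSON document (the keys inside it will vary per deployment):

```python
# Minimal sketch: load the shared config from AWS Secrets Manager.
# Assumes the datamorph/config secret stores a JSON document; the
# exact keys inside it are deployment-specific.
import json
import boto3

secrets = boto3.client("secretsmanager")

def load_config(secret_name: str = "datamorph/config") -> dict:
    response = secrets.get_secret_value(SecretId=secret_name)
    return json.loads(response["SecretString"])

config = load_config()
```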
```python
import requests

# Send natural language ETL request
response = requests.post('http://your-api-url/start', json={
"prompt": "Join customers and orders on customer_id, calculate total order amount per customer"
})
# Get run_id
run_id = response.json()['run_id']
# Check logs
logs = requests.get(f'http://your-api-url/get/logs/{run_id}')
print(logs.json())
```

Detailed documentation for each agent:
- 🌐 Flask API Gateway - HTTP interface and database operations
- 🎯 Orchestrator Lambda - Workflow coordination
- 📋 Specs Generator - Natural language processing
- ⚙️ Glue Executor - Code generation and execution
- ✅ Validator - Hybrid validation system
- 🔄 Remediator - Autonomous error correction
- 📝 Logger - Centralized logging
Input:
"Join customers and orders tables on customer_id,
calculate total order amount per customer"
Generated Specs:
```json
{
"source_tables": [
{"table_name": "customers", "alias": "c"},
{"table_name": "orders", "alias": "o"}
],
"target_table": {
"name": "customer_order_summary",
"description": "Customer order aggregation"
},
"join_conditions": [{
"left_table": "customers",
"right_table": "orders",
"join_type": "left",
"on_columns": ["customer_id"]
}],
"aggregations": [{
"function": "sum",
"column": "order_amount",
"alias": "total_amount",
"group_by": ["customer_id", "customer_name"]
}]
}
```

Generated PySpark Code:
```python
# Imports assumed by this generated snippet; Glue job boilerplate
# (GlueContext/SparkContext initialization) is omitted for brevity
from awsglue.dynamicframe import DynamicFrame
from pyspark.sql import functions as F
from pyspark.sql.functions import col

# Read source tables
customers_df = glueContext.create_dynamic_frame.from_options(
    connection_type="postgresql",
    connection_options={
        "useConnectionProperties": "true",
        "dbtable": "customers",
        "connectionName": "dmdb-connection"
    }
).toDF()

orders_df = glueContext.create_dynamic_frame.from_options(
    connection_type="postgresql",
    connection_options={
        "useConnectionProperties": "true",
        "dbtable": "orders",
        "connectionName": "dmdb-connection"
    }
).toDF()

# Apply transformations
customers_df = customers_df.alias("c")
orders_df = orders_df.alias("o")

joined_df = customers_df.join(
    orders_df,
    col("c.customer_id") == col("o.customer_id"),
    "left"
)

result_df = joined_df.groupBy(
    col("c.customer_id").alias("customer_id"),
    col("c.customer_name").alias("customer_name")
).agg(
    F.sum(col("o.order_amount")).alias("total_amount")
)

# Write to target
final_dynamic_frame = DynamicFrame.fromDF(result_df, glueContext, "final")
glueContext.write_dynamic_frame.from_options(...)
```

Validation Results:
✅ All 12 tests passed
- 7 rule-based tests (structural validation)
- 5 AI-generated tests (data quality)
✅ Target table created successfully
✅ Data loaded and validated
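For a flavor of what a rule-based structural test can look like, here is an illustrative sketch (not the Validator's actual code; the table name comes from the example above, and a psycopg2 connection to the target PostgreSQL database is assumed):

```python
# Illustrative rule-based structural check, not the actual Validator
# implementation. Assumes `conn` is an open psycopg2 connection.
def check_target_table(conn, table: str = "customer_order_summary") -> bool:
    with conn.cursor() as cur:
        # Structural check: the target table exists
        cur.execute(
            "SELECT EXISTS (SELECT 1 FROM information_schema.tables "
            "WHERE table_name = %s)",
            (table,),
        )
        if not cur.fetchone()[0]:
            return False
        # Data-quality check: the aggregation produced at least one row
        cur.execute(f"SELECT COUNT(*) FROM {table}")  # table name is trusted here
        return cur.fetchone()[0] > 0
```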
Input:
"Join employees, departments, and salaries tables.
Calculate average salary by department.
Filter departments with average salary > $50,000"
Result:
- ✅ Specifications generated in 6 seconds
- ✅ PySpark code generated and executed in 95 seconds
- ✅ Validation passed (14/14 tests)
- ✅ Total time: 112 seconds
See more examples for additional use cases.
First Attempt: ████████████████░░░░ 85%
After Remediation: ███████████████████░ 95%
Overall: ███████████████████▓ 98%
| Workflow Type | Time Range | Average |
|---|---|---|
| Simple ETL | 90-120s | 105s |
| Complex ETL | 150-180s | 165s |
| With Remediation | +60-120s | +90s |
| Service | Cost per Run |
|---|---|
| Bedrock (Claude) | $0.10-0.30 |
| Lambda | $0.05-0.10 |
| Glue | $0.03 |
| Total | $0.20-0.45 |
DataMorph automatically detects and fixes errors:
```mermaid
sequenceDiagram
participant U as User
participant O as Orchestrator
participant G as Glue Executor
participant V as Validator
participant R as Remediator
U->>O: Submit ETL Request
O->>G: Generate & Execute Code
G->>G: Attempt 1: Generate Code
G->>G: Execute Glue Job
G-->>O: ❌ Execution Failed
Note over G: Self-Healing Activated
G->>G: Attempt 2: Analyze Error
G->>G: Generate Corrected Code
G->>G: Execute Glue Job
G-->>O: ✅ Execution Success
O->>V: Validate Results
V->>V: Run 12 Tests
V-->>O: ❌ 2 Tests Failed
Note over R: Remediation Activated
O->>R: Fix Validation Failures
R->>R: Iteration 1: Analyze & Fix
R->>G: Re-execute with Fixed Code
G-->>R: ✅ Success
R->>V: Re-validate
V-->>R: ✅ All Tests Pass
R-->>O: ✅ Remediation Complete
O-->>U: ✅ ETL Pipeline Ready
```
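Conceptually, the self-healing stage boils down to a bounded retry loop. The sketch below is illustrative only: the attempt limit, `generate_pyspark`, and `run_glue_job` are assumed names, not DataMorph internals:

```python
# Illustrative self-healing loop; the attempt limit and helper
# functions (generate_pyspark, run_glue_job) are assumptions,
# not DataMorph's actual internals.
MAX_ATTEMPTS = 3

def execute_with_healing(specs: dict) -> dict:
    error = None
    for attempt in range(1, MAX_ATTEMPTS + 1):
        # First attempt generates code from specs; later attempts also
        # feed the previous error back to the model for correction
        code = generate_pyspark(specs, previous_error=error)
        result = run_glue_job(code)
        if result["succeeded"]:
            return result
        error = result["error_message"]
    raise RuntimeError(
        f"Glue job still failing after {MAX_ATTEMPTS} attempts: {error}"
    )
```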
- 🔐 Encryption: All data encrypted at rest and in transit
- 🔑 IAM Roles: Least privilege access for all services
- 🔒 Secrets Management: Credentials stored in AWS Secrets Manager
- 🛡️ VPC: Database in private subnet
- 📝 Audit Trail: Complete logging to DynamoDB
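As an illustration of the audit trail, a log write to DynamoDB might look like the following sketch (the table name and item attributes are assumptions, not DataMorph's actual schema):

```python
# Illustrative audit-log write; the table name and item attributes
# are assumptions, not DataMorph's actual DynamoDB schema.
from datetime import datetime, timezone
import boto3

dynamodb = boto3.resource("dynamodb")
log_table = dynamodb.Table("datamorph-logs")

def log_event(run_id: str, agent: str, message: str) -> None:
    log_table.put_item(Item={
        "run_id": run_id,
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "agent": agent,
        "message": message,
    })
```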
| Traditional ETL | DataMorph |
|---|---|
| ❌ Manual code writing (hours/days) | ✅ Natural language input (seconds) |
We welcome contributions! Please see our Contributing Guide for details.
- Fork the repository
- Create your feature branch (`git checkout -b feature/AmazingFeature`)
- Commit your changes (`git commit -m 'Add some AmazingFeature'`)
- Push to the branch (`git push origin feature/AmazingFeature`)
- Open a Pull Request
This project is licensed under the MIT License - see the LICENSE file for details.
- AWS Bedrock for providing Claude Sonnet 4.5 AI model
- AWS Glue for serverless ETL execution
- Anthropic for Claude AI technology
- All contributors and users of DataMorph
If you find DataMorph useful, please consider giving us a star ⭐
Made with ❤️ by the DataMorph Team