Simple, Fast and Enterprise-Ready Change Data Capture for MySQL
Duality CDC es un sistema de Change Data Capture (CDC) de alto rendimiento diseñado para capturar, procesar y distribuir cambios de bases de datos MySQL en tiempo real con capacidades empresariales avanzadas.
- ✅ Change Data Capture MySQL - Captura de cambios en tiempo real mediante MySQL binlog
- ✅ API REST Completa - Gestión de eventos y configuración via HTTP
- ✅ Filtrado Inteligente - Sistema avanzado de filtros por tabla, operación y contenido
- ✅ Almacenamiento Flexible - Soporte para SQLite, PostgreSQL y más
- ✅ Autenticación JWT - Sistema de seguridad robusto con RBAC
- ✅ Métricas Prometheus - Observabilidad y monitoreo completo
- 🌐 Multi-Instance Clustering - Alta disponibilidad con leader election
- 📡 Real-time Streaming - WebSocket + Server-Sent Events
- 🔄 Advanced Transformations - Motor JavaScript V8 para transformaciones
- 🔍 Query DSL - Lenguaje de consultas avanzado con optimizador
- 🎛️ Admin API - Panel administrativo web completo
- 💾 Automated Backup - Sistema de respaldo programado con recuperación
- Instalación
- Configuración
- Uso Básico
- Características Enterprise
- API Reference
- Arquitectura
- Desarrollo
- Despliegue
- Troubleshooting
- Contributing
- Rust 1.70+ - Instalar Rust
- MySQL 8.0+ - Con binlog habilitado
- Docker (opcional) - Para despliegue containerizado
# Clonar el repositorio
git clone https://github.com/RikardoBonilla/duality.git
cd duality
# Compilar versión release
cargo build --release
# Ejecutar con configuración por defecto
./target/release/duality# Ejecutar con Docker
docker run -d \
--name duality-cdc \
-p 8080:8080 \
-v $(pwd)/config:/app/config \
duality:latest# Instalar desde crates.io (cuando esté disponible)
cargo install duality-cdc
duality --helpCrear archivo config/default.toml:
[database]
host = "localhost"
port = 3306
username = "duality_user"
password = "secure_password"
database = "target_db"
[server]
host = "0.0.0.0"
port = 8080
workers = 4
[security]
jwt_secret = "your-super-secret-key-256-bits-long"
enable_tls = false
[storage]
type = "sqlite"
path = "./data/duality.db"Para habilitar características empresariales, crear config/enterprise.toml:
[features]
enterprise = true
[cluster]
enabled = true
node_id = "node-001"
discovery_type = "static"
bind_addr = "0.0.0.0:9090"
[streaming]
enabled = true
websocket_port = 8081
sse_port = 8082
[transformations]
enabled = true
javascript_engine = true
max_memory_mb = 512
[backup]
enabled = true
interval_hours = 6
storage_path = "./backups"
retention_days = 30Habilitar binlog en MySQL:
-- Configurar binlog (añadir a my.cnf)
[mysqld]
server-id = 1
log-bin = mysql-bin
binlog-format = ROW
binlog-row-image = FULL
-- Crear usuario para Duality
CREATE USER 'duality_user'@'%' IDENTIFIED BY 'secure_password';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'duality_user'@'%';
FLUSH PRIVILEGES;# Iniciar con configuración por defecto
duality
# Especificar archivo de configuración
duality --config /path/to/config.toml
# Habilitar logging detallado
RUST_LOG=debug duality
# Iniciar con características enterprise
duality --features enterprisecurl http://localhost:8080/api/v1/healthcurl -H "Authorization: Bearer YOUR_JWT_TOKEN" \
http://localhost:8080/api/v1/eventscurl -H "Authorization: Bearer YOUR_JWT_TOKEN" \
"http://localhost:8080/api/v1/events?table=users&operation=INSERT"curl http://localhost:8080/metricsconst ws = new WebSocket('ws://localhost:8081/stream');
ws.onmessage = function(event) {
const change = JSON.parse(event.data);
console.log('Database change:', change);
};
// Suscribirse a tabla específica
ws.send(JSON.stringify({
type: 'subscribe',
table: 'users',
operations: ['INSERT', 'UPDATE']
}));const eventSource = new EventSource('http://localhost:8082/stream?table=orders');
eventSource.onmessage = function(event) {
const change = JSON.parse(event.data);
console.log('Order change:', change);
};Configurar cluster de alta disponibilidad:
[cluster]
enabled = true
node_id = "node-001"
discovery_type = "etcd"
etcd_endpoints = ["http://etcd1:2379", "http://etcd2:2379"]
[cluster.leader_election]
enabled = true
lease_duration_seconds = 30
renew_interval_seconds = 10
[cluster.failover]
enabled = true
health_check_interval_seconds = 5
failure_threshold = 3Definir transformaciones JavaScript:
// config/transformations/user_transform.js
function transform(event) {
if (event.table === 'users' && event.operation === 'INSERT') {
// Enmascarar datos sensibles
if (event.data.email) {
event.data.email_domain = event.data.email.split('@')[1];
delete event.data.email;
}
// Agregar metadata
event.metadata = {
processed_at: new Date().toISOString(),
transformer: 'user_privacy_filter'
};
}
return event;
}Usar el lenguaje de consultas avanzado:
curl -X POST http://localhost:8080/api/v1/query \
-H "Content-Type: application/json" \
-d '{
"query": "SELECT * FROM events WHERE table = \"users\" AND operation = \"INSERT\" AND data.age > 18 ORDER BY timestamp DESC LIMIT 100"
}'# Crear backup manual
curl -X POST http://localhost:8080/api/v1/admin/backup
# Listar backups disponibles
curl http://localhost:8080/api/v1/admin/backup/list
# Restaurar desde backup
curl -X POST http://localhost:8080/api/v1/admin/backup/restore/backup-id-123Todas las operaciones requieren autenticación JWT excepto health checks y métricas.
# Obtener token JWT
curl -X POST http://localhost:8080/api/v1/auth/login \
-H "Content-Type: application/json" \
-d '{"username": "admin", "password": "admin123"}'Obtener eventos capturados con paginación y filtros.
Query Parameters:
table- Filtrar por tabla específicaoperation- Filtrar por operación (INSERT, UPDATE, DELETE)from- Timestamp inicial (ISO 8601)to- Timestamp final (ISO 8601)limit- Número máximo de resultados (default: 100, max: 1000)offset- Offset para paginación
Response:
{
"events": [
{
"id": "uuid-event-id",
"table": "users",
"operation": "INSERT",
"timestamp": "2024-01-15T10:30:00Z",
"data": {
"id": 123,
"name": "John Doe",
"email": "john@example.com"
},
"metadata": {
"binlog_position": 12345,
"gtid": "uuid:1-10"
}
}
],
"pagination": {
"total": 1500,
"limit": 100,
"offset": 0,
"has_more": true
}
}Obtener estadísticas de eventos.
Response:
{
"total_events": 15000,
"events_by_table": {
"users": 5000,
"orders": 7500,
"products": 2500
},
"events_by_operation": {
"INSERT": 8000,
"UPDATE": 5500,
"DELETE": 1500
},
"last_24h": 1200
}Listar fuentes de datos configuradas.
Crear nueva fuente de datos.
Request Body:
{
"name": "Production MySQL",
"type": "mysql",
"config": {
"host": "prod-mysql.example.com",
"port": 3306,
"username": "cdc_user",
"password": "secure_password",
"database": "production"
},
"auto_start": true
}Conexión WebSocket para eventos en tiempo real.
Mensajes de suscripción:
{
"type": "subscribe",
"table": "users",
"operations": ["INSERT", "UPDATE"],
"filters": {
"data.status": "active"
}
}HTTP streaming para eventos en tiempo real.
Query Parameters:
table- Tabla específicaoperation- Operación específicafilter- Filtro JSON
┌─────────────────────────────────────────────────────────────────┐
│ Duality CDC Architecture │
├─────────────────────────────────────────────────────────────────┤
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────────────────┐ │
│ │ MySQL │ │ Transform │ │ Storage Layer │ │
│ │ Binlog │─▶│ Engine │─▶│ SQLite/PostgreSQL │ │
│ │ Connector │ │ (JS V8) │ │ + Cache │ │
│ └─────────────┘ └─────────────┘ └─────────────────────────┘ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────────────────┐ │
│ │ Event │ │ Filtering │ │ API Layer │ │
│ │ Processor │ │ System │ │ REST + WebSocket │ │
│ └─────────────┘ └─────────────┘ └─────────────────────────┘ │
│ │ │ │ │
│ └─────────────────┼───────────────────────┘ │
│ │ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────────────────┐ │
│ │ Cluster │ │ Backup │ │ Monitoring │ │
│ │ Manager │ │ System │ │ Metrics + Health │ │
│ └─────────────┘ └─────────────┘ └─────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
- Captura - MySQL binlog reader captura cambios en tiempo real
- Procesamiento - Eventos son procesados y filtrados
- Transformación - Motor JavaScript aplica transformaciones personalizadas
- Almacenamiento - Eventos son persistidos en storage configurable
- Distribución - API REST, WebSocket y SSE distribuyen eventos
- Monitoreo - Métricas y health checks en Prometheus
- Event Sourcing - Todos los cambios son eventos inmutables
- CQRS - Separación de comandos y consultas
- Circuit Breaker - Resiliencia ante fallos de conectores
- Bulkhead - Aislamiento de componentes críticos
- Observer Pattern - Sistema de suscripciones para streaming
# Clonar repositorio
git clone https://github.com/RikardoBonilla/duality.git
cd duality
# Instalar dependencias
cargo fetch
# Ejecutar tests
cargo test
# Ejecutar con hot reload
cargo watch -x run
# Linting y formatting
cargo clippy
cargo fmt# Ejecutar todos los tests
cargo test
# Tests de integración específicos
cargo test --test integration_tests
# Tests de performance
cargo test --release --test performance
# Coverage report
cargo tarpaulin --out htmlsrc/
├── main.rs # Entry point
├── lib.rs # Library root
├── config/ # Configuration management
├── connectors/ # Database connectors
│ └── mysql/ # MySQL binlog connector
├── events/ # Event processing
├── storage/ # Storage abstraction
├── api/ # REST API
├── security/ # Authentication & authorization
├── metrics/ # Monitoring & observability
├── cluster/ # Multi-instance clustering (Enterprise)
├── streaming/ # Real-time streaming (Enterprise)
├── transformations/ # Advanced transformations (Enterprise)
├── query_dsl/ # Query DSL engine (Enterprise)
├── admin/ # Admin API (Enterprise)
├── backup/ # Backup system (Enterprise)
└── utils/ # Utilities
- Fork el repositorio
- Crear feature branch (
git checkout -b feature/amazing-feature) - Commit cambios (
git commit -m 'Add amazing feature') - Push a branch (
git push origin feature/amazing-feature) - Abrir Pull Request
version: '3.8'
services:
duality:
image: duality:latest
ports:
- "8080:8080"
- "8081:8081" # WebSocket
- "8082:8082" # SSE
environment:
- RUST_LOG=info
- DUALITY_CONFIG=/app/config/production.toml
volumes:
- ./config:/app/config
- ./data:/app/data
depends_on:
- mysql
- etcd
mysql:
image: mysql:8.0
environment:
MYSQL_ROOT_PASSWORD: rootpass
MYSQL_DATABASE: testdb
command: --server-id=1 --log-bin=mysql-bin --binlog-format=ROW
etcd:
image: quay.io/coreos/etcd:v3.5.0
command:
- /usr/local/bin/etcd
- --data-dir=/etcd-data
- --listen-client-urls=http://0.0.0.0:2379
- --advertise-client-urls=http://etcd:2379apiVersion: apps/v1
kind: Deployment
metadata:
name: duality-cdc
spec:
replicas: 3
selector:
matchLabels:
app: duality-cdc
template:
metadata:
labels:
app: duality-cdc
spec:
containers:
- name: duality
image: duality:latest
ports:
- containerPort: 8080
- containerPort: 8081
- containerPort: 8082
env:
- name: RUST_LOG
value: info
- name: DUALITY_FEATURES
value: enterprise
resources:
requests:
memory: "256Mi"
cpu: "250m"
limits:
memory: "512Mi"
cpu: "500m"# prometheus.yml
global:
scrape_interval: 15s
scrape_configs:
- job_name: 'duality-cdc'
static_configs:
- targets: ['duality:8080']
metrics_path: '/metrics'
scrape_interval: 5sError: Failed to connect to MySQL: Connection refused
Solución:
- Verificar que MySQL esté ejecutándose
- Verificar credenciales en configuración
- Verificar que binlog esté habilitado
- Verificar permisos del usuario CDC
Warning: High memory usage detected (>1GB)
Solución:
- Ajustar
storage.buffer_sizeen configuración - Implementar filtros más específicos
- Ajustar
transformations.max_memory_mb - Verificar memory leaks con herramientas de profiling
Warning: Event sequence gap detected
Solución:
- Verificar estabilidad de conexión MySQL
- Aumentar
connector.reconnect_attempts - Verificar que binlog no haya sido rotado/purgado
- Implementar recovery desde backup
# Debug completo
RUST_LOG=debug duality
# Solo errores
RUST_LOG=error duality
# Log específico por módulo
RUST_LOG=duality::connectors::mysql=debug duality
# Output JSON para análisis
RUST_LOG=info DUALITY_LOG_FORMAT=json duality# Health check detallado
curl http://localhost:8080/api/v1/health/detailed
# Métricas Prometheus
curl http://localhost:8080/metrics
# Estadísticas de performance
curl -H "Authorization: Bearer TOKEN" \
http://localhost:8080/api/v1/admin/stats/performance- Throughput: 10,000+ eventos/segundo
- Latencia: <50ms end-to-end
- Memoria: ~100MB base + buffer configurable
- CPU: <10% en idle, ~50% bajo carga máxima
[storage]
buffer_size = 10000
batch_size = 1000
flush_interval_ms = 100
[connector]
connection_pool_size = 10
read_timeout_seconds = 30
[transformations]
max_concurrent_transforms = 8duality_events_total- Total de eventos procesadosduality_events_per_second- Throughput actualduality_processing_duration_seconds- Latencia de procesamientoduality_memory_usage_bytes- Uso de memoriaduality_connection_errors_total- Errores de conexión
[security]
# JWT Configuration
jwt_secret = "your-256-bit-secret-key-here"
jwt_expiration_hours = 24
jwt_issuer = "duality-cdc"
# Password Policy
min_password_length = 12
require_special_chars = true
require_numbers = true
# TLS Configuration
enable_tls = true
cert_file = "/path/to/cert.pem"
key_file = "/path/to/key.pem"
# Rate Limiting
enable_rate_limiting = true
max_requests_per_minute = 1000
# Audit
enable_audit_log = true
audit_log_path = "./logs/audit.log"- Usar HTTPS en producción
- Rotar JWT secrets regularmente
- Implementar rate limiting
- Habilitar audit logging
- Usar usuarios con permisos mínimos
- Encriptar backups
| Característica | Duality CDC | Debezium | AWS DMS | Confluent Platform |
|---|---|---|---|---|
| Lenguaje | Rust | Java | Managed | Java/Scala |
| Performance | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐ | ⭐⭐⭐ | ⭐⭐⭐⭐ |
| Memoria | ~100MB | ~500MB | N/A | ~1GB |
| Configuración | Simple | Compleja | GUI | Compleja |
| Clustering | ✅ | ✅ | ✅ | ✅ |
| Transformaciones JS | ✅ | ❌ | ❌ | ✅ |
| Costo | Gratis | Gratis | $$$ | $$$$ |
Este proyecto está licenciado bajo la Licencia MIT - ver el archivo LICENSE para más detalles.
- MySQL Team - Por la excelente documentación del binlog
- Rust Community - Por las increíbles herramientas y libraries
- Tokio Project - Por el runtime async de alto rendimiento
- Contributors - Todos los que han contribuido al proyecto
¿Preguntas o problemas? Abrir un issue
¿Quieres contribuir? Ver guía de contribución
Hecho con ❤️ en Rust