Skip to content

RikardoBonilla/duality

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

10 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

Duality CDC - Change Data Capture for MySQL

Rust License: MIT Build Status Coverage

Simple, Fast and Enterprise-Ready Change Data Capture for MySQL

Duality CDC es un sistema de Change Data Capture (CDC) de alto rendimiento diseñado para capturar, procesar y distribuir cambios de bases de datos MySQL en tiempo real con capacidades empresariales avanzadas.


🚀 Características Principales

Core Features

  • Change Data Capture MySQL - Captura de cambios en tiempo real mediante MySQL binlog
  • API REST Completa - Gestión de eventos y configuración via HTTP
  • Filtrado Inteligente - Sistema avanzado de filtros por tabla, operación y contenido
  • Almacenamiento Flexible - Soporte para SQLite, PostgreSQL y más
  • Autenticación JWT - Sistema de seguridad robusto con RBAC
  • Métricas Prometheus - Observabilidad y monitoreo completo

🏢 Enterprise Features (Phase 6 MVP)

  • 🌐 Multi-Instance Clustering - Alta disponibilidad con leader election
  • 📡 Real-time Streaming - WebSocket + Server-Sent Events
  • 🔄 Advanced Transformations - Motor JavaScript V8 para transformaciones
  • 🔍 Query DSL - Lenguaje de consultas avanzado con optimizador
  • 🎛️ Admin API - Panel administrativo web completo
  • 💾 Automated Backup - Sistema de respaldo programado con recuperación

📋 Tabla de Contenidos


🛠 Instalación

Prerrequisitos

  • Rust 1.70+ - Instalar Rust
  • MySQL 8.0+ - Con binlog habilitado
  • Docker (opcional) - Para despliegue containerizado

Opción 1: Compilación desde fuente

# Clonar el repositorio
git clone https://github.com/RikardoBonilla/duality.git
cd duality

# Compilar versión release
cargo build --release

# Ejecutar con configuración por defecto
./target/release/duality

Opción 2: Docker

# Ejecutar con Docker
docker run -d \
  --name duality-cdc \
  -p 8080:8080 \
  -v $(pwd)/config:/app/config \
  duality:latest

Opción 3: Cargo Install

# Instalar desde crates.io (cuando esté disponible)
cargo install duality-cdc
duality --help

⚙️ Configuración

Configuración Básica

Crear archivo config/default.toml:

[database]
host = "localhost"
port = 3306
username = "duality_user"  
password = "secure_password"
database = "target_db"

[server]
host = "0.0.0.0"
port = 8080
workers = 4

[security]
jwt_secret = "your-super-secret-key-256-bits-long"
enable_tls = false

[storage]
type = "sqlite"
path = "./data/duality.db"

Configuración Enterprise

Para habilitar características empresariales, crear config/enterprise.toml:

[features]
enterprise = true

[cluster]
enabled = true
node_id = "node-001"
discovery_type = "static"
bind_addr = "0.0.0.0:9090"

[streaming]
enabled = true
websocket_port = 8081
sse_port = 8082

[transformations]
enabled = true
javascript_engine = true
max_memory_mb = 512

[backup]
enabled = true
interval_hours = 6
storage_path = "./backups"
retention_days = 30

Configuración de MySQL

Habilitar binlog en MySQL:

-- Configurar binlog (añadir a my.cnf)
[mysqld]
server-id = 1
log-bin = mysql-bin
binlog-format = ROW
binlog-row-image = FULL

-- Crear usuario para Duality
CREATE USER 'duality_user'@'%' IDENTIFIED BY 'secure_password';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'duality_user'@'%';
FLUSH PRIVILEGES;

🎯 Uso Básico

Iniciar el servidor

# Iniciar con configuración por defecto
duality

# Especificar archivo de configuración
duality --config /path/to/config.toml

# Habilitar logging detallado
RUST_LOG=debug duality

# Iniciar con características enterprise
duality --features enterprise

API REST Endpoints

Health Check

curl http://localhost:8080/api/v1/health

Listar eventos capturados

curl -H "Authorization: Bearer YOUR_JWT_TOKEN" \
     http://localhost:8080/api/v1/events

Filtrar eventos por tabla

curl -H "Authorization: Bearer YOUR_JWT_TOKEN" \
     "http://localhost:8080/api/v1/events?table=users&operation=INSERT"

Obtener métricas

curl http://localhost:8080/metrics

WebSocket Streaming (Enterprise)

const ws = new WebSocket('ws://localhost:8081/stream');

ws.onmessage = function(event) {
    const change = JSON.parse(event.data);
    console.log('Database change:', change);
};

// Suscribirse a tabla específica
ws.send(JSON.stringify({
    type: 'subscribe',
    table: 'users',
    operations: ['INSERT', 'UPDATE']
}));

Server-Sent Events (Enterprise)

const eventSource = new EventSource('http://localhost:8082/stream?table=orders');

eventSource.onmessage = function(event) {
    const change = JSON.parse(event.data);
    console.log('Order change:', change);
};

🏢 Características Enterprise

Multi-Instance Clustering

Configurar cluster de alta disponibilidad:

[cluster]
enabled = true
node_id = "node-001"
discovery_type = "etcd"
etcd_endpoints = ["http://etcd1:2379", "http://etcd2:2379"]

[cluster.leader_election]
enabled = true
lease_duration_seconds = 30
renew_interval_seconds = 10

[cluster.failover]
enabled = true
health_check_interval_seconds = 5
failure_threshold = 3

Advanced Transformations

Definir transformaciones JavaScript:

// config/transformations/user_transform.js
function transform(event) {
    if (event.table === 'users' && event.operation === 'INSERT') {
        // Enmascarar datos sensibles
        if (event.data.email) {
            event.data.email_domain = event.data.email.split('@')[1];
            delete event.data.email;
        }
        
        // Agregar metadata
        event.metadata = {
            processed_at: new Date().toISOString(),
            transformer: 'user_privacy_filter'
        };
    }
    
    return event;
}

Query DSL

Usar el lenguaje de consultas avanzado:

curl -X POST http://localhost:8080/api/v1/query \
  -H "Content-Type: application/json" \
  -d '{
    "query": "SELECT * FROM events WHERE table = \"users\" AND operation = \"INSERT\" AND data.age > 18 ORDER BY timestamp DESC LIMIT 100"
  }'

Backup y Recuperación

# Crear backup manual
curl -X POST http://localhost:8080/api/v1/admin/backup

# Listar backups disponibles
curl http://localhost:8080/api/v1/admin/backup/list

# Restaurar desde backup
curl -X POST http://localhost:8080/api/v1/admin/backup/restore/backup-id-123

📚 API Reference

Authentication

Todas las operaciones requieren autenticación JWT excepto health checks y métricas.

# Obtener token JWT
curl -X POST http://localhost:8080/api/v1/auth/login \
  -H "Content-Type: application/json" \
  -d '{"username": "admin", "password": "admin123"}'

Events API

GET /api/v1/events

Obtener eventos capturados con paginación y filtros.

Query Parameters:

  • table - Filtrar por tabla específica
  • operation - Filtrar por operación (INSERT, UPDATE, DELETE)
  • from - Timestamp inicial (ISO 8601)
  • to - Timestamp final (ISO 8601)
  • limit - Número máximo de resultados (default: 100, max: 1000)
  • offset - Offset para paginación

Response:

{
  "events": [
    {
      "id": "uuid-event-id",
      "table": "users",
      "operation": "INSERT",
      "timestamp": "2024-01-15T10:30:00Z",
      "data": {
        "id": 123,
        "name": "John Doe",
        "email": "john@example.com"
      },
      "metadata": {
        "binlog_position": 12345,
        "gtid": "uuid:1-10"
      }
    }
  ],
  "pagination": {
    "total": 1500,
    "limit": 100,
    "offset": 0,
    "has_more": true
  }
}

GET /api/v1/events/stats

Obtener estadísticas de eventos.

Response:

{
  "total_events": 15000,
  "events_by_table": {
    "users": 5000,
    "orders": 7500,
    "products": 2500
  },
  "events_by_operation": {
    "INSERT": 8000,
    "UPDATE": 5500,
    "DELETE": 1500
  },
  "last_24h": 1200
}

Admin API (Enterprise)

GET /api/v1/admin/sources

Listar fuentes de datos configuradas.

POST /api/v1/admin/sources

Crear nueva fuente de datos.

Request Body:

{
  "name": "Production MySQL",
  "type": "mysql",
  "config": {
    "host": "prod-mysql.example.com",
    "port": 3306,
    "username": "cdc_user",
    "password": "secure_password",
    "database": "production"
  },
  "auto_start": true
}

Streaming API (Enterprise)

WebSocket /stream

Conexión WebSocket para eventos en tiempo real.

Mensajes de suscripción:

{
  "type": "subscribe",
  "table": "users",
  "operations": ["INSERT", "UPDATE"],
  "filters": {
    "data.status": "active"
  }
}

Server-Sent Events /stream

HTTP streaming para eventos en tiempo real.

Query Parameters:

  • table - Tabla específica
  • operation - Operación específica
  • filter - Filtro JSON

🏗 Arquitectura

Componentes Principales

┌─────────────────────────────────────────────────────────────────┐
│                        Duality CDC Architecture                  │
├─────────────────────────────────────────────────────────────────┤
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────────────────┐  │
│  │   MySQL     │  │  Transform  │  │      Storage Layer      │  │
│  │  Binlog     │─▶│   Engine    │─▶│   SQLite/PostgreSQL     │  │
│  │ Connector   │  │ (JS V8)     │  │        + Cache          │  │
│  └─────────────┘  └─────────────┘  └─────────────────────────┘  │
│         │                 │                       │              │
│         ▼                 ▼                       ▼              │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────────────────┐  │
│  │   Event     │  │  Filtering  │  │      API Layer          │  │
│  │  Processor  │  │   System    │  │   REST + WebSocket      │  │
│  └─────────────┘  └─────────────┘  └─────────────────────────┘  │
│         │                 │                       │              │
│         └─────────────────┼───────────────────────┘              │
│                           │                                      │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────────────────┐  │
│  │  Cluster    │  │   Backup    │  │      Monitoring         │  │
│  │  Manager    │  │   System    │  │   Metrics + Health      │  │
│  └─────────────┘  └─────────────┘  └─────────────────────────┘  │
└─────────────────────────────────────────────────────────────────┘

Flujo de Datos

  1. Captura - MySQL binlog reader captura cambios en tiempo real
  2. Procesamiento - Eventos son procesados y filtrados
  3. Transformación - Motor JavaScript aplica transformaciones personalizadas
  4. Almacenamiento - Eventos son persistidos en storage configurable
  5. Distribución - API REST, WebSocket y SSE distribuyen eventos
  6. Monitoreo - Métricas y health checks en Prometheus

Patrones de Diseño

  • Event Sourcing - Todos los cambios son eventos inmutables
  • CQRS - Separación de comandos y consultas
  • Circuit Breaker - Resiliencia ante fallos de conectores
  • Bulkhead - Aislamiento de componentes críticos
  • Observer Pattern - Sistema de suscripciones para streaming

🔧 Desarrollo

Configurar entorno de desarrollo

# Clonar repositorio
git clone https://github.com/RikardoBonilla/duality.git
cd duality

# Instalar dependencias
cargo fetch

# Ejecutar tests
cargo test

# Ejecutar con hot reload
cargo watch -x run

# Linting y formatting
cargo clippy
cargo fmt

Tests

# Ejecutar todos los tests
cargo test

# Tests de integración específicos
cargo test --test integration_tests

# Tests de performance
cargo test --release --test performance

# Coverage report
cargo tarpaulin --out html

Estructura del proyecto

src/
├── main.rs                 # Entry point
├── lib.rs                  # Library root
├── config/                 # Configuration management
├── connectors/            # Database connectors
│   └── mysql/            # MySQL binlog connector
├── events/               # Event processing
├── storage/              # Storage abstraction
├── api/                  # REST API
├── security/             # Authentication & authorization
├── metrics/              # Monitoring & observability
├── cluster/              # Multi-instance clustering (Enterprise)
├── streaming/            # Real-time streaming (Enterprise)
├── transformations/      # Advanced transformations (Enterprise)
├── query_dsl/            # Query DSL engine (Enterprise)
├── admin/                # Admin API (Enterprise)
├── backup/               # Backup system (Enterprise)
└── utils/                # Utilities

Contribución

  1. Fork el repositorio
  2. Crear feature branch (git checkout -b feature/amazing-feature)
  3. Commit cambios (git commit -m 'Add amazing feature')
  4. Push a branch (git push origin feature/amazing-feature)
  5. Abrir Pull Request

🚀 Despliegue

Docker Compose

version: '3.8'
services:
  duality:
    image: duality:latest
    ports:
      - "8080:8080"
      - "8081:8081"  # WebSocket
      - "8082:8082"  # SSE
    environment:
      - RUST_LOG=info
      - DUALITY_CONFIG=/app/config/production.toml
    volumes:
      - ./config:/app/config
      - ./data:/app/data
    depends_on:
      - mysql
      - etcd

  mysql:
    image: mysql:8.0
    environment:
      MYSQL_ROOT_PASSWORD: rootpass
      MYSQL_DATABASE: testdb
    command: --server-id=1 --log-bin=mysql-bin --binlog-format=ROW

  etcd:
    image: quay.io/coreos/etcd:v3.5.0
    command:
      - /usr/local/bin/etcd
      - --data-dir=/etcd-data
      - --listen-client-urls=http://0.0.0.0:2379
      - --advertise-client-urls=http://etcd:2379

Kubernetes

apiVersion: apps/v1
kind: Deployment
metadata:
  name: duality-cdc
spec:
  replicas: 3
  selector:
    matchLabels:
      app: duality-cdc
  template:
    metadata:
      labels:
        app: duality-cdc
    spec:
      containers:
      - name: duality
        image: duality:latest
        ports:
        - containerPort: 8080
        - containerPort: 8081
        - containerPort: 8082
        env:
        - name: RUST_LOG
          value: info
        - name: DUALITY_FEATURES
          value: enterprise
        resources:
          requests:
            memory: "256Mi"
            cpu: "250m"
          limits:
            memory: "512Mi"
            cpu: "500m"

Monitoreo con Prometheus

# prometheus.yml
global:
  scrape_interval: 15s

scrape_configs:
  - job_name: 'duality-cdc'
    static_configs:
      - targets: ['duality:8080']
    metrics_path: '/metrics'
    scrape_interval: 5s

🐛 Troubleshooting

Problemas Comunes

Error de conexión MySQL

Error: Failed to connect to MySQL: Connection refused

Solución:

  1. Verificar que MySQL esté ejecutándose
  2. Verificar credenciales en configuración
  3. Verificar que binlog esté habilitado
  4. Verificar permisos del usuario CDC

Alto uso de memoria

Warning: High memory usage detected (>1GB)

Solución:

  1. Ajustar storage.buffer_size en configuración
  2. Implementar filtros más específicos
  3. Ajustar transformations.max_memory_mb
  4. Verificar memory leaks con herramientas de profiling

Eventos perdidos

Warning: Event sequence gap detected

Solución:

  1. Verificar estabilidad de conexión MySQL
  2. Aumentar connector.reconnect_attempts
  3. Verificar que binlog no haya sido rotado/purgado
  4. Implementar recovery desde backup

Logging y Debugging

# Debug completo
RUST_LOG=debug duality

# Solo errores
RUST_LOG=error duality

# Log específico por módulo
RUST_LOG=duality::connectors::mysql=debug duality

# Output JSON para análisis
RUST_LOG=info DUALITY_LOG_FORMAT=json duality

Métricas de Diagnóstico

# Health check detallado
curl http://localhost:8080/api/v1/health/detailed

# Métricas Prometheus
curl http://localhost:8080/metrics

# Estadísticas de performance
curl -H "Authorization: Bearer TOKEN" \
     http://localhost:8080/api/v1/admin/stats/performance

📊 Benchmarks y Performance

Performance Baseline

  • Throughput: 10,000+ eventos/segundo
  • Latencia: <50ms end-to-end
  • Memoria: ~100MB base + buffer configurable
  • CPU: <10% en idle, ~50% bajo carga máxima

Optimización de Performance

Configuración para alto throughput

[storage]
buffer_size = 10000
batch_size = 1000
flush_interval_ms = 100

[connector]
connection_pool_size = 10
read_timeout_seconds = 30

[transformations]
max_concurrent_transforms = 8

Métricas importantes

  • duality_events_total - Total de eventos procesados
  • duality_events_per_second - Throughput actual
  • duality_processing_duration_seconds - Latencia de procesamiento
  • duality_memory_usage_bytes - Uso de memoria
  • duality_connection_errors_total - Errores de conexión

🔐 Seguridad

Configuración de Seguridad

[security]
# JWT Configuration
jwt_secret = "your-256-bit-secret-key-here"
jwt_expiration_hours = 24
jwt_issuer = "duality-cdc"

# Password Policy
min_password_length = 12
require_special_chars = true
require_numbers = true

# TLS Configuration
enable_tls = true
cert_file = "/path/to/cert.pem"
key_file = "/path/to/key.pem"

# Rate Limiting
enable_rate_limiting = true
max_requests_per_minute = 1000

# Audit
enable_audit_log = true
audit_log_path = "./logs/audit.log"

Best Practices

  1. Usar HTTPS en producción
  2. Rotar JWT secrets regularmente
  3. Implementar rate limiting
  4. Habilitar audit logging
  5. Usar usuarios con permisos mínimos
  6. Encriptar backups

🔗 Referencias y Recursos

Documentación Oficial

Comparación con Otras Soluciones

Característica Duality CDC Debezium AWS DMS Confluent Platform
Lenguaje Rust Java Managed Java/Scala
Performance ⭐⭐⭐⭐⭐ ⭐⭐⭐⭐ ⭐⭐⭐ ⭐⭐⭐⭐
Memoria ~100MB ~500MB N/A ~1GB
Configuración Simple Compleja GUI Compleja
Clustering
Transformaciones JS
Costo Gratis Gratis $$$ $$$$

📝 License

Este proyecto está licenciado bajo la Licencia MIT - ver el archivo LICENSE para más detalles.


🙏 Agradecimientos

  • MySQL Team - Por la excelente documentación del binlog
  • Rust Community - Por las increíbles herramientas y libraries
  • Tokio Project - Por el runtime async de alto rendimiento
  • Contributors - Todos los que han contribuido al proyecto

¿Preguntas o problemas? Abrir un issue

¿Quieres contribuir? Ver guía de contribución

Hecho con ❤️ en Rust

About

High-performance Change Data Capture engine for MySQL built in Rust. Real-time binlog streaming, enterprise clustering, JS transformations, Prometheus metrics and S3 backup.

Topics

Resources

Stars

Watchers

Forks

Packages

 
 
 

Contributors