A high-performance, multi-protocol DEX arbitrage detection system with real-time indexing and parallel processing capabilities.
- Substreams Integration: Direct streaming from UniswapV3, Aerodrome, and PancakeSwap substreams
- Chronological Event Processing: LogIndex-based ordering ensures accurate pool state reconstruction
- Atomic Block Processing: Per-block database transactions with automatic rollback on errors
- Auto-Reconnection: Exponential backoff strategy for resilient streaming
- Worker Pool Architecture: Configurable worker threads (default: 8) for parallel arbitrage calculations
- Batch Processing: Sequential batch execution with fresh state fetching per batch
- Non-Blocking Event Loop: Uses `setImmediate()` to prevent blocking during continuous simulation
- Parallel Path Analysis: Multiple arbitrage paths calculated simultaneously within each batch
- Protocol-Specific SQLite Databases: Separate databases for each DEX protocol
- Efficient State Queries: Optimized pool state fetching with indexed lookups
- Transaction Management: Atomic commits ensure data consistency
- Connection Pooling: Managed database connections across workers
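The auto-reconnection feature above relies on exponential backoff. A minimal sketch of that strategy follows; `backoffDelay`, `connectWithBackoff`, and the 1 s base, 30 s cap, and five-attempt limit are assumptions for illustration, not values taken from this project.

```javascript
// Sketch only: base delay, cap, and attempt limit are assumed values,
// not settings taken from this project.
function backoffDelay(attempt, baseMs = 1000, maxMs = 30000) {
  // Delay doubles with each failed attempt (0-based), capped at maxMs.
  return Math.min(maxMs, baseMs * 2 ** attempt);
}

const defaultSleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

// Calls `connect` until it succeeds or `maxAttempts` is exhausted.
async function connectWithBackoff(connect, { maxAttempts = 5, sleep = defaultSleep } = {}) {
  let lastError;
  for (let attempt = 0; attempt < maxAttempts; attempt++) {
    try {
      return await connect();
    } catch (err) {
      lastError = err;
      // Wait before the next attempt, but not after the final failure.
      if (attempt < maxAttempts - 1) await sleep(backoffDelay(attempt));
    }
  }
  throw lastError;
}
```

With these assumed defaults, the waits between the five attempts are 1 s, 2 s, 4 s, and 8 s.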
Substream Events → LogIndex Sorting → State Updates → SQLite Storage
↓
State Manager
↓
Batch Fetching (Fresh States Per Batch)
↓
Worker Pool (Parallel Processing)
↓
Arbitrage Opportunities
BaseStreamWorker (Abstract)
├── Connection management
├── Reconnection logic
├── Event handling framework
└── Protobuf value extraction
Protocol Workers (Concrete)
├── UniV3StreamWorker
├── AerodromeStreamWorker
└── PancakeSwapStreamWorker
Simulation System
├── SimulationRunner (Orchestrator)
├── StateManager (Fresh state fetching)
├── ArbitrageSimulator (Worker pool manager)
└── arbitrageWorker.js (Individual calculators)
- LogIndex Ordering: Events sorted by logIndex within each block for chronological accuracy
- Protobuf Type Handling: Automatic extraction of typed values from protobuf structures
- Batch Transactions: Entire blocks committed atomically, reducing I/O overhead
- Continuous Loop: Zero-delay cycles for maximum opportunity detection speed
- Fresh State Per Batch: Workers always use the most current pool states
- Configurable Workers: Scale from 1 to 32+ workers based on CPU cores
- Memory Efficient: Workers are reused across batches, minimizing allocation overhead
- Lazy Loading: Only fetches pool states needed for current batch
- Protocol Mapping: Unified interface across different DEX protocols
- Cache-Friendly: Sequential batch processing improves cache hit rates
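The protobuf type handling mentioned above could look roughly like the sketch below. The field shape (`{ name, newValue: { typed: { case, value } } }`) is an assumption; the actual decoded shape depends on the protobuf library in use, and `extractTypedValue` and `fieldsToObject` are hypothetical names.

```javascript
// Assumed (hypothetical) decoded shape of one entity field:
//   { name: 'liquidity', newValue: { typed: { case: 'bigint', value: '123' } } }
function extractTypedValue(protoValue) {
  const typed = protoValue && protoValue.typed;
  if (!typed || typed.case === undefined) return null;
  switch (typed.case) {
    case 'int32':
      return Number(typed.value);
    case 'bigint':
      return BigInt(typed.value); // keeps full precision for uint256-sized values
    case 'bigdecimal':
    case 'string':
      return String(typed.value);
    case 'bool':
      return Boolean(typed.value);
    default:
      return typed.value; // unknown kinds are passed through untouched
  }
}

// Flattens a list of fields into a plain { name: value } object.
function fieldsToObject(fields) {
  const out = {};
  for (const field of fields) out[field.name] = extractTypedValue(field.newValue);
  return out;
}
```

Converting large integers to `BigInt` rather than `Number` avoids silent precision loss on liquidity and price values.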
# Clone the repository
git clone https://github.com/yourusername/defi-arb-engine.git
cd defi-arb-engine
# Install dependencies
npm install
# Configure environment
cp .env.example .env
# Edit .env with your configuration
# Substreams API
SUBSTREAMS_API_TOKEN=your_token_here
START_BLOCK=32519000
STOP_BLOCK= # Optional, leave empty for continuous streaming
# Simulation Settings
ENABLE_SIMULATION=true
SIM_WORKERS=8 # Number of worker threads (adjust based on CPU cores)
# Arbitrage Parameters
SIM_MAX_INPUT=10 # Maximum input amount in WETH
SIM_MIN_INPUT=0.01 # Minimum input amount
SIM_PROFIT_THRESHOLD=0.01 # Minimum profit threshold
# Protocol Configuration
ENABLED_PROTOCOLS=univ3,aerodrome,pancakeswap
# Database Paths
PATHS_DB=./paths.sqlite
UNIV3_DB=./uniswap_v3.sqlite
AERODROME_DB=./aerodrome.sqlite
PANCAKE_DB=./pancake.sqlite
# Network
WETH_ADDRESS=0x4200000000000000000000000000000000000006
ENABLE_SIMULATION=true SIM_WORKERS=8 node enhancedConsumer.js
ENABLE_SIMULATION=false node enhancedConsumer.js
ENABLE_SIMULATION=true SIM_WORKERS=16 node enhancedConsumer.js
npm run dev
- Event Processing: ~1000-2000 events/second
- Simulation Cycle (8 workers): 8-12 seconds for 100 paths
- State Fetching: <500ms per batch
- Worker Throughput (8 workers): 8 paths calculated in parallel per batch
- Simulation Cycle (16 workers): 4-6 seconds for 100 paths
- Worker Throughput (16 workers): 16 paths calculated in parallel per batch
- CPU Utilization: ~80-90% across all cores
- BaseStreamWorker.js: Abstract base with connection management
- UniV3StreamWorker.js: UniswapV3 event processing
- AerodromeStreamWorker.js: Aerodrome event processing
- PancakeSwapStreamWorker.js: PancakeSwap event processing
- SimulationRunner.js: Main orchestrator with continuous loop
- StateManager.js: Fresh state fetching from protocol databases
- ArbitrageSimulator.js: Worker pool manager
- arbitrageWorker.js: Individual worker thread logic
- DatabaseManager.js: Connection and transaction management
- models.js: Sequelize models for pools, ticks, and paths
- database-loader.js: Initial database setup
- config.js: Centralized configuration management
- .env: Environment-specific settings
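As an illustration of centralized configuration, the sketch below reads a few of the environment variables listed earlier into one object. `loadConfig`, the returned field names, and the fallback values are assumptions; the real `config.js` may differ.

```javascript
// Sketch of a config loader. Only the environment variable names come
// from the .env example; everything else here is illustrative.
function loadConfig(env = process.env) {
  const num = (value, fallback) => {
    const parsed = Number(value);
    return value === undefined || value === '' || Number.isNaN(parsed) ? fallback : parsed;
  };
  return {
    startBlock: num(env.START_BLOCK, 0),
    // An empty STOP_BLOCK means "stream continuously".
    stopBlock: env.STOP_BLOCK ? Number(env.STOP_BLOCK) : null,
    enableSimulation: env.ENABLE_SIMULATION === 'true',
    simWorkers: num(env.SIM_WORKERS, 8),
    enabledProtocols: (env.ENABLED_PROTOCOLS || '')
      .split(',')
      .map((p) => p.trim())
      .filter(Boolean),
  };
}
```

Passing `env` as a parameter keeps the loader easy to test without touching `process.env`.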
🚀 Starting Enhanced Multi-Protocol Consumer...
📋 Enabled protocols: univ3,aerodrome,pancakeswap
🧮 Simulation enabled: Yes (8 workers)
✅ StateManager connected to univ3 database
✅ StateManager connected to aerodrome database
✅ StateManager connected to pancakeswap database
📊 Found 127 paths in database
🔄 Starting simulation cycle 1
📦 Processing batch: paths 1-8 of 127
🔄 Fetching fresh states for batch...
✅ Fetched fresh states for 23 pools (342ms)
📊 Batch complete: 8/127 (6.3%) - Profitable: 1
💎 Top Profitable Opportunities:
1. Path 42: 0.0234 WETH profit (5.5 input)
Route: WETH → USDC → DAI → WETH
🔄 Simulation cycle 1 completed (8.67s)
💰 Found 3 profitable opportunities
- Receive Block Data from substream
- Parse Entity Changes with protobuf handling
- Sort by LogIndex for chronological order
- Apply State Changes atomically per block
- Commit to Database with rollback on error
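The stream-processing steps above can be sketched with an in-memory stand-in for the per-block transaction. The real system commits to SQLite; the `Map` of pool states, the event shape, and the `processBlock` and `applyEvent` names are assumptions made for this example.

```javascript
// In-memory stand-in for a per-block database transaction.
// `applyEvent(previousState, event)` must return a NEW state object
// rather than mutating its input, so the original Map stays intact.
function processBlock(poolStates, block, applyEvent) {
  // Work on a shallow copy so a failure leaves `poolStates` untouched.
  const draft = new Map(poolStates);
  const ordered = [...block.events].sort((a, b) => a.logIndex - b.logIndex);
  try {
    for (const event of ordered) {
      const previous = draft.get(event.pool) || {};
      draft.set(event.pool, applyEvent(previous, event));
    }
  } catch (err) {
    // "Rollback": discard the draft and keep the pre-block states.
    return { committed: false, states: poolStates, error: err };
  }
  // "Commit": the draft becomes the new state.
  return { committed: true, states: draft };
}
```

Either every event in the block is reflected in the returned states or none is, which mirrors the commit-or-rollback behavior described in the list.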
- Load Paths from paths database
- Divide into Batches based on worker count
- Fetch Fresh States for current batch pools
- Spawn Workers for parallel calculation
- Collect Results and identify opportunities
- Repeat Continuously with zero delay
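The simulation steps above can be sketched as a batching helper plus a zero-delay loop. `chunk`, `runCycle`, `runContinuously`, and the `fetchStates` and `simulatePath` callbacks are hypothetical names; in the real system the per-path work is done by worker threads rather than plain async functions.

```javascript
// Split `items` into consecutive batches of at most `size` elements.
function chunk(items, size) {
  const batches = [];
  for (let i = 0; i < items.length; i += size) batches.push(items.slice(i, i + size));
  return batches;
}

// One cycle: batches run sequentially, paths inside a batch run concurrently.
async function runCycle(paths, workerCount, fetchStates, simulatePath) {
  const results = [];
  for (const batch of chunk(paths, workerCount)) {
    const states = await fetchStates(batch); // fresh states for this batch only
    const batchResults = await Promise.all(batch.map((p) => simulatePath(p, states)));
    results.push(...batchResults);
  }
  return results;
}

// Repeat cycles with no artificial delay. setImmediate yields to the event
// loop between cycles so pending I/O (such as stream events) can be handled.
function runContinuously(runOnce, shouldStop) {
  const next = () => {
    if (shouldStop()) return;
    runOnce()
      .catch((err) => console.error('cycle failed:', err))
      .finally(() => setImmediate(next));
  };
  setImmediate(next);
}
```

With 127 paths and 8 workers, `chunk` produces 16 batches, the last holding 7 paths.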
Ensures events within a block are processed in the exact order they occurred on-chain, critical for accurate pool state reconstruction.
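A small sketch of the ordering step follows. It assumes `logIndex` may arrive as a string from the decoded payload, in which case a default lexicographic sort would misorder events; `sortByLogIndex` and the event shape are illustrative.

```javascript
// Compare logIndex numerically: as strings, "10" would sort before "9".
// Returns a new array and leaves the input untouched.
function sortByLogIndex(events) {
  return [...events].sort((a, b) => Number(a.logIndex) - Number(b.logIndex));
}

const ordered = sortByLogIndex([
  { logIndex: '10', type: 'Swap' },
  { logIndex: '9', type: 'Mint' },
]);
console.log(ordered.map((e) => e.type)); // [ 'Mint', 'Swap' ]
```

Applying the Mint before the Swap matters because the swap's output depends on the liquidity the mint added.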
Rather than fetching all states once, the system fetches fresh states for each batch, ensuring workers always use the most current data available.
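One way to picture per-batch fetching is to collect only the pools a batch references and query those. The path shape (`{ id, pools: [...] }`), `poolsForBatch`, `fetchFreshStates`, and the `queryPool` callback are assumptions standing in for the real StateManager and its indexed database lookups.

```javascript
// Collect the unique pool addresses referenced by a batch of paths.
function poolsForBatch(batch) {
  const unique = new Set();
  for (const path of batch) {
    // Normalize case so the same pool is not fetched twice.
    for (const pool of path.pools) unique.add(pool.toLowerCase());
  }
  return [...unique];
}

// Fetch only the states this batch needs, immediately before it runs.
async function fetchFreshStates(batch, queryPool) {
  const addresses = poolsForBatch(batch);
  const states = await Promise.all(addresses.map((address) => queryPool(address)));
  return new Map(addresses.map((address, i) => [address, states[i]]));
}
```

Because the fetch happens right before each batch, a pool updated by the stream mid-cycle is picked up by the next batch instead of the next cycle.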
Workers are created once and reused across batches, which minimizes thread creation overhead while maintaining parallelism.
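Worker reuse can be sketched with Node's `worker_threads` module. The `WorkerPool` class below is a hypothetical illustration, and its inline worker body only doubles a number as a placeholder for the real calculation in `arbitrageWorker.js`.

```javascript
const { Worker } = require('node:worker_threads');

// Inline worker body for the sketch; the "calculation" is a placeholder.
const workerSource = `
  const { parentPort } = require('node:worker_threads');
  parentPort.on('message', ({ id, input }) => {
    parentPort.postMessage({ id, output: input * 2 });
  });
`;

class WorkerPool {
  constructor(size) {
    // Threads are created once here and reused by every runBatch call.
    this.workers = Array.from({ length: size }, () => new Worker(workerSource, { eval: true }));
  }

  // Sends one task to one worker and resolves with its reply.
  runTask(worker, task) {
    return new Promise((resolve, reject) => {
      const onMessage = (msg) => {
        worker.off('error', onError);
        resolve(msg);
      };
      const onError = (err) => {
        worker.off('message', onMessage);
        reject(err);
      };
      worker.once('message', onMessage);
      worker.once('error', onError);
      worker.postMessage(task);
    });
  }

  // Runs up to `size` tasks in parallel, one task per worker.
  async runBatch(tasks) {
    if (tasks.length > this.workers.length) {
      throw new Error('batch is larger than the pool');
    }
    return Promise.all(tasks.map((task, i) => this.runTask(this.workers[i], task)));
  }

  // Terminates all threads; call once when the simulation stops.
  close() {
    return Promise.all(this.workers.map((w) => w.terminate()));
  }
}
```

Calling `runBatch` repeatedly on the same pool sends new messages to existing threads, so thread startup cost is paid only in the constructor.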
Unified interface across different DEX protocols allows easy addition of new protocols without changing core logic.
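A unified interface of this kind might be sketched as an adapter registry. The `registerProtocol` and `getAdapter` functions, the two adapter methods, and the quote formula are all hypothetical; the quote is a placeholder and not real pool math.

```javascript
// Hypothetical adapter registry: every protocol supplies the same methods,
// so core logic never branches on the protocol name.
const adapters = new Map();

function registerProtocol(name, adapter) {
  for (const method of ['dbPath', 'quote']) {
    if (typeof adapter[method] !== 'function') {
      throw new TypeError(`${name} adapter is missing ${method}()`);
    }
  }
  adapters.set(name, adapter);
}

function getAdapter(name) {
  const adapter = adapters.get(name);
  if (!adapter) throw new Error(`Unknown protocol: ${name}`);
  return adapter;
}

// Example registration; the quote function is a stand-in for the
// protocol's actual swap calculation.
registerProtocol('univ3', {
  dbPath: () => process.env.UNIV3_DB || './uniswap_v3.sqlite',
  quote: (state, amountIn) => amountIn * state.price,
});
```

Under this design, supporting another DEX means registering one more adapter; callers keep using `getAdapter(name).quote(...)` unchanged.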
- Enhanced Consumer Summary - Architecture and improvements
- Simulation Guide - Detailed simulation system documentation
Contributions are welcome! Please ensure:
- Code follows existing patterns
- Performance optimizations are documented
- Tests pass before submitting PRs
MIT
This software is for educational and research purposes only. Use at your own risk. Always test thoroughly before deploying with real funds.