diff --git a/.env.example b/.env.example index 84e5e9d8..924dd6c9 100644 --- a/.env.example +++ b/.env.example @@ -52,8 +52,13 @@ LLM_API_KEY=your-llm-api-key # LLM_BASE_URL=https://api.venice.ai/api/v1 # auto-set for known providers # LLM_MODEL=grok-41-fast # auto-set for known providers +# Dune (hourly large-transfer monitor) +DUNE_API_KEY=your-dune-api-key +DUNE_LARGE_TRANSFERS_QUERY_ID=1234567 +# DUNE_LARGE_TRANSFER_THRESHOLD=5000000 + # Global settings LOG_LEVEL=INFO # DEBUG, INFO, WARNING, ERROR (DEBUG skips Telegram sends) REQUEST_TIMEOUT=30 RETRY_COUNT=3 -BACKOFF_FACTOR=1.0 \ No newline at end of file +BACKOFF_FACTOR=1.0 diff --git a/.github/workflows/_run-monitoring.yml b/.github/workflows/_run-monitoring.yml index 40fbcd68..f4cf1b61 100644 --- a/.github/workflows/_run-monitoring.yml +++ b/.github/workflows/_run-monitoring.yml @@ -126,6 +126,8 @@ env: LLM_API_KEY: ${{ secrets.LLM_API_KEY }} LLM_MODEL: ${{ secrets.LLM_MODEL }} LLM_PROVIDER: ${{ secrets.LLM_PROVIDER }} + DUNE_API_KEY: ${{ secrets.DUNE_API_KEY }} + DUNE_LARGE_TRANSFERS_QUERY_ID: ${{ vars.DUNE_LARGE_TRANSFERS_QUERY_ID }} jobs: run: diff --git a/.github/workflows/hourly.yml b/.github/workflows/hourly.yml index 32ef47a5..be5bd6d8 100644 --- a/.github/workflows/hourly.yml +++ b/.github/workflows/hourly.yml @@ -30,6 +30,7 @@ jobs: # silo/ur_sniff.py usdai/main.py usdai/large_mints.py + stables/dune_large_transfers.py yearn/alert_large_flows.py maple/main.py timelock/timelock_alerts.py diff --git a/stables/dune_large_transfers.py b/stables/dune_large_transfers.py new file mode 100644 index 00000000..402df6d1 --- /dev/null +++ b/stables/dune_large_transfers.py @@ -0,0 +1,210 @@ +#!/usr/bin/env python3 +"""Hourly large-transfer monitor backed by a Dune query result. + +Expected Dune query output columns: +- block_time +- blockchain +- tx_hash +- from +- to +- contract_address +- symbol +- amount +- amount_usd + +Rows are sorted defensively by ``block_time`` descending before deduping and +alerting. +""" + +from __future__ import annotations + +import os +from typing import Any + +from dune_client.client import DuneClient +from dune_client.query import QueryBase + +from utils.alert import Alert, AlertSeverity, send_alert +from utils.cache import cache_filename, get_last_value_for_key_from_file, write_last_value_to_file +from utils.config import Config +from utils.logging import get_logger + +logger = get_logger("stables.dune_large_transfers") +PROTOCOL = "stables" + +CACHE_KEY_LAST_TX = "stables_dune_large_transfers_last_tx" +MAX_ROWS_PER_PROTOCOL_ALERT = 10 +DEFAULT_LARGE_TRANSFER_THRESHOLD = 5_000_000.0 + +# Route each token to its owning protocol channel. +TOKEN_ROUTE: dict[tuple[str, str], tuple[str, str]] = { + ("ethereum", "0xcccc62962d17b8914c62d74ffb843d73b2a3cccc"): ("cUSD", "cap"), + ("ethereum", "0x48f9e38f3070ad8945dfeae3fa70987722e3d89c"): ("iUSD", "infinifi"), +} + +CHAIN_TX_EXPLORER: dict[str, str] = { + "ethereum": "https://etherscan.io/tx/", + "arbitrum": "https://arbiscan.io/tx/", + "optimism": "https://optimistic.etherscan.io/tx/", + "base": "https://basescan.org/tx/", + "polygon": "https://polygonscan.com/tx/", +} + + +def _as_str(value: Any) -> str: + if value is None: + return "" + return str(value) + + +def _tx_link(blockchain: str, tx_hash: str) -> str: + prefix = CHAIN_TX_EXPLORER.get(blockchain.lower()) + if not prefix: + return tx_hash + return f"{prefix}{tx_hash}" + + +def _row_key(row: dict[str, Any]) -> str: + tx_hash = _as_str(row.get("tx_hash")).lower() + contract = _as_str(row.get("contract_address")).lower() + log_index = _as_str(row.get("log_index")) + parts = [tx_hash, contract] + if log_index: + parts.append(log_index) + return "|".join(parts) + + +def _route_for_row(row: dict[str, Any]) -> tuple[str, str] | None: + chain = _as_str(row.get("blockchain")).lower() + addr = _as_str(row.get("contract_address")).lower() + return TOKEN_ROUTE.get((chain, addr)) + + +def _build_row_line(row: dict[str, Any]) -> str: + chain = _as_str(row.get("blockchain")) + symbol = _as_str(row.get("symbol")) or "unknown" + amount = row.get("amount") + amount_usd = row.get("amount_usd") + tx_hash = _as_str(row.get("tx_hash")) + link = _tx_link(chain, tx_hash) + return f"- {symbol} on {chain}: amount={amount}, amount_usd={amount_usd}, tx={link}" + + +def _build_protocol_lines(protocol_rows: list[dict[str, Any]], query_id: int) -> list[str]: + included_rows = protocol_rows[:MAX_ROWS_PER_PROTOCOL_ALERT] + lines = [_build_row_line(row) for row in included_rows] + + omitted_count = len(protocol_rows) - len(included_rows) + if omitted_count > 0: + lines.append(f"- +{omitted_count} more not shown -- see Dune query {query_id} directly") + + return lines + + +def _group_rows_by_protocol(rows: list[dict[str, Any]]) -> dict[str, list[dict[str, Any]]]: + grouped: dict[str, list[dict[str, Any]]] = {} + for row in rows: + route = _route_for_row(row) + if route is None: + continue + _, protocol = route + grouped.setdefault(protocol, []).append(row) + return grouped + + +def _to_float(value: Any) -> float: + try: + return float(value) + except (TypeError, ValueError): + return 0.0 + + +def _is_large_transfer(row: dict[str, Any], threshold: float) -> bool: + amount_usd = _to_float(row.get("amount_usd")) + return amount_usd >= threshold + + +def _sort_rows_newest_first(rows: list[dict[str, Any]]) -> list[dict[str, Any]]: + return sorted(rows, key=lambda row: _as_str(row.get("block_time")), reverse=True) + + +def _new_rows_since_last_seen(rows: list[dict[str, Any]], last_key: str) -> list[dict[str, Any]]: + if not last_key: + return rows + + new_rows: list[dict[str, Any]] = [] + for row in rows: + if _row_key(row) == last_key: + break + new_rows.append(row) + return new_rows + + +def main() -> None: + api_key = os.getenv("DUNE_API_KEY") + query_id = Config.get_env_int("DUNE_LARGE_TRANSFERS_QUERY_ID", 0) + threshold = Config.get_env_float("DUNE_LARGE_TRANSFER_THRESHOLD", DEFAULT_LARGE_TRANSFER_THRESHOLD) + + if not api_key: + logger.warning("DUNE_API_KEY is not set; skipping Dune large transfer monitor") + return + if query_id <= 0: + logger.warning("DUNE_LARGE_TRANSFERS_QUERY_ID is not set; skipping Dune large transfer monitor") + return + + try: + dune = DuneClient(api_key) + result = dune.run_query(QueryBase(query_id=query_id, name="stables_large_transfers"), ping_frequency=2) + rows = list(result.result.rows) if result and result.result and result.result.rows else [] + except Exception as exc: + logger.error("Failed to fetch Dune large-transfer query result: %s", exc) + send_alert( + Alert( + AlertSeverity.MEDIUM, + f"Dune large-transfer monitor failed while querying Dune: {exc}", + PROTOCOL, + ), + plain_text=True, + ) + return + + if not rows: + logger.info("No large transfers returned by Dune query_id=%s", query_id) + return + + alert_rows = [ + row + for row in _sort_rows_newest_first(rows) + if _is_large_transfer(row, threshold) and _route_for_row(row) is not None + ] + if not alert_rows: + logger.info("No routed rows matched large-transfer threshold >= %s", threshold) + return + + last_key = _as_str(get_last_value_for_key_from_file(cache_filename, CACHE_KEY_LAST_TX)) + new_alert_rows = _new_rows_since_last_seen(alert_rows, last_key) + if not new_alert_rows: + logger.info("No new large-transfer rows since last run") + return + + grouped = _group_rows_by_protocol(new_alert_rows) + total_rows = len(new_alert_rows) + for protocol, protocol_rows in grouped.items(): + route = _route_for_row(protocol_rows[0]) + if route is None: + continue + first_symbol, _ = route + lines = _build_protocol_lines(protocol_rows, query_id) + message = ( + f"*Dune Large Transfer Alert ({first_symbol}/{protocol})*\n\n" + f"Query ID: {query_id}\n" + f"Matched rows: {total_rows}\n" + f"Included in this alert: {min(len(protocol_rows), MAX_ROWS_PER_PROTOCOL_ALERT)}\n\n" + "\n".join(lines) + ) + send_alert(Alert(AlertSeverity.HIGH, message, protocol), plain_text=True) + + write_last_value_to_file(cache_filename, CACHE_KEY_LAST_TX, _row_key(alert_rows[0])) + + +if __name__ == "__main__": + main() diff --git a/tests/test_dune_large_transfers.py b/tests/test_dune_large_transfers.py new file mode 100644 index 00000000..6683d015 --- /dev/null +++ b/tests/test_dune_large_transfers.py @@ -0,0 +1,82 @@ +"""Tests for the Dune-backed large transfer monitor.""" + +from stables import dune_large_transfers as monitor + + +def _row(**overrides): + row = { + "block_time": "2026-05-12 14:30:00.000 UTC", + "blockchain": "ethereum", + "tx_hash": "0xABC", + "contract_address": "0xCCCC62962d17b8914c62D74FfB843d73B2a3cCCc", + "symbol": "cUSD", + "amount": "5000000", + "amount_usd": "5000000", + } + row.update(overrides) + return row + + +def test_row_key_excludes_colon_timestamp_from_cache_value(): + key = monitor._row_key(_row(block_time="2026-05-12 14:30:00.000 UTC", log_index=None)) + + assert ":" not in key + assert key == "0xabc|0xcccc62962d17b8914c62d74ffb843d73b2a3cccc" + + +def test_row_key_uses_log_index_when_present(): + key = monitor._row_key(_row(log_index=17)) + + assert key.endswith("|17") + + +def test_route_for_row_returns_protocol_for_known_token(): + assert monitor._route_for_row(_row()) == ("cUSD", "cap") + + +def test_route_for_row_skips_unknown_token_instead_of_stables_fallback(): + row = _row(contract_address="0x0000000000000000000000000000000000000000") + + assert monitor._route_for_row(row) is None + + +def test_route_for_row_skips_usdai(): + row = _row( + blockchain="arbitrum", + contract_address="0x0a1a1a107e45b7ced86833863f482bc5f4ed82ef", + symbol="USDai", + ) + + assert monitor._route_for_row(row) is None + + +def test_is_large_transfer_requires_positive_usd_amount(): + row = _row(amount="5000000", amount_usd="0") + + assert monitor._is_large_transfer(row, 5_000_000) is False + + +def test_new_rows_since_last_seen_only_returns_new_prefix(): + newest = _row(tx_hash="0xnew", block_time="2026-05-12 15:00:00.000 UTC") + previous = _row(tx_hash="0xold", block_time="2026-05-12 14:00:00.000 UTC") + older = _row(tx_hash="0xolder", block_time="2026-05-12 13:00:00.000 UTC") + + rows = [newest, previous, older] + + assert monitor._new_rows_since_last_seen(rows, monitor._row_key(previous)) == [newest] + + +def test_sort_rows_newest_first_defends_dedup_order(): + newest = _row(tx_hash="0xnew", block_time="2026-05-12 15:00:00.000 UTC") + older = _row(tx_hash="0xold", block_time="2026-05-12 14:00:00.000 UTC") + + assert monitor._sort_rows_newest_first([older, newest]) == [newest, older] + + +def test_build_protocol_lines_appends_truncation_notice(): + rows = [_row(tx_hash=f"0x{i}") for i in range(monitor.MAX_ROWS_PER_PROTOCOL_ALERT + 2)] + + lines = monitor._build_protocol_lines(rows, query_id=1234567) + + assert len(lines) == monitor.MAX_ROWS_PER_PROTOCOL_ALERT + 1 + assert lines[-1] == "- +2 more not shown -- see Dune query 1234567 directly"