From eca4e0788bfdc94553d5249804bb8ced4f8b22c6 Mon Sep 17 00:00:00 2001 From: ctmotox2 <118690360+ctmotox2@users.noreply.github.com> Date: Wed, 29 Apr 2026 14:14:15 +0300 Subject: [PATCH 1/3] feat: add hourly dune large-transfer monitor --- .env.example | 6 +- .github/workflows/_run-monitoring.yml | 2 + .github/workflows/hourly.yml | 1 + stables/dune_large_transfers.py | 174 ++++++++++++++++++++++++++ 4 files changed, 182 insertions(+), 1 deletion(-) create mode 100644 stables/dune_large_transfers.py diff --git a/.env.example b/.env.example index 84e5e9d8..e1e405af 100644 --- a/.env.example +++ b/.env.example @@ -52,8 +52,12 @@ LLM_API_KEY=your-llm-api-key # LLM_BASE_URL=https://api.venice.ai/api/v1 # auto-set for known providers # LLM_MODEL=grok-41-fast # auto-set for known providers +# Dune (hourly large-transfer monitor) +DUNE_API_KEY=your-dune-api-key +DUNE_LARGE_TRANSFERS_QUERY_ID=1234567 + # Global settings LOG_LEVEL=INFO # DEBUG, INFO, WARNING, ERROR (DEBUG skips Telegram sends) REQUEST_TIMEOUT=30 RETRY_COUNT=3 -BACKOFF_FACTOR=1.0 \ No newline at end of file +BACKOFF_FACTOR=1.0 diff --git a/.github/workflows/_run-monitoring.yml b/.github/workflows/_run-monitoring.yml index 40fbcd68..f4cf1b61 100644 --- a/.github/workflows/_run-monitoring.yml +++ b/.github/workflows/_run-monitoring.yml @@ -126,6 +126,8 @@ env: LLM_API_KEY: ${{ secrets.LLM_API_KEY }} LLM_MODEL: ${{ secrets.LLM_MODEL }} LLM_PROVIDER: ${{ secrets.LLM_PROVIDER }} + DUNE_API_KEY: ${{ secrets.DUNE_API_KEY }} + DUNE_LARGE_TRANSFERS_QUERY_ID: ${{ vars.DUNE_LARGE_TRANSFERS_QUERY_ID }} jobs: run: diff --git a/.github/workflows/hourly.yml b/.github/workflows/hourly.yml index 32ef47a5..be5bd6d8 100644 --- a/.github/workflows/hourly.yml +++ b/.github/workflows/hourly.yml @@ -30,6 +30,7 @@ jobs: # silo/ur_sniff.py usdai/main.py usdai/large_mints.py + stables/dune_large_transfers.py yearn/alert_large_flows.py maple/main.py timelock/timelock_alerts.py diff --git a/stables/dune_large_transfers.py b/stables/dune_large_transfers.py new file mode 100644 index 00000000..bd9d24f6 --- /dev/null +++ b/stables/dune_large_transfers.py @@ -0,0 +1,174 @@ +#!/usr/bin/env python3 +"""Hourly large-transfer monitor backed by a Dune query result. + +Expected Dune query output columns: +- block_time +- blockchain +- tx_hash +- from +- to +- contract_address +- symbol +- amount +- amount_usd +""" + +from __future__ import annotations + +import os +from typing import Any + +from dune_client.client import DuneClient +from dune_client.query import QueryBase + +from utils.alert import Alert, AlertSeverity, send_alert +from utils.cache import cache_filename, get_last_value_for_key_from_file, write_last_value_to_file +from utils.config import Config +from utils.logging import get_logger + +LOGGER = get_logger("stables.dune_large_transfers") +PROTOCOL = "stables" + +CACHE_KEY_LAST_TX = "stables_dune_large_transfers_last_tx" +MAX_ROWS_IN_ALERT = 10 +DEFAULT_LARGE_TRANSFER_THRESHOLD = 5_000_000.0 + +# Route each token to its owning protocol channel. +TOKEN_ROUTE: dict[tuple[str, str], tuple[str, str]] = { + ("ethereum", "0xcccc62962d17b8914c62d74ffb843d73b2a3cccc"): ("cUSD", "cap"), + ("ethereum", "0x48f9e38f3070ad8945dfeae3fa70987722e3d89c"): ("iUSD", "infinifi"), + ("arbitrum", "0x0a1a1a107e45b7ced86833863f482bc5f4ed82ef"): ("USDai", "usdai"), +} + +CHAIN_TX_EXPLORER: dict[str, str] = { + "ethereum": "https://etherscan.io/tx/", + "arbitrum": "https://arbiscan.io/tx/", + "optimism": "https://optimistic.etherscan.io/tx/", + "base": "https://basescan.org/tx/", + "polygon": "https://polygonscan.com/tx/", +} + + +def _as_str(value: Any) -> str: + if value is None: + return "" + return str(value) + + +def _tx_link(blockchain: str, tx_hash: str) -> str: + prefix = CHAIN_TX_EXPLORER.get(blockchain.lower()) + if not prefix: + return tx_hash + return f"{prefix}{tx_hash}" + + +def _row_key(row: dict[str, Any]) -> str: + tx_hash = _as_str(row.get("tx_hash")).lower() + block_time = _as_str(row.get("block_time")) + contract = _as_str(row.get("contract_address")).lower() + return f"{tx_hash}|{block_time}|{contract}" + + +def _route_for_row(row: dict[str, Any]) -> tuple[str, str]: + chain = _as_str(row.get("blockchain")).lower() + addr = _as_str(row.get("contract_address")).lower() + if (chain, addr) in TOKEN_ROUTE: + return TOKEN_ROUTE[(chain, addr)] + + symbol = _as_str(row.get("symbol")) or "unknown" + return (symbol, PROTOCOL) + + +def _build_row_line(row: dict[str, Any]) -> str: + chain = _as_str(row.get("blockchain")) + symbol = _as_str(row.get("symbol")) or "unknown" + amount = row.get("amount") + amount_usd = row.get("amount_usd") + tx_hash = _as_str(row.get("tx_hash")) + link = _tx_link(chain, tx_hash) + return f"- {symbol} on {chain}: amount={amount}, amount_usd={amount_usd}, tx={link}" + + +def _group_rows_by_protocol(rows: list[dict[str, Any]]) -> dict[str, list[dict[str, Any]]]: + grouped: dict[str, list[dict[str, Any]]] = {} + for row in rows: + _, protocol = _route_for_row(row) + grouped.setdefault(protocol, []).append(row) + return grouped + + +def _to_float(value: Any) -> float: + try: + return float(value) + except (TypeError, ValueError): + return 0.0 + + +def _is_large_transfer(row: dict[str, Any], threshold: float) -> bool: + amount_usd = _to_float(row.get("amount_usd")) + amount = _to_float(row.get("amount")) + metric = amount_usd if amount_usd > 0 else amount + return metric >= threshold + + +def main() -> None: + api_key = os.getenv("DUNE_API_KEY") + query_id = Config.get_env_int("DUNE_LARGE_TRANSFERS_QUERY_ID", 0) + threshold = Config.get_env_float("DUNE_LARGE_TRANSFER_THRESHOLD", DEFAULT_LARGE_TRANSFER_THRESHOLD) + + if not api_key: + LOGGER.warning("DUNE_API_KEY is not set; skipping Dune large transfer monitor") + return + if query_id <= 0: + LOGGER.warning("DUNE_LARGE_TRANSFERS_QUERY_ID is not set; skipping Dune large transfer monitor") + return + + try: + dune = DuneClient(api_key) + result = dune.run_query(QueryBase(query_id=query_id, name="stables_large_transfers"), ping_frequency=2) + rows = list(result.result.rows) if result and result.result and result.result.rows else [] + except Exception as exc: + LOGGER.error("Failed to fetch Dune large-transfer query result: %s", exc) + send_alert( + Alert( + AlertSeverity.MEDIUM, + f"Dune large-transfer monitor failed while querying Dune: {exc}", + PROTOCOL, + ), + plain_text=True, + ) + return + + if not rows: + LOGGER.info("No large transfers returned by Dune query_id=%s", query_id) + return + + alert_rows = [row for row in rows if _is_large_transfer(row, threshold)] + if not alert_rows: + LOGGER.info("No rows matched large-transfer threshold >= %s", threshold) + return + + newest_key = _row_key(alert_rows[0]) + last_key = _as_str(get_last_value_for_key_from_file(cache_filename, CACHE_KEY_LAST_TX)) + if newest_key == last_key: + LOGGER.info("No new large-transfer rows since last run (top row unchanged)") + return + + grouped = _group_rows_by_protocol(alert_rows[:MAX_ROWS_IN_ALERT]) + total_rows = len(alert_rows) + for protocol, protocol_rows in grouped.items(): + first_symbol, _ = _route_for_row(protocol_rows[0]) + lines = [_build_row_line(row) for row in protocol_rows] + message = ( + f"*Dune Large Transfer Alert ({first_symbol}/{protocol})*\n\n" + f"Query ID: {query_id}\n" + f"Matched rows: {total_rows}\n" + f"Included in this alert: {len(protocol_rows)}\n\n" + "\n".join(lines) + ) + send_alert(Alert(AlertSeverity.HIGH, message, protocol), plain_text=True) + + write_last_value_to_file(cache_filename, CACHE_KEY_LAST_TX, newest_key) + + +if __name__ == "__main__": + main() From 34eb9f9cf14ebdc5325e06c4dba225a6d9db868f Mon Sep 17 00:00:00 2001 From: ctmotox2 <118690360+ctmotox2@users.noreply.github.com> Date: Wed, 13 May 2026 14:52:06 +0300 Subject: [PATCH 2/3] fix dune large transfer dedup --- .env.example | 1 + stables/dune_large_transfers.py | 85 ++++++++++++++++++++---------- tests/test_dune_large_transfers.py | 63 ++++++++++++++++++++++ 3 files changed, 120 insertions(+), 29 deletions(-) create mode 100644 tests/test_dune_large_transfers.py diff --git a/.env.example b/.env.example index e1e405af..924dd6c9 100644 --- a/.env.example +++ b/.env.example @@ -55,6 +55,7 @@ LLM_API_KEY=your-llm-api-key # Dune (hourly large-transfer monitor) DUNE_API_KEY=your-dune-api-key DUNE_LARGE_TRANSFERS_QUERY_ID=1234567 +# DUNE_LARGE_TRANSFER_THRESHOLD=5000000 # Global settings LOG_LEVEL=INFO # DEBUG, INFO, WARNING, ERROR (DEBUG skips Telegram sends) diff --git a/stables/dune_large_transfers.py b/stables/dune_large_transfers.py index bd9d24f6..b86872f2 100644 --- a/stables/dune_large_transfers.py +++ b/stables/dune_large_transfers.py @@ -11,6 +11,9 @@ - symbol - amount - amount_usd + +Rows are sorted defensively by ``block_time`` descending before deduping and +alerting. """ from __future__ import annotations @@ -26,11 +29,11 @@ from utils.config import Config from utils.logging import get_logger -LOGGER = get_logger("stables.dune_large_transfers") +logger = get_logger("stables.dune_large_transfers") PROTOCOL = "stables" CACHE_KEY_LAST_TX = "stables_dune_large_transfers_last_tx" -MAX_ROWS_IN_ALERT = 10 +MAX_ROWS_PER_PROTOCOL_ALERT = 10 DEFAULT_LARGE_TRANSFER_THRESHOLD = 5_000_000.0 # Route each token to its owning protocol channel. @@ -64,19 +67,18 @@ def _tx_link(blockchain: str, tx_hash: str) -> str: def _row_key(row: dict[str, Any]) -> str: tx_hash = _as_str(row.get("tx_hash")).lower() - block_time = _as_str(row.get("block_time")) contract = _as_str(row.get("contract_address")).lower() - return f"{tx_hash}|{block_time}|{contract}" + log_index = _as_str(row.get("log_index")) + parts = [tx_hash, contract] + if log_index: + parts.append(log_index) + return "|".join(parts) -def _route_for_row(row: dict[str, Any]) -> tuple[str, str]: +def _route_for_row(row: dict[str, Any]) -> tuple[str, str] | None: chain = _as_str(row.get("blockchain")).lower() addr = _as_str(row.get("contract_address")).lower() - if (chain, addr) in TOKEN_ROUTE: - return TOKEN_ROUTE[(chain, addr)] - - symbol = _as_str(row.get("symbol")) or "unknown" - return (symbol, PROTOCOL) + return TOKEN_ROUTE.get((chain, addr)) def _build_row_line(row: dict[str, Any]) -> str: @@ -92,7 +94,10 @@ def _build_row_line(row: dict[str, Any]) -> str: def _group_rows_by_protocol(rows: list[dict[str, Any]]) -> dict[str, list[dict[str, Any]]]: grouped: dict[str, list[dict[str, Any]]] = {} for row in rows: - _, protocol = _route_for_row(row) + route = _route_for_row(row) + if route is None: + continue + _, protocol = route grouped.setdefault(protocol, []).append(row) return grouped @@ -106,9 +111,23 @@ def _to_float(value: Any) -> float: def _is_large_transfer(row: dict[str, Any], threshold: float) -> bool: amount_usd = _to_float(row.get("amount_usd")) - amount = _to_float(row.get("amount")) - metric = amount_usd if amount_usd > 0 else amount - return metric >= threshold + return amount_usd >= threshold + + +def _sort_rows_newest_first(rows: list[dict[str, Any]]) -> list[dict[str, Any]]: + return sorted(rows, key=lambda row: _as_str(row.get("block_time")), reverse=True) + + +def _new_rows_since_last_seen(rows: list[dict[str, Any]], last_key: str) -> list[dict[str, Any]]: + if not last_key: + return rows + + new_rows: list[dict[str, Any]] = [] + for row in rows: + if _row_key(row) == last_key: + break + new_rows.append(row) + return new_rows def main() -> None: @@ -117,10 +136,10 @@ def main() -> None: threshold = Config.get_env_float("DUNE_LARGE_TRANSFER_THRESHOLD", DEFAULT_LARGE_TRANSFER_THRESHOLD) if not api_key: - LOGGER.warning("DUNE_API_KEY is not set; skipping Dune large transfer monitor") + logger.warning("DUNE_API_KEY is not set; skipping Dune large transfer monitor") return if query_id <= 0: - LOGGER.warning("DUNE_LARGE_TRANSFERS_QUERY_ID is not set; skipping Dune large transfer monitor") + logger.warning("DUNE_LARGE_TRANSFERS_QUERY_ID is not set; skipping Dune large transfer monitor") return try: @@ -128,7 +147,7 @@ def main() -> None: result = dune.run_query(QueryBase(query_id=query_id, name="stables_large_transfers"), ping_frequency=2) rows = list(result.result.rows) if result and result.result and result.result.rows else [] except Exception as exc: - LOGGER.error("Failed to fetch Dune large-transfer query result: %s", exc) + logger.error("Failed to fetch Dune large-transfer query result: %s", exc) send_alert( Alert( AlertSeverity.MEDIUM, @@ -140,34 +159,42 @@ def main() -> None: return if not rows: - LOGGER.info("No large transfers returned by Dune query_id=%s", query_id) + logger.info("No large transfers returned by Dune query_id=%s", query_id) return - alert_rows = [row for row in rows if _is_large_transfer(row, threshold)] + alert_rows = [ + row + for row in _sort_rows_newest_first(rows) + if _is_large_transfer(row, threshold) and _route_for_row(row) is not None + ] if not alert_rows: - LOGGER.info("No rows matched large-transfer threshold >= %s", threshold) + logger.info("No routed rows matched large-transfer threshold >= %s", threshold) return - newest_key = _row_key(alert_rows[0]) last_key = _as_str(get_last_value_for_key_from_file(cache_filename, CACHE_KEY_LAST_TX)) - if newest_key == last_key: - LOGGER.info("No new large-transfer rows since last run (top row unchanged)") + new_alert_rows = _new_rows_since_last_seen(alert_rows, last_key) + if not new_alert_rows: + logger.info("No new large-transfer rows since last run") return - grouped = _group_rows_by_protocol(alert_rows[:MAX_ROWS_IN_ALERT]) - total_rows = len(alert_rows) + grouped = _group_rows_by_protocol(new_alert_rows) + total_rows = len(new_alert_rows) for protocol, protocol_rows in grouped.items(): - first_symbol, _ = _route_for_row(protocol_rows[0]) - lines = [_build_row_line(row) for row in protocol_rows] + route = _route_for_row(protocol_rows[0]) + if route is None: + continue + first_symbol, _ = route + included_rows = protocol_rows[:MAX_ROWS_PER_PROTOCOL_ALERT] + lines = [_build_row_line(row) for row in included_rows] message = ( f"*Dune Large Transfer Alert ({first_symbol}/{protocol})*\n\n" f"Query ID: {query_id}\n" f"Matched rows: {total_rows}\n" - f"Included in this alert: {len(protocol_rows)}\n\n" + "\n".join(lines) + f"Included in this alert: {len(included_rows)}\n\n" + "\n".join(lines) ) send_alert(Alert(AlertSeverity.HIGH, message, protocol), plain_text=True) - write_last_value_to_file(cache_filename, CACHE_KEY_LAST_TX, newest_key) + write_last_value_to_file(cache_filename, CACHE_KEY_LAST_TX, _row_key(alert_rows[0])) if __name__ == "__main__": diff --git a/tests/test_dune_large_transfers.py b/tests/test_dune_large_transfers.py new file mode 100644 index 00000000..818365cd --- /dev/null +++ b/tests/test_dune_large_transfers.py @@ -0,0 +1,63 @@ +"""Tests for the Dune-backed large transfer monitor.""" + +from stables import dune_large_transfers as monitor + + +def _row(**overrides): + row = { + "block_time": "2026-05-12 14:30:00.000 UTC", + "blockchain": "ethereum", + "tx_hash": "0xABC", + "contract_address": "0xCCCC62962d17b8914c62D74FfB843d73B2a3cCCc", + "symbol": "cUSD", + "amount": "5000000", + "amount_usd": "5000000", + } + row.update(overrides) + return row + + +def test_row_key_excludes_colon_timestamp_from_cache_value(): + key = monitor._row_key(_row(block_time="2026-05-12 14:30:00.000 UTC")) + + assert ":" not in key + assert key == "0xabc|0xcccc62962d17b8914c62d74ffb843d73b2a3cccc" + + +def test_row_key_uses_log_index_when_present(): + key = monitor._row_key(_row(log_index=17)) + + assert key.endswith("|17") + + +def test_route_for_row_returns_protocol_for_known_token(): + assert monitor._route_for_row(_row()) == ("cUSD", "cap") + + +def test_route_for_row_skips_unknown_token_instead_of_stables_fallback(): + row = _row(contract_address="0x0000000000000000000000000000000000000000") + + assert monitor._route_for_row(row) is None + + +def test_is_large_transfer_requires_positive_usd_amount(): + row = _row(amount="5000000", amount_usd="0") + + assert monitor._is_large_transfer(row, 5_000_000) is False + + +def test_new_rows_since_last_seen_only_returns_new_prefix(): + newest = _row(tx_hash="0xnew", block_time="2026-05-12 15:00:00.000 UTC") + previous = _row(tx_hash="0xold", block_time="2026-05-12 14:00:00.000 UTC") + older = _row(tx_hash="0xolder", block_time="2026-05-12 13:00:00.000 UTC") + + rows = [newest, previous, older] + + assert monitor._new_rows_since_last_seen(rows, monitor._row_key(previous)) == [newest] + + +def test_sort_rows_newest_first_defends_dedup_order(): + newest = _row(tx_hash="0xnew", block_time="2026-05-12 15:00:00.000 UTC") + older = _row(tx_hash="0xold", block_time="2026-05-12 14:00:00.000 UTC") + + assert monitor._sort_rows_newest_first([older, newest]) == [newest, older] From cac44ac6d0c562193e90f9264ac991f7350af7f8 Mon Sep 17 00:00:00 2001 From: ctmotox2 <118690360+ctmotox2@users.noreply.github.com> Date: Thu, 21 May 2026 17:15:59 +0200 Subject: [PATCH 3/3] fix dune transfer alert follow-ups --- stables/dune_large_transfers.py | 17 +++++++++++++---- tests/test_dune_large_transfers.py | 21 ++++++++++++++++++++- 2 files changed, 33 insertions(+), 5 deletions(-) diff --git a/stables/dune_large_transfers.py b/stables/dune_large_transfers.py index b86872f2..402df6d1 100644 --- a/stables/dune_large_transfers.py +++ b/stables/dune_large_transfers.py @@ -40,7 +40,6 @@ TOKEN_ROUTE: dict[tuple[str, str], tuple[str, str]] = { ("ethereum", "0xcccc62962d17b8914c62d74ffb843d73b2a3cccc"): ("cUSD", "cap"), ("ethereum", "0x48f9e38f3070ad8945dfeae3fa70987722e3d89c"): ("iUSD", "infinifi"), - ("arbitrum", "0x0a1a1a107e45b7ced86833863f482bc5f4ed82ef"): ("USDai", "usdai"), } CHAIN_TX_EXPLORER: dict[str, str] = { @@ -91,6 +90,17 @@ def _build_row_line(row: dict[str, Any]) -> str: return f"- {symbol} on {chain}: amount={amount}, amount_usd={amount_usd}, tx={link}" +def _build_protocol_lines(protocol_rows: list[dict[str, Any]], query_id: int) -> list[str]: + included_rows = protocol_rows[:MAX_ROWS_PER_PROTOCOL_ALERT] + lines = [_build_row_line(row) for row in included_rows] + + omitted_count = len(protocol_rows) - len(included_rows) + if omitted_count > 0: + lines.append(f"- +{omitted_count} more not shown -- see Dune query {query_id} directly") + + return lines + + def _group_rows_by_protocol(rows: list[dict[str, Any]]) -> dict[str, list[dict[str, Any]]]: grouped: dict[str, list[dict[str, Any]]] = {} for row in rows: @@ -184,13 +194,12 @@ def main() -> None: if route is None: continue first_symbol, _ = route - included_rows = protocol_rows[:MAX_ROWS_PER_PROTOCOL_ALERT] - lines = [_build_row_line(row) for row in included_rows] + lines = _build_protocol_lines(protocol_rows, query_id) message = ( f"*Dune Large Transfer Alert ({first_symbol}/{protocol})*\n\n" f"Query ID: {query_id}\n" f"Matched rows: {total_rows}\n" - f"Included in this alert: {len(included_rows)}\n\n" + "\n".join(lines) + f"Included in this alert: {min(len(protocol_rows), MAX_ROWS_PER_PROTOCOL_ALERT)}\n\n" + "\n".join(lines) ) send_alert(Alert(AlertSeverity.HIGH, message, protocol), plain_text=True) diff --git a/tests/test_dune_large_transfers.py b/tests/test_dune_large_transfers.py index 818365cd..6683d015 100644 --- a/tests/test_dune_large_transfers.py +++ b/tests/test_dune_large_transfers.py @@ -18,7 +18,7 @@ def _row(**overrides): def test_row_key_excludes_colon_timestamp_from_cache_value(): - key = monitor._row_key(_row(block_time="2026-05-12 14:30:00.000 UTC")) + key = monitor._row_key(_row(block_time="2026-05-12 14:30:00.000 UTC", log_index=None)) assert ":" not in key assert key == "0xabc|0xcccc62962d17b8914c62d74ffb843d73b2a3cccc" @@ -40,6 +40,16 @@ def test_route_for_row_skips_unknown_token_instead_of_stables_fallback(): assert monitor._route_for_row(row) is None +def test_route_for_row_skips_usdai(): + row = _row( + blockchain="arbitrum", + contract_address="0x0a1a1a107e45b7ced86833863f482bc5f4ed82ef", + symbol="USDai", + ) + + assert monitor._route_for_row(row) is None + + def test_is_large_transfer_requires_positive_usd_amount(): row = _row(amount="5000000", amount_usd="0") @@ -61,3 +71,12 @@ def test_sort_rows_newest_first_defends_dedup_order(): older = _row(tx_hash="0xold", block_time="2026-05-12 14:00:00.000 UTC") assert monitor._sort_rows_newest_first([older, newest]) == [newest, older] + + +def test_build_protocol_lines_appends_truncation_notice(): + rows = [_row(tx_hash=f"0x{i}") for i in range(monitor.MAX_ROWS_PER_PROTOCOL_ALERT + 2)] + + lines = monitor._build_protocol_lines(rows, query_id=1234567) + + assert len(lines) == monitor.MAX_ROWS_PER_PROTOCOL_ALERT + 1 + assert lines[-1] == "- +2 more not shown -- see Dune query 1234567 directly"