From aa5cc13fc213449e7e749df616bc4b286c5751e6 Mon Sep 17 00:00:00 2001 From: ingpedrofernandez Date: Fri, 27 Mar 2026 03:14:00 +0100 Subject: [PATCH] Add market data planning docs Documents the market data interface design, GBM simulator approach, and Massive API reference for the FinAlly trading workstation. Co-Authored-By: Claude Sonnet 4.6 --- planning/MARKET_INTERFACE.md | 443 ++++++++++++++++++++++++++++++++++ planning/MARKET_SIMULATOR.md | 453 +++++++++++++++++++++++++++++++++++ planning/MASSIVE_API.md | 402 +++++++++++++++++++++++++++++++ 3 files changed, 1298 insertions(+) create mode 100644 planning/MARKET_INTERFACE.md create mode 100644 planning/MARKET_SIMULATOR.md create mode 100644 planning/MASSIVE_API.md diff --git a/planning/MARKET_INTERFACE.md b/planning/MARKET_INTERFACE.md new file mode 100644 index 00000000..13bbb7eb --- /dev/null +++ b/planning/MARKET_INTERFACE.md @@ -0,0 +1,443 @@ +# Market Data Interface Design + +Unified Python interface for market data in FinAlly. Two concrete implementations — `SimulatorDataSource` and `MassiveDataSource` — sit behind one abstract base class. All downstream code (SSE streaming, price cache, portfolio valuation, trade execution) is source-agnostic and works identically regardless of which implementation is active. 
+ +## Design Principles + +- **Single abstraction**: one abstract base class, two implementations, one `PriceCache` +- **Push not pull**: data sources write to the cache on their own schedule; consumers read from it +- **Async-first**: implementations are async; the Massive client (synchronous SDK) is wrapped via `asyncio.to_thread` +- **Hot-swappable tickers**: add/remove tickers at runtime without restarting the source +- **Fail-safe**: polling errors are logged and skipped; the loop never crashes + +--- + +## Core Data Model + +```python +# backend/app/market/models.py +from dataclasses import dataclass + +@dataclass +class PriceUpdate: + """A single price update for one ticker.""" + ticker: str + price: float + previous_price: float + timestamp: float # Unix seconds (float) + change: float # price - previous_price + direction: str # "up", "down", or "flat" +``` + +This is the **only** data structure that leaves the market data layer. Everything downstream works with `PriceUpdate` objects. + +--- + +## Abstract Interface + +```python +# backend/app/market/interface.py +from abc import ABC, abstractmethod + +class MarketDataSource(ABC): + """Abstract interface for market data providers.""" + + @abstractmethod + async def start(self, tickers: list[str]) -> None: + """Begin producing price updates. Seeds the cache with initial prices.""" + + @abstractmethod + async def stop(self) -> None: + """Stop producing price updates and release resources.""" + + @abstractmethod + async def add_ticker(self, ticker: str) -> None: + """Add a ticker to the active set. Takes effect on the next poll/step.""" + + @abstractmethod + async def remove_ticker(self, ticker: str) -> None: + """Remove a ticker from the active set and evict it from the cache.""" + + @abstractmethod + def get_tickers(self) -> list[str]: + """Return the current list of active tickers.""" +``` + +Both implementations write to a shared `PriceCache`. 
The interface does **not** return prices directly — it pushes updates into the cache on its own schedule (500ms for the simulator, configurable poll interval for Massive). + +--- + +## Price Cache + +Thread-safe, in-memory store that data sources write to and all consumers read from. + +```python +# backend/app/market/interface.py (continued) +import time +from threading import Lock + +class PriceCache: + """Thread-safe in-memory cache of the latest price per ticker.""" + + def __init__(self): + self._prices: dict[str, PriceUpdate] = {} + self._lock = Lock() + + def update(self, ticker: str, price: float, timestamp: float | None = None) -> PriceUpdate: + """Update price for a ticker. Computes direction from previous price. Returns the PriceUpdate.""" + with self._lock: + ts = timestamp or time.time() + previous = self._prices.get(ticker) + previous_price = previous.price if previous else price + + if price > previous_price: + direction = "up" + elif price < previous_price: + direction = "down" + else: + direction = "flat" + + update = PriceUpdate( + ticker=ticker, + price=price, + previous_price=previous_price, + timestamp=ts, + change=round(price - previous_price, 4), + direction=direction, + ) + self._prices[ticker] = update + return update + + def get(self, ticker: str) -> PriceUpdate | None: + """Get the latest price for one ticker. Returns None if not yet available.""" + with self._lock: + return self._prices.get(ticker) + + def get_all(self) -> dict[str, PriceUpdate]: + """Return a snapshot of all current prices.""" + with self._lock: + return dict(self._prices) + + def remove(self, ticker: str) -> None: + """Evict a ticker from the cache (called on watchlist removal).""" + with self._lock: + self._prices.pop(ticker, None) +``` + +--- + +## Factory Function + +Selects the right implementation at startup based on the environment variable. 
+ +```python +# backend/app/market/factory.py +import os + +def create_market_data_source(price_cache: PriceCache) -> MarketDataSource: + """Return a SimulatorDataSource or MassiveDataSource based on MASSIVE_API_KEY.""" + api_key = os.environ.get("MASSIVE_API_KEY", "").strip() + + if api_key: + from .massive_client import MassiveDataSource + return MassiveDataSource(api_key=api_key, price_cache=price_cache) + else: + from .simulator import SimulatorDataSource + return SimulatorDataSource(price_cache=price_cache) +``` + +--- + +## MassiveDataSource Implementation + +Polls the Massive snapshot endpoint on a timer and writes results to the cache. The Massive SDK is synchronous, so it is called via `asyncio.to_thread` to avoid blocking the event loop. + +```python +# backend/app/market/massive_client.py +import asyncio +import logging +from massive import RESTClient +from massive.rest.models import SnapshotMarketType +from .interface import MarketDataSource, PriceCache + +logger = logging.getLogger(__name__) + +class MassiveDataSource(MarketDataSource): + """Polls the Massive REST API and writes prices to PriceCache.""" + + def __init__( + self, + api_key: str, + price_cache: PriceCache, + poll_interval: float = 15.0, # seconds; 15s = 4 req/min (safe for free tier) + ): + self._client = RESTClient(api_key=api_key) + self._cache = price_cache + self._interval = poll_interval + self._tickers: list[str] = [] + self._task: asyncio.Task | None = None + + async def start(self, tickers: list[str]) -> None: + self._tickers = list(tickers) + # Poll immediately so the cache is populated before the first SSE client connects + await self._poll_once() + self._task = asyncio.create_task(self._poll_loop()) + + async def stop(self) -> None: + if self._task: + self._task.cancel() + try: + await self._task + except asyncio.CancelledError: + pass + + async def add_ticker(self, ticker: str) -> None: + if ticker not in self._tickers: + self._tickers.append(ticker) + + async def 
remove_ticker(self, ticker: str) -> None: + self._tickers = [t for t in self._tickers if t != ticker] + self._cache.remove(ticker) + + def get_tickers(self) -> list[str]: + return list(self._tickers) + + async def _poll_loop(self) -> None: + while True: + await asyncio.sleep(self._interval) + await self._poll_once() + + async def _poll_once(self) -> None: + if not self._tickers: + return + try: + snapshots = await asyncio.to_thread( + self._client.get_snapshot_all, + market_type=SnapshotMarketType.STOCKS, + tickers=list(self._tickers), + ) + for snap in snapshots: + if snap.last_trade and snap.last_trade.price: + self._cache.update( + ticker=snap.ticker, + price=snap.last_trade.price, + timestamp=snap.last_trade.timestamp / 1000, # ms → seconds + ) + except Exception as e: + logger.warning("Massive poll failed: %s", e) + # Continue — polling loop must survive transient errors +``` + +--- + +## SimulatorDataSource Implementation + +Wraps the `GBMSimulator` (see `MARKET_SIMULATOR.md`) in an async loop that ticks every 500ms. 
+ +```python +# backend/app/market/simulator.py (excerpt — full implementation in MARKET_SIMULATOR.md) +import asyncio +from .interface import MarketDataSource, PriceCache +from .gbm import GBMSimulator # See MARKET_SIMULATOR.md + +class SimulatorDataSource(MarketDataSource): + """Generates synthetic GBM price paths and writes them to PriceCache.""" + + def __init__(self, price_cache: PriceCache, update_interval: float = 0.5): + self._cache = price_cache + self._interval = update_interval + self._tickers: list[str] = [] + self._task: asyncio.Task | None = None + self._sim: GBMSimulator | None = None + + async def start(self, tickers: list[str]) -> None: + self._tickers = list(tickers) + self._sim = GBMSimulator(tickers=self._tickers) + # Seed cache with initial prices before first SSE client connects + for ticker, price in self._sim.current_prices().items(): + self._cache.update(ticker=ticker, price=price) + self._task = asyncio.create_task(self._run_loop()) + + async def stop(self) -> None: + if self._task: + self._task.cancel() + try: + await self._task + except asyncio.CancelledError: + pass + + async def add_ticker(self, ticker: str) -> None: + if ticker not in self._tickers: + self._tickers.append(ticker) + if self._sim: + self._sim.add_ticker(ticker) + # Immediately seed the cache so the new ticker appears in SSE + price = self._sim.get_price(ticker) + if price: + self._cache.update(ticker=ticker, price=price) + + async def remove_ticker(self, ticker: str) -> None: + self._tickers = [t for t in self._tickers if t != ticker] + if self._sim: + self._sim.remove_ticker(ticker) + self._cache.remove(ticker) + + def get_tickers(self) -> list[str]: + return list(self._tickers) + + async def _run_loop(self) -> None: + while True: + prices = self._sim.step() # dict[str, float] + for ticker, price in prices.items(): + self._cache.update(ticker=ticker, price=price) + await asyncio.sleep(self._interval) +``` + +--- + +## Integration with FastAPI + +### App Startup / 
Shutdown + +```python +# backend/app/main.py +from contextlib import asynccontextmanager +from fastapi import FastAPI +from .market.interface import PriceCache +from .market.factory import create_market_data_source +from .db import get_watchlist_tickers + +price_cache = PriceCache() +market_source = None + +@asynccontextmanager +async def lifespan(app: FastAPI): + global market_source + initial_tickers = await get_watchlist_tickers(user_id="default") + market_source = create_market_data_source(price_cache) + await market_source.start(initial_tickers) + yield + await market_source.stop() + +app = FastAPI(lifespan=lifespan) +``` + +### Watchlist Endpoints + +```python +# backend/app/routes/watchlist.py +@router.post("/api/watchlist") +async def add_to_watchlist(body: AddTickerRequest): + await db_add_ticker(body.ticker) + await market_source.add_ticker(body.ticker) + return {"ticker": body.ticker} + +@router.delete("/api/watchlist/{ticker}") +async def remove_from_watchlist(ticker: str): + await db_remove_ticker(ticker) + await market_source.remove_ticker(ticker) + return {"ticker": ticker} +``` + +### Trade Execution (reads from cache) + +```python +# backend/app/routes/portfolio.py +@router.post("/api/portfolio/trade") +async def execute_trade(body: TradeRequest): + update = price_cache.get(body.ticker) + if not update: + raise HTTPException(400, "No price available for ticker") + current_price = update.price + # ... validate and execute trade at current_price +``` + +--- + +## Integration with SSE + +The SSE endpoint reads from `PriceCache` every 500ms and streams updates to all connected clients. This is entirely independent of how frequently the cache is updated (which depends on the data source — 500ms for simulator, 15s for Massive). 
+ +```python +# backend/app/routes/stream.py +import json +import asyncio +from fastapi.responses import StreamingResponse + +async def price_event_generator(price_cache: PriceCache): + """Async generator yielding SSE-formatted price updates.""" + while True: + prices = price_cache.get_all() + if prices: + payload = { + ticker: { + "ticker": p.ticker, + "price": p.price, + "previous_price": p.previous_price, + "change": p.change, + "direction": p.direction, + "timestamp": p.timestamp, + } + for ticker, p in prices.items() + } + yield f"data: {json.dumps(payload)}\n\n" + await asyncio.sleep(0.5) + +@router.get("/api/stream/prices") +async def stream_prices(): + return StreamingResponse( + price_event_generator(price_cache), + media_type="text/event-stream", + headers={ + "Cache-Control": "no-cache", + "X-Accel-Buffering": "no", # Disable nginx buffering + }, + ) +``` + +--- + +## File Structure + +``` +backend/ + app/ + market/ + __init__.py # Exports: PriceCache, create_market_data_source + models.py # PriceUpdate dataclass + interface.py # MarketDataSource ABC + PriceCache + factory.py # create_market_data_source() + massive_client.py # MassiveDataSource + simulator.py # SimulatorDataSource (wraps GBMSimulator) + gbm.py # GBMSimulator class (pure math, no async) + seed_prices.py # SEED_PRICES, TICKER_PARAMS, DEFAULT_PARAMS constants +``` + +--- + +## Application Lifecycle + +| Event | Action | +|-------|--------| +| App startup | Create `PriceCache`; call `create_market_data_source(cache)`; call `await source.start(watchlist_tickers)` | +| SSE client connects | Immediately gets current cache snapshot; then receives updates every 500ms | +| User adds ticker | Call `await source.add_ticker(ticker)`; cache seeded immediately | +| User removes ticker | Call `await source.remove_ticker(ticker)`; evicted from cache | +| Trade execution | Read `price_cache.get(ticker).price` — always current | +| Portfolio snapshot (background task) | Read `price_cache.get_all()` to 
value all positions | +| App shutdown | Call `await source.stop()` to cancel background task | + +--- + +## Behavior Differences Between Sources + +| Aspect | Simulator | Massive API | +|--------|-----------|-------------| +| Update frequency | Every 500ms | Every 15s (free tier) / 2–5s (paid) | +| Price history | Starts fresh each session | Real market history available | +| After-hours | Always "live" | May show stale prices | +| New tickers | Random seed price ($50–$300) | Real last-traded price | +| External dependency | None | Internet + API key | +| Rate limit | N/A | 5 req/min (free), unlimited (paid) | + +The SSE stream always sends at 500ms intervals regardless of source — clients don't experience the 15s polling gap because the cache holds the last-known price and it's re-emitted on every SSE tick. diff --git a/planning/MARKET_SIMULATOR.md b/planning/MARKET_SIMULATOR.md new file mode 100644 index 00000000..3eaf65df --- /dev/null +++ b/planning/MARKET_SIMULATOR.md @@ -0,0 +1,453 @@ +# Market Simulator Design + +Approach and full code structure for the `GBMSimulator` — the built-in stock price simulator used when no `MASSIVE_API_KEY` is configured. + +## Overview + +The simulator uses **Geometric Brownian Motion (GBM)** to generate realistic stock price paths. GBM is the standard model underlying Black-Scholes option pricing: prices evolve multiplicatively with random noise, can never go negative, and produce the lognormal distribution observed in real equity markets. 
+ +Key properties: +- Updates at **~500ms intervals** — prices feel alive on the dashboard +- **Correlated moves** — tech stocks tend to move together (Cholesky decomposition) +- **Random events** — occasional 2–5% shocks add drama and visual interest +- **Realistic volatility** — each ticker has tuned `sigma` reflecting real-world behavior +- **Hot-swappable tickers** — add/remove tickers at runtime with correlation matrix rebuilt automatically + +--- + +## GBM Mathematics + +At each discrete time step, a stock price evolves as: + +``` +S(t + dt) = S(t) × exp((μ − σ²/2) × dt + σ × √dt × Z) +``` + +Where: +- `S(t)` = current price +- `μ` (mu) = annualized drift (expected return), e.g. `0.05` = 5%/year +- `σ` (sigma) = annualized volatility, e.g. `0.20` = 20%/year +- `dt` = time step as a fraction of a trading year +- `Z ~ N(0, 1)` = standard normal random variable + +**Why `exp()`?** GBM is multiplicative, so prices can never hit zero or go negative — a fundamental requirement for realistic stock simulation. + +**Calculating `dt` for 500ms updates:** + +``` +dt = 0.5 seconds / (252 trading days × 6.5 hours/day × 3600 seconds/hour) + = 0.5 / 5,896,800 + ≈ 8.48e-8 +``` + +This tiny `dt` produces realistic sub-cent moves per tick that accumulate naturally over a simulated trading session. + +--- + +## Correlated Moves via Cholesky Decomposition + +Real stocks don't move independently. Tech stocks move together; banks correlate with each other; TSLA is its own animal. We model this using a correlation matrix and Cholesky decomposition. + +**Algorithm:** + +1. Define an `n × n` correlation matrix `C` for the active tickers +2. Compute the lower-triangular Cholesky factor `L` such that `C = L × Lᵀ` +3. At each step, draw `n` independent standard normals: `Z_ind ~ N(0, 1)ⁿ` +4. Produce correlated normals: `Z_corr = L × Z_ind` +5. 
Use `Z_corr[i]` as the `Z` in the GBM formula for ticker `i` + +**Default correlation groups:** + +| Group | Tickers | Intra-group ρ | +|-------|---------|--------------| +| Tech | AAPL, GOOGL, MSFT, AMZN, META, NVDA, NFLX | 0.60 | +| Finance | JPM, V | 0.50 | +| TSLA | TSLA | 0.30 (cross-group) | +| Cross-group | any tech ↔ finance | 0.30 | +| Unknown tickers | all others | 0.30 | + +The correlation matrix must be positive semi-definite for Cholesky to work. Our symmetric, diagonally-dominant construction guarantees this. + +--- + +## Random Events + +Every 500ms step, each ticker independently has a small chance (`0.001` = 0.1%) of a sudden price shock — a rapid 2–5% move in either direction. + +```python +if random.random() < event_probability: + shock = random.uniform(0.02, 0.05) * random.choice([-1, 1]) + price *= (1 + shock) +``` + +With 10 tickers at 0.1% per step, expect roughly one event somewhere in the watchlist every **~50 seconds**. This is frequent enough to keep the dashboard visually interesting without being cartoonish. 
+ +--- + +## Seed Prices + +Realistic starting prices for the default watchlist tickers (as of early 2026): + +```python +SEED_PRICES: dict[str, float] = { + "AAPL": 190.0, + "GOOGL": 175.0, + "MSFT": 420.0, + "AMZN": 185.0, + "TSLA": 250.0, + "NVDA": 800.0, + "META": 500.0, + "JPM": 195.0, + "V": 280.0, + "NFLX": 600.0, +} + +# Tickers not in SEED_PRICES start at a random price in this range +UNKNOWN_TICKER_PRICE_RANGE = (50.0, 300.0) +``` + +--- + +## Per-Ticker Parameters + +Each default ticker has tuned volatility to reflect real-world behavior: + +```python +TICKER_PARAMS: dict[str, dict] = { + "AAPL": {"sigma": 0.22, "mu": 0.05}, + "GOOGL": {"sigma": 0.25, "mu": 0.05}, + "MSFT": {"sigma": 0.20, "mu": 0.05}, + "AMZN": {"sigma": 0.28, "mu": 0.05}, + "TSLA": {"sigma": 0.50, "mu": 0.03}, # High vol, lower drift — erratic + "NVDA": {"sigma": 0.40, "mu": 0.08}, # High vol, strong upward drift + "META": {"sigma": 0.30, "mu": 0.05}, + "JPM": {"sigma": 0.18, "mu": 0.04}, # Low vol (bank) + "V": {"sigma": 0.17, "mu": 0.04}, # Low vol (payments) + "NFLX": {"sigma": 0.35, "mu": 0.05}, +} + +# Default parameters for any ticker not in the table above +DEFAULT_PARAMS: dict = {"sigma": 0.25, "mu": 0.05} +``` + +**Volatility intuition**: with `sigma=0.50` (TSLA) and correct `dt`, a simulated trading day produces roughly the right intraday range you'd see on a real TSLA chart. 
+ +--- + +## Full Implementation + +### `seed_prices.py` (constants only) + +```python +# backend/app/market/seed_prices.py + +SEED_PRICES: dict[str, float] = { + "AAPL": 190.0, + "GOOGL": 175.0, + "MSFT": 420.0, + "AMZN": 185.0, + "TSLA": 250.0, + "NVDA": 800.0, + "META": 500.0, + "JPM": 195.0, + "V": 280.0, + "NFLX": 600.0, +} + +TICKER_PARAMS: dict[str, dict] = { + "AAPL": {"sigma": 0.22, "mu": 0.05}, + "GOOGL": {"sigma": 0.25, "mu": 0.05}, + "MSFT": {"sigma": 0.20, "mu": 0.05}, + "AMZN": {"sigma": 0.28, "mu": 0.05}, + "TSLA": {"sigma": 0.50, "mu": 0.03}, + "NVDA": {"sigma": 0.40, "mu": 0.08}, + "META": {"sigma": 0.30, "mu": 0.05}, + "JPM": {"sigma": 0.18, "mu": 0.04}, + "V": {"sigma": 0.17, "mu": 0.04}, + "NFLX": {"sigma": 0.35, "mu": 0.05}, +} + +DEFAULT_PARAMS: dict = {"sigma": 0.25, "mu": 0.05} + +UNKNOWN_TICKER_PRICE_RANGE: tuple[float, float] = (50.0, 300.0) + +# Correlation groups (used by GBMSimulator._get_correlation) +TECH_TICKERS: frozenset[str] = frozenset({"AAPL", "GOOGL", "MSFT", "AMZN", "META", "NVDA", "NFLX"}) +FINANCE_TICKERS: frozenset[str] = frozenset({"JPM", "V"}) +CORRELATIONS: dict = { + "intra_tech": 0.60, + "intra_finance": 0.50, + "cross_sector": 0.30, + "tsla": 0.30, # TSLA with everything + "default": 0.30, +} +``` + +--- + +### `gbm.py` (pure math, no async) + +```python +# backend/app/market/gbm.py +import math +import random +import numpy as np +from .seed_prices import ( + SEED_PRICES, TICKER_PARAMS, DEFAULT_PARAMS, + UNKNOWN_TICKER_PRICE_RANGE, TECH_TICKERS, FINANCE_TICKERS, CORRELATIONS, +) + +class GBMSimulator: + """ + Generates correlated Geometric Brownian Motion price paths for multiple tickers. + + This class is pure math with no async or I/O — wrap it in SimulatorDataSource + (see simulator.py) for use with FastAPI. 
+ """ + + # dt = 0.5s / (252 days * 6.5 hrs * 3600 s) — one 500ms tick as a fraction of a trading year + DEFAULT_DT: float = 0.5 / (252 * 6.5 * 3600) + + def __init__( + self, + tickers: list[str], + dt: float = DEFAULT_DT, + event_probability: float = 0.001, + ): + self._dt = dt + self._event_prob = event_probability + self._prices: dict[str, float] = {} + self._params: dict[str, dict] = {} + self._tickers: list[str] = [] + self._cholesky: np.ndarray | None = None + + for ticker in tickers: + self.add_ticker(ticker) + + # ------------------------------------------------------------------ + # Public API + # ------------------------------------------------------------------ + + def add_ticker(self, ticker: str) -> None: + """Add a ticker to the simulation. Rebuilds the correlation matrix.""" + if ticker in self._prices: + return + self._tickers.append(ticker) + self._prices[ticker] = SEED_PRICES.get( + ticker, + random.uniform(*UNKNOWN_TICKER_PRICE_RANGE), + ) + self._params[ticker] = TICKER_PARAMS.get(ticker, DEFAULT_PARAMS) + self._rebuild_cholesky() + + def remove_ticker(self, ticker: str) -> None: + """Remove a ticker from the simulation. Rebuilds the correlation matrix.""" + if ticker not in self._prices: + return + self._tickers.remove(ticker) + del self._prices[ticker] + del self._params[ticker] + self._rebuild_cholesky() + + def step(self) -> dict[str, float]: + """ + Advance one time step. Returns a dict of {ticker: new_price}. + Modifies internal prices in place. 
+ """ + n = len(self._tickers) + if n == 0: + return {} + + # Generate correlated standard normal draws + z_ind = np.random.standard_normal(n) + z = self._cholesky @ z_ind if self._cholesky is not None else z_ind + + result: dict[str, float] = {} + for i, ticker in enumerate(self._tickers): + params = self._params[ticker] + mu: float = params["mu"] + sigma: float = params["sigma"] + + # GBM formula: S(t+dt) = S(t) * exp(drift + diffusion) + drift = (mu - 0.5 * sigma ** 2) * self._dt + diffusion = sigma * math.sqrt(self._dt) * float(z[i]) + self._prices[ticker] *= math.exp(drift + diffusion) + + # Random event: occasional sudden 2-5% shock + if random.random() < self._event_prob: + shock = random.uniform(0.02, 0.05) * random.choice([-1, 1]) + self._prices[ticker] *= (1.0 + shock) + + result[ticker] = round(self._prices[ticker], 2) + + return result + + def get_price(self, ticker: str) -> float | None: + """Get the current simulated price for a ticker.""" + return self._prices.get(ticker) + + def current_prices(self) -> dict[str, float]: + """Return a copy of all current simulated prices.""" + return dict(self._prices) + + # ------------------------------------------------------------------ + # Internal helpers + # ------------------------------------------------------------------ + + def _rebuild_cholesky(self) -> None: + """Recompute the Cholesky factor of the correlation matrix.""" + n = len(self._tickers) + if n <= 1: + self._cholesky = None + return + + # Build symmetric correlation matrix + corr = np.eye(n) + for i in range(n): + for j in range(i + 1, n): + rho = self._get_correlation(self._tickers[i], self._tickers[j]) + corr[i, j] = rho + corr[j, i] = rho + + try: + self._cholesky = np.linalg.cholesky(corr) + except np.linalg.LinAlgError: + # Fallback: identity (no correlation) if matrix is not positive definite + self._cholesky = None + + def _get_correlation(self, t1: str, t2: str) -> float: + """Return the pairwise correlation coefficient for two 
tickers.""" + # TSLA is a loner — low correlation with everything + if t1 == "TSLA" or t2 == "TSLA": + return CORRELATIONS["tsla"] + + t1_tech = t1 in TECH_TICKERS + t2_tech = t2 in TECH_TICKERS + t1_fin = t1 in FINANCE_TICKERS + t2_fin = t2 in FINANCE_TICKERS + + if t1_tech and t2_tech: + return CORRELATIONS["intra_tech"] + if t1_fin and t2_fin: + return CORRELATIONS["intra_finance"] + if (t1_tech and t2_fin) or (t1_fin and t2_tech): + return CORRELATIONS["cross_sector"] + + return CORRELATIONS["default"] +``` + +--- + +### `simulator.py` (async wrapper) + +```python +# backend/app/market/simulator.py +import asyncio +import logging +from .interface import MarketDataSource, PriceCache +from .gbm import GBMSimulator + +logger = logging.getLogger(__name__) + + +class SimulatorDataSource(MarketDataSource): + """ + Async MarketDataSource that drives a GBMSimulator and writes prices to PriceCache. + Ticks every `update_interval` seconds (default 500ms). + """ + + def __init__(self, price_cache: PriceCache, update_interval: float = 0.5): + self._cache = price_cache + self._interval = update_interval + self._tickers: list[str] = [] + self._task: asyncio.Task | None = None + self._sim: GBMSimulator | None = None + + async def start(self, tickers: list[str]) -> None: + self._tickers = list(tickers) + self._sim = GBMSimulator(tickers=self._tickers) + # Seed the cache immediately so SSE clients have prices on first connect + for ticker, price in self._sim.current_prices().items(): + self._cache.update(ticker=ticker, price=price) + self._task = asyncio.create_task(self._run_loop()) + logger.info("SimulatorDataSource started with %d tickers", len(self._tickers)) + + async def stop(self) -> None: + if self._task: + self._task.cancel() + try: + await self._task + except asyncio.CancelledError: + pass + logger.info("SimulatorDataSource stopped") + + async def add_ticker(self, ticker: str) -> None: + if ticker not in self._tickers: + self._tickers.append(ticker) + if self._sim: + 
self._sim.add_ticker(ticker) + price = self._sim.get_price(ticker) + if price is not None: + self._cache.update(ticker=ticker, price=price) + + async def remove_ticker(self, ticker: str) -> None: + self._tickers = [t for t in self._tickers if t != ticker] + if self._sim: + self._sim.remove_ticker(ticker) + self._cache.remove(ticker) + + def get_tickers(self) -> list[str]: + return list(self._tickers) + + async def _run_loop(self) -> None: + while True: + prices = self._sim.step() + for ticker, price in prices.items(): + self._cache.update(ticker=ticker, price=price) + await asyncio.sleep(self._interval) +``` + +--- + +## File Structure + +``` +backend/ + app/ + market/ + __init__.py # Exports for convenient imports + models.py # PriceUpdate dataclass + interface.py # MarketDataSource ABC + PriceCache + factory.py # create_market_data_source() + massive_client.py # MassiveDataSource (REST polling) + simulator.py # SimulatorDataSource (async loop) + gbm.py # GBMSimulator (pure math, no async) + seed_prices.py # SEED_PRICES, TICKER_PARAMS, constants +``` + +--- + +## Behavior Notes + +### Price floors and ceilings +Prices can never go negative because GBM uses `exp()`, which is always positive. There is no hard ceiling — with high enough volatility, a stock could theoretically drift to extreme values over a very long session. In practice, with `sigma ≤ 0.50` and a finite session, this is not a problem. + +### New tickers +Unknown tickers (not in `SEED_PRICES`) start at a random price between $50 and $300. This is intentional: it creates interesting variety when the user adds a custom ticker. + +### Cholesky rebuild cost +When a ticker is added or removed, the correlation matrix is rebuilt. This is `O(n²)` but `n` stays small (under 50 tickers in normal use), making it negligible. 
+ +### What `dt` means in practice +With `dt ≈ 8.5e-8` and `sigma=0.20` (MSFT): +- Per-tick volatility = `sigma × sqrt(dt)` = `0.20 × 0.000291` ≈ **0.0058%** per step +- After 1 simulated minute (120 steps at 0.5s): `0.0058% × sqrt(120)` ≈ **0.064%** +- This produces a calm, realistic-looking chart + +With `sigma=0.50` (TSLA), ticks are ~2.5x larger, producing the erratic behavior you'd expect. + +### Random events +The `event_probability=0.001` means each ticker fires an event on average once every 1,000 steps = every **500 seconds**. With 10 tickers, expect a visible price shock somewhere in the watchlist roughly every **50 seconds** — enough to keep the dashboard lively. + +### Correlation validity +The hardcoded correlations (`0.60`, `0.50`, `0.30`) produce a valid positive semi-definite matrix for the default 10-ticker watchlist. For unusual combinations of many unknown tickers, the `_rebuild_cholesky` method catches `LinAlgError` and falls back to uncorrelated (identity) behavior. diff --git a/planning/MASSIVE_API.md b/planning/MASSIVE_API.md new file mode 100644 index 00000000..3c06c09a --- /dev/null +++ b/planning/MASSIVE_API.md @@ -0,0 +1,402 @@ +# Massive API Reference (formerly Polygon.io) + +Reference documentation for the Massive (formerly Polygon.io) REST API as used in FinAlly. + +## Overview + +Polygon.io rebranded to **Massive** on October 30, 2025. The API base URL changed to `https://api.massive.com`. The old `https://api.polygon.io` redirects to the same infrastructure and remains functional during the transition period. All existing API keys and integrations continue to work unchanged. 
+ +- **Base URL**: `https://api.massive.com` +- **Legacy URL**: `https://api.polygon.io` (redirects, still works) +- **Python package**: `massive` — install with `pip install -U massive` or `uv add massive` +- **Min Python**: 3.9+ +- **Auth**: API key passed to `RESTClient(api_key=...)` or set as `MASSIVE_API_KEY` env var + +## Authentication + +Two methods are supported: + +**Query parameter** (simple, for raw HTTP): +``` +GET https://api.massive.com/v2/snapshot/.../tickers?apiKey=YOUR_KEY +``` + +**Bearer header** (recommended): +``` +Authorization: Bearer YOUR_KEY +``` + +The official Python SDK uses the Bearer header automatically. + +## Rate Limits + +| Tier | Limit | +|------|-------| +| Free | 5 requests / minute | +| Any paid plan | Unlimited (stay under ~100 req/s) | + +**FinAlly strategy:** +- Free tier: call snapshot endpoint every **15 seconds** (4 calls/min, leaving headroom) +- Paid tier: poll every **2–5 seconds** + +The key insight: the snapshot endpoint returns all tickers in **one API call**, so even with 10+ tickers you consume only 1 request per poll cycle. + +## Python Client Initialization + +```python +from massive import RESTClient + +# Reads MASSIVE_API_KEY from environment automatically +client = RESTClient() + +# Or pass explicitly +client = RESTClient(api_key="your_key_here") +``` + +--- + +## Endpoints Used in FinAlly + +### 1. Full Market Snapshot — Multiple Tickers (Primary Endpoint) + +Gets current prices for multiple tickers in a **single API call**. This is the main endpoint for FinAlly's polling loop. 
+ +**REST:** +``` +GET /v2/snapshot/locale/us/markets/stocks/tickers?tickers=AAPL,GOOGL,MSFT +``` + +**Python SDK:** +```python +from massive import RESTClient +from massive.rest.models import SnapshotMarketType + +client = RESTClient() + +snapshots = client.get_snapshot_all( + market_type=SnapshotMarketType.STOCKS, + tickers=["AAPL", "GOOGL", "MSFT", "AMZN", "TSLA", "NVDA", "META", "JPM", "V", "NFLX"], +) + +for snap in snapshots: + print(f"{snap.ticker}: ${snap.last_trade.price:.2f}") + print(f" Day change: {snap.day.change_percent:.2f}%") + print(f" OHLC: O={snap.day.open} H={snap.day.high} L={snap.day.low} C={snap.day.close}") + print(f" Volume: {snap.day.volume:,}") + print(f" Updated: {snap.last_trade.timestamp}") +``` + +**Raw HTTP example:** +```python +import requests + +def get_snapshots_raw(tickers: list[str], api_key: str) -> dict[str, float]: + """Returns {ticker: current_price} for all requested tickers.""" + url = "https://api.massive.com/v2/snapshot/locale/us/markets/stocks/tickers" + resp = requests.get(url, params={ + "tickers": ",".join(tickers), + "apiKey": api_key, + }) + resp.raise_for_status() + data = resp.json() + + prices = {} + for t in data.get("tickers", []): + ticker = t["ticker"] + # last trade price is most current; fall back to day close + price = (t.get("lastTrade") or {}).get("p") or (t.get("day") or {}).get("c") + if price: + prices[ticker] = price + return prices +``` + +**Response structure** (per ticker, from raw API): +```json +{ + "ticker": "AAPL", + "day": { + "o": 129.61, + "h": 130.15, + "l": 125.07, + "c": 125.07, + "v": 111237700, + "vw": 127.35 + }, + "lastTrade": { + "p": 125.07, + "s": 100, + "t": 1675190399000000000 + }, + "lastQuote": { + "P": 125.08, + "p": 125.06, + "S": 10, + "s": 8, + "t": 1675190399500000000 + }, + "prevDay": { + "o": 129.61, + "c": 129.61, + "h": 130.00, + "l": 128.50 + }, + "todaysChange": -4.54, + "todaysChangePerc": -3.50, + "updated": 1675190399000000000 +} +``` + +**Key fields:** +| Raw 
field | SDK attribute | Description | +|---|---|---| +| `lastTrade.p` | `last_trade.price` | Current/most recent traded price | +| `day.c` | `day.close` | Running day close price | +| `prevDay.c` | `prev_daily_bar.close` | Previous day's close (for computing day change %) | +| `todaysChangePerc` | `today_change_percent` | % change from previous close | +| `lastTrade.t` | `last_trade.timestamp` | Timestamp in nanoseconds (raw) / milliseconds (SDK) | + +--- + +### 2. Single Ticker Snapshot + +For fetching detailed data on one specific ticker (e.g., when user clicks for detail view). + +**REST:** +``` +GET /v2/snapshot/locale/us/markets/stocks/tickers/{ticker} +``` + +**Python SDK:** +```python +snapshot = client.get_snapshot_ticker( + market_type=SnapshotMarketType.STOCKS, + ticker="AAPL", +) + +print(f"Price: ${snapshot.last_trade.price:.2f}") +print(f"Bid/Ask: ${snapshot.last_quote.bid_price:.2f} / ${snapshot.last_quote.ask_price:.2f}") +print(f"Day range: ${snapshot.day.low:.2f} – ${snapshot.day.high:.2f}") +print(f"Prev close: ${snapshot.prev_daily_bar.close:.2f}") +``` + +--- + +### 3. Previous Day Bar (End-of-Day Close) + +Gets the previous trading day's OHLCV for a ticker. Useful for seeding the simulator or establishing baseline prices at startup. + +**REST:** +``` +GET /v2/aggs/ticker/{ticker}/prev?adjusted=true +``` + +**Python SDK:** +```python +prev = client.get_previous_close_agg(ticker="AAPL") + +for agg in prev: + print(f"Prev close: ${agg.close:.2f}") + print(f"OHLC: O={agg.open} H={agg.high} L={agg.low} C={agg.close}") + print(f"Volume: {agg.volume:,}") + print(f"Date: {agg.timestamp}") # milliseconds UTC +``` + +**Raw response:** +```json +{ + "ticker": "AAPL", + "resultsCount": 1, + "results": [ + { + "T": "AAPL", + "o": 150.0, + "h": 155.0, + "l": 149.0, + "c": 154.5, + "v": 80000000, + "vw": 152.3, + "t": 1672531200000 + } + ] +} +``` + +--- + +### 4. Aggregate Bars (OHLCV History) + +Historical OHLCV bars over a date range. 
Needed if we add a historical price chart to the detail view. + +**REST:** +``` +GET /v2/aggs/ticker/{ticker}/range/{multiplier}/{timespan}/{from}/{to} +``` + +Parameters: +- `multiplier`: integer (e.g., `1`, `5`) +- `timespan`: `minute`, `hour`, `day`, `week`, `month` +- `from` / `to`: `YYYY-MM-DD` or millisecond timestamp +- `adjusted`: `true` (default) — adjusts for splits/dividends +- `sort`: `asc` or `desc` +- `limit`: max 50,000 + +**Python SDK (daily bars):** +```python +aggs = [] +for bar in client.list_aggs( + ticker="AAPL", + multiplier=1, + timespan="day", + from_="2025-01-01", + to="2026-01-01", + adjusted=True, + sort="asc", + limit=50000, +): + aggs.append(bar) + +for bar in aggs: + print(f"Date: {bar.timestamp} O={bar.open} H={bar.high} L={bar.low} C={bar.close} V={bar.volume:,}") +``` + +**Python SDK (minute bars — intraday):** +```python +for bar in client.list_aggs( + ticker="AAPL", + multiplier=1, + timespan="minute", + from_="2026-03-26", + to="2026-03-27", + adjusted=True, + sort="asc", +): + ... # bar.timestamp is milliseconds UTC +``` + +**Raw bar structure:** +```json +{ + "o": 130.0, + "h": 132.5, + "l": 129.8, + "c": 131.2, + "v": 50000000, + "vw": 130.9, + "n": 42831, + "t": 1672531200000 +} +``` + +Fields: `o`=open, `h`=high, `l`=low, `c`=close, `v`=volume, `vw`=VWAP, `n`=number of trades, `t`=timestamp (ms UTC). + +--- + +### 5. Last Trade / Last Quote + +Individual endpoints for most-recent trade or NBBO quote. Generally not needed for FinAlly since the snapshot endpoint includes this data, but useful for spot-checks. + +```python +# Last trade +trade = client.get_last_trade(ticker="AAPL") +print(f"Last: ${trade.price:.2f} × {trade.size} shares") + +# Last NBBO quote +quote = client.get_last_quote(ticker="AAPL") +print(f"Bid: ${quote.bid:.2f} × {quote.bid_size}") +print(f"Ask: ${quote.ask:.2f} × {quote.ask_size}") +``` + +--- + +## How FinAlly Uses the API + +The Massive poller runs as an async background task: + +1. 
Read the current watchlist tickers from the database
2. Call `get_snapshot_all(tickers=watchlist)` — **one API call for all tickers**
3. Extract `last_trade.price` and `last_trade.timestamp` per ticker
4. Write into the shared `PriceCache`
5. Sleep for the poll interval, then repeat

```python
import asyncio
import logging
from massive import RESTClient
from massive.rest.models import SnapshotMarketType

logger = logging.getLogger(__name__)

async def poll_massive(api_key: str, get_tickers, price_cache, interval: float = 15.0):
    """Poll Massive API and update the shared price cache."""
    client = RESTClient(api_key=api_key)

    while True:
        tickers = get_tickers()
        if tickers:
            try:
                snapshots = await asyncio.to_thread(
                    client.get_snapshot_all,
                    market_type=SnapshotMarketType.STOCKS,
                    tickers=tickers,
                )
                for snap in snapshots:
                    if snap.last_trade and snap.last_trade.price:
                        price_cache.update(
                            ticker=snap.ticker,
                            price=snap.last_trade.price,
                            timestamp=snap.last_trade.timestamp / 1000,  # ms → seconds
                        )
            except Exception:
                # Log (with traceback) and continue — polling loop must not crash
                logger.exception("Massive poll error")

        await asyncio.sleep(interval)
```

---

## Error Handling

The SDK raises exceptions for HTTP errors:

| Status | Meaning | Action |
|--------|---------|--------|
| 401 | Invalid API key | Log and abort polling |
| 403 | Insufficient plan permissions | Log and fall back to simulator |
| 429 | Rate limit exceeded | Back off exponentially; log warning |
| 5xx | Server error | SDK retries 3x automatically; then log and continue |

Handle 403 specifically: if the user's plan doesn't support the snapshot endpoint, fall back to the simulator rather than crashing. 
+ +--- + +## Notes on Timestamps + +- `lastTrade.t` in raw JSON is **nanoseconds** UTC +- SDK attribute `last_trade.timestamp` is **milliseconds** UTC +- Convert to Unix seconds for the `PriceCache`: `timestamp_ms / 1000` + +## Market Hours Notes + +- Snapshot data resets daily at ~3:30 AM EST +- Pre-market data starts populating from ~4:00 AM EST +- Outside trading hours, `last_trade.price` reflects the last traded price (may include after-hours trades) +- `day` object values reflect the current session; during pre-market they may be from the previous session + +## WebSocket Streaming (Paid Tiers Only) + +Not used in FinAlly (REST polling is sufficient and simpler), but available for future enhancement: + +```python +from massive import WebSocketClient + +ws = WebSocketClient( + api_key="YOUR_KEY", + subscriptions=["T.AAPL", "T.MSFT", "AM.*"], # trades or minute aggregates +) + +def handle_msg(msgs): + for m in msgs: + print(m) # Trade or aggregate update + +ws.run(handle_msg=handle_msg) +``` + +Subscription prefixes: `T.` = trades, `Q.` = quotes, `AM.` = per-minute aggregates, `A.` = per-second aggregates. Use `T.*` or `AM.*` for all tickers.