diff --git a/planning/MARKET_DATA_DESIGN.md b/planning/MARKET_DATA_DESIGN.md
new file mode 100644
index 00000000..11b77f4f
--- /dev/null
+++ b/planning/MARKET_DATA_DESIGN.md
@@ -0,0 +1,1174 @@
# Market Data Backend — Detailed Design

Implementation-ready design for the FinAlly market data subsystem. Covers the unified interface, in-memory price cache, GBM simulator, Massive API client, SSE streaming endpoint, and FastAPI lifecycle integration.

All code lives under `backend/app/market/`.

---

## Table of Contents

1. [Architecture Overview](#1-architecture-overview)
2. [File Structure](#2-file-structure)
3. [Data Model — `models.py`](#3-data-model)
4. [Price Cache — `cache.py`](#4-price-cache)
5. [Abstract Interface — `interface.py`](#5-abstract-interface)
6. [Seed Prices & Ticker Parameters — `seed_prices.py`](#6-seed-prices--ticker-parameters)
7. [GBM Simulator — `simulator.py`](#7-gbm-simulator)
8. [Massive API Client — `massive_client.py`](#8-massive-api-client)
9. [Factory — `factory.py`](#9-factory)
10. [SSE Streaming Endpoint — `stream.py`](#10-sse-streaming-endpoint)
11. [FastAPI Lifecycle Integration](#11-fastapi-lifecycle-integration)
12. [Watchlist Coordination](#12-watchlist-coordination)
13. [Error Handling & Edge Cases](#13-error-handling--edge-cases)
14. [Testing Strategy](#14-testing-strategy)
15. [Configuration Summary](#15-configuration-summary)

---

## 1. Architecture Overview

```
MarketDataSource (ABC)
├── SimulatorDataSource → GBM simulator (default, no API key needed)
└── MassiveDataSource → Polygon.io REST poller (when MASSIVE_API_KEY set)
        │
        ▼
   PriceCache (thread-safe, in-memory)
        │
        ├──→ SSE stream endpoint (/api/stream/prices)
        ├──→ Portfolio valuation
        └──→ Trade execution
```

**Strategy pattern**: Both data sources implement the same ABC. Downstream code is source-agnostic — it reads from the `PriceCache`, never from the data source directly.

**Push model**: Data sources write to the cache on their own schedule (500ms for simulator, 15s for Massive). The SSE endpoint polls the cache independently at 500ms. This decouples timing completely.

---

## 2. File Structure

```
backend/app/market/
├── __init__.py          # Re-exports public API
├── models.py            # PriceUpdate frozen dataclass
├── cache.py             # PriceCache (thread-safe in-memory store)
├── interface.py         # MarketDataSource ABC
├── seed_prices.py       # SEED_PRICES, TICKER_PARAMS, CORRELATION_GROUPS
├── simulator.py         # GBMSimulator + SimulatorDataSource
├── massive_client.py    # MassiveDataSource (Polygon.io poller)
├── factory.py           # create_market_data_source()
└── stream.py            # SSE endpoint (FastAPI router factory)
```

Each file has a single responsibility. The `__init__.py` re-exports the public API so the rest of the backend imports from `app.market`:

```python
"""Market data subsystem for FinAlly.

Public API:
    PriceUpdate - Immutable price snapshot dataclass
    PriceCache - Thread-safe in-memory price store
    MarketDataSource - Abstract interface for data providers
    create_market_data_source - Factory that selects simulator or Massive
    create_stream_router - FastAPI router factory for SSE endpoint
"""

from .cache import PriceCache
from .factory import create_market_data_source
from .interface import MarketDataSource
from .models import PriceUpdate
from .stream import create_stream_router

__all__ = [
    "PriceUpdate",
    "PriceCache",
    "MarketDataSource",
    "create_market_data_source",
    "create_stream_router",
]
```

---

## 3. 
Data Model + +**File: `backend/app/market/models.py`** + +`PriceUpdate` is the only data structure that leaves the market data layer. Every downstream consumer — SSE streaming, portfolio valuation, trade execution — works exclusively with this type. + +```python +from __future__ import annotations + +import time +from dataclasses import dataclass, field + + +@dataclass(frozen=True, slots=True) +class PriceUpdate: + """Immutable snapshot of a single ticker's price at a point in time.""" + + ticker: str + price: float + previous_price: float + timestamp: float = field(default_factory=time.time) # Unix seconds + + @property + def change(self) -> float: + """Absolute price change from previous update.""" + return round(self.price - self.previous_price, 4) + + @property + def change_percent(self) -> float: + """Percentage change from previous update.""" + if self.previous_price == 0: + return 0.0 + return round((self.price - self.previous_price) / self.previous_price * 100, 4) + + @property + def direction(self) -> str: + """'up', 'down', or 'flat'.""" + if self.price > self.previous_price: + return "up" + elif self.price < self.previous_price: + return "down" + return "flat" + + def to_dict(self) -> dict: + """Serialize for JSON / SSE transmission.""" + return { + "ticker": self.ticker, + "price": self.price, + "previous_price": self.previous_price, + "timestamp": self.timestamp, + "change": self.change, + "change_percent": self.change_percent, + "direction": self.direction, + } +``` + +### Design decisions + +- **`frozen=True`**: Price updates are immutable value objects. Once created they never change, safe to share across async tasks without copying. +- **`slots=True`**: Minor memory optimization — many are created per second. +- **Computed properties** (`change`, `direction`, `change_percent`): Derived from `price` and `previous_price` so they can never be inconsistent. +- **`to_dict()`**: Single serialization point used by both the SSE endpoint and REST API responses. + +--- + +## 4. Price Cache + +**File: `backend/app/market/cache.py`** + +The price cache is the central data hub. Data sources write to it; SSE streaming and portfolio valuation read from it. It must be thread-safe because the Massive client runs synchronous API calls in `asyncio.to_thread()`. + +```python +from __future__ import annotations + +import time +from threading import Lock + +from .models import PriceUpdate + + +class PriceCache: + """Thread-safe in-memory cache of the latest price for each ticker.""" + + def __init__(self) -> None: + self._prices: dict[str, PriceUpdate] = {} + self._lock = Lock() + self._version: int = 0 # Monotonically increasing; bumped on every update + + def update(self, ticker: str, price: float, timestamp: float | None = None) -> PriceUpdate: + """Record a new price. Returns the created PriceUpdate. + + First update for a ticker sets previous_price == price (direction='flat'). 
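
        Illustrative example (doctest-style, assuming a fresh cache):

            >>> cache = PriceCache()
            >>> cache.update("AAPL", 190.00).direction
            'flat'
            >>> cache.update("AAPL", 191.00).change
            1.0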
+ """ + with self._lock: + ts = timestamp or time.time() + prev = self._prices.get(ticker) + previous_price = prev.price if prev else price + + update = PriceUpdate( + ticker=ticker, + price=round(price, 2), + previous_price=round(previous_price, 2), + timestamp=ts, + ) + self._prices[ticker] = update + self._version += 1 + return update + + def get(self, ticker: str) -> PriceUpdate | None: + with self._lock: + return self._prices.get(ticker) + + def get_all(self) -> dict[str, PriceUpdate]: + with self._lock: + return dict(self._prices) + + def get_price(self, ticker: str) -> float | None: + update = self.get(ticker) + return update.price if update else None + + def remove(self, ticker: str) -> None: + with self._lock: + self._prices.pop(ticker, None) + + @property + def version(self) -> int: + return self._version + + def __len__(self) -> int: + with self._lock: + return len(self._prices) + + def __contains__(self, ticker: str) -> bool: + with self._lock: + return ticker in self._prices +``` + +### Why a version counter? + +The SSE streaming loop polls the cache every ~500ms. Without a version counter, it would serialize and send all prices every tick even if nothing changed (e.g., Massive API only updates every 15s). The version counter lets the SSE loop skip sends when nothing is new: + +```python +last_version = -1 +while True: + if price_cache.version != last_version: + last_version = price_cache.version + yield format_sse(price_cache.get_all()) + await asyncio.sleep(0.5) +``` + +### Why `threading.Lock` instead of `asyncio.Lock`? + +- The Massive client's synchronous `get_snapshot_all()` runs in `asyncio.to_thread()` — a real OS thread. `asyncio.Lock` would not protect against that. +- `threading.Lock` works correctly from both sync threads and the async event loop. +- Under normal load (10 tickers, 2 updates/sec), lock contention is negligible. + +--- + +## 5. Abstract Interface + +**File: `backend/app/market/interface.py`** + +```python +from __future__ import annotations + +from abc import ABC, abstractmethod + + +class MarketDataSource(ABC): + """Contract for market data providers. + + Implementations push price updates into a shared PriceCache on their own + schedule. Downstream code never calls the data source directly for prices — + it reads from the cache. + + Lifecycle: + source = create_market_data_source(cache) + await source.start(["AAPL", "GOOGL", ...]) + # ... app runs ... + await source.add_ticker("TSLA") + await source.remove_ticker("GOOGL") + # ... app shutting down ... + await source.stop() + """ + + @abstractmethod + async def start(self, tickers: list[str]) -> None: + """Begin producing price updates for the given tickers. + + Starts a background task that periodically writes to the PriceCache. + Must be called exactly once. + """ + + @abstractmethod + async def stop(self) -> None: + """Stop the background task and release resources. + + Safe to call multiple times. + """ + + @abstractmethod + async def add_ticker(self, ticker: str) -> None: + """Add a ticker to the active set. No-op if already present.""" + + @abstractmethod + async def remove_ticker(self, ticker: str) -> None: + """Remove a ticker from the active set. Also removes from PriceCache.""" + + @abstractmethod + def get_tickers(self) -> list[str]: + """Return the current list of actively tracked tickers.""" +``` + +### Why the source writes to the cache instead of returning prices + +This push model decouples timing. 
The simulator ticks at 500ms, Massive polls at 15s, but SSE always reads from the cache at its own 500ms cadence. No need for the SSE layer to know which data source is active or what its update interval is. + +--- + +## 6. Seed Prices & Ticker Parameters + +**File: `backend/app/market/seed_prices.py`** + +Constants only — no logic, no imports. Shared by the simulator (for initial prices and GBM parameters) and potentially by the Massive client (as fallback prices if the API hasn't responded yet). + +```python +"""Seed prices and per-ticker parameters for the market simulator.""" + +# Realistic starting prices for the default watchlist +SEED_PRICES: dict[str, float] = { + "AAPL": 190.00, + "GOOGL": 175.00, + "MSFT": 420.00, + "AMZN": 185.00, + "TSLA": 250.00, + "NVDA": 800.00, + "META": 500.00, + "JPM": 195.00, + "V": 280.00, + "NFLX": 600.00, +} + +# Per-ticker GBM parameters +# sigma: annualized volatility (higher = more price movement) +# mu: annualized drift / expected return +TICKER_PARAMS: dict[str, dict[str, float]] = { + "AAPL": {"sigma": 0.22, "mu": 0.05}, + "GOOGL": {"sigma": 0.25, "mu": 0.05}, + "MSFT": {"sigma": 0.20, "mu": 0.05}, + "AMZN": {"sigma": 0.28, "mu": 0.05}, + "TSLA": {"sigma": 0.50, "mu": 0.03}, # High volatility + "NVDA": {"sigma": 0.40, "mu": 0.08}, # High volatility, strong drift + "META": {"sigma": 0.30, "mu": 0.05}, + "JPM": {"sigma": 0.18, "mu": 0.04}, # Low volatility (bank) + "V": {"sigma": 0.17, "mu": 0.04}, # Low volatility (payments) + "NFLX": {"sigma": 0.35, "mu": 0.05}, +} + +# Default parameters for tickers not in the list above (dynamically added) +DEFAULT_PARAMS: dict[str, float] = {"sigma": 0.25, "mu": 0.05} + +# Correlation groups for the simulator's Cholesky decomposition +CORRELATION_GROUPS: dict[str, set[str]] = { + "tech": {"AAPL", "GOOGL", "MSFT", "AMZN", "META", "NVDA", "NFLX"}, + "finance": {"JPM", "V"}, +} + +# Correlation coefficients +INTRA_TECH_CORR = 0.6 # Tech stocks move together +INTRA_FINANCE_CORR = 0.5 # Finance stocks move together +CROSS_GROUP_CORR = 0.3 # Between sectors / unknown tickers +TSLA_CORR = 0.3 # TSLA does its own thing +``` + +Tickers added dynamically that aren't in `SEED_PRICES` start at a random price between $50-$300 and use `DEFAULT_PARAMS`. + +--- + +## 7. GBM Simulator + +**File: `backend/app/market/simulator.py`** + +Contains two classes: +- **`GBMSimulator`**: Pure math engine. Stateful — holds current prices and advances them one step at a time. +- **`SimulatorDataSource`**: The `MarketDataSource` implementation that wraps `GBMSimulator` in an async loop and writes to the `PriceCache`. 
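
A rough sketch of how the two classes cooperate (both are shown in full below; assumes an async context for the awaits):

```python
cache = PriceCache()
source = SimulatorDataSource(price_cache=cache)

await source.start(["AAPL", "TSLA"])  # builds a GBMSimulator, seeds the cache, starts the 500ms loop
print(cache.get_price("AAPL"))        # already available — start() seeds before the first tick
await source.stop()                   # cancels the background task
```
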
+ +### 7.1 GBMSimulator — The Math Engine + +**GBM Formula:** +``` +S(t+dt) = S(t) * exp((mu - sigma^2/2) * dt + sigma * sqrt(dt) * Z) +``` + +Where: +- `S(t)` = current price +- `mu` = annualized drift (expected return) +- `sigma` = annualized volatility +- `dt` = time step as fraction of a trading year (~8.48e-8 for 500ms ticks) +- `Z` = correlated standard normal random variable (via Cholesky decomposition) + +```python +from __future__ import annotations + +import asyncio +import logging +import math +import random + +import numpy as np + +from .cache import PriceCache +from .interface import MarketDataSource +from .seed_prices import ( + CORRELATION_GROUPS, CROSS_GROUP_CORR, DEFAULT_PARAMS, + INTRA_FINANCE_CORR, INTRA_TECH_CORR, SEED_PRICES, TICKER_PARAMS, TSLA_CORR, +) + +logger = logging.getLogger(__name__) + + +class GBMSimulator: + """Geometric Brownian Motion simulator for correlated stock prices.""" + + # 252 trading days * 6.5 hours/day * 3600 seconds/hour = 5,896,800 seconds + TRADING_SECONDS_PER_YEAR = 252 * 6.5 * 3600 # 5,896,800 + DEFAULT_DT = 0.5 / TRADING_SECONDS_PER_YEAR # ~8.48e-8 + + def __init__( + self, + tickers: list[str], + dt: float = DEFAULT_DT, + event_probability: float = 0.001, + ) -> None: + self._dt = dt + self._event_prob = event_probability + self._tickers: list[str] = [] + self._prices: dict[str, float] = {} + self._params: dict[str, dict[str, float]] = {} + self._cholesky: np.ndarray | None = None + + for ticker in tickers: + self._add_ticker_internal(ticker) + self._rebuild_cholesky() + + def step(self) -> dict[str, float]: + """Advance all tickers by one time step. Returns {ticker: new_price}.""" + n = len(self._tickers) + if n == 0: + return {} + + z_independent = np.random.standard_normal(n) + if self._cholesky is not None: + z_correlated = self._cholesky @ z_independent + else: + z_correlated = z_independent + + result: dict[str, float] = {} + for i, ticker in enumerate(self._tickers): + params = self._params[ticker] + mu = params["mu"] + sigma = params["sigma"] + + drift = (mu - 0.5 * sigma**2) * self._dt + diffusion = sigma * math.sqrt(self._dt) * z_correlated[i] + self._prices[ticker] *= math.exp(drift + diffusion) + + # Random event: ~0.1% chance per tick per ticker + if random.random() < self._event_prob: + shock_magnitude = random.uniform(0.02, 0.05) + shock_sign = random.choice([-1, 1]) + self._prices[ticker] *= 1 + shock_magnitude * shock_sign + + result[ticker] = round(self._prices[ticker], 2) + return result + + def add_ticker(self, ticker: str) -> None: + if ticker in self._prices: + return + self._add_ticker_internal(ticker) + self._rebuild_cholesky() + + def remove_ticker(self, ticker: str) -> None: + if ticker not in self._prices: + return + self._tickers.remove(ticker) + del self._prices[ticker] + del self._params[ticker] + self._rebuild_cholesky() + + def get_price(self, ticker: str) -> float | None: + return self._prices.get(ticker) + + def get_tickers(self) -> list[str]: + return list(self._tickers) + + def _add_ticker_internal(self, ticker: str) -> None: + """Add without rebuilding Cholesky (for batch initialization).""" + if ticker in self._prices: + return + self._tickers.append(ticker) + self._prices[ticker] = SEED_PRICES.get(ticker, random.uniform(50.0, 300.0)) + self._params[ticker] = TICKER_PARAMS.get(ticker, dict(DEFAULT_PARAMS)) + + def _rebuild_cholesky(self) -> None: + """Rebuild Cholesky decomposition. Called on ticker add/remove. 
O(n^2) but n < 50.""" + n = len(self._tickers) + if n <= 1: + self._cholesky = None + return + + corr = np.eye(n) + for i in range(n): + for j in range(i + 1, n): + rho = self._pairwise_correlation(self._tickers[i], self._tickers[j]) + corr[i, j] = rho + corr[j, i] = rho + self._cholesky = np.linalg.cholesky(corr) + + @staticmethod + def _pairwise_correlation(t1: str, t2: str) -> float: + """Correlation based on sector grouping.""" + tech = CORRELATION_GROUPS["tech"] + finance = CORRELATION_GROUPS["finance"] + + if t1 == "TSLA" or t2 == "TSLA": + return TSLA_CORR + if t1 in tech and t2 in tech: + return INTRA_TECH_CORR + if t1 in finance and t2 in finance: + return INTRA_FINANCE_CORR + return CROSS_GROUP_CORR +``` + +### Correlated moves explained + +Real stocks don't move independently. We use **Cholesky decomposition** of a correlation matrix to produce correlated random draws from independent ones: + +``` +L = cholesky(C) # C is the n×n correlation matrix +Z_correlated = L @ Z # Z is n independent standard normals +``` + +The correlation structure: +| Pair | Correlation | Rationale | +|------|-------------|-----------| +| Tech ↔ Tech | 0.6 | Sector moves together | +| Finance ↔ Finance | 0.5 | Sector moves together | +| TSLA ↔ anything | 0.3 | TSLA does its own thing | +| Cross-sector | 0.3 | Baseline market correlation | +| Unknown tickers | 0.3 | Default | + +### Random shock events + +Every step, each ticker has a 0.1% chance of a sudden 2-5% move. With 10 tickers at 2 ticks/sec, expect an event roughly every 50 seconds — enough visual drama without being unrealistic. + +### 7.2 SimulatorDataSource — Async Wrapper + +```python +class SimulatorDataSource(MarketDataSource): + """Wraps GBMSimulator in an async background loop writing to PriceCache.""" + + def __init__( + self, + price_cache: PriceCache, + update_interval: float = 0.5, + event_probability: float = 0.001, + ) -> None: + self._cache = price_cache + self._interval = update_interval + self._event_prob = event_probability + self._sim: GBMSimulator | None = None + self._task: asyncio.Task | None = None + + async def start(self, tickers: list[str]) -> None: + self._sim = GBMSimulator(tickers=tickers, event_probability=self._event_prob) + # Seed the cache immediately so SSE has data on first tick + for ticker in tickers: + price = self._sim.get_price(ticker) + if price is not None: + self._cache.update(ticker=ticker, price=price) + self._task = asyncio.create_task(self._run_loop(), name="simulator-loop") + + async def stop(self) -> None: + if self._task and not self._task.done(): + self._task.cancel() + try: + await self._task + except asyncio.CancelledError: + pass + self._task = None + + async def add_ticker(self, ticker: str) -> None: + if self._sim: + self._sim.add_ticker(ticker) + price = self._sim.get_price(ticker) + if price is not None: + self._cache.update(ticker=ticker, price=price) + + async def remove_ticker(self, ticker: str) -> None: + if self._sim: + self._sim.remove_ticker(ticker) + self._cache.remove(ticker) + + def get_tickers(self) -> list[str]: + return self._sim.get_tickers() if self._sim else [] + + async def _run_loop(self) -> None: + while True: + try: + if self._sim: + prices = self._sim.step() + for ticker, price in prices.items(): + self._cache.update(ticker=ticker, price=price) + except Exception: + logger.exception("Simulator step failed") + await asyncio.sleep(self._interval) +``` + +Key behaviors: +- **Immediate seeding**: `start()` populates the cache before the loop begins — no blank-screen 
delay. +- **Graceful cancellation**: `stop()` cancels and awaits the task, catching `CancelledError`. +- **Exception resilience**: The loop catches exceptions per-step so a single bad tick doesn't kill the feed. + +--- + +## 8. Massive API Client + +**File: `backend/app/market/massive_client.py`** + +Polls the Massive (Polygon.io) REST API snapshot endpoint on a configurable interval. The synchronous Massive client runs in `asyncio.to_thread()` to avoid blocking the event loop. + +```python +from __future__ import annotations + +import asyncio +import logging + +from massive import RESTClient +from massive.rest.models import SnapshotMarketType + +from .cache import PriceCache +from .interface import MarketDataSource + +logger = logging.getLogger(__name__) + + +class MassiveDataSource(MarketDataSource): + """Polls Massive REST API for live stock prices. + + Uses GET /v2/snapshot/locale/us/markets/stocks/tickers to fetch all + watched tickers in a single API call. + + Rate limits: + - Free tier: 5 req/min → poll every 15s (default) + - Paid tiers: higher limits → poll every 2-5s + """ + + def __init__( + self, + api_key: str, + price_cache: PriceCache, + poll_interval: float = 15.0, + ) -> None: + self._api_key = api_key + self._cache = price_cache + self._interval = poll_interval + self._tickers: list[str] = [] + self._task: asyncio.Task | None = None + self._client: RESTClient | None = None + + async def start(self, tickers: list[str]) -> None: + self._client = RESTClient(api_key=self._api_key) + self._tickers = list(tickers) + await self._poll_once() # Immediate first poll + self._task = asyncio.create_task(self._poll_loop(), name="massive-poller") + + async def stop(self) -> None: + if self._task and not self._task.done(): + self._task.cancel() + try: + await self._task + except asyncio.CancelledError: + pass + self._task = None + self._client = None + + async def add_ticker(self, ticker: str) -> None: + ticker = ticker.upper().strip() + if ticker not in self._tickers: + self._tickers.append(ticker) + + async def remove_ticker(self, ticker: str) -> None: + ticker = ticker.upper().strip() + self._tickers = [t for t in self._tickers if t != ticker] + self._cache.remove(ticker) + + def get_tickers(self) -> list[str]: + return list(self._tickers) + + async def _poll_loop(self) -> None: + while True: + await asyncio.sleep(self._interval) + await self._poll_once() + + async def _poll_once(self) -> None: + if not self._tickers or not self._client: + return + try: + snapshots = await asyncio.to_thread(self._fetch_snapshots) + for snap in snapshots: + try: + price = snap.last_trade.price + timestamp = snap.last_trade.timestamp / 1000.0 # ms → seconds + self._cache.update(ticker=snap.ticker, price=price, timestamp=timestamp) + except (AttributeError, TypeError) as e: + logger.warning("Skipping snapshot for %s: %s", getattr(snap, "ticker", "???"), e) + except Exception as e: + logger.error("Massive poll failed: %s", e) + + def _fetch_snapshots(self) -> list: + """Synchronous Massive API call. 
Runs in a thread.""" + return self._client.get_snapshot_all( + market_type=SnapshotMarketType.STOCKS, + tickers=self._tickers, + ) +``` + +### Massive API response structure (per ticker) + +```json +{ + "ticker": "AAPL", + "day": { + "open": 129.61, "high": 130.15, "low": 125.07, "close": 125.07, + "volume": 111237700, "change": -4.54, "change_percent": -3.50 + }, + "last_trade": { + "price": 125.07, "size": 100, "exchange": "XNYS", + "timestamp": 1675190399000 + }, + "last_quote": { + "bid_price": 125.06, "ask_price": 125.08, + "bid_size": 500, "ask_size": 1000 + } +} +``` + +We extract `last_trade.price` and `last_trade.timestamp` (converting ms → seconds). + +### Error handling philosophy + +| Error | Behavior | +|-------|----------| +| **401 Unauthorized** | Logged. Poller keeps running (user might fix `.env` and restart). | +| **429 Rate Limited** | Logged. Next poll retries after `poll_interval`. | +| **Network timeout** | Logged. Retries on next cycle. | +| **Malformed snapshot** | Individual ticker skipped. Others still processed. | +| **All tickers fail** | Cache retains last-known prices. SSE streams stale data (better than nothing). | + +--- + +## 9. Factory + +**File: `backend/app/market/factory.py`** + +```python +from __future__ import annotations + +import logging +import os + +from .cache import PriceCache +from .interface import MarketDataSource +from .massive_client import MassiveDataSource +from .simulator import SimulatorDataSource + +logger = logging.getLogger(__name__) + + +def create_market_data_source(price_cache: PriceCache) -> MarketDataSource: + """Select data source based on MASSIVE_API_KEY environment variable. + + Returns an unstarted source. Caller must await source.start(tickers). + """ + api_key = os.environ.get("MASSIVE_API_KEY", "").strip() + + if api_key: + logger.info("Market data source: Massive API (real data)") + return MassiveDataSource(api_key=api_key, price_cache=price_cache) + else: + logger.info("Market data source: GBM Simulator") + return SimulatorDataSource(price_cache=price_cache) +``` + +--- + +## 10. SSE Streaming Endpoint + +**File: `backend/app/market/stream.py`** + +The SSE endpoint holds open a long-lived HTTP connection and pushes price updates as `text/event-stream`. 

```python
from __future__ import annotations

import asyncio
import json
import logging
from collections.abc import AsyncGenerator

from fastapi import APIRouter, Request
from fastapi.responses import StreamingResponse

from .cache import PriceCache

logger = logging.getLogger(__name__)


def create_stream_router(price_cache: PriceCache) -> APIRouter:
    """Factory: creates the SSE router with injected PriceCache."""
    # Created inside the factory (not at module level) so repeated calls
    # return independent routers and never double-register the route.
    router = APIRouter(prefix="/api/stream", tags=["streaming"])

    @router.get("/prices")
    async def stream_prices(request: Request) -> StreamingResponse:
        return StreamingResponse(
            _generate_events(price_cache, request),
            media_type="text/event-stream",
            headers={
                "Cache-Control": "no-cache",
                "Connection": "keep-alive",
                "X-Accel-Buffering": "no",  # Disable nginx buffering
            },
        )

    return router


async def _generate_events(
    price_cache: PriceCache,
    request: Request,
    interval: float = 0.5,
) -> AsyncGenerator[str, None]:
    """Yields SSE-formatted price events every `interval` seconds."""
    yield "retry: 1000\n\n"  # Browser reconnect delay

    last_version = -1
    client_ip = request.client.host if request.client else "unknown"
    logger.info("SSE client connected: %s", client_ip)

    try:
        while True:
            if await request.is_disconnected():
                break

            current_version = price_cache.version
            if current_version != last_version:
                last_version = current_version
                prices = price_cache.get_all()
                if prices:
                    data = {ticker: update.to_dict() for ticker, update in prices.items()}
                    yield f"data: {json.dumps(data)}\n\n"

            await asyncio.sleep(interval)
    except asyncio.CancelledError:
        logger.info("SSE stream cancelled for: %s", client_ip)
```

### SSE wire format

Each event the client receives:

```
data: {"AAPL":{"ticker":"AAPL","price":190.50,"previous_price":190.42,"timestamp":1707580800.5,"change":0.08,"change_percent":0.042,"direction":"up"},"GOOGL":{...}}

```

### Frontend consumption

```javascript
const eventSource = new EventSource('/api/stream/prices');
eventSource.onmessage = (event) => {
  const prices = JSON.parse(event.data);
  // prices = { "AAPL": { ticker, price, previous_price, change, direction, ... }, ... }
};
```

### Why poll-and-push instead of event-driven?

The SSE endpoint polls the cache on a fixed interval rather than being notified by the data source. This produces predictable, evenly-spaced updates for the frontend. Regular spacing is important for clean sparkline chart visualization.

---

## 11. FastAPI Lifecycle Integration

The market data system starts and stops with the FastAPI app using the `lifespan` context manager.

**In `backend/app/main.py`:**

```python
from contextlib import asynccontextmanager
from fastapi import FastAPI
from app.market import PriceCache, MarketDataSource, create_market_data_source, create_stream_router


@asynccontextmanager
async def lifespan(app: FastAPI):
    # --- STARTUP ---
    price_cache = PriceCache()
    app.state.price_cache = price_cache

    source = create_market_data_source(price_cache)
    app.state.market_source = source

    initial_tickers = await load_watchlist_tickers()  # from SQLite
    await source.start(initial_tickers)

    stream_router = create_stream_router(price_cache)
    app.include_router(stream_router)

    yield  # App is running

    # --- SHUTDOWN ---
    await source.stop()


app = FastAPI(title="FinAlly", lifespan=lifespan)


# Dependency injection helpers
def get_price_cache() -> PriceCache:
    return app.state.price_cache

def get_market_source() -> MarketDataSource:
    return app.state.market_source
```

### Accessing market data from other routes

```python
from fastapi import APIRouter, Depends, HTTPException

router = APIRouter(prefix="/api")

@router.post("/portfolio/trade")
async def execute_trade(
    trade: TradeRequest,
    price_cache: PriceCache = Depends(get_price_cache),
):
    current_price = price_cache.get_price(trade.ticker)
    if current_price is None:
        raise HTTPException(400, f"Price not yet available for {trade.ticker}")
    # ... execute trade at current_price ...


@router.post("/watchlist")
async def add_to_watchlist(
    payload: WatchlistAdd,
    source: MarketDataSource = Depends(get_market_source),
):
    # Insert into DB ...
    await source.add_ticker(payload.ticker)


@router.delete("/watchlist/{ticker}")
async def remove_from_watchlist(
    ticker: str,
    source: MarketDataSource = Depends(get_market_source),
):
    # Delete from DB ...
    await source.remove_ticker(ticker)
```

---

## 12. Watchlist Coordination

When the watchlist changes (via REST API or LLM chat), the market data source must be notified.

### Flow: Adding a Ticker

```
User/LLM → POST /api/watchlist {ticker: "PYPL"}
         → Insert into watchlist table (SQLite)
         → await source.add_ticker("PYPL")
              Simulator: adds to GBMSimulator, rebuilds Cholesky, seeds cache immediately
              Massive: appends to ticker list, appears on next poll
         → Return success with current price (if available)
```

### Flow: Removing a Ticker

```
User/LLM → DELETE /api/watchlist/PYPL
         → Delete from watchlist table (SQLite)
         → await source.remove_ticker("PYPL")
              Both: removes from active set + removes from cache
         → Return success
```

### Edge case: Ticker has an open position

If the user removes a ticker from the watchlist but still holds shares, the ticker should remain in the data source for accurate portfolio valuation:

```python
@router.delete("/watchlist/{ticker}")
async def remove_from_watchlist(ticker: str, source=Depends(get_market_source)):
    await db.delete_watchlist_entry(ticker)

    # Only stop tracking if no open position
    position = await db.get_position(ticker)
    if position is None or position.quantity == 0:
        await source.remove_ticker(ticker)

    return {"status": "ok"}
```

---

## 13. Error Handling & Edge Cases

### Empty watchlist at startup

Both data sources handle `start([])` gracefully — no prices produced, no API calls. The SSE stream sends no price events until the cache has data (the generator skips empty payloads). When a ticker is added later, tracking begins immediately.
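
A quick sketch of that behavior with the simulator (illustrative; assumes an async context):

```python
cache = PriceCache()
source = SimulatorDataSource(price_cache=cache)

await source.start([])                   # empty watchlist: no prices produced
assert len(cache) == 0

await source.add_ticker("AAPL")          # simulator seeds the cache right away
assert cache.get_price("AAPL") is not None
```
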

### Price cache miss during trade

```python
price = price_cache.get_price(ticker)
if price is None:
    raise HTTPException(400, "Price not yet available. Please wait and try again.")
```

The simulator avoids this by seeding the cache in `add_ticker()`. Massive may have a brief gap before the next poll.

### Invalid Massive API key

First poll fails with 401. Poller logs the error and keeps retrying. The SSE stream carries no price data until a poll succeeds. Fix: correct the API key and restart.

### Thread safety under load

`threading.Lock` is a mutex. Under normal load (10 tickers, 2 updates/sec), contention is negligible. The critical section is tiny (dict lookup + assignment).

### Simulator precision

GBM with tiny `dt` is numerically stable:
- Prices are `round()`ed to 2 decimal places in `step()`
- `exp(drift + diffusion)` is always positive — prices can never go negative
- Floating-point precision is not a concern at these magnitudes

---

## 14. Testing Strategy

71 tests across 6 modules, all passing. 84% overall coverage.

| Module | Tests | Coverage |
|--------|-------|----------|
| `test_models.py` | 11 | 100% |
| `test_cache.py` | 13 | 100% |
| `test_simulator.py` | 17 | 98% |
| `test_simulator_source.py` | 10 | integration |
| `test_factory.py` | 7 | 100% |
| `test_massive.py` | 13 | 56% (API mocked) |

### Example: GBMSimulator tests

```python
class TestGBMSimulator:
    def test_step_returns_all_tickers(self):
        sim = GBMSimulator(tickers=["AAPL", "GOOGL"])
        result = sim.step()
        assert set(result.keys()) == {"AAPL", "GOOGL"}

    def test_prices_are_positive(self):
        """GBM prices can never go negative (exp() is always positive)."""
        sim = GBMSimulator(tickers=["AAPL"])
        for _ in range(10_000):
            prices = sim.step()
            assert prices["AAPL"] > 0

    def test_add_ticker(self):
        sim = GBMSimulator(tickers=["AAPL"])
        sim.add_ticker("TSLA")
        result = sim.step()
        assert "TSLA" in result

    def test_cholesky_rebuilds_on_add(self):
        sim = GBMSimulator(tickers=["AAPL"])
        assert sim._cholesky is None  # 1 ticker, no matrix
        sim.add_ticker("GOOGL")
        assert sim._cholesky is not None  # 2 tickers, matrix exists
```

### Example: PriceCache tests

```python
class TestPriceCache:
    def test_direction_up(self):
        cache = PriceCache()
        cache.update("AAPL", 190.00)
        update = cache.update("AAPL", 191.00)
        assert update.direction == "up"
        assert update.change == 1.00

    def test_version_increments(self):
        cache = PriceCache()
        v0 = cache.version
        cache.update("AAPL", 190.00)
        assert cache.version == v0 + 1
```

### Example: MassiveDataSource tests (mocked)

```python
from unittest.mock import MagicMock, patch


def _make_snapshot(ticker: str, price: float, timestamp_ms: int) -> MagicMock:
    snap = MagicMock()
    snap.ticker = ticker
    snap.last_trade.price = price
    snap.last_trade.timestamp = timestamp_ms
    return snap

class TestMassiveDataSource:
    async def test_poll_updates_cache(self):
        cache = PriceCache()
        source = MassiveDataSource(api_key="test", price_cache=cache, poll_interval=60.0)
        mock_snapshots = [_make_snapshot("AAPL", 190.50, 1707580800000)]

        with patch.object(source, "_fetch_snapshots", return_value=mock_snapshots):
            await source._poll_once()

        assert cache.get_price("AAPL") == 190.50

    async def test_api_error_does_not_crash(self):
        cache = PriceCache()
        source = MassiveDataSource(api_key="test", price_cache=cache, poll_interval=60.0)
        source._tickers = ["AAPL"]

        with patch.object(source, "_fetch_snapshots", side_effect=Exception("network error")):
            await source._poll_once()  # Should not raise
```

---

## 15. Configuration Summary

| Parameter | Location | Default | Description |
|-----------|----------|---------|-------------|
| `MASSIVE_API_KEY` | Env var | `""` | If set → Massive API; else → simulator |
| `update_interval` | `SimulatorDataSource` | `0.5s` | Time between simulator ticks |
| `poll_interval` | `MassiveDataSource` | `15.0s` | Time between API polls |
| `event_probability` | `GBMSimulator` | `0.001` | Chance of random shock per ticker per tick |
| `dt` | `GBMSimulator` | `~8.5e-8` | GBM time step (fraction of trading year) |
| SSE push interval | `_generate_events()` | `0.5s` | Time between SSE pushes |
| SSE retry directive | `_generate_events()` | `1000ms` | Browser `EventSource` reconnect delay |

---

## Quick-Start Usage

```python
from app.market import PriceCache, create_market_data_source

# Startup
cache = PriceCache()
source = create_market_data_source(cache)  # Reads MASSIVE_API_KEY
await source.start(["AAPL", "GOOGL", "MSFT", ...])

# Read prices
update = cache.get("AAPL")        # PriceUpdate or None
price = cache.get_price("AAPL")   # float or None
all_prices = cache.get_all()      # dict[str, PriceUpdate]

# Dynamic watchlist
await source.add_ticker("TSLA")
await source.remove_ticker("GOOGL")

# Shutdown
await source.stop()
```
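
A closing sanity check on the simulator's scale, derived from §7's constants (illustrative arithmetic only):

```python
import math

TRADING_SECONDS_PER_YEAR = 252 * 6.5 * 3600   # 5,896,800
dt = 0.5 / TRADING_SECONDS_PER_YEAR           # ~8.48e-8 per 500ms tick

sigma = 0.22                                  # AAPL's annualized volatility
per_tick_sigma = sigma * math.sqrt(dt)        # ~6.4e-5, i.e. ~0.0064% per tick
print(per_tick_sigma * 190.00)                # ~0.012 — about a cent per tick on a $190 stock
```

At these magnitudes double-precision floats are nowhere near their limits, which is why §13 can dismiss floating-point precision as a concern.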