From 61cf9205fdd3b33b046e6f7acfa31f9eb5108855 Mon Sep 17 00:00:00 2001 From: "alexey.sizov" <59122515+ivl64@users.noreply.github.com> Date: Sun, 19 Apr 2026 10:58:36 +0200 Subject: [PATCH 1/6] Add market data planning docs: Massive API, unified interface, and simulator Documents the Massive (formerly Polygon.io) REST API endpoints and Python SDK usage, the unified MarketDataSource interface and PriceCache architecture, and the GBM simulator design including correlation structure and seed parameters. Co-Authored-By: Claude Sonnet 4.6 --- planning/MARKET_INTERFACE.md | 220 +++++++++++++++++++++++++++++++++++ planning/MARKET_SIMULATOR.md | 198 +++++++++++++++++++++++++++++++ planning/MASSIVE_API.md | 199 +++++++++++++++++++++++++++++++ 3 files changed, 617 insertions(+) create mode 100644 planning/MARKET_INTERFACE.md create mode 100644 planning/MARKET_SIMULATOR.md create mode 100644 planning/MASSIVE_API.md diff --git a/planning/MARKET_INTERFACE.md b/planning/MARKET_INTERFACE.md new file mode 100644 index 00000000..88cb1086 --- /dev/null +++ b/planning/MARKET_INTERFACE.md @@ -0,0 +1,220 @@ +# Market Data Unified Interface + +This document describes the unified Python API for retrieving stock prices in FinAlly. +The same interface works whether prices come from the Massive REST API or the built-in +GBM simulator. All downstream code (SSE streaming, portfolio valuation, trade execution) +reads from `PriceCache` and never touches the data source directly. 
+ +--- + +## Architecture + +``` +Environment variable check (MASSIVE_API_KEY) + │ + ├── set & non-empty ──→ MassiveDataSource (real market data, REST polling) + └── absent / empty ──→ SimulatorDataSource (GBM simulation, no dependencies) + │ + ▼ writes prices every tick + PriceCache (thread-safe in-memory store) + │ + ├──→ SSE stream /api/stream/prices + ├──→ Portfolio valuation + └──→ Trade execution +``` + +--- + +## Module Layout + +All code lives in `backend/app/market/`: + +| File | Contents | +|--------------------|--------------------------------------------------------------| +| `models.py` | `PriceUpdate` — the canonical price record | +| `interface.py` | `MarketDataSource` — abstract base class | +| `cache.py` | `PriceCache` — thread-safe price store | +| `seed_prices.py` | Starting prices and GBM parameters for all default tickers | +| `simulator.py` | `GBMSimulator` + `SimulatorDataSource` | +| `massive_client.py`| `MassiveDataSource` | +| `factory.py` | `create_market_data_source()` — selects implementation | +| `stream.py` | FastAPI SSE endpoint (`/api/stream/prices`) | + +Public re-exports via `__init__.py`: + +```python +from app.market import PriceCache, PriceUpdate, create_market_data_source +``` + +--- + +## Data Model: `PriceUpdate` + +Immutable frozen dataclass. Every price change is represented as one of these. + +```python +@dataclass(frozen=True, slots=True) +class PriceUpdate: + ticker: str + price: float + previous_price: float + timestamp: float # Unix seconds + + # Computed properties: + change: float # price - previous_price + change_percent: float # percentage change from previous_price + direction: str # "up" | "down" | "flat" + + def to_dict(self) -> dict: + """Serialize for JSON / SSE transmission.""" +``` + +--- + +## Abstract Interface: `MarketDataSource` + +```python +class MarketDataSource(ABC): + + async def start(self, tickers: list[str]) -> None: + """Begin producing prices. Starts a background task. 
Call once.""" + + async def stop(self) -> None: + """Stop the background task. Safe to call multiple times.""" + + async def add_ticker(self, ticker: str) -> None: + """Add a ticker to the active set. No-op if already present.""" + + async def remove_ticker(self, ticker: str) -> None: + """Remove a ticker. Also purges it from PriceCache.""" + + def get_tickers(self) -> list[str]: + """Return the current list of tracked tickers.""" +``` + +Both `MassiveDataSource` and `SimulatorDataSource` implement this interface. +Swapping one for the other requires no changes to any other module. + +--- + +## PriceCache + +Thread-safe store. Producers call `update()`; consumers call `get()` / `get_all()`. + +```python +class PriceCache: + + def update(self, ticker: str, price: float, timestamp: float | None = None) -> PriceUpdate: + """Record a new price. Computes direction from previous value.""" + + def get(self, ticker: str) -> PriceUpdate | None: + """Latest PriceUpdate for one ticker, or None.""" + + def get_all(self) -> dict[str, PriceUpdate]: + """Snapshot of all current prices (shallow copy).""" + + def get_price(self, ticker: str) -> float | None: + """Convenience: just the price float, or None.""" + + def remove(self, ticker: str) -> None: + """Remove a ticker from the cache.""" + + @property + def version(self) -> int: + """Monotonically increasing counter. Bumped on every update. + Used by the SSE endpoint for change detection (avoids re-sending unchanged prices).""" +``` + +--- + +## Factory: Selecting the Implementation + +```python +# backend/app/market/factory.py + +def create_market_data_source(price_cache: PriceCache) -> MarketDataSource: + api_key = os.environ.get("MASSIVE_API_KEY", "").strip() + if api_key: + return MassiveDataSource(api_key=api_key, price_cache=price_cache) + else: + return SimulatorDataSource(price_cache=price_cache) +``` + +The factory reads `MASSIVE_API_KEY` from the environment (loaded from `.env` at startup). 
+ +--- + +## Startup / Shutdown (FastAPI lifespan) + +```python +from app.market import PriceCache, create_market_data_source + +cache = PriceCache() +source = create_market_data_source(cache) + +# On startup: +await source.start(["AAPL", "GOOGL", "MSFT", "AMZN", "TSLA", + "NVDA", "META", "JPM", "V", "NFLX"]) + +# On shutdown: +await source.stop() +``` + +--- + +## Reading Prices (downstream code) + +```python +# Single ticker +update = cache.get("AAPL") # PriceUpdate | None +price = cache.get_price("AAPL") # float | None + +# All tickers (e.g., for SSE broadcast or portfolio valuation) +all_prices = cache.get_all() # dict[str, PriceUpdate] + +# Access fields +update.price +update.previous_price +update.change_percent +update.direction # "up" | "down" | "flat" +update.to_dict() # JSON-ready dict +``` + +--- + +## Dynamic Watchlist + +The watchlist API routes call these methods when the user adds/removes tickers: + +```python +await source.add_ticker("PYPL") # starts tracking; cache has a price on next tick +await source.remove_ticker("NFLX") # stops tracking; removes from cache immediately +``` + +--- + +## SSE Stream + +`GET /api/stream/prices` — long-lived SSE connection. The server pushes all cached +prices every 500 ms (or immediately when the version counter increments). Each event: + +``` +event: price +data: {"ticker":"AAPL","price":190.25,"previous_price":190.10,"timestamp":1712345678.0, + "change":0.15,"change_percent":0.0789,"direction":"up"} +``` + +The frontend uses the native `EventSource` API; no library required. +Reconnection is automatic via `EventSource`'s built-in retry mechanism. + +--- + +## Environment Variables + +| Variable | Effect | +|------------------|----------------------------------------------------------------| +| `MASSIVE_API_KEY`| Set to use real market data. Leave empty for the simulator. | +| (none) | Simulator runs by default — no API key required. 
| + +Poll interval for `MassiveDataSource` defaults to **15 seconds** (free tier safe). +Simulator update interval defaults to **500 ms**. diff --git a/planning/MARKET_SIMULATOR.md b/planning/MARKET_SIMULATOR.md new file mode 100644 index 00000000..8a6b1f3b --- /dev/null +++ b/planning/MARKET_SIMULATOR.md @@ -0,0 +1,198 @@ +# Market Simulator + +The GBM simulator generates realistic-looking stock price movements without any external +API dependency. It is the default data source when `MASSIVE_API_KEY` is not set. + +--- + +## Implementation Files + +| File | Contents | +|------------------|---------------------------------------------------------------| +| `simulator.py` | `GBMSimulator` (math engine) + `SimulatorDataSource` (async wrapper) | +| `seed_prices.py` | Starting prices, per-ticker GBM parameters, correlation config | + +--- + +## Mathematical Model: Geometric Brownian Motion + +Each tick advances every price by one GBM step: + +``` +S(t+dt) = S(t) * exp( (mu - sigma²/2) * dt + sigma * sqrt(dt) * Z ) +``` + +Where: +- `S(t)` — current price +- `mu` — annualized drift (expected return, e.g. 0.05 = 5%/year) +- `sigma` — annualized volatility (e.g. 0.22 = 22%/year for AAPL) +- `dt` — time step as fraction of a trading year (~8.48×10⁻⁸ for 500 ms ticks) +- `Z` — correlated standard normal random variable (see below) + +With `dt` this small, each tick produces sub-cent moves that accumulate naturally into +realistic-looking drifts and swings over minutes and hours. + +--- + +## Correlated Moves (Cholesky Decomposition) + +Stocks in the same sector tend to move together. The simulator models this via a +correlation matrix and its Cholesky decomposition. 
 
+
+**Correlation values:**
+
+| Relationship                     | Coefficient |
+|----------------------------------|-------------|
+| Same tech sector (AAPL, GOOGL…)  | 0.60        |
+| Same finance sector (JPM, V)     | 0.50        |
+| TSLA with anything               | 0.30        |
+| Cross-sector / unknown tickers   | 0.30        |
+
+**Per-tick generation:**
+
+```python
+z_independent = np.random.standard_normal(n)   # n = number of tickers
+z_correlated = cholesky_matrix @ z_independent # correlated draws
+```
+
+Each ticker `i` uses `z_correlated[i]` in the GBM formula instead of a raw normal draw.
+The Cholesky matrix is rebuilt whenever tickers are added or removed (O(n³), negligible for n < 50).
+
+---
+
+## Seed Prices and Per-Ticker Parameters
+
+Defined in `seed_prices.py`. These are the starting values when the simulator initialises:
+
+| Ticker | Seed Price | Volatility (σ) | Drift (μ) | Notes |
+|--------|-----------|----------------|-----------|---------------------|
+| AAPL   | $190      | 22%            | 5%        |                     |
+| GOOGL  | $175      | 25%            | 5%        |                     |
+| MSFT   | $420      | 20%            | 5%        |                     |
+| AMZN   | $185      | 28%            | 5%        |                     |
+| TSLA   | $250      | 50%            | 3%        | High volatility     |
+| NVDA   | $800      | 40%            | 8%        | High vol, high drift |
+| META   | $500      | 30%            | 5%        |                     |
+| JPM    | $195      | 18%            | 4%        | Low vol (bank)      |
+| V      | $280      | 17%            | 4%        | Low vol (payments)  |
+| NFLX   | $600      | 35%            | 5%        |                     |
+
+Tickers added dynamically (not in the table above) use defaults: σ=25%, μ=5%, seed=random $50–$300.
+
+---
+
+## Random Shock Events
+
+On every tick, each ticker has a 0.1% chance of a sudden 2–5% price move (up or down).
+With 10 tickers at 2 ticks/second, expect roughly one shock event every 50 seconds.
+
+```python
+if random.random() < 0.001:
+    shock = random.uniform(0.02, 0.05)
+    sign = random.choice([-1, 1])
+    price *= (1 + shock * sign)
+```
+
+This creates the visual drama of news-driven moves without modelling news. 
+ +--- + +## Class Structure + +### `GBMSimulator` (pure math, synchronous) + +```python +class GBMSimulator: + def __init__(self, tickers: list[str], dt: float = DEFAULT_DT, + event_probability: float = 0.001) -> None: ... + + def step(self) -> dict[str, float]: + """Advance all tickers by one dt. Returns {ticker: new_price}. + Hot path — called every 500 ms.""" + + def add_ticker(self, ticker: str) -> None: + """Add ticker; rebuilds Cholesky matrix.""" + + def remove_ticker(self, ticker: str) -> None: + """Remove ticker; rebuilds Cholesky matrix.""" + + def get_price(self, ticker: str) -> float | None: ... + def get_tickers(self) -> list[str]: ... +``` + +`DEFAULT_DT = 0.5 / (252 * 6.5 * 3600)` — 500 ms expressed as a fraction of a trading year +(252 trading days × 6.5 hours/day × 3600 s/hour = 5,896,800 trading seconds/year). + +### `SimulatorDataSource` (async wrapper, implements `MarketDataSource`) + +```python +class SimulatorDataSource(MarketDataSource): + def __init__(self, price_cache: PriceCache, + update_interval: float = 0.5, + event_probability: float = 0.001) -> None: ... 
+ + async def start(self, tickers: list[str]) -> None: + """Create GBMSimulator, seed cache with initial prices, start background loop.""" + + async def stop(self) -> None: + """Cancel background task.""" + + async def add_ticker(self, ticker: str) -> None: + """Forward to GBMSimulator; seed cache immediately.""" + + async def remove_ticker(self, ticker: str) -> None: + """Forward to GBMSimulator; purge from PriceCache.""" + + async def _run_loop(self) -> None: + """Core loop: step() → write all prices to cache → sleep 500 ms.""" +``` + +The background loop: + +```python +while True: + prices = self._sim.step() # GBM math + for ticker, price in prices.items(): + self._cache.update(ticker=ticker, price=price) # write to cache + await asyncio.sleep(self._interval) # 500 ms +``` + +--- + +## Adding a New Ticker at Runtime + +```python +# SimulatorDataSource.add_ticker() does this: +self._sim.add_ticker(ticker) # extend GBMSimulator state +price = self._sim.get_price(ticker) # get the seed price +self._cache.update(ticker, price) # immediately visible to SSE / portfolio +``` + +The new ticker appears in the next SSE broadcast (within 500 ms). + +--- + +## "Daily Change" in the Simulator + +The simulator has no concept of market open/close, so "daily change" is defined as the +percentage change from the **seed price** at process startup. The SSE payload always +includes `previous_price` (price before the last tick), so the frontend can compute +its own running change; the seed price itself is available in `seed_prices.py` for +any backend calculation that needs a stable reference. 
+ +--- + +## Test Coverage + +The simulator is tested in `backend/tests/market/`: + +| Test module | What it covers | +|--------------------------|-------------------------------------------------------------| +| `test_simulator.py` | GBM math validity, Cholesky structure, shock events, add/remove | +| `test_simulator_source.py` | Async lifecycle, cache integration, dynamic ticker management | + +Run with: + +```bash +cd backend && uv run pytest tests/market/test_simulator.py -v +``` diff --git a/planning/MASSIVE_API.md b/planning/MASSIVE_API.md new file mode 100644 index 00000000..e10b579c --- /dev/null +++ b/planning/MASSIVE_API.md @@ -0,0 +1,199 @@ +# Massive (formerly Polygon.io) API Reference + +Massive rebranded from Polygon.io on October 30, 2025. All existing API keys, accounts, +and integrations continue to work unchanged. The Python SDK package is `massive`. + +--- + +## Authentication + +```python +from massive import RESTClient +client = RESTClient(api_key="YOUR_MASSIVE_API_KEY") +``` + +The SDK wraps `https://api.polygon.io`. All raw HTTP calls require `?apiKey=`. + +--- + +## Endpoints Used by This Project + +### 1. Full Market Snapshot (primary polling endpoint) + +`GET /v2/snapshot/locale/us/markets/stocks/tickers` + +Fetches the latest price data for multiple tickers in a **single** request. +This is the right endpoint for watchlist polling — one call covers all tickers. + +**Query parameters:** + +| Parameter | Type | Notes | +|---------------|-------------------|----------------------------------------------------| +| `tickers` | comma-sep string | e.g. `AAPL,TSLA,NVDA`. Omit to get all US stocks. | +| `include_otc` | boolean | Include OTC securities; default `false`. 
| + +**Response shape:** + +```json +{ + "count": 2, + "status": "OK", + "tickers": [ + { + "ticker": "AAPL", + "todaysChange": 2.31, + "todaysChangePerc": 1.22, + "updated": 1712345678000, + "day": { "o": 187.50, "h": 191.20, "l": 186.80, "c": 190.25, "v": 54321000, "vw": 189.10 }, + "prevDay": { "o": 185.00, "h": 188.50, "l": 184.30, "c": 187.94, "v": 61200000, "vw": 186.75 }, + "min": { "o": 190.10, "h": 190.40, "l": 189.90, "c": 190.25, "v": 12340, "t": 1712345640000 }, + "lastTrade": { "p": 190.25, "s": 100, "t": 1712345678000, "x": 4 }, + "lastQuote": { "P": 190.26, "Q": 200, "p": 190.24, "S": 300, "t": 1712345679000 } + } + ] +} +``` + +`lastTrade` fields: `p` = price, `s` = size, `t` = timestamp (Unix ms), `x` = exchange ID. +`lastQuote` fields: `P` = ask price, `p` = bid price, `Q` = ask size, `S` = bid size. + +**Python SDK:** + +```python +from massive import RESTClient +from massive.rest.models import SnapshotMarketType + +client = RESTClient(api_key=api_key) + +# The RESTClient is synchronous — use asyncio.to_thread in async contexts +snapshots = client.get_snapshot_all( + market_type=SnapshotMarketType.STOCKS, + tickers=["AAPL", "TSLA", "NVDA"], +) + +for snap in snapshots: + price = snap.last_trade.price + ts = snap.last_trade.timestamp / 1000.0 # ms → seconds + change_pct = snap.todays_change_perc + print(f"{snap.ticker}: ${price:.2f} ({change_pct:+.2f}%)") +``` + +**Rate limits:** + +| Plan | Requests/min | Recommended poll interval | +|-----------|--------------|---------------------------| +| Free | 5 | 15 s | +| Starter | 100 | 5 s | +| Developer | 1 000 | 2 s | +| Advanced | unlimited | < 1 s | + +Snapshot data clears at 3:30 AM EST and refreshes from ~4:00 AM EST. + +--- + +### 2. Previous Day Bar (end-of-day reference price) + +`GET /v2/aggs/ticker/{ticker}/prev` + +Returns OHLCV for the last completed trading session. Used to compute the "daily change" +percentage shown in the watchlist (today's price vs. yesterday's close). 
+ +**Parameters:** `adjusted` (boolean, default `true`) — split-adjusted prices. + +**Response:** + +```json +{ + "ticker": "AAPL", + "status": "OK", + "results": [ + { "T": "AAPL", "o": 185.00, "h": 188.50, "l": 184.30, "c": 187.94, + "v": 61200000, "vw": 186.75, "t": 1712188800000, "n": 523401 } + ] +} +``` + +Fields: `T` = ticker, `o/h/l/c` = OHLC, `v` = volume, `vw` = VWAP, +`t` = session start (Unix ms), `n` = transaction count. + +**Python SDK:** + +```python +aggs = client.get_previous_close_agg(ticker="AAPL") +prev_close = aggs[0].close # float +prev_open = aggs[0].open # float +``` + +--- + +### 3. Single Ticker Snapshot + +`GET /v2/snapshot/locale/us/markets/stocks/tickers/{ticker}` + +Same shape as one element of the full snapshot. Useful for a quick price lookup +on a single ticker without fetching the whole watchlist. + +--- + +### 4. Last Trade + +`GET /v2/last/trade/{ticker}` + +Most recent trade tick. Lighter than a full snapshot. +Note: timestamp `t` here is **nanoseconds** (unlike snapshot's milliseconds). + +```json +{ "status": "OK", "results": { "T": "AAPL", "p": 190.25, "s": 100, "t": 1712345678000000000, "x": 4 } } +``` + +--- + +## Async Usage Pattern (FastAPI) + +The Massive `RESTClient` is **synchronous**. 
Always wrap in `asyncio.to_thread`: + +```python +import asyncio +from massive import RESTClient +from massive.rest.models import SnapshotMarketType + +client = RESTClient(api_key=api_key) + +snapshots = await asyncio.to_thread( + client.get_snapshot_all, + market_type=SnapshotMarketType.STOCKS, + tickers=["AAPL", "NVDA"], +) +``` + +--- + +## Error Handling + +| HTTP | Meaning | +|------|------------------------------------------| +| 401 | Invalid/missing API key | +| 403 | Endpoint requires a higher plan | +| 404 | Ticker not found | +| 429 | Rate limit exceeded — back off and retry | + +Standard pattern: catch broadly, log, and let the poll loop retry: + +```python +try: + snapshots = client.get_snapshot_all( + market_type=SnapshotMarketType.STOCKS, + tickers=tickers, + ) +except Exception as e: + logger.error("Massive poll failed: %s", e) + # Don't re-raise — the background loop will retry on the next interval +``` + +--- + +## Installation + +```bash +uv add massive +``` From 57b10d84d87d1f490368330d2bd7c37a48743f26 Mon Sep 17 00:00:00 2001 From: "alexey.sizov" <59122515+ivl64@users.noreply.github.com> Date: Sun, 19 Apr 2026 11:22:29 +0200 Subject: [PATCH 2/6] Add plan review notes and document gaps/ambiguities MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Appends §13 to PLAN.md with identified gaps, inconsistencies, and minor issues. Adds REVIEW.md with a detailed structured review covering MVP phasing, API contracts, LLM failure handling, and implementation order. Also updates enabled plugins and fixes cerebras skill name casing. 
Co-Authored-By: Claude Sonnet 4.6 --- .claude/settings.json | 4 +- .claude/skills/cerebras/SKILL.md | 2 +- planning/PLAN.md | 43 +++++++ planning/REVIEW.md | 200 +++++++++++++++++++++++++++++++ 4 files changed, 245 insertions(+), 4 deletions(-) create mode 100644 planning/REVIEW.md diff --git a/.claude/settings.json b/.claude/settings.json index aa06f43d..ac45dd25 100644 --- a/.claude/settings.json +++ b/.claude/settings.json @@ -1,7 +1,5 @@ { "enabledPlugins": { - "frontend-design@claude-plugins-official": true, - "context7@claude-plugins-official": true, - "playwright@claude-plugins-official": true + "feature-dev@claude-plugins-official": true } } diff --git a/.claude/skills/cerebras/SKILL.md b/.claude/skills/cerebras/SKILL.md index 9efd01a3..4642a81f 100644 --- a/.claude/skills/cerebras/SKILL.md +++ b/.claude/skills/cerebras/SKILL.md @@ -1,5 +1,5 @@ --- -name: cerebras-inference +name: Cerebras Inference description: Use this to write code to call an LLM using LiteLLM and OpenRouter with the Cerebras inference provider --- diff --git a/planning/PLAN.md b/planning/PLAN.md index bc1811b3..5906cffe 100644 --- a/planning/PLAN.md +++ b/planning/PLAN.md @@ -454,3 +454,46 @@ The container is designed to deploy to AWS App Runner, Render, or any container - Portfolio visualization: heatmap renders with correct colors, P&L chart has data points - AI chat (mocked): send a message, receive a response, trade execution appears inline - SSE resilience: disconnect and verify reconnection + +--- + +## 13. Document Review Notes + +### Gaps & Ambiguities + +**§6 SSE Streaming — "all tickers known to the system"** +The SSE endpoint streams prices for "all tickers known to the system." It's unclear whether this means the watchlist only, or all tickers ever cached (including tickers from positions in stocks no longer on the watchlist). This matters when a user holds a position in a delisted watchlist item. Recommend clarifying: SSE streams the union of watchlist + current positions. 
+ +**§8 API — no response shape docs** +The endpoint table describes inputs but not response shapes. Agents implementing the frontend and backend will need to infer or invent these independently. At minimum, `GET /api/portfolio` and `POST /api/chat` response schemas should be documented here, as they're complex enough to cause integration mismatches. + +**§9 LLM — chat history window** +"Loads recent conversation history" — how many messages? No limit is specified. Without a bound, a long session will grow the prompt indefinitely and eventually hit context limits. Recommend specifying a number (e.g., last 20 messages). + +**§9 LLM — failed trade feedback loop** +"If a trade fails validation, the error is included in the chat response" — but the response has already been returned to the frontend at that point. Clarify whether validation happens before or after the LLM call, and whether the LLM response `message` is regenerated to include the error, or whether the backend appends an error annotation. Currently ambiguous. + +**§7 Database — `portfolio_snapshots` growth** +Snapshots accumulate every 30 seconds indefinitely. For a long-running demo session this grows unbounded. The P&L chart doesn't need every data point. Recommend specifying a retention policy (e.g., keep last 24 hours, or downsample after N points). + +**§10 Frontend — "daily change %"** +The watchlist panel should show "daily change %" but the simulator doesn't have a concept of a daily open price — it just uses GBM from startup. Clarify what "daily change" means for the simulator: change since process start? Change from seed price? This affects both the backend SSE payload and frontend display. + +### Inconsistencies + +**§4 vs §11 — Dockerfile path for static files** +§4 shows `backend/db/` containing schema SQL. §11 says the Dockerfile copies `frontend build output into a static/ directory` (inside the container), but doesn't say where relative to `/app`. 
The `docker run` command in §11 only mounts `/app/db`. If static files land at `/app/static/` and FastAPI needs to serve them, the path assumption should be made explicit in the Dockerfile pseudocode. + +**§9 — Model name** +`openrouter/openai/gpt-oss-120b` doesn't match any known OpenRouter model identifier. The cerebras-inference skill likely uses a different model string. This will cause a runtime error for any agent who follows the plan literally. The exact model ID should be taken from the cerebras skill. + +### Minor Issues + +**§11 Dockerfile — missing lockfile copy** +`uv sync` requires `uv.lock` to be present. The pseudocode only shows `Copy backend/` — this works if the lockfile is inside `backend/`, but it should be explicit since forgetting the lockfile is a common Docker mistake. + +**§2 — Sparklines reset on page refresh** +Sparklines "accumulated from SSE since page load" are ephemeral by design, but this means they're always empty on first load. Worth a one-line note so the frontend engineer doesn't try to persist or pre-populate them from the API. + +**§12 — E2E test gap: LLM mock content undefined** +The E2E tests use `LLM_MOCK=true` and test "trade execution appears inline" — but nowhere in the plan is the mock response content specified. The backend must return a deterministic mock that includes at least one trade for the test to pass. The mock payload should be documented (even briefly) so backend and E2E test implementations agree. diff --git a/planning/REVIEW.md b/planning/REVIEW.md new file mode 100644 index 00000000..cb70ae53 --- /dev/null +++ b/planning/REVIEW.md @@ -0,0 +1,200 @@ +# Review of `planning/PLAN.md` + +## Overall Feedback + +This is a strong project plan. It has a clear product vision, a coherent architecture, and a good explanation for why the main technology choices were made. That is already better than many early project plans. + +The main issue is not that the plan is weak. The main issue is that it is ambitious. 
For a junior developer, the current document explains **what** the product should be, but it does not always explain **in what order to build it**, **what can be simplified first**, and **what exact contracts different parts of the system should follow**. + +In short: the plan is exciting and well-scoped at the product level, but it still needs more implementation-level guidance. + +## What Is Working Well + +- The vision is easy to understand and gives the project a strong identity. +- The architecture choices are mostly sensible for a teaching project: FastAPI, SQLite, SSE, one container, one port. +- The plan correctly avoids unnecessary complexity in some places, especially by using market orders only and defaulting to a simulator. +- The testing section is a good sign. It shows that quality was considered early. +- The built-in review notes in Section 13 are useful and catch several real gaps. + +## Main Suggestions for Improvement + +### 1. Define a clear MVP before the “full demo” version + +Right now the plan mixes core requirements and “wow factor” features together. That makes implementation riskier. + +For a junior developer, I would strongly recommend splitting the work into: + +- **MVP**: watchlist, streaming prices, manual buy/sell, portfolio summary, simple chat request/response +- **Phase 2**: treemap, richer charts, watchlist management through AI, better animations +- **Phase 3 / stretch**: cloud deployment, more polished mock behavior, advanced charting improvements + +This matters because a good software plan should make it obvious what can be cut if time runs short. + +### 2. Add a milestone-based implementation order + +The document describes the final product well, but not the build sequence. + +That is important for a junior developer because they need a safe path like: + +1. Create backend health check and static frontend serving +2. Set up SQLite schema and seed data +3. Implement simulator and in-memory price cache +4. 
Add SSE stream and verify prices update in browser +5. Implement manual trade API and portfolio calculations +6. Build basic frontend watchlist and portfolio table +7. Add chat endpoint with `LLM_MOCK=true` +8. Integrate real LLM only after mock mode works +9. Add charts, treemap, and UI polish last + +Without this order, it is easy to start with the flashy parts and get blocked by the fundamentals. + +### 3. Define API response shapes, not just endpoint names + +This is one of the biggest practical gaps. + +A frontend developer and a backend developer can both follow the current plan and still build incompatible implementations. The plan should include example JSON responses for at least: + +- `GET /api/portfolio` +- `GET /api/watchlist` +- `GET /api/portfolio/history` +- `POST /api/portfolio/trade` +- `POST /api/chat` + +For a junior developer, explicit contracts reduce confusion and rework. + +### 4. Resolve open questions inside the plan, not only in the review notes + +Section 13 identifies good problems, but they are still left as open issues. A stronger version of the plan would move the decisions back into the earlier sections. + +Examples: + +- What exactly is included in SSE: watchlist only, or watchlist + held positions? +- How many chat messages are included in LLM history? +- What does “daily change %” mean in simulator mode? +- What is the exact mock chat response used in tests? + +A plan is more helpful when it contains decisions, not just observations. + +### 5. Simplify the LLM scope and describe failure behavior more carefully + +The AI assistant is one of the riskiest parts of the project because it combines prompting, structured outputs, validation, side effects, and UI updates. 
+ +For a junior developer, I would suggest narrowing the first version: + +- Start with chat that returns a message only +- Then add structured watchlist actions +- Then add structured trade actions +- Only after that allow auto-execution + +If auto-execution stays in scope from day one, the plan should clearly define: + +- what happens when the model returns invalid JSON +- what happens when only some requested trades succeed +- whether the backend edits the assistant message after validation fails +- what response shape the frontend receives for partial success + +### 6. Add explicit trade and portfolio rules + +The plan describes the database schema, but some business rules are still implicit. + +For example, the plan should answer these directly: + +- Are fractional shares allowed everywhere in the UI and API? +- How many decimal places are supported? +- How is average cost recalculated after multiple buys? +- What happens when selling the entire position? +- Are negative or zero quantities rejected with `400 Bad Request`? +- Are unknown tickers allowed if they are not already in the watchlist? + +Junior developers often struggle less with coding than with unclear business rules. + +### 7. Add error states and empty states to the UX section + +The user experience section focuses on the ideal path, which is good, but implementation also needs non-happy-path behavior. + +The plan should describe: + +- what the UI shows when SSE disconnects +- what happens if the market data provider is unavailable +- what happens if the chat request fails +- what the portfolio area shows when there are no positions +- what the chart shows before enough data exists + +These states are easy to forget and often create rough demos. + +### 8. Reduce accidental complexity in persistence + +The snapshot idea is useful, but the retention policy should be defined now. Otherwise the table can grow forever. 
+ +A simple rule would be enough: + +- keep snapshots for the last 24 hours, or +- keep the latest N rows, or +- store every 30 seconds for 30 minutes, then downsample older data + +The important thing is to choose a rule before implementation. + +### 9. Make the testing plan more priority-driven + +The testing section is good, but still broad. A junior developer benefits from knowing which tests matter most. + +I would recommend stating a minimum required test set: + +- one backend test for trade validation +- one backend test for portfolio calculation +- one backend test for simulator output format +- one E2E test for first launch +- one E2E test for manual buy flow +- one E2E test for mocked chat flow + +This gives a realistic testing floor before adding more coverage. + +### 10. Fix a few inconsistencies before anyone starts coding + +These are small, but they can waste time: + +- The model identifier in the LLM section should be verified before implementation. +- The Docker section should explicitly describe where the built frontend files live in the final image. +- The Docker steps should clearly mention `uv.lock`. +- The plan should state whether sparkline data is intentionally temporary and reset on page refresh. + +## Suggested Additions + +If this plan is going to guide implementation, I would add three short sections: + +### A. Non-Goals + +This helps prevent scope creep. For example: + +- No real-money trading +- No authentication or multi-user support in v1 +- No order book or limit orders +- No portfolio import/export + +### B. Definition of Done + +This helps a junior developer know when the project is actually complete. For example: + +- App starts with one documented command +- Default watchlist loads +- Prices update without manual refresh +- User can buy and sell successfully +- Portfolio values update correctly +- Chat works in mock mode +- E2E tests pass + +### C. 
Implementation Milestones + +A short milestone table would make this much easier to execute and review. + +## Final Recommendation + +I would not rewrite the whole plan. The foundation is solid. I would do a focused revision with these goals: + +1. Separate MVP from stretch features +2. Add milestone order +3. Add response schemas and business rules +4. Resolve Section 13 ambiguities directly in the main sections +5. Reduce LLM ambiguity and define failure handling + +If those changes are made, this will become much easier for a junior developer to implement successfully and much less likely to produce mismatched frontend/backend work. From 780584198cc9305a45157b16cebf7c35ad476651 Mon Sep 17 00:00:00 2001 From: "alexey.sizov" <59122515+ivl64@users.noreply.github.com> Date: Sun, 19 Apr 2026 22:50:37 +0200 Subject: [PATCH 3/6] market data --- planning/MARKET_DATA_DESIGN.md | 795 +++++++++++++++++++++++++++++++++ 1 file changed, 795 insertions(+) create mode 100644 planning/MARKET_DATA_DESIGN.md diff --git a/planning/MARKET_DATA_DESIGN.md b/planning/MARKET_DATA_DESIGN.md new file mode 100644 index 00000000..6b0380ea --- /dev/null +++ b/planning/MARKET_DATA_DESIGN.md @@ -0,0 +1,795 @@ +# Market Data Backend — Implementation Design + +**Status:** Fully implemented in `backend/app/market/` (8 modules, ~500 lines, 73 tests). + +This document is the authoritative design reference for the market data subsystem. It +includes the actual implementation code, integration patterns, and rationale for every +major decision. + +--- + +## 1. 
Architecture + +``` +Environment (MASSIVE_API_KEY) + │ + ├── set & non-empty ──→ MassiveDataSource (Polygon.io REST polling) + └── absent / empty ──→ SimulatorDataSource (GBM price simulation) + │ + ▼ writes prices on every tick + PriceCache (thread-safe in-memory store, version counter) + │ + ├──→ GET /api/stream/prices (SSE — one event per version bump) + ├──→ GET /api/portfolio (current position values) + ├──→ POST /api/portfolio/trade (fill price at execution) + └──→ GET /api/watchlist (prices alongside tickers) +``` + +**Key constraint:** All downstream code reads from `PriceCache` only. Nothing except +the two data sources ever writes to it. This decouples the data source from every +consumer completely. + +--- + +## 2. Module Layout + +``` +backend/app/market/ +├── __init__.py # Public re-exports +├── models.py # PriceUpdate — the canonical price record +├── interface.py # MarketDataSource — abstract base class +├── cache.py # PriceCache — thread-safe price store +├── seed_prices.py # Seed prices and per-ticker GBM parameters +├── simulator.py # GBMSimulator + SimulatorDataSource +├── massive_client.py # MassiveDataSource (Polygon.io REST poller) +├── factory.py # create_market_data_source() — env-driven selection +└── stream.py # FastAPI SSE endpoint +``` + +Public API (everything else imports from here): + +```python +from app.market import ( + PriceCache, + PriceUpdate, + MarketDataSource, + create_market_data_source, + create_stream_router, +) +``` + +--- + +## 3. Data Model: `PriceUpdate` + +Every price change is represented as an immutable `PriceUpdate`. Producers create them; +consumers read them. 
+ +```python +# models.py +from dataclasses import dataclass, field +import time + +@dataclass(frozen=True, slots=True) +class PriceUpdate: + ticker: str + price: float + previous_price: float + timestamp: float = field(default_factory=time.time) # Unix seconds + + @property + def change(self) -> float: + return round(self.price - self.previous_price, 4) + + @property + def change_percent(self) -> float: + if self.previous_price == 0: + return 0.0 + return round((self.price - self.previous_price) / self.previous_price * 100, 4) + + @property + def direction(self) -> str: + if self.price > self.previous_price: + return "up" + elif self.price < self.previous_price: + return "down" + return "flat" + + def to_dict(self) -> dict: + """Serialize for JSON / SSE transmission.""" + return { + "ticker": self.ticker, + "price": self.price, + "previous_price": self.previous_price, + "timestamp": self.timestamp, + "change": self.change, + "change_percent": self.change_percent, + "direction": self.direction, + } +``` + +**Design notes:** +- `frozen=True, slots=True` — immutable and memory-efficient; safe to pass across threads +- `previous_price` is always the price from the immediately preceding update, not a daily open +- `direction` is derived, not stored — stays consistent with price values +- `to_dict()` is the only serialization path; use it for SSE, REST responses, and JSON storage + +--- + +## 4. `PriceCache` — Thread-Safe Price Store + +The cache is the single source of truth for current prices. It is written by one +background task and read by many concurrent request handlers. 
+
```python
# cache.py
import time
from threading import Lock
from .models import PriceUpdate

class PriceCache:
    def __init__(self) -> None:
        self._prices: dict[str, PriceUpdate] = {}
        self._lock = Lock()
        self._version: int = 0  # Bumped on every update; drives SSE change detection

    def update(self, ticker: str, price: float, timestamp: float | None = None) -> PriceUpdate:
        with self._lock:
            ts = timestamp or time.time()
            prev = self._prices.get(ticker)
            previous_price = prev.price if prev else price  # First update: flat direction
            update = PriceUpdate(
                ticker=ticker,
                price=round(price, 2),
                previous_price=round(previous_price, 2),
                timestamp=ts,
            )
            self._prices[ticker] = update
            self._version += 1
            return update

    def get(self, ticker: str) -> PriceUpdate | None: ...
    def get_all(self) -> dict[str, PriceUpdate]: ...  # Shallow copy — safe for iteration
    def get_price(self, ticker: str) -> float | None: ...
    def remove(self, ticker: str) -> None: ...

    @property
    def version(self) -> int:
        """Monotonically increasing. The SSE generator polls this to detect changes."""
        return self._version
```

**Design notes:**
- `threading.Lock` (not `asyncio.Lock`) because `GBMSimulator.step()` runs in the async
  event loop but `get_all()` may also be called from sync FastAPI path handlers
- `get_all()` returns a shallow copy so the caller can iterate safely without holding the lock
- `version` counter is the SSE change-detection mechanism: the SSE generator wakes every
  500ms, checks if version changed since its last send, and only transmits if it did

---

## 5. Abstract Interface: `MarketDataSource`

Both implementations satisfy this contract. Downstream code only types against this ABC.

```python
# interface.py
from abc import ABC, abstractmethod

class MarketDataSource(ABC):

    @abstractmethod
    async def start(self, tickers: list[str]) -> None:
        """Begin producing prices. Starts a background task. 
Call once at startup.""" + + @abstractmethod + async def stop(self) -> None: + """Cancel background task. Safe to call multiple times.""" + + @abstractmethod + async def add_ticker(self, ticker: str) -> None: + """Add a ticker to the active set. Immediately visible in the next tick.""" + + @abstractmethod + async def remove_ticker(self, ticker: str) -> None: + """Remove a ticker. Also purges it from PriceCache.""" + + @abstractmethod + def get_tickers(self) -> list[str]: + """Return currently tracked tickers.""" +``` + +**Strategy pattern** — swapping `SimulatorDataSource` for `MassiveDataSource` (or any +future source) requires no changes in any other file. + +--- + +## 6. Factory: Environment-Driven Source Selection + +```python +# factory.py +import os + +def create_market_data_source(price_cache: PriceCache) -> MarketDataSource: + api_key = os.environ.get("MASSIVE_API_KEY", "").strip() + if api_key: + logger.info("Market data source: Massive API (real data)") + return MassiveDataSource(api_key=api_key, price_cache=price_cache) + else: + logger.info("Market data source: GBM Simulator") + return SimulatorDataSource(price_cache=price_cache) +``` + +The `.env` file in the project root is loaded into the environment before this runs. +The factory is the only place that inspects `MASSIVE_API_KEY`. + +--- + +## 7. 
Simulator: `GBMSimulator` + `SimulatorDataSource` + +### Mathematical Model + +Each tick advances every price by one Geometric Brownian Motion step: + +``` +S(t+dt) = S(t) * exp( (mu - sigma²/2) * dt + sigma * sqrt(dt) * Z ) +``` + +| Symbol | Meaning | Example (AAPL) | +|---------|-------------------------------------------------|------------------------| +| S(t) | Current price | $190.00 | +| mu | Annualized drift (expected return) | 0.05 (5%/year) | +| sigma | Annualized volatility | 0.22 (22%/year) | +| dt | Tick duration as fraction of a trading year | ~8.48×10⁻⁸ (500ms) | +| Z | Correlated standard normal draw | (see §7.2) | + +`dt` is calibrated to trading time, not clock time: + +```python +TRADING_SECONDS_PER_YEAR = 252 * 6.5 * 3600 # 5,896,800 +DEFAULT_DT = 0.5 / TRADING_SECONDS_PER_YEAR # ~8.48e-8 +``` + +With this `dt`, each tick produces sub-cent moves. Over an hour (7,200 ticks), the +diffusion term accumulates to realistic intraday swings. + +### Correlated Moves (Cholesky Decomposition) + +Tech stocks tend to move together; finance stocks also correlate. The simulator +models this with a correlation matrix decomposed via Cholesky. + +**Correlation coefficients:** + +| Pair | Coefficient | +|-----------------------------------------|-------------| +| Two tech stocks (AAPL, MSFT, GOOGL…) | 0.60 | +| Two finance stocks (JPM, V) | 0.50 | +| TSLA with any other ticker | 0.30 | +| Cross-sector or unknown pairs | 0.30 | + +**Per-tick generation:** + +```python +def step(self) -> dict[str, float]: + n = len(self._tickers) + z_independent = np.random.standard_normal(n) + z_correlated = self._cholesky @ z_independent # correlated draws + + for i, ticker in enumerate(self._tickers): + mu, sigma = params["mu"], params["sigma"] + drift = (mu - 0.5 * sigma**2) * self._dt + diffusion = sigma * math.sqrt(self._dt) * z_correlated[i] + self._prices[ticker] *= math.exp(drift + diffusion) +``` + +The Cholesky matrix is rebuilt whenever tickers are added or removed. 
With n < 50 this
is fast enough (O(n³) for the decomposition) to do synchronously.

### Seed Prices and Per-Ticker Parameters

```python
# seed_prices.py
SEED_PRICES: dict[str, float] = {
    "AAPL": 190.00, "GOOGL": 175.00, "MSFT": 420.00, "AMZN": 185.00,
    "TSLA": 250.00, "NVDA": 800.00, "META": 500.00, "JPM": 195.00,
    "V": 280.00, "NFLX": 600.00,
}

TICKER_PARAMS: dict[str, dict[str, float]] = {
    "AAPL": {"sigma": 0.22, "mu": 0.05},
    "TSLA": {"sigma": 0.50, "mu": 0.03},  # High vol
    "NVDA": {"sigma": 0.40, "mu": 0.08},  # High vol + strong drift
    "JPM": {"sigma": 0.18, "mu": 0.04},  # Low vol (bank)
    "V": {"sigma": 0.17, "mu": 0.04},  # Low vol (payments)
    # ... (all 10 default tickers)
}

DEFAULT_PARAMS = {"sigma": 0.25, "mu": 0.05}  # For dynamically added tickers
```

Tickers not in `TICKER_PARAMS` get default params and a random seed price in [$50, $300].

### Random Shock Events

On every tick, each ticker has a 0.1% chance of a sudden 2–5% move:

```python
if random.random() < 0.001:
    shock_magnitude = random.uniform(0.02, 0.05)
    shock_sign = random.choice([-1, 1])
    self._prices[ticker] *= (1 + shock_magnitude * shock_sign)
```

With 10 tickers at 2 ticks/second: expect ~1 shock per 50 seconds. These create the
visual drama of news-driven moves without modelling news. 
+ +### `SimulatorDataSource` — Async Wrapper + +```python +# simulator.py (SimulatorDataSource) +class SimulatorDataSource(MarketDataSource): + + async def start(self, tickers: list[str]) -> None: + self._sim = GBMSimulator(tickers=tickers, event_probability=self._event_prob) + # Seed the cache immediately so SSE has data on first connection + for ticker in tickers: + price = self._sim.get_price(ticker) + if price is not None: + self._cache.update(ticker=ticker, price=price) + self._task = asyncio.create_task(self._run_loop(), name="simulator-loop") + + async def _run_loop(self) -> None: + while True: + try: + prices = self._sim.step() + for ticker, price in prices.items(): + self._cache.update(ticker=ticker, price=price) + except Exception: + logger.exception("Simulator step failed") + await asyncio.sleep(self._interval) # 500ms default + + async def add_ticker(self, ticker: str) -> None: + self._sim.add_ticker(ticker) # Extends GBM state + rebuilds Cholesky + price = self._sim.get_price(ticker) + if price is not None: + self._cache.update(ticker, price) # Immediately visible to SSE consumers +``` + +--- + +## 8. 
Massive API Client: `MassiveDataSource` + +### Authentication + +```python +from massive import RESTClient + +client = RESTClient(api_key="YOUR_MASSIVE_API_KEY") +# Wraps https://api.polygon.io — all existing Polygon keys work unchanged +``` + +### Primary Endpoint: Full Snapshot + +One API call fetches all watched tickers simultaneously: + +``` +GET /v2/snapshot/locale/us/markets/stocks/tickers?tickers=AAPL,TSLA,NVDA +``` + +Response shape (relevant fields): + +```json +{ + "tickers": [ + { + "ticker": "AAPL", + "todaysChangePerc": 1.22, + "lastTrade": { + "p": 190.25, + "t": 1712345678000 + }, + "day": { "o": 187.50, "h": 191.20, "l": 186.80, "c": 190.25 }, + "prevDay": { "c": 187.94 } + } + ] +} +``` + +Python SDK (synchronous — must be wrapped in `asyncio.to_thread`): + +```python +from massive.rest.models import SnapshotMarketType + +snapshots = client.get_snapshot_all( + market_type=SnapshotMarketType.STOCKS, + tickers=["AAPL", "TSLA", "NVDA"], +) + +for snap in snapshots: + price = snap.last_trade.price + timestamp = snap.last_trade.timestamp / 1000.0 # ms → seconds + change_pct = snap.todays_change_perc +``` + +### `MassiveDataSource` Implementation + +```python +# massive_client.py +class MassiveDataSource(MarketDataSource): + + async def start(self, tickers: list[str]) -> None: + self._client = RESTClient(api_key=self._api_key) + self._tickers = list(tickers) + await self._poll_once() # Immediate first poll — cache populated before SSE starts + self._task = asyncio.create_task(self._poll_loop(), name="massive-poller") + + async def _poll_loop(self) -> None: + while True: + await asyncio.sleep(self._interval) # 15s default (free tier safe) + await self._poll_once() + + async def _poll_once(self) -> None: + if not self._tickers or not self._client: + return + try: + # RESTClient is synchronous — avoid blocking the event loop + snapshots = await asyncio.to_thread(self._fetch_snapshots) + for snap in snapshots: + price = snap.last_trade.price + timestamp = 
snap.last_trade.timestamp / 1000.0 + self._cache.update(ticker=snap.ticker, price=price, timestamp=timestamp) + except Exception as e: + logger.error("Massive poll failed: %s", e) + # Don't re-raise — loop retries on next interval + + def _fetch_snapshots(self) -> list: + """Synchronous REST call. Runs in a thread pool via asyncio.to_thread.""" + return self._client.get_snapshot_all( + market_type=SnapshotMarketType.STOCKS, + tickers=self._tickers, + ) +``` + +### Rate Limits and Poll Intervals + +| Plan | Requests/min | Recommended `poll_interval` | +|-----------|--------------|-----------------------------| +| Free | 5 | 15.0s (default) | +| Starter | 100 | 5.0s | +| Developer | 1,000 | 2.0s | +| Advanced | unlimited | 1.0s or lower | + +Override the default: + +```python +MassiveDataSource(api_key=key, price_cache=cache, poll_interval=5.0) +``` + +### Error Handling + +| HTTP | Cause | Behavior | +|------|------------------------------------|---------------------------------| +| 401 | Invalid API key | Logged, loop retries in 15s | +| 403 | Endpoint requires higher tier | Logged, loop retries in 15s | +| 404 | Ticker not found | Snapshot skipped (per-ticker) | +| 429 | Rate limit exceeded | Logged, loop retries in 15s | +| N/A | Network error | Logged, loop retries in 15s | + +All failures are non-fatal. The cache retains the last known prices until the next +successful poll. The SSE stream continues to push stale (but still present) prices. + +### Daily Change % with Massive + +Use `snap.todays_change_perc` directly — Massive computes this from today's open vs. +the last trade price. No additional API call needed. + +--- + +## 9. 
SSE Streaming Endpoint + +### Event Format + +The SSE endpoint sends all cached prices as a single JSON blob per event: + +``` +retry: 1000 + +data: {"AAPL":{"ticker":"AAPL","price":190.25,"previous_price":190.10,"timestamp":1712345678.0,"change":0.15,"change_percent":0.0789,"direction":"up"},"MSFT":{...},...} + +data: {"AAPL":{"ticker":"AAPL","price":190.27,...},...} +``` + +- One `data:` line per event (no `event:` type — client receives all as `message`) +- All tickers in one payload per event (not one event per ticker) +- `retry: 1000` on connection: browser auto-reconnects after 1 second on disconnect + +### Implementation + +```python +# stream.py +from fastapi import APIRouter, Request +from fastapi.responses import StreamingResponse +from collections.abc import AsyncGenerator + +router = APIRouter(prefix="/api/stream", tags=["streaming"]) + +def create_stream_router(price_cache: PriceCache) -> APIRouter: + @router.get("/prices") + async def stream_prices(request: Request) -> StreamingResponse: + return StreamingResponse( + _generate_events(price_cache, request), + media_type="text/event-stream", + headers={ + "Cache-Control": "no-cache", + "Connection": "keep-alive", + "X-Accel-Buffering": "no", # Prevents nginx from buffering SSE + }, + ) + return router + +async def _generate_events( + price_cache: PriceCache, + request: Request, + interval: float = 0.5, +) -> AsyncGenerator[str, None]: + yield "retry: 1000\n\n" + last_version = -1 + while True: + if await request.is_disconnected(): + break + current_version = price_cache.version + if current_version != last_version: + last_version = current_version + prices = price_cache.get_all() + if prices: + data = {ticker: update.to_dict() for ticker, update in prices.items()} + yield f"data: {json.dumps(data)}\n\n" + await asyncio.sleep(interval) +``` + +**Why version-based change detection?** The SSE generator wakes every 500ms regardless. +Without the version check, it would resend identical data on every wakeup. 
With the
check, it only transmits when at least one price changed since the last send.

### Frontend Connection Pattern

```typescript
const source = new EventSource("/api/stream/prices");

source.onmessage = (event) => {
  const prices: Record<string, PriceUpdate> = JSON.parse(event.data);
  for (const [ticker, update] of Object.entries(prices)) {
    // Flash green/red based on update.direction
    updateWatchlistRow(ticker, update);
    appendSparklinePoint(ticker, update.price);
  }
};

source.onerror = () => {
  // EventSource auto-reconnects using the retry value we sent (1000ms)
  setConnectionStatus("reconnecting");
};
```

---

## 10. FastAPI Integration (App Lifecycle)

```python
# backend/app/main.py (example)
from contextlib import asynccontextmanager
from fastapi import FastAPI
from app.market import PriceCache, create_market_data_source, create_stream_router

DEFAULT_TICKERS = ["AAPL", "GOOGL", "MSFT", "AMZN", "TSLA",
                   "NVDA", "META", "JPM", "V", "NFLX"]

price_cache = PriceCache()
market_source = create_market_data_source(price_cache)

@asynccontextmanager
async def lifespan(app: FastAPI):
    await market_source.start(DEFAULT_TICKERS)
    yield
    await market_source.stop()

app = FastAPI(lifespan=lifespan)
app.include_router(create_stream_router(price_cache))
```

`price_cache` and `market_source` are module-level singletons — no dependency injection
framework needed for a single-user app.

---

## 11. Dynamic Watchlist Integration

Watchlist API routes must call `market_source` to keep prices in sync:

```python
# backend/app/routes/watchlist.py (example)
from fastapi import APIRouter

router = APIRouter(prefix="/api/watchlist")

@router.post("/")
async def add_to_watchlist(body: AddTickerRequest):
    # 1. Write to database
    await db.add_watchlist_entry(ticker=body.ticker)
    # 2. 
Start tracking prices immediately + await market_source.add_ticker(body.ticker) + return {"ticker": body.ticker, "status": "added"} + +@router.delete("/{ticker}") +async def remove_from_watchlist(ticker: str): + # 1. Remove from database + await db.remove_watchlist_entry(ticker=ticker) + # 2. Stop tracking — also purges from PriceCache + await market_source.remove_ticker(ticker) + return {"ticker": ticker, "status": "removed"} +``` + +After `add_ticker()`: +- **Simulator:** seed price written to cache immediately; appears in next SSE event (~500ms) +- **Massive:** ticker added to the poll list; appears after the next poll (up to 15s) + +After `remove_ticker()`: +- Price removed from cache immediately; SSE stops including it on the next send + +--- + +## 12. Portfolio Integration + +Reading prices for trade execution and portfolio valuation: + +```python +# Get price for a single ticker (trade execution) +price = price_cache.get_price("AAPL") +if price is None: + raise HTTPException(400, "Price unavailable for AAPL") + +# Get all prices for portfolio valuation +all_prices = price_cache.get_all() # dict[str, PriceUpdate] +for ticker, position in positions.items(): + current_price = all_prices.get(ticker) + if current_price: + unrealized_pnl = (current_price.price - position.avg_cost) * position.quantity +``` + +**Important:** `price_cache.get_all()` returns a snapshot. Take it once per request, +not once per ticker, to get consistent prices across a portfolio valuation. + +--- + +## 13. "Daily Change %" — Simulator vs. 
Massive + +| Source | Definition of daily change | Where to find it | +|-----------|-----------------------------------------------|-------------------------------------| +| Simulator | % change from seed price at process startup | `(current - SEED_PRICES[t]) / SEED_PRICES[t] * 100` | +| Massive | % change from today's market open | `snap.todays_change_perc` | + +For the simulator, compute it in the backend using `seed_prices.SEED_PRICES` as the +reference. Include it in the SSE payload or the watchlist REST response — the frontend +should not need to know which data source is active. + +Recommended: add a `daily_change_percent` field to `PriceUpdate.to_dict()` for the +watchlist response, and compute it in the data source layer: + +```python +# In SimulatorDataSource, track seed prices and expose them: +daily_change_pct = (current_price - seed) / seed * 100 + +# In MassiveDataSource._poll_once(): +daily_change_pct = snap.todays_change_perc # Already computed by Polygon +``` + +--- + +## 14. 
Testing + +### Running the Tests + +```bash +cd backend +uv run --extra dev pytest tests/market/ -v # Market data tests only +uv run --extra dev pytest --cov=app # All tests with coverage +``` + +### Test Matrix + +| Module | Tests | What it covers | +|-----------------------------|-------|----------------------------------------------------------| +| `test_models.py` | 11 | PriceUpdate fields, properties, direction, `to_dict()` | +| `test_cache.py` | 13 | Thread safety, version counter, update/get/remove | +| `test_simulator.py` | 17 | GBM math, Cholesky structure, shocks, add/remove tickers | +| `test_simulator_source.py` | 10 | Async lifecycle, cache integration, dynamic tickers | +| `test_factory.py` | 7 | Env var detection, correct class returned | +| `test_massive.py` | 13 | Snapshot parsing, poll loop, error handling | + +### Example Test: Cache Thread Safety + +```python +# tests/market/test_cache.py +import threading + +def test_concurrent_updates_are_consistent(): + cache = PriceCache() + + def writer(ticker, prices): + for price in prices: + cache.update(ticker, price) + + threads = [ + threading.Thread(target=writer, args=("AAPL", range(100))), + threading.Thread(target=writer, args=("MSFT", range(100))), + ] + for t in threads: + t.start() + for t in threads: + t.join() + + assert cache.get("AAPL") is not None + assert cache.get("MSFT") is not None + assert cache.version == 200 +``` + +### Example Test: GBM Math Validity + +```python +# tests/market/test_simulator.py +def test_gbm_prices_stay_positive(): + sim = GBMSimulator(["AAPL", "TSLA"]) + for _ in range(1000): + prices = sim.step() + assert all(p > 0 for p in prices.values()) + +def test_correlated_moves_use_cholesky(): + sim = GBMSimulator(["AAPL", "MSFT"]) + assert sim._cholesky is not None + assert sim._cholesky.shape == (2, 2) +``` + +### Example Test: Factory Selection + +```python +# tests/market/test_factory.py +import os + +def test_returns_simulator_when_no_key(monkeypatch): + 
monkeypatch.delenv("MASSIVE_API_KEY", raising=False) + source = create_market_data_source(PriceCache()) + assert isinstance(source, SimulatorDataSource) + +def test_returns_massive_when_key_set(monkeypatch): + monkeypatch.setenv("MASSIVE_API_KEY", "test_key_123") + source = create_market_data_source(PriceCache()) + assert isinstance(source, MassiveDataSource) +``` + +--- + +## 15. Terminal Demo + +A standalone Rich terminal demo visualizes live simulated prices: + +```bash +cd backend +uv run market_data_demo.py +``` + +Displays a live-updating table: all 10 tickers, current price, direction arrow, change %, +and a sparkline. Runs for 60 seconds or until Ctrl+C. Useful for verifying the simulator +parameters produce realistic-looking price action. + +--- + +## 16. Adding a New Data Source + +To add a third data source (e.g., a WebSocket feed): + +1. Create `backend/app/market/websocket_client.py` +2. Implement `MarketDataSource` (all 5 abstract methods) +3. Add a branch in `factory.py` checking a new env var (e.g., `WS_FEED_URL`) +4. Add tests in `tests/market/test_websocket_client.py` + +No changes needed in `cache.py`, `stream.py`, any route, or any consumer. The strategy +pattern pays for itself here. 
From 14e9f0ffed97539189bd9158d680d62fcf25cc1f Mon Sep 17 00:00:00 2001 From: "alexey.sizov" <59122515+ivl64@users.noreply.github.com> Date: Sun, 19 Apr 2026 23:05:49 +0200 Subject: [PATCH 4/6] workflow adjusted --- .github/workflows/claude.yml | 36 ++++++++++++++++++++++-------------- 1 file changed, 22 insertions(+), 14 deletions(-) diff --git a/.github/workflows/claude.yml b/.github/workflows/claude.yml index d300267f..d4d0b6fb 100644 --- a/.github/workflows/claude.yml +++ b/.github/workflows/claude.yml @@ -19,14 +19,14 @@ jobs: (github.event_name == 'issues' && (contains(github.event.issue.body, '@claude') || contains(github.event.issue.title, '@claude'))) runs-on: ubuntu-latest permissions: - contents: read - pull-requests: read - issues: read + contents: write + pull-requests: write + issues: write id-token: write actions: read # Required for Claude to read CI results on PRs steps: - name: Checkout repository - uses: actions/checkout@v4 + uses: actions/checkout@v6 with: fetch-depth: 1 @@ -34,17 +34,25 @@ jobs: id: claude uses: anthropics/claude-code-action@v1 with: - claude_code_oauth_token: ${{ secrets.CLAUDE_CODE_OAUTH_TOKEN }} + anthropic_api_key: ${{ secrets.ANTHROPIC_API_KEY }} - # This is an optional setting that allows Claude to read CI results on PRs - additional_permissions: | - actions: read + # Optional: Customize the trigger phrase (default: @claude) + # trigger_phrase: "/claude" - # Optional: Give a custom prompt to Claude. If this is not specified, Claude will perform the instructions specified in the comment that tagged it. - # prompt: 'Update the pull request description to include a summary of changes.' 
+ # Optional: Trigger when specific user is assigned to an issue + # assignee_trigger: "claude-bot" - # Optional: Add claude_args to customize behavior and configuration - # See https://github.com/anthropics/claude-code-action/blob/main/docs/usage.md - # or https://code.claude.com/docs/en/cli-reference for available options - # claude_args: '--allowed-tools Bash(gh pr:*)' + # Optional: Configure Claude's behavior with CLI arguments + # claude_args: | + # --model claude-opus-4-1-20250805 + # --max-turns 10 + # --allowedTools "Bash(npm install),Bash(npm run build),Bash(npm run test:*),Bash(npm run lint:*)" + # --system-prompt "Follow our coding standards. Ensure all new code has tests. Use TypeScript for new files." + # Optional: Advanced settings configuration + # settings: | + # { + # "env": { + # "NODE_ENV": "test" + # } + # } From 1f89c7dd8772dd26d602e83d0252d087beadbe41 Mon Sep 17 00:00:00 2001 From: "alexey.sizov" <59122515+ivl64@users.noreply.github.com> Date: Sun, 19 Apr 2026 23:09:00 +0200 Subject: [PATCH 5/6] "Update Claude PR Assistant workflow" --- .github/workflows/claude.yml | 36 ++++++++++++++---------------------- 1 file changed, 14 insertions(+), 22 deletions(-) diff --git a/.github/workflows/claude.yml b/.github/workflows/claude.yml index d4d0b6fb..6b15fac7 100644 --- a/.github/workflows/claude.yml +++ b/.github/workflows/claude.yml @@ -19,14 +19,14 @@ jobs: (github.event_name == 'issues' && (contains(github.event.issue.body, '@claude') || contains(github.event.issue.title, '@claude'))) runs-on: ubuntu-latest permissions: - contents: write - pull-requests: write - issues: write + contents: read + pull-requests: read + issues: read id-token: write actions: read # Required for Claude to read CI results on PRs steps: - name: Checkout repository - uses: actions/checkout@v6 + uses: actions/checkout@v4 with: fetch-depth: 1 @@ -34,25 +34,17 @@ jobs: id: claude uses: anthropics/claude-code-action@v1 with: - anthropic_api_key: ${{ secrets.ANTHROPIC_API_KEY 
}} + claude_code_oauth_token: ${{ secrets.CLAUDE_CODE_OAUTH_TOKEN }} - # Optional: Customize the trigger phrase (default: @claude) - # trigger_phrase: "/claude" + # This is an optional setting that allows Claude to read CI results on PRs + additional_permissions: | + actions: read - # Optional: Trigger when specific user is assigned to an issue - # assignee_trigger: "claude-bot" + # Optional: Give a custom prompt to Claude. If this is not specified, Claude will perform the instructions specified in the comment that tagged it. + # prompt: 'Update the pull request description to include a summary of changes.' - # Optional: Configure Claude's behavior with CLI arguments - # claude_args: | - # --model claude-opus-4-1-20250805 - # --max-turns 10 - # --allowedTools "Bash(npm install),Bash(npm run build),Bash(npm run test:*),Bash(npm run lint:*)" - # --system-prompt "Follow our coding standards. Ensure all new code has tests. Use TypeScript for new files." + # Optional: Add claude_args to customize behavior and configuration + # See https://github.com/anthropics/claude-code-action/blob/main/docs/usage.md + # or https://code.claude.com/docs/en/cli-reference for available options + # claude_args: '--allowed-tools Bash(gh pr *)' - # Optional: Advanced settings configuration - # settings: | - # { - # "env": { - # "NODE_ENV": "test" - # } - # } From 357a8492eca63dc201211ebb5cabedd8b4d4f765 Mon Sep 17 00:00:00 2001 From: "alexey.sizov" <59122515+ivl64@users.noreply.github.com> Date: Sun, 19 Apr 2026 23:09:01 +0200 Subject: [PATCH 6/6] "Update Claude Code Review workflow"