6 changes: 5 additions & 1 deletion .claude/settings.json
@@ -1,3 +1,7 @@
 {
-  "enabledPlugins": {}
+  "enabledPlugins": {
+    "frontend-design@claude-plugins-official": true,
+    "context7@claude-plugins-official": true,
+    "playwright@claude-plugins-official": true
+  }
 }
18 changes: 18 additions & 0 deletions .claude/settings.local.json
@@ -0,0 +1,18 @@
{
  "permissions": {
    "allow": [
      "Bash(codex exec:*)",
      "WebSearch",
      "WebFetch(domain:polygon.io)",
      "WebFetch(domain:massive.com)",
      "Bash(git checkout:*)",
      "Bash(git add:*)",
      "Bash(git commit -m ':*)",
      "Bash(git push:*)",
      "Bash(gh pr:*)",
      "Bash(uv run:*)",
      "Bash(git pull:*)",
      "Bash(gh api:*)"
    ]
  }
}
4 changes: 2 additions & 2 deletions CLAUDE.md
@@ -2,6 +2,6 @@
 
 All project documentation is in the `planning` directory.
 
-The key document is PLAN.md included in full here:
+The key document is PLAN.md included in full below; the market data component has been completed and is summarized in the file `planning/MARKET_DATA_DESIGN.md` with more details in the `planning/archive` folder. Consult these docs only when required. The remainder of the platform is still to be developed.
 
-@planning/PLAN.md
+@planning/PLAN.md
59 changes: 59 additions & 0 deletions backend/CLAUDE.md
@@ -0,0 +1,59 @@
# Backend — Developer Guide

## Project Setup

```bash
cd backend
uv sync --extra dev # Install all dependencies including test/lint tools
```

## Market Data API

The market data subsystem lives in `app/market/`. Use these imports:

```python
from app.market import PriceCache, PriceUpdate, MarketDataSource, create_market_data_source
```

### Core Types

- **`PriceUpdate`** — Immutable dataclass: `ticker`, `price`, `prev_price`, `open_price`, `timestamp`, plus derived properties `change`, `change_percent`, and `direction` ("up"/"down"/"flat"), and a `to_dict()` method for JSON serialization. `open_price` is the session-start baseline (never changes). `to_dict()` returns an ISO 8601 timestamp string.

- **`PriceCache`** — Thread-safe in-memory store. Key methods:
  - `update(ticker, price, timestamp=None, open_price=None) -> PriceUpdate`
  - `get(ticker) -> PriceUpdate | None`
  - `get_price(ticker) -> float | None`
  - `get_all() -> dict[str, PriceUpdate]`
  - `remove(ticker)`
  - `version` property — monotonic counter, increments on every update (for SSE change detection)

- **`MarketDataSource`** — Abstract interface implemented by `SimulatorDataSource` and `MassiveDataSource`. Lifecycle: `start(tickers)` -> `add_ticker()` / `remove_ticker()` -> `stop()`.

- **`create_market_data_source(cache)`** — Factory. Returns `MassiveDataSource` if `MASSIVE_API_KEY` is set, otherwise `SimulatorDataSource`.

### SSE Streaming

```python
from app.market import create_stream_router

router = create_stream_router(price_cache) # Returns FastAPI APIRouter
# Endpoint: GET /api/stream/prices (text/event-stream)
```

### Seed Data

Default tickers: AAPL, GOOGL, MSFT, AMZN, TSLA, NVDA, META, JPM, V, NFLX. Seed prices and per-ticker volatility/drift params are in `app/market/seed_prices.py`.

## Running Tests

```bash
uv run --extra dev pytest -v # All tests
uv run --extra dev pytest --cov=app # With coverage
uv run --extra dev ruff check app/ tests/ # Lint
```

## Demo

```bash
uv run market_data_demo.py # Live terminal dashboard with simulated prices
```
24 changes: 19 additions & 5 deletions backend/app/market/cache.py
@@ -20,21 +20,35 @@ def __init__(self) -> None:
         self._lock = Lock()
         self._version: int = 0  # Monotonically increasing; bumped on every update
 
-    def update(self, ticker: str, price: float, timestamp: float | None = None) -> PriceUpdate:
+    def update(
+        self,
+        ticker: str,
+        price: float,
+        timestamp: float | None = None,
+        open_price: float | None = None,
+    ) -> PriceUpdate:
         """Record a new price for a ticker. Returns the created PriceUpdate.
 
         Automatically computes direction and change from the previous price.
         If this is the first update for the ticker, prev_price == price (direction='flat').
+        open_price is only used on the first update for a ticker; ignored on
+        subsequent calls so the session-start baseline is never overwritten.
+        If not provided on the first update, price is used as the baseline.
         """
         with self._lock:
             ts = timestamp or time.time()
             prev = self._prices.get(ticker)
-            previous_price = prev.price if prev else price
+
+            if prev:
+                prev_price = prev.price
+                effective_open = prev.open_price  # Never overwrite
+            else:
+                prev_price = price
+                effective_open = open_price if open_price is not None else price
 
             update = PriceUpdate(
                 ticker=ticker,
                 price=round(price, 2),
-                previous_price=round(previous_price, 2),
+                prev_price=round(prev_price, 2),
+                open_price=round(effective_open, 2),
                 timestamp=ts,
             )
             self._prices[ticker] = update
9 changes: 7 additions & 2 deletions backend/app/market/factory.py
@@ -7,8 +7,6 @@
 
 from .cache import PriceCache
 from .interface import MarketDataSource
-from .massive_client import MassiveDataSource
-from .simulator import SimulatorDataSource
 
 logger = logging.getLogger(__name__)
 
@@ -19,13 +17,20 @@ def create_market_data_source(price_cache: PriceCache) -> MarketDataSource:
     - MASSIVE_API_KEY set and non-empty → MassiveDataSource (real market data)
     - Otherwise → SimulatorDataSource (GBM simulation)
 
+    Imports are lazy so the `massive` package is only required when
+    MASSIVE_API_KEY is actually set.
+
     Returns an unstarted source. Caller must await source.start(tickers).
     """
     api_key = os.environ.get("MASSIVE_API_KEY", "").strip()
 
     if api_key:
+        from .massive_client import MassiveDataSource  # noqa: PLC0415
+
         logger.info("Market data source: Massive API (real data)")
         return MassiveDataSource(api_key=api_key, price_cache=price_cache)
     else:
+        from .simulator import SimulatorDataSource  # noqa: PLC0415
+
         logger.info("Market data source: GBM Simulator")
         return SimulatorDataSource(price_cache=price_cache)
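
The env-gated selection in this factory can be sketched standalone with dummy classes (the real factory additionally defers the `from .massive_client import ...` into the branch so the `massive` package is never imported without a key):

```python
import os


class SimulatorDataSource:  # stand-in for the real GBM simulator
    pass


class MassiveDataSource:  # stand-in for the real API client
    def __init__(self, api_key: str) -> None:
        self.api_key = api_key


def create_source():
    # Empty or whitespace-only key falls back to the simulator,
    # mirroring the .strip() check in the real factory.
    api_key = os.environ.get("MASSIVE_API_KEY", "").strip()
    if api_key:
        return MassiveDataSource(api_key=api_key)
    return SimulatorDataSource()


os.environ["MASSIVE_API_KEY"] = "   "  # whitespace only -> simulator
print(type(create_source()).__name__)  # SimulatorDataSource
```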
22 changes: 18 additions & 4 deletions backend/app/market/massive_client.py
@@ -4,9 +4,7 @@
 
 import asyncio
 import logging
-
-from massive import RESTClient
-from massive.rest.models import SnapshotMarketType
+from typing import Any
 
 from .cache import PriceCache
 from .interface import MarketDataSource
@@ -23,6 +21,9 @@ class MassiveDataSource(MarketDataSource):
     Rate limits:
     - Free tier: 5 req/min → poll every 15s (default)
     - Paid tiers: higher limits → poll every 2-5s
+
+    The `massive` package is imported lazily inside start() so it is only
+    required when MASSIVE_API_KEY is actually set.
     """
 
     def __init__(
@@ -36,9 +37,13 @@ def __init__(
         self._interval = poll_interval
         self._tickers: list[str] = []
        self._task: asyncio.Task | None = None
-        self._client: RESTClient | None = None
+        self._client: Any = None
 
     async def start(self, tickers: list[str]) -> None:
+        # Lazy import: only required when MASSIVE_API_KEY is set.
+        # Students without a Massive API key never need this package installed.
+        from massive import RESTClient  # noqa: PLC0415
+
         self._client = RESTClient(api_key=self._api_key)
         self._tickers = list(tickers)
 
@@ -101,10 +106,17 @@ async def _poll_once(self) -> None:
                 price = snap.last_trade.price
                 # Massive timestamps are Unix milliseconds → convert to seconds
                 timestamp = snap.last_trade.timestamp / 1000.0
+                # Use day.open as open_price; fall back to prev_day.close during pre-market
+                open_price = None
+                if snap.day and snap.day.open:
+                    open_price = snap.day.open
+                elif snap.prev_day and snap.prev_day.close:
+                    open_price = snap.prev_day.close
                 self._cache.update(
                     ticker=snap.ticker,
                     price=price,
                     timestamp=timestamp,
+                    open_price=open_price,
                 )
                 processed += 1
             except (AttributeError, TypeError) as e:
@@ -122,6 +134,8 @@ async def _poll_once(self) -> None:
 
     def _fetch_snapshots(self) -> list:
         """Synchronous call to the Massive REST API. Runs in a thread."""
+        from massive.rest.models import SnapshotMarketType  # noqa: PLC0415
+
         return self._client.get_snapshot_all(
             market_type=SnapshotMarketType.STOCKS,
             tickers=self._tickers,
27 changes: 18 additions & 9 deletions backend/app/market/models.py
@@ -4,6 +4,7 @@
 
 import time
 from dataclasses import dataclass, field
+from datetime import datetime, timezone
 
 
 @dataclass(frozen=True, slots=True)
@@ -12,37 +13,45 @@ class PriceUpdate:
 
     ticker: str
     price: float
-    previous_price: float
+    prev_price: float  # Price from the previous update
+    open_price: float  # Session-start seed price — set once, never overwritten
     timestamp: float = field(default_factory=time.time)  # Unix seconds
 
     @property
     def change(self) -> float:
         """Absolute price change from previous update."""
-        return round(self.price - self.previous_price, 4)
+        return round(self.price - self.prev_price, 4)
 
     @property
     def change_percent(self) -> float:
         """Percentage change from previous update."""
-        if self.previous_price == 0:
+        if self.prev_price == 0:
             return 0.0
-        return round((self.price - self.previous_price) / self.previous_price * 100, 4)
+        return round((self.price - self.prev_price) / self.prev_price * 100, 4)
 
     @property
     def direction(self) -> str:
         """'up', 'down', or 'flat'."""
-        if self.price > self.previous_price:
+        if self.price > self.prev_price:
             return "up"
-        elif self.price < self.previous_price:
+        elif self.price < self.prev_price:
             return "down"
         return "flat"
 
     def to_dict(self) -> dict:
-        """Serialize for JSON / SSE transmission."""
+        """Serialize for JSON / SSE transmission.
+
+        timestamp is formatted as ISO 8601 UTC string per PLAN.md §6.
+        """
+        ts_iso = datetime.fromtimestamp(self.timestamp, tz=timezone.utc).isoformat().replace(
+            "+00:00", "Z"
+        )
         return {
             "ticker": self.ticker,
             "price": self.price,
-            "previous_price": self.previous_price,
-            "timestamp": self.timestamp,
+            "prev_price": self.prev_price,
+            "open_price": self.open_price,
+            "timestamp": ts_iso,
             "change": self.change,
             "change_percent": self.change_percent,
             "direction": self.direction,
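
The ISO 8601 serialization added to `to_dict()` above can be exercised on its own; this helper uses the same `isoformat().replace("+00:00", "Z")` idiom as the diff:

```python
from datetime import datetime, timezone


def to_iso_z(ts: float) -> str:
    """Format Unix seconds as an ISO 8601 UTC string with a Z suffix."""
    return datetime.fromtimestamp(ts, tz=timezone.utc).isoformat().replace("+00:00", "Z")


print(to_iso_z(0.0))  # 1970-01-01T00:00:00Z
print(to_iso_z(0.5))  # 1970-01-01T00:00:00.500000Z
```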
7 changes: 5 additions & 2 deletions backend/app/market/seed_prices.py
@@ -33,6 +33,9 @@
 # Default parameters for tickers not in the list above (dynamically added)
 DEFAULT_PARAMS: dict[str, float] = {"sigma": 0.25, "mu": 0.05}
 
+# Default seed price for unknown tickers (per PLAN.md §6)
+DEFAULT_SEED_PRICE: float = 100.00
+
 # Correlation groups for the simulator's Cholesky decomposition
 # Tickers in the same group have higher intra-group correlation
 CORRELATION_GROUPS: dict[str, set[str]] = {
@@ -41,7 +44,7 @@
 }
 
 # Correlation coefficients
-INTRA_TECH_CORR = 0.6  # Tech stocks move together
+INTRA_TECH_CORR = 0.6     # Tech stocks move together
 INTRA_FINANCE_CORR = 0.5  # Finance stocks move together
 CROSS_GROUP_CORR = 0.3  # Between sectors / unknown tickers
-TSLA_CORR = 0.3  # TSLA does its own thing
+TSLA_CORR = 0.25  # TSLA does its own thing (issue #9)
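
These constants feed the simulator's Cholesky decomposition. A simplified three-ticker sketch of how such a correlation matrix is built and factored (this is not the real `_rebuild_cholesky`, which also handles TSLA and unknown tickers):

```python
import numpy as np

INTRA_TECH_CORR = 0.6
CROSS_GROUP_CORR = 0.3

tickers = ["AAPL", "MSFT", "JPM"]  # two tech names plus one finance name
tech = {"AAPL", "MSFT"}

n = len(tickers)
corr = np.eye(n)
for i in range(n):
    for j in range(i + 1, n):
        same_tech = tickers[i] in tech and tickers[j] in tech
        corr[i, j] = corr[j, i] = INTRA_TECH_CORR if same_tech else CROSS_GROUP_CORR

L = np.linalg.cholesky(corr)          # lower-triangular factor of the correlation matrix
z = L @ np.random.standard_normal(n)  # correlated shocks for one GBM step
print(np.allclose(L @ L.T, corr))     # True
```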
3 changes: 2 additions & 1 deletion backend/app/market/simulator.py
@@ -15,6 +15,7 @@
     CORRELATION_GROUPS,
     CROSS_GROUP_CORR,
     DEFAULT_PARAMS,
+    DEFAULT_SEED_PRICE,
     INTRA_FINANCE_CORR,
     INTRA_TECH_CORR,
     SEED_PRICES,
@@ -148,7 +149,7 @@ def _add_ticker_internal(self, ticker: str) -> None:
         if ticker in self._prices:
             return
         self._tickers.append(ticker)
-        self._prices[ticker] = SEED_PRICES.get(ticker, random.uniform(50.0, 300.0))
+        self._prices[ticker] = SEED_PRICES.get(ticker, DEFAULT_SEED_PRICE)
         self._params[ticker] = TICKER_PARAMS.get(ticker, dict(DEFAULT_PARAMS))
 
     def _rebuild_cholesky(self) -> None:
26 changes: 13 additions & 13 deletions backend/app/market/stream.py
@@ -14,26 +14,28 @@
 
 logger = logging.getLogger(__name__)
 
-router = APIRouter(prefix="/api/stream", tags=["streaming"])
-
 
 def create_stream_router(price_cache: PriceCache) -> APIRouter:
     """Create the SSE streaming router with a reference to the price cache.
 
-    This factory pattern lets us inject the PriceCache without globals.
+    Returns a fresh APIRouter each call so the function is safe to call
+    multiple times (e.g. in tests) without registering duplicate routes.
     """
+    router = APIRouter(prefix="/api/stream", tags=["streaming"])
 
     @router.get("/prices")
     async def stream_prices(request: Request) -> StreamingResponse:
         """SSE endpoint for live price updates.
 
-        Streams all tracked ticker prices every ~500ms. The client connects
-        with EventSource and receives events in the format:
+        Streams one event per ticker every ~500ms. Each event is a JSON
+        object for a single ticker (per PLAN.md §6):
 
-        data: {"AAPL": {"ticker": "AAPL", "price": 190.50, ...}, ...}
+        data: {"ticker":"AAPL","price":190.50,"prev_price":190.42,
+               "open_price":190.00,"timestamp":"2026-04-10T12:00:00.500Z",
+               "direction":"up"}
 
         Includes a retry directive so the browser auto-reconnects on
-        disconnection (EventSource built-in behavior).
+        disconnection (EventSource built-in behaviour).
         """
         return StreamingResponse(
             _generate_events(price_cache, request),
@@ -55,8 +57,8 @@ async def _generate_events(
 ) -> AsyncGenerator[str, None]:
     """Async generator that yields SSE-formatted price events.
 
-    Sends all prices every `interval` seconds. Stops when the client
-    disconnects (detected via request.is_disconnected()).
+    Sends one event per ticker every `interval` seconds. Stops when the
+    client disconnects (detected via request.is_disconnected()).
     """
     # Tell the client to retry after 1 second if the connection drops
     yield "retry: 1000\n\n"
@@ -76,10 +78,8 @@ async def _generate_events(
         if current_version != last_version:
             last_version = current_version
             prices = price_cache.get_all()
-
-            if prices:
-                data = {ticker: update.to_dict() for ticker, update in prices.items()}
-                payload = json.dumps(data)
+            for update in prices.values():
+                payload = json.dumps(update.to_dict())
                 yield f"data: {payload}\n\n"
 
         await asyncio.sleep(interval)
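
The wire format the generator above emits can be reproduced standalone: one `data:` line per ticker terminated by a blank line, plus the initial `retry` directive:

```python
import json


def sse_event(payload: dict) -> str:
    """Format one SSE data event: 'data: <json>' terminated by a blank line."""
    return f"data: {json.dumps(payload)}\n\n"


retry_directive = "retry: 1000\n\n"  # browser EventSource reconnect hint

event = sse_event({"ticker": "AAPL", "price": 190.5, "direction": "up"})
print(event.startswith("data: "), event.endswith("\n\n"))  # True True
```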