diff --git a/README.md b/README.md index 983f3fc..70bad56 100644 --- a/README.md +++ b/README.md @@ -31,7 +31,29 @@ modelcost --source all gpt-4o 1000 500 modelcost --json gpt-4o 1000 500 ``` -List available models: +### Cached and reasoning tokens + +Modern LLM APIs charge differently for cached input and reasoning output tokens. +Pass them as optional flags — they default to 0 so existing usage is unchanged. + +```bash +# Cached input tokens (served from prompt cache) +modelcost gpt-4o 1000 500 --cached-input-tokens 200 + +# Cache creation tokens (first-time cache writes) +modelcost gpt-4o 1000 500 --cache-creation-input-tokens 100 + +# Reasoning tokens (subset of output_tokens, e.g. o1/R1 thinking) +modelcost deepseek/deepseek-r1 2000 5000 --reasoning-tokens 3000 + +# All together +modelcost gpt-4.1-mini 1000 500 \ + --cached-input-tokens 200 \ + --cache-creation-input-tokens 100 \ + --reasoning-tokens 150 +``` + +### List models ```bash modelcost models @@ -40,7 +62,7 @@ modelcost models --filter gpt modelcost models --json ``` -CLI help: +### Help ```bash modelcost --help @@ -52,13 +74,26 @@ modelcost models --help ```python from modelcost.calculator import calculate_cost, list_models +# Basic usage (backward compatible) result = calculate_cost("gpt-4o", 1000, 500) for source in result.available_sources: print(f"{source.source}: ${source.total_cost_usd:.6f}") -litellm_cost = next(s for s in result.sources if s.source == "litellm") -print(litellm_cost.price_per_million_input, litellm_cost.price_per_million_output) +# With cached and reasoning tokens +result = calculate_cost( + "gpt-4.1-mini", + input_tokens=1000, + output_tokens=500, + cached_input_tokens=200, + cache_creation_input_tokens=100, + reasoning_tokens=150, +) + +s = result.sources[0] +print(f"${s.total_cost_usd:.6f}") +print(f" cache read: ${s.price_per_million_cache_read}/M") +print(f" reasoning: ${s.price_per_million_reasoning}/M") models = list_models("openrouter") ``` @@ -67,16 +102,63 @@ models 
= list_models("openrouter") `calculate_cost()` returns a `CostResult` with: - `model`, `input_tokens`, `output_tokens` +- `cached_input_tokens`, `cache_creation_input_tokens`, `reasoning_tokens` (0 when not used) - `sources`: list of `SourceCost` objects - `available_sources`: only sources with prices found Each `SourceCost` includes: - `source` - `total_cost_usd` -- `price_per_million_input` -- `price_per_million_output` +- `price_per_million_input`, `price_per_million_output` +- `price_per_million_cache_read`, `price_per_million_cache_creation`, `price_per_million_reasoning` (present only when the source has specific pricing for these) - `error` (when not available) +### Cost formula + +All subset tokens (`cached_input_tokens`, `cache_creation_input_tokens`, `reasoning_tokens`) +are treated as **subsets** of their parent total and are clamped accordingly: + +``` +text_input = input_tokens - cached_input - cache_creation +text_output = output_tokens - reasoning_tokens + +total = text_input * input_rate + + cached_input * cache_read_rate (fallback: input_rate) + + cache_creation * cache_creation_rate (fallback: input_rate) + + text_output * output_rate + + reasoning * reasoning_rate (fallback: output_rate) +``` + +This matches how most APIs report usage — `input_tokens` and `output_tokens` are the +totals including cached/reasoning, and the detail fields are subsets. +When a specific rate is missing, the base rate for that category is used as fallback. 
+ +### JSON output + +`--json` / `result.to_dict()` includes the new fields only when non-zero: + +```json +{ + "model": "gpt-4.1-mini", + "input_tokens": 1000, + "output_tokens": 500, + "cached_input_tokens": 200, + "reasoning_tokens": 150, + "costs": [ + { + "source": "litellm", + "total_cost_usd": 0.00114, + "price_per_million_input": 0.4, + "price_per_million_output": 1.6, + "price_per_million_cache_read": 0.1, + "price_per_million_cache_creation": 0.48, + "price_per_million_reasoning": 1.6, + "error": null + } + ] +} +``` + ## Caching `openrouter` responses are cached in `~/.modelcost_cache.json` for 1 hour. @@ -86,3 +168,4 @@ Each `SourceCost` includes: - Prices are fetched at runtime from the upstream catalogs. - If a model is missing in a source, that source is marked as unavailable. - Network sources are fetched in parallel for the `all` option. +- `tokencost` does not expose cache/reasoning pricing — when used with `source="all"`, its cost may be higher than `litellm` for calls that include cached or reasoning tokens. diff --git a/modelcost/calculator.py b/modelcost/calculator.py index 554c59c..e015599 100644 --- a/modelcost/calculator.py +++ b/modelcost/calculator.py @@ -29,17 +29,38 @@ def calculate_cost( model: str, input_tokens: int, output_tokens: int, + *, + cached_input_tokens: int = 0, + cache_creation_input_tokens: int = 0, + reasoning_tokens: int = 0, source: str = "litellm", ) -> CostResult: - if input_tokens < 0 or output_tokens < 0: - raise ValueError("input_tokens and output_tokens must be >= 0") + if any( + v < 0 + for v in ( + input_tokens, + output_tokens, + cached_input_tokens, + cache_creation_input_tokens, + reasoning_tokens, + ) + ): + raise ValueError("All token counts must be >= 0") if source not in VALID_SOURCES: raise ValueError(f"Invalid source '{source}'. 
Valid values: {VALID_SOURCES}") active = ["litellm", "openrouter", "tokencost"] if source == "all" else [source] - sources = _fetch_all(model, input_tokens, output_tokens, active) + sources = _fetch_all( + model, + input_tokens, + output_tokens, + active, + cached_input_tokens=cached_input_tokens, + cache_creation_input_tokens=cache_creation_input_tokens, + reasoning_tokens=reasoning_tokens, + ) return CostResult( model=model, @@ -47,11 +68,21 @@ def calculate_cost( output_tokens=output_tokens, sources=sources, single_source=source != "all", # output formatting flag + cached_input_tokens=cached_input_tokens, + cache_creation_input_tokens=cache_creation_input_tokens, + reasoning_tokens=reasoning_tokens, ) def _fetch_all( - model: str, input_tokens: int, output_tokens: int, active: list[str] + model: str, + input_tokens: int, + output_tokens: int, + active: list[str], + *, + cached_input_tokens: int = 0, + cache_creation_input_tokens: int = 0, + reasoning_tokens: int = 0, ) -> list[SourceCost]: network_tasks = { name: fn @@ -68,7 +99,15 @@ def _fetch_all( with ThreadPoolExecutor(max_workers=len(network_tasks)) as executor: futures = { executor.submit( - _compute, name, fn, model, input_tokens, output_tokens + _compute, + name, + fn, + model, + input_tokens, + output_tokens, + cached_input_tokens=cached_input_tokens, + cache_creation_input_tokens=cache_creation_input_tokens, + reasoning_tokens=reasoning_tokens, ): name for name, fn in network_tasks.items() } @@ -84,7 +123,15 @@ def _fetch_all( def _compute( - source_name: str, fetch_fn, model: str, input_tokens: int, output_tokens: int + source_name: str, + fetch_fn, + model: str, + input_tokens: int, + output_tokens: int, + *, + cached_input_tokens: int = 0, + cache_creation_input_tokens: int = 0, + reasoning_tokens: int = 0, ) -> SourceCost: try: prices = fetch_fn() @@ -101,13 +148,44 @@ def _compute( price_per_million_output=None, error=f"Model '{model}' not found", ) - cost = pricing["prompt"] * input_tokens + 
pricing["completion"] * output_tokens - return SourceCost( + + prompt_rate = pricing["prompt"] + completion_rate = pricing["completion"] + cache_read_rate = pricing.get("cache_read", prompt_rate) + cache_creation_rate = pricing.get("cache_creation", prompt_rate) + reasoning_rate = pricing.get("reasoning", completion_rate) + + # All subset tokens are clamped so they never exceed the parent total. + effective_cached = min(cached_input_tokens, input_tokens) + effective_creation = min( + cache_creation_input_tokens, input_tokens - effective_cached + ) + text_input = input_tokens - effective_cached - effective_creation + + effective_reasoning = min(reasoning_tokens, output_tokens) + text_output = output_tokens - effective_reasoning + + cost = ( + text_input * prompt_rate + + effective_cached * cache_read_rate + + effective_creation * cache_creation_rate + + text_output * completion_rate + + effective_reasoning * reasoning_rate + ) + + result = SourceCost( source=source_name, total_cost_usd=cost, - price_per_million_input=pricing["prompt"] * 1_000_000, - price_per_million_output=pricing["completion"] * 1_000_000, + price_per_million_input=prompt_rate * 1_000_000, + price_per_million_output=completion_rate * 1_000_000, ) + if "cache_read" in pricing: + result.price_per_million_cache_read = cache_read_rate * 1_000_000 + if "cache_creation" in pricing: + result.price_per_million_cache_creation = cache_creation_rate * 1_000_000 + if "reasoning" in pricing: + result.price_per_million_reasoning = reasoning_rate * 1_000_000 + return result except Exception as e: return SourceCost( source=source_name, diff --git a/modelcost/cli.py b/modelcost/cli.py index 6370c61..df190d7 100644 --- a/modelcost/cli.py +++ b/modelcost/cli.py @@ -33,14 +33,50 @@ def main() -> None: type=click.Choice(VALID_SOURCES), help="Pricing source.", ) +@click.option( + "--cached-input-tokens", + default=0, + show_default=False, + type=int, + help="Tokens served from cache (charged at cache-read rate).", +) 
+@click.option( + "--cache-creation-input-tokens", + default=0, + show_default=False, + type=int, + help="Tokens written to cache for the first time.", +) +@click.option( + "--reasoning-tokens", + default=0, + show_default=False, + type=int, + help="Thinking/reasoning output tokens (subset of output_tokens).", +) @click.option("--json", "as_json", is_flag=True, help="Output as JSON.") def cost_cmd( - model: str, input_tokens: int, output_tokens: int, source: str, as_json: bool + model: str, + input_tokens: int, + output_tokens: int, + source: str, + cached_input_tokens: int, + cache_creation_input_tokens: int, + reasoning_tokens: int, + as_json: bool, ) -> None: """Calculate the cost for MODEL with INPUT_TOKENS and OUTPUT_TOKENS.""" # ── Calculation mode ────────────────────────────────────────────── try: - result = calculate_cost(model, input_tokens, output_tokens, source=source) + result = calculate_cost( + model, + input_tokens, + output_tokens, + cached_input_tokens=cached_input_tokens, + cache_creation_input_tokens=cache_creation_input_tokens, + reasoning_tokens=reasoning_tokens, + source=source, + ) except Exception as e: click.echo(f"Error: {e}", err=True) sys.exit(1) @@ -57,9 +93,15 @@ def cost_cmd( click.echo(f"unavailable — {s.error}", err=True) sys.exit(1) else: - click.echo( - f"Model: {result.model} ({result.input_tokens} in / {result.output_tokens} out)\n" - ) + parts = [f"{result.input_tokens} in", f"{result.output_tokens} out"] + if result.cached_input_tokens: + parts.append(f"{result.cached_input_tokens} cached") + if result.cache_creation_input_tokens: + parts.append(f"{result.cache_creation_input_tokens} cache-create") + if result.reasoning_tokens: + parts.append(f"{result.reasoning_tokens} reasoning") + header = " / ".join(parts) + click.echo(f"Model: {result.model} ({header})\n") for s in result.sources: if s.available: click.echo(f" [{s.source:<12}] ${s.total_cost_usd:.6f} USD") diff --git a/modelcost/models.py b/modelcost/models.py index 
1955a25..e91ade3 100644 --- a/modelcost/models.py +++ b/modelcost/models.py @@ -8,6 +8,9 @@ class SourceCost: price_per_million_input: float | None price_per_million_output: float | None error: str | None = None + price_per_million_cache_read: float | None = None + price_per_million_cache_creation: float | None = None + price_per_million_reasoning: float | None = None @property def available(self) -> bool: @@ -21,24 +24,43 @@ class CostResult: output_tokens: int sources: list[SourceCost] single_source: bool = True # False when source="all" + cached_input_tokens: int = 0 + cache_creation_input_tokens: int = 0 + reasoning_tokens: int = 0 @property def available_sources(self) -> list[SourceCost]: return [s for s in self.sources if s.available] def to_dict(self) -> dict: - return { + costs = [] + for s in self.sources: + entry: dict = { + "source": s.source, + "total_cost_usd": s.total_cost_usd, + "price_per_million_input": s.price_per_million_input, + "price_per_million_output": s.price_per_million_output, + "error": s.error, + } + if s.price_per_million_cache_read is not None: + entry["price_per_million_cache_read"] = s.price_per_million_cache_read + if s.price_per_million_cache_creation is not None: + entry["price_per_million_cache_creation"] = ( + s.price_per_million_cache_creation + ) + if s.price_per_million_reasoning is not None: + entry["price_per_million_reasoning"] = s.price_per_million_reasoning + costs.append(entry) + result: dict = { "model": self.model, "input_tokens": self.input_tokens, "output_tokens": self.output_tokens, - "costs": [ - { - "source": s.source, - "total_cost_usd": s.total_cost_usd, - "price_per_million_input": s.price_per_million_input, - "price_per_million_output": s.price_per_million_output, - "error": s.error, - } - for s in self.sources - ], + "costs": costs, } + if self.cached_input_tokens > 0: + result["cached_input_tokens"] = self.cached_input_tokens + if self.cache_creation_input_tokens > 0: + 
result["cache_creation_input_tokens"] = self.cache_creation_input_tokens + if self.reasoning_tokens > 0: + result["reasoning_tokens"] = self.reasoning_tokens + return result diff --git a/modelcost/providers/litellm.py b/modelcost/providers/litellm.py index c7d3ea1..fa3c433 100644 --- a/modelcost/providers/litellm.py +++ b/modelcost/providers/litellm.py @@ -9,11 +9,25 @@ def fetch_litellm_prices() -> dict: resp = httpx.get(LITELLM_URL, timeout=10) resp.raise_for_status() - return { - model: { + prices = {} + for model, info in resp.json().items(): + if "input_cost_per_token" not in info or "output_cost_per_token" not in info: + continue + entry: dict = { "prompt": info["input_cost_per_token"], "completion": info["output_cost_per_token"], } - for model, info in resp.json().items() - if "input_cost_per_token" in info and "output_cost_per_token" in info - } + cache_read = info.get( + "cache_read_input_token_cost", + info.get("input_cost_per_token_cache_hit"), + ) + if cache_read is not None: + entry["cache_read"] = cache_read + cache_creation = info.get("cache_creation_input_token_cost") + if cache_creation is not None: + entry["cache_creation"] = cache_creation + reasoning = info.get("output_cost_per_reasoning_token") + if reasoning is not None: + entry["reasoning"] = reasoning + prices[model] = entry + return prices diff --git a/pyproject.toml b/pyproject.toml index 674d1c5..ea9151d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "hatchling.build" [project] name = "modelcost" -version = "0.1.5" +version = "0.1.6" description = "Calculate LLM API call costs from token usage" readme = "README.md" license = { text = "MIT" } diff --git a/tests/test_calculator.py b/tests/test_calculator.py index cbed287..5f00149 100644 --- a/tests/test_calculator.py +++ b/tests/test_calculator.py @@ -72,6 +72,18 @@ def test_negative_tokens_raise_value_error(self, input_tokens, output_tokens): with pytest.raises(ValueError, match=r"must be >= 0"): 
calculate_cost("gpt-4o", input_tokens, output_tokens, source="litellm") + @pytest.mark.parametrize( + ("kwarg", "value"), + [ + ("cached_input_tokens", -1), + ("cache_creation_input_tokens", -1), + ("reasoning_tokens", -1), + ], + ) + def test_negative_extended_tokens_raise_value_error(self, kwarg, value): + with pytest.raises(ValueError, match=r"must be >= 0"): + calculate_cost("gpt-4o", 100, 50, source="litellm", **{kwarg: value}) + def test_all_source_returns_three_sources_in_order(self): litellm_src = SourceCost("litellm", 0.001, 1.0, 2.0) openrouter_src = SourceCost("openrouter", 0.002, 1.5, 3.0) @@ -82,7 +94,7 @@ def test_all_source_returns_three_sources_in_order(self): patch("modelcost.calculator._tokencost_source", return_value=tokencost_src), ): # _compute is called for litellm and openrouter via ThreadPoolExecutor - def compute_side_effect(name, fn, model, inp, out): + def compute_side_effect(name, fn, model, inp, out, **kwargs): if name == "litellm": return litellm_src return openrouter_src @@ -287,3 +299,406 @@ def test_all_is_not_valid_for_list_models(self): def test_valid_sources_constant(self): assert set(VALID_SOURCES) == {"litellm", "openrouter", "tokencost", "all"} + + +# --------------------------------------------------------------------------- +# Extended pricing: cache + reasoning tokens +# --------------------------------------------------------------------------- + +FAKE_PRICES_WITH_EXTRAS = { + "gpt-4.1-mini": { + "prompt": 4e-07, + "completion": 1.6e-06, + "cache_read": 1e-07, + "cache_creation": 4.8e-07, + "reasoning": 1.6e-06, + }, +} + +FAKE_PRICES_NO_EXTRAS = { + "gpt-4o": {"prompt": 0.000003, "completion": 0.000015}, +} + + +class TestComputeExtendedPricing: + """Tests for cached input, cache creation, and reasoning tokens in _compute.""" + + def test_cost_with_all_token_types(self): + """Full cost formula: text_input + cache_read + cache_creation + text_output + reasoning.""" + fetch_fn = 
MagicMock(return_value=FAKE_PRICES_WITH_EXTRAS) + result = _compute( + "litellm", + fetch_fn, + "gpt-4.1-mini", + 1000, # input_tokens (total, includes cached + creation) + 500, # output_tokens (total, includes reasoning) + cached_input_tokens=200, + cache_creation_input_tokens=100, + reasoning_tokens=150, + ) + + # text_input = 1000 - 200 - 100 = 700, text_output = 500 - 150 = 350 + # cost = 700*4e-7 + 200*1e-7 + 100*4.8e-7 + 350*1.6e-6 + 150*1.6e-6 + expected = ( + (1000 - 200 - 100) * 4e-7 + + 200 * 1e-7 + + 100 * 4.8e-7 + + (500 - 150) * 1.6e-6 + + 150 * 1.6e-6 + ) + assert result.total_cost_usd == pytest.approx(expected) + assert result.price_per_million_cache_read == pytest.approx(0.1) + assert result.price_per_million_cache_creation == pytest.approx(0.48) + assert result.price_per_million_reasoning == pytest.approx(1.6) + + def test_cache_read_fallback_to_prompt_rate(self): + """When cache_read price is absent, fall back to prompt rate.""" + fetch_fn = MagicMock(return_value=FAKE_PRICES_NO_EXTRAS) + result = _compute( + "litellm", + fetch_fn, + "gpt-4o", + 1000, + 500, + cached_input_tokens=200, + ) + + # cache_read falls back to prompt rate; text_input = 1000 - 200 = 800 + expected = (1000 - 200) * 0.000003 + 200 * 0.000003 + 500 * 0.000015 + assert result.total_cost_usd == pytest.approx(expected) + assert result.price_per_million_cache_read is None + + def test_reasoning_fallback_to_completion_rate(self): + """When reasoning price is absent, fall back to completion rate.""" + fetch_fn = MagicMock(return_value=FAKE_PRICES_NO_EXTRAS) + result = _compute( + "litellm", + fetch_fn, + "gpt-4o", + 1000, + 500, + reasoning_tokens=200, + ) + + # reasoning falls back to completion: 200 * 0.000015 + # text output: (500-200) * 0.000015 + expected = 1000 * 0.000003 + 300 * 0.000015 + 200 * 0.000015 + assert result.total_cost_usd == pytest.approx(expected) + assert result.price_per_million_reasoning is None + + def test_reasoning_tokens_clamped_to_output_tokens(self): + 
"""reasoning_tokens > output_tokens should be clamped.""" + fetch_fn = MagicMock(return_value=FAKE_PRICES_WITH_EXTRAS) + result = _compute( + "litellm", + fetch_fn, + "gpt-4.1-mini", + 1000, + 100, # output_tokens + reasoning_tokens=500, # more than output + ) + + # effective_reasoning = min(500, 100) = 100, text_output = 0 + expected = 1000 * 4e-7 + 0 * 1.6e-6 + 100 * 1.6e-6 + assert result.total_cost_usd == pytest.approx(expected) + + def test_cached_tokens_clamped_to_input_tokens(self): + """cached_input_tokens > input_tokens should be clamped.""" + fetch_fn = MagicMock(return_value=FAKE_PRICES_WITH_EXTRAS) + result = _compute( + "litellm", + fetch_fn, + "gpt-4.1-mini", + 100, # input_tokens + 500, + cached_input_tokens=800, # more than input + ) + + # effective_cached = min(800, 100) = 100, text_input = 0 + expected = 0 * 4e-7 + 100 * 1e-7 + 500 * 1.6e-6 + assert result.total_cost_usd == pytest.approx(expected) + + def test_cache_creation_clamped_to_remaining_input(self): + """cache_creation is clamped to input_tokens - cached.""" + fetch_fn = MagicMock(return_value=FAKE_PRICES_WITH_EXTRAS) + result = _compute( + "litellm", + fetch_fn, + "gpt-4.1-mini", + 500, # input_tokens + 200, + cached_input_tokens=300, + cache_creation_input_tokens=400, # only 200 remain + ) + + # effective_cached = 300, effective_creation = min(400, 500-300) = 200 + # text_input = 500 - 300 - 200 = 0 + expected = 0 * 4e-7 + 300 * 1e-7 + 200 * 4.8e-7 + 200 * 1.6e-6 + assert result.total_cost_usd == pytest.approx(expected) + + def test_backward_compat_no_new_params(self): + """Calling _compute without new params produces same result as before.""" + fetch_fn = MagicMock(return_value=FAKE_PRICES_NO_EXTRAS) + result = _compute("litellm", fetch_fn, "gpt-4o", 200, 100) + + expected = 200 * 0.000003 + 100 * 0.000015 + assert result.total_cost_usd == pytest.approx(expected) + assert result.price_per_million_input == pytest.approx(3.0) + assert result.price_per_million_output == 
pytest.approx(15.0) + assert result.price_per_million_cache_read is None + assert result.price_per_million_cache_creation is None + assert result.price_per_million_reasoning is None + + def test_extended_fields_populated_only_when_present(self): + """Price-per-million fields for cache/reasoning only set when source has them.""" + fetch_fn = MagicMock(return_value=FAKE_PRICES_WITH_EXTRAS) + result = _compute("litellm", fetch_fn, "gpt-4.1-mini", 100, 50) + + assert result.price_per_million_cache_read is not None + assert result.price_per_million_cache_creation is not None + assert result.price_per_million_reasoning is not None + + +class TestCalculateCostExtended: + """Tests for calculate_cost with the new token parameters.""" + + def test_new_params_threaded_through(self): + """New parameters reach _compute via calculate_cost.""" + with patch( + "modelcost.calculator.fetch_litellm_prices", + return_value=FAKE_PRICES_WITH_EXTRAS, + ): + result = calculate_cost( + "gpt-4.1-mini", + 1000, + 500, + cached_input_tokens=200, + cache_creation_input_tokens=100, + reasoning_tokens=150, + source="litellm", + ) + + s = result.sources[0] + expected = ( + (1000 - 200 - 100) * 4e-7 + + 200 * 1e-7 + + 100 * 4.8e-7 + + (500 - 150) * 1.6e-6 + + 150 * 1.6e-6 + ) + assert s.total_cost_usd == pytest.approx(expected) + + def test_backward_compat_calculate_cost(self): + """Existing call without new params returns identical result.""" + with patch( + "modelcost.calculator.fetch_litellm_prices", + return_value=FAKE_PRICES_NO_EXTRAS, + ): + result = calculate_cost("gpt-4o", 100, 50, source="litellm") + + s = result.sources[0] + expected = 100 * 0.000003 + 50 * 0.000015 + assert s.total_cost_usd == pytest.approx(expected) + + def test_token_counts_stored_in_cost_result(self): + """CostResult must carry the token counts for output.""" + with patch( + "modelcost.calculator.fetch_litellm_prices", + return_value=FAKE_PRICES_WITH_EXTRAS, + ): + result = calculate_cost( + "gpt-4.1-mini", + 1000, + 
500, + cached_input_tokens=200, + cache_creation_input_tokens=100, + reasoning_tokens=150, + source="litellm", + ) + + assert result.cached_input_tokens == 200 + assert result.cache_creation_input_tokens == 100 + assert result.reasoning_tokens == 150 + + def test_token_counts_default_zero_in_cost_result(self): + """Without new params, CostResult token counts are 0.""" + with patch( + "modelcost.calculator.fetch_litellm_prices", + return_value=FAKE_PRICES_NO_EXTRAS, + ): + result = calculate_cost("gpt-4o", 100, 50, source="litellm") + + assert result.cached_input_tokens == 0 + assert result.cache_creation_input_tokens == 0 + assert result.reasoning_tokens == 0 + + +# --------------------------------------------------------------------------- +# Real-world model pricing patterns +# --------------------------------------------------------------------------- + +COHERE_EMBED_PRICES = { + "azure_ai/Cohere-embed-v3-multilingual": { + "prompt": 1e-07, + "completion": 0.0, + }, +} + +XAI_GROK_PRICES = { + "xai/grok-3": { + "prompt": 3e-06, + "completion": 1.5e-05, + "cache_read": 7.5e-07, + }, +} + +DEEPSEEK_R1_PRICES = { + "deepseek/deepseek-r1": { + "prompt": 5.5e-07, + "completion": 2.19e-06, + "cache_read": 1.4e-07, + }, +} + +DEEPSEEK_R1_REPLICATE_PRICES = { + "replicate/deepseek-ai/deepseek-r1": { + "prompt": 3.75e-06, + "completion": 1e-05, + "reasoning": 1e-05, + }, +} + + +class TestRealWorldPricingPatterns: + """Cost calculations with realistic model pricing structures.""" + + def test_cohere_embedding_zero_output_cost(self): + """Embedding model: output_cost=0, only input tokens matter.""" + fetch_fn = MagicMock(return_value=COHERE_EMBED_PRICES) + result = _compute( + "litellm", + fetch_fn, + "azure_ai/Cohere-embed-v3-multilingual", + 512, + 0, + ) + + expected = 512 * 1e-07 + 0 * 0.0 + assert result.total_cost_usd == pytest.approx(expected) + assert result.price_per_million_input == pytest.approx(0.1) + assert result.price_per_million_output == 
pytest.approx(0.0) + + def test_cohere_embedding_nonzero_output_still_zero_cost(self): + """Edge case: embedding called with output_tokens > 0 but rate is 0.""" + fetch_fn = MagicMock(return_value=COHERE_EMBED_PRICES) + result = _compute( + "litellm", + fetch_fn, + "azure_ai/Cohere-embed-v3-multilingual", + 512, + 128, + ) + + expected = 512 * 1e-07 + 128 * 0.0 + assert result.total_cost_usd == pytest.approx(expected) + + def test_xai_grok_with_cached_tokens(self): + """xAI/Grok: cache_read=25% of input, no reasoning pricing.""" + fetch_fn = MagicMock(return_value=XAI_GROK_PRICES) + result = _compute( + "litellm", + fetch_fn, + "xai/grok-3", + 5000, + 2000, + cached_input_tokens=3000, + ) + + # text_input: (5000-3000) * 3e-6, cache: 3000 * 7.5e-7, output: 2000 * 1.5e-5 + expected = (5000 - 3000) * 3e-06 + 3000 * 7.5e-07 + 2000 * 1.5e-05 + assert result.total_cost_usd == pytest.approx(expected) + assert result.price_per_million_cache_read == pytest.approx(0.75) + assert result.price_per_million_reasoning is None + + def test_xai_grok_reasoning_falls_back_to_completion(self): + """xAI/Grok has no reasoning price; should fall back to completion rate.""" + fetch_fn = MagicMock(return_value=XAI_GROK_PRICES) + result = _compute( + "litellm", + fetch_fn, + "xai/grok-3", + 1000, + 500, + reasoning_tokens=200, + ) + + # reasoning falls back to completion: both at 1.5e-5 + expected = 1000 * 3e-06 + 300 * 1.5e-05 + 200 * 1.5e-05 + assert result.total_cost_usd == pytest.approx(expected) + + def test_deepseek_r1_cache_hit_pricing(self): + """DeepSeek R1: cache_read from cache_hit fallback; cached clamped to input.""" + fetch_fn = MagicMock(return_value=DEEPSEEK_R1_PRICES) + result = _compute( + "litellm", + fetch_fn, + "deepseek/deepseek-r1", + 10000, + 1000, + cached_input_tokens=8000, + ) + + # text_input: (10000-8000) * 5.5e-7, cache: 8000 * 1.4e-7, output: 1000 * 2.19e-6 + expected = (10000 - 8000) * 5.5e-07 + 8000 * 1.4e-07 + 1000 * 2.19e-06 + assert result.total_cost_usd 
== pytest.approx(expected) + assert result.price_per_million_cache_read == pytest.approx(0.14) + + def test_deepseek_r1_replicate_with_reasoning(self): + """DeepSeek R1 on replicate: reasoning=output rate, no cache pricing.""" + fetch_fn = MagicMock(return_value=DEEPSEEK_R1_REPLICATE_PRICES) + result = _compute( + "litellm", + fetch_fn, + "replicate/deepseek-ai/deepseek-r1", + 5000, + 10000, + reasoning_tokens=7000, + ) + + # input: 5000*3.75e-6, text: 3000*1e-5, reasoning: 7000*1e-5 + expected = 5000 * 3.75e-06 + 3000 * 1e-05 + 7000 * 1e-05 + assert result.total_cost_usd == pytest.approx(expected) + assert result.price_per_million_reasoning == pytest.approx(10.0) + assert result.price_per_million_cache_read is None + + def test_deepseek_r1_cache_and_reasoning_combined(self): + """Full scenario: cached input + reasoning on a model that has both.""" + # Hypothetical model with all pricing fields + prices = { + "full-model": { + "prompt": 5.5e-07, + "completion": 2.19e-06, + "cache_read": 1.4e-07, + "reasoning": 3e-06, + }, + } + fetch_fn = MagicMock(return_value=prices) + result = _compute( + "litellm", + fetch_fn, + "full-model", + 10000, # input (total, includes cached) + 5000, # output (total, includes reasoning) + cached_input_tokens=8000, + reasoning_tokens=3000, + ) + + # text_input: (10000-8000)*5.5e-7, cache: 8000*1.4e-7, + # text_out: (5000-3000)*2.19e-6, reasoning: 3000*3e-6 + expected = ( + (10000 - 8000) * 5.5e-07 + + 8000 * 1.4e-07 + + (5000 - 3000) * 2.19e-06 + + 3000 * 3e-06 + ) + assert result.total_cost_usd == pytest.approx(expected) diff --git a/tests/test_cli.py b/tests/test_cli.py index 1a3d0c0..a472f7c 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -61,6 +61,22 @@ def _multi_result(): ) +def _multi_result_with_extras(): + return CostResult( + model="gpt-4o", + input_tokens=1000, + output_tokens=500, + sources=[ + SourceCost("litellm", 0.005, 3.0, 15.0), + SourceCost("openrouter", 0.006, 3.2, 16.0), + ], + single_source=False, + 
cached_input_tokens=200, + cache_creation_input_tokens=100, + reasoning_tokens=150, + ) + + # --------------------------------------------------------------------------- # cost command # --------------------------------------------------------------------------- @@ -139,6 +155,33 @@ def test_unavailable_source_shown_in_multi_mode(self): assert "unavailable" in result.output + def test_multi_source_header_shows_extended_tokens(self): + runner = CliRunner() + with patch( + "modelcost.cli.calculate_cost", + return_value=_multi_result_with_extras(), + ): + result = runner.invoke( + main, ["cost", "gpt-4o", "1000", "500", "--source", "all"] + ) + + assert result.exit_code == 0 + assert "200 cached" in result.output + assert "100 cache-create" in result.output + assert "150 reasoning" in result.output + + def test_multi_source_header_omits_zero_extended_tokens(self): + runner = CliRunner() + with patch("modelcost.cli.calculate_cost", return_value=_multi_result()): + result = runner.invoke( + main, ["cost", "gpt-4o", "100", "50", "--source", "all"] + ) + + assert result.exit_code == 0 + assert "100 in / 50 out)" in result.output + assert "cached" not in result.output + assert "reasoning" not in result.output + def test_source_option_is_forwarded(self): runner = CliRunner() with patch( @@ -149,7 +192,15 @@ def test_source_option_is_forwarded(self): main, ["cost", "gpt-4o", "100", "50", "--source", "openrouter"] ) - mock_calc.assert_called_once_with("gpt-4o", 100, 50, source="openrouter") + mock_calc.assert_called_once_with( + "gpt-4o", + 100, + 50, + cached_input_tokens=0, + cache_creation_input_tokens=0, + reasoning_tokens=0, + source="openrouter", + ) def test_model_and_tokens_forwarded_correctly(self): runner = CliRunner() @@ -159,7 +210,111 @@ def test_model_and_tokens_forwarded_correctly(self): runner.invoke(main, ["cost", "claude-3-5-sonnet", "200", "75"]) mock_calc.assert_called_once_with( - "claude-3-5-sonnet", 200, 75, source="litellm" + "claude-3-5-sonnet", + 
200, + 75, + cached_input_tokens=0, + cache_creation_input_tokens=0, + reasoning_tokens=0, + source="litellm", + ) + + def test_cached_input_tokens_flag(self): + runner = CliRunner() + with patch( + "modelcost.cli.calculate_cost", return_value=_single_result() + ) as mock_calc: + runner.invoke( + main, + ["cost", "gpt-4o", "1000", "500", "--cached-input-tokens", "200"], + ) + + mock_calc.assert_called_once_with( + "gpt-4o", + 1000, + 500, + cached_input_tokens=200, + cache_creation_input_tokens=0, + reasoning_tokens=0, + source="litellm", + ) + + def test_cache_creation_input_tokens_flag(self): + runner = CliRunner() + with patch( + "modelcost.cli.calculate_cost", return_value=_single_result() + ) as mock_calc: + runner.invoke( + main, + [ + "cost", + "gpt-4o", + "1000", + "500", + "--cache-creation-input-tokens", + "100", + ], + ) + + mock_calc.assert_called_once_with( + "gpt-4o", + 1000, + 500, + cached_input_tokens=0, + cache_creation_input_tokens=100, + reasoning_tokens=0, + source="litellm", + ) + + def test_reasoning_tokens_flag(self): + runner = CliRunner() + with patch( + "modelcost.cli.calculate_cost", return_value=_single_result() + ) as mock_calc: + runner.invoke( + main, + ["cost", "gpt-4o", "1000", "500", "--reasoning-tokens", "150"], + ) + + mock_calc.assert_called_once_with( + "gpt-4o", + 1000, + 500, + cached_input_tokens=0, + cache_creation_input_tokens=0, + reasoning_tokens=150, + source="litellm", + ) + + def test_all_new_flags_together(self): + runner = CliRunner() + with patch( + "modelcost.cli.calculate_cost", return_value=_single_result() + ) as mock_calc: + runner.invoke( + main, + [ + "cost", + "gpt-4o", + "1000", + "500", + "--cached-input-tokens", + "200", + "--cache-creation-input-tokens", + "100", + "--reasoning-tokens", + "150", + ], + ) + + mock_calc.assert_called_once_with( + "gpt-4o", + 1000, + 500, + cached_input_tokens=200, + cache_creation_input_tokens=100, + reasoning_tokens=150, + source="litellm", ) diff --git 
a/tests/test_models.py b/tests/test_models.py index 5170a41..ae96aac 100644 --- a/tests/test_models.py +++ b/tests/test_models.py @@ -39,6 +39,31 @@ def test_error_defaults_to_none(self): ) assert s.error is None + def test_new_price_fields_default_to_none(self): + s = SourceCost( + source="litellm", + total_cost_usd=0.001, + price_per_million_input=1.0, + price_per_million_output=2.0, + ) + assert s.price_per_million_cache_read is None + assert s.price_per_million_cache_creation is None + assert s.price_per_million_reasoning is None + + def test_new_price_fields_can_be_set(self): + s = SourceCost( + source="litellm", + total_cost_usd=0.001, + price_per_million_input=1.0, + price_per_million_output=2.0, + price_per_million_cache_read=0.25, + price_per_million_cache_creation=1.5, + price_per_million_reasoning=3.0, + ) + assert s.price_per_million_cache_read == 0.25 + assert s.price_per_million_cache_creation == 1.5 + assert s.price_per_million_reasoning == 3.0 + class TestCostResult: def _make_result(self, sources): @@ -121,3 +146,100 @@ def test_single_source_false_when_set(self): model="m", input_tokens=1, output_tokens=1, sources=[], single_source=False ) assert result.single_source is False + + def test_to_dict_omits_new_fields_when_none(self): + """New price fields must not appear in output when they are None.""" + s = SourceCost("litellm", 0.005, 3.0, 15.0) + result = CostResult( + model="gpt-4o", input_tokens=100, output_tokens=50, sources=[s] + ) + d = result.to_dict() + cost = d["costs"][0] + assert "price_per_million_cache_read" not in cost + assert "price_per_million_cache_creation" not in cost + assert "price_per_million_reasoning" not in cost + + def test_to_dict_includes_new_fields_when_present(self): + """New price fields must appear in output when they are set.""" + s = SourceCost( + "litellm", + 0.005, + 3.0, + 15.0, + price_per_million_cache_read=0.75, + price_per_million_cache_creation=3.75, + price_per_million_reasoning=15.0, + ) + result = 
CostResult( + model="gpt-4o", input_tokens=100, output_tokens=50, sources=[s] + ) + d = result.to_dict() + cost = d["costs"][0] + assert cost["price_per_million_cache_read"] == 0.75 + assert cost["price_per_million_cache_creation"] == 3.75 + assert cost["price_per_million_reasoning"] == 15.0 + + def test_to_dict_partial_new_fields(self): + """Only non-None new fields should appear.""" + s = SourceCost( + "litellm", + 0.005, + 3.0, + 15.0, + price_per_million_cache_read=0.75, + ) + result = CostResult( + model="gpt-4o", input_tokens=100, output_tokens=50, sources=[s] + ) + d = result.to_dict() + cost = d["costs"][0] + assert cost["price_per_million_cache_read"] == 0.75 + assert "price_per_million_cache_creation" not in cost + assert "price_per_million_reasoning" not in cost + + def test_token_counts_default_to_zero(self): + result = CostResult(model="m", input_tokens=1, output_tokens=1, sources=[]) + assert result.cached_input_tokens == 0 + assert result.cache_creation_input_tokens == 0 + assert result.reasoning_tokens == 0 + + def test_to_dict_omits_token_counts_when_zero(self): + s = SourceCost("litellm", 0.005, 3.0, 15.0) + result = CostResult( + model="gpt-4o", input_tokens=100, output_tokens=50, sources=[s] + ) + d = result.to_dict() + assert "cached_input_tokens" not in d + assert "cache_creation_input_tokens" not in d + assert "reasoning_tokens" not in d + + def test_to_dict_includes_token_counts_when_nonzero(self): + s = SourceCost("litellm", 0.005, 3.0, 15.0) + result = CostResult( + model="gpt-4o", + input_tokens=100, + output_tokens=50, + sources=[s], + cached_input_tokens=200, + cache_creation_input_tokens=50, + reasoning_tokens=30, + ) + d = result.to_dict() + assert d["cached_input_tokens"] == 200 + assert d["cache_creation_input_tokens"] == 50 + assert d["reasoning_tokens"] == 30 + + def test_to_dict_partial_token_counts(self): + """Only nonzero token counts appear.""" + s = SourceCost("litellm", 0.005, 3.0, 15.0) + result = CostResult( + 
model="gpt-4o", + input_tokens=100, + output_tokens=50, + sources=[s], + cached_input_tokens=200, + ) + d = result.to_dict() + assert d["cached_input_tokens"] == 200 + assert "cache_creation_input_tokens" not in d + assert "reasoning_tokens" not in d diff --git a/tests/test_providers_litellm.py b/tests/test_providers_litellm.py index 0c50752..e2e1d4f 100644 --- a/tests/test_providers_litellm.py +++ b/tests/test_providers_litellm.py @@ -118,3 +118,220 @@ def test_calls_correct_url(self): or "berriAI" in call_url or "github" in call_url ) + + def test_extracts_cache_read_price(self): + raw = { + "gpt-4.1-mini": { + "input_cost_per_token": 4e-07, + "output_cost_per_token": 1.6e-06, + "cache_read_input_token_cost": 1e-07, + }, + } + with patch( + "modelcost.providers.litellm.httpx.get", return_value=_make_response(raw) + ): + prices = fetch_litellm_prices() + + assert prices["gpt-4.1-mini"]["cache_read"] == 1e-07 + + def test_extracts_cache_creation_price(self): + raw = { + "model-a": { + "input_cost_per_token": 1e-06, + "output_cost_per_token": 2e-06, + "cache_creation_input_token_cost": 1.5e-06, + }, + } + with patch( + "modelcost.providers.litellm.httpx.get", return_value=_make_response(raw) + ): + prices = fetch_litellm_prices() + + assert prices["model-a"]["cache_creation"] == 1.5e-06 + + def test_extracts_reasoning_price(self): + raw = { + "o3": { + "input_cost_per_token": 1e-05, + "output_cost_per_token": 4e-05, + "output_cost_per_reasoning_token": 4e-05, + }, + } + with patch( + "modelcost.providers.litellm.httpx.get", return_value=_make_response(raw) + ): + prices = fetch_litellm_prices() + + assert prices["o3"]["reasoning"] == 4e-05 + + def test_omits_extra_fields_when_absent(self): + raw = { + "gpt-4o": { + "input_cost_per_token": 0.000003, + "output_cost_per_token": 0.000015, + }, + } + with patch( + "modelcost.providers.litellm.httpx.get", return_value=_make_response(raw) + ): + prices = fetch_litellm_prices() + + assert "cache_read" not in 
prices["gpt-4o"] + assert "cache_creation" not in prices["gpt-4o"] + assert "reasoning" not in prices["gpt-4o"] + + def test_extracts_all_extra_fields_together(self): + raw = { + "gemini-flash": { + "input_cost_per_token": 2.5e-07, + "output_cost_per_token": 1.5e-06, + "cache_read_input_token_cost": 2.5e-08, + "cache_creation_input_token_cost": 5e-07, + "output_cost_per_reasoning_token": 1.5e-06, + }, + } + with patch( + "modelcost.providers.litellm.httpx.get", return_value=_make_response(raw) + ): + prices = fetch_litellm_prices() + + entry = prices["gemini-flash"] + assert entry["prompt"] == 2.5e-07 + assert entry["completion"] == 1.5e-06 + assert entry["cache_read"] == 2.5e-08 + assert entry["cache_creation"] == 5e-07 + assert entry["reasoning"] == 1.5e-06 + + def test_cache_hit_field_used_as_fallback_for_cache_read(self): + """Models with input_cost_per_token_cache_hit but no cache_read_input_token_cost.""" + raw = { + "deepseek/deepseek-r1": { + "input_cost_per_token": 5.5e-07, + "output_cost_per_token": 2.19e-06, + "input_cost_per_token_cache_hit": 1.4e-07, + }, + } + with patch( + "modelcost.providers.litellm.httpx.get", return_value=_make_response(raw) + ): + prices = fetch_litellm_prices() + + assert prices["deepseek/deepseek-r1"]["cache_read"] == 1.4e-07 + + def test_cache_read_takes_priority_over_cache_hit(self): + """When both fields exist, cache_read_input_token_cost wins.""" + raw = { + "deepseek/deepseek-chat": { + "input_cost_per_token": 2.8e-07, + "output_cost_per_token": 1.1e-06, + "cache_read_input_token_cost": 2.8e-08, + "input_cost_per_token_cache_hit": 2.8e-08, + }, + } + with patch( + "modelcost.providers.litellm.httpx.get", return_value=_make_response(raw) + ): + prices = fetch_litellm_prices() + + assert prices["deepseek/deepseek-chat"]["cache_read"] == 2.8e-08 + + def test_no_cache_read_when_neither_field_present(self): + """No cache_read key when neither cache_read nor cache_hit is present.""" + raw = { + "gpt-4o": { + 
"input_cost_per_token": 0.000003, + "output_cost_per_token": 0.000015, + }, + } + with patch( + "modelcost.providers.litellm.httpx.get", return_value=_make_response(raw) + ): + prices = fetch_litellm_prices() + + assert "cache_read" not in prices["gpt-4o"] + + def test_cohere_embedding_zero_output_cost(self): + """Embedding models have output_cost=0; must be included with correct values.""" + raw = { + "azure_ai/Cohere-embed-v3-multilingual": { + "input_cost_per_token": 1e-07, + "output_cost_per_token": 0.0, + }, + } + with patch( + "modelcost.providers.litellm.httpx.get", return_value=_make_response(raw) + ): + prices = fetch_litellm_prices() + + assert "azure_ai/Cohere-embed-v3-multilingual" in prices + entry = prices["azure_ai/Cohere-embed-v3-multilingual"] + assert entry["prompt"] == 1e-07 + assert entry["completion"] == 0.0 + assert "cache_read" not in entry + assert "reasoning" not in entry + + def test_xai_grok_cache_read_only(self): + """xAI/Grok models have cache_read but no cache_hit or reasoning.""" + raw = { + "xai/grok-3": { + "input_cost_per_token": 3e-06, + "output_cost_per_token": 1.5e-05, + "cache_read_input_token_cost": 7.5e-07, + }, + } + with patch( + "modelcost.providers.litellm.httpx.get", return_value=_make_response(raw) + ): + prices = fetch_litellm_prices() + + entry = prices["xai/grok-3"] + assert entry["prompt"] == 3e-06 + assert entry["completion"] == 1.5e-05 + assert entry["cache_read"] == 7.5e-07 + assert "cache_creation" not in entry + assert "reasoning" not in entry + + def test_deepseek_cache_hit_and_reasoning(self): + """DeepSeek R1 on replicate: reasoning field + cache_hit fallback.""" + raw = { + "deepseek/deepseek-r1": { + "input_cost_per_token": 5.5e-07, + "output_cost_per_token": 2.19e-06, + "input_cost_per_token_cache_hit": 1.4e-07, + }, + "replicate/deepseek-ai/deepseek-r1": { + "input_cost_per_token": 3.75e-06, + "output_cost_per_token": 1e-05, + "output_cost_per_reasoning_token": 1e-05, + }, + } + with patch( + 
"modelcost.providers.litellm.httpx.get", return_value=_make_response(raw) + ): + prices = fetch_litellm_prices() + + ds = prices["deepseek/deepseek-r1"] + assert ds["cache_read"] == 1.4e-07 + assert "reasoning" not in ds + + rep = prices["replicate/deepseek-ai/deepseek-r1"] + assert rep["reasoning"] == 1e-05 + assert "cache_read" not in rep + + def test_rerank_model_zero_costs_included(self): + """Rerank models with input=0 and output=0 pass the filter but yield $0.""" + raw = { + "azure_ai/cohere-rerank-v3-english": { + "input_cost_per_token": 0.0, + "output_cost_per_token": 0.0, + "input_cost_per_query": 0.002, + }, + } + with patch( + "modelcost.providers.litellm.httpx.get", return_value=_make_response(raw) + ): + prices = fetch_litellm_prices() + + entry = prices["azure_ai/cohere-rerank-v3-english"] + assert entry["prompt"] == 0.0 + assert entry["completion"] == 0.0 diff --git a/uv.lock b/uv.lock index 5588007..8fd2202 100644 --- a/uv.lock +++ b/uv.lock @@ -746,7 +746,7 @@ wheels = [ [[package]] name = "modelcost" -version = "0.1.4" +version = "0.1.6" source = { editable = "." } dependencies = [ { name = "click" },