ingest billing metrics in internal stream #1448
Conversation
- fetch billing metrics from all live nodes
- convert to event
- ingest in pbilling stream
- like cluster metrics, fetch every minute in the same scheduled task
Walkthrough

Adds a billing metrics subsystem that collects, normalizes, and ingests per-node billing events into a new internal pbilling stream.

Changes
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Scheduler as Cluster Scheduler
    participant Cluster as cluster/mod.rs
    participant Nodes as Cluster Nodes
    participant Prom as Prometheus
    participant Collector as BillingMetricsCollector
    participant Ingest as Ingest
    participant PMETA as "pmeta (PMETA)"
    participant PBILL as "pbilling (PBILLING)"
    Scheduler->>Cluster: collect_cluster_metrics()
    Cluster->>Nodes: fetch metrics (parallel)
    Nodes-->>Cluster: cluster metrics
    Cluster->>PMETA: ingest cluster metrics
    Note over Cluster: billing metrics flow (new)
    Cluster->>Nodes: fetch_nodes_billing_metrics() (parallel)
    Nodes->>Prom: query Prometheus samples
    Prom-->>Nodes: samples (labels & values)
    Nodes-->>Cluster: samples
    Cluster->>Collector: extract_billing_metrics_from_samples()
    Collector-->>Cluster: BillingMetricEvent[]
    Cluster->>Ingest: ingest billing events
    Ingest->>PBILL: write billing events
    PBILL-->>Ingest: success
```
Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Possibly related PRs
Actionable comments posted: 5
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
src/migration/stream_metadata_migration.rs (1)
161-174: v4_v5 misclassifies pbilling as UserDefined when stream_type is absent.

Stream type is set to Internal only for PMETA_STREAM_NAME; BILLING_METRICS_STREAM_NAME must be included.
```diff
 if stream_type.is_none() {
-    if stream_name.eq(PMETA_STREAM_NAME) {
+    if stream_name.eq(PMETA_STREAM_NAME) || stream_name.eq(BILLING_METRICS_STREAM_NAME) {
         stream_metadata_map.insert(
             "stream_type".to_owned(),
             Value::String(storage::StreamType::Internal.to_string()),
         );
     } else {
```

Add unit tests that assert Internal for both "pmeta" and "pbilling" in v4_v5. Based on learnings.
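A minimal, dependency-free sketch of the intended classification. The constants and the helper function are stand-ins for illustration; the real migration code mutates a serde_json metadata map rather than returning a string:

```rust
// Hypothetical stand-ins for the constants referenced in the migration code.
const PMETA_STREAM_NAME: &str = "pmeta";
const BILLING_METRICS_STREAM_NAME: &str = "pbilling";

/// Classify a stream whose metadata carries no explicit stream_type,
/// mirroring the fixed v4_v5 branch: both internal streams map to Internal.
fn default_stream_type(stream_name: &str) -> &'static str {
    if stream_name == PMETA_STREAM_NAME || stream_name == BILLING_METRICS_STREAM_NAME {
        "Internal"
    } else {
        "UserDefined"
    }
}
```

The unit tests the review asks for would assert exactly this mapping for both internal stream names.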
🧹 Nitpick comments (9)
src/hottier.rs (1)
699-712: Only PMETA gets a hot tier; clarify pbilling intent.

If pbilling also needs a hot tier in some deployments, mirror this block for BILLING_METRICS_STREAM_NAME; otherwise add a brief comment stating that pbilling is intentionally excluded.
src/parseable/mod.rs (2)
396-407: Align PMETA log_source with internal ingestion change.

Internal ingestion writes FORMAT_KEY=json; PMETA here is created with LogSource::Pmeta. Prefer consistency.

```diff
-let log_source_entry = LogSourceEntry::new(LogSource::Pmeta, HashSet::new());
+let log_source_entry = LogSourceEntry::new(LogSource::Json, HashSet::new());
```

Legacy stores remain covered by the v5→v6 mapping from "Pmeta" → "pmeta". Add a brief note in the release docs.
435-437: Sync order/headers OK; optionally collapse duplicate calls.

You can DRY this by looping over [PMETA_STREAM_NAME, BILLING_METRICS_STREAM_NAME].
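A sketch of the suggested loop, with a stub standing in for the real sync routine (the actual `sync_streams_with_ingestors` is async and takes headers and a body; only the deduplication shape is shown):

```rust
const PMETA_STREAM_NAME: &str = "pmeta";
const BILLING_METRICS_STREAM_NAME: &str = "pbilling";

/// Stub for the real sync routine; records which streams were synced.
fn sync_stream_with_ingestors(stream_name: &str, synced: &mut Vec<String>) {
    synced.push(stream_name.to_string());
}

/// Sync both internal streams with one loop instead of two duplicated calls.
fn sync_internal_streams(synced: &mut Vec<String>) {
    for stream in [PMETA_STREAM_NAME, BILLING_METRICS_STREAM_NAME] {
        sync_stream_with_ingestors(stream, synced);
    }
}
```

Adding a third internal stream later then becomes a one-line change to the array.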
src/storage/object_storage.rs (1)
607-615: Early return is unprotected by upstream guards; consider clarifying the invariant or adding trace logs.

Verification found that the early return bypassing merge semantics is NOT protected as the review assumed:

- In migration/mod.rs (line 248): called unconditionally as a fallback with no upstream guards
- In parseable/mod.rs (line 333): the caller only checks metastore.list_streams(), not in-memory state, so the early return can still execute

The different code paths (the early return fetches metastore JSON; the normal path merges ingestor sources) represent different behavior that could mask races or redundant calls. Suggestions remain valid:

- Add a trace log documenting the early-return scenario and where it's expected
- Or enforce the invariant at callers: ensure they check PARSEABLE.get_stream() before calling this method

src/handlers/http/cluster/mod.rs (5)
72-87: Event schema: prefer timezone-aware timestamp; dedupe literal

Consider:

- Use DateTime for event_time to avoid timezone ambiguity and cross-language parsing issues.
- Hoist "billing-metrics" into a constant to avoid magic strings.

Example constant and usage:

```diff
+const BILLING_EVENT_TYPE: &str = "billing-metrics";
```

Then replace event_type assignments with:

```diff
-event_type: "billing-metrics".to_string(),
+event_type: BILLING_EVENT_TYPE.to_string(),
```

Also verify chrono's serde feature is enabled so event_time serializes correctly.
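A dependency-free sketch of the deduplicated literal. The struct is a hypothetical stand-in for BillingMetricEvent; to avoid a chrono dependency here, event_time is shown as epoch milliseconds, whereas the review recommends a timezone-aware DateTime:

```rust
use std::time::{SystemTime, UNIX_EPOCH};

const BILLING_EVENT_TYPE: &str = "billing-metrics";

/// Hypothetical stand-in for the event emitted by the collector.
struct BillingMetricEvent {
    event_type: String,
    event_time: u128, // epoch millis; a timezone-aware DateTime is preferable
}

impl BillingMetricEvent {
    fn new() -> Self {
        let millis = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .expect("system clock before UNIX epoch")
            .as_millis();
        BillingMetricEvent {
            event_type: BILLING_EVENT_TYPE.to_string(),
            event_time: millis,
        }
    }
}
```

With the constant in one place, renaming the event type later cannot leave a stale literal behind.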
1239-1257: Robustness: guard non-finite/negative counter values

Before dispatching by metric kind, skip NaN/∞ (and optionally negatives) to avoid bogus events from scraped samples.

```diff
 fn process_sample(
     collector: &mut BillingMetricsCollector,
     sample: &prometheus_parse::Sample,
     val: f64,
 ) {
+    if !val.is_finite() {
+        return;
+    }
     match sample.metric.as_str() {
```
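A standalone illustration of the guard in isolation (the collector and sample types are omitted; only the filtering logic matters). This variant also rejects negatives, since counters should never go backwards:

```rust
/// Returns Some(val) only for values safe to turn into billing events:
/// finite and non-negative. NaN, ±infinity, and negatives are dropped.
fn sanitize_counter(val: f64) -> Option<f64> {
    if val.is_finite() && val >= 0.0 {
        Some(val)
    } else {
        None
    }
}
```

Note that a bare `val >= 0.0` comparison already rejects NaN, but the explicit `is_finite()` check documents the intent and also catches +∞.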
1402-1455: Potential compile break: NodeType::to_string requires Display

If NodeType doesn't implement Display, this won't compile. A safer default is Debug formatting.

```diff
-node.node_type().to_string(),
+format!("{:?}", node.node_type()),
```

Would you prefer I open a quick follow-up to implement Display for NodeType instead?
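If the follow-up route is taken, implementing Display is small. A sketch with assumed variants (the real NodeType lives in src/handlers/http/modal/mod.rs and its variants may differ):

```rust
use std::fmt;

/// Hypothetical variants; not the actual Parseable definition.
#[derive(Debug)]
enum NodeType {
    Ingestor,
    Querier,
}

impl fmt::Display for NodeType {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let s = match self {
            NodeType::Ingestor => "ingestor",
            NodeType::Querier => "querier",
        };
        f.write_str(s)
    }
}
```

With Display in place, `node.node_type().to_string()` compiles via the blanket ToString impl, and the string form is stable rather than tied to the Debug derive.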
1457-1486: Unbounded concurrency on node fetches; cap to protect the cluster

Large clusters could create a burst of concurrent requests. Mirror the limited-concurrency pattern used elsewhere.

```diff
-.buffer_unordered(nodes_len) // No concurrency limit
+.buffer_unordered(MAX_CONCURRENT_BILLING_FETCHES)
```

Add near other consts:

```rust
const MAX_CONCURRENT_BILLING_FETCHES: usize = 16;
```
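The real code would use futures' `buffer_unordered` as shown above; as a dependency-free illustration of the same bound, this sketch caps in-flight work by fetching in fixed-size chunks of threads (the fetch function is a stub standing in for the real HTTP call):

```rust
use std::thread;

const MAX_CONCURRENT_BILLING_FETCHES: usize = 16;

/// Stub fetch; the real version issues an HTTP request to a node.
fn fetch_node_metrics(node_id: usize) -> usize {
    node_id * 10 // placeholder "metric"
}

/// Fetch from all nodes with at most MAX_CONCURRENT_BILLING_FETCHES in flight.
fn fetch_all_bounded(node_ids: &[usize]) -> Vec<usize> {
    let mut results = Vec::with_capacity(node_ids.len());
    for chunk in node_ids.chunks(MAX_CONCURRENT_BILLING_FETCHES) {
        // Spawn one worker per node in this chunk, then join before the next
        // chunk starts, so in-flight work never exceeds the cap.
        let handles: Vec<_> = chunk
            .iter()
            .map(|&id| thread::spawn(move || fetch_node_metrics(id)))
            .collect();
        for h in handles {
            results.push(h.join().expect("fetch thread panicked"));
        }
    }
    results
}
```

Chunking is slightly less efficient than `buffer_unordered` (a slow node stalls its whole chunk), but it demonstrates why the cap matters: a 500-node cluster generates at most 16 simultaneous requests instead of 500.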
1563-1617: Scheduler ingestion flow: bootstrap pbilling and add context
- If pbilling stream is missing, ingestion will fail every minute. Ensure stream bootstrap (create if absent) before first ingestion.
- Consider adding anyhow::Context on failures to aid ops triage.
Would you like me to add a small bootstrap step (create internal streams pmeta/pbilling if not found) in initialization?
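A sketch of the proposed bootstrap step, with an in-memory set standing in for the real stream registry (the registry type and return shape are assumptions; the real creation path is async and persists metadata):

```rust
use std::collections::HashSet;

const INTERNAL_STREAMS: [&str; 2] = ["pmeta", "pbilling"];

/// Create any missing internal stream; returns the names actually created.
fn bootstrap_internal_streams(registry: &mut HashSet<String>) -> Vec<String> {
    let mut created = Vec::new();
    for name in INTERNAL_STREAMS {
        // insert() returns true only when the stream was absent,
        // so existing streams are left untouched (idempotent bootstrap).
        if registry.insert(name.to_string()) {
            created.push(name.to_string());
        }
    }
    created
}
```

Running this once at initialization means the minutely scheduler never hits a missing-stream error, and re-running it is harmless.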
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (6)
- src/handlers/http/cluster/mod.rs (4 hunks)
- src/handlers/http/ingest.rs (1 hunk)
- src/hottier.rs (2 hunks)
- src/migration/stream_metadata_migration.rs (2 hunks)
- src/parseable/mod.rs (3 hunks)
- src/storage/object_storage.rs (1 hunk)
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2025-07-28T17:10:39.448Z
Learnt from: nikhilsinhaparseable
PR: parseablehq/parseable#1392
File: src/migration/stream_metadata_migration.rs:303-322
Timestamp: 2025-07-28T17:10:39.448Z
Learning: In Parseable's migration system (src/migration/stream_metadata_migration.rs), each migration function updates the metadata to the current latest format using CURRENT_OBJECT_STORE_VERSION and CURRENT_SCHEMA_VERSION constants, rather than producing incremental versions. For example, v5_v6 function produces v7 format output when these constants are set to "v7", not v6 format.
Applied to files:
src/migration/stream_metadata_migration.rs
🧬 Code graph analysis (4)
src/storage/object_storage.rs (1)
src/parseable/mod.rs (1)
- new (180-194)

src/migration/stream_metadata_migration.rs (1)
src/validator.rs (1)
- stream_name (36-71)

src/parseable/mod.rs (2)
src/handlers/http/cluster/mod.rs (2)
- sync_streams_with_ingestors (320-372)
- new (110-117)

src/event/format/mod.rs (1)
- new (126-131)

src/handlers/http/cluster/mod.rs (3)
src/handlers/http/modal/mod.rs (7)
- new (288-310)
- node_type (569-569)
- node_type (582-584)
- domain_name (567-567)
- domain_name (574-576)
- serde_json (372-372)
- serde_json (620-620)

src/handlers/http/mod.rs (1)
- base_path_without_preceding_slash (77-79)

src/handlers/http/ingest.rs (1)
- ingest_internal_stream (133-156)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (10)
- GitHub Check: Build Default x86_64-pc-windows-msvc
- GitHub Check: Build Default x86_64-apple-darwin
- GitHub Check: Build Default aarch64-apple-darwin
- GitHub Check: Build Default x86_64-unknown-linux-gnu
- GitHub Check: Build Default aarch64-unknown-linux-gnu
- GitHub Check: Build Kafka x86_64-unknown-linux-gnu
- GitHub Check: Build Kafka aarch64-apple-darwin
- GitHub Check: Quest Smoke and Load Tests for Standalone deployments
- GitHub Check: coverage
- GitHub Check: Quest Smoke and Load Tests for Distributed deployments
🔇 Additional comments (3)
src/handlers/http/ingest.rs (1)
139-140: Metadata mismatch confirmed but appears intentional; verify the design rationale.

The mismatch is real: the PMETA stream is created with LogSource::Pmeta (line 397) but events are ingested with FORMAT_KEY="json" (line 139). However, the comment "For internal streams, use old schema" combined with the explicit SchemaVersion::V0 usage suggests this is intentional backward-compatibility design.

The p_format field is not validated downstream; FORMAT_VERIFY_KEY is the active validation field. No functional breakage occurs, but the metadata inconsistency could confuse operators or future maintainers.

Verification needed: confirm whether setting FORMAT_KEY to LogSource::Json for PMETA is the correct design choice, or if it should use LogSource::Pmeta to align with stream creation metadata.

src/handlers/http/cluster/mod.rs (2)
61-62: Streams: ensure pbilling exists and the PMETA rename is complete

Looks good. Please confirm:
- pbilling internal stream is created at startup (schema matches BillingMetricEvent).
- All INTERNAL_STREAM_NAME references are updated to PMETA_STREAM_NAME across the repo.
1221-1237: LGTM: scrape-to-events pipeline shape

Clean separation: scrape -> collect -> flatten. A good foundation for future metric types.
Actionable comments posted: 0
🧹 Nitpick comments (1)
src/parseable/mod.rs (1)
396-456: Excellent implementation of resilient dual-stream initialization!

The refactored logic correctly handles all scenarios for creating pmeta and pbilling internal streams:
- ✅ Individual creation failures are logged but don't abort server initialization
- ✅ Successfully created streams are properly synced with ingestors
- ✅ Early return optimization when both streams already exist
- ✅ Sync errors are logged but don't abort initialization
- ✅ Always returns Ok(()) for initialization resilience

The implementation aligns with the recorded learning and addresses the past review discussion. Based on learnings.
Optional refinement:
The comment at line 419 could be more precise:
```diff
-// Check if either stream creation failed
+// Log any stream creation failures
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
- src/parseable/mod.rs (3 hunks)
🧰 Additional context used
🧠 Learnings (2)
📚 Learning: 2025-10-21T02:22:24.392Z
Learnt from: nikhilsinhaparseable
PR: parseablehq/parseable#1448
File: src/parseable/mod.rs:419-432
Timestamp: 2025-10-21T02:22:24.392Z
Learning: In Parseable's internal stream creation (`create_internal_stream_if_not_exists` in `src/parseable/mod.rs`), errors should not propagate to fail server initialization. The function creates both pmeta and pbilling internal streams, and failures are logged but the function always returns `Ok(())` to ensure server startup resilience. Individual stream creation failures should not prevent syncing of successfully created streams.
Applied to files:
src/parseable/mod.rs
📚 Learning: 2025-02-14T09:49:25.818Z
Learnt from: de-sh
PR: parseablehq/parseable#1185
File: src/handlers/http/logstream.rs:255-261
Timestamp: 2025-02-14T09:49:25.818Z
Learning: In Parseable's logstream handlers, stream existence checks must be performed for both query and standalone modes. The pattern `!PARSEABLE.streams.contains(&stream_name) && (PARSEABLE.options.mode != Mode::Query || !PARSEABLE.create_stream_and_schema_from_storage(&stream_name).await?)` ensures proper error handling in both modes.
Applied to files:
src/parseable/mod.rs
🧬 Code graph analysis (1)
src/parseable/mod.rs (1)
src/handlers/http/cluster/mod.rs (2)
- sync_streams_with_ingestors (320-372)
- new (110-117)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (10)
- GitHub Check: Build Kafka aarch64-apple-darwin
- GitHub Check: Build Default aarch64-apple-darwin
- GitHub Check: Build Default x86_64-apple-darwin
- GitHub Check: Build Default aarch64-unknown-linux-gnu
- GitHub Check: Build Default x86_64-pc-windows-msvc
- GitHub Check: Build Kafka x86_64-unknown-linux-gnu
- GitHub Check: Build Default x86_64-unknown-linux-gnu
- GitHub Check: coverage
- GitHub Check: Quest Smoke and Load Tests for Distributed deployments
- GitHub Check: Quest Smoke and Load Tests for Standalone deployments
🔇 Additional comments (1)
src/parseable/mod.rs (1)
52-54: LGTM! Clean import additions.

The new imports for BILLING_METRICS_STREAM_NAME, PMETA_STREAM_NAME, and sync_streams_with_ingestors are appropriately scoped and necessary for the dual internal stream creation functionality.