
Conversation

@gouslu
Contributor

@gouslu gouslu commented Nov 29, 2025

  • async exports
  • ack/nack support
  • refactoring for better readability
  • other performance improvements, reaching 25–30k events per second with 1 KB fully random, highly incompressible events

@github-actions github-actions bot added the rust Pull requests that update Rust code label Nov 29, 2025
@gouslu gouslu changed the title from "gigla gzip batcher and performance improvements." to "azure monitor exporter gzip batcher and performance improvements." Nov 29, 2025
@codecov

codecov bot commented Nov 29, 2025

Codecov Report

❌ Patch coverage is 81.71937% with 370 lines in your changes missing coverage. Please review.
✅ Project coverage is 84.08%. Comparing base (28554f9) to head (741f6d1).
⚠️ Report is 11 commits behind head on main.

Additional details and impacted files
@@            Coverage Diff             @@
##             main    #1490      +/-   ##
==========================================
- Coverage   84.12%   84.08%   -0.05%     
==========================================
  Files         444      451       +7     
  Lines      125704   128189    +2485     
==========================================
+ Hits       105749   107782    +2033     
- Misses      19421    19873     +452     
  Partials      534      534              
Components Coverage Δ
otap-dataflow 85.78% <81.71%> (-0.14%) ⬇️
query_abstraction 80.61% <ø> (ø)
query_engine 89.98% <ø> (ø)
syslog_cef_receivers ∅ <ø> (∅)
otel-arrow-go 53.50% <ø> (ø)

if self.total_uncompressed_size == 0 {
self.buf
.write_all(b"[")
.expect("write to memory buffer failed");
Member

With `expect`, any unexpected gzip/memory error crashes the exporter instead of surfacing an error.

Contributor Author

This should never happen. I understand the concern, but if it does happen, it indicates a critical bug.

Contributor

Agree with @lalitb. Let's avoid expect usage (panic)
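A minimal sketch of the suggested fix, assuming a hypothetical `Batcher` shaped like the one in the diff (names are illustrative, not from the PR): propagate the write error up the export path instead of panicking with `expect`.

```rust
use std::io::{self, Write};

// Hypothetical sketch: surface buffer-write errors to the caller instead of
// panicking. A failure here returns up the export path, where it can be
// nacked/retried, rather than crashing the exporter.
struct Batcher {
    buf: Vec<u8>,
    total_uncompressed_size: usize,
}

impl Batcher {
    fn begin_batch(&mut self) -> io::Result<()> {
        if self.total_uncompressed_size == 0 {
            // `?` converts the io::Error into the caller's error path.
            self.buf.write_all(b"[")?;
        }
        Ok(())
    }
}
```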

let json_bytes =
serde_json::to_vec(&body).map_err(|e| format!("Failed to serialize to JSON: {e}"))?;
// Clone static headers and add the auth header
let mut headers = self.static_headers.clone();
Member

These are static headers; we should be able to avoid cloning them in the hot path.
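One way to avoid the per-request clone, sketched with hypothetical names (std-only, not the PR's types): borrow the static map and chain the per-request auth header on top, so nothing is copied per send.

```rust
use std::collections::HashMap;

// Hypothetical sketch: static headers are built once; per request we only
// borrow them and append the auth header, instead of cloning the whole map.
struct Exporter {
    static_headers: HashMap<&'static str, String>,
}

impl Exporter {
    fn request_headers<'a>(
        &'a self,
        auth_value: &'a str,
    ) -> impl Iterator<Item = (&'a str, &'a str)> + 'a {
        self.static_headers
            .iter()
            .map(|(k, v)| (*k, v.as_str()))
            .chain(std::iter::once(("Authorization", auth_value)))
    }
}
```

If the client is reqwest, the static part can also be baked into the client once via `ClientBuilder::default_headers`, leaving only the auth header to set per request.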

self.current_uncompressed_size += 1;
} else {
self.buf
.write_all(b",")
Member

@lalitb lalitb Dec 1, 2025

We append `,` here, but when the size check later triggers a flush, the batch is emitted as `[... ,]` without the attempted element. The trailing comma makes the JSON invalid and will most likely fail at ingestion.

Contributor Author

The API handles this just fine, but you are right that it is not valid JSON, so I will fix it as you suggested.
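The standard fix is the comma-prefix pattern, sketched below with hypothetical names: write the separator *before* every element after the first, so a flush can never leave a dangling trailing comma.

```rust
use std::io::Write;

// Hypothetical sketch of comma-prefix batching: the separator is written
// before each element (after the first), so the buffer is always a valid
// JSON array prefix and can be closed with "]" at any point.
struct JsonBatch {
    buf: Vec<u8>,
    count: usize,
}

impl JsonBatch {
    fn push(&mut self, element: &[u8]) -> std::io::Result<()> {
        if self.count == 0 {
            self.buf.write_all(b"[")?;
        } else {
            self.buf.write_all(b",")?;
        }
        self.buf.write_all(element)?;
        self.count += 1;
        Ok(())
    }

    fn finish(mut self) -> std::io::Result<Vec<u8>> {
        if self.count == 0 {
            self.buf.write_all(b"[")?;
        }
        self.buf.write_all(b"]")?;
        Ok(self.buf)
    }
}
```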

@gouslu gouslu force-pushed the gouslu/gigla_exporter_batching branch from d0d5368 to 413048d on December 3, 2025 04:11
Contributor

@jmacd jmacd left a comment

Since this is still in draft form, please take these comments as conversation starters! 😀

Comment on lines 16 to 26
// TODO - Remove print statements
#[allow(clippy::print_stdout)]
pub struct Auth {
credential: Arc<dyn TokenCredential>,
scope: String,
// Thread-safe shared token cache
cached_token: Arc<RwLock<Option<AccessToken>>>,
}

Contributor

Suggested change
// TODO - Remove print statements
#[allow(clippy::print_stdout)]
pub struct Auth {
credential: Arc<dyn TokenCredential>,
scope: String,
// Thread-safe shared token cache
cached_token: Arc<RwLock<Option<AccessToken>>>,
}
// TODO - Consolidate with crates/otap/src/{cloud_auth,object_store}/azure.rs
#[allow(clippy::print_stdout)]
pub struct Auth {
credential: Arc<dyn TokenCredential>,
scope: String,
// Thread-safe shared token cache
cached_token: Arc<RwLock<Option<AccessToken>>>,
}

I recognize this is using things already committed in crates/otap/src/experimental/azure_monitor_exporter/config.rs.

Have a look at rust/otap-dataflow/crates/otap/src/cloud_auth/azure.rs and rust/otap-dataflow/crates/otap/src/object_store/azure.rs, added subsequently in #1517. The struct here looks similar to the object_store struct, and there are two similar Auth configs now. It would be nice to see less code and more re-use as we move forward; not a blocker for this PR.

(@lalitb please review. I believe our position should be that Azure auth code/config belongs in an extension, the extension used by parquet_exporter for object_store and by Azure Monitor here.)

Member

@jmacd That's correct. Instead of each exporter (Azure Monitor, Parquet/object_store, etc.) implementing its own auth config and credential creation logic, they should all reference this shared extension - and any common requirements should be met by extending this module rather than adding parallel implementations.

failed_batch_count: f64,
failed_msg_count: f64,
average_client_latency_secs: f64,
}
Contributor

Have a look at a few of the components in the crates/otap/src folder, such as retry_processor.rs, and how the existing metrics APIs cover the counters used here.

As for histogram measurements, I would prefer to hold back. Again, asking @lquerel for opinions: recording histogram instruments could be a thread-local histogram data structure or a message-passing of latency measurements (or both), at which point it may as well be a span message aggregated in the OTel SDK or one of our own pipelines for a latency metric.

We shouldn't reinvent this stuff, see https://github.com/open-telemetry/opentelemetry-rust/blob/main/opentelemetry-sdk/src/metrics/internal/exponential_histogram.rs.

I see this as a question of routing histogram measurements (per core) to the central instrumentation collector.

@gouslu I understand that some of this is for your own performance studies. Latency and counters aside, can we remove processing_started_at, last_message_received_at, idle_duration, and average_client_latency_secs?

As for the counters, we aim to standardize pipeline metrics, the topic in #487 and a model RFC in the collector RFC https://github.com/open-telemetry/opentelemetry-collector/blob/main/docs/rfcs/component-universal-telemetry.md. The counters here are fine until we have a more-standard solution.
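The counter-only shape the review suggests, sketched with hypothetical names and plain `u64` fields (a real version would use the `Counter<_>` types from crates/telemetry, and the latency/idle-time fields would be dropped):

```rust
// Hypothetical sketch: monotonic u64 counters in place of the f64 fields,
// with processing_started_at / idle_duration / average_client_latency_secs
// removed as suggested in the review.
#[derive(Default, Debug)]
struct ExporterMetrics {
    exported_batches: u64,
    exported_msgs: u64,
    failed_batch_count: u64,
    failed_msg_count: u64,
}

impl ExporterMetrics {
    fn record_failure(&mut self, msgs_in_batch: u64) {
        self.failed_batch_count += 1;
        self.failed_msg_count += msgs_in_batch;
    }
}
```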

Contributor Author

I need these for myself until I am done. I can put a TODO to change this later.

async fn handle_pdata(
&mut self,
effect_handler: &EffectHandler<OtapPdata>,
request: ExportLogsServiceRequest,
Contributor

It will be a relatively large performance improvement when we avoid constructing ExportLogsServiceRequest objects and use the view instead. @lalitb please help with a pointer and/or an example?

Member

+1, @gouslu - you can find the example and the benchmark here: OtapLogsView

self.handle_shutdown(&effect_handler).await?;
return Ok(TerminalState::new(
deadline,
std::iter::empty::<otap_df_telemetry::metrics::MetricSetSnapshot>(),
Contributor

I mentioned in another comment, we should follow other crates/otap component examples of the metrics integration with crates/telemetry: then you'll have a proper MetricsSet at this point.

Contributor Author

I can look into this in future updates; currently I'm focused on optimizing performance and refactoring the code.

}

pub struct InFlightExports {
futures: FuturesUnordered<BoxFuture<'static, CompletedExport>>,
Contributor

👍

Comment on lines 555 to 494
─────────────── AzureMonitorExporter ──────────────────────────
perf │ th/s={:.2} avg_lat={:.2}ms
success │ rows={:.0} batches={:.0} msgs={:.0}
fail │ rows={:.0} batches={:.0} msgs={:.0}
time │ elapsed={:.1}s active={:.1}s idle={:.1}s
state | batch_to_msg={} msg_to_batch={} msg_to_data={}
───────────────────────────────────────────────────────────────\n",
Contributor

FYI @cijothomas I've recommended that this component use the crates/telemetry framework, which would mean we could compute performance measurements using the continuous benchmarks. OTOH we would need a server that accepts gzip-compressed-json for the testing. Either way, I've recommended to use the Counter<_> and existing metrics support for now (for counters); asked questions here about logging and histogram measurements.

Contributor Author

Created a TODO for tracking metrics; will work with Cijo on perf testing.

@jmacd
Contributor

jmacd commented Dec 9, 2025

All of my real questions have to do with instrumentation and the potential for re-use of Azure auth structs. The code looks good!

@gouslu gouslu force-pushed the gouslu/gigla_exporter_batching branch from 9f5cae4 to 29fb7bc on December 9, 2025 23:53
pub fn new(config: &Config) -> Result<Self, String> {
let http_client = Client::builder()
.timeout(Duration::from_secs(30))
.http2_prior_knowledge() // Use HTTP/2 directly (faster, no upgrade negotiation)
Member

A note on HTTP/2: Since Azure Monitor uses HTTPS, ALPN negotiates HTTP/2 as part of the SSL handshake - so there's no extra round trip. Explicitly adding this configuration provides no benefit.

I'd also recommend benchmarking with both HTTP/1.1 and HTTP/2. With HTTP/2, the Geneva backend restricts the client from creating new connections, meaning all requests are multiplexed over a single connection. When payload sizes are large, this single connection can become a bottleneck and lead to timeouts. I ran into this exact issue with the Geneva exporter and had to switch to HTTP/1.1 to resolve it.

Not a blocker for this PR, but good to do some benchmark.

Contributor Author

I was considering this as well and switched back to 1.1. I didn't see any perf difference, but chose 1.1 because it works better with one HTTP client per API client, which is what improves perf a lot in my case.
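Assuming the HTTP client is reqwest (the builder calls in the diff match its API), the HTTP/1.1 configuration discussed above can be sketched like this; the pool size is an illustrative value, not from the PR:

```rust
use std::time::Duration;

// Sketch, assuming the `reqwest` client. `http1_only()` pins HTTP/1.1 so
// requests spread over pooled connections instead of being multiplexed onto
// a single HTTP/2 connection, which can bottleneck with large payloads.
fn build_client() -> Result<reqwest::Client, reqwest::Error> {
    reqwest::Client::builder()
        .timeout(Duration::from_secs(30))
        .http1_only()
        .pool_max_idle_per_host(8) // illustrative value, tune via benchmarks
        .build()
}
```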

@gouslu gouslu force-pushed the gouslu/gigla_exporter_batching branch from be097bf to c3e4790 on December 11, 2025 19:33
@gouslu gouslu marked this pull request as ready for review December 12, 2025 00:18
@gouslu gouslu requested a review from a team as a code owner December 12, 2025 00:18
Comment on lines +493 to +495

_ = tokio::time::sleep_until(next_stats_print) => {
next_stats_print = tokio::time::Instant::now() + tokio::time::Duration::from_secs(STATS_PRINT_INTERVAL);
Contributor

OK for experimental. We'll want to use the built-in metrics, and we'll want to make the continuous benchmark support this exporter as we've discussed.
