azure monitor exporter gzip batcher and performance improvements. #1490
base: main
Conversation
Codecov Report ❌ (coverage diff and impacted files):

```
@@            Coverage Diff             @@
##             main    #1490      +/-   ##
==========================================
- Coverage   84.12%   84.08%   -0.05%
==========================================
  Files         444      451       +7
  Lines      125704   128189    +2485
==========================================
+ Hits       105749   107782    +2033
- Misses      19421    19873     +452
  Partials      534      534
```
rust/otap-dataflow/crates/otap/src/experimental/azure_monitor_exporter/auth.rs (comment resolved)
```rust
if self.total_uncompressed_size == 0 {
    self.buf
        .write_all(b"[")
        .expect("write to memory buffer failed");
```
With `expect`, any unexpected gzip/memory error crashes the exporter instead of surfacing an error.
This should never happen. I understand the concern, but if it does happen it is an indicator of a critical bug.
Agree with @lalitb. Let's avoid `expect` usage (panic).
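A minimal sketch of the direction being suggested, assuming the batcher's write methods can be changed to return a `Result` and that `self.buf` is an `impl Write` (for example a gzip encoder); the names here are illustrative, not the exporter's actual API:

```rust
use std::io::Write;

// Propagate the I/O error to the caller instead of panicking with `expect`.
fn open_json_array<W: Write>(buf: &mut W) -> Result<(), String> {
    buf.write_all(b"[")
        .map_err(|e| format!("write to memory buffer failed: {e}"))
}
```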
```rust
let json_bytes =
    serde_json::to_vec(&body).map_err(|e| format!("Failed to serialize to JSON: {e}"))?;
// Clone static headers and add the auth header
let mut headers = self.static_headers.clone();
```
These are static headers; we should be able to avoid cloning them in the hot path.
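One way to do that, sketched here under the assumption that the HTTP client is reqwest (as later snippets in this PR suggest): register the static headers as the client's default headers once, so only the per-request auth header is added in the hot path. Function names are illustrative.

```rust
use reqwest::header::{HeaderMap, AUTHORIZATION};

// Build the client once with the static headers attached; reqwest sends
// default headers with every request, so no per-request clone is needed.
fn build_client(static_headers: HeaderMap) -> reqwest::Result<reqwest::Client> {
    reqwest::Client::builder()
        .default_headers(static_headers)
        .build()
}

// In the hot path only the auth header is set on top of the defaults.
async fn post_batch(
    client: &reqwest::Client,
    url: &str,
    body: Vec<u8>,
    token: &str,
) -> reqwest::Result<reqwest::Response> {
    client
        .post(url)
        .header(AUTHORIZATION, format!("Bearer {token}"))
        .body(body)
        .send()
        .await
}
```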
```rust
    self.current_uncompressed_size += 1;
} else {
    self.buf
        .write_all(b",")
```
We append `,` here, but later, when the size check triggers a flush, the batch is emitted as `[... ,]` without the attempted element. The trailing comma makes the JSON invalid and will most probably fail at ingestion.
The API handles this just fine, but you are right that this is not valid JSON, so I will fix it as you suggested.
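A minimal sketch of one way to fix it (the names are illustrative, not the exporter's actual batcher): only write the `,` separator once it is certain the element will be appended, so a size-triggered flush can never leave a dangling comma in the buffer.

```rust
use std::io::Write;

// Append one serialized element to the in-progress JSON array.
// The separator is written immediately before the element, never speculatively.
fn append_element<W: Write>(
    buf: &mut W,
    element_count: &mut usize,
    json: &[u8],
) -> std::io::Result<()> {
    if *element_count == 0 {
        buf.write_all(b"[")?; // first element opens the array
    } else {
        buf.write_all(b",")?; // subsequent elements are preceded by a comma
    }
    buf.write_all(json)?;
    *element_count += 1;
    Ok(())
}
```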
Force-pushed d0d5368 to 413048d
jmacd left a comment:
Since this is still in draft form, please take these comments as conversation starters! 😀
rust/otap-dataflow/crates/otap/src/experimental/azure_monitor_exporter/auth.rs (comment resolved)
```rust
// TODO - Remove print statements
#[allow(clippy::print_stdout)]
pub struct Auth {
    credential: Arc<dyn TokenCredential>,
    scope: String,
    // Thread-safe shared token cache
    cached_token: Arc<RwLock<Option<AccessToken>>>,
}
```
Suggested change:

```diff
-// TODO - Remove print statements
+// TODO - Consolidate with crates/otap/src/{cloud_auth,object_store}/azure.rs
 #[allow(clippy::print_stdout)]
 pub struct Auth {
     credential: Arc<dyn TokenCredential>,
     scope: String,
     // Thread-safe shared token cache
     cached_token: Arc<RwLock<Option<AccessToken>>>,
 }
```
I recognize this is using things already committed in crates/otap/src/experimental/azure_monitor_exporter/config.rs.
Have a look at rust/otap-dataflow/crates/otap/src/cloud_auth/azure.rs and rust/otap-dataflow/crates/otap/src/object_store/azure.rs, added subsequently in #1517. The struct here looks similar to the object_store struct, and there are two similar Auth configs now. It would be nice to see less code and more re-use as we move forward; not a blocker for this PR.
(@lalitb please review. I believe our position should be that Azure auth code/config belongs in an extension, the extension used by parquet_exporter for object_store and by Azure Monitor here.)
@jmacd That's correct. Instead of each exporter (Azure Monitor, Parquet/object_store, etc.) implementing its own auth config and credential creation logic, they should all reference this shared extension - and any common requirements should be met by extending this module rather than adding parallel implementations.
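As a rough illustration of that direction (not the actual extension API, which does not exist in this form yet), a shared helper could own the credential, scope, and token cache, and both exporters would only hold a handle to it. The trait, types, and refresh margin below are placeholders:

```rust
use std::sync::Arc;
use std::time::{Duration, SystemTime};
use tokio::sync::RwLock;

// Placeholder for whatever credential abstraction the shared extension exposes
// (e.g. azure_identity's TokenCredential); the signature is illustrative only.
#[async_trait::async_trait]
pub trait Credential: Send + Sync {
    async fn fetch_token(&self, scope: &str) -> Result<(String, SystemTime), String>;
}

#[derive(Clone)]
struct CachedToken {
    token: String,
    expires_at: SystemTime,
}

pub struct SharedAzureAuth {
    credential: Arc<dyn Credential>,
    scope: String,
    cached: Arc<RwLock<Option<CachedToken>>>,
}

impl SharedAzureAuth {
    pub async fn token(&self) -> Result<String, String> {
        // Fast path: reuse the cached token while it is still comfortably valid.
        if let Some(t) = self.cached.read().await.as_ref() {
            if t.expires_at > SystemTime::now() + Duration::from_secs(120) {
                return Ok(t.token.clone());
            }
        }
        // Slow path: refresh under the write lock and update the cache.
        let mut guard = self.cached.write().await;
        let (token, expires_at) = self.credential.fetch_token(&self.scope).await?;
        *guard = Some(CachedToken { token: token.clone(), expires_at });
        Ok(token)
    }
}
```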
```rust
    failed_batch_count: f64,
    failed_msg_count: f64,
    average_client_latency_secs: f64,
}
```
Have a look at a few of the components in the crates/otap/src folder, such as retry_processor.rs, and how the existing metrics API already covers the counters here.

As for histogram measurements, I would prefer to hold back. Again, asking @lquerel for opinions: recording histogram instruments could use a thread-local histogram data structure or message-passing of latency measurements (or both), at which point it may as well be a span message aggregated in the OTel SDK or one of our own pipelines for a latency metric.

We shouldn't reinvent this stuff; see https://github.com/open-telemetry/opentelemetry-rust/blob/main/opentelemetry-sdk/src/metrics/internal/exponential_histogram.rs.

I see this as a question of routing histogram measurements (per core) to the central instrumentation collector.
@gouslu I understand that some of this is for your own performance studies. Latency and counters aside, can we remove processing_started_at, last_message_received_at, idle_duration, and average_client_latency_secs?
As for the counters, we aim to standardize pipeline metrics, the topic in #487 and a model RFC in the collector RFC https://github.com/open-telemetry/opentelemetry-collector/blob/main/docs/rfcs/component-universal-telemetry.md. The counters here are fine until we have a more-standard solution.
I need these for myself until I am done. I can put a TODO to change this later.
```rust
async fn handle_pdata(
    &mut self,
    effect_handler: &EffectHandler<OtapPdata>,
    request: ExportLogsServiceRequest,
```
It will be a relatively large performance improvement when we avoid constructing ExportLogsServiceRequest objects and use the view instead. @lalitb, could you please help with a pointer and/or an example?
+1. @gouslu, you can find the example and the benchmark here: OtapLogsView.
```rust
self.handle_shutdown(&effect_handler).await?;
return Ok(TerminalState::new(
    deadline,
    std::iter::empty::<otap_df_telemetry::metrics::MetricSetSnapshot>(),
```
As I mentioned in another comment, we should follow the other crates/otap components' example of metrics integration with crates/telemetry; then you'll have a proper MetricsSet at this point.
I can look into this in future updates; currently I am focused on optimizing performance and refactoring the code.
```rust
}

pub struct InFlightExports {
    futures: FuturesUnordered<BoxFuture<'static, CompletedExport>>,
```
👍
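For context, a self-contained sketch of the pattern the struct above uses (assuming the futures crate; the fields of `CompletedExport` are placeholders): FuturesUnordered keeps the in-flight export futures and yields each result as soon as it completes, so the exporter is not blocked on the slowest request.

```rust
use futures::future::BoxFuture;
use futures::stream::{FuturesUnordered, StreamExt};

// Placeholder result type for a finished export.
struct CompletedExport {
    success: bool,
}

struct InFlightExports {
    futures: FuturesUnordered<BoxFuture<'static, CompletedExport>>,
}

impl InFlightExports {
    // Add a new export future to the in-flight set.
    fn push(&mut self, fut: BoxFuture<'static, CompletedExport>) {
        self.futures.push(fut);
    }

    // Await the next export that finishes, in completion order.
    async fn next_done(&mut self) -> Option<CompletedExport> {
        self.futures.next().await
    }
}
```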
```rust
"─────────────── AzureMonitorExporter ──────────────────────────
perf │ th/s={:.2} avg_lat={:.2}ms
success │ rows={:.0} batches={:.0} msgs={:.0}
fail │ rows={:.0} batches={:.0} msgs={:.0}
time │ elapsed={:.1}s active={:.1}s idle={:.1}s
state | batch_to_msg={} msg_to_batch={} msg_to_data={}
───────────────────────────────────────────────────────────────\n",
```
FYI @cijothomas, I've recommended that this component use the crates/telemetry framework, which would mean we could compute performance measurements using the continuous benchmarks. OTOH, we would need a server that accepts gzip-compressed JSON for the testing. Either way, I've recommended using Counter<_> and the existing metrics support for now (for counters), and asked questions here about logging and histogram measurements.
Created a TODO for tracking metrics; will work with Cijo on perf testing.
All of my real questions have to do with instrumentation and the potential for re-use of Azure auth structs. The code looks good!
Force-pushed 9f5cae4 to 29fb7bc
```rust
pub fn new(config: &Config) -> Result<Self, String> {
    let http_client = Client::builder()
        .timeout(Duration::from_secs(30))
        .http2_prior_knowledge() // Use HTTP/2 directly (faster, no upgrade negotiation)
```
A note on HTTP/2: Since Azure Monitor uses HTTPS, ALPN negotiates HTTP/2 as part of the SSL handshake - so there's no extra round trip. Explicitly adding this configuration provides no benefit.
I'd also recommend benchmarking with both HTTP/1.1 and HTTP/2. With HTTP/2, the Geneva backend restricts the client from creating new connections, meaning all requests are multiplexed over a single connection. When payload sizes are large, this single connection can become a bottleneck and lead to timeouts. I ran into this exact issue with the Geneva exporter and had to switch to HTTP/1.1 to resolve it.
Not a blocker for this PR, but it would be good to run some benchmarks.
I was considering this as well and switched back to 1.1. I didn't see any perf difference, but chose 1.1 because it is better for creating one HTTP client per API client, which is what helps improve perf a lot in my case.
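For reference, a hedged sketch of the two client configurations being compared, assuming reqwest (as the snippet above suggests); function names are illustrative:

```rust
use std::time::Duration;

// Force HTTP/1.1: each client gets its own connection(s), avoiding the
// single-multiplexed-connection bottleneck described above.
fn build_http1_client() -> reqwest::Result<reqwest::Client> {
    reqwest::Client::builder()
        .timeout(Duration::from_secs(30))
        .http1_only()
        .build()
}

// No explicit protocol setting: over HTTPS, ALPN negotiates HTTP/2 during the
// TLS handshake when the server supports it, with no extra round trip.
fn build_alpn_client() -> reqwest::Result<reqwest::Client> {
    reqwest::Client::builder()
        .timeout(Duration::from_secs(30))
        .build()
}
```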
Force-pushed be097bf to c3e4790
```rust
_ = tokio::time::sleep_until(next_stats_print) => {
    next_stats_print = tokio::time::Instant::now() + tokio::time::Duration::from_secs(STATS_PRINT_INTERVAL);
```
OK for experimental. We'll want to use the built-in metrics, and we'll want to make the continuous benchmark support this exporter as we've discussed.