From 61426cb56ee07ab458aca005f1a9099f664851cf Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Wed, 4 Feb 2026 21:22:30 +1100 Subject: [PATCH 1/2] chore: improve error logging in ingestion handler add stream name to the error log on ingestion failure --- src/handlers/http/ingest.rs | 104 +++++++++++++++++++++++++++--------- 1 file changed, 79 insertions(+), 25 deletions(-) diff --git a/src/handlers/http/ingest.rs b/src/handlers/http/ingest.rs index ebee62b88..c79531378 100644 --- a/src/handlers/http/ingest.rs +++ b/src/handlers/http/ingest.rs @@ -89,7 +89,10 @@ pub async fn ingest( log_source, LogSource::OtelLogs | LogSource::OtelMetrics | LogSource::OtelTraces ) { - return Err(PostError::OtelNotSupported); + error!( + "Ingestion failed for stream {stream_name}: OTEL log sources are not supported on /api/v1/ingest endpoint" + ); + return Err(PostError::OtelNotSupported(stream_name)); } let mut p_custom_fields = get_custom_fields_from_header(&req); @@ -116,17 +119,28 @@ pub async fn ingest( vec![log_source_entry.clone()], telemetry_type, ) - .await?; + .await + .map_err(|e| { + error!("Ingestion failed for stream {stream_name}: {e}"); + e + })?; //if stream exists, fetch the stream log source //return error if the stream log source is otel traces or otel metrics or otel logs - validate_stream_for_ingestion(&stream_name, &log_source)?; + validate_stream_for_ingestion(&stream_name, &log_source).map_err(|e| { + error!("Ingestion failed for stream {stream_name}: {e}"); + e + })?; PARSEABLE .add_update_log_source(&stream_name, log_source_entry) - .await?; + .await + .map_err(|e| { + error!("Ingestion failed for stream {stream_name}: {e}"); + e + })?; - flatten_and_push_logs( + if let Err(e) = flatten_and_push_logs( json, &stream_name, &log_source, @@ -134,7 +148,11 @@ pub async fn ingest( None, telemetry_type, ) - .await?; + .await + { + error!("Ingestion failed for stream {stream_name}: {e}"); + return Err(e); + } Ok(HttpResponse::Ok().finish()) } @@ -259,7 +277,7 @@ async fn process_otel_content( { Some(content_type) => { if content_type == CONTENT_TYPE_JSON { - flatten_and_push_logs( + if let Err(e) = flatten_and_push_logs( serde_json::from_slice(&body)?, stream_name, log_source, @@ -267,21 +285,33 @@ async fn process_otel_content( None, telemetry_type, ) - .await?; + .await + { + error!("Ingestion failed for stream {stream_name}: {e}"); + return Err(e); + } } else if content_type == CONTENT_TYPE_PROTOBUF { + error!( + "Ingestion failed for stream {stream_name}: Protobuf ingestion is not supported in Parseable OSS" + ); return Err(PostError::Invalid(anyhow::anyhow!( - "Protobuf ingestion is not supported in Parseable OSS" + "Ingestion failed for stream {stream_name}: Protobuf ingestion is not supported in Parseable OSS" ))); } else { + error!( + "Ingestion failed for stream {stream_name}: Unsupported Content-Type: {content_type}. Expected application/json or application/x-protobuf" + ); return Err(PostError::Invalid(anyhow::anyhow!( - "Unsupported Content-Type: {}. Expected application/json or application/x-protobuf", - content_type + "Ingestion failed for stream {stream_name}: Unsupported Content-Type: {content_type}. Expected application/json or application/x-protobuf" ))); } } None => { + error!( + "Ingestion failed for stream {stream_name}: Missing Content-Type header. Expected application/json or application/x-protobuf" + ); return Err(PostError::Invalid(anyhow::anyhow!( - "Missing Content-Type header. Expected application/json or application/x-protobuf" + "Ingestion failed for stream {stream_name}: Missing Content-Type header. Expected application/json or application/x-protobuf" ))); } } @@ -302,7 +332,11 @@ pub async fn handle_otel_logs_ingestion( &OTEL_LOG_KNOWN_FIELD_LIST, TelemetryType::Logs, ) - .await?; + .await + .map_err(|e| { + error!("OTEL logs ingestion failed: {e}"); + e + })?; process_otel_content(&req, body, &stream_name, &log_source, TelemetryType::Logs).await?; @@ -322,7 +356,11 @@ pub async fn handle_otel_metrics_ingestion( &OTEL_METRICS_KNOWN_FIELD_LIST, TelemetryType::Metrics, ) - .await?; + .await + .map_err(|e| { + error!("OTEL metrics ingestion failed: {e}"); + e + })?; process_otel_content( &req, @@ -349,7 +387,11 @@ pub async fn handle_otel_traces_ingestion( &OTEL_TRACES_KNOWN_FIELD_LIST, TelemetryType::Traces, ) - .await?; + .await + .map_err(|e| { + error!("OTEL traces ingestion failed: {e}"); + e + })?; process_otel_content(&req, body, &stream_name, &log_source, TelemetryType::Traces).await?; @@ -401,24 +443,33 @@ pub async fn post_event( let mut json = json.into_inner(); match &log_source { LogSource::OtelLogs | LogSource::OtelMetrics | LogSource::OtelTraces => { - return Err(PostError::OtelNotSupported); + error!( + "Ingestion failed for stream {stream_name}: OTEL log sources are not supported on /api/v1/logstream endpoint" + ); + return Err(PostError::OtelNotSupported(stream_name)); } LogSource::Custom(src) => { - KNOWN_SCHEMA_LIST.extract_from_inline_log( + if let Err(e) = KNOWN_SCHEMA_LIST.extract_from_inline_log( &mut json, &mut p_custom_fields, src, extract_log, - )?; + ) { + error!("Ingestion failed for stream {stream_name}: {e}"); + return Err(e.into()); + } } _ => {} } //if stream exists, fetch the stream log source //return error if the stream log source is otel traces or otel metrics or otel logs - validate_stream_for_ingestion(&stream_name, &log_source)?; + if let Err(e) = validate_stream_for_ingestion(&stream_name, &log_source) { + error!("Ingestion failed for stream {stream_name}: {e}"); + return Err(e); + } - flatten_and_push_logs( + if let Err(e) = flatten_and_push_logs( json, &stream_name, &log_source, @@ -426,7 +477,11 @@ pub async fn post_event( None, TelemetryType::Logs, ) - .await?; + .await + { + error!("Ingestion failed for stream {stream_name}: {e}"); + return Err(e); + } Ok(HttpResponse::Ok().finish()) } @@ -482,9 +537,9 @@ pub enum PostError { #[error("Error: {0}")] JsonFlattenError(#[from] JsonFlattenError), #[error( - "Use the endpoints `/v1/logs` for otel logs, `/v1/metrics` for otel metrics and `/v1/traces` for otel traces" + "Ingestion failed for stream {0}. Use the endpoints `/v1/logs` for otel logs, `/v1/metrics` for otel metrics and `/v1/traces` for otel traces" )] - OtelNotSupported, + OtelNotSupported(String), #[error("The stream {0} is reserved for internal use and cannot be ingested into")] InternalStream(String), #[error(r#"Please use "x-p-log-source: {0}" for ingesting otel {1} data"#)] @@ -528,7 +583,7 @@ impl actix_web::ResponseError for PostError { | InvalidQueryParameter | MissingQueryParameter | CreateStream(CreateStreamError::StreamNameValidation(_)) - | OtelNotSupported => StatusCode::BAD_REQUEST, + | OtelNotSupported(_) => StatusCode::BAD_REQUEST, Event(_) | CreateStream(_) @@ -547,7 +602,6 @@ impl actix_web::ResponseError for PostError { } fn error_response(&self) -> actix_web::HttpResponse { - error!("{self}"); match self { PostError::MetastoreError(metastore_error) => { actix_web::HttpResponse::build(metastore_error.status_code()) From f9f22a6d41023995cb4ac82cae1887a4c270cdc3 Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Wed, 4 Feb 2026 23:04:10 +1100 Subject: [PATCH 2/2] handle json serde error --- src/handlers/http/ingest.rs | 19 +++++++++++++++++-- 1 file changed, 17 insertions(+), 2 deletions(-) diff --git a/src/handlers/http/ingest.rs b/src/handlers/http/ingest.rs index c79531378..25bf9e1d9 100644 --- a/src/handlers/http/ingest.rs +++ b/src/handlers/http/ingest.rs @@ -159,7 +159,13 @@ pub async fn ingest( pub async fn ingest_internal_stream(stream_name: String, body: Bytes) -> Result<(), PostError> { let size: usize = body.len(); - let json: StrictValue = serde_json::from_slice(&body)?; + let json: StrictValue = match serde_json::from_slice(&body) { + Ok(v) => v, + Err(e) => { + error!("Ingestion failed for stream {stream_name}: malformed JSON in request body"); + return Err(PostError::SerdeError(e)); + } + }; let schema = PARSEABLE.get_stream(&stream_name)?.get_schema_raw(); let mut p_custom_fields = HashMap::new(); p_custom_fields.insert(USER_AGENT_KEY.to_string(), "parseable".to_string()); @@ -277,8 +283,17 @@ async fn process_otel_content( { Some(content_type) => { if content_type == CONTENT_TYPE_JSON { + let json: serde_json::Value = match serde_json::from_slice(&body) { + Ok(v) => v, + Err(e) => { + error!( + "Ingestion failed for stream {stream_name}: malformed JSON in request body" + ); + return Err(PostError::SerdeError(e)); + } + }; if let Err(e) = flatten_and_push_logs( - serde_json::from_slice(&body)?, + json, stream_name, log_source, &p_custom_fields,