diff --git a/src/handlers/http/ingest.rs b/src/handlers/http/ingest.rs index 4438f7f27..ebee62b88 100644 --- a/src/handlers/http/ingest.rs +++ b/src/handlers/http/ingest.rs @@ -119,8 +119,8 @@ pub async fn ingest( .await?; //if stream exists, fetch the stream log source - //return error if the stream log source is otel traces or otel metrics - validate_stream_for_ingestion(&stream_name)?; + //return error if the stream log source is otel traces or otel metrics or otel logs + validate_stream_for_ingestion(&stream_name, &log_source)?; PARSEABLE .add_update_log_source(&stream_name, log_source_entry) @@ -415,8 +415,8 @@ pub async fn post_event( } //if stream exists, fetch the stream log source - //return error if the stream log source is otel traces or otel metrics - validate_stream_for_ingestion(&stream_name)?; + //return error if the stream log source is otel traces or otel metrics or otel logs + validate_stream_for_ingestion(&stream_name, &log_source)?; flatten_and_push_logs( json, diff --git a/src/handlers/http/modal/utils/ingest_utils.rs b/src/handlers/http/modal/utils/ingest_utils.rs index dcfd83abb..36eafa132 100644 --- a/src/handlers/http/modal/utils/ingest_utils.rs +++ b/src/handlers/http/modal/utils/ingest_utils.rs @@ -276,7 +276,10 @@ fn verify_dataset_fields_count(stream_name: &str) -> Result<(), PostError> { Ok(()) } -pub fn validate_stream_for_ingestion(stream_name: &str) -> Result<(), PostError> { +pub fn validate_stream_for_ingestion( + stream_name: &str, + log_source: &LogSource, +) -> Result<(), PostError> { let stream = PARSEABLE.get_stream(stream_name)?; // Validate that the stream's log source is compatible @@ -289,6 +292,16 @@ pub fn validate_stream_for_ingestion(stream_name: &str) -> Result<(), PostError> }) .ok_or(PostError::IncorrectLogFormat(stream_name.to_string()))?; + // If stream has OtelLogs as log source, only allow OtelLogs ingestion + let has_otel_logs = stream + .get_log_source() + .iter() + .any(|entry| entry.log_source_format == LogSource::OtelLogs); + + if has_otel_logs && *log_source != LogSource::OtelLogs { + return Err(PostError::IncorrectLogFormat(stream_name.to_string())); + } + // Check for time partition if stream.get_time_partition().is_some() { return Err(PostError::Invalid(anyhow::anyhow!(