Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 96 additions & 27 deletions src/handlers/http/ingest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,10 @@ pub async fn ingest(
log_source,
LogSource::OtelLogs | LogSource::OtelMetrics | LogSource::OtelTraces
) {
return Err(PostError::OtelNotSupported);
error!(
"Ingestion failed for stream {stream_name}: OTEL log sources are not supported on /api/v1/ingest endpoint"
);
return Err(PostError::OtelNotSupported(stream_name));
}

let mut p_custom_fields = get_custom_fields_from_header(&req);
Expand All @@ -116,32 +119,53 @@ pub async fn ingest(
vec![log_source_entry.clone()],
telemetry_type,
)
.await?;
.await
.map_err(|e| {
error!("Ingestion failed for stream {stream_name}: {e}");
e
})?;

//if stream exists, fetch the stream log source
//return error if the stream log source is otel traces or otel metrics or otel logs
validate_stream_for_ingestion(&stream_name, &log_source)?;
validate_stream_for_ingestion(&stream_name, &log_source).map_err(|e| {
error!("Ingestion failed for stream {stream_name}: {e}");
e
})?;

PARSEABLE
.add_update_log_source(&stream_name, log_source_entry)
.await?;
.await
.map_err(|e| {
error!("Ingestion failed for stream {stream_name}: {e}");
e
})?;

flatten_and_push_logs(
if let Err(e) = flatten_and_push_logs(
json,
&stream_name,
&log_source,
&p_custom_fields,
None,
telemetry_type,
)
.await?;
.await
{
error!("Ingestion failed for stream {stream_name}: {e}");
return Err(e);
}

Ok(HttpResponse::Ok().finish())
}

pub async fn ingest_internal_stream(stream_name: String, body: Bytes) -> Result<(), PostError> {
let size: usize = body.len();
let json: StrictValue = serde_json::from_slice(&body)?;
let json: StrictValue = match serde_json::from_slice(&body) {
Ok(v) => v,
Err(e) => {
error!("Ingestion failed for stream {stream_name}: malformed JSON in request body");
return Err(PostError::SerdeError(e));
}
};
let schema = PARSEABLE.get_stream(&stream_name)?.get_schema_raw();
let mut p_custom_fields = HashMap::new();
p_custom_fields.insert(USER_AGENT_KEY.to_string(), "parseable".to_string());
Expand Down Expand Up @@ -259,29 +283,50 @@ async fn process_otel_content(
{
Some(content_type) => {
if content_type == CONTENT_TYPE_JSON {
flatten_and_push_logs(
serde_json::from_slice(&body)?,
let json: serde_json::Value = match serde_json::from_slice(&body) {
Ok(v) => v,
Err(e) => {
error!(
"Ingestion failed for stream {stream_name}: malformed JSON in request body"
);
return Err(PostError::SerdeError(e));
}
};
if let Err(e) = flatten_and_push_logs(
json,
stream_name,
log_source,
&p_custom_fields,
None,
telemetry_type,
)
.await?;
.await
{
error!("Ingestion failed for stream {stream_name}: {e}");
return Err(e);
}
} else if content_type == CONTENT_TYPE_PROTOBUF {
error!(
"Ingestion failed for stream {stream_name}: Protobuf ingestion is not supported in Parseable OSS"
);
return Err(PostError::Invalid(anyhow::anyhow!(
"Protobuf ingestion is not supported in Parseable OSS"
"Ingestion failed for stream {stream_name}: Protobuf ingestion is not supported in Parseable OSS"
)));
} else {
error!(
"Ingestion failed for stream {stream_name}: Unsupported Content-Type: {content_type}. Expected application/json or application/x-protobuf"
);
return Err(PostError::Invalid(anyhow::anyhow!(
"Unsupported Content-Type: {}. Expected application/json or application/x-protobuf",
content_type
"Ingestion failed for stream {stream_name}: Unsupported Content-Type: {content_type}. Expected application/json or application/x-protobuf"
)));
}
}
None => {
error!(
"Ingestion failed for stream {stream_name}: Missing Content-Type header. Expected application/json or application/x-protobuf"
);
return Err(PostError::Invalid(anyhow::anyhow!(
"Missing Content-Type header. Expected application/json or application/x-protobuf"
"Ingestion failed for stream {stream_name}: Missing Content-Type header. Expected application/json or application/x-protobuf"
)));
}
}
Expand All @@ -302,7 +347,11 @@ pub async fn handle_otel_logs_ingestion(
&OTEL_LOG_KNOWN_FIELD_LIST,
TelemetryType::Logs,
)
.await?;
.await
.map_err(|e| {
error!("OTEL logs ingestion failed: {e}");
e
})?;

process_otel_content(&req, body, &stream_name, &log_source, TelemetryType::Logs).await?;

Expand All @@ -322,7 +371,11 @@ pub async fn handle_otel_metrics_ingestion(
&OTEL_METRICS_KNOWN_FIELD_LIST,
TelemetryType::Metrics,
)
.await?;
.await
.map_err(|e| {
error!("OTEL metrics ingestion failed: {e}");
e
})?;

process_otel_content(
&req,
Expand All @@ -349,7 +402,11 @@ pub async fn handle_otel_traces_ingestion(
&OTEL_TRACES_KNOWN_FIELD_LIST,
TelemetryType::Traces,
)
.await?;
.await
.map_err(|e| {
error!("OTEL traces ingestion failed: {e}");
e
})?;

process_otel_content(&req, body, &stream_name, &log_source, TelemetryType::Traces).await?;

Expand Down Expand Up @@ -401,32 +458,45 @@ pub async fn post_event(
let mut json = json.into_inner();
match &log_source {
LogSource::OtelLogs | LogSource::OtelMetrics | LogSource::OtelTraces => {
return Err(PostError::OtelNotSupported);
error!(
"Ingestion failed for stream {stream_name}: OTEL log sources are not supported on /api/v1/logstream endpoint"
);
return Err(PostError::OtelNotSupported(stream_name));
}
LogSource::Custom(src) => {
KNOWN_SCHEMA_LIST.extract_from_inline_log(
if let Err(e) = KNOWN_SCHEMA_LIST.extract_from_inline_log(
&mut json,
&mut p_custom_fields,
src,
extract_log,
)?;
) {
error!("Ingestion failed for stream {stream_name}: {e}");
return Err(e.into());
}
}
_ => {}
}

//if stream exists, fetch the stream log source
//return error if the stream log source is otel traces or otel metrics or otel logs
validate_stream_for_ingestion(&stream_name, &log_source)?;
if let Err(e) = validate_stream_for_ingestion(&stream_name, &log_source) {
error!("Ingestion failed for stream {stream_name}: {e}");
return Err(e);
}

flatten_and_push_logs(
if let Err(e) = flatten_and_push_logs(
json,
&stream_name,
&log_source,
&p_custom_fields,
None,
TelemetryType::Logs,
)
.await?;
.await
{
error!("Ingestion failed for stream {stream_name}: {e}");
return Err(e);
}

Ok(HttpResponse::Ok().finish())
}
Expand Down Expand Up @@ -482,9 +552,9 @@ pub enum PostError {
#[error("Error: {0}")]
JsonFlattenError(#[from] JsonFlattenError),
#[error(
"Use the endpoints `/v1/logs` for otel logs, `/v1/metrics` for otel metrics and `/v1/traces` for otel traces"
"Ingestion failed for stream {0}. Use the endpoints `/v1/logs` for otel logs, `/v1/metrics` for otel metrics and `/v1/traces` for otel traces"
)]
OtelNotSupported,
OtelNotSupported(String),
#[error("The stream {0} is reserved for internal use and cannot be ingested into")]
InternalStream(String),
#[error(r#"Please use "x-p-log-source: {0}" for ingesting otel {1} data"#)]
Expand Down Expand Up @@ -528,7 +598,7 @@ impl actix_web::ResponseError for PostError {
| InvalidQueryParameter
| MissingQueryParameter
| CreateStream(CreateStreamError::StreamNameValidation(_))
| OtelNotSupported => StatusCode::BAD_REQUEST,
| OtelNotSupported(_) => StatusCode::BAD_REQUEST,

Event(_)
| CreateStream(_)
Expand All @@ -547,7 +617,6 @@ impl actix_web::ResponseError for PostError {
}

fn error_response(&self) -> actix_web::HttpResponse<actix_web::body::BoxBody> {
error!("{self}");
match self {
PostError::MetastoreError(metastore_error) => {
actix_web::HttpResponse::build(metastore_error.status_code())
Expand Down
Loading