Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions src/partial_span_processor/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import datetime
import json
import logging
import threading
import time
from typing import TYPE_CHECKING
Expand All @@ -28,7 +29,7 @@
from opentelemetry.sdk.trace import ReadableSpan, Span, SpanProcessor
from opentelemetry.trace import TraceFlags

from partial_span_processor.peekable_queue import PeekableQueue
from .peekable_queue import PeekableQueue

if TYPE_CHECKING:
from opentelemetry import context as context_api
Expand All @@ -40,6 +41,7 @@
DEFAULT_INITIAL_HEARTBEAT_DELAY_MILLIS = 5000
DEFAULT_PROCESS_INTERVAL_MILLIS = 5000

_logger = logging.getLogger(__name__)

def validate_parameters(log_exporter, heartbeat_interval_millis,
initial_heartbeat_delay_millis, process_interval_millis):
Expand Down Expand Up @@ -131,7 +133,10 @@ def on_end(self, span: ReadableSpan) -> None:

def export_log(self, span, attributes: dict[str, str]) -> None:
log_data = self.get_log_data(span, attributes)
self.log_exporter.export([log_data])
try:
self.log_exporter.export([log_data])
except Exception:
_logger.exception("Exception while exporting logs.")

def shutdown(self) -> None:
# signal the worker thread to finish and then wait for it
Expand Down
67 changes: 67 additions & 0 deletions tests/partial_span_processor/test_partial_span_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,73 @@ def test_process_ready_heartbeat_spans(self):
self.assertEqual(logs[0].log_record.attributes["partial.event"],
"heartbeat")

@patch('src.partial_span_processor._logger')
def test_export_log_handles_export_exception(self, mock_logger):
# Create a mock log exporter that raises an exception
failing_exporter = mock.Mock()
failing_exporter.export.side_effect = Exception("Export failed")

# Create processor with failing exporter
processor = PartialSpanProcessor(
log_exporter=failing_exporter,
heartbeat_interval_millis=1000,
initial_heartbeat_delay_millis=1000,
process_interval_millis=1000,
)

try:
span = TestPartialSpanProcessor.create_mock_span()

# This should not raise an exception despite the exporter failing
processor.export_log(span, {"test": "attribute"})

# Verify the exception was logged
mock_logger.exception.assert_called_once_with(
"Exception while exporting logs.")

# Verify export was attempted
failing_exporter.export.assert_called_once()
finally:
processor.shutdown()

@patch('src.partial_span_processor._logger')
def test_export_log_handles_runtime_error(self, mock_logger):
# Create a mock log exporter that raises a RuntimeError
failing_exporter = mock.Mock()
failing_exporter.export.side_effect = RuntimeError(
"Runtime error during export")

processor = PartialSpanProcessor(
log_exporter=failing_exporter,
heartbeat_interval_millis=1000,
initial_heartbeat_delay_millis=1000,
process_interval_millis=1000,
)

try:
span = TestPartialSpanProcessor.create_mock_span()

# This should not raise an exception
processor.export_log(span, {"test": "attribute"})

# Verify the exception was logged
mock_logger.exception.assert_called_once_with(
"Exception while exporting logs.")
finally:
processor.shutdown()

def test_export_log_succeeds_normally(self):
# Test that normal operation still works
span = TestPartialSpanProcessor.create_mock_span()

# This should work normally
self.processor.export_log(span, {"test": "attribute"})

# Verify log was exported successfully
logs = self.log_exporter.get_finished_logs()
self.assertEqual(len(logs), 1)
self.assertEqual(logs[0].log_record.attributes["test"], "attribute")


if __name__ == "__main__":
unittest.main()