Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 12 additions & 4 deletions example.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@
insecure=True) # grpc

# Configure span processors
partial_span_processor = PartialSpanProcessor(log_exporter, 5000)
partial_span_processor = PartialSpanProcessor(log_exporter=log_exporter,
heartbeat_interval_millis=1000,
initial_heartbeat_delay_millis=6000,
process_interval_millis=1000)
batch_span_processor = BatchSpanProcessor(span_exporter)

# Create a TracerProvider
Expand All @@ -29,6 +32,11 @@
tracer = trace.get_tracer(__name__)

# Start a span (logs heartbeat and stop events)
with tracer.start_as_current_span("partial_span_1"):
print("partial_span_1 is running")
sleep(10)
with tracer.start_as_current_span("span 1"):
with tracer.start_as_current_span("span 2"):
print("sleeping inside span 2")
sleep(2)
print("sleeping inside span 1")
sleep(5)
print("sleeping outside spans")
sleep(5)
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"

[project]
name = "partial_span_processor"
version = "0.0.8"
version = "0.0.9"
authors = [
{ name = "Mladjan Gadzic", email = "gadzic.mladjan@gmail.com" }
]
Expand Down
145 changes: 113 additions & 32 deletions src/partial_span_processor/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@

from __future__ import annotations

import datetime
import json
import threading
import time
from queue import Queue
from typing import TYPE_CHECKING

from google.protobuf import json_format
Expand All @@ -28,31 +28,63 @@
from opentelemetry.sdk.trace import ReadableSpan, Span, SpanProcessor
from opentelemetry.trace import TraceFlags

from partial_span_processor.peekable_queue import PeekableQueue

if TYPE_CHECKING:
from opentelemetry import context as context_api
from opentelemetry.sdk._logs.export import LogExporter
from opentelemetry.sdk.resources import Resource

WORKER_THREAD_NAME = "OtelPartialSpanProcessor"
DEFAULT_HEARTBEAT_INTERVAL_MILLIS = 5000
DEFAULT_INITIAL_HEARTBEAT_DELAY_MILLIS = 5000
DEFAULT_PROCESS_INTERVAL_MILLIS = 5000


def validate_parameters(log_exporter, heartbeat_interval_millis,
initial_heartbeat_delay_millis, process_interval_millis):
if log_exporter is None:
msg = "log_exporter must not be None"
raise ValueError(msg)

if heartbeat_interval_millis <= 0:
msg = "heartbeat_interval_millis must be greater than 0"
raise ValueError(msg)

if initial_heartbeat_delay_millis < 0:
msg = "initial_heartbeat_delay_millis must be greater or equal to 0"
raise ValueError(msg)

if process_interval_millis <= 0:
msg = "process_interval_millis must be greater than 0"
raise ValueError(msg)


class PartialSpanProcessor(SpanProcessor):

def __init__(
self,
log_exporter: LogExporter,
heartbeat_interval_millis: int,
heartbeat_interval_millis: int = DEFAULT_HEARTBEAT_INTERVAL_MILLIS,
initial_heartbeat_delay_millis: int = DEFAULT_INITIAL_HEARTBEAT_DELAY_MILLIS,
process_interval_millis: int = DEFAULT_PROCESS_INTERVAL_MILLIS,
resource: Resource | None = None,
) -> None:
if heartbeat_interval_millis <= 0:
msg = "heartbeat_interval_ms must be greater than 0"
raise ValueError(msg)
validate_parameters(log_exporter, heartbeat_interval_millis,
initial_heartbeat_delay_millis, process_interval_millis)

self.log_exporter = log_exporter
self.heartbeat_interval_millis = heartbeat_interval_millis
self.initial_heartbeat_delay_millis = initial_heartbeat_delay_millis
self.process_interval_millis = process_interval_millis
self.resource = resource

self.active_spans = {}
self.ended_spans = Queue()
self.delayed_heartbeat_spans: PeekableQueue[tuple[int, datetime.datetime]] = \
PeekableQueue()
self.delayed_heartbeat_spans_lookup: set[int] = set()
self.ready_heartbeat_spans: PeekableQueue[
tuple[int, datetime.datetime]] = PeekableQueue()
self.lock = threading.Lock()

self.done = False
Expand All @@ -65,44 +97,42 @@ def __init__(
def worker(self) -> None:
while not self.done:
with self.condition:
self.condition.wait(self.heartbeat_interval_millis / 1000)
self.condition.wait(self.process_interval_millis / 1000)
if self.done:
break

# Remove ended spans from active spans
with self.lock:
while not self.ended_spans.empty():
span_key, span = self.ended_spans.get()
if span_key in self.active_spans:
del self.active_spans[span_key]

self.heartbeat()

def heartbeat(self) -> None:
with self.lock:
for span in list(self.active_spans.values()):
attributes = self.get_heartbeat_attributes()
log_data = self.get_log_data(span, attributes)
self.log_exporter.export([log_data])
self.process_delayed_heartbeat_spans()
self.process_ready_heartbeat_spans()

def on_start(self, span: Span,
parent_context: context_api.Context | None = None) -> None:
attributes = self.get_heartbeat_attributes()
log_data = self.get_log_data(span, attributes)
self.log_exporter.export([log_data])

span_key = (span.context.trace_id, span.context.span_id)
with self.lock:
self.active_spans[span_key] = span
self.active_spans[span.context.span_id] = span
self.delayed_heartbeat_spans_lookup.add(span.context.span_id)

next_heartbeat_time = datetime.datetime.now() + datetime.timedelta(
milliseconds=self.initial_heartbeat_delay_millis)
self.delayed_heartbeat_spans.put(
(span.context.span_id, next_heartbeat_time))

def on_end(self, span: ReadableSpan) -> None:
attributes = get_stop_attributes()
is_delayed_heartbeat_pending = False
with self.lock:
self.active_spans.pop(span.context.span_id)

if span.context.span_id in self.delayed_heartbeat_spans_lookup:
is_delayed_heartbeat_pending = True
self.delayed_heartbeat_spans_lookup.remove(span.context.span_id)

if is_delayed_heartbeat_pending:
return

self.export_log(span, get_stop_attributes())

def export_log(self, span, attributes: dict[str, str]) -> None:
log_data = self.get_log_data(span, attributes)
self.log_exporter.export([log_data])

span_key = (span.context.trace_id, span.context.span_id)
self.ended_spans.put((span_key, span))

def shutdown(self) -> None:
# signal the worker thread to finish and then wait for it
self.done = True
Expand Down Expand Up @@ -161,6 +191,57 @@ def get_log_data(self, span: Span, attributes: dict[str, str]) -> LogData:
log_record=log_record, instrumentation_scope=instrumentation_scope,
)

def process_delayed_heartbeat_spans(self) -> None:
spans_to_be_logged = []
with (self.lock):
now = datetime.datetime.now()
while True:
if self.delayed_heartbeat_spans.empty():
break

(span_id, next_heartbeat_time) = self.delayed_heartbeat_spans.peek()
if next_heartbeat_time > now:
break

self.delayed_heartbeat_spans_lookup.discard(span_id)
self.delayed_heartbeat_spans.get()

span = self.active_spans.get(span_id)
if span:
spans_to_be_logged.append(span)

next_heartbeat_time = now + datetime.timedelta(
milliseconds=self.heartbeat_interval_millis)
self.ready_heartbeat_spans.put((span_id, next_heartbeat_time))

for span in spans_to_be_logged:
self.export_log(span, self.get_heartbeat_attributes())

def process_ready_heartbeat_spans(self) -> None:
spans_to_be_logged = []
now = datetime.datetime.now()
with self.lock:
while True:
if self.ready_heartbeat_spans.empty():
break

(span_id, next_heartbeat_time) = self.ready_heartbeat_spans.peek()
if next_heartbeat_time > now:
break

self.ready_heartbeat_spans.get()

span = self.active_spans.get(span_id)
if span:
spans_to_be_logged.append(span)

next_heartbeat_time = now + datetime.timedelta(
milliseconds=self.heartbeat_interval_millis)
self.ready_heartbeat_spans.put((span_id, next_heartbeat_time))

for span in spans_to_be_logged:
self.export_log(span, self.get_heartbeat_attributes())


def get_stop_attributes() -> dict[str, str]:
return {
Expand Down
8 changes: 8 additions & 0 deletions src/partial_span_processor/peekable_queue.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
import queue

class PeekableQueue(queue.Queue):
def peek(self):
with self.mutex:
if self._qsize() > 0:
return self.queue[0]
return None
Loading