Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,9 @@ Currently we support consuming lineage from:
* Apache Hive
* Apache Flink
* dbt
* StarRocks (proprietary integration, part of MWS Data Engine)

**Note**: service is under active development, so it doesn't have stable API for now.
**Note**: the service is under active development, so the API may be unstable.

Goals
-----
Expand Down
2 changes: 2 additions & 0 deletions data_rentgen/consumer/extractors/batch_extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
SparkExtractor,
UnknownExtractor,
)
from data_rentgen.consumer.extractors.impl.starrocks import StarRocksExtractor
from data_rentgen.openlineage.run_event import OpenLineageRunEvent


Expand All @@ -31,6 +32,7 @@ def __init__(self) -> None:
HiveExtractor(),
FlinkExtractor(),
DbtExtractor(),
StarRocksExtractor(),
]
self.unknown_extractor = UnknownExtractor()

Expand Down
2 changes: 2 additions & 0 deletions data_rentgen/consumer/extractors/impl/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from data_rentgen.consumer.extractors.impl.hive import HiveExtractor
from data_rentgen.consumer.extractors.impl.interface import ExtractorInterface
from data_rentgen.consumer.extractors.impl.spark import SparkExtractor
from data_rentgen.consumer.extractors.impl.starrocks import StarRocksExtractor
from data_rentgen.consumer.extractors.impl.unknown import UnknownExtractor

__all__ = [
Expand All @@ -17,5 +18,6 @@
"FlinkExtractor",
"HiveExtractor",
"SparkExtractor",
"StarRocksExtractor",
"UnknownExtractor",
]
3 changes: 2 additions & 1 deletion data_rentgen/consumer/extractors/impl/hive.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ def extract_operation(self, event: OpenLineageRunEvent) -> OperationDTO:
run=run,
name=hive_query.queryId,
description=hive_query.operationName,
# no started_at == run.started_at
type=self._extract_operation_type(event),
sql_query=self._extract_sql_query(event),
)
Expand All @@ -94,7 +95,7 @@ def _extract_output_type(
) -> OutputTypeDTO:
match operation.description:
case None:
return OutputTypeDTO(0)
return OutputTypeDTO.UNKNOWN
case value if value.startswith("CREATE"):
return OutputTypeDTO.CREATE
case value if value.startswith("ALTER"):
Expand Down
89 changes: 89 additions & 0 deletions data_rentgen/consumer/extractors/impl/starrocks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
# SPDX-FileCopyrightText: 2024-present MTS PJSC
# SPDX-License-Identifier: Apache-2.0
from __future__ import annotations

from typing import cast

from data_rentgen.consumer.extractors.generic import GenericExtractor
from data_rentgen.dto import (
JobDTO,
JobTypeDTO,
OperationDTO,
OutputTypeDTO,
RunDTO,
RunStatusDTO,
UserDTO,
)
from data_rentgen.dto.sql_query import SQLQueryDTO
from data_rentgen.openlineage.dataset import OpenLineageOutputDataset
from data_rentgen.openlineage.run_event import OpenLineageRunEvent
from data_rentgen.openlineage.run_facets import (
OpenLineageStarRocksSessionInfoRunFacet,
)
from data_rentgen.utils.uuid import extract_timestamp_from_uuid


class StarRocksExtractor(GenericExtractor):
    """Extractor for OpenLineage events emitted by the StarRocks integration.

    Individual queries are mapped to operations; the surrounding session is
    materialized as a synthetic run so related queries are grouped together.
    """

    # SQL statement prefix -> output type. Prefixes are mutually exclusive,
    # so lookup order never changes the result.
    _OUTPUT_TYPE_BY_PREFIX = (
        ("INSERT", OutputTypeDTO.APPEND),
        ("CREATE", OutputTypeDTO.CREATE),
        ("ALTER", OutputTypeDTO.ALTER),
        ("DROP", OutputTypeDTO.DROP),
        ("TRUNCATE", OutputTypeDTO.TRUNCATE),
    )

    def match(self, event: OpenLineageRunEvent) -> bool:
        """Accept events whose jobType facet declares the STARROCKS integration."""
        job_type = event.job.facets.jobType
        return job_type is not None and job_type.integration == "STARROCKS"

    def is_operation(self, event: OpenLineageRunEvent) -> bool:
        """Treat QUERY-typed events as operations rather than runs."""
        job_type = event.job.facets.jobType
        return job_type is not None and job_type.jobType == "QUERY"

    def extract_pure_run(self, event: OpenLineageRunEvent) -> RunDTO:
        """Build a synthetic run from the ``starrocks_session`` facet.

        Queries are treated as operations, and operations must be bound to a
        run (the session) for grouping — so the run is created artificially
        from session metadata rather than from a dedicated run event.
        """
        session = cast("OpenLineageStarRocksSessionInfoRunFacet", event.run.facets.starrocks_session)
        parent = event.run.facets.parent
        session_job = JobDTO(
            name=f"{session.user}@{session.clientIp}",
            location=self._extract_job_location(event.job),
            type=JobTypeDTO(type="STARROCKS_SESSION"),
        )
        return RunDTO(
            id=session.sessionId,
            job=session_job,
            parent_run=self.extract_parent_run(parent) if parent else None,
            # sessionId is a time-ordered UUID, so it carries the start time
            started_at=extract_timestamp_from_uuid(session.sessionId),
            user=UserDTO(name=session.user),
        )

    def _enrich_run_status(self, run: RunDTO, event: OpenLineageRunEvent):
        """Keep the session run STARTED while only query events have been seen."""
        if not self.is_operation(event):
            return super()._enrich_run_status(run, event)
        # Query events do not tell us when the session started or finished.
        run.status = RunStatusDTO.STARTED
        return run

    def extract_operation(self, event: OpenLineageRunEvent) -> OperationDTO:
        """Convert a query event into an operation bound to its session run."""
        session_run = self.extract_run(event)
        operation = OperationDTO(
            id=event.run.runId,
            run=session_run,
            name=event.job.name,
            # started_at is deliberately omitted: it falls back to run.started_at
            type=self._extract_operation_type(event),
            sql_query=self._extract_sql_query(event),
        )
        self._enrich_operation_status(operation, event)
        return operation

    def _extract_output_type(
        self,
        operation: OperationDTO,
        dataset: OpenLineageOutputDataset,
    ) -> OutputTypeDTO:
        """Classify the write by the leading keyword of the operation's SQL."""
        sql_query = operation.sql_query
        if sql_query is None:
            return OutputTypeDTO.UNKNOWN
        for prefix, output_type in self._OUTPUT_TYPE_BY_PREFIX:
            if sql_query.query.startswith(prefix):
                return output_type
        return OutputTypeDTO.UNKNOWN
7 changes: 5 additions & 2 deletions data_rentgen/openlineage/run_facets/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
from data_rentgen.openlineage.run_facets.spark_job import (
OpenLineageSparkJobDetailsRunFacet,
)
from data_rentgen.openlineage.run_facets.starrocks_session import OpenLineageStarRocksSessionInfoRunFacet

__all__ = [
"DataRentgenOperationInfoFacet",
Expand Down Expand Up @@ -74,6 +75,7 @@
"OpenLineageSparkApplicationDetailsRunFacet",
"OpenLineageSparkDeployMode",
"OpenLineageSparkJobDetailsRunFacet",
"OpenLineageStarRocksSessionInfoRunFacet",
]


Expand All @@ -85,6 +87,8 @@ class OpenLineageRunFacets(OpenLineageBase):
parent: OpenLineageParentRunFacet | None = None
processing_engine: OpenLineageProcessingEngineRunFacet | None = None
tags: OpenLineageRunTagsFacet | None = None
nominalTime: OpenLineageNominalTimeRunFacet | None = None
jobDependencies: OpenLineageJobDependenciesRunFacet | None = None
dataRentgen_run: DataRentgenRunInfoFacet | None = None
dataRentgen_operation: DataRentgenOperationInfoFacet | None = None
spark_applicationDetails: OpenLineageSparkApplicationDetailsRunFacet | None = None
Expand All @@ -95,5 +99,4 @@ class OpenLineageRunFacets(OpenLineageBase):
flink_job: OpenLineageFlinkJobDetailsRunFacet | None = None
hive_query: OpenLineageHiveQueryInfoRunFacet | None = None
hive_session: OpenLineageHiveSessionInfoRunFacet | None = None
nominalTime: OpenLineageNominalTimeRunFacet | None = None
jobDependencies: OpenLineageJobDependenciesRunFacet | None = None
starrocks_session: OpenLineageStarRocksSessionInfoRunFacet | None = None
14 changes: 14 additions & 0 deletions data_rentgen/openlineage/run_facets/starrocks_session.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# SPDX-FileCopyrightText: 2025-present MTS PJSC
# SPDX-License-Identifier: Apache-2.0

from pydantic import UUID7, Field

from data_rentgen.openlineage.run_facets.base import OpenLineageRunFacet


class OpenLineageStarRocksSessionInfoRunFacet(OpenLineageRunFacet):
    """Run facet describing StarRocks session."""

    # Name of the StarRocks user that opened the session.
    user: str = Field(examples=["myuser"])
    # UUIDv7 session identifier; its embedded timestamp is used as the session start time.
    sessionId: UUID7 = Field(examples=["019d455f-cf2a-7b1e-92d4-8f5c3e9a7b2d"])
    # IP address of the client that opened the session.
    clientIp: str = Field(examples=["11.22.33.44"])
11 changes: 0 additions & 11 deletions data_rentgen/utils/uuid.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import time
from datetime import datetime, timezone
from hashlib import sha1
from typing import Any
from uuid import NAMESPACE_URL, uuid5
from uuid import UUID as BaseUUID # noqa: N811

Expand Down Expand Up @@ -111,13 +110,3 @@ def extract_timestamp_from_uuid(uuid: BaseUUID) -> datetime:
msg = "Only UUIDv6+ are supported"
raise ValueError(msg)
return datetime.fromtimestamp(uuid.time / 1000, tz=timezone.utc)


def uuid_version_validator(run_id: Any) -> NewUUID:
if isinstance(run_id, str):
run_id = NewUUID(run_id)
if not run_id.version or run_id.version < 6: # noqa: PLR2004
err_msg = f"Run ID: {run_id} is not valid uuid. Only UUIDv6+ are supported"
raise ValueError(err_msg)
return run_id
return run_id
1 change: 1 addition & 0 deletions docs/changelog/next_release/424.feature.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add support for OpenLineage integration for StarRocks (proprietary, part of MWS Data Engine).
3 changes: 2 additions & 1 deletion mddocs/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,9 @@ Currently we support consuming lineage from:
* Apache Hive
* Apache Flink
* dbt
* StarRocks (proprietary integration, part of MWS Data Engine)

**Note**: service is under active development, so it doesn’t have stable API for now.
**Note**: the service is under active development, so the API may be unstable.

# Goals

Expand Down
2 changes: 2 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
"tests.test_consumer.test_extractors.fixtures.hive_dto",
"tests.test_consumer.test_extractors.fixtures.spark_raw",
"tests.test_consumer.test_extractors.fixtures.spark_dto",
"tests.test_consumer.test_extractors.fixtures.starrocks_raw",
"tests.test_consumer.test_extractors.fixtures.starrocks_dto",
"tests.test_consumer.test_extractors.fixtures.airflow_raw",
"tests.test_consumer.test_extractors.fixtures.airflow_dto",
"tests.test_consumer.test_extractors.fixtures.unknown_raw",
Expand Down
3 changes: 3 additions & 0 deletions tests/resources/events_starrocks.jsonl
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{"eventTime": "2026-03-31T19:30:00.456Z", "producer": "starrocks-openlineage-plugin/1.0.0", "schemaURL": "https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunEvent", "eventType": "START", "run": {"runId": "019d455f-d088-7d61-bdcd-6a9085a2096a", "facets": {"starrocks_query": {"_producer": "starrocks-openlineage-plugin/1.0.0", "_schemaURL": "https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunFacet", "queryId": "019d455f-d088-7d61-bdcd-6a9085a2096a", "feIp": "10.0.1.15"}, "starrocks_session": {"_producer": "starrocks-openlineage-plugin/1.0.0", "_schemaURL": "https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunFacet", "sessionId": "019d455f-cf2a-7b1e-92d4-8f5c3e9a7b2d", "user": "myuser", "clientIp": "11.22.33.44"}}}, "job": {"name": "insert_query", "namespace": "starrocks://some-starrocks:9030", "facets": {"sql": {"_producer": "starrocks-openlineage-plugin/1.0.0", "_schemaURL": "https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/SQLJobFacet", "query": "INSERT INTO users_backup SELECT * FROM users WHERE active = true"}, "jobType": {"_producer": "starrocks-openlineage-plugin/1.0.0", "_schemaURL": "https://openlineage.io/spec/facets/2-0-2/JobTypeJobFacet.json", "processingType": "BATCH", "integration": "STARROCKS", "jobType": "QUERY"}}}, "inputs": [], "outputs": []}
{"eventTime": "2026-03-31T19:40:00.456Z", "producer": "starrocks-openlineage-plugin/1.0.0", "schemaURL": "https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunEvent", "eventType": "COMPLETE", "run": {"runId": "019d455f-d088-7d61-bdcd-6a9085a2096a", "facets": {"starrocks_query": {"_producer": "starrocks-openlineage-plugin/1.0.0", "_schemaURL": "https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunFacet", "queryId": "019d455f-d088-7d61-bdcd-6a9085a2096a", "feIp": "10.0.1.15"}, "starrocks_session": {"_producer": "starrocks-openlineage-plugin/1.0.0", "_schemaURL": "https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunFacet", "sessionId": "019d455f-cf2a-7b1e-92d4-8f5c3e9a7b2d", "user": "myuser", "clientIp": "11.22.33.44"}, "starrocks_statistics": {"_producer": "starrocks-openlineage-plugin/1.0.0", "_schemaURL": "https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunFacet", "scanRows": 10000, "scanBytes": 524288, "returnRows": 0, "transmittedBytes": 0, "queryTimeMs": 2500, "cpuCostNs": 500000000, "memCostBytes": 1048576}}}, "job": {"name": "insert_query", "namespace": "starrocks://some-starrocks:9030", "facets": {"sql": {"_producer": "starrocks-openlineage-plugin/1.0.0", "_schemaURL": "https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/SQLJobFacet", "query": "INSERT INTO users_backup SELECT * FROM users WHERE active = true"}, "jobType": {"_producer": "starrocks-openlineage-plugin/1.0.0", "_schemaURL": "https://openlineage.io/spec/facets/2-0-2/JobTypeJobFacet.json", "processingType": "BATCH", "integration": "STARROCKS", "jobType": "QUERY"}}}, "inputs": [{"namespace": "http://test-iceberg:8181", "name": "test_db.test_table", "inputFacets": {"inputStatistics": {"_producer": "starrocks-openlineage-plugin/1.0.0", "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/InputStatisticsInputDatasetFacet.json", "rowCount": 10000, "size": 524288}}}], "outputs": [{"namespace": "http://test-iceberg:8181", "name": "test_db.users_backup", "outputFacets": {"outputStatistics": {"_producer": "starrocks-openlineage-plugin/1.0.0", "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/OutputStatisticsOutputDatasetFacet.json", "rowCount": 8500}}}]}
{"eventTime": "2026-03-31T19:40:00.456Z", "producer": "starrocks-openlineage-plugin/1.0.0", "schemaURL": "https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunEvent", "eventType": "COMPLETE", "run": {"runId": "019d455f-cf2a-7b1e-92d4-8f5c3e9a7b2d", "facets": {"starrocks_session": {"_producer": "starrocks-openlineage-plugin/1.0.0", "_schemaURL": "https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunFacet", "sessionId": "019d455f-cf2a-7b1e-92d4-8f5c3e9a7b2d", "user": "myuser", "clientIp": "11.22.33.44"}}}, "job": {"name": "session", "namespace": "starrocks://some-starrocks:9030", "facets": {"jobType": {"_producer": "starrocks-openlineage-plugin/1.0.0", "_schemaURL": "https://openlineage.io/spec/facets/2-0-2/JobTypeJobFacet.json", "processingType": "BATCH", "integration": "STARROCKS", "jobType": "SESSION"}}}, "inputs": [], "outputs": []}
29 changes: 29 additions & 0 deletions tests/test_consumer/test_extractors/fixtures/io_dto.py
Original file line number Diff line number Diff line change
Expand Up @@ -210,3 +210,32 @@ def extracted_dataset_schema() -> SchemaDTO:
@pytest.fixture
def extracted_user() -> UserDTO:
    """Expected user DTO extracted from the session facets."""
    user_name = "myuser"
    return UserDTO(name=user_name)


@pytest.fixture
def extracted_iceberg_metastore_location() -> LocationDTO:
    """Expected location DTO for the ``http://test-iceberg:8181`` dataset namespace."""
    address = "http://test-iceberg:8181"
    return LocationDTO(
        name="test-iceberg:8181",
        type="http",
        addresses={address},
    )


@pytest.fixture
def extracted_iceberg_dataset1(
    extracted_iceberg_metastore_location: LocationDTO,
) -> DatasetDTO:
    """Expected dataset DTO for ``test_db.test_table`` in the Iceberg location."""
    return DatasetDTO(
        name="test_db.test_table",
        location=extracted_iceberg_metastore_location,
    )


@pytest.fixture
def extracted_iceberg_dataset2(
    extracted_iceberg_metastore_location: LocationDTO,
) -> DatasetDTO:
    """Expected dataset DTO for ``test_db.users_backup`` in the Iceberg location."""
    return DatasetDTO(
        name="test_db.users_backup",
        location=extracted_iceberg_metastore_location,
    )
16 changes: 16 additions & 0 deletions tests/test_consumer/test_extractors/fixtures/io_raw.py
Original file line number Diff line number Diff line change
Expand Up @@ -315,3 +315,19 @@ def kafka_output_with_stats(
),
},
)


@pytest.fixture
def iceberg_input() -> OpenLineageInputDataset:
    """Raw OpenLineage input dataset pointing at the Iceberg test table."""
    return OpenLineageInputDataset(
        name="test_db.test_table",
        namespace="http://test-iceberg:8181",
    )


@pytest.fixture
def iceberg_output() -> OpenLineageOutputDataset:
    """Raw OpenLineage output dataset pointing at the Iceberg backup table."""
    return OpenLineageOutputDataset(
        name="test_db.users_backup",
        namespace="http://test-iceberg:8181",
    )
Loading
Loading