Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 1 addition & 5 deletions data_rentgen/db/repositories/job_dependency.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ async def create(self, job_dependency: JobDependencyDTO) -> JobDependency:
async def get_dependencies(
self,
job_ids: list[int],
direction: Literal["UPSTREAM", "DOWNSTREAM", "BOTH"],
direction: Literal["UPSTREAM", "DOWNSTREAM"],
depth: int,
since: datetime | None = None,
until: datetime | None = None,
Expand All @@ -92,10 +92,6 @@ async def get_dependencies(
query = self._get_upstream_hierarchy_query(core_query)
case "DOWNSTREAM":
query = self._get_downstream_hierarchy_query(core_query)
case "BOTH":
query = self._get_upstream_hierarchy_query(core_query).union(
self._get_downstream_hierarchy_query(core_query)
)

result = await self._session.execute(
query, {"job_ids": job_ids, "depth": depth, "since": since, "until": until}
Expand Down
2 changes: 1 addition & 1 deletion data_rentgen/server/api/v1/router/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ async def get_job_hierarchy(
current_user: Annotated[User, Depends(get_user())],
) -> JobHierarchyResponseV1:
job_hierarchy = await job_service.get_jobs_hierarchy(
start_node_id=query_args.start_node_id,
start_node_ids={query_args.start_node_id},
direction=query_args.direction,
depth=query_args.depth,
infer_from_lineage=query_args.infer_from_lineage,
Expand Down
128 changes: 97 additions & 31 deletions data_rentgen/server/services/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,12 @@ class JobHierarchyResult:
dependencies: set[tuple[int, int, str | None]] = field(default_factory=set)
jobs: list[JobServiceResult] = field(default_factory=list)

def merge(self, other: "JobHierarchyResult") -> "JobHierarchyResult":
self.parents.update(other.parents)
self.dependencies.update(other.dependencies)
self.jobs.extend(other.jobs)
return self


class JobService:
def __init__(self, uow: Annotated[UnitOfWork, Depends()]):
Expand Down Expand Up @@ -110,49 +116,109 @@ async def get_job_types(self) -> Sequence[str]:

async def get_jobs_hierarchy(
self,
start_node_id: int,
start_node_ids: set[int],
direction: Literal["UPSTREAM", "DOWNSTREAM", "BOTH"],
depth: int,
since: datetime | None = None,
until: datetime | None = None,
*,
infer_from_lineage: bool = False,
level: int = 0,
) -> JobHierarchyResult:
logger.info(
"Get jobs hierarchy with start at job with id %s, direction %s, depth %s",
start_node_id,
"Get jobs hierarchy with start at job with ids %s, direction %s, depth %s",
start_node_ids,
direction,
depth,
)
job_ids = {start_node_id}

ancestor_relations = await self._uow.job.list_ancestor_relations([start_node_id])
descendant_relations = await self._uow.job.list_descendant_relations([start_node_id])
job_ids = (
{start_node_id}
| {p.parent_job_id for p in ancestor_relations}
| {p.child_job_id for p in descendant_relations}
)
if not start_node_ids:
return JobHierarchyResult()

dependencies = await self._uow.job_dependency.get_dependencies(
job_ids=list(job_ids),
direction=direction,
depth=depth,
infer_from_lineage=infer_from_lineage,
since=since,
until=until,
# Add ancestors and descendants for current level
ancestor_relations = await self._uow.job.list_ancestor_relations(start_node_ids)
descendant_relations = await self._uow.job.list_descendant_relations(start_node_ids)
job_ids = (
start_node_ids
| {r.parent_job_id for r in ancestor_relations}
| {r.child_job_id for r in descendant_relations}
)
dependency_job_ids = {d.from_job_id for d in dependencies} | {d.to_job_id for d in dependencies}
job_ids |= dependency_job_ids
# return ancestors for all found jobs in the graph
ancestor_relations += await self._uow.job.list_ancestor_relations(list(dependency_job_ids))
job_ids |= {p.parent_job_id for p in ancestor_relations}
jobs = await self._uow.job.list_by_ids(list(job_ids))
return JobHierarchyResult(
parents={(p.parent_job_id, p.child_job_id) for p in ancestor_relations + descendant_relations},
dependencies={
(d.from_job_id, d.to_job_id, d.type)
for d in sorted(dependencies, key=lambda x: (x.from_job_id, x.to_job_id, x.type))
},
jobs=[JobServiceResult.from_orm(job) for job in jobs],
result = JobHierarchyResult()
result.parents.update(ancestor_relations)
result.parents.update(descendant_relations)

upstream_dependecies = set()
downstream_dependencies = set()
if direction in {"UPSTREAM", "BOTH"}:
upstream_dependecies.update(
await self._uow.job_dependency.get_dependencies(
job_ids=list(job_ids),
direction="UPSTREAM",
depth=depth,
infer_from_lineage=infer_from_lineage,
since=since,
until=until,
)
)

if direction in {"DOWNSTREAM", "BOTH"}:
downstream_dependencies.update(
await self._uow.job_dependency.get_dependencies(
job_ids=list(job_ids),
direction="DOWNSTREAM",
depth=depth,
infer_from_lineage=infer_from_lineage,
since=since,
until=until,
)
)
result.dependencies.update(
{(d.from_job_id, d.to_job_id, d.type) for d in upstream_dependecies}
| {(d.from_job_id, d.to_job_id, d.type) for d in downstream_dependencies}
)

if depth > 1:
result.merge(
await self.get_jobs_hierarchy(
start_node_ids={d.from_job_id for d in upstream_dependecies},
direction="UPSTREAM",
depth=depth - 1,
infer_from_lineage=infer_from_lineage,
since=since,
until=until,
level=level + 1,
)
)
result.merge(
await self.get_jobs_hierarchy(
start_node_ids={d.to_job_id for d in downstream_dependencies},
direction="DOWNSTREAM",
depth=depth - 1,
infer_from_lineage=infer_from_lineage,
since=since,
until=until,
level=level + 1,
)
)
else:
# Add parent/child relations for the dependencies found at the final depth level
ids = {from_job_id for (from_job_id, _, _) in result.dependencies} | {
to_job_id for (_, to_job_id, _) in result.dependencies
}
result.parents.update(await self._uow.job.list_ancestor_relations(ids))
result.parents.update(await self._uow.job.list_descendant_relations(ids))

# Collect all job nodes once, after all recursive calls are merged.
if level == 0:
final_job_ids = (
start_node_ids
| {from_job_id for (from_job_id, _, _) in result.dependencies}
| {to_job_id for (_, to_job_id, _) in result.dependencies}
| {from_job_id for (from_job_id, _) in result.parents}
| {to_job_id for (_, to_job_id) in result.parents}
)
result.jobs = [
JobServiceResult.from_orm(job) for job in await self._uow.job.list_by_ids(list(final_job_ids))
]

return result
2 changes: 2 additions & 0 deletions docs/changelog/next_release/423.improvement.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Improved ``/v1/jobs/hierarchy`` to return a deeper graph for traversals with ``depth > 1``: the hierarchy now aggregates dependencies level by level and includes all related job nodes (including lineage-inferred jobs).

12 changes: 6 additions & 6 deletions tests/test_server/fixtures/factories/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -535,7 +535,7 @@ async def job_dependency_chain_with_lineage(
right_dataset_location = await create_location(async_session)
right_dataset = await create_dataset(async_session, location_id=right_dataset_location.id)

# Connect left chain to central chain: left_task -> task1
# Connect left chain to central chain: left_spark -> spark1
left_created_at = datetime.now(tz=UTC)
left_operation_id = generate_new_uuid(left_created_at)
left_output = await create_output(
Expand All @@ -544,7 +544,7 @@ async def job_dependency_chain_with_lineage(
"created_at": left_created_at,
"operation_id": left_operation_id,
"run_id": generate_new_uuid(left_created_at),
"job_id": left_task.id,
"job_id": left_spark.id,
"dataset_id": left_dataset.id,
"schema_id": None,
},
Expand All @@ -555,13 +555,13 @@ async def job_dependency_chain_with_lineage(
"created_at": left_created_at - timedelta(seconds=1),
"operation_id": left_operation_id,
"run_id": left_output.run_id,
"job_id": task1.id,
"job_id": spark1.id,
"dataset_id": left_dataset.id,
"schema_id": None,
},
)

# Connect central chain to right chain: task3 -> right_task
# Connect central chain to right chain: spark3 -> right_spark
right_created_at = datetime.now(tz=UTC) + timedelta(seconds=10)
right_operation_id = generate_new_uuid(right_created_at)
right_output = await create_output(
Expand All @@ -570,7 +570,7 @@ async def job_dependency_chain_with_lineage(
"created_at": right_created_at,
"operation_id": right_operation_id,
"run_id": generate_new_uuid(right_created_at),
"job_id": task3.id,
"job_id": spark3.id,
"dataset_id": right_dataset.id,
"schema_id": None,
},
Expand All @@ -581,7 +581,7 @@ async def job_dependency_chain_with_lineage(
"created_at": right_created_at - timedelta(seconds=1),
"operation_id": right_operation_id,
"run_id": right_output.run_id,
"job_id": right_task.id,
"job_id": right_spark.id,
"dataset_id": right_dataset.id,
"schema_id": None,
},
Expand Down
46 changes: 24 additions & 22 deletions tests/test_server/test_jobs/test_job_hierarchy.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,18 +86,10 @@ async def test_get_job_hierarchy_with_direction_both(
(
(dag1, dag2, dag3),
(task1, task2, task3),
(_, spark2, _),
(spark1, spark2, spark3),
) = job_dependency_chain
expected_nodes = await enrich_jobs(
[
dag1,
dag2,
dag3,
task1,
task2,
task3,
spark2,
],
[dag1, dag2, dag3, task1, task2, task3, spark1, spark2, spark3],
async_session,
)

Expand Down Expand Up @@ -138,10 +130,10 @@ async def test_get_job_hierarchy_with_direction_upstream(
(
(dag1, dag2, _),
(task1, task2, _),
(_, spark2, _),
(spark1, spark2, _),
) = job_dependency_chain
expected_nodes = await enrich_jobs(
[dag1, task1, dag2, task2, spark2],
[dag1, task1, spark1, dag2, task2, spark2],
async_session,
)

Expand Down Expand Up @@ -176,10 +168,10 @@ async def test_get_job_hierarchy_with_direction_downstream(
(
(_, dag2, dag3),
(_, task2, task3),
(_, spark2, _),
(_, spark2, spark3),
) = job_dependency_chain
expected_nodes = await enrich_jobs(
[dag2, task2, spark2, dag3, task3],
[dag2, task2, spark2, dag3, task3, spark3],
async_session,
)

Expand Down Expand Up @@ -367,7 +359,7 @@ async def test_get_job_hierarchy_with_inferred_dependencies(
expected_ids.add(to_idx)
expected_dags = [dags[idx] for idx in expected_ids]
expected_tasks = [tasks[idx] for idx in expected_ids]
expected_sparks = [sparks[start_node_idx]]
expected_sparks = [sparks[idx] for idx in expected_ids]
expected_nodes = await enrich_jobs(expected_dags + expected_tasks + expected_sparks, async_session)

response = await test_client.get(
Expand All @@ -393,6 +385,16 @@ async def test_get_job_hierarchy_with_inferred_dependencies(
"type": dep_type,
}
for from_idx, to_idx, dep_type in expected_deps
if dep_type == "DIRECT_DEPENDENCY"
]
+ [
{
"from": {"kind": "JOB", "id": str(sparks[from_idx].id)},
"to": {"kind": "JOB", "id": str(sparks[to_idx].id)},
"type": dep_type,
}
for from_idx, to_idx, dep_type in expected_deps
if dep_type == "INFERRED_FROM_LINEAGE"
],
},
"nodes": {"jobs": jobs_to_json(expected_nodes)},
Expand All @@ -408,16 +410,16 @@ async def test_get_job_hierarchy_with_inferred_dependencies_with_since_and_until
dags, tasks, sparks = job_dependency_chain_with_lineage
start_node = tasks[2]

# Cover both inferred links connected to task0 and task4.
edge_task_ids = [tasks[0].id, tasks[4].id]
# Cover both inferred links connected to spark0 and spark4.
edge_spark_ids = [sparks[0].id, sparks[4].id]
min_input_created_at = await async_session.scalar(
select(func.min(Input.created_at)).where(Input.job_id.in_(edge_task_ids)),
select(func.min(Input.created_at)).where(Input.job_id.in_(edge_spark_ids)),
) - timedelta(seconds=2)
max_output_created_at = await async_session.scalar(
select(func.max(Output.created_at)).where(Output.job_id.in_(edge_task_ids)),
select(func.max(Output.created_at)).where(Output.job_id.in_(edge_spark_ids)),
) + timedelta(seconds=2)

expected_nodes = await enrich_jobs([*dags[1:4], *tasks[1:4], sparks[2]], async_session)
expected_nodes = await enrich_jobs([*dags[1:4], *tasks[1:4], *sparks[1:4]], async_session)
expected_deps = [
(1, 2, "DIRECT_DEPENDENCY"),
(2, 3, "DIRECT_DEPENDENCY"),
Expand Down Expand Up @@ -497,10 +499,10 @@ async def test_get_job_hierarchy_with_inferred_dependencies_since_less_then_unti
job_dependency_chain_with_lineage: tuple[tuple[Job, Job, Job, Job, Job], ...],
mocked_user: MockedUser,
):
_, tasks, _ = job_dependency_chain_with_lineage
_, tasks, sparks = job_dependency_chain_with_lineage
start_node = tasks[2]

edge_task_ids = [tasks[0].id, tasks[4].id]
edge_task_ids = [sparks[0].id, sparks[4].id]
min_input_created_at = await async_session.scalar(
select(func.min(Input.created_at)).where(Input.job_id.in_(edge_task_ids)),
) - timedelta(seconds=2)
Expand Down
Loading