From 6515069a2985f12b3c9d56fe4887266fab7c75c8 Mon Sep 17 00:00:00 2001 From: Yue Chao Qin Date: Tue, 27 Jan 2026 11:42:16 -0800 Subject: [PATCH 1/6] feat: Annotation search for Pipeline Runs --- cloud_pipelines_backend/api_server_sql.py | 334 +++++++++++++- cloud_pipelines_backend/backend_types_sql.py | 19 +- cloud_pipelines_backend/database_ops.py | 6 + pyproject.toml | 18 + tests/test_pipeline_run_search.py | 439 +++++++++++++++++++ 5 files changed, 808 insertions(+), 8 deletions(-) create mode 100644 tests/test_pipeline_run_search.py diff --git a/cloud_pipelines_backend/api_server_sql.py b/cloud_pipelines_backend/api_server_sql.py index e8e0624..0667bd1 100644 --- a/cloud_pipelines_backend/api_server_sql.py +++ b/cloud_pipelines_backend/api_server_sql.py @@ -1,11 +1,14 @@ import base64 import dataclasses import datetime +import enum import json import logging import typing from typing import Any, Optional +import pydantic # type: ignore[import-unresolved] + if typing.TYPE_CHECKING: from cloud_pipelines.orchestration.storage_providers import ( interfaces as storage_provider_interfaces, @@ -31,6 +34,80 @@ def _get_current_time() -> datetime.datetime: from . import errors from .errors import ItemNotFoundError +# ==== Annotation Filter Types for Search ==== + + +class GroupOperator(enum.StrEnum): + """Logical operators for combining filters in a group.""" + + AND = "and" + OR = "or" + + +class FilterOperator(enum.StrEnum): + """Operators for filtering by annotation key or value.""" + + CONTAINS = "contains" # Contains substring + EQUALS = "equals" # Equals exact string + IN_SET = "in_set" # Is in a set of values + + +class TextFilter(pydantic.BaseModel): + """Filter by text pattern. 
+ + Examples: + - TextFilter(operator=FilterOperator.EQUALS, text="environment") + - TextFilter(operator=FilterOperator.CONTAINS, text="env") + - TextFilter(operator=FilterOperator.IN_SET, texts=["prod", "staging"]) + - TextFilter(operator=FilterOperator.EQUALS, text="test", negate=True) + """ + + operator: FilterOperator + text: str | None = None # For EQUALS, CONTAINS operators + texts: list[str] | None = None # For IN_SET operator + negate: bool = False # If True, negates the operation + + +class AnnotationFilter(pydantic.BaseModel): + """Filter annotations by key and optionally by value. + + Examples: + - AnnotationFilter(key=TextFilter(operator=FilterOperator.EQUALS, text="environment")) + → Find runs that have an "environment" annotation key + - AnnotationFilter( + key=TextFilter(operator=FilterOperator.EQUALS, text="environment"), + value=TextFilter(operator=FilterOperator.EQUALS, text="production") + ) + → Find runs with annotation key "environment" and value "production" + """ + + key: TextFilter + value: TextFilter | None = None + + +class AnnotationFilterGroup(pydantic.BaseModel): + """A flat group of AnnotationFilters combined with AND/OR logic. + + Each AnnotationFilter matches a single annotation row (key + optional value). + The group operator determines how multiple filters are combined. + + Examples: + - AnnotationFilterGroup(filters=[...]) + → Defaults to OR logic (match ANY filter) + - AnnotationFilterGroup(operator=GroupOperator.AND, filters=[...]) + → All filters must match (match ALL filters) + - AnnotationFilterGroup(operator=GroupOperator.OR, filters=[...]) + → At least one filter must match + + SQL Pattern: + filters=[AF(key="env", value="prod"), AF(key="team")] + → EXISTS(key='env' AND value='prod') OR EXISTS(key='team') + """ + + filters: list[AnnotationFilter] + # Operator defaults to None, which is treated as OR logic. 
+ operator: GroupOperator | None = None + # ==== PipelineJobService @dataclasses.dataclass(kw_only=True) @@ -63,15 +140,141 @@ class GetPipelineRunResponse(PipelineRunResponse): class ListPipelineJobsResponse: pipeline_runs: list[PipelineRunResponse] next_page_token: str | None = None + debug_where_clause: str | None = ( + None # Populated when debug_where_clause=True in search() + ) import sqlalchemy as sql from sqlalchemy import orm +def _compile_where_clauses_to_string( + session: orm.Session, + where_clauses: list[sql.ColumnElement[bool]], +) -> str: + """Compile WHERE clauses to a SQL string for debugging. + + Uses the dialect from the session's engine (SQLite, MySQL, etc.) + and inlines literal values for readability. + """ + if not where_clauses: + return "(no where clauses)" + + # Combine all clauses with AND + combined = sql.and_(*where_clauses) if len(where_clauses) > 1 else where_clauses[0] + + # Get dialect from session's engine + dialect = session.bind.dialect if session.bind else None + + try: + compiled = combined.compile( + dialect=dialect, + compile_kwargs={"literal_binds": True}, + ) + return str(compiled) + except Exception as e: + # Fallback if literal_binds fails (e.g., for complex types) + try: + compiled = combined.compile(dialect=dialect) + return f"{compiled} [params: {compiled.params}]" + except Exception: + return f"(failed to compile: {e})" + + class PipelineRunsApiService_Sql: PIPELINE_NAME_EXTRA_DATA_KEY = "pipeline_name" + def _build_annotation_filter( + self, + *, + filter: TextFilter, + column: sql.ColumnElement[str], + ) -> sql.ColumnElement[bool]: + """Build a SQLAlchemy condition for a TextFilter. + + Args: + filter: TextFilter with operator, text/texts, and optional negate flag. + column: The SQLAlchemy column to apply the filter to. + + Returns: + A SQLAlchemy condition expression for the filter. 
+ """ + if filter.operator == FilterOperator.EQUALS: + if filter.text is None: + raise ValueError("EQUALS operator requires 'text' to be set") + condition = column == filter.text + elif filter.operator == FilterOperator.CONTAINS: + if filter.text is None: + raise ValueError("CONTAINS operator requires 'text' to be set") + condition = column.contains(filter.text) + elif filter.operator == FilterOperator.IN_SET: + if not filter.texts: + raise ValueError("IN_SET operator requires 'texts' to be set") + condition = column.in_(filter.texts) + else: + raise ValueError(f"Unknown FilterOperator: {filter.operator}") + + if filter.negate: + condition = ~condition + + return condition + + def _build_annotation_where_clauses( + self, + *, + filters: AnnotationFilterGroup | None, + ) -> list[sql.ColumnElement[bool]]: + """Build WHERE clauses from AnnotationFilterGroup. + + For annotation filters, each AnnotationFilter creates an EXISTS subquery + that checks the key (and optionally value) on the SAME annotation row. + Multiple AnnotationFilters are combined with the group operator (OR by default). 
+ + Example: filters=[AF(key="env", value="prod"), AF(key="team", value="ml")] + Produces: EXISTS(...key='env' AND value='prod') OR EXISTS(...key='team' AND value='ml') + """ + where_clauses: list[sql.ColumnElement[bool]] = [] + + if filters is None or not filters.filters: + return where_clauses + + # Build EXISTS clause for each AnnotationFilter + exists_clauses: list[sql.ColumnElement[bool]] = [] + for af in filters.filters: + # Base subquery joining to parent PipelineRun + subquery = sql.select(bts.PipelineRunAnnotation).where( + bts.PipelineRunAnnotation.pipeline_run_id == bts.PipelineRun.id + ) + + # Add key filter (required) + subquery = subquery.where( + self._build_annotation_filter( + filter=af.key, column=bts.PipelineRunAnnotation.key + ) + ) + + # Add value filter (optional) - same row semantics + if af.value is not None: + subquery = subquery.where( + self._build_annotation_filter( + filter=af.value, column=bts.PipelineRunAnnotation.value + ) + ) + + exists_clauses.append(subquery.exists()) + + # Combine EXISTS clauses with group operator + if len(exists_clauses) == 1: + where_clauses.append(exists_clauses[0]) + elif filters.operator == GroupOperator.AND: + where_clauses.append(sql.and_(*exists_clauses)) + else: + # Default to OR if operator is not specified + where_clauses.append(sql.or_(*exists_clauses)) + + return where_clauses + def create( self, session: orm.Session, @@ -162,12 +365,107 @@ def list( *, session: orm.Session, page_token: str | None = None, - # page_size: int = 10, filter: str | None = None, current_user: str | None = None, + annotation_filter: str | None = None, include_pipeline_names: bool = False, include_execution_stats: bool = False, + debug_where_clause: bool = False, ) -> ListPipelineJobsResponse: + """List pipeline runs with optional filtering. 
+ + **Parameters:** + + - **filter**: Simple key:value filter string (e.g., "created_by:alice") + - **annotation_filter**: JSON string for annotation filtering + - **page_token**: Pagination token from previous response + - **include_pipeline_names**: Include pipeline names in response + - **include_execution_stats**: Include execution statistics + - **debug_where_clause**: If True, includes compiled SQL WHERE clause + + **Returns:** `ListPipelineJobsResponse` with matching pipeline runs. + + --- + + ## API Usage + + ### Annotation Filter JSON Format + + The `annotation_filter` parameter accepts a JSON-encoded string. + + ### TextFilter Operators + + | Operator | Description | + |----------|-------------| + | `equals` | Exactly matches string (`text` field) | + | `contains` | Contains substring (`text` field) | + | `in_set` | Matches one of multiple values (`texts` field) | + + All filters support `negate: true` to invert the condition. + + --- + + ### Example 1: Key contains substring + + Find runs with annotation key containing "env": + + ```json + {"filters": [{"key": {"operator": "contains", "text": "env"}}]} + ``` + + ### Example 2: Key-value pair with in_set + + Find runs where key is "environment" and value is one of ["prod", "staging"]: + + ```json + { + "filters": [ + { + "key": {"operator": "equals", "text": "environment"}, + "value": {"operator": "in_set", "texts": ["prod", "staging"]} + } + ] + } + ``` + + ### Example 3: Multiple filters with OR (default) + + Find runs where key contains "env" OR key is in ["team", "owner"]: + + ```json + { + "filters": [ + {"key": {"operator": "contains", "text": "env"}}, + {"key": {"operator": "in_set", "texts": ["team", "owner"]}} + ] + } + ``` + + ### Example 4: Multiple filters with AND + + Find runs that have BOTH conditions: + + ```json + { + "operator": "and", + "filters": [ + { + "key": {"operator": "equals", "text": "environment"}, + "value": {"operator": "contains", "text": "prod"} + }, + {"key": {"operator": 
"contains", "text": "team"}} + ] + } + ``` + + --- + + **Performance Note:** + + WHERE clause ordering: Simple column filters (indexed) are applied first, + then EXISTS subqueries (annotation filters). Database optimizer may reorder, + but explicit ordering documents intent and helps simpler optimizers. + """ page_token_dict = _decode_page_token(page_token) OFFSET_KEY = "offset" offset = page_token_dict.get(OFFSET_KEY, 0) @@ -176,7 +474,18 @@ def list( FILTER_KEY = "filter" if page_token: filter = page_token_dict.get(FILTER_KEY, None) - where_clauses = [] + + # Parse annotation_filter JSON string if provided + parsed_annotation_filter: AnnotationFilterGroup | None = None + if annotation_filter: + try: + parsed_annotation_filter = AnnotationFilterGroup.model_validate_json( + annotation_filter + ) + except pydantic.ValidationError as e: + raise ApiServiceError(f"Invalid annotation_filter: {e}") + + where_clauses: list[sql.ColumnElement[bool]] = [] parsed_filter = _parse_filter(filter) if filter else {} for key, value in parsed_filter.items(): if key == "_text": @@ -184,9 +493,6 @@ def list( elif key == "created_by": if value == "me": if current_user is None: - # raise ApiServiceError( - # f"The `created_by:me` filter requires `current_user`." - # ) current_user = "" value = current_user # TODO: Maybe make this a bit more robust. 
@@ -200,6 +506,13 @@ def list( where_clauses.append(bts.PipelineRun.created_by == None) else: raise NotImplementedError(f"Unsupported filter {filter}.") + + if parsed_annotation_filter: + annotation_clauses = self._build_annotation_where_clauses( + filters=parsed_annotation_filter + ) + where_clauses.extend(annotation_clauses) + pipeline_runs = list( session.scalars( sql.select(bts.PipelineRun) @@ -209,6 +522,7 @@ def list( .limit(page_size) ).all() ) + next_page_offset = offset + page_size next_page_token_dict = {OFFSET_KEY: next_page_offset, FILTER_KEY: filter} next_page_token = _encode_page_token(next_page_token_dict) @@ -246,7 +560,7 @@ def create_pipeline_run_response( } return response - return ListPipelineJobsResponse( + response = ListPipelineJobsResponse( pipeline_runs=[ create_pipeline_run_response(pipeline_run) for pipeline_run in pipeline_runs @@ -254,6 +568,14 @@ def create_pipeline_run_response( next_page_token=next_page_token, ) + if debug_where_clause: + response.debug_where_clause = _compile_where_clauses_to_string( + session=session, + where_clauses=where_clauses, + ) + + return response + def _calculate_execution_status_stats( self, session: orm.Session, root_execution_id: bts.IdType ) -> dict[bts.ContainerExecutionStatus, int]: diff --git a/cloud_pipelines_backend/backend_types_sql.py b/cloud_pipelines_backend/backend_types_sql.py index af16b3c..f8df624 100644 --- a/cloud_pipelines_backend/backend_types_sql.py +++ b/cloud_pipelines_backend/backend_types_sql.py @@ -2,7 +2,7 @@ import datetime import enum import typing -from typing import Any +from typing import Any, Final import sqlalchemy as sql from sqlalchemy import orm @@ -64,7 +64,7 @@ def generate_unique_id() -> str: # # Needed to put a union type into DB # class SqlIOTypeStruct(_BaseModel): -# type: structures.TypeSpecType +# type: structures.TypeSpecType # No. 
We'll represent TypeSpecType as name:str + properties:dict # Supported cases: # * type: "name" @@ -466,6 +466,11 @@ class ContainerExecution(_TableBase): ) +PIPELINE_RUN_ANNOTATION_KEY_VALUE_INDEX_NAME: Final[str] = ( + "ix_pipeline_run_annotation_key_value" +) + + class PipelineRunAnnotation(_TableBase): __tablename__ = "pipeline_run_annotation" pipeline_run_id: orm.Mapped[IdType] = orm.mapped_column( @@ -476,3 +481,13 @@ class PipelineRunAnnotation(_TableBase): pipeline_run: orm.Mapped[PipelineRun] = orm.relationship(repr=False, init=False) key: orm.Mapped[str] = orm.mapped_column(default=None, primary_key=True) value: orm.Mapped[str | None] = orm.mapped_column(default=None) + + __table_args__ = ( + # Index for searching pipeline runs by annotation key/value + # Enables efficient queries like "find runs where key='environment' and value='production'" + sql.Index( + PIPELINE_RUN_ANNOTATION_KEY_VALUE_INDEX_NAME, + "key", + "value", + ), + ) diff --git a/cloud_pipelines_backend/database_ops.py b/cloud_pipelines_backend/database_ops.py index 3d94ed1..dd22bea 100644 --- a/cloud_pipelines_backend/database_ops.py +++ b/cloud_pipelines_backend/database_ops.py @@ -77,3 +77,9 @@ def migrate_db(db_engine: sqlalchemy.Engine): for index in bts.ExecutionNode.__table__.indexes: if index.name == "ix_execution_node_container_execution_cache_key": index.create(db_engine, checkfirst=True) + + # Migration for annotation filtering feature + for index in bts.PipelineRunAnnotation.__table__.indexes: + if index.name == bts.PIPELINE_RUN_ANNOTATION_KEY_VALUE_INDEX_NAME: + index.create(db_engine, checkfirst=True) + break diff --git a/pyproject.toml b/pyproject.toml index 5f68f78..9363e1b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -14,6 +14,7 @@ dependencies = [ [dependency-groups] dev = [ + "black>=26.1.0", "pytest>=8.4.2", ] huggingface = [ @@ -23,3 +24,20 @@ huggingface = [ [tool.setuptools.packages.find] include = ["cloud_pipelines_backend*"] namespaces = true + +[tool.black] 
+# Usage: uv run black . +# +# TODO: Line length per Google Python Style Guide: +# https://google.github.io/styleguide/pyguide.html#32-line-length +# line-length = 80 +target-version = ["py310"] +include = '\.pyi?$' +exclude = ''' +/( + \.git + | \.venv + | build + | dist +)/ +''' diff --git a/tests/test_pipeline_run_search.py b/tests/test_pipeline_run_search.py new file mode 100644 index 0000000..5f167cb --- /dev/null +++ b/tests/test_pipeline_run_search.py @@ -0,0 +1,439 @@ +"""Tests for the pipeline run list() annotation filtering functionality. + +These tests document the JSON string format expected by the API and verify +the WHERE clause generation for annotation filters. +""" + +import json + +from sqlalchemy import orm +import pytest + +from cloud_pipelines_backend import api_server_sql +from cloud_pipelines_backend import database_ops + + +def _initialize_db_and_get_session_factory(): + """Initialize an in-memory SQLite database and return a session factory.""" + db_engine = database_ops.create_db_engine_and_migrate_db(database_uri="sqlite://") + return lambda: orm.Session(bind=db_engine) + + +class TestPipelineRunListAnnotationFilter: + """Tests for PipelineRunsApiService_Sql.list() annotation filtering. + + Each test shows the JSON string input format and the expected SQL WHERE clause. + """ + + def test_list_with_no_annotation_filter(self): + """Test list with annotation_filter=None returns all pipeline runs.""" + session_factory = _initialize_db_and_get_session_factory() + service = api_server_sql.PipelineRunsApiService_Sql() + + with session_factory() as session: + response = service.list( + session=session, + annotation_filter=None, + debug_where_clause=True, + ) + + assert response.debug_where_clause == "(no where clauses)" + assert response.pipeline_runs == [] + + def test_list_with_key_contains_filter_json(self): + """Test annotation filtering with JSON string: key CONTAINS operator. 
+ + JSON Input: + {"filters": [{"key": {"operator": "contains", "text": "env"}}]} + + Expected WHERE clause: + EXISTS (... WHERE key LIKE '%env%') + """ + session_factory = _initialize_db_and_get_session_factory() + service = api_server_sql.PipelineRunsApiService_Sql() + + with session_factory() as session: + # JSON string input (as received from API) + json_input = json.dumps( + {"filters": [{"key": {"operator": "contains", "text": "env"}}]} + ) + + response = service.list( + session=session, + annotation_filter=json_input, + debug_where_clause=True, + ) + + expected = ( + "EXISTS (SELECT pipeline_run_annotation.pipeline_run_id, " + 'pipeline_run_annotation."key", pipeline_run_annotation.value \n' + "FROM pipeline_run_annotation, pipeline_run \n" + "WHERE pipeline_run_annotation.pipeline_run_id = pipeline_run.id " + "AND (pipeline_run_annotation.\"key\" LIKE '%' || 'env' || '%'))" + ) + assert response.debug_where_clause == expected + + def test_list_with_key_equals_filter_json(self): + """Test annotation filtering with JSON string: key EQUALS operator. + + JSON Input: + {"filters": [{"key": {"operator": "equals", "text": "environment"}}]} + + Expected WHERE clause: + EXISTS (... 
WHERE key = 'environment') + """ + session_factory = _initialize_db_and_get_session_factory() + service = api_server_sql.PipelineRunsApiService_Sql() + + with session_factory() as session: + json_input = json.dumps( + {"filters": [{"key": {"operator": "equals", "text": "environment"}}]} + ) + + response = service.list( + session=session, + annotation_filter=json_input, + debug_where_clause=True, + ) + + expected = ( + "EXISTS (SELECT pipeline_run_annotation.pipeline_run_id, " + 'pipeline_run_annotation."key", pipeline_run_annotation.value \n' + "FROM pipeline_run_annotation, pipeline_run \n" + "WHERE pipeline_run_annotation.pipeline_run_id = pipeline_run.id " + "AND pipeline_run_annotation.\"key\" = 'environment')" + ) + assert response.debug_where_clause == expected + + def test_list_with_key_equals_negate_filter_json(self): + """Test annotation filtering with JSON string: key EQUALS with negate=true. + + JSON Input: + {"filters": [{"key": {"operator": "equals", "text": "environment", "negate": true}}]} + + Expected WHERE clause: + EXISTS (... 
WHERE key != 'environment') + """ + session_factory = _initialize_db_and_get_session_factory() + service = api_server_sql.PipelineRunsApiService_Sql() + + with session_factory() as session: + json_input = json.dumps( + { + "filters": [ + { + "key": { + "operator": "equals", + "text": "environment", + "negate": True, + } + } + ] + } + ) + + response = service.list( + session=session, + annotation_filter=json_input, + debug_where_clause=True, + ) + + expected = ( + "EXISTS (SELECT pipeline_run_annotation.pipeline_run_id, " + 'pipeline_run_annotation."key", pipeline_run_annotation.value \n' + "FROM pipeline_run_annotation, pipeline_run \n" + "WHERE pipeline_run_annotation.pipeline_run_id = pipeline_run.id " + "AND pipeline_run_annotation.\"key\" != 'environment')" + ) + assert response.debug_where_clause == expected + + def test_list_with_key_in_set_filter_json(self): + """Test annotation filtering with JSON string: key IN_SET operator. + + JSON Input: + {"filters": [{"key": {"operator": "in_set", "texts": ["environment", "team"]}}]} + + Expected WHERE clause: + EXISTS (... 
WHERE key IN ('environment', 'team')) + """ + session_factory = _initialize_db_and_get_session_factory() + service = api_server_sql.PipelineRunsApiService_Sql() + + with session_factory() as session: + json_input = json.dumps( + { + "filters": [ + { + "key": { + "operator": "in_set", + "texts": ["environment", "team"], + } + } + ] + } + ) + + response = service.list( + session=session, + annotation_filter=json_input, + debug_where_clause=True, + ) + + expected = ( + "EXISTS (SELECT pipeline_run_annotation.pipeline_run_id, " + 'pipeline_run_annotation."key", pipeline_run_annotation.value \n' + "FROM pipeline_run_annotation, pipeline_run \n" + "WHERE pipeline_run_annotation.pipeline_run_id = pipeline_run.id " + "AND pipeline_run_annotation.\"key\" IN ('environment', 'team'))" + ) + assert response.debug_where_clause == expected + + def test_list_with_key_and_value_filter_json(self): + """Test annotation filtering with JSON string: key AND value on same row. + + JSON Input: + { + "filters": [{ + "key": {"operator": "equals", "text": "environment"}, + "value": {"operator": "equals", "text": "production"} + }] + } + + Expected WHERE clause: + EXISTS (... 
WHERE key = 'environment' AND value = 'production') + """ + session_factory = _initialize_db_and_get_session_factory() + service = api_server_sql.PipelineRunsApiService_Sql() + + with session_factory() as session: + json_input = json.dumps( + { + "filters": [ + { + "key": {"operator": "equals", "text": "environment"}, + "value": {"operator": "equals", "text": "production"}, + } + ] + } + ) + + response = service.list( + session=session, + annotation_filter=json_input, + debug_where_clause=True, + ) + + expected = ( + "EXISTS (SELECT pipeline_run_annotation.pipeline_run_id, " + 'pipeline_run_annotation."key", pipeline_run_annotation.value \n' + "FROM pipeline_run_annotation, pipeline_run \n" + "WHERE pipeline_run_annotation.pipeline_run_id = pipeline_run.id " + "AND pipeline_run_annotation.\"key\" = 'environment' " + "AND pipeline_run_annotation.value = 'production')" + ) + assert response.debug_where_clause == expected + + def test_list_with_multiple_filters_or_json(self): + """Test annotation filtering with JSON string: multiple filters with OR (default). + + JSON Input: + { + "filters": [ + {"key": {"operator": "equals", "text": "environment"}, + "value": {"operator": "equals", "text": "production"}}, + {"key": {"operator": "equals", "text": "team"}, + "value": {"operator": "equals", "text": "backend"}} + ] + } + + Expected WHERE clause: + (EXISTS ... env=prod) OR (EXISTS ... 
team=backend) + """ + session_factory = _initialize_db_and_get_session_factory() + service = api_server_sql.PipelineRunsApiService_Sql() + + with session_factory() as session: + json_input = json.dumps( + { + "filters": [ + { + "key": {"operator": "equals", "text": "environment"}, + "value": {"operator": "equals", "text": "production"}, + }, + { + "key": {"operator": "equals", "text": "team"}, + "value": {"operator": "equals", "text": "backend"}, + }, + ] + } + ) + + response = service.list( + session=session, + annotation_filter=json_input, + debug_where_clause=True, + ) + + expected = ( + "(EXISTS (SELECT pipeline_run_annotation.pipeline_run_id, " + 'pipeline_run_annotation."key", pipeline_run_annotation.value \n' + "FROM pipeline_run_annotation, pipeline_run \n" + "WHERE pipeline_run_annotation.pipeline_run_id = pipeline_run.id " + "AND pipeline_run_annotation.\"key\" = 'environment' " + "AND pipeline_run_annotation.value = 'production')) " + "OR " + "(EXISTS (SELECT pipeline_run_annotation.pipeline_run_id, " + 'pipeline_run_annotation."key", pipeline_run_annotation.value \n' + "FROM pipeline_run_annotation, pipeline_run \n" + "WHERE pipeline_run_annotation.pipeline_run_id = pipeline_run.id " + "AND pipeline_run_annotation.\"key\" = 'team' " + "AND pipeline_run_annotation.value = 'backend'))" + ) + assert response.debug_where_clause == expected + + def test_list_with_multiple_filters_and_json(self): + """Test annotation filtering with JSON string: multiple filters with AND. + + JSON Input: + { + "operator": "and", + "filters": [ + {"key": {"operator": "contains", "text": "env"}, + "value": {"operator": "in_set", "texts": ["prod", "staging"]}}, + {"key": {"operator": "equals", "text": "team"}, + "value": {"operator": "contains", "text": "test", "negate": true}} + ] + } + + Expected WHERE clause: + (EXISTS ... env LIKE %env% AND value IN (...)) + AND + (EXISTS ... 
team AND value NOT LIKE %test%) + """ + session_factory = _initialize_db_and_get_session_factory() + service = api_server_sql.PipelineRunsApiService_Sql() + + with session_factory() as session: + json_input = json.dumps( + { + "operator": "and", + "filters": [ + { + "key": {"operator": "contains", "text": "env"}, + "value": { + "operator": "in_set", + "texts": ["prod", "staging"], + }, + }, + { + "key": {"operator": "equals", "text": "team"}, + "value": { + "operator": "contains", + "text": "test", + "negate": True, + }, + }, + ], + } + ) + + response = service.list( + session=session, + annotation_filter=json_input, + debug_where_clause=True, + ) + + expected = ( + "(EXISTS (SELECT pipeline_run_annotation.pipeline_run_id, " + 'pipeline_run_annotation."key", pipeline_run_annotation.value \n' + "FROM pipeline_run_annotation, pipeline_run \n" + "WHERE pipeline_run_annotation.pipeline_run_id = pipeline_run.id " + "AND (pipeline_run_annotation.\"key\" LIKE '%' || 'env' || '%') " + "AND pipeline_run_annotation.value IN ('prod', 'staging'))) " + "AND " + "(EXISTS (SELECT pipeline_run_annotation.pipeline_run_id, " + 'pipeline_run_annotation."key", pipeline_run_annotation.value \n' + "FROM pipeline_run_annotation, pipeline_run \n" + "WHERE pipeline_run_annotation.pipeline_run_id = pipeline_run.id " + "AND pipeline_run_annotation.\"key\" = 'team' " + "AND (pipeline_run_annotation.value NOT LIKE '%' || 'test' || '%')))" + ) + assert response.debug_where_clause == expected + + def test_list_with_user_filter_and_annotation_filter(self): + """Test WHERE clause ordering: user filter BEFORE annotation filter. + + This test verifies that simple column filters (indexed) come before + EXISTS subqueries (annotation filters) for performance optimization. + + Filter input: created_by:alice + Annotation filter: {"filters": [{"key": {"operator": "equals", "text": "env"}}]} + + Expected WHERE clause order: + 1. pipeline_run.created_by = 'alice' (simple, indexed) + 2. EXISTS (...) 
(subquery) + """ + session_factory = _initialize_db_and_get_session_factory() + service = api_server_sql.PipelineRunsApiService_Sql() + + with session_factory() as session: + json_input = json.dumps( + {"filters": [{"key": {"operator": "equals", "text": "env"}}]} + ) + + response = service.list( + session=session, + filter="created_by:alice", + annotation_filter=json_input, + debug_where_clause=True, + ) + + # Verify the WHERE clause contains both filters + # User filter should come BEFORE annotation filter + assert "created_by = 'alice'" in response.debug_where_clause + assert "EXISTS" in response.debug_where_clause + + # Verify ordering: created_by appears before EXISTS + created_by_pos = response.debug_where_clause.find("created_by") + exists_pos = response.debug_where_clause.find("EXISTS") + assert ( + created_by_pos < exists_pos + ), "User filter should appear before annotation filter in WHERE clause" + + def test_list_with_invalid_json_raises_error(self): + """Test that invalid JSON in annotation_filter raises ApiServiceError.""" + session_factory = _initialize_db_and_get_session_factory() + service = api_server_sql.PipelineRunsApiService_Sql() + + with session_factory() as session: + with pytest.raises(api_server_sql.ApiServiceError) as exc_info: + service.list( + session=session, + annotation_filter="not valid json", + debug_where_clause=True, + ) + + # Pydantic's error message includes "Invalid JSON" + assert "Invalid annotation_filter" in str(exc_info.value) + assert "Invalid JSON" in str(exc_info.value) + + def test_list_with_missing_filters_key_raises_error(self): + """Test that JSON without 'filters' key raises ApiServiceError.""" + session_factory = _initialize_db_and_get_session_factory() + service = api_server_sql.PipelineRunsApiService_Sql() + + with session_factory() as session: + with pytest.raises(api_server_sql.ApiServiceError) as exc_info: + service.list( + session=session, + annotation_filter='{"operator": "and"}', + debug_where_clause=True, 
+ ) + + # Pydantic's error message indicates 'filters' field is required + assert "Invalid annotation_filter" in str(exc_info.value) + assert "filters" in str(exc_info.value) + + +if __name__ == "__main__": + pytest.main([__file__, "-v"]) From 6e4206ee830028cca0a3575de0244a44500316f2 Mon Sep 17 00:00:00 2001 From: Yue Chao Qin Date: Tue, 27 Jan 2026 15:34:20 -0800 Subject: [PATCH 2/6] Added negate example to API --- cloud_pipelines_backend/api_server_sql.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/cloud_pipelines_backend/api_server_sql.py b/cloud_pipelines_backend/api_server_sql.py index 0667bd1..bd6d62e 100644 --- a/cloud_pipelines_backend/api_server_sql.py +++ b/cloud_pipelines_backend/api_server_sql.py @@ -428,15 +428,15 @@ def list( } ``` - ### Example 3: Multiple filters with OR (default) + ### Example 3: Multiple filters with OR (default) + negate - Find runs where key contains "env" OR key is in ["team", "owner"]: + Find runs where key contains "env" OR key is NOT "deprecated": ```json { "filters": [ {"key": {"operator": "contains", "text": "env"}}, - {"key": {"operator": "in_set", "texts": ["team", "owner"]}} + {"key": {"operator": "equals", "text": "deprecated", "negate": true}} ] } ``` From c418d27701219116d43b7b3c8ee412710480f20a Mon Sep 17 00:00:00 2001 From: Yue Chao Qin Date: Tue, 27 Jan 2026 16:04:04 -0800 Subject: [PATCH 3/6] Refactored filters to be annotation_filters --- cloud_pipelines_backend/api_server_sql.py | 24 ++++++------ tests/test_pipeline_run_search.py | 48 +++++++++++++---------- 2 files changed, 40 insertions(+), 32 deletions(-) diff --git a/cloud_pipelines_backend/api_server_sql.py b/cloud_pipelines_backend/api_server_sql.py index bd6d62e..86b3ab2 100644 --- a/cloud_pipelines_backend/api_server_sql.py +++ b/cloud_pipelines_backend/api_server_sql.py @@ -92,19 +92,19 @@ class AnnotationFilterGroup(pydantic.BaseModel): The group operator determines how multiple filters are combined. 
Examples: - - AnnotationFilterGroup(filters=[...]) + - AnnotationFilterGroup(annotation_filters=[...]) → Defaults to OR logic (match ANY filter) - - AnnotationFilterGroup(operator=GroupOperator.AND, filters=[...]) + - AnnotationFilterGroup(operator=GroupOperator.AND, annotation_filters=[...]) → All filters must match (match ALL filters) - - AnnotationFilterGroup(operator=GroupOperator.OR, filters=[...]) + - AnnotationFilterGroup(operator=GroupOperator.OR, annotation_filters=[...]) → At least one filter must match SQL Pattern: - filters=[AF(key="env", value="prod"), AF(key="team")] + annotation_filters=[AF(key="env", value="prod"), AF(key="team")] → EXISTS(key='env' AND value='prod') OR EXISTS(key='team') """ - filters: list[AnnotationFilter] + annotation_filters: list[AnnotationFilter] # Operator defaults to None, which is treated as OR logic. operator: GroupOperator | None = None @@ -231,17 +231,17 @@ def _build_annotation_where_clauses( that checks the key (and optionally value) on the SAME annotation row. Multiple AnnotationFilters are combined with the group operator (OR by default). 
- Example: filters=[AF(key="env", value="prod"), AF(key="team", value="ml")] + Example: annotation_filters=[AF(key="env", value="prod"), AF(key="team", value="ml")] Produces: EXISTS(...key='env' AND value='prod') OR EXISTS(...key='team' AND value='ml') """ where_clauses: list[sql.ColumnElement[bool]] = [] - if filters is None or not filters.filters: + if filters is None or not filters.annotation_filters: return where_clauses # Build EXISTS clause for each AnnotationFilter exists_clauses: list[sql.ColumnElement[bool]] = [] - for af in filters.filters: + for af in filters.annotation_filters: # Base subquery joining to parent PipelineRun subquery = sql.select(bts.PipelineRunAnnotation).where( bts.PipelineRunAnnotation.pipeline_run_id == bts.PipelineRun.id @@ -410,7 +410,7 @@ def list( Find runs with annotation key containing "env": ```json - {"filters": [{"key": {"operator": "contains", "text": "env"}}]} + {"annotation_filters": [{"key": {"operator": "contains", "text": "env"}}]} ``` ### Example 2: Key-value pair with in_set @@ -419,7 +419,7 @@ def list( ```json { - "filters": [ + "annotation_filters": [ { "key": {"operator": "equals", "text": "environment"}, "value": {"operator": "in_set", "texts": ["prod", "staging"]} @@ -434,7 +434,7 @@ def list( ```json { - "filters": [ + "annotation_filters": [ {"key": {"operator": "contains", "text": "env"}}, {"key": {"operator": "equals", "text": "deprecated", "negate": true}} ] @@ -448,7 +448,7 @@ def list( ```json { "operator": "and", - "filters": [ + "annotation_filters": [ { "key": {"operator": "equals", "text": "environment"}, "value": {"operator": "contains", "text": "prod"} diff --git a/tests/test_pipeline_run_search.py b/tests/test_pipeline_run_search.py index 5f167cb..a80d67a 100644 --- a/tests/test_pipeline_run_search.py +++ b/tests/test_pipeline_run_search.py @@ -44,7 +44,7 @@ def test_list_with_key_contains_filter_json(self): """Test annotation filtering with JSON string: key CONTAINS operator. 
JSON Input: - {"filters": [{"key": {"operator": "contains", "text": "env"}}]} + {"annotation_filters": [{"key": {"operator": "contains", "text": "env"}}]} Expected WHERE clause: EXISTS (... WHERE key LIKE '%env%') @@ -55,7 +55,11 @@ def test_list_with_key_contains_filter_json(self): with session_factory() as session: # JSON string input (as received from API) json_input = json.dumps( - {"filters": [{"key": {"operator": "contains", "text": "env"}}]} + { + "annotation_filters": [ + {"key": {"operator": "contains", "text": "env"}} + ] + } ) response = service.list( @@ -77,7 +81,7 @@ def test_list_with_key_equals_filter_json(self): """Test annotation filtering with JSON string: key EQUALS operator. JSON Input: - {"filters": [{"key": {"operator": "equals", "text": "environment"}}]} + {"annotation_filters": [{"key": {"operator": "equals", "text": "environment"}}]} Expected WHERE clause: EXISTS (... WHERE key = 'environment') @@ -87,7 +91,11 @@ def test_list_with_key_equals_filter_json(self): with session_factory() as session: json_input = json.dumps( - {"filters": [{"key": {"operator": "equals", "text": "environment"}}]} + { + "annotation_filters": [ + {"key": {"operator": "equals", "text": "environment"}} + ] + } ) response = service.list( @@ -109,7 +117,7 @@ def test_list_with_key_equals_negate_filter_json(self): """Test annotation filtering with JSON string: key EQUALS with negate=true. JSON Input: - {"filters": [{"key": {"operator": "equals", "text": "environment", "negate": true}}]} + {"annotation_filters": [{"key": {"operator": "equals", "text": "environment", "negate": true}}]} Expected WHERE clause: EXISTS (... 
WHERE key != 'environment') @@ -120,7 +128,7 @@ def test_list_with_key_equals_negate_filter_json(self): with session_factory() as session: json_input = json.dumps( { - "filters": [ + "annotation_filters": [ { "key": { "operator": "equals", @@ -151,7 +159,7 @@ def test_list_with_key_in_set_filter_json(self): """Test annotation filtering with JSON string: key IN_SET operator. JSON Input: - {"filters": [{"key": {"operator": "in_set", "texts": ["environment", "team"]}}]} + {"annotation_filters": [{"key": {"operator": "in_set", "texts": ["environment", "team"]}}]} Expected WHERE clause: EXISTS (... WHERE key IN ('environment', 'team')) @@ -162,7 +170,7 @@ def test_list_with_key_in_set_filter_json(self): with session_factory() as session: json_input = json.dumps( { - "filters": [ + "annotation_filters": [ { "key": { "operator": "in_set", @@ -193,7 +201,7 @@ def test_list_with_key_and_value_filter_json(self): JSON Input: { - "filters": [{ + "annotation_filters": [{ "key": {"operator": "equals", "text": "environment"}, "value": {"operator": "equals", "text": "production"} }] @@ -208,7 +216,7 @@ def test_list_with_key_and_value_filter_json(self): with session_factory() as session: json_input = json.dumps( { - "filters": [ + "annotation_filters": [ { "key": {"operator": "equals", "text": "environment"}, "value": {"operator": "equals", "text": "production"}, @@ -238,7 +246,7 @@ def test_list_with_multiple_filters_or_json(self): JSON Input: { - "filters": [ + "annotation_filters": [ {"key": {"operator": "equals", "text": "environment"}, "value": {"operator": "equals", "text": "production"}}, {"key": {"operator": "equals", "text": "team"}, @@ -255,7 +263,7 @@ def test_list_with_multiple_filters_or_json(self): with session_factory() as session: json_input = json.dumps( { - "filters": [ + "annotation_filters": [ { "key": {"operator": "equals", "text": "environment"}, "value": {"operator": "equals", "text": "production"}, @@ -297,7 +305,7 @@ def 
test_list_with_multiple_filters_and_json(self): JSON Input: { "operator": "and", - "filters": [ + "annotation_filters": [ {"key": {"operator": "contains", "text": "env"}, "value": {"operator": "in_set", "texts": ["prod", "staging"]}}, {"key": {"operator": "equals", "text": "team"}, @@ -317,7 +325,7 @@ def test_list_with_multiple_filters_and_json(self): json_input = json.dumps( { "operator": "and", - "filters": [ + "annotation_filters": [ { "key": {"operator": "contains", "text": "env"}, "value": { @@ -367,7 +375,7 @@ def test_list_with_user_filter_and_annotation_filter(self): EXISTS subqueries (annotation filters) for performance optimization. Filter input: created_by:alice - Annotation filter: {"filters": [{"key": {"operator": "equals", "text": "env"}}]} + Annotation filter: {"annotation_filters": [{"key": {"operator": "equals", "text": "env"}}]} Expected WHERE clause order: 1. pipeline_run.created_by = 'alice' (simple, indexed) @@ -378,7 +386,7 @@ def test_list_with_user_filter_and_annotation_filter(self): with session_factory() as session: json_input = json.dumps( - {"filters": [{"key": {"operator": "equals", "text": "env"}}]} + {"annotation_filters": [{"key": {"operator": "equals", "text": "env"}}]} ) response = service.list( @@ -417,8 +425,8 @@ def test_list_with_invalid_json_raises_error(self): assert "Invalid annotation_filter" in str(exc_info.value) assert "Invalid JSON" in str(exc_info.value) - def test_list_with_missing_filters_key_raises_error(self): - """Test that JSON without 'filters' key raises ApiServiceError.""" + def test_list_with_missing_annotation_filters_key_raises_error(self): + """Test that JSON without 'annotation_filters' key raises ApiServiceError.""" session_factory = _initialize_db_and_get_session_factory() service = api_server_sql.PipelineRunsApiService_Sql() @@ -430,9 +438,9 @@ def test_list_with_missing_filters_key_raises_error(self): debug_where_clause=True, ) - # Pydantic's error message indicates 'filters' field is required + # 
Pydantic's error message indicates 'annotation_filters' field is required assert "Invalid annotation_filter" in str(exc_info.value) - assert "filters" in str(exc_info.value) + assert "annotation_filters" in str(exc_info.value) if __name__ == "__main__": From 1cac643ea6d8f0faa16089318599200195a20fb8 Mon Sep 17 00:00:00 2001 From: Yue Chao Qin Date: Tue, 27 Jan 2026 16:10:59 -0800 Subject: [PATCH 4/6] Update comment --- cloud_pipelines_backend/api_server_sql.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cloud_pipelines_backend/api_server_sql.py b/cloud_pipelines_backend/api_server_sql.py index 86b3ab2..e1e91c4 100644 --- a/cloud_pipelines_backend/api_server_sql.py +++ b/cloud_pipelines_backend/api_server_sql.py @@ -141,7 +141,7 @@ class ListPipelineJobsResponse: pipeline_runs: list[PipelineRunResponse] next_page_token: str | None = None debug_where_clause: str | None = ( - None # Populated when debug_where_clause=True in search() + None # Populated when debug_where_clause=True in list() ) From a6ea563fade310a951bc6cbc1e0813c4febf1db5 Mon Sep 17 00:00:00 2001 From: Yue Chao Qin Date: Tue, 27 Jan 2026 16:22:29 -0800 Subject: [PATCH 5/6] Created function apply_annotation_filter --- cloud_pipelines_backend/api_server_sql.py | 44 +++++++++++++++-------- 1 file changed, 29 insertions(+), 15 deletions(-) diff --git a/cloud_pipelines_backend/api_server_sql.py b/cloud_pipelines_backend/api_server_sql.py index e1e91c4..3bf139c 100644 --- a/cloud_pipelines_backend/api_server_sql.py +++ b/cloud_pipelines_backend/api_server_sql.py @@ -275,6 +275,31 @@ def _build_annotation_where_clauses( return where_clauses + def _apply_annotation_filter( + self, + *, + annotation_filter: str | None, + where_clauses: list[sql.ColumnElement[bool]], + ) -> None: + """Parse annotation_filter JSON and add WHERE clauses. + + Args: + annotation_filter: JSON string for annotation filtering, or None. + where_clauses: List to append WHERE clauses to (modified in place). 
+ + Raises: + ApiServiceError: If the JSON is malformed or fails AnnotationFilterGroup validation. + """ + if not annotation_filter: + return + + try: + parsed = AnnotationFilterGroup.model_validate_json(annotation_filter) + annotation_clauses = self._build_annotation_where_clauses(filters=parsed) + where_clauses.extend(annotation_clauses) + except pydantic.ValidationError as e: + raise ApiServiceError(f"Invalid annotation_filter: {e}") + def create( self, session: orm.Session, @@ -475,16 +500,6 @@ def list( if page_token: filter = page_token_dict.get(FILTER_KEY, None) - # Parse annotation_filter JSON string if provided - parsed_annotation_filter: AnnotationFilterGroup | None = None - if annotation_filter: - try: - parsed_annotation_filter = AnnotationFilterGroup.model_validate_json( - annotation_filter - ) - except pydantic.ValidationError as e: - raise ApiServiceError(f"Invalid annotation_filter: {e}") - where_clauses: list[sql.ColumnElement[bool]] = [] parsed_filter = _parse_filter(filter) if filter else {} for key, value in parsed_filter.items(): @@ -507,11 +522,10 @@ def list( else: raise NotImplementedError(f"Unsupported filter {filter}.") - if parsed_annotation_filter: - annotation_clauses = self._build_annotation_where_clauses( - filters=parsed_annotation_filter - ) - where_clauses.extend(annotation_clauses) + self._apply_annotation_filter( + annotation_filter=annotation_filter, + where_clauses=where_clauses, + ) pipeline_runs = list( session.scalars( From 3adc82be911876077af4e5a7edffe9f539724097 Mon Sep 17 00:00:00 2001 From: Yue Chao Qin Date: Wed, 28 Jan 2026 13:14:17 -0800 Subject: [PATCH 6/6] Test locally end-2-end --- cloud_pipelines_backend/api_server_sql.py | 64 ++++++++++-- tests/test_pipeline_run_search.py | 116 +++++++++++++++++++++- 2 files changed, 169 insertions(+), 11 deletions(-) diff --git a/cloud_pipelines_backend/api_server_sql.py b/cloud_pipelines_backend/api_server_sql.py index 3bf139c..7801e0a 100644 --- a/cloud_pipelines_backend/api_server_sql.py +++
b/cloud_pipelines_backend/api_server_sql.py @@ -233,6 +233,15 @@ def _build_annotation_where_clauses( Example: annotation_filters=[AF(key="env", value="prod"), AF(key="team", value="ml")] Produces: EXISTS(...key='env' AND value='prod') OR EXISTS(...key='team' AND value='ml') + + Negate Behavior: + + | key.negate | value.negate | SQL | Result | + |------------|--------------|-------------------------------------|---------------------------------| + | false | false | EXISTS(key='X' AND value='Y') | Has X=Y | + | false | true | EXISTS(key='X' AND value!='Y') | Has X with value not Y | + | true | false | NOT EXISTS(key='X' AND value='Y') | Doesn't have X=Y | + | true | true | NOT EXISTS(key='X' AND value='Y') | Same (value.negate ignored) | """ where_clauses: list[sql.ColumnElement[bool]] = [] @@ -240,6 +249,9 @@ def _build_annotation_where_clauses( return where_clauses # Build EXISTS clause for each AnnotationFilter + # key.negate controls EXISTS vs NOT EXISTS + # value.negate negates the condition inside (only when key.negate=false) + # When key.negate=true, value.negate is ignored for intuitive "doesn't have X=Y" behavior exists_clauses: list[sql.ColumnElement[bool]] = [] for af in filters.annotation_filters: # Base subquery joining to parent PipelineRun @@ -247,22 +259,50 @@ def _build_annotation_where_clauses( bts.PipelineRunAnnotation.pipeline_run_id == bts.PipelineRun.id ) - # Add key filter (required) + # Add key filter WITHOUT negation (always positive match inside subquery) + # key.negate will be applied at the EXISTS level + key_filter_no_negate = TextFilter( + operator=af.key.operator, + text=af.key.text, + texts=af.key.texts, + negate=False, + ) subquery = subquery.where( self._build_annotation_filter( - filter=af.key, column=bts.PipelineRunAnnotation.key + filter=key_filter_no_negate, column=bts.PipelineRunAnnotation.key ) ) - # Add value filter (optional) - same row semantics + # Add value filter (optional) + # When key.negate=true (NOT EXISTS), ignore 
value.negate for intuitive behavior if af.value is not None: - subquery = subquery.where( - self._build_annotation_filter( - filter=af.value, column=bts.PipelineRunAnnotation.value + if af.key.negate: + # NOT EXISTS: always use positive value match inside + value_filter_no_negate = TextFilter( + operator=af.value.operator, + text=af.value.text, + texts=af.value.texts, + negate=False, + ) + subquery = subquery.where( + self._build_annotation_filter( + filter=value_filter_no_negate, + column=bts.PipelineRunAnnotation.value, + ) + ) + else: + # EXISTS: respect value.negate for "has X with value != Y" queries + subquery = subquery.where( + self._build_annotation_filter( + filter=af.value, column=bts.PipelineRunAnnotation.value + ) ) - ) - exists_clauses.append(subquery.exists()) + # key.negate controls EXISTS vs NOT EXISTS + if af.key.negate: + exists_clauses.append(~subquery.exists()) # NOT EXISTS + else: + exists_clauses.append(subquery.exists()) # EXISTS # Combine EXISTS clauses with group operator if len(exists_clauses) == 1: @@ -428,6 +468,14 @@ def list( All filters support `negate: true` to invert the condition. + **Key vs Value Negation:** + + - `key.negate=true`: Uses `NOT EXISTS` - finds runs that do NOT have a matching annotation + - `value.negate=true`: Negates the value condition (only when `key.negate=false`) + + Note: When `key.negate=true`, `value.negate` is ignored (the value is always matched positively inside the `NOT EXISTS`). + This ensures "doesn't have X=Y" queries work correctly. + --- ### Example 1: Key contains substring diff --git a/tests/test_pipeline_run_search.py b/tests/test_pipeline_run_search.py index a80d67a..274d930 100644 --- a/tests/test_pipeline_run_search.py +++ b/tests/test_pipeline_run_search.py @@ -120,7 +120,10 @@ def test_list_with_key_equals_negate_filter_json(self): {"annotation_filters": [{"key": {"operator": "equals", "text": "environment", "negate": true}}]} Expected WHERE clause: - EXISTS (... WHERE key != 'environment') + NOT EXISTS (...
WHERE key = 'environment') + + key.negate=true means "runs that do NOT have this key" -> NOT EXISTS + The condition inside stays positive (key = 'environment'). """ session_factory = _initialize_db_and_get_session_factory() service = api_server_sql.PipelineRunsApiService_Sql() @@ -147,11 +150,11 @@ def test_list_with_key_equals_negate_filter_json(self): ) expected = ( - "EXISTS (SELECT pipeline_run_annotation.pipeline_run_id, " + "NOT (EXISTS (SELECT pipeline_run_annotation.pipeline_run_id, " 'pipeline_run_annotation."key", pipeline_run_annotation.value \n' "FROM pipeline_run_annotation, pipeline_run \n" "WHERE pipeline_run_annotation.pipeline_run_id = pipeline_run.id " - "AND pipeline_run_annotation.\"key\" != 'environment')" + "AND pipeline_run_annotation.\"key\" = 'environment'))" ) assert response.debug_where_clause == expected @@ -241,6 +244,113 @@ def test_list_with_key_and_value_filter_json(self): ) assert response.debug_where_clause == expected + def test_list_with_key_negate_and_value_filter_json(self): + """Test annotation filtering: key negate + value -> NOT EXISTS. + + JSON Input: + { + "annotation_filters": [{ + "key": {"operator": "equals", "text": "environment", "negate": true}, + "value": {"operator": "equals", "text": "production"} + }] + } + + Expected WHERE clause: + NOT EXISTS (... 
WHERE key = 'environment' AND value = 'production') + + Meaning: runs that do NOT have key=environment with value=production + """ + session_factory = _initialize_db_and_get_session_factory() + service = api_server_sql.PipelineRunsApiService_Sql() + + with session_factory() as session: + json_input = json.dumps( + { + "annotation_filters": [ + { + "key": { + "operator": "equals", + "text": "environment", + "negate": True, + }, + "value": {"operator": "equals", "text": "production"}, + } + ] + } + ) + + response = service.list( + session=session, + annotation_filter=json_input, + debug_where_clause=True, + ) + + expected = ( + "NOT (EXISTS (SELECT pipeline_run_annotation.pipeline_run_id, " + 'pipeline_run_annotation."key", pipeline_run_annotation.value \n' + "FROM pipeline_run_annotation, pipeline_run \n" + "WHERE pipeline_run_annotation.pipeline_run_id = pipeline_run.id " + "AND pipeline_run_annotation.\"key\" = 'environment' " + "AND pipeline_run_annotation.value = 'production'))" + ) + assert response.debug_where_clause == expected + + def test_list_with_key_negate_and_value_negate_filter_json(self): + """Test annotation filtering: key negate + value negate -> NOT EXISTS (value.negate ignored). + + JSON Input: + { + "annotation_filters": [{ + "key": {"operator": "equals", "text": "environment", "negate": true}, + "value": {"operator": "equals", "text": "production", "negate": true} + }] + } + + Expected WHERE clause: + NOT EXISTS (... WHERE key = 'environment' AND value = 'production') + + Note: value.negate is ignored when key.negate=true for intuitive "doesn't have X=Y" behavior. 
+ Meaning: runs that do NOT have key=environment with value=production + """ + session_factory = _initialize_db_and_get_session_factory() + service = api_server_sql.PipelineRunsApiService_Sql() + + with session_factory() as session: + json_input = json.dumps( + { + "annotation_filters": [ + { + "key": { + "operator": "equals", + "text": "environment", + "negate": True, + }, + "value": { + "operator": "equals", + "text": "production", + "negate": True, + }, + } + ] + } + ) + + response = service.list( + session=session, + annotation_filter=json_input, + debug_where_clause=True, + ) + + expected = ( + "NOT (EXISTS (SELECT pipeline_run_annotation.pipeline_run_id, " + 'pipeline_run_annotation."key", pipeline_run_annotation.value \n' + "FROM pipeline_run_annotation, pipeline_run \n" + "WHERE pipeline_run_annotation.pipeline_run_id = pipeline_run.id " + "AND pipeline_run_annotation.\"key\" = 'environment' " + "AND pipeline_run_annotation.value = 'production'))" + ) + assert response.debug_where_clause == expected + def test_list_with_multiple_filters_or_json(self): """Test annotation filtering with JSON string: multiple filters with OR (default).