# ==== Annotation Filter Types for Search ====


class GroupOperator(str, enum.Enum):
    """Logical operators for combining filters in a group.

    NOTE(review): subclasses ``(str, enum.Enum)`` instead of ``enum.StrEnum``
    so the module stays importable on Python 3.10 (``enum.StrEnum`` is 3.11+),
    matching the ``py310`` target declared in pyproject.toml.  Members still
    compare equal to their plain-string values, and pydantic validates the
    same JSON strings either way.  If the runtime floor is actually 3.11+,
    ``enum.StrEnum`` can be restored.
    """

    AND = "and"
    OR = "or"


class FilterOperator(str, enum.Enum):
    """Operators for filtering by annotation key or value.

    (See GroupOperator for why this is ``(str, enum.Enum)`` and not StrEnum.)
    """

    CONTAINS = "contains"  # Contains substring
    EQUALS = "equals"  # Equals exact string
    IN_SET = "in_set"  # Is in a set of values


class TextFilter(pydantic.BaseModel):
    """Filter by text pattern.

    Exactly one operand field is used depending on ``operator``:
    ``text`` for EQUALS/CONTAINS, ``texts`` for IN_SET.  The schema does not
    enforce this; the query builder (``_build_annotation_filter``) raises
    ValueError when the required operand is missing.

    Examples:
        - TextFilter(operator=FilterOperator.EQUALS, text="environment")
        - TextFilter(operator=FilterOperator.CONTAINS, text="env")
        - TextFilter(operator=FilterOperator.IN_SET, texts=["prod", "staging"])
        - TextFilter(operator=FilterOperator.EQUALS, text="test", negate=True)
    """

    operator: FilterOperator
    text: str | None = None  # For EQUALS, CONTAINS operators
    texts: list[str] | None = None  # For IN_SET operator
    negate: bool = False  # If True, negates the operation


class AnnotationFilter(pydantic.BaseModel):
    """Filter annotations by key and optionally by value.

    Both conditions are matched against the SAME annotation row.

    Examples:
        - AnnotationFilter(key=TextFilter(operator=FilterOperator.EQUALS, text="environment"))
          -> Find runs that have an "environment" annotation key
        - AnnotationFilter(
              key=TextFilter(operator=FilterOperator.EQUALS, text="environment"),
              value=TextFilter(operator=FilterOperator.EQUALS, text="production"),
          )
          -> Find runs with annotation key "environment" and value "production"
    """

    key: TextFilter
    value: TextFilter | None = None


class AnnotationFilterGroup(pydantic.BaseModel):
    """A flat group of AnnotationFilters combined with AND/OR logic.

    Each AnnotationFilter matches a single annotation row (key + optional
    value).  The group operator determines how multiple filters are combined.

    Examples:
        - AnnotationFilterGroup(annotation_filters=[...])
          -> Defaults to OR logic (match ANY filter)
        - AnnotationFilterGroup(operator=GroupOperator.AND, annotation_filters=[...])
          -> All filters must match (match ALL filters)
        - AnnotationFilterGroup(operator=GroupOperator.OR, annotation_filters=[...])
          -> At least one filter must match

    SQL Pattern:
        annotation_filters=[AF(key="env", value="prod"), AF(key="team")]
        -> EXISTS(key='env' AND value='prod') OR EXISTS(key='team')
    """

    annotation_filters: list[AnnotationFilter]
    # Operator defaults to None, which the query builder treats as OR logic.
    operator: GroupOperator | None = None
+ + Uses the dialect from the session's engine (SQLite, MySQL, etc.) + and inlines literal values for readability. + """ + if not where_clauses: + return "(no where clauses)" + + # Combine all clauses with AND + combined = sql.and_(*where_clauses) if len(where_clauses) > 1 else where_clauses[0] + + # Get dialect from session's engine + dialect = session.bind.dialect if session.bind else None + + try: + compiled = combined.compile( + dialect=dialect, + compile_kwargs={"literal_binds": True}, + ) + return str(compiled) + except Exception as e: + # Fallback if literal_binds fails (e.g., for complex types) + try: + compiled = combined.compile(dialect=dialect) + return f"{compiled} [params: {compiled.params}]" + except Exception: + return f"(failed to compile: {e})" + + class PipelineRunsApiService_Sql: PIPELINE_NAME_EXTRA_DATA_KEY = "pipeline_name" + def _build_annotation_filter( + self, + *, + filter: TextFilter, + column: sql.ColumnElement[str], + ) -> sql.ColumnElement[bool]: + """Build a SQLAlchemy condition for a TextFilter. + + Args: + filter: TextFilter with operator, text/texts, and optional negate flag. + column: The SQLAlchemy column to apply the filter to. + + Returns: + A SQLAlchemy condition expression for the filter. 
+ """ + if filter.operator == FilterOperator.EQUALS: + if filter.text is None: + raise ValueError("EQUALS operator requires 'text' to be set") + condition = column == filter.text + elif filter.operator == FilterOperator.CONTAINS: + if filter.text is None: + raise ValueError("CONTAINS operator requires 'text' to be set") + condition = column.contains(filter.text) + elif filter.operator == FilterOperator.IN_SET: + if not filter.texts: + raise ValueError("IN_SET operator requires 'texts' to be set") + condition = column.in_(filter.texts) + else: + raise ValueError(f"Unknown FilterOperator: {filter.operator}") + + if filter.negate: + condition = ~condition + + return condition + + def _build_annotation_where_clauses( + self, + *, + filters: AnnotationFilterGroup | None, + ) -> list[sql.ColumnElement[bool]]: + """Build WHERE clauses from AnnotationFilterGroup. + + For annotation filters, each AnnotationFilter creates an EXISTS subquery + that checks the key (and optionally value) on the SAME annotation row. + Multiple AnnotationFilters are combined with the group operator (OR by default). 
+ + Example: annotation_filters=[AF(key="env", value="prod"), AF(key="team", value="ml")] + Produces: EXISTS(...key='env' AND value='prod') OR EXISTS(...key='team' AND value='ml') + + Negate Behavior: + + | key.negate | value.negate | SQL | Result | + |------------|--------------|-------------------------------------|---------------------------------| + | false | false | EXISTS(key='X' AND value='Y') | Has X=Y | + | false | true | EXISTS(key='X' AND value!='Y') | Has X with value not Y | + | true | false | NOT EXISTS(key='X' AND value='Y') | Doesn't have X=Y | + | true | true | NOT EXISTS(key='X' AND value='Y') | Same (value.negate ignored) | + """ + where_clauses: list[sql.ColumnElement[bool]] = [] + + if filters is None or not filters.annotation_filters: + return where_clauses + + # Build EXISTS clause for each AnnotationFilter + # key.negate controls EXISTS vs NOT EXISTS + # value.negate negates the condition inside (only when key.negate=false) + # When key.negate=true, value.negate is ignored for intuitive "doesn't have X=Y" behavior + exists_clauses: list[sql.ColumnElement[bool]] = [] + for af in filters.annotation_filters: + # Base subquery joining to parent PipelineRun + subquery = sql.select(bts.PipelineRunAnnotation).where( + bts.PipelineRunAnnotation.pipeline_run_id == bts.PipelineRun.id + ) + + # Add key filter WITHOUT negation (always positive match inside subquery) + # key.negate will be applied at the EXISTS level + key_filter_no_negate = TextFilter( + operator=af.key.operator, + text=af.key.text, + texts=af.key.texts, + negate=False, + ) + subquery = subquery.where( + self._build_annotation_filter( + filter=key_filter_no_negate, column=bts.PipelineRunAnnotation.key + ) + ) + + # Add value filter (optional) + # When key.negate=true (NOT EXISTS), ignore value.negate for intuitive behavior + if af.value is not None: + if af.key.negate: + # NOT EXISTS: always use positive value match inside + value_filter_no_negate = TextFilter( + 
operator=af.value.operator, + text=af.value.text, + texts=af.value.texts, + negate=False, + ) + subquery = subquery.where( + self._build_annotation_filter( + filter=value_filter_no_negate, + column=bts.PipelineRunAnnotation.value, + ) + ) + else: + # EXISTS: respect value.negate for "has X with value != Y" queries + subquery = subquery.where( + self._build_annotation_filter( + filter=af.value, column=bts.PipelineRunAnnotation.value + ) + ) + + # key.negate controls EXISTS vs NOT EXISTS + if af.key.negate: + exists_clauses.append(~subquery.exists()) # NOT EXISTS + else: + exists_clauses.append(subquery.exists()) # EXISTS + + # Combine EXISTS clauses with group operator + if len(exists_clauses) == 1: + where_clauses.append(exists_clauses[0]) + elif filters.operator == GroupOperator.AND: + where_clauses.append(sql.and_(*exists_clauses)) + else: + # Default to OR if operator is not specified + where_clauses.append(sql.or_(*exists_clauses)) + + return where_clauses + + def _apply_annotation_filter( + self, + *, + annotation_filter: str | None, + where_clauses: list[sql.ColumnElement[bool]], + ) -> None: + """Parse annotation_filter JSON and add WHERE clauses. + + Args: + annotation_filter: JSON string for annotation filtering, or None. + where_clauses: List to append WHERE clauses to (modified in place). + + Raises: + ApiServiceError: If the JSON is invalid or malformed. 
+ """ + if not annotation_filter: + return + + try: + parsed = AnnotationFilterGroup.model_validate_json(annotation_filter) + annotation_clauses = self._build_annotation_where_clauses(filters=parsed) + where_clauses.extend(annotation_clauses) + except pydantic.ValidationError as e: + raise ApiServiceError(f"Invalid annotation_filter: {e}") + def create( self, session: orm.Session, @@ -162,12 +430,115 @@ def list( *, session: orm.Session, page_token: str | None = None, - # page_size: int = 10, filter: str | None = None, current_user: str | None = None, + annotation_filter: str | None = None, include_pipeline_names: bool = False, include_execution_stats: bool = False, + debug_where_clause: bool = False, ) -> ListPipelineJobsResponse: + """List pipeline runs with optional filtering. + + **Parameters:** + + - **filter**: Simple key:value filter string (e.g., "created_by:alice") + - **annotation_filter**: JSON string for annotation filtering + - **page_token**: Pagination token from previous response + - **include_pipeline_names**: Include pipeline names in response + - **include_execution_stats**: Include execution statistics + - **debug_where_clause**: If True, includes compiled SQL WHERE clause + + **Returns:** `ListPipelineJobsResponse` with matching pipeline runs. + + --- + + ## API Usage + + ### Annotation Filter JSON Format + + The `annotation_filter` parameter accepts a JSON-encoded string. + + ### TextFilter Operators + + | Operator | Description | + |----------|-------------| + | `equals` | Exactly matches string (`text` field) | + | `contains` | Contains substring (`text` field) | + | `in_set` | Matches one of multiple values (`texts` field) | + + All filters support `negate: true` to invert the condition. 
+ + **Key vs Value Negation:** + + - `key.negate=true`: Uses `NOT EXISTS` - finds runs that do NOT have matching annotation + - `value.negate=true`: Negates the value condition (only when `key.negate=false`) + + Note: When `key.negate=true`, `value.negate` is ignored for intuitive behavior. + This ensures "doesn't have X=Y" queries work correctly. + + --- + + ### Example 1: Key contains substring + + Find runs with annotation key containing "env": + + ```json + {"annotation_filters": [{"key": {"operator": "contains", "text": "env"}}]} + ``` + + ### Example 2: Key-value pair with in_set + + Find runs where key is "environment" and value is one of ["prod", "staging"]: + + ```json + { + "annotation_filters": [ + { + "key": {"operator": "equals", "text": "environment"}, + "value": {"operator": "in_set", "texts": ["prod", "staging"]} + } + ] + } + ``` + + ### Example 3: Multiple filters with OR (default) + negate + + Find runs where key contains "env" OR key is NOT "deprecated": + + ```json + { + "annotation_filters": [ + {"key": {"operator": "contains", "text": "env"}}, + {"key": {"operator": "equals", "text": "deprecated", "negate": true}} + ] + } + ``` + + ### Example 4: Multiple filters with AND + + Find runs that have BOTH conditions: + + ```json + { + "operator": "and", + "annotation_filters": [ + { + "key": {"operator": "equals", "text": "environment"}, + "value": {"operator": "contains", "text": "prod"} + }, + {"key": {"operator": "contains", "text": "team"}} + ] + } + ``` + + --- + + **Performance Note:** + + WHERE clause ordering: Simple column filters (indexed) are applied first, + then EXISTS subqueries (annotation filters). Database optimizer may reorder, + but explicit ordering documents intent and helps simpler optimizers. 
+ """ page_token_dict = _decode_page_token(page_token) OFFSET_KEY = "offset" offset = page_token_dict.get(OFFSET_KEY, 0) @@ -176,7 +547,8 @@ def list( FILTER_KEY = "filter" if page_token: filter = page_token_dict.get(FILTER_KEY, None) - where_clauses = [] + + where_clauses: list[sql.ColumnElement[bool]] = [] parsed_filter = _parse_filter(filter) if filter else {} for key, value in parsed_filter.items(): if key == "_text": @@ -184,9 +556,6 @@ def list( elif key == "created_by": if value == "me": if current_user is None: - # raise ApiServiceError( - # f"The `created_by:me` filter requires `current_user`." - # ) current_user = "" value = current_user # TODO: Maybe make this a bit more robust. @@ -200,6 +569,12 @@ def list( where_clauses.append(bts.PipelineRun.created_by == None) else: raise NotImplementedError(f"Unsupported filter {filter}.") + + self._apply_annotation_filter( + annotation_filter=annotation_filter, + where_clauses=where_clauses, + ) + pipeline_runs = list( session.scalars( sql.select(bts.PipelineRun) @@ -209,6 +584,7 @@ def list( .limit(page_size) ).all() ) + next_page_offset = offset + page_size next_page_token_dict = {OFFSET_KEY: next_page_offset, FILTER_KEY: filter} next_page_token = _encode_page_token(next_page_token_dict) @@ -246,7 +622,7 @@ def create_pipeline_run_response( } return response - return ListPipelineJobsResponse( + response = ListPipelineJobsResponse( pipeline_runs=[ create_pipeline_run_response(pipeline_run) for pipeline_run in pipeline_runs @@ -254,6 +630,14 @@ def create_pipeline_run_response( next_page_token=next_page_token, ) + if debug_where_clause: + response.debug_where_clause = _compile_where_clauses_to_string( + session=session, + where_clauses=where_clauses, + ) + + return response + def _calculate_execution_status_stats( self, session: orm.Session, root_execution_id: bts.IdType ) -> dict[bts.ContainerExecutionStatus, int]: diff --git a/cloud_pipelines_backend/backend_types_sql.py 
b/cloud_pipelines_backend/backend_types_sql.py index af16b3c..f8df624 100644 --- a/cloud_pipelines_backend/backend_types_sql.py +++ b/cloud_pipelines_backend/backend_types_sql.py @@ -2,7 +2,7 @@ import datetime import enum import typing -from typing import Any +from typing import Any, Final import sqlalchemy as sql from sqlalchemy import orm @@ -64,7 +64,7 @@ def generate_unique_id() -> str: # # Needed to put a union type into DB # class SqlIOTypeStruct(_BaseModel): -# type: structures.TypeSpecType +# type: structures.TypeSpecType # No. We'll represent TypeSpecType as name:str + properties:dict # Supported cases: # * type: "name" @@ -466,6 +466,11 @@ class ContainerExecution(_TableBase): ) +PIPELINE_RUN_ANNOTATION_KEY_VALUE_INDEX_NAME: Final[str] = ( + "ix_pipeline_run_annotation_key_value" +) + + class PipelineRunAnnotation(_TableBase): __tablename__ = "pipeline_run_annotation" pipeline_run_id: orm.Mapped[IdType] = orm.mapped_column( @@ -476,3 +481,13 @@ class PipelineRunAnnotation(_TableBase): pipeline_run: orm.Mapped[PipelineRun] = orm.relationship(repr=False, init=False) key: orm.Mapped[str] = orm.mapped_column(default=None, primary_key=True) value: orm.Mapped[str | None] = orm.mapped_column(default=None) + + __table_args__ = ( + # Index for searching pipeline runs by annotation key/value + # Enables efficient queries like "find runs where key='environment' and value='production'" + sql.Index( + PIPELINE_RUN_ANNOTATION_KEY_VALUE_INDEX_NAME, + "key", + "value", + ), + ) diff --git a/cloud_pipelines_backend/database_ops.py b/cloud_pipelines_backend/database_ops.py index 3d94ed1..dd22bea 100644 --- a/cloud_pipelines_backend/database_ops.py +++ b/cloud_pipelines_backend/database_ops.py @@ -77,3 +77,9 @@ def migrate_db(db_engine: sqlalchemy.Engine): for index in bts.ExecutionNode.__table__.indexes: if index.name == "ix_execution_node_container_execution_cache_key": index.create(db_engine, checkfirst=True) + + # Migration for annotation filtering feature + for index 
in bts.PipelineRunAnnotation.__table__.indexes: + if index.name == bts.PIPELINE_RUN_ANNOTATION_KEY_VALUE_INDEX_NAME: + index.create(db_engine, checkfirst=True) + break diff --git a/pyproject.toml b/pyproject.toml index 5f68f78..9363e1b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -14,6 +14,7 @@ dependencies = [ [dependency-groups] dev = [ + "black>=26.1.0", "pytest>=8.4.2", ] huggingface = [ @@ -23,3 +24,20 @@ huggingface = [ [tool.setuptools.packages.find] include = ["cloud_pipelines_backend*"] namespaces = true + +[tool.black] +# Usage: uv run black . +# +# TODO: Line length per Google Python Style Guide: +# https://google.github.io/styleguide/pyguide.html#32-line-length +# line-length = 80 +target-version = ["py310"] +include = '\.pyi?$' +exclude = ''' +/( + \.git + | \.venv + | build + | dist +)/ +''' diff --git a/tests/test_pipeline_run_search.py b/tests/test_pipeline_run_search.py new file mode 100644 index 0000000..274d930 --- /dev/null +++ b/tests/test_pipeline_run_search.py @@ -0,0 +1,557 @@ +"""Tests for the pipeline run list() annotation filtering functionality. + +These tests document the JSON string format expected by the API and verify +the WHERE clause generation for annotation filters. +""" + +import json + +from sqlalchemy import orm +import pytest + +from cloud_pipelines_backend import api_server_sql +from cloud_pipelines_backend import database_ops + + +def _initialize_db_and_get_session_factory(): + """Initialize an in-memory SQLite database and return a session factory.""" + db_engine = database_ops.create_db_engine_and_migrate_db(database_uri="sqlite://") + return lambda: orm.Session(bind=db_engine) + + +class TestPipelineRunListAnnotationFilter: + """Tests for PipelineRunsApiService_Sql.list() annotation filtering. + + Each test shows the JSON string input format and the expected SQL WHERE clause. 
+ """ + + def test_list_with_no_annotation_filter(self): + """Test list with annotation_filter=None returns all pipeline runs.""" + session_factory = _initialize_db_and_get_session_factory() + service = api_server_sql.PipelineRunsApiService_Sql() + + with session_factory() as session: + response = service.list( + session=session, + annotation_filter=None, + debug_where_clause=True, + ) + + assert response.debug_where_clause == "(no where clauses)" + assert response.pipeline_runs == [] + + def test_list_with_key_contains_filter_json(self): + """Test annotation filtering with JSON string: key CONTAINS operator. + + JSON Input: + {"annotation_filters": [{"key": {"operator": "contains", "text": "env"}}]} + + Expected WHERE clause: + EXISTS (... WHERE key LIKE '%env%') + """ + session_factory = _initialize_db_and_get_session_factory() + service = api_server_sql.PipelineRunsApiService_Sql() + + with session_factory() as session: + # JSON string input (as received from API) + json_input = json.dumps( + { + "annotation_filters": [ + {"key": {"operator": "contains", "text": "env"}} + ] + } + ) + + response = service.list( + session=session, + annotation_filter=json_input, + debug_where_clause=True, + ) + + expected = ( + "EXISTS (SELECT pipeline_run_annotation.pipeline_run_id, " + 'pipeline_run_annotation."key", pipeline_run_annotation.value \n' + "FROM pipeline_run_annotation, pipeline_run \n" + "WHERE pipeline_run_annotation.pipeline_run_id = pipeline_run.id " + "AND (pipeline_run_annotation.\"key\" LIKE '%' || 'env' || '%'))" + ) + assert response.debug_where_clause == expected + + def test_list_with_key_equals_filter_json(self): + """Test annotation filtering with JSON string: key EQUALS operator. + + JSON Input: + {"annotation_filters": [{"key": {"operator": "equals", "text": "environment"}}]} + + Expected WHERE clause: + EXISTS (... 
WHERE key = 'environment') + """ + session_factory = _initialize_db_and_get_session_factory() + service = api_server_sql.PipelineRunsApiService_Sql() + + with session_factory() as session: + json_input = json.dumps( + { + "annotation_filters": [ + {"key": {"operator": "equals", "text": "environment"}} + ] + } + ) + + response = service.list( + session=session, + annotation_filter=json_input, + debug_where_clause=True, + ) + + expected = ( + "EXISTS (SELECT pipeline_run_annotation.pipeline_run_id, " + 'pipeline_run_annotation."key", pipeline_run_annotation.value \n' + "FROM pipeline_run_annotation, pipeline_run \n" + "WHERE pipeline_run_annotation.pipeline_run_id = pipeline_run.id " + "AND pipeline_run_annotation.\"key\" = 'environment')" + ) + assert response.debug_where_clause == expected + + def test_list_with_key_equals_negate_filter_json(self): + """Test annotation filtering with JSON string: key EQUALS with negate=true. + + JSON Input: + {"annotation_filters": [{"key": {"operator": "equals", "text": "environment", "negate": true}}]} + + Expected WHERE clause: + NOT EXISTS (... WHERE key = 'environment') + + key.negate=true means "runs that do NOT have this key" -> NOT EXISTS + The condition inside stays positive (key = 'environment'). 
+ """ + session_factory = _initialize_db_and_get_session_factory() + service = api_server_sql.PipelineRunsApiService_Sql() + + with session_factory() as session: + json_input = json.dumps( + { + "annotation_filters": [ + { + "key": { + "operator": "equals", + "text": "environment", + "negate": True, + } + } + ] + } + ) + + response = service.list( + session=session, + annotation_filter=json_input, + debug_where_clause=True, + ) + + expected = ( + "NOT (EXISTS (SELECT pipeline_run_annotation.pipeline_run_id, " + 'pipeline_run_annotation."key", pipeline_run_annotation.value \n' + "FROM pipeline_run_annotation, pipeline_run \n" + "WHERE pipeline_run_annotation.pipeline_run_id = pipeline_run.id " + "AND pipeline_run_annotation.\"key\" = 'environment'))" + ) + assert response.debug_where_clause == expected + + def test_list_with_key_in_set_filter_json(self): + """Test annotation filtering with JSON string: key IN_SET operator. + + JSON Input: + {"annotation_filters": [{"key": {"operator": "in_set", "texts": ["environment", "team"]}}]} + + Expected WHERE clause: + EXISTS (... 
WHERE key IN ('environment', 'team')) + """ + session_factory = _initialize_db_and_get_session_factory() + service = api_server_sql.PipelineRunsApiService_Sql() + + with session_factory() as session: + json_input = json.dumps( + { + "annotation_filters": [ + { + "key": { + "operator": "in_set", + "texts": ["environment", "team"], + } + } + ] + } + ) + + response = service.list( + session=session, + annotation_filter=json_input, + debug_where_clause=True, + ) + + expected = ( + "EXISTS (SELECT pipeline_run_annotation.pipeline_run_id, " + 'pipeline_run_annotation."key", pipeline_run_annotation.value \n' + "FROM pipeline_run_annotation, pipeline_run \n" + "WHERE pipeline_run_annotation.pipeline_run_id = pipeline_run.id " + "AND pipeline_run_annotation.\"key\" IN ('environment', 'team'))" + ) + assert response.debug_where_clause == expected + + def test_list_with_key_and_value_filter_json(self): + """Test annotation filtering with JSON string: key AND value on same row. + + JSON Input: + { + "annotation_filters": [{ + "key": {"operator": "equals", "text": "environment"}, + "value": {"operator": "equals", "text": "production"} + }] + } + + Expected WHERE clause: + EXISTS (... 
WHERE key = 'environment' AND value = 'production') + """ + session_factory = _initialize_db_and_get_session_factory() + service = api_server_sql.PipelineRunsApiService_Sql() + + with session_factory() as session: + json_input = json.dumps( + { + "annotation_filters": [ + { + "key": {"operator": "equals", "text": "environment"}, + "value": {"operator": "equals", "text": "production"}, + } + ] + } + ) + + response = service.list( + session=session, + annotation_filter=json_input, + debug_where_clause=True, + ) + + expected = ( + "EXISTS (SELECT pipeline_run_annotation.pipeline_run_id, " + 'pipeline_run_annotation."key", pipeline_run_annotation.value \n' + "FROM pipeline_run_annotation, pipeline_run \n" + "WHERE pipeline_run_annotation.pipeline_run_id = pipeline_run.id " + "AND pipeline_run_annotation.\"key\" = 'environment' " + "AND pipeline_run_annotation.value = 'production')" + ) + assert response.debug_where_clause == expected + + def test_list_with_key_negate_and_value_filter_json(self): + """Test annotation filtering: key negate + value -> NOT EXISTS. + + JSON Input: + { + "annotation_filters": [{ + "key": {"operator": "equals", "text": "environment", "negate": true}, + "value": {"operator": "equals", "text": "production"} + }] + } + + Expected WHERE clause: + NOT EXISTS (... 
WHERE key = 'environment' AND value = 'production') + + Meaning: runs that do NOT have key=environment with value=production + """ + session_factory = _initialize_db_and_get_session_factory() + service = api_server_sql.PipelineRunsApiService_Sql() + + with session_factory() as session: + json_input = json.dumps( + { + "annotation_filters": [ + { + "key": { + "operator": "equals", + "text": "environment", + "negate": True, + }, + "value": {"operator": "equals", "text": "production"}, + } + ] + } + ) + + response = service.list( + session=session, + annotation_filter=json_input, + debug_where_clause=True, + ) + + expected = ( + "NOT (EXISTS (SELECT pipeline_run_annotation.pipeline_run_id, " + 'pipeline_run_annotation."key", pipeline_run_annotation.value \n' + "FROM pipeline_run_annotation, pipeline_run \n" + "WHERE pipeline_run_annotation.pipeline_run_id = pipeline_run.id " + "AND pipeline_run_annotation.\"key\" = 'environment' " + "AND pipeline_run_annotation.value = 'production'))" + ) + assert response.debug_where_clause == expected + + def test_list_with_key_negate_and_value_negate_filter_json(self): + """Test annotation filtering: key negate + value negate -> NOT EXISTS (value.negate ignored). + + JSON Input: + { + "annotation_filters": [{ + "key": {"operator": "equals", "text": "environment", "negate": true}, + "value": {"operator": "equals", "text": "production", "negate": true} + }] + } + + Expected WHERE clause: + NOT EXISTS (... WHERE key = 'environment' AND value = 'production') + + Note: value.negate is ignored when key.negate=true for intuitive "doesn't have X=Y" behavior. 
+ Meaning: runs that do NOT have key=environment with value=production + """ + session_factory = _initialize_db_and_get_session_factory() + service = api_server_sql.PipelineRunsApiService_Sql() + + with session_factory() as session: + json_input = json.dumps( + { + "annotation_filters": [ + { + "key": { + "operator": "equals", + "text": "environment", + "negate": True, + }, + "value": { + "operator": "equals", + "text": "production", + "negate": True, + }, + } + ] + } + ) + + response = service.list( + session=session, + annotation_filter=json_input, + debug_where_clause=True, + ) + + expected = ( + "NOT (EXISTS (SELECT pipeline_run_annotation.pipeline_run_id, " + 'pipeline_run_annotation."key", pipeline_run_annotation.value \n' + "FROM pipeline_run_annotation, pipeline_run \n" + "WHERE pipeline_run_annotation.pipeline_run_id = pipeline_run.id " + "AND pipeline_run_annotation.\"key\" = 'environment' " + "AND pipeline_run_annotation.value = 'production'))" + ) + assert response.debug_where_clause == expected + + def test_list_with_multiple_filters_or_json(self): + """Test annotation filtering with JSON string: multiple filters with OR (default). + + JSON Input: + { + "annotation_filters": [ + {"key": {"operator": "equals", "text": "environment"}, + "value": {"operator": "equals", "text": "production"}}, + {"key": {"operator": "equals", "text": "team"}, + "value": {"operator": "equals", "text": "backend"}} + ] + } + + Expected WHERE clause: + (EXISTS ... env=prod) OR (EXISTS ... 
team=backend) + """ + session_factory = _initialize_db_and_get_session_factory() + service = api_server_sql.PipelineRunsApiService_Sql() + + with session_factory() as session: + json_input = json.dumps( + { + "annotation_filters": [ + { + "key": {"operator": "equals", "text": "environment"}, + "value": {"operator": "equals", "text": "production"}, + }, + { + "key": {"operator": "equals", "text": "team"}, + "value": {"operator": "equals", "text": "backend"}, + }, + ] + } + ) + + response = service.list( + session=session, + annotation_filter=json_input, + debug_where_clause=True, + ) + + expected = ( + "(EXISTS (SELECT pipeline_run_annotation.pipeline_run_id, " + 'pipeline_run_annotation."key", pipeline_run_annotation.value \n' + "FROM pipeline_run_annotation, pipeline_run \n" + "WHERE pipeline_run_annotation.pipeline_run_id = pipeline_run.id " + "AND pipeline_run_annotation.\"key\" = 'environment' " + "AND pipeline_run_annotation.value = 'production')) " + "OR " + "(EXISTS (SELECT pipeline_run_annotation.pipeline_run_id, " + 'pipeline_run_annotation."key", pipeline_run_annotation.value \n' + "FROM pipeline_run_annotation, pipeline_run \n" + "WHERE pipeline_run_annotation.pipeline_run_id = pipeline_run.id " + "AND pipeline_run_annotation.\"key\" = 'team' " + "AND pipeline_run_annotation.value = 'backend'))" + ) + assert response.debug_where_clause == expected + + def test_list_with_multiple_filters_and_json(self): + """Test annotation filtering with JSON string: multiple filters with AND. + + JSON Input: + { + "operator": "and", + "annotation_filters": [ + {"key": {"operator": "contains", "text": "env"}, + "value": {"operator": "in_set", "texts": ["prod", "staging"]}}, + {"key": {"operator": "equals", "text": "team"}, + "value": {"operator": "contains", "text": "test", "negate": true}} + ] + } + + Expected WHERE clause: + (EXISTS ... env LIKE %env% AND value IN (...)) + AND + (EXISTS ... 
team AND value NOT LIKE %test%) + """ + session_factory = _initialize_db_and_get_session_factory() + service = api_server_sql.PipelineRunsApiService_Sql() + + with session_factory() as session: + json_input = json.dumps( + { + "operator": "and", + "annotation_filters": [ + { + "key": {"operator": "contains", "text": "env"}, + "value": { + "operator": "in_set", + "texts": ["prod", "staging"], + }, + }, + { + "key": {"operator": "equals", "text": "team"}, + "value": { + "operator": "contains", + "text": "test", + "negate": True, + }, + }, + ], + } + ) + + response = service.list( + session=session, + annotation_filter=json_input, + debug_where_clause=True, + ) + + expected = ( + "(EXISTS (SELECT pipeline_run_annotation.pipeline_run_id, " + 'pipeline_run_annotation."key", pipeline_run_annotation.value \n' + "FROM pipeline_run_annotation, pipeline_run \n" + "WHERE pipeline_run_annotation.pipeline_run_id = pipeline_run.id " + "AND (pipeline_run_annotation.\"key\" LIKE '%' || 'env' || '%') " + "AND pipeline_run_annotation.value IN ('prod', 'staging'))) " + "AND " + "(EXISTS (SELECT pipeline_run_annotation.pipeline_run_id, " + 'pipeline_run_annotation."key", pipeline_run_annotation.value \n' + "FROM pipeline_run_annotation, pipeline_run \n" + "WHERE pipeline_run_annotation.pipeline_run_id = pipeline_run.id " + "AND pipeline_run_annotation.\"key\" = 'team' " + "AND (pipeline_run_annotation.value NOT LIKE '%' || 'test' || '%')))" + ) + assert response.debug_where_clause == expected + + def test_list_with_user_filter_and_annotation_filter(self): + """Test WHERE clause ordering: user filter BEFORE annotation filter. + + This test verifies that simple column filters (indexed) come before + EXISTS subqueries (annotation filters) for performance optimization. + + Filter input: created_by:alice + Annotation filter: {"annotation_filters": [{"key": {"operator": "equals", "text": "env"}}]} + + Expected WHERE clause order: + 1. pipeline_run.created_by = 'alice' (simple, indexed) + 2. 
EXISTS (...) (subquery) + """ + session_factory = _initialize_db_and_get_session_factory() + service = api_server_sql.PipelineRunsApiService_Sql() + + with session_factory() as session: + json_input = json.dumps( + {"annotation_filters": [{"key": {"operator": "equals", "text": "env"}}]} + ) + + response = service.list( + session=session, + filter="created_by:alice", + annotation_filter=json_input, + debug_where_clause=True, + ) + + # Verify the WHERE clause contains both filters + # User filter should come BEFORE annotation filter + assert "created_by = 'alice'" in response.debug_where_clause + assert "EXISTS" in response.debug_where_clause + + # Verify ordering: created_by appears before EXISTS + created_by_pos = response.debug_where_clause.find("created_by") + exists_pos = response.debug_where_clause.find("EXISTS") + assert ( + created_by_pos < exists_pos + ), "User filter should appear before annotation filter in WHERE clause" + + def test_list_with_invalid_json_raises_error(self): + """Test that invalid JSON in annotation_filter raises ApiServiceError.""" + session_factory = _initialize_db_and_get_session_factory() + service = api_server_sql.PipelineRunsApiService_Sql() + + with session_factory() as session: + with pytest.raises(api_server_sql.ApiServiceError) as exc_info: + service.list( + session=session, + annotation_filter="not valid json", + debug_where_clause=True, + ) + + # Pydantic's error message includes "Invalid JSON" + assert "Invalid annotation_filter" in str(exc_info.value) + assert "Invalid JSON" in str(exc_info.value) + + def test_list_with_missing_annotation_filters_key_raises_error(self): + """Test that JSON without 'annotation_filters' key raises ApiServiceError.""" + session_factory = _initialize_db_and_get_session_factory() + service = api_server_sql.PipelineRunsApiService_Sql() + + with session_factory() as session: + with pytest.raises(api_server_sql.ApiServiceError) as exc_info: + service.list( + session=session, + 
annotation_filter='{"operator": "and"}', + debug_where_clause=True, + ) + + # Pydantic's error message indicates 'annotation_filters' field is required + assert "Invalid annotation_filter" in str(exc_info.value) + assert "annotation_filters" in str(exc_info.value) + + +if __name__ == "__main__": + pytest.main([__file__, "-v"])