From 6bdae1f701f17f4a2e63c59dd8f444f6ae470e0d Mon Sep 17 00:00:00 2001 From: dnwpark Date: Thu, 21 Aug 2025 10:55:39 -0700 Subject: [PATCH 1/3] Add profiling for sync. --- gel/_internal/_save.py | 3 +- gel/_internal/_testbase/_base.py | 24 +- gel/asyncio_client.py | 3 +- gel/blocking_client.py | 83 ++- tests/test_profiling.py | 850 +++++++++++++++++++++++++++++++ 5 files changed, 939 insertions(+), 24 deletions(-) create mode 100644 tests/test_profiling.py diff --git a/gel/_internal/_save.py b/gel/_internal/_save.py index 5b93fd8f8..9ed056dcf 100644 --- a/gel/_internal/_save.py +++ b/gel/_internal/_save.py @@ -1172,13 +1172,14 @@ def make_save_executor_constructor( refetch: bool, warn_on_large_sync_set: bool = False, save_postcheck: bool = False, + executor_type: type, ) -> Callable[[], SaveExecutor]: plan = make_plan( objs, refetch=refetch, warn_on_large_sync_set=warn_on_large_sync_set, ) - return lambda: SaveExecutor( + return lambda: executor_type( objs=objs, create_batches=plan.create_batches, updates=plan.update_batch, diff --git a/gel/_internal/_testbase/_base.py b/gel/_internal/_testbase/_base.py index 93ba7ca71..5d99bbeec 100644 --- a/gel/_internal/_testbase/_base.py +++ b/gel/_internal/_testbase/_base.py @@ -634,6 +634,7 @@ class BranchTestCase(InstanceTestCase): # of 10s. 
DEFAULT_CONNECT_TIMEOUT = 30 + CLIENT_TYPE: ClassVar[type[TestClient | TestAsyncIOClient] | None] client: ClassVar[TestClient | TestAsyncIOClient] @classmethod @@ -682,7 +683,9 @@ def setUp(self) -> None: if self.ISOLATED_TEST_BRANCHES: cls = type(self) testdb = cls.loop.run_until_complete(self.setup_branch_copy()) - client = cls.make_test_client(database=testdb)._with_debug( + client = cls.make_test_client( + database=testdb, client_class=self.CLIENT_TYPE + )._with_debug( save_postcheck=True, ) self.client = client # type: ignore[misc] @@ -721,6 +724,7 @@ def tearDown(self) -> None: def make_test_client( cls, *, + client_class: type[TestClient | TestAsyncIOClient] | None = None, connection_class: type[ asyncio_client.AsyncIOConnection | blocking_client.BlockingIOConnection @@ -762,14 +766,17 @@ def make_blocking_test_client( cls, *, instance: _server.BaseInstance, + client_class: type[TestClient] | None = None, connection_class: type[blocking_client.BlockingIOConnection] | None = None, **kwargs: Any, ) -> TestClient: + if client_class is None: + client_class = TestClient if connection_class is None: connection_class = blocking_client.BlockingIOConnection client = instance.create_blocking_client( - client_class=TestClient, + client_class=client_class, connection_class=connection_class, **cls.get_connect_args(instance, **kwargs), ) @@ -805,13 +812,16 @@ def make_async_test_client( cls, *, instance: _server.BaseInstance, + client_class: type[TestAsyncIOClient] | None = None, connection_class: type[asyncio_client.AsyncIOConnection] | None = None, **kwargs: Any, ) -> TestAsyncIOClient: + if client_class is None: + client_class = TestAsyncIOClient if connection_class is None: connection_class = asyncio_client.AsyncIOConnection client = instance.create_async_client( - client_class=TestAsyncIOClient, + client_class=client_class, connection_class=connection_class, **cls.get_connect_args(instance, **kwargs), ) @@ -889,7 +899,9 @@ async def setup_and_connect(cls) -> None: 
await cls._create_empty_branch(dbname) if not cls.ISOLATED_TEST_BRANCHES: - cls.client = cls.make_test_client(database=dbname) + cls.client = cls.make_test_client( + database=dbname, client_class=cls.CLIENT_TYPE + ) if isinstance(cls.client, gel.AsyncIOClient): await cls.client.ensure_connected() else: @@ -1029,11 +1041,13 @@ class AsyncQueryTestCase(BranchTestCase): def make_test_client( # pyright: ignore [reportIncompatibleMethodOverride] cls, *, + client_class: type[TestAsyncIOClient] | None = None, connection_class: type[asyncio_client.AsyncIOConnection] | None = None, # type: ignore [override] **kwargs: str, ) -> TestAsyncIOClient: return cls.make_async_test_client( instance=cls.instance, + client_class=client_class, connection_class=connection_class, **kwargs, ) @@ -1070,12 +1084,14 @@ def adapt_call(cls, coro: Any) -> Any: def make_test_client( # pyright: ignore [reportIncompatibleMethodOverride] cls, *, + client_class: type[TestClient] | None = None, connection_class: type[blocking_client.BlockingIOConnection] # type: ignore [override] | None = None, **kwargs: str, ) -> TestClient: return cls.make_blocking_test_client( instance=cls.instance, + client_class=client_class, connection_class=connection_class, **kwargs, ) diff --git a/gel/asyncio_client.py b/gel/asyncio_client.py index f7fcc0c69..e334446cd 100644 --- a/gel/asyncio_client.py +++ b/gel/asyncio_client.py @@ -37,7 +37,7 @@ from .protocol import asyncio_proto # type: ignore [attr-defined, unused-ignore] from .protocol.protocol import InputLanguage, OutputFormat -from ._internal._save import make_save_executor_constructor +from ._internal._save import make_save_executor_constructor, SaveExecutor if typing.TYPE_CHECKING: from ._internal._qbmodel._pydantic import GelModel @@ -675,6 +675,7 @@ async def _save_impl( refetch=refetch, save_postcheck=opts.save_postcheck, warn_on_large_sync_set=warn_on_large_sync_set, + executor_type=SaveExecutor, ) async for tx in self._batch(): diff --git 
a/gel/blocking_client.py b/gel/blocking_client.py index 2dc6a70be..118f10dfd 100644 --- a/gel/blocking_client.py +++ b/gel/blocking_client.py @@ -38,7 +38,12 @@ from .protocol import blocking_proto # type: ignore [attr-defined, unused-ignore] from .protocol.protocol import InputLanguage, OutputFormat -from ._internal._save import make_save_executor_constructor +from ._internal._save import ( + QueryBatch, + QueryRefetch, + SaveExecutor, + make_save_executor_constructor, +) if typing.TYPE_CHECKING: from ._internal._qbmodel._pydantic import GelModel @@ -681,6 +686,7 @@ class Client( __slots__ = () _impl_class = _PoolImpl + _save_executor_type = SaveExecutor def _save_impl( self, @@ -689,12 +695,9 @@ def _save_impl( objs: tuple[GelModel, ...], warn_on_large_sync_set: bool = False, ) -> None: - opts = self._get_debug_options() - - make_executor = make_save_executor_constructor( - objs, + make_executor = self._get_make_save_executor( refetch=refetch, - save_postcheck=opts.save_postcheck, + objs=objs, warn_on_large_sync_set=warn_on_large_sync_set, ) @@ -703,23 +706,13 @@ def _save_impl( executor = make_executor() for batches in executor: - for batch in batches: - tx.send_query(batch.query, batch.args) - batch_ids = tx.wait() + batch_ids = self._send_batch_queries(tx, batches) for ids, batch in zip(batch_ids, batches, strict=True): batch.record_inserted_data(ids) if refetch: ref_queries = executor.get_refetch_queries() - for ref in ref_queries: - tx.send_query( - ref.query, - spec=ref.args.spec, - new=ref.args.new, - existing=ref.args.existing, - ) - - refetch_data = tx.wait() + refetch_data = self._send_refetch_queries(tx, ref_queries) for ref_data, ref in zip( refetch_data, ref_queries, strict=True @@ -728,6 +721,60 @@ def _save_impl( executor.commit() + def _get_make_save_executor( + self, + *, + refetch: bool, + objs: tuple[GelModel, ...], + warn_on_large_sync_set: bool = False, + ) -> typing.Callable[[], SaveExecutor]: + opts = self._get_debug_options() + + return 
make_save_executor_constructor( + objs, + refetch=refetch, + save_postcheck=opts.save_postcheck, + warn_on_large_sync_set=warn_on_large_sync_set, + executor_type=self._save_executor_type, + ) + + def _send_batch_queries( + self, + tx: BatchIteration, + batches: list[QueryBatch], + ) -> list[Any]: + for batch in batches: + self._send_batch_query(tx, batch) + return tx.wait() + + def _send_refetch_queries( + self, + tx: BatchIteration, + ref_queries: list[QueryRefetch], + ) -> list[Any]: + for ref in ref_queries: + self._send_refetch_query(tx, ref) + return tx.wait() + + def _send_batch_query( + self, + tx: BatchIteration, + batch: QueryBatch, + ) -> None: + tx.send_query(batch.query, batch.args) + + def _send_refetch_query( + self, + tx: BatchIteration, + ref: QueryRefetch, + ) -> None: + tx.send_query( + ref.query, + spec=ref.args.spec, + new=ref.args.new, + existing=ref.args.existing, + ) + def save( self, *objs: GelModel, diff --git a/tests/test_profiling.py b/tests/test_profiling.py new file mode 100644 index 000000000..08758f5d2 --- /dev/null +++ b/tests/test_profiling.py @@ -0,0 +1,850 @@ +# +# This source file is part of the EdgeDB open source project. +# +# Copyright 2016-present MagicStack Inc. and the EdgeDB authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +import typing +import dataclasses + +import csv +import os +import time + +from gel import blocking_client +from gel._internal._qbmodel._pydantic import GelModel +from gel._internal._save import ( + ChangeBatch, + QueryBatch, + QueryRefetch, + QueryRefetchArgs, + SaveExecutor, +) +from gel._internal._testbase import _base as tb +from gel._internal._testbase import _models as tb_models + + +@dataclasses.dataclass +class ProfilingSaveExecutor(SaveExecutor): + batch_queries: list[list[QueryBatch]] = dataclasses.field( + default_factory=list + ) + refetch_queries: list[list[QueryRefetch]] = dataclasses.field( + default_factory=list + ) + + def _compile_batch( + self, batch: ChangeBatch, /, *, for_insert: bool + ) -> list[QueryBatch]: + compiled_batch = super()._compile_batch(batch, for_insert=for_insert) + self.batch_queries.append(compiled_batch) + return compiled_batch + + def get_refetch_queries( + self, + ) -> list[QueryRefetch]: + refetch_query = super().get_refetch_queries() + self.refetch_queries.append(refetch_query) + return refetch_query + + +class ProfilingTestClient(tb.TestClient): + __slots__ = ( + "executors", + "batch_queries", + "batch_query_times", + "refetch_queries", + "refetch_query_times", + ) + _save_executor_type = ProfilingSaveExecutor + + executors: list[ProfilingSaveExecutor] + batch_queries: list[QueryBatch] + batch_query_times: list[float] + refetch_queries: list[QueryRefetch] + refetch_query_times: list[float] + + def __init__(self, **kwargs: typing.Any): + super().__init__(**kwargs) + self.executors = [] + self.batch_queries = [] + self.batch_query_times = [] + self.refetch_queries = [] + self.refetch_query_times = [] + + def _get_make_save_executor( + self, + *, + refetch: bool, + objs: tuple[GelModel, ...], + warn_on_large_sync_set: bool = False, + ) -> typing.Callable[[], SaveExecutor]: + """Clear executors and return a save executor constructor which will + add new save executors to the list.""" + + self.executors = [] + 
self.batch_queries = [] + self.batch_query_times = [] + self.refetch_queries = [] + self.refetch_query_times = [] + + base_make_executor = super()._get_make_save_executor( + refetch=refetch, + objs=objs, + warn_on_large_sync_set=warn_on_large_sync_set, + ) + + def wrapped_make_executor() -> SaveExecutor: + executor = base_make_executor() + self.executors.append(typing.cast(ProfilingSaveExecutor, executor)) + return executor + + return wrapped_make_executor + + def _send_batch_queries( + self, + tx: blocking_client.BatchIteration, + batches: list[QueryBatch], + ) -> list[typing.Any]: + result: list[typing.Any] = [] + for batch in batches: + self._send_batch_query(tx, batch) + + start = time.perf_counter_ns() + result.extend(tx.wait()) + finish = time.perf_counter_ns() + + self.batch_queries.append(batch) + self.batch_query_times.append((finish - start) / 1000000000) + return result + + def _send_refetch_queries( + self, + tx: blocking_client.BatchIteration, + ref_queries: list[QueryRefetch], + ) -> list[typing.Any]: + result: list[typing.Any] = [] + for ref in ref_queries: + self._send_refetch_query(tx, ref) + + start = time.perf_counter_ns() + result.extend(tx.wait()) + finish = time.perf_counter_ns() + + self.refetch_queries.append(ref) + self.refetch_query_times.append((finish - start) / 1000000000) + return result + + def get_executor_batch_changes( + self, + ) -> list[list[list[tuple[list[tuple[GelModel, set[str]]], bool]]]]: + # Get models per executor, per batch, per query, per change + return [ + [ + [ + ( + [ + (change.model, set(change.fields.keys())) + for change in query.changes + ], + query.insert, + ) + for query in batch + ] + for batch in executor.batch_queries + ] + for executor in self.executors + ] + + def get_executor_refetch_changes( + self, + ) -> list[list[list[QueryRefetchArgs]]]: + # Get models per executor, per batch, per query, per change + return [ + [ + [query.args for query in refetch] + for refetch in executor.refetch_queries + ] + for 
executor in self.executors + ] + + def get_profiling_query_labels( + self, object_labels: dict[str, typing.Sequence[GelModel]] + ) -> tuple[list[str], list[str]]: + batch_labels = [] + refetch_labels = [] + + for batch_query in self.batch_queries: + batch_labels.append( + "batch(" + + ", ".join( + label + for label, objects in object_labels.items() + if any( + set(objects) + & set(change.model for change in batch_query.changes) + ) + ) + + ")" + ) + + for refetch_query in self.refetch_queries: + batch_labels.append( + "refetch(" + + ", ".join( + label + for label, objects in object_labels.items() + if any( + set(obj.id for obj in objects) + & set(obj_id for obj_id, _ in refetch_query.args.spec) + ) + ) + + ")" + ) + + return batch_labels, refetch_labels + + def get_simple_profiling_query_labels(self) -> tuple[list[str], list[str]]: + """Simple version for timing tests that just need sequential labels.""" + batch_labels = [ + f"batch_{i}" for i in range(len(self.batch_query_times)) + ] + refetch_labels = [ + f"refetch_{i}" for i in range(len(self.refetch_query_times)) + ] + return batch_labels, refetch_labels + + +class BaseProfilingTestCase(tb_models.ModelTestCase): + CLIENT_TYPE = ProfilingTestClient + + SCHEMA = "" + + @classmethod + def _get_client_class(cls, connection_class): + return ProfilingTestClient + + def get_profiling_client(self) -> ProfilingTestClient: + return typing.cast(ProfilingTestClient, self.client) + + +@dataclasses.dataclass(kw_only=True, frozen=True) +class ProfilingRecord: + object_count: int + batch_data: list[float] + refetch_data: list[float] + + +@dataclasses.dataclass(frozen=True) +class ProfilingData: + batch_labels: typing.Sequence[str] + refetch_labels: typing.Sequence[str] + + records: list[ProfilingRecord] = dataclasses.field(default_factory=list) + + def write_csv(self, filename: str): + csv_data = [] + csv_data.append( + ["object_count"] + + list(self.batch_labels) + + list(self.refetch_labels) + ) + + for record in 
self.records: + csv_data.append( + [ + record.object_count, + *[round(t, 3) for t in record.batch_data], + *[round(t, 3) for t in record.refetch_data], + ] + ) + + with open(filename, "w", newline="") as csvfile: + writer = csv.writer(csvfile) + writer.writerows(csv_data) + + +def profile_operation( + client: ProfilingTestClient, + operation: typing.Callable[[int], dict[str, typing.Sequence[GelModel]]], + count_max: int = 400, +) -> ProfilingData: + # Get the number of batches and refetches + # This also "warms up" the system + object_labels = operation(1) + + # Prepare profiling data + batch_labels, refetch_labels = client.get_profiling_query_labels( + object_labels + ) + data = ProfilingData(batch_labels, refetch_labels) + + # Profiling with increasing object count + for count in range(5, count_max + 1, 5): + operation(count) + + data.records.append( + ProfilingRecord( + object_count=count, + batch_data=client.batch_query_times, + refetch_data=client.refetch_query_times, + ) + ) + + return data + + +class TestProfilingSimple(BaseProfilingTestCase): + SCHEMA = """ + type Obj_01; + + type Obj_02 { + prop: int64; + }; + + type Obj_03 { + prop: int64 { + default := -1; + }; + }; + + type Target_04; + type Source_04 { + target: Target_04; + }; + + type Target_05; + type Source_05 { + target: Target_05 { + lprop: int64; + }; + }; + + type Target_06; + type Source_06 { + target: Target_06 { + lprop: int64 { + default := 1; + }; + }; + }; + """ + + def test_profiling_simple_01(self) -> None: + # Isolated objects + from models.TestProfilingSimple import default + + def _operation(count: int) -> dict[str, typing.Sequence[GelModel]]: + print(f'_operation._operation({count})') + + objs = [default.Obj_01() for _ in range(count)] + + self.client.sync(*objs, warn_on_large_sync=False) + + return { + "object": objs, + } + + data = profile_operation(self.get_profiling_client(), _operation) + data.write_csv("profiling_simple_01.csv") + + def test_profiling_simple_02a(self) -> None: 
+ # Isolated objects with a property + from models.TestProfilingSimple import default + + def _operation_value( + count: int, + ) -> dict[str, typing.Sequence[GelModel]]: + print(f'test_profiling_simple_02._operation_value({count})') + + objs = [default.Obj_02(prop=1) for _ in range(count)] + + self.client.sync(*objs, warn_on_large_sync=False) + + return { + "object": objs, + } + + data = profile_operation(self.get_profiling_client(), _operation_value) + data.write_csv("profiling_simple_02_value.csv") + + def test_profiling_simple_02b(self) -> None: + # Isolated objects with a property + from models.TestProfilingSimple import default + + def _operation_default( + count: int, + ) -> dict[str, typing.Sequence[GelModel]]: + print(f'test_profiling_simple_02._operation_default({count})') + + objs = [default.Obj_02() for _ in range(count)] + + self.client.sync(*objs, warn_on_large_sync=False) + + return { + "object": objs, + } + + data = profile_operation( + self.get_profiling_client(), _operation_default + ) + data.write_csv("profiling_simple_02_default.csv") + + def test_profiling_simple_02c(self) -> None: + # Isolated objects with a property + from models.TestProfilingSimple import default + + def _operation_none( + count: int, + ) -> dict[str, typing.Sequence[GelModel]]: + print(f'test_profiling_simple_02._operation_none({count})') + + objs = [default.Obj_02(prop=None) for _ in range(count)] + + self.client.sync(*objs, warn_on_large_sync=False) + + return { + "object": objs, + } + + data = profile_operation(self.get_profiling_client(), _operation_none) + data.write_csv("profiling_simple_02_none.csv") + + def test_profiling_simple_03a(self) -> None: + # Isolated objects with a property with default + from models.TestProfilingSimple import default + + def _operation_value( + count: int, + ) -> dict[str, typing.Sequence[GelModel]]: + print(f'test_profiling_simple_03._operation_value({count})') + + objs = [default.Obj_03(prop=1) for _ in range(count)] + + 
self.client.sync(*objs, warn_on_large_sync=False) + + return { + "object": objs, + } + + data = profile_operation(self.get_profiling_client(), _operation_value) + data.write_csv("profiling_simple_03_value.csv") + + def test_profiling_simple_03b(self) -> None: + # Isolated objects with a property with default + from models.TestProfilingSimple import default + + def _operation_default( + count: int, + ) -> dict[str, typing.Sequence[GelModel]]: + print(f'test_profiling_simple_03._operation_default({count})') + + objs = [default.Obj_03() for _ in range(count)] + + self.client.sync(*objs, warn_on_large_sync=False) + + return { + "object": objs, + } + + data = profile_operation( + self.get_profiling_client(), _operation_default + ) + data.write_csv("profiling_simple_03_default.csv") + + def test_profiling_simple_03c(self) -> None: + # Isolated objects with a property with default + from models.TestProfilingSimple import default + + def _operation_none( + count: int, + ) -> dict[str, typing.Sequence[GelModel]]: + print(f'test_profiling_simple_03._operation_none({count})') + + objs = [default.Obj_03(prop=None) for _ in range(count)] + + self.client.sync(*objs, warn_on_large_sync=False) + + return { + "object": objs, + } + + data = profile_operation(self.get_profiling_client(), _operation_none) + data.write_csv("profiling_simple_03_none.csv") + + def test_profiling_simple_04(self) -> None: + # Objects with a common link target + + from models.TestProfilingSimple import default + + target = default.Target_04() + self.client.save(target) + + def _operation(count: int) -> dict[str, typing.Sequence[GelModel]]: + print(f'test_profiling_simple_04._operation({count})') + + sources = [default.Source_04(target=target) for _ in range(count)] + + self.client.sync(target, *sources, warn_on_large_sync=False) + + return { + "target": [target], + "sources": sources, + } + + data = profile_operation(self.get_profiling_client(), _operation) + data.write_csv("profiling_simple_04.csv") + + 
def test_profiling_simple_05a(self) -> None: + # Objects with a common link target with a linkprop + + from models.TestProfilingSimple import default + + target = default.Target_05() + self.client.save(target) + + def _operation_value( + count: int, + ) -> dict[str, typing.Sequence[GelModel]]: + print(f'test_profiling_simple_05._operation_value({count})') + + sources = [ + default.Source_05( + target=default.Source_05.target.link(target, lprop=1) + ) + for _ in range(count) + ] + + self.client.sync(target, *sources, warn_on_large_sync=False) + + return { + "target": [target], + "sources": sources, + } + + data = profile_operation(self.get_profiling_client(), _operation_value) + data.write_csv("profiling_simple_05_value.csv") + + def test_profiling_simple_05b(self) -> None: + # Objects with a common link target with a linkprop + + from models.TestProfilingSimple import default + + target = default.Target_05() + self.client.save(target) + + def _operation_default( + count: int, + ) -> dict[str, typing.Sequence[GelModel]]: + print(f'test_profiling_simple_05._operation_default({count})') + + sources = [ + default.Source_05(target=default.Source_05.target.link(target)) + for _ in range(count) + ] + + self.client.sync(target, *sources, warn_on_large_sync=False) + + return { + "target": [target], + "sources": sources, + } + + data = profile_operation( + self.get_profiling_client(), _operation_default + ) + data.write_csv("profiling_simple_05_default.csv") + + def test_profiling_simple_05c(self) -> None: + # Objects with a common link target with a linkprop + + from models.TestProfilingSimple import default + + target = default.Target_05() + self.client.save(target) + + def _operation_none( + count: int, + ) -> dict[str, typing.Sequence[GelModel]]: + print(f'test_profiling_simple_05._operation_none({count})') + + sources = [ + default.Source_05( + target=default.Source_05.target.link(target, lprop=None) + ) + for _ in range(count) + ] + + self.client.sync(target, *sources, 
warn_on_large_sync=False) + + return { + "target": [target], + "sources": sources, + } + + data = profile_operation(self.get_profiling_client(), _operation_none) + data.write_csv("profiling_simple_05_none.csv") + + def test_profiling_simple_06a(self) -> None: + # Objects with a common link target with a linkprop with default + + from models.TestProfilingSimple import default + + target = default.Target_06() + self.client.save(target) + + def _operation_value( + count: int, + ) -> dict[str, typing.Sequence[GelModel]]: + print(f'test_profiling_simple_06._operation_value({count})') + + sources = [ + default.Source_06( + target=default.Source_06.target.link(target, lprop=1) + ) + for _ in range(count) + ] + + self.client.sync(target, *sources, warn_on_large_sync=False) + + return { + "target": [target], + "sources": sources, + } + + data = profile_operation(self.get_profiling_client(), _operation_value) + data.write_csv("profiling_simple_06_value.csv") + + def test_profiling_simple_06b(self) -> None: + # Objects with a common link target with a linkprop with default + + from models.TestProfilingSimple import default + + target = default.Target_06() + self.client.save(target) + + def _operation_default( + count: int, + ) -> dict[str, typing.Sequence[GelModel]]: + print(f'test_profiling_simple_06._operation_default({count})') + + sources = [ + default.Source_06(target=default.Source_06.target.link(target)) + for _ in range(count) + ] + + self.client.sync(target, *sources, warn_on_large_sync=False) + + return { + "target": [target], + "sources": sources, + } + + data = profile_operation( + self.get_profiling_client(), _operation_default + ) + data.write_csv("profiling_simple_06_default.csv") + + def test_profiling_simple_06c(self) -> None: + # Objects with a common link target with a linkprop with default + + from models.TestProfilingSimple import default + + target = default.Target_06() + self.client.save(target) + + def _operation_none( + count: int, + ) -> dict[str, 
typing.Sequence[GelModel]]: + print(f'test_profiling_simple_06._operation_none({count})') + + sources = [ + default.Source_06( + target=default.Source_06.target.link(target, lprop=None) + ) + for _ in range(count) + ] + + self.client.sync(target, *sources, warn_on_large_sync=False) + + return { + "target": [target], + "sources": sources, + } + + data = profile_operation(self.get_profiling_client(), _operation_none) + data.write_csv("profiling_simple_06_none.csv") + + +class TestProfilingChemistry(BaseProfilingTestCase): + SCHEMA = os.path.join( + os.path.dirname(__file__), "dbsetup", "chemistry.gel" + ) + + SETUP = os.path.join( + os.path.dirname(__file__), "dbsetup", "chemistry.esdl" + ) + + def test_profiling_chemistry_01(self) -> None: + # Adding objects with no links + from models.chemistry import default + + def _operation(count: int) -> dict[str, typing.Sequence[GelModel]]: + # Create reactors + reactors = [default.Reactor() for _ in range(count)] + + # Sync + self.client.sync(*reactors, warn_on_large_sync=False) + + return { + "reactors": reactors, + } + + data = profile_operation(self.get_profiling_client(), _operation) + data.write_csv("profiling_chemistry_01.csv") + + def test_profiling_chemistry_02(self) -> None: + # Adding a single reactor with increasing number of helium atoms + from models.chemistry import default + + helium = self.client.query_required_single( + default.Element.filter(symbol="He").limit(1) + ) + + def _operation(count: int) -> dict[str, typing.Sequence[GelModel]]: + # Create new reactor and atoms + reactor = default.Reactor() + atoms = [ + default.Atom(reactor=reactor, element=helium) + for _ in range(count) + ] + + # Sync + self.client.sync(reactor, *atoms, warn_on_large_sync=False) + + return { + "helium": [helium], + "reactor": [reactor], + "atoms": atoms, + } + + data = profile_operation(self.get_profiling_client(), _operation) + data.write_csv("profiling_chemistry_02.csv") + + def test_profiling_chemistry_03(self) -> None: + # 
Create two reactors, add atoms to first, sync, then move all to + # second + from models.chemistry import default + + helium = self.client.query_required_single( + default.Element.filter(symbol="He").limit(1) + ) + + def _operation(count: int) -> dict[str, typing.Sequence[GelModel]]: + # Create two reactors + reactor_1 = default.Reactor() + reactor_2 = default.Reactor() + + # Create atoms in reactor 1 + atoms = [ + default.Atom(reactor=reactor_1, element=helium) + for _ in range(count) + ] + + # Initial sync - create reactors and atoms + self.client.sync( + reactor_1, reactor_2, *atoms, warn_on_large_sync=False + ) + + # Move all atoms to reactor 2 + for atom in atoms: + atom.reactor = reactor_2 + + # Sync the move operation + self.client.sync(*atoms, warn_on_large_sync=False) + + return { + "helium": [helium], + "reactor_1": [reactor_1], + "reactor_2": [reactor_2], + "atoms": atoms, + } + + data = profile_operation(self.get_profiling_client(), _operation) + data.write_csv("profiling_chemistry_03.csv") + + def test_profiling_chemistry_04(self) -> None: + # Create a compound with an increasing number of helium RefAtoms + from models.chemistry import default + + helium = self.client.query_required_single( + default.Element.filter(symbol="He").limit(1) + ) + + def _operation(count: int) -> dict[str, typing.Sequence[GelModel]]: + # Create RefAtoms with helium element + ref_atoms = [default.RefAtom(element=helium) for _ in range(count)] + + # Create compound with all the atoms + compound = default.Compound( + name=f"test_compound_{count}", atoms=ref_atoms + ) + + # Sync - create all objects + self.client.sync(*ref_atoms, compound, warn_on_large_sync=False) + + return { + "helium": [helium], + "ref_atoms": ref_atoms, + "compound": [compound], + } + + data = profile_operation(self.get_profiling_client(), _operation) + data.write_csv("profiling_chemistry_04.csv") + + def test_profiling_chemistry_05(self) -> None: + # Create compound with atoms, sync, then clear atoms and 
sync again + from models.chemistry import default + + helium = self.client.query_required_single( + default.Element.filter(symbol="He").limit(1) + ) + + def _operation(count: int) -> dict[str, typing.Sequence[GelModel]]: + # Create RefAtoms with helium element + ref_atoms = [default.RefAtom(element=helium) for _ in range(count)] + + # Create compound with all the atoms + compound = default.Compound( + name=f"test_compound_{count}", atoms=ref_atoms + ) + + # Initial sync - create all objects + self.client.sync(*ref_atoms, compound, warn_on_large_sync=False) + + # Clear all atoms from the compound + compound.atoms.clear() + + # Sync the clear operation + self.client.sync(compound, warn_on_large_sync=False) + + return { + "helium": [helium], + "ref_atoms": ref_atoms, + "compound": [compound], + } + + data = profile_operation(self.get_profiling_client(), _operation) + data.write_csv("profiling_chemistry_05.csv") From 306768d4fbe103e3b32745cab8826a6a90e4bc70 Mon Sep 17 00:00:00 2001 From: dnwpark Date: Thu, 18 Sep 2025 18:01:24 -0700 Subject: [PATCH 2/3] Cleanup operation after each measurement. 
--- tests/test_profiling.py | 319 ++++++++++++++++++++++++++++------------ 1 file changed, 224 insertions(+), 95 deletions(-) diff --git a/tests/test_profiling.py b/tests/test_profiling.py index 08758f5d2..afc10e670 100644 --- a/tests/test_profiling.py +++ b/tests/test_profiling.py @@ -17,6 +17,7 @@ # import typing +import typing_extensions import dataclasses import csv @@ -276,14 +277,41 @@ def write_csv(self, filename: str): writer.writerows(csv_data) +CleanupObjects = typing_extensions.TypeAliasType( + "CleanupObjects", + dict[type[GelModel], typing.Sequence[GelModel]], +) + + +OperationResult = typing_extensions.TypeAliasType( + "OperationResult", + tuple[ + dict[str, typing.Sequence[GelModel]], + CleanupObjects, + ], +) + + +def cleanup_operation( + client: ProfilingTestClient, + objects: CleanupObjects, +) -> None: + cleanup_ids = [o.id for _, t_objs in objects.items() for o in t_objs] + client.query( + "delete Object filter .id in array_unpack(<array<uuid>>$0)", + cleanup_ids, + ) + + def profile_operation( client: ProfilingTestClient, - operation: typing.Callable[[int], dict[str, typing.Sequence[GelModel]]], + operation: typing.Callable[[int], OperationResult], count_max: int = 400, ) -> ProfilingData: # Get the number of batches and refetches # This also "warms up" the system - object_labels = operation(1) + object_labels, clenaup_objects = operation(1) + cleanup_operation(client, clenaup_objects) # Prepare profiling data batch_labels, refetch_labels = client.get_profiling_query_labels( @@ -293,7 +321,7 @@ def profile_operation( # Profiling with increasing object count for count in range(5, count_max + 1, 5): - operation(count) + _, clenaup_objects = operation(count) data.records.append( ProfilingRecord( @@ -303,6 +331,8 @@ def profile_operation( ) ) + cleanup_operation(client, clenaup_objects) + return data @@ -346,16 +376,21 @@ def test_profiling_simple_01(self) -> None: # Isolated objects from models.TestProfilingSimple import default - def _operation(count: int) 
-> dict[str, typing.Sequence[GelModel]]: + def _operation(count: int) -> OperationResult: print(f'_operation._operation({count})') objs = [default.Obj_01() for _ in range(count)] self.client.sync(*objs, warn_on_large_sync=False) - return { - "object": objs, - } + return ( + { + "object": objs, + }, + { + default.Obj_01: objs, + }, + ) data = profile_operation(self.get_profiling_client(), _operation) data.write_csv("profiling_simple_01.csv") @@ -366,16 +401,21 @@ def test_profiling_simple_02a(self) -> None: def _operation_value( count: int, - ) -> dict[str, typing.Sequence[GelModel]]: + ) -> OperationResult: print(f'test_profiling_simple_02._operation_value({count})') objs = [default.Obj_02(prop=1) for _ in range(count)] self.client.sync(*objs, warn_on_large_sync=False) - return { - "object": objs, - } + return ( + { + "object": objs, + }, + { + default.Obj_02: objs, + }, + ) data = profile_operation(self.get_profiling_client(), _operation_value) data.write_csv("profiling_simple_02_value.csv") @@ -386,16 +426,21 @@ def test_profiling_simple_02b(self) -> None: def _operation_default( count: int, - ) -> dict[str, typing.Sequence[GelModel]]: + ) -> OperationResult: print(f'test_profiling_simple_02._operation_default({count})') objs = [default.Obj_02() for _ in range(count)] self.client.sync(*objs, warn_on_large_sync=False) - return { - "object": objs, - } + return ( + { + "object": objs, + }, + { + default.Obj_02: objs, + }, + ) data = profile_operation( self.get_profiling_client(), _operation_default @@ -408,16 +453,21 @@ def test_profiling_simple_02c(self) -> None: def _operation_none( count: int, - ) -> dict[str, typing.Sequence[GelModel]]: + ) -> OperationResult: print(f'test_profiling_simple_02._operation_none({count})') objs = [default.Obj_02(prop=None) for _ in range(count)] self.client.sync(*objs, warn_on_large_sync=False) - return { - "object": objs, - } + return ( + { + "object": objs, + }, + { + default.Obj_02: objs, + }, + ) data = 
profile_operation(self.get_profiling_client(), _operation_none) data.write_csv("profiling_simple_02_none.csv") @@ -428,16 +478,21 @@ def test_profiling_simple_03a(self) -> None: def _operation_value( count: int, - ) -> dict[str, typing.Sequence[GelModel]]: + ) -> OperationResult: print(f'test_profiling_simple_03._operation_value({count})') objs = [default.Obj_03(prop=1) for _ in range(count)] self.client.sync(*objs, warn_on_large_sync=False) - return { - "object": objs, - } + return ( + { + "object": objs, + }, + { + default.Obj_03: objs, + }, + ) data = profile_operation(self.get_profiling_client(), _operation_value) data.write_csv("profiling_simple_03_value.csv") @@ -448,16 +503,21 @@ def test_profiling_simple_03b(self) -> None: def _operation_default( count: int, - ) -> dict[str, typing.Sequence[GelModel]]: + ) -> OperationResult: print(f'test_profiling_simple_03._operation_default({count})') objs = [default.Obj_03() for _ in range(count)] self.client.sync(*objs, warn_on_large_sync=False) - return { - "object": objs, - } + return ( + { + "object": objs, + }, + { + default.Obj_03: objs, + }, + ) data = profile_operation( self.get_profiling_client(), _operation_default @@ -470,16 +530,21 @@ def test_profiling_simple_03c(self) -> None: def _operation_none( count: int, - ) -> dict[str, typing.Sequence[GelModel]]: + ) -> OperationResult: print(f'test_profiling_simple_03._operation_none({count})') objs = [default.Obj_03(prop=None) for _ in range(count)] self.client.sync(*objs, warn_on_large_sync=False) - return { - "object": objs, - } + return ( + { + "object": objs, + }, + { + default.Obj_03: objs, + }, + ) data = profile_operation(self.get_profiling_client(), _operation_none) data.write_csv("profiling_simple_03_none.csv") @@ -492,17 +557,22 @@ def test_profiling_simple_04(self) -> None: target = default.Target_04() self.client.save(target) - def _operation(count: int) -> dict[str, typing.Sequence[GelModel]]: + def _operation(count: int) -> OperationResult: 
print(f'test_profiling_simple_04._operation({count})') sources = [default.Source_04(target=target) for _ in range(count)] self.client.sync(target, *sources, warn_on_large_sync=False) - return { - "target": [target], - "sources": sources, - } + return ( + { + "target": [target], + "sources": sources, + }, + { + default.Source_04: sources, + }, + ) data = profile_operation(self.get_profiling_client(), _operation) data.write_csv("profiling_simple_04.csv") @@ -517,7 +587,7 @@ def test_profiling_simple_05a(self) -> None: def _operation_value( count: int, - ) -> dict[str, typing.Sequence[GelModel]]: + ) -> OperationResult: print(f'test_profiling_simple_05._operation_value({count})') sources = [ @@ -529,10 +599,15 @@ def _operation_value( self.client.sync(target, *sources, warn_on_large_sync=False) - return { - "target": [target], - "sources": sources, - } + return ( + { + "target": [target], + "sources": sources, + }, + { + default.Source_05: sources, + }, + ) data = profile_operation(self.get_profiling_client(), _operation_value) data.write_csv("profiling_simple_05_value.csv") @@ -547,7 +622,7 @@ def test_profiling_simple_05b(self) -> None: def _operation_default( count: int, - ) -> dict[str, typing.Sequence[GelModel]]: + ) -> OperationResult: print(f'test_profiling_simple_05._operation_default({count})') sources = [ @@ -557,10 +632,15 @@ def _operation_default( self.client.sync(target, *sources, warn_on_large_sync=False) - return { - "target": [target], - "sources": sources, - } + return ( + { + "target": [target], + "sources": sources, + }, + { + default.Source_05: sources, + }, + ) data = profile_operation( self.get_profiling_client(), _operation_default @@ -577,7 +657,7 @@ def test_profiling_simple_05c(self) -> None: def _operation_none( count: int, - ) -> dict[str, typing.Sequence[GelModel]]: + ) -> OperationResult: print(f'test_profiling_simple_05._operation_none({count})') sources = [ @@ -589,10 +669,15 @@ def _operation_none( self.client.sync(target, *sources, 
warn_on_large_sync=False) - return { - "target": [target], - "sources": sources, - } + return ( + { + "target": [target], + "sources": sources, + }, + { + default.Source_05: sources, + }, + ) data = profile_operation(self.get_profiling_client(), _operation_none) data.write_csv("profiling_simple_05_none.csv") @@ -607,7 +692,7 @@ def test_profiling_simple_06a(self) -> None: def _operation_value( count: int, - ) -> dict[str, typing.Sequence[GelModel]]: + ) -> OperationResult: print(f'test_profiling_simple_06._operation_value({count})') sources = [ @@ -619,10 +704,15 @@ def _operation_value( self.client.sync(target, *sources, warn_on_large_sync=False) - return { - "target": [target], - "sources": sources, - } + return ( + { + "target": [target], + "sources": sources, + }, + { + default.Source_06: sources, + }, + ) data = profile_operation(self.get_profiling_client(), _operation_value) data.write_csv("profiling_simple_06_value.csv") @@ -637,7 +727,7 @@ def test_profiling_simple_06b(self) -> None: def _operation_default( count: int, - ) -> dict[str, typing.Sequence[GelModel]]: + ) -> OperationResult: print(f'test_profiling_simple_06._operation_default({count})') sources = [ @@ -647,10 +737,15 @@ def _operation_default( self.client.sync(target, *sources, warn_on_large_sync=False) - return { - "target": [target], - "sources": sources, - } + return ( + { + "target": [target], + "sources": sources, + }, + { + default.Source_06: sources, + }, + ) data = profile_operation( self.get_profiling_client(), _operation_default @@ -667,7 +762,7 @@ def test_profiling_simple_06c(self) -> None: def _operation_none( count: int, - ) -> dict[str, typing.Sequence[GelModel]]: + ) -> OperationResult: print(f'test_profiling_simple_06._operation_none({count})') sources = [ @@ -679,10 +774,15 @@ def _operation_none( self.client.sync(target, *sources, warn_on_large_sync=False) - return { - "target": [target], - "sources": sources, - } + return ( + { + "target": [target], + "sources": sources, + }, 
+ { + default.Source_06: sources, + }, + ) data = profile_operation(self.get_profiling_client(), _operation_none) data.write_csv("profiling_simple_06_none.csv") @@ -701,16 +801,21 @@ def test_profiling_chemistry_01(self) -> None: # Adding objects with no links from models.chemistry import default - def _operation(count: int) -> dict[str, typing.Sequence[GelModel]]: + def _operation(count: int) -> OperationResult: # Create reactors reactors = [default.Reactor() for _ in range(count)] # Sync self.client.sync(*reactors, warn_on_large_sync=False) - return { - "reactors": reactors, - } + return ( + { + "reactors": reactors, + }, + { + default.Reactor: reactors, + }, + ) data = profile_operation(self.get_profiling_client(), _operation) data.write_csv("profiling_chemistry_01.csv") @@ -723,7 +828,7 @@ def test_profiling_chemistry_02(self) -> None: default.Element.filter(symbol="He").limit(1) ) - def _operation(count: int) -> dict[str, typing.Sequence[GelModel]]: + def _operation(count: int) -> OperationResult: # Create new reactor and atoms reactor = default.Reactor() atoms = [ @@ -734,11 +839,17 @@ def _operation(count: int) -> dict[str, typing.Sequence[GelModel]]: # Sync self.client.sync(reactor, *atoms, warn_on_large_sync=False) - return { - "helium": [helium], - "reactor": [reactor], - "atoms": atoms, - } + return ( + { + "helium": [helium], + "reactor": [reactor], + "atoms": atoms, + }, + { + default.Reactor: [reactor], + default.Atom: atoms, + }, + ) data = profile_operation(self.get_profiling_client(), _operation) data.write_csv("profiling_chemistry_02.csv") @@ -752,7 +863,7 @@ def test_profiling_chemistry_03(self) -> None: default.Element.filter(symbol="He").limit(1) ) - def _operation(count: int) -> dict[str, typing.Sequence[GelModel]]: + def _operation(count: int) -> OperationResult: # Create two reactors reactor_1 = default.Reactor() reactor_2 = default.Reactor() @@ -775,12 +886,18 @@ def _operation(count: int) -> dict[str, typing.Sequence[GelModel]]: # Sync the 
move operation self.client.sync(*atoms, warn_on_large_sync=False) - return { - "helium": [helium], - "reactor_1": [reactor_1], - "reactor_2": [reactor_2], - "atoms": atoms, - } + return ( + { + "helium": [helium], + "reactor_1": [reactor_1], + "reactor_2": [reactor_2], + "atoms": atoms, + }, + { + default.Reactor: [reactor_1, reactor_2], + default.Atom: atoms, + }, + ) data = profile_operation(self.get_profiling_client(), _operation) data.write_csv("profiling_chemistry_03.csv") @@ -793,7 +910,7 @@ def test_profiling_chemistry_04(self) -> None: default.Element.filter(symbol="He").limit(1) ) - def _operation(count: int) -> dict[str, typing.Sequence[GelModel]]: + def _operation(count: int) -> OperationResult: # Create RefAtoms with helium element ref_atoms = [default.RefAtom(element=helium) for _ in range(count)] @@ -805,11 +922,17 @@ def _operation(count: int) -> dict[str, typing.Sequence[GelModel]]: # Sync - create all objects self.client.sync(*ref_atoms, compound, warn_on_large_sync=False) - return { - "helium": [helium], - "ref_atoms": ref_atoms, - "compound": [compound], - } + return ( + { + "helium": [helium], + "ref_atoms": ref_atoms, + "compound": [compound], + }, + { + default.RefAtom: ref_atoms, + default.Compound: [compound], + }, + ) data = profile_operation(self.get_profiling_client(), _operation) data.write_csv("profiling_chemistry_04.csv") @@ -822,7 +945,7 @@ def test_profiling_chemistry_05(self) -> None: default.Element.filter(symbol="He").limit(1) ) - def _operation(count: int) -> dict[str, typing.Sequence[GelModel]]: + def _operation(count: int) -> OperationResult: # Create RefAtoms with helium element ref_atoms = [default.RefAtom(element=helium) for _ in range(count)] @@ -840,11 +963,17 @@ def _operation(count: int) -> dict[str, typing.Sequence[GelModel]]: # Sync the clear operation self.client.sync(compound, warn_on_large_sync=False) - return { - "helium": [helium], - "ref_atoms": ref_atoms, - "compound": [compound], - } + return ( + { + "helium": 
[helium], + "ref_atoms": ref_atoms, + "compound": [compound], + }, + { + default.RefAtom: ref_atoms, + default.Compound: [compound], + }, + ) data = profile_operation(self.get_profiling_client(), _operation) data.write_csv("profiling_chemistry_05.csv") From e69e754b707bb050faacbd04577abeea8e0ed671 Mon Sep 17 00:00:00 2001 From: dnwpark Date: Fri, 19 Sep 2025 11:27:54 -0700 Subject: [PATCH 3/3] Use gel venv. --- gel/_internal/_testbase/_base.py | 12 ++++++++++-- tests/test_profiling.py | 9 +++++++++ 2 files changed, 19 insertions(+), 2 deletions(-) diff --git a/gel/_internal/_testbase/_base.py b/gel/_internal/_testbase/_base.py index 5d99bbeec..7e0a5cfc6 100644 --- a/gel/_internal/_testbase/_base.py +++ b/gel/_internal/_testbase/_base.py @@ -356,9 +356,17 @@ async def set_up(self, ui: UI) -> None: if self._instance is not None: return - if self._server_addr is not None: + server_addr = self._server_addr + if server_addr is None: + server_addr = { + "host": "localhost", + "port": 5656, + "tls_ca_file": "/home/dnwpark/work/dev-3.12/edgedb/tmp/devdatadir/edbtlscert.pem", + } + + if server_addr is not None: await self._set_up_running_instance( - self._server_addr, + server_addr, self._server_version, ) else: diff --git a/tests/test_profiling.py b/tests/test_profiling.py index afc10e670..89fff34bd 100644 --- a/tests/test_profiling.py +++ b/tests/test_profiling.py @@ -338,6 +338,8 @@ def profile_operation( class TestProfilingSimple(BaseProfilingTestCase): SCHEMA = """ + using future simple_scoping; + module default { type Obj_01; type Obj_02 { @@ -370,6 +372,7 @@ class TestProfilingSimple(BaseProfilingTestCase): }; }; }; + } """ def test_profiling_simple_01(self) -> None: @@ -620,6 +623,12 @@ def test_profiling_simple_05b(self) -> None: target = default.Target_05() self.client.save(target) + noise = [ + default.Source_05(target=default.Source_05.target.link(target)) + for _ in range(50000) + ] + self.client.save(*noise) + def _operation_default( count: int, ) -> 
OperationResult: