From 27b05630d84f58f9989826c5971852a929c224df Mon Sep 17 00:00:00 2001 From: Tycho Hob Date: Tue, 10 Mar 2026 13:43:10 -0400 Subject: [PATCH 1/5] feat: Add Vector backend --- example_configs/vector_small_example.yaml | 91 +++++++++++++++++++++++ xapi_db_load/backends/vector.py | 80 ++++++++++++++++++++ xapi_db_load/runner.py | 5 ++ 3 files changed, 176 insertions(+) create mode 100644 example_configs/vector_small_example.yaml create mode 100644 xapi_db_load/backends/vector.py diff --git a/example_configs/vector_small_example.yaml b/example_configs/vector_small_example.yaml new file mode 100644 index 0000000..25dfaf9 --- /dev/null +++ b/example_configs/vector_small_example.yaml @@ -0,0 +1,91 @@ +# Use the vector backend to emit log statements that Vector will use to load the data +# to ClickHouse with insert statements + +# Vector Backend configuration +# ################################ +backend: vector +db_host: localhost +db_port: 8123 +db_name: xapi +db_event_sink_name: event_sink +db_username: ch_admin +db_password: ... +s3_key: ... +s3_secret: ... + +# Run options +log_dir: logs +num_xapi_batches: 3 +batch_size: 100 + +# This number is used for each QueueBackend that use workers, so the number of threads if +# multiplicative. Generally this performs best less than 10, as more threads will cost more +# in context switching than they save. +num_workers: 4 + +# Overall start and end date for the entire run +start_date: 2014-01-01 +end_date: 2023-11-27 + +# All courses will be this long, and be fit into the start / end dates +# This must be less than end_date - start_date days. +course_length_days: 120 + +# The size of the test +num_organizations: 3 +num_actors: 10 + +# This replicates users updating their profiles several times, creating +# more rows +num_actor_profile_changes: 5 + +# How many of each size course to create. The sum of these is the total number +# of courses created for the test. 
+num_course_sizes: + small: 1 + medium: 1 + large: 1 + huge: 1 + +# How many times each course will be "published", this creates a more realistic +# distribution of course blocks where each course can be published dozens or +# hundreds of times while it is being developed. +num_course_publishes: 100 + +# Course size configurations, how many of each type of object are created for +# each course of this size. "actors" must be less than or equal to "num_actors". +# For a course of this size to be created it needs to exist both here and in +# "num_course_sizes". +course_size_makeup: + small: + actors: 5 + problems: 20 + videos: 10 + chapters: 3 + sequences: 10 + verticals: 20 + forum_posts: 20 + medium: + actors: 7 + problems: 40 + videos: 20 + chapters: 4 + sequences: 20 + verticals: 30 + forum_posts: 40 + large: + actors: 10 + problems: 80 + videos: 30 + chapters: 5 + sequences: 40 + verticals: 80 + forum_posts: 200 + huge: + actors: 10 + problems: 160 + videos: 40 + chapters: 10 + sequences: 50 + verticals: 100 + forum_posts: 1000 diff --git a/xapi_db_load/backends/vector.py b/xapi_db_load/backends/vector.py new file mode 100644 index 0000000..09baa42 --- /dev/null +++ b/xapi_db_load/backends/vector.py @@ -0,0 +1,80 @@ +""" +A backend that simply logs the statements to a xapi_tracking logger. + +Vector just reads the log statements, so all we need to do is emit them. +All other tasks use the raw Clickhouse inserts. 
+""" + +import logging +import sys +from logging import Logger, getLogger +from typing import List + +from xapi_db_load.backends.base_async_backend import ( + BaseBackendTasks, +) +from xapi_db_load.backends.clickhouse import ( + InsertBlocks, + InsertCourses, + InsertExternalIDs, + InsertInitialEnrollments, + InsertObjectTags, + InsertProfiles, + InsertTags, + InsertTaxonomies, + InsertXAPIEvents, +) +from xapi_db_load.generate_load_async import EventGenerator + + +class AsyncVectorTasks(BaseBackendTasks): + def __repr__(self) -> str: + return f"AsyncVectorTasks: {self.config['lrs_url']} -> {self.config['db_host']}" + + def get_test_data_tasks(self): + """ + Return the tasks to be run. + """ + return [ + self.event_generator, + InsertInitialEnrollments(self.config, self.logger, self.event_generator), + InsertCourses(self.config, self.logger, self.event_generator), + InsertBlocks(self.config, self.logger, self.event_generator), + InsertObjectTags(self.config, self.logger, self.event_generator), + InsertTaxonomies(self.config, self.logger, self.event_generator), + InsertTags(self.config, self.logger, self.event_generator), + InsertExternalIDs(self.config, self.logger, self.event_generator), + InsertProfiles(self.config, self.logger, self.event_generator), + # This is the only change from the ClickHouse backend + InsertXAPIEventsVector(self.config, self.logger, self.event_generator), + ] + + +class InsertXAPIEventsVector(InsertXAPIEvents): + """ + Wraps the ClickHouse direct backend so that the rest of the metadata can be sent while using + Vector to do the xAPI insertion. 
""" + + def __init__(self, config: dict, logger: Logger, event_generator: EventGenerator): + super().__init__(config, logger, event_generator) + + stream_handler = logging.StreamHandler(sys.stdout) + formatter = logging.Formatter("[%(name)s] %(message)s") + stream_handler.setFormatter(formatter) + self.xapi_logger = getLogger("xapi_tracking") + self.xapi_logger.setLevel(logging.INFO) + self.xapi_logger.addHandler(stream_handler) + + def _format_row(self, row: dict): + """ + This overrides the ClickHouse backend's method to format the row for Vector. + """ + return row["event"] + + async def _do_insert(self, out_data: List): + """ + Log a batch of rows for Vector instead of inserting directly to ClickHouse. + """ + for event_json in out_data: + self.xapi_logger.info(event_json) diff --git a/xapi_db_load/runner.py b/xapi_db_load/runner.py index 933275a..0d102d6 100644 --- a/xapi_db_load/runner.py +++ b/xapi_db_load/runner.py @@ -7,6 +7,7 @@ from xapi_db_load.backends.clickhouse import AsyncClickHouseTasks from xapi_db_load.backends.csv import AsyncCSVTasks from xapi_db_load.backends.ralph import AsyncRalphTasks +from xapi_db_load.backends.vector import AsyncVectorTasks from xapi_db_load.generate_load_async import EventGenerator @@ -42,6 +43,10 @@ def set_backend(self, backend): self.backend = AsyncRalphTasks( self.config, self.logger, self.event_generator ) + elif backend == "vector": + self.backend = AsyncVectorTasks( + self.config, self.logger, self.event_generator + ) else: raise ValueError("Invalid backend") From 5c4ebe2f8e72177d1819199068889ce194f2a715 Mon Sep 17 00:00:00 2001 From: Tycho Hob Date: Tue, 10 Mar 2026 15:29:16 -0400 Subject: [PATCH 2/5] fix: Fix Vector formatter, add tests --- xapi_db_load/backends/vector.py | 4 +- .../tests/fixtures/small_vector_config.yaml | 54 ++++++++++++++++ xapi_db_load/tests/test_backends.py | 62 +++++++++++++++++++ 3 files changed, 119 insertions(+), 1 deletion(-) create mode 100644 
xapi_db_load/tests/fixtures/small_vector_config.yaml diff --git a/xapi_db_load/backends/vector.py b/xapi_db_load/backends/vector.py index 09baa42..2139d72 100644 --- a/xapi_db_load/backends/vector.py +++ b/xapi_db_load/backends/vector.py @@ -60,7 +60,9 @@ def __init__(self, config: dict, logger: Logger, event_generator: EventGenerator super().__init__(config, logger, event_generator) stream_handler = logging.StreamHandler(sys.stdout) - formatter = logging.Formatter("[%(name)s] %(message)s") + # This formatter is different from what the LMS uses, but is the smallest possible + # format that passes Vector's regex + formatter = logging.Formatter(" [{name}] [] {message}", style="{") stream_handler.setFormatter(formatter) self.xapi_logger = getLogger("xapi_tracking") self.xapi_logger.setLevel(logging.INFO) diff --git a/xapi_db_load/tests/fixtures/small_vector_config.yaml b/xapi_db_load/tests/fixtures/small_vector_config.yaml new file mode 100644 index 0000000..0bc4df6 --- /dev/null +++ b/xapi_db_load/tests/fixtures/small_vector_config.yaml @@ -0,0 +1,54 @@ +# Test configuration for Vector / ClickHouse +# ######################################### +backend: vector +db_host: localhost +db_port: 8123 +db_name: xapi +db_username: ch_admin +db_password: foo + +# Run options +log_dir: logs +num_xapi_batches: 3 +batch_size: 100 + +# This number is used for each QueueBackend that use workers, so the number of threads if +# multiplicative. Generally this performs best less than 10, as more threads will cost more +# in context switching than they save. +num_workers: 4 + +# Overall start and end date for the entire run +start_date: 2014-01-01 +end_date: 2023-11-27 + +# All courses will be this long, and be fit into the start / end dates +# This must be less than end_date - start_date days. 
+course_length_days: 120 + +# The size of the test +num_organizations: 3 +num_actors: 10 + +# This replicates users updating their profiles several times, creating +# more rows +num_actor_profile_changes: 5 + +# How many of each size course to create. The sum of these is the total number +# of courses created for the test. +num_course_sizes: + small: 1 + +# How many times each course will be "published", this creates a more realistic +# distribution of course blocks where each course can be published dozens or +# hundreds of times while it is being developed. +num_course_publishes: 10 + +course_size_makeup: + small: + actors: 5 + problems: 20 + videos: 10 + chapters: 3 + sequences: 10 + verticals: 20 + forum_posts: 20 diff --git a/xapi_db_load/tests/test_backends.py b/xapi_db_load/tests/test_backends.py index e61d60f..abbee14 100644 --- a/xapi_db_load/tests/test_backends.py +++ b/xapi_db_load/tests/test_backends.py @@ -3,7 +3,9 @@ """ import gzip +import json import os +import re from contextlib import contextmanager from unittest.mock import AsyncMock, MagicMock, patch @@ -120,6 +122,66 @@ def test_clickhouse_backend(_, tmp_path): assert "Run duration was" in result.output +@patch( + "xapi_db_load.backends.base_async_backend.clickhouse_connect", + new_callable=AsyncMock, +) +@patch( + "xapi_db_load.backends.vector.getLogger", + new_callable=MagicMock, +) +def test_vector_backend(mock_get_logger, _, tmp_path): + """ + Run a test through the Vector backend, currently this just checks that the + output indicates success. 
+ """ + test_path = "xapi_db_load/tests/fixtures/small_vector_config.yaml" + + runner = CliRunner() + + with override_config(test_path, tmp_path): + result = runner.invoke( + load_db, + f"--config_file {test_path}", + catch_exceptions=False, + ) + + # This test should create 300 xAPI log statements + assert mock_get_logger.return_value.info.call_count == 300 + + last_logged_statement = mock_get_logger.return_value.info.call_args.args[0] + + # We check to make sure Vector's regex will parse what we're sending. We want it to match both + # the LMS and our local logger formatter. + # This is how things are generally formatted in the LMS + test_str_1 = f"2026-02-24 20:26:13,006 INFO 42 [xapi_tracking] [user None] [ip 172.19.0.1] logger.py:41 - {last_logged_statement}" + + # This returns our message formatted with the abbreviated version we use for size and speed purposes + formatter = mock_get_logger.return_value.addHandler.call_args.args[0].formatter + test_str_2 = formatter._fmt.format( + name="xapi_tracking", message=last_logged_statement + ) + + # This is a direct copy and paste from Aspects' Vector common-post.toml + msg_regex = r"^.* \[xapi_tracking\] [^{}]* (?P<statement>\{.*\})$" + + # Quick test to make sure that what's being stored is at least parseable + for s in (test_str_1, test_str_2): + try: + statement = re.match(msg_regex, s).groups()[0] + json.loads(statement) + except Exception as e: + print(e) + print("Exception! Regex testing: ") + print(s) + raise + + assert "Insert xAPI Events complete." in result.output + assert "Insert Initial Enrollments complete." in result.output + assert "ALL TASKS DONE!" 
in result.output + assert "Run duration was" in result.output + + + @patch("xapi_db_load.backends.ralph.requests", new_callable=AsyncMock) + @patch( + "xapi_db_load.backends.base_async_backend.clickhouse_connect", From 3b4aad19e9198df6795603de7f03187bd946db83 Mon Sep 17 00:00:00 2001 From: Tycho Hob Date: Mon, 16 Mar 2026 12:53:07 -0400 Subject: [PATCH 3/5] fix: use aware datetimes Originally Authored by @Ian2012 --- xapi_db_load/course_configs.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/xapi_db_load/course_configs.py b/xapi_db_load/course_configs.py index 8a325e2..ff81528 100644 --- a/xapi_db_load/course_configs.py +++ b/xapi_db_load/course_configs.py @@ -182,11 +182,11 @@ def get_random_emission_time(self, actor=None): assert self.end_date # Make sure we're passing in a datetime, not a date - start = datetime.datetime.combine(start, datetime.time()) + start = datetime.datetime.combine(start, datetime.time(), tzinfo=datetime.UTC) # time() is midnight, so make sure we get that last day in there end = datetime.datetime.combine( - self.end_date, datetime.time() + self.end_date, datetime.time(), tzinfo=datetime.UTC ) + datetime.timedelta(days=1) return self._random_datetime(start_datetime=start, end_datetime=end) From 15f5df8b3e866a6fa9fd4fb57b801259aef4f723 Mon Sep 17 00:00:00 2001 From: Tycho Hob Date: Tue, 17 Mar 2026 08:48:32 -0400 Subject: [PATCH 4/5] chore: Bump version to 3.1 --- xapi_db_load/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/xapi_db_load/__init__.py b/xapi_db_load/__init__.py index ca71f7e..c4fd62b 100644 --- a/xapi_db_load/__init__.py +++ b/xapi_db_load/__init__.py @@ -2,4 +2,4 @@ Scripts to generate fake xAPI data against various backends. 
""" -__version__ = "3.0.0" +__version__ = "3.1.0" From 7cd70cfcfdac59da0fd3c6d80bf58eb0fc08bcfa Mon Sep 17 00:00:00 2001 From: Tycho Hob Date: Thu, 19 Mar 2026 09:28:30 -0400 Subject: [PATCH 5/5] fix: Vector config in UI and UI crash on smaller screens --- xapi_db_load/backends/vector.py | 2 +- xapi_db_load/ui/load_ui.py | 8 +++----- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/xapi_db_load/backends/vector.py b/xapi_db_load/backends/vector.py index 2139d72..b1f4383 100644 --- a/xapi_db_load/backends/vector.py +++ b/xapi_db_load/backends/vector.py @@ -29,7 +29,7 @@ class AsyncVectorTasks(BaseBackendTasks): def __repr__(self) -> str: - return f"AsyncVectorTasks: {self.config['lrs_url']} -> {self.config['db_host']}" + return f"AsyncVectorTasks: {self.config['db_host']}" def get_test_data_tasks(self): """ diff --git a/xapi_db_load/ui/load_ui.py b/xapi_db_load/ui/load_ui.py index 6e12a69..2569088 100644 --- a/xapi_db_load/ui/load_ui.py +++ b/xapi_db_load/ui/load_ui.py @@ -28,11 +28,9 @@ def __init__(self, app): s = app.runner.backend.get_backend_summary() self.summary = urwid.Text( - """{backend} - - {num_xapi_batches} batches of {batch_size} events for {total_events:,} events - - {num_actors} actors, with profiles saved {num_actor_profile_changes} times - - {num_courses} courses, with {num_course_publishes} publishes - """.format(**s) + "{backend}- {num_xapi_batches} x {batch_size} events for {total_events:,} events".format( + **s + ) ) self.go_button = urwid.Button(GO_TEXT, self.go_pressed) self.load_button = urwid.Button(LOAD_TEXT, self.load_pressed)