diff --git a/LICENSE b/LICENSE
new file mode 100644
index 0000000..533db30
--- /dev/null
+++ b/LICENSE
@@ -0,0 +1,7 @@
+Copyright 2025 Channable
+
+Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the “Software”), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
diff --git a/README.md b/README.md
index c951875..fa7eee8 100644
--- a/README.md
+++ b/README.md
@@ -18,6 +18,93 @@ The specific advantages for opsqueue are:
 * One standardized queuing system that can be reused again and again
 * A single way to implement monitoring, alerting, and debugging workflows
 
+## Getting Started
+
+### 1. Grab the `opsqueue` binary and the Python client library
+
+The binary can be installed from this repo, or built from source by cloning the repo and running `just build` or `cargo build`.
+This is a self-contained program: you can run it on a server on its own, include it in a tiny container, etc.
+
+The Python library used by the `Consumer` and `Producer` can be built from source, or (soon) simply installed from PyPI (`pip install opsqueue`, `uv pip install opsqueue`, etc.).
+
+### 2. Create a `Producer`
+
+```python
+import logging
+import re
+from opsqueue.producer import ProducerClient
+from collections.abc import Iterable
+
+logging.basicConfig(format="%(levelname)s: %(message)s", level=logging.DEBUG)
+
+def file_to_words(filename: str) -> Iterable[str]:
+    """
+    Iterates over the words and inter-word whitespace strings in a file
+    while keeping at most one line in memory at a time.
+    """
+    with open(filename) as input_file:
+        for line in input_file:
+            # Splitting on a capturing group keeps the whitespace tokens,
+            # so the output can be reassembled exactly.
+            yield from re.split(r"(\s+)", line)
+
+def print_words(words: Iterable[str]) -> None:
+    """
+    Prints all words and inter-word whitespace tokens
+    without first loading the full string into memory.
+    """
+    for word in words:
+        print(word, end="")
+
+def main() -> None:
+    client = ProducerClient("localhost:3999", "file:///tmp/opsqueue/capitalize_text/")
+    stream_of_words = file_to_words("lipsum.txt")
+    stream_of_capitalized_words = client.run_submission(stream_of_words, chunk_size=4000)
+    print_words(stream_of_capitalized_words)
+
+if __name__ == "__main__":
+    main()
+```
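+
+If you'd like to see a round-trip without a text file on disk: a plain list works too, since `run_submission` takes an iterable of operations just like the generator above. A minimal sketch (assuming the queue and the consumer from step 3 below are already running):
+
+```python
+from opsqueue.producer import ProducerClient
+
+client = ProducerClient("localhost:3999", "file:///tmp/opsqueue/capitalize_text/")
+
+# Two tiny ops in a single chunk; results come back in the same order as the inputs.
+results = client.run_submission(["hello", "world"], chunk_size=2)
+print(list(results))  # expected (with the capitalize consumer running): ['Hello', 'World']
+```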
+
+### 3. Create a `Consumer`
+
+```python
+import logging
+from opsqueue.consumer import ConsumerClient, Strategy
+
+logging.basicConfig(format="%(levelname)s: %(message)s", level=logging.INFO)
+
+def capitalize_word(word: str) -> str:
+    output = word.capitalize()
+    # print(f"Capitalized word: {word} -> {output}")
+    return output
+
+def main() -> None:
+    client = ConsumerClient("localhost:3999", "file:///tmp/opsqueue/capitalize_text/")
+    client.run_each_op(capitalize_word, strategy=Strategy.Random())
+
+if __name__ == "__main__":
+    main()
+```
+
+### 4. Run the Producer, queue, and Consumer
+
+- Run `opsqueue`.
+- Run `python3 capitalize_text_consumer.py` to start a consumer. Feel free to start multiple instances of this program to try out consumer concurrency.
+- Run `python3 capitalize_text_producer.py` to start a producer.
+
+The order you start these in does not matter; the systems will reconnect and continue after any kind of failure or disconnect.
+
+By default, the queue listens on `http://localhost:3999`; the exact port can of course be changed.
+The Producer and Consumer need to share the same object store location, which stores the content of their submission chunks.
+In development, this can be a local folder, as shown in the code above.
+In production, you probably want to use Google's GCS, Amazon's S3, or Microsoft's Azure buckets; see the sketch below.
+
+Please tinker with the above code!
+If you want to look under the hood, run `RUST_LOG=debug opsqueue` to enable extra logging for the queue.
+The Producer/Consumer will use whatever log level is configured in Python.
+
+More examples can be found in `./libs/opsqueue_python/examples/`.
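+
+As noted above, moving from the local folder to a production bucket is just a change of the shared object store URL on both sides. A hedged sketch (the bucket name is hypothetical, and which URL schemes are accepted depends on your opsqueue build):
+
+```python
+from opsqueue.producer import ProducerClient
+
+# The ConsumerClient must be constructed with the exact same bucket URL.
+client = ProducerClient("localhost:3999", "gs://my-opsqueue-bucket/capitalize_text/")
+```
+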
 ## Project structure
 
 The majority of Opsqueue's code is written in Rust.
diff --git a/libs/opsqueue_python/README.md b/libs/opsqueue_python/README.md
index f89d082..89203fc 100644
--- a/libs/opsqueue_python/README.md
+++ b/libs/opsqueue_python/README.md
@@ -1,22 +1,3 @@
-# How to run
+The Python client library for the Opsqueue lightweight batch processing queue system.
-
-1. Move to the `opsqueue_consumer` subdirectory. With `direnv`, the extra `.envrc` in those directories will load an (essentially empty) Python virtual environment. This is necessary to make the next step work.
-
-2. Any time you change any Rust code, run [maturin](https://github.com/PyO3/maturin), specifically `maturin develop` to update the Rust<->Python library bindings:
-```bash
-maturin develop
-```
-
-3. Now, just run a Python shell which now (courtesy of the virtual env) has access to the `opsqueue_consumer` module using:
-```bash
-python
-```
-
-# Structure
-
-All logic happens inside the main `opsqueue` crate.
-Only the Python-specific parts live inside this library.
-
-You will notice that some structs/enums are defined which seem to be 1:1 copies of definitions inside the main crate.
-This is because we cannot add PyO3-specific code, macro calls, conversions, etc. inside the main crate.
-And note that this duplication is _fake_ duplication: In cases where we want the Python interface to diverge slightly (or significantly) from the Rust crate's to make it more Python-idiomatic, the types will stop being identical.
+Find the full README with examples at https://github.com/channable/opsqueue
diff --git a/libs/opsqueue_python/pyproject.toml b/libs/opsqueue_python/pyproject.toml
index 6812906..f35ad42 100644
--- a/libs/opsqueue_python/pyproject.toml
+++ b/libs/opsqueue_python/pyproject.toml
@@ -4,6 +4,12 @@ build-backend="maturin"
 
 [project]
 name = "opsqueue"
+readme="README.md"
+description = "Python client library for Opsqueue, the lightweight batch processing queue for heavy loads"
+license="MIT"
+keywords=["queue", "processing", "parallelism", "distributed", "batch", "producer", "consumer"]
+
+
 requires-python = ">=3.8"
 classifiers = [
     "Programming Language :: Rust",
@@ -20,6 +26,10 @@ dependencies = [
     "opentelemetry-exporter-otlp",
 ]
 
+[project.urls]
+Repository="https://github.com/channable/opsqueue"
+Issues="https://github.com/channable/opsqueue/issues"
+
 [tool.maturin]
 features = ["pyo3/extension-module"]
diff --git a/opsqueue/.sqlx/query-08fd98126c44dcbd07eb449d09e238c6ea5d468ba73a8b4b1d5f2df9e4bd39b0.json b/opsqueue/.sqlx/query-08fd98126c44dcbd07eb449d09e238c6ea5d468ba73a8b4b1d5f2df9e4bd39b0.json
new file mode 100644
index 0000000..1999ee0
--- /dev/null
+++ b/opsqueue/.sqlx/query-08fd98126c44dcbd07eb449d09e238c6ea5d468ba73a8b4b1d5f2df9e4bd39b0.json
@@ -0,0 +1,23 @@
+{
+  "db_name": "SQLite",
+  "query": "\n SELECT metadata_key, metadata_value FROM chunks_metadata\n WHERE submission_id = $1 AND chunk_index = $2\n ",
+  "describe": {
+    "columns": [
+      {
+        "name": "metadata_key",
+        "ordinal": 0,
+        "type_info": "Text"
+      },
+      {
+        "name": "metadata_value",
+        "ordinal": 1,
+        "type_info": "Integer"
+      }
+    ],
+    "parameters": {
+      "Right": 2
+    },
+    "nullable": [false, false]
+  },
+  "hash": "08fd98126c44dcbd07eb449d09e238c6ea5d468ba73a8b4b1d5f2df9e4bd39b0"
+}
diff --git a/opsqueue/.sqlx/query-0f5c290f571f139bc5651240ec05a9da0a4e3a22d1291bc431afe6b976156ca5.json b/opsqueue/.sqlx/query-0f5c290f571f139bc5651240ec05a9da0a4e3a22d1291bc431afe6b976156ca5.json
new file mode 100644
index 0000000..7e11e91
--- /dev/null
+++ b/opsqueue/.sqlx/query-0f5c290f571f139bc5651240ec05a9da0a4e3a22d1291bc431afe6b976156ca5.json
@@ -0,0 +1,48 @@
+{
+  "db_name": "SQLite",
+  "query": "\n INSERT INTO submissions_failed\n (id, chunks_total, prefix, metadata, failed_at, failed_chunk_id)\n SELECT id, chunks_total, prefix, metadata, julianday($1), $2 FROM submissions WHERE id = $3;\n\n DELETE FROM submissions WHERE id = $4 RETURNING *;\n ",
+  "describe": {
+    "columns": [
+      {
+        "name": "id",
+        "ordinal": 0,
+        "type_info": "Integer"
+      },
+      {
+        "name": "prefix",
+        "ordinal": 1,
+        "type_info": "Text"
+      },
+      {
+        "name": "chunks_total",
+        "ordinal": 2,
+        "type_info": "Integer"
+      },
+      {
+        "name": "chunks_done",
+        "ordinal": 3,
+        "type_info": "Integer"
+      },
+      {
+        "name": "metadata",
+        "ordinal": 4,
+        "type_info": "Blob"
+      },
+      {
+        "name": "otel_trace_carrier",
+        "ordinal": 5,
+        "type_info": "Text"
+      },
+      {
+        "name": "chunk_size",
+        "ordinal": 6,
+        "type_info": "Integer"
+      }
+    ],
+    "parameters": {
+      "Right": 4
+    },
+    "nullable": [false, true, false, false, true, false, true]
+  },
+  "hash": "0f5c290f571f139bc5651240ec05a9da0a4e3a22d1291bc431afe6b976156ca5"
+}
diff --git a/opsqueue/.sqlx/query-11d4e8c27b08ec013a154838137bab496b9e37ce482189f38eef19b20fc447f4.json b/opsqueue/.sqlx/query-11d4e8c27b08ec013a154838137bab496b9e37ce482189f38eef19b20fc447f4.json
new file mode 100644
index 0000000..502dfc8
--- /dev/null
+++ b/opsqueue/.sqlx/query-11d4e8c27b08ec013a154838137bab496b9e37ce482189f38eef19b20fc447f4.json
@@ -0,0 +1,33 @@
+{
+  "db_name": "SQLite",
+
"query": "\n SELECT\n submission_id AS \"submission_id: SubmissionId\"\n , chunk_index AS \"chunk_index: ChunkIndex\"\n , input_content\n , retries\n FROM chunks WHERE submission_id = $1 AND chunk_index = $2\n ", + "describe": { + "columns": [ + { + "name": "submission_id: SubmissionId", + "ordinal": 0, + "type_info": "Integer" + }, + { + "name": "chunk_index: ChunkIndex", + "ordinal": 1, + "type_info": "Integer" + }, + { + "name": "input_content", + "ordinal": 2, + "type_info": "Blob" + }, + { + "name": "retries", + "ordinal": 3, + "type_info": "Integer" + } + ], + "parameters": { + "Right": 2 + }, + "nullable": [false, false, true, false] + }, + "hash": "11d4e8c27b08ec013a154838137bab496b9e37ce482189f38eef19b20fc447f4" +} diff --git a/opsqueue/.sqlx/query-13b72b517c04819669c5139d0a823e1d009a0612eeecf94ec68401d36d5434e1.json b/opsqueue/.sqlx/query-13b72b517c04819669c5139d0a823e1d009a0612eeecf94ec68401d36d5434e1.json new file mode 100644 index 0000000..60f8a28 --- /dev/null +++ b/opsqueue/.sqlx/query-13b72b517c04819669c5139d0a823e1d009a0612eeecf94ec68401d36d5434e1.json @@ -0,0 +1,12 @@ +{ + "db_name": "SQLite", + "query": "DELETE FROM submissions_metadata\n WHERE submission_id = (\n SELECT id FROM submissions_failed WHERE failed_at < julianday($1)\n );", + "describe": { + "columns": [], + "parameters": { + "Right": 1 + }, + "nullable": [] + }, + "hash": "13b72b517c04819669c5139d0a823e1d009a0612eeecf94ec68401d36d5434e1" +} diff --git a/opsqueue/.sqlx/query-17ef938e2fc0ac7bdc09690094b055db5edcb4af140151dc389548d010de4b5f.json b/opsqueue/.sqlx/query-17ef938e2fc0ac7bdc09690094b055db5edcb4af140151dc389548d010de4b5f.json new file mode 100644 index 0000000..e475140 --- /dev/null +++ b/opsqueue/.sqlx/query-17ef938e2fc0ac7bdc09690094b055db5edcb4af140151dc389548d010de4b5f.json @@ -0,0 +1,48 @@ +{ + "db_name": "SQLite", + "query": "\n SELECT\n id AS \"id: SubmissionId\"\n , prefix\n , chunks_total AS \"chunks_total: ChunkCount\"\n , chunk_size AS \"chunk_size: ChunkSize\"\n , metadata\n , completed_at AS \"completed_at: DateTime\"\n , otel_trace_carrier\n FROM submissions_completed WHERE id = $1\n ", + "describe": { + "columns": [ + { + "name": "id: SubmissionId", + "ordinal": 0, + "type_info": "Integer" + }, + { + "name": "prefix", + "ordinal": 1, + "type_info": "Text" + }, + { + "name": "chunks_total: ChunkCount", + "ordinal": 2, + "type_info": "Integer" + }, + { + "name": "chunk_size: ChunkSize", + "ordinal": 3, + "type_info": "Integer" + }, + { + "name": "metadata", + "ordinal": 4, + "type_info": "Blob" + }, + { + "name": "completed_at: DateTime", + "ordinal": 5, + "type_info": "Datetime" + }, + { + "name": "otel_trace_carrier", + "ordinal": 6, + "type_info": "Text" + } + ], + "parameters": { + "Right": 1 + }, + "nullable": [false, true, false, true, true, false, false] + }, + "hash": "17ef938e2fc0ac7bdc09690094b055db5edcb4af140151dc389548d010de4b5f" +} diff --git a/opsqueue/.sqlx/query-18f783ff8500f06825c985530486c0624c112f61a5816d4c4528f9b3151713e4.json b/opsqueue/.sqlx/query-18f783ff8500f06825c985530486c0624c112f61a5816d4c4528f9b3151713e4.json new file mode 100644 index 0000000..b84195d --- /dev/null +++ b/opsqueue/.sqlx/query-18f783ff8500f06825c985530486c0624c112f61a5816d4c4528f9b3151713e4.json @@ -0,0 +1,48 @@ +{ + "db_name": "SQLite", + "query": "\n SELECT id AS \"id: SubmissionId\"\n , prefix\n , chunks_total AS \"chunks_total: ChunkCount\"\n , chunks_done AS \"chunks_done: ChunkCount\"\n , chunk_size AS \"chunk_size: ChunkSize\"\n , metadata\n , otel_trace_carrier\n FROM submissions WHERE 
id = $1\n ", + "describe": { + "columns": [ + { + "name": "id: SubmissionId", + "ordinal": 0, + "type_info": "Integer" + }, + { + "name": "prefix", + "ordinal": 1, + "type_info": "Text" + }, + { + "name": "chunks_total: ChunkCount", + "ordinal": 2, + "type_info": "Integer" + }, + { + "name": "chunks_done: ChunkCount", + "ordinal": 3, + "type_info": "Integer" + }, + { + "name": "chunk_size: ChunkSize", + "ordinal": 4, + "type_info": "Integer" + }, + { + "name": "metadata", + "ordinal": 5, + "type_info": "Blob" + }, + { + "name": "otel_trace_carrier", + "ordinal": 6, + "type_info": "Text" + } + ], + "parameters": { + "Right": 1 + }, + "nullable": [false, true, false, false, true, true, false] + }, + "hash": "18f783ff8500f06825c985530486c0624c112f61a5816d4c4528f9b3151713e4" +} diff --git a/opsqueue/.sqlx/query-1b3e8cc0730f75e690337f1834d28d8ae98d1fb97195f47c36a7c45038edb7e8.json b/opsqueue/.sqlx/query-1b3e8cc0730f75e690337f1834d28d8ae98d1fb97195f47c36a7c45038edb7e8.json new file mode 100644 index 0000000..9d1b07a --- /dev/null +++ b/opsqueue/.sqlx/query-1b3e8cc0730f75e690337f1834d28d8ae98d1fb97195f47c36a7c45038edb7e8.json @@ -0,0 +1,12 @@ +{ + "db_name": "SQLite", + "query": "DELETE FROM submissions_metadata\n WHERE submission_id = (\n SELECT id FROM submissions_completed WHERE completed_at < julianday($1)\n );", + "describe": { + "columns": [], + "parameters": { + "Right": 1 + }, + "nullable": [] + }, + "hash": "1b3e8cc0730f75e690337f1834d28d8ae98d1fb97195f47c36a7c45038edb7e8" +} diff --git a/opsqueue/.sqlx/query-207eb97fe104f49008ebe798860f6340f180a1afde82dd1c08dc4b2b650fbfe2.json b/opsqueue/.sqlx/query-207eb97fe104f49008ebe798860f6340f180a1afde82dd1c08dc4b2b650fbfe2.json new file mode 100644 index 0000000..f0c2f70 --- /dev/null +++ b/opsqueue/.sqlx/query-207eb97fe104f49008ebe798860f6340f180a1afde82dd1c08dc4b2b650fbfe2.json @@ -0,0 +1,33 @@ +{ + "db_name": "SQLite", + "query": "\n SELECT\n submission_id AS \"submission_id: SubmissionId\"\n , chunk_index AS \"chunk_index: ChunkIndex\"\n , output_content\n , completed_at AS \"completed_at: DateTime\"\n FROM chunks_completed WHERE submission_id = $1 AND chunk_index = $2\n ", + "describe": { + "columns": [ + { + "name": "submission_id: SubmissionId", + "ordinal": 0, + "type_info": "Integer" + }, + { + "name": "chunk_index: ChunkIndex", + "ordinal": 1, + "type_info": "Integer" + }, + { + "name": "output_content", + "ordinal": 2, + "type_info": "Blob" + }, + { + "name": "completed_at: DateTime", + "ordinal": 3, + "type_info": "Datetime" + } + ], + "parameters": { + "Right": 2 + }, + "nullable": [false, false, true, false] + }, + "hash": "207eb97fe104f49008ebe798860f6340f180a1afde82dd1c08dc4b2b650fbfe2" +} diff --git a/opsqueue/.sqlx/query-2b77ed3badfb9fe0fd7b88635ef9d11f88e91b1cfcef1d93e814ab2da07addcb.json b/opsqueue/.sqlx/query-2b77ed3badfb9fe0fd7b88635ef9d11f88e91b1cfcef1d93e814ab2da07addcb.json new file mode 100644 index 0000000..b724699 --- /dev/null +++ b/opsqueue/.sqlx/query-2b77ed3badfb9fe0fd7b88635ef9d11f88e91b1cfcef1d93e814ab2da07addcb.json @@ -0,0 +1,18 @@ +{ + "db_name": "SQLite", + "query": "SELECT COUNT(1) as count FROM chunks_failed;", + "describe": { + "columns": [ + { + "name": "count", + "ordinal": 0, + "type_info": "Integer" + } + ], + "parameters": { + "Right": 0 + }, + "nullable": [false] + }, + "hash": "2b77ed3badfb9fe0fd7b88635ef9d11f88e91b1cfcef1d93e814ab2da07addcb" +} diff --git a/opsqueue/.sqlx/query-3d130615a116adca109be4759b7690ad2ca3ddd1912e1539bdc209f51a2282aa.json 
b/opsqueue/.sqlx/query-3d130615a116adca109be4759b7690ad2ca3ddd1912e1539bdc209f51a2282aa.json new file mode 100644 index 0000000..d65bc72 --- /dev/null +++ b/opsqueue/.sqlx/query-3d130615a116adca109be4759b7690ad2ca3ddd1912e1539bdc209f51a2282aa.json @@ -0,0 +1,12 @@ +{ + "db_name": "SQLite", + "query": "DELETE FROM submissions_completed WHERE completed_at < julianday($1);", + "describe": { + "columns": [], + "parameters": { + "Right": 1 + }, + "nullable": [] + }, + "hash": "3d130615a116adca109be4759b7690ad2ca3ddd1912e1539bdc209f51a2282aa" +} diff --git a/opsqueue/.sqlx/query-3dde3eaab357df88cd1c0e0e8e833aca1fce450c2d842a2af63f0e4c128424f7.json b/opsqueue/.sqlx/query-3dde3eaab357df88cd1c0e0e8e833aca1fce450c2d842a2af63f0e4c128424f7.json new file mode 100644 index 0000000..64daa6a --- /dev/null +++ b/opsqueue/.sqlx/query-3dde3eaab357df88cd1c0e0e8e833aca1fce450c2d842a2af63f0e4c128424f7.json @@ -0,0 +1,48 @@ +{ + "db_name": "SQLite", + "query": "\n SAVEPOINT complete_submission_raw;\n\n INSERT INTO submissions_completed\n (id, chunks_total, prefix, metadata, completed_at)\n SELECT id, chunks_total, prefix, metadata, julianday($1) FROM submissions WHERE id = $2;\n\n DELETE FROM submissions WHERE id = $3 RETURNING *;\n\n RELEASE SAVEPOINT complete_submission_raw;\n ", + "describe": { + "columns": [ + { + "name": "id", + "ordinal": 0, + "type_info": "Integer" + }, + { + "name": "prefix", + "ordinal": 1, + "type_info": "Text" + }, + { + "name": "chunks_total", + "ordinal": 2, + "type_info": "Integer" + }, + { + "name": "chunks_done", + "ordinal": 3, + "type_info": "Integer" + }, + { + "name": "metadata", + "ordinal": 4, + "type_info": "Blob" + }, + { + "name": "otel_trace_carrier", + "ordinal": 5, + "type_info": "Text" + }, + { + "name": "chunk_size", + "ordinal": 6, + "type_info": "Integer" + } + ], + "parameters": { + "Right": 3 + }, + "nullable": [false, true, false, false, true, false, true] + }, + "hash": "3dde3eaab357df88cd1c0e0e8e833aca1fce450c2d842a2af63f0e4c128424f7" +} diff --git a/opsqueue/.sqlx/query-3f33fea7cc44678f448d8cf3c0aeb59f155dd89f019f7c66c9609083f3a3dc6f.json b/opsqueue/.sqlx/query-3f33fea7cc44678f448d8cf3c0aeb59f155dd89f019f7c66c9609083f3a3dc6f.json new file mode 100644 index 0000000..67004d5 --- /dev/null +++ b/opsqueue/.sqlx/query-3f33fea7cc44678f448d8cf3c0aeb59f155dd89f019f7c66c9609083f3a3dc6f.json @@ -0,0 +1,12 @@ +{ + "db_name": "SQLite", + "query": "\n INSERT INTO submissions (id, prefix, chunks_total, chunks_done, metadata, otel_trace_carrier, chunk_size)\n VALUES ($1, $2, $3, $4, $5, $6, $7)\n ", + "describe": { + "columns": [], + "parameters": { + "Right": 7 + }, + "nullable": [] + }, + "hash": "3f33fea7cc44678f448d8cf3c0aeb59f155dd89f019f7c66c9609083f3a3dc6f" +} diff --git a/opsqueue/.sqlx/query-40e96fd79c38c8338bcc2f18fbc43c8f71b1c6e73e73ce5f7687db469377c137.json b/opsqueue/.sqlx/query-40e96fd79c38c8338bcc2f18fbc43c8f71b1c6e73e73ce5f7687db469377c137.json new file mode 100644 index 0000000..1ce8f24 --- /dev/null +++ b/opsqueue/.sqlx/query-40e96fd79c38c8338bcc2f18fbc43c8f71b1c6e73e73ce5f7687db469377c137.json @@ -0,0 +1,18 @@ +{ + "db_name": "SQLite", + "query": "UPDATE submissions SET chunks_done = chunks_done + 1 WHERE submissions.id = $1 RETURNING submissions.chunk_size;", + "describe": { + "columns": [ + { + "name": "chunk_size", + "ordinal": 0, + "type_info": "Integer" + } + ], + "parameters": { + "Right": 1 + }, + "nullable": [true] + }, + "hash": "40e96fd79c38c8338bcc2f18fbc43c8f71b1c6e73e73ce5f7687db469377c137" +} diff --git 
a/opsqueue/.sqlx/query-47aa7f2a4838460952bb215bef9a036c487350a9886f9747ff0890849637070f.json b/opsqueue/.sqlx/query-47aa7f2a4838460952bb215bef9a036c487350a9886f9747ff0890849637070f.json new file mode 100644 index 0000000..fe364ef --- /dev/null +++ b/opsqueue/.sqlx/query-47aa7f2a4838460952bb215bef9a036c487350a9886f9747ff0890849637070f.json @@ -0,0 +1,12 @@ +{ + "db_name": "SQLite", + "query": "DELETE FROM chunks_failed WHERE failed_at < julianday($1);", + "describe": { + "columns": [], + "parameters": { + "Right": 1 + }, + "nullable": [] + }, + "hash": "47aa7f2a4838460952bb215bef9a036c487350a9886f9747ff0890849637070f" +} diff --git a/opsqueue/.sqlx/query-5d1f0e786821af3135eb9022004308af3c683ba3cca9c97677072243289cb322.json b/opsqueue/.sqlx/query-5d1f0e786821af3135eb9022004308af3c683ba3cca9c97677072243289cb322.json new file mode 100644 index 0000000..0a4cd49 --- /dev/null +++ b/opsqueue/.sqlx/query-5d1f0e786821af3135eb9022004308af3c683ba3cca9c97677072243289cb322.json @@ -0,0 +1,18 @@ +{ + "db_name": "SQLite", + "query": "SELECT COUNT(1) as count FROM chunks;", + "describe": { + "columns": [ + { + "name": "count", + "ordinal": 0, + "type_info": "Integer" + } + ], + "parameters": { + "Right": 0 + }, + "nullable": [false] + }, + "hash": "5d1f0e786821af3135eb9022004308af3c683ba3cca9c97677072243289cb322" +} diff --git a/opsqueue/.sqlx/query-5f7d80f5b4990d9d4e0cca6491d7a34acf1328802fda82eb2281b59a792820de.json b/opsqueue/.sqlx/query-5f7d80f5b4990d9d4e0cca6491d7a34acf1328802fda82eb2281b59a792820de.json new file mode 100644 index 0000000..505b3bc --- /dev/null +++ b/opsqueue/.sqlx/query-5f7d80f5b4990d9d4e0cca6491d7a34acf1328802fda82eb2281b59a792820de.json @@ -0,0 +1,18 @@ +{ + "db_name": "SQLite", + "query": "SELECT COUNT(1) as count FROM submissions_failed;", + "describe": { + "columns": [ + { + "name": "count", + "ordinal": 0, + "type_info": "Integer" + } + ], + "parameters": { + "Right": 0 + }, + "nullable": [false] + }, + "hash": "5f7d80f5b4990d9d4e0cca6491d7a34acf1328802fda82eb2281b59a792820de" +} diff --git a/opsqueue/.sqlx/query-62b1b5988a509f9722489522fbe3c650e1e04bcf51db8f97cbfb85a3e3d0c3bc.json b/opsqueue/.sqlx/query-62b1b5988a509f9722489522fbe3c650e1e04bcf51db8f97cbfb85a3e3d0c3bc.json new file mode 100644 index 0000000..55aa8c1 --- /dev/null +++ b/opsqueue/.sqlx/query-62b1b5988a509f9722489522fbe3c650e1e04bcf51db8f97cbfb85a3e3d0c3bc.json @@ -0,0 +1,18 @@ +{ + "db_name": "SQLite", + "query": "SELECT COUNT(1) as count FROM submissions_completed;", + "describe": { + "columns": [ + { + "name": "count", + "ordinal": 0, + "type_info": "Integer" + } + ], + "parameters": { + "Right": 0 + }, + "nullable": [false] + }, + "hash": "62b1b5988a509f9722489522fbe3c650e1e04bcf51db8f97cbfb85a3e3d0c3bc" +} diff --git a/opsqueue/.sqlx/query-643bc1f502a5c1cf79773ba3db2a7094263151c99f123eb5c2fa01c303b29730.json b/opsqueue/.sqlx/query-643bc1f502a5c1cf79773ba3db2a7094263151c99f123eb5c2fa01c303b29730.json new file mode 100644 index 0000000..a3fe6ba --- /dev/null +++ b/opsqueue/.sqlx/query-643bc1f502a5c1cf79773ba3db2a7094263151c99f123eb5c2fa01c303b29730.json @@ -0,0 +1,12 @@ +{ + "db_name": "SQLite", + "query": "\n INSERT INTO submissions_metadata\n ( submission_id\n , metadata_key\n , metadata_value\n )\n VALUES ($1, $2, $3)\n ", + "describe": { + "columns": [], + "parameters": { + "Right": 3 + }, + "nullable": [] + }, + "hash": "643bc1f502a5c1cf79773ba3db2a7094263151c99f123eb5c2fa01c303b29730" +} diff --git a/opsqueue/.sqlx/query-6c3fac07b6b1b07ccdaaf0d296647176104b93c64a7a9fb85e2fd1e88ec13143.json 
b/opsqueue/.sqlx/query-6c3fac07b6b1b07ccdaaf0d296647176104b93c64a7a9fb85e2fd1e88ec13143.json new file mode 100644 index 0000000..8dd9d5e --- /dev/null +++ b/opsqueue/.sqlx/query-6c3fac07b6b1b07ccdaaf0d296647176104b93c64a7a9fb85e2fd1e88ec13143.json @@ -0,0 +1,18 @@ +{ + "db_name": "SQLite", + "query": "SELECT COUNT(1) as count FROM submissions;", + "describe": { + "columns": [ + { + "name": "count", + "ordinal": 0, + "type_info": "Integer" + } + ], + "parameters": { + "Right": 0 + }, + "nullable": [false] + }, + "hash": "6c3fac07b6b1b07ccdaaf0d296647176104b93c64a7a9fb85e2fd1e88ec13143" +} diff --git a/opsqueue/.sqlx/query-833bc5d955f1c2455e06e9762dec84c4c24fe58c21165e6a80d0a5bfae1a3cd7.json b/opsqueue/.sqlx/query-833bc5d955f1c2455e06e9762dec84c4c24fe58c21165e6a80d0a5bfae1a3cd7.json new file mode 100644 index 0000000..257cff9 --- /dev/null +++ b/opsqueue/.sqlx/query-833bc5d955f1c2455e06e9762dec84c4c24fe58c21165e6a80d0a5bfae1a3cd7.json @@ -0,0 +1,18 @@ +{ + "db_name": "SQLite", + "query": "\n UPDATE chunks SET retries = retries + 1\n WHERE submission_id = $1 AND chunk_index = $2\n RETURNING retries;\n ", + "describe": { + "columns": [ + { + "name": "retries", + "ordinal": 0, + "type_info": "Integer" + } + ], + "parameters": { + "Right": 2 + }, + "nullable": [false] + }, + "hash": "833bc5d955f1c2455e06e9762dec84c4c24fe58c21165e6a80d0a5bfae1a3cd7" +} diff --git a/opsqueue/.sqlx/query-8b8e30b00dfb87367222b690744540438f1762a55694b429dfb4eeb15c230bd5.json b/opsqueue/.sqlx/query-8b8e30b00dfb87367222b690744540438f1762a55694b429dfb4eeb15c230bd5.json new file mode 100644 index 0000000..00bf073 --- /dev/null +++ b/opsqueue/.sqlx/query-8b8e30b00dfb87367222b690744540438f1762a55694b429dfb4eeb15c230bd5.json @@ -0,0 +1,12 @@ +{ + "db_name": "SQLite", + "query": "\n INSERT INTO chunks_metadata\n ( submission_id\n , chunk_index\n , metadata_key\n , metadata_value\n )\n VALUES ($1, $2, $3, $4)\n ", + "describe": { + "columns": [], + "parameters": { + "Right": 4 + }, + "nullable": [] + }, + "hash": "8b8e30b00dfb87367222b690744540438f1762a55694b429dfb4eeb15c230bd5" +} diff --git a/opsqueue/.sqlx/query-91d3b5ff8e68c788d3209b4f1234a519eba5773d88c91635f5ffb01b1b5302fa.json b/opsqueue/.sqlx/query-91d3b5ff8e68c788d3209b4f1234a519eba5773d88c91635f5ffb01b1b5302fa.json new file mode 100644 index 0000000..d74b332 --- /dev/null +++ b/opsqueue/.sqlx/query-91d3b5ff8e68c788d3209b4f1234a519eba5773d88c91635f5ffb01b1b5302fa.json @@ -0,0 +1,18 @@ +{ + "db_name": "SQLite", + "query": "SELECT COALESCE(SUM(chunk_size * 1.0), 0.0) as count FROM chunks INNER JOIN submissions ON chunks.submission_id = submissions.id", + "describe": { + "columns": [ + { + "name": "count", + "ordinal": 0, + "type_info": "Float" + } + ], + "parameters": { + "Right": 0 + }, + "nullable": [false] + }, + "hash": "91d3b5ff8e68c788d3209b4f1234a519eba5773d88c91635f5ffb01b1b5302fa" +} diff --git a/opsqueue/.sqlx/query-a239d197e4047338a8b0a0549ce792a4326fb523efa9608b285795f9f026103a.json b/opsqueue/.sqlx/query-a239d197e4047338a8b0a0549ce792a4326fb523efa9608b285795f9f026103a.json new file mode 100644 index 0000000..713e6bf --- /dev/null +++ b/opsqueue/.sqlx/query-a239d197e4047338a8b0a0549ce792a4326fb523efa9608b285795f9f026103a.json @@ -0,0 +1,12 @@ +{ + "db_name": "SQLite", + "query": "DELETE FROM chunks_completed WHERE completed_at < julianday($1);", + "describe": { + "columns": [], + "parameters": { + "Right": 1 + }, + "nullable": [] + }, + "hash": "a239d197e4047338a8b0a0549ce792a4326fb523efa9608b285795f9f026103a" +} diff --git 
a/opsqueue/.sqlx/query-b7c7b04d131024ee962a37970a1dc12732c8b627522d508decaa1c164923075f.json b/opsqueue/.sqlx/query-b7c7b04d131024ee962a37970a1dc12732c8b627522d508decaa1c164923075f.json new file mode 100644 index 0000000..0fd668e --- /dev/null +++ b/opsqueue/.sqlx/query-b7c7b04d131024ee962a37970a1dc12732c8b627522d508decaa1c164923075f.json @@ -0,0 +1,12 @@ +{ + "db_name": "SQLite", + "query": "DELETE FROM submissions_failed WHERE failed_at < julianday($1);", + "describe": { + "columns": [], + "parameters": { + "Right": 1 + }, + "nullable": [] + }, + "hash": "b7c7b04d131024ee962a37970a1dc12732c8b627522d508decaa1c164923075f" +} diff --git a/opsqueue/.sqlx/query-c149e54e9a57cd5ded5e9e2656116ba9d04e56ffdbb41bb152ae5762475b5c78.json b/opsqueue/.sqlx/query-c149e54e9a57cd5ded5e9e2656116ba9d04e56ffdbb41bb152ae5762475b5c78.json new file mode 100644 index 0000000..1419ef9 --- /dev/null +++ b/opsqueue/.sqlx/query-c149e54e9a57cd5ded5e9e2656116ba9d04e56ffdbb41bb152ae5762475b5c78.json @@ -0,0 +1,48 @@ +{ + "db_name": "SQLite", + "query": "\n SELECT\n id AS \"id: SubmissionId\"\n , prefix\n , chunks_total AS \"chunks_total: ChunkCount\"\n , chunks_done AS \"chunks_done: ChunkCount\"\n , chunk_size AS \"chunk_size: ChunkSize\"\n , metadata\n , otel_trace_carrier\n FROM submissions WHERE id = $1\n ", + "describe": { + "columns": [ + { + "name": "id: SubmissionId", + "ordinal": 0, + "type_info": "Integer" + }, + { + "name": "prefix", + "ordinal": 1, + "type_info": "Text" + }, + { + "name": "chunks_total: ChunkCount", + "ordinal": 2, + "type_info": "Integer" + }, + { + "name": "chunks_done: ChunkCount", + "ordinal": 3, + "type_info": "Integer" + }, + { + "name": "chunk_size: ChunkSize", + "ordinal": 4, + "type_info": "Integer" + }, + { + "name": "metadata", + "ordinal": 5, + "type_info": "Blob" + }, + { + "name": "otel_trace_carrier", + "ordinal": 6, + "type_info": "Text" + } + ], + "parameters": { + "Right": 1 + }, + "nullable": [false, true, false, false, true, true, false] + }, + "hash": "c149e54e9a57cd5ded5e9e2656116ba9d04e56ffdbb41bb152ae5762475b5c78" +} diff --git a/opsqueue/.sqlx/query-c4fc111b75ae4036c16042be654600dd598dddd4ee9412522b22127e5bceedc1.json b/opsqueue/.sqlx/query-c4fc111b75ae4036c16042be654600dd598dddd4ee9412522b22127e5bceedc1.json new file mode 100644 index 0000000..12d8ee5 --- /dev/null +++ b/opsqueue/.sqlx/query-c4fc111b75ae4036c16042be654600dd598dddd4ee9412522b22127e5bceedc1.json @@ -0,0 +1,23 @@ +{ + "db_name": "SQLite", + "query": "\n INSERT INTO chunks_completed\n (submission_id, chunk_index, output_content, completed_at)\n SELECT submission_id, chunk_index, $1, julianday($2) FROM chunks\n WHERE chunks.submission_id = $3 AND chunks.chunk_index = $4;\n\n DELETE FROM chunks WHERE chunks.submission_id = $5 AND chunks.chunk_index = $6\n RETURNING submission_id, chunk_index;\n ", + "describe": { + "columns": [ + { + "name": "submission_id", + "ordinal": 0, + "type_info": "Integer" + }, + { + "name": "chunk_index", + "ordinal": 1, + "type_info": "Integer" + } + ], + "parameters": { + "Right": 6 + }, + "nullable": [false, false] + }, + "hash": "c4fc111b75ae4036c16042be654600dd598dddd4ee9412522b22127e5bceedc1" +} diff --git a/opsqueue/.sqlx/query-c63e36ec8d3675dd160e289451f181cff1c82b07427f0b4d4b45132a9d3dcc6d.json b/opsqueue/.sqlx/query-c63e36ec8d3675dd160e289451f181cff1c82b07427f0b4d4b45132a9d3dcc6d.json new file mode 100644 index 0000000..746a3b4 --- /dev/null +++ b/opsqueue/.sqlx/query-c63e36ec8d3675dd160e289451f181cff1c82b07427f0b4d4b45132a9d3dcc6d.json @@ -0,0 +1,12 @@ +{ + 
"db_name": "SQLite", + "query": "\n SAVEPOINT skip_remaining_chunks;\n\n INSERT INTO chunks_failed\n (submission_id, chunk_index, input_content, failure, skipped, failed_at)\n SELECT submission_id, chunk_index, input_content, '', 1, julianday($1) FROM chunks WHERE chunks.submission_id = $2;\n\n DELETE FROM chunks WHERE chunks.submission_id = $3;\n\n RELEASE SAVEPOINT skip_remaining_chunks;\n ", + "describe": { + "columns": [], + "parameters": { + "Right": 3 + }, + "nullable": [] + }, + "hash": "c63e36ec8d3675dd160e289451f181cff1c82b07427f0b4d4b45132a9d3dcc6d" +} diff --git a/opsqueue/.sqlx/query-c7d9c26e33f222d9b685d3a74a69c3cdd4bce3f9c4fc19e518b1d48930ba793d.json b/opsqueue/.sqlx/query-c7d9c26e33f222d9b685d3a74a69c3cdd4bce3f9c4fc19e518b1d48930ba793d.json new file mode 100644 index 0000000..4373470 --- /dev/null +++ b/opsqueue/.sqlx/query-c7d9c26e33f222d9b685d3a74a69c3cdd4bce3f9c4fc19e518b1d48930ba793d.json @@ -0,0 +1,53 @@ +{ + "db_name": "SQLite", + "query": "\n SELECT\n id AS \"id: SubmissionId\"\n , prefix\n , chunks_total AS \"chunks_total: ChunkCount\"\n , chunk_size AS \"chunk_size: ChunkSize\"\n , metadata\n , failed_at AS \"failed_at: DateTime\"\n , failed_chunk_id AS \"failed_chunk_id: ChunkIndex\"\n , otel_trace_carrier\n FROM submissions_failed WHERE id = $1\n ", + "describe": { + "columns": [ + { + "name": "id: SubmissionId", + "ordinal": 0, + "type_info": "Integer" + }, + { + "name": "prefix", + "ordinal": 1, + "type_info": "Text" + }, + { + "name": "chunks_total: ChunkCount", + "ordinal": 2, + "type_info": "Integer" + }, + { + "name": "chunk_size: ChunkSize", + "ordinal": 3, + "type_info": "Integer" + }, + { + "name": "metadata", + "ordinal": 4, + "type_info": "Blob" + }, + { + "name": "failed_at: DateTime", + "ordinal": 5, + "type_info": "Datetime" + }, + { + "name": "failed_chunk_id: ChunkIndex", + "ordinal": 6, + "type_info": "Integer" + }, + { + "name": "otel_trace_carrier", + "ordinal": 7, + "type_info": "Text" + } + ], + "parameters": { + "Right": 1 + }, + "nullable": [false, true, false, true, true, false, false, false] + }, + "hash": "c7d9c26e33f222d9b685d3a74a69c3cdd4bce3f9c4fc19e518b1d48930ba793d" +} diff --git a/opsqueue/.sqlx/query-d41c3708cdd638119649b290f4a22300280f308ab8712ac5a44efd17e4680c43.json b/opsqueue/.sqlx/query-d41c3708cdd638119649b290f4a22300280f308ab8712ac5a44efd17e4680c43.json new file mode 100644 index 0000000..4614a39 --- /dev/null +++ b/opsqueue/.sqlx/query-d41c3708cdd638119649b290f4a22300280f308ab8712ac5a44efd17e4680c43.json @@ -0,0 +1,12 @@ +{ + "db_name": "SQLite", + "query": "INSERT INTO chunks (submission_id, chunk_index, input_content) VALUES ($1, $2, $3)", + "describe": { + "columns": [], + "parameters": { + "Right": 3 + }, + "nullable": [] + }, + "hash": "d41c3708cdd638119649b290f4a22300280f308ab8712ac5a44efd17e4680c43" +} diff --git a/opsqueue/.sqlx/query-d5aa781509794455e2e8e16b2ba1485efd45597a65e4bb7ae8d2888112b3a272.json b/opsqueue/.sqlx/query-d5aa781509794455e2e8e16b2ba1485efd45597a65e4bb7ae8d2888112b3a272.json new file mode 100644 index 0000000..2aa293e --- /dev/null +++ b/opsqueue/.sqlx/query-d5aa781509794455e2e8e16b2ba1485efd45597a65e4bb7ae8d2888112b3a272.json @@ -0,0 +1,18 @@ +{ + "db_name": "SQLite", + "query": "SELECT COUNT(1) as count FROM chunks_completed;", + "describe": { + "columns": [ + { + "name": "count", + "ordinal": 0, + "type_info": "Integer" + } + ], + "parameters": { + "Right": 0 + }, + "nullable": [false] + }, + "hash": "d5aa781509794455e2e8e16b2ba1485efd45597a65e4bb7ae8d2888112b3a272" +} diff --git 
a/opsqueue/.sqlx/query-e282079ded2d608817a2b2d27917f097eb0ec213e5a9a4027563c44f4a05bca3.json b/opsqueue/.sqlx/query-e282079ded2d608817a2b2d27917f097eb0ec213e5a9a4027563c44f4a05bca3.json new file mode 100644 index 0000000..81d6fa0 --- /dev/null +++ b/opsqueue/.sqlx/query-e282079ded2d608817a2b2d27917f097eb0ec213e5a9a4027563c44f4a05bca3.json @@ -0,0 +1,38 @@ +{ + "db_name": "SQLite", + "query": "\n SELECT\n submission_id AS \"submission_id: SubmissionId\"\n , chunk_index AS \"chunk_index: ChunkIndex\"\n , input_content\n , failure AS \"failure: String\"\n , failed_at AS \"failed_at: DateTime\"\n FROM chunks_failed WHERE submission_id = $1 AND chunk_index = $2\n ", + "describe": { + "columns": [ + { + "name": "submission_id: SubmissionId", + "ordinal": 0, + "type_info": "Integer" + }, + { + "name": "chunk_index: ChunkIndex", + "ordinal": 1, + "type_info": "Integer" + }, + { + "name": "input_content", + "ordinal": 2, + "type_info": "Blob" + }, + { + "name": "failure: String", + "ordinal": 3, + "type_info": "Blob" + }, + { + "name": "failed_at: DateTime", + "ordinal": 4, + "type_info": "Datetime" + } + ], + "parameters": { + "Right": 2 + }, + "nullable": [false, false, true, false, false] + }, + "hash": "e282079ded2d608817a2b2d27917f097eb0ec213e5a9a4027563c44f4a05bca3" +} diff --git a/opsqueue/.sqlx/query-e73afd2c8d84e6c3044f0662525f97bc8ca1d36a9305fdb959ef0e577846d606.json b/opsqueue/.sqlx/query-e73afd2c8d84e6c3044f0662525f97bc8ca1d36a9305fdb959ef0e577846d606.json new file mode 100644 index 0000000..50317dd --- /dev/null +++ b/opsqueue/.sqlx/query-e73afd2c8d84e6c3044f0662525f97bc8ca1d36a9305fdb959ef0e577846d606.json @@ -0,0 +1,18 @@ +{ + "db_name": "SQLite", + "query": "\n SELECT id AS \"id: SubmissionId\" FROM submissions WHERE prefix = $1\n UNION ALL\n SELECT id AS \"id: SubmissionId\" FROM submissions_completed WHERE prefix = $2\n UNION ALL\n SELECT id AS \"id: SubmissionId\" FROM submissions_failed WHERE prefix = $3\n ", + "describe": { + "columns": [ + { + "name": "id: SubmissionId", + "ordinal": 0, + "type_info": "Integer" + } + ], + "parameters": { + "Right": 3 + }, + "nullable": [false] + }, + "hash": "e73afd2c8d84e6c3044f0662525f97bc8ca1d36a9305fdb959ef0e577846d606" +} diff --git a/opsqueue/.sqlx/query-e92c5478fe8f1e5dee3d5e73ecf4cb3f2b7e422cefb1ecc00cf17299edec83c8.json b/opsqueue/.sqlx/query-e92c5478fe8f1e5dee3d5e73ecf4cb3f2b7e422cefb1ecc00cf17299edec83c8.json new file mode 100644 index 0000000..9b017ac --- /dev/null +++ b/opsqueue/.sqlx/query-e92c5478fe8f1e5dee3d5e73ecf4cb3f2b7e422cefb1ecc00cf17299edec83c8.json @@ -0,0 +1,38 @@ +{ + "db_name": "SQLite", + "query": "\n SAVEPOINT move_chunk_to_failed_chunks;\n\n INSERT INTO chunks_failed\n (submission_id, chunk_index, input_content, failure, failed_at)\n SELECT submission_id, chunk_index, input_content, $1, julianday($2) FROM chunks WHERE chunks.submission_id = $3 AND chunks.chunk_index = $4;\n\n DELETE FROM chunks WHERE chunks.submission_id = $5 AND chunks.chunk_index = $6 RETURNING *;\n\n RELEASE SAVEPOINT move_chunk_to_failed_chunks;\n ", + "describe": { + "columns": [ + { + "name": "submission_id", + "ordinal": 0, + "type_info": "Integer" + }, + { + "name": "chunk_index", + "ordinal": 1, + "type_info": "Integer" + }, + { + "name": "input_content", + "ordinal": 2, + "type_info": "Blob" + }, + { + "name": "retries", + "ordinal": 3, + "type_info": "Integer" + }, + { + "name": "random_order", + "ordinal": 4, + "type_info": "Integer" + } + ], + "parameters": { + "Right": 6 + }, + "nullable": [false, false, true, false, false] + }, + 
"hash": "e92c5478fe8f1e5dee3d5e73ecf4cb3f2b7e422cefb1ecc00cf17299edec83c8" +} diff --git a/opsqueue/.sqlx/query-eda8781445cef23e17446ca9007955f377d4059478a2bda4e5bd39ecd836e838.json b/opsqueue/.sqlx/query-eda8781445cef23e17446ca9007955f377d4059478a2bda4e5bd39ecd836e838.json new file mode 100644 index 0000000..a3e64a8 --- /dev/null +++ b/opsqueue/.sqlx/query-eda8781445cef23e17446ca9007955f377d4059478a2bda4e5bd39ecd836e838.json @@ -0,0 +1,23 @@ +{ + "db_name": "SQLite", + "query": "\n SELECT metadata_key, metadata_value FROM submissions_metadata\n WHERE submission_id = $1\n ", + "describe": { + "columns": [ + { + "name": "metadata_key", + "ordinal": 0, + "type_info": "Text" + }, + { + "name": "metadata_value", + "ordinal": 1, + "type_info": "Integer" + } + ], + "parameters": { + "Right": 1 + }, + "nullable": [false, false] + }, + "hash": "eda8781445cef23e17446ca9007955f377d4059478a2bda4e5bd39ecd836e838" +} diff --git a/opsqueue/.sqlx/query-ef677e899bf812760d0d24c9fa1fb791a5585629b14baab7a35185469c2e2e85.json b/opsqueue/.sqlx/query-ef677e899bf812760d0d24c9fa1fb791a5585629b14baab7a35185469c2e2e85.json new file mode 100644 index 0000000..4d82179 --- /dev/null +++ b/opsqueue/.sqlx/query-ef677e899bf812760d0d24c9fa1fb791a5585629b14baab7a35185469c2e2e85.json @@ -0,0 +1,38 @@ +{ + "db_name": "SQLite", + "query": "SELECT * FROM chunks_metadata;", + "describe": { + "columns": [ + { + "name": "submission_id", + "ordinal": 0, + "type_info": "Integer" + }, + { + "name": "chunk_index", + "ordinal": 1, + "type_info": "Integer" + }, + { + "name": "metadata_key", + "ordinal": 2, + "type_info": "Text" + }, + { + "name": "metadata_value", + "ordinal": 3, + "type_info": "Integer" + }, + { + "name": "random_order", + "ordinal": 4, + "type_info": "Integer" + } + ], + "parameters": { + "Right": 0 + }, + "nullable": [false, false, false, false, true] + }, + "hash": "ef677e899bf812760d0d24c9fa1fb791a5585629b14baab7a35185469c2e2e85" +} diff --git a/opsqueue/Cargo.toml b/opsqueue/Cargo.toml index eb42a73..72d97df 100644 --- a/opsqueue/Cargo.toml +++ b/opsqueue/Cargo.toml @@ -2,10 +2,14 @@ name = "opsqueue" version = "0.30.0" edition = "2021" +description = "lightweight batch processing queue for heavy loads" +repository = "https://github.com/channable/opsqueue" +license = "MIT" [lib] name="opsqueue" path="src/lib.rs" +include=["opsqueue_example_database_schema.db"] [[bin]] name="opsqueue" diff --git a/opsqueue/src/common/chunk.rs b/opsqueue/src/common/chunk.rs index a1cf00e..11c795e 100644 --- a/opsqueue/src/common/chunk.rs +++ b/opsqueue/src/common/chunk.rs @@ -1,3 +1,8 @@ +//! Dealing with `Chunk``s, the most fine-grained datatype the queue itself works with. +//! +//! While the smallest datatype is the 'Operation', the Opsqueue queue binary itself +//! only deals with _chunks_ of them. The operations inside of them are hidden and only read/written +//! from/to object store all the way in the client. use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; diff --git a/opsqueue/src/common/errors.rs b/opsqueue/src/common/errors.rs index 77108c8..38ee007 100644 --- a/opsqueue/src/common/errors.rs +++ b/opsqueue/src/common/errors.rs @@ -1,3 +1,11 @@ +//! Common error types +//! +//! You might notice in many places in Opsqueue that we use very fine-grained error types, +//! and combine them together using the `E` helper. +//! +//! This is a conscious choice: While it makes some function signatures more complex, +//! it allows us to be super precise in what kind of errors can and cannot occur +//! 
+//! in certain API calls.
 use serde::{Deserialize, Serialize};
 use thiserror::Error;
diff --git a/opsqueue/src/common/mod.rs b/opsqueue/src/common/mod.rs
index ac7a61c..a654274 100644
--- a/opsqueue/src/common/mod.rs
+++ b/opsqueue/src/common/mod.rs
@@ -1,3 +1,4 @@
+//! Common datatypes and errors shared across all parts of Opsqueue
 use rustc_hash::FxHashMap;
 
 pub mod chunk;
diff --git a/opsqueue/src/common/submission.rs b/opsqueue/src/common/submission.rs
index bb3d2c2..56ab792 100644
--- a/opsqueue/src/common/submission.rs
+++ b/opsqueue/src/common/submission.rs
@@ -1,3 +1,4 @@
+//! Dealing with `Submission`s: collections of (`Chunk`s of) operations.
 use std::fmt::Display;
 use std::time::Duration;
diff --git a/opsqueue/src/config.rs b/opsqueue/src/config.rs
index eb86570..a73a0c0 100644
--- a/opsqueue/src/config.rs
+++ b/opsqueue/src/config.rs
@@ -1,3 +1,7 @@
+//! Defines the source of truth for configuring the Opsqueue queue
+//!
+//! We make use of the excellent `clap` crate to make customizing the configuration
+//! with command-line args easier.
 use std::num::NonZero;
 
 use clap::Parser;
diff --git a/opsqueue/src/consumer/mod.rs b/opsqueue/src/consumer/mod.rs
index 133eb04..da9171a 100644
--- a/opsqueue/src/consumer/mod.rs
+++ b/opsqueue/src/consumer/mod.rs
@@ -1,3 +1,4 @@
+//! The Consumer side: the interface to reserve, work on, and complete/fail individual Chunks
 pub mod common;
 pub mod strategy;
diff --git a/opsqueue/src/db/conn.rs b/opsqueue/src/db/conn.rs
index b92d6dc..30b24ff 100644
--- a/opsqueue/src/db/conn.rs
+++ b/opsqueue/src/db/conn.rs
@@ -17,7 +17,7 @@
 /// You can get a `WriterPool` from [`DBPools::writer_pool`], and a [`ReaderPool`] from
 /// [`DBPools::reader_pool`].
 ///
-/// [`Pool`]: super::pool::Pool
+/// [`Pool`]: super::Pool
 /// [`ReaderPool`]: super::ReaderPool
 /// [`WriterPool`]: super::WriterPool
 /// [`DBPools::reader_pool`]: super::DBPools::reader_pool
diff --git a/opsqueue/src/db/mod.rs b/opsqueue/src/db/mod.rs
index 4b04a1e..3c3b570 100644
--- a/opsqueue/src/db/mod.rs
+++ b/opsqueue/src/db/mod.rs
@@ -276,7 +276,7 @@ impl WriterPool {
     ///
     /// See [`DBPools`] for further explanation about readers and writers.
     ///
-    /// [`DBPools`]: super::DBPools
+    /// [`DBPools`]: DBPools
     pub async fn writer_conn(&self) -> sqlx::Result> {
         self.acquire().await
     }
@@ -287,7 +287,7 @@ impl ReaderPool {
     ///
     /// See [`DBPools`] for further explanation about readers and writers.
     ///
-    /// [`DBPools`]: super::DBPools
+    /// [`DBPools`]: DBPools
     pub async fn reader_conn(&self) -> sqlx::Result> {
         self.acquire().await
     }
diff --git a/opsqueue/src/lib.rs b/opsqueue/src/lib.rs
index 0c48511..0389bbd 100644
--- a/opsqueue/src/lib.rs
+++ b/opsqueue/src/lib.rs
@@ -1,3 +1,24 @@
+//! Opsqueue: A lightweight batch processing queue for heavy loads
+//!
+//! Simple 'getting started' instructions can be found [in the repository README](https://github.com/channable/opsqueue/).
+//!
+//! The Rust codebase defines both the 'server', which makes up the bulk of the Opsqueue binary itself,
+//! and the 'client', the common functionality that client libraries written in other programming languages
+//! can all build on.
+//!
+//! Many datatypes are shared between the server and client, so their code lives together in the same crate.
+//! Rather than splitting them apart, we use feature-flags (`client-logic` and `server-logic`) to decide which concrete parts to include when building.
+//! The set of dependencies is gated on these same feature-flags.
+//! Most interestingly, in the test suite we enable both feature-flags, so we can do round-trip testing
+//! directly in Rust code.
+//!
+//! # Module setup
+//! - The basic logic is divided between the `producer` and `consumer` modules. These both have their own `db` submodule.
+//! - Common functionality and datatypes live in the `common` module.
+//! - Common database helpers live in the `db` module.
+//! - Reading/writing to object stores like GCS or S3 is abstracted in the `object_store` module.
+//! - Finally, there are extra modules that provide a single source of truth for the queue's configuration, and that handle tracing and expose metrics.
+
 pub mod common;
 pub mod consumer;
 pub mod producer;
diff --git a/opsqueue/src/producer/mod.rs b/opsqueue/src/producer/mod.rs
index 1870adb..fdc1fb8 100644
--- a/opsqueue/src/producer/mod.rs
+++ b/opsqueue/src/producer/mod.rs
@@ -1,3 +1,4 @@
+//! The Producer side: the interface to create and read submissions
 #[cfg(feature = "client-logic")]
 pub mod client;
 pub mod common;
diff --git a/opsqueue/src/prometheus.rs b/opsqueue/src/prometheus.rs
index b3f6d31..74d7514 100644
--- a/opsqueue/src/prometheus.rs
+++ b/opsqueue/src/prometheus.rs
@@ -1,3 +1,9 @@
+//! Defines the common metrics exposed via a Prometheus endpoint
+//!
+//! This allows inspecting how well the queue performs under production load.
+//!
+//! Note that we explicitly have a separate endpoint to check the queue's health,
+//! which is more fine-grained than Prometheus' notion of whether a service is 'up'.
 use axum_prometheus::{
     metrics::{describe_counter, describe_gauge, describe_histogram, gauge, Unit},
     metrics_exporter_prometheus::{Matcher, PrometheusBuilder, PrometheusHandle},
@@ -133,7 +139,7 @@ pub fn setup_prometheus() -> (
 /// Returns the number of seconds contained by this TimeDelta as f64, with nanosecond precision.
 ///
-/// Adapted from https://doc.rust-lang.org/std/time/struct.Duration.html#method.as_secs_f64
+/// Adapted from <https://doc.rust-lang.org/std/time/struct.Duration.html#method.as_secs_f64>
 pub fn time_delta_as_f64(td: chrono::TimeDelta) -> f64 {
     const NANOS_PER_SEC: f64 = 1_000_000_000.0;
     (td.num_seconds() as f64) + (td.subsec_nanos() as f64) / NANOS_PER_SEC
diff --git a/opsqueue/src/server.rs b/opsqueue/src/server.rs
index 555560c..b935963 100644
--- a/opsqueue/src/server.rs
+++ b/opsqueue/src/server.rs
@@ -1,3 +1,4 @@
+//! Defines the HTTP endpoints used by both the `producer` and `consumer` APIs
 use std::{
     any::Any,
     sync::{atomic::AtomicBool, Arc},
diff --git a/opsqueue/src/tracing.rs b/opsqueue/src/tracing.rs
index f387ca3..705e067 100644
--- a/opsqueue/src/tracing.rs
+++ b/opsqueue/src/tracing.rs
@@ -1,3 +1,4 @@
+//! Helpers to read/write OpenTelemetry tracing contexts stored inside submissions in the queue
 use opentelemetry::propagation::TextMapPropagator;
 use opentelemetry::{propagation::TextMapCompositePropagator, Context};
 use opentelemetry_http::{HeaderExtractor, HeaderInjector};