diff --git a/.gitignore b/.gitignore index 4a8240f..9c38e31 100755 --- a/.gitignore +++ b/.gitignore @@ -1,8 +1,10 @@ .bundle/ Gemfile.lock +*.log *.gem gems *sketch* *notes.* todo.md fixtures +/test/interactive/stream_name.tmp diff --git a/README.md b/README.md index d6bbf46..4a1ef6c 100755 --- a/README.md +++ b/README.md @@ -1,8 +1,6 @@ # messaging -Common primitives for platform-specific messaging implementations for Eventide. - -This library provides generalizations that are used by platform-specific implementations that provide concrete functionality. +Eventide messaging for Postgres ## Documentation @@ -10,4 +8,4 @@ See the [Eventide documentation site](http://docs.eventide-project.org) for more ## License -The `messaging` library is released under the [MIT License](https://github.com/eventide-project/messaging/blob/master/MIT-License.txt). +The `messaging` library is released under the [MIT License](https://github.com/eventide-project/messaging-postgres/blob/master/MIT-License.txt). diff --git a/init.rb b/init.rb index e359208..a0e8f23 100755 --- a/init.rb +++ b/init.rb @@ -1,3 +1,3 @@ -require_relative './load_path' +require_relative 'load_path' require 'messaging' diff --git a/lib/messaging.rb b/lib/messaging.rb old mode 100755 new mode 100644 diff --git a/lib/messaging/controls.rb b/lib/messaging/controls.rb index 942fb18..3ebe676 100644 --- a/lib/messaging/controls.rb +++ b/lib/messaging/controls.rb @@ -13,6 +13,5 @@ require 'messaging/controls/message_data' require 'messaging/controls/message' require 'messaging/controls/batch' -require 'messaging/controls/write' require 'messaging/controls/settings' require 'messaging/controls/handler' diff --git a/lib/messaging/controls/write.rb b/lib/messaging/controls/write.rb deleted file mode 100644 index 1ddaa99..0000000 --- a/lib/messaging/controls/write.rb +++ /dev/null @@ -1,15 +0,0 @@ -module Messaging - module Controls - module Write - def self.example - Example.build - end - - class Example - include Messaging::Write - - virtual :configure - end - end - end -end diff --git a/lib/messaging/write.rb b/lib/messaging/write.rb index 3f78958..5ace374 100644 --- a/lib/messaging/write.rb +++ b/lib/messaging/write.rb @@ -1,48 +1,30 @@ module Messaging - module Write - class Error < RuntimeError; end - - def self.included(cls) - cls.class_exec do - include Dependency - include Virtual - include Log::Dependency - - dependency :message_writer - dependency :telemetry, ::Telemetry + class Write + include Dependency + include Virtual + include Log::Dependency - extend Build - extend Call - extend Configure - - abstract :configure + class Error < RuntimeError; end - const_set :Substitute, Substitute - end - end + dependency :message_writer, MessageStore::Write + dependency :telemetry, ::Telemetry - module Build - def build(session: nil) - instance = new - instance.configure(session: session) - ::Telemetry.configure instance - instance - end + def self.build(session: nil) + instance = new + MessageStore::Write.configure(instance, attr_name: :message_writer, session: session) + ::Telemetry.configure(instance) + instance end - module Configure - def configure(receiver, session: nil, attr_name: nil) - attr_name ||= :write - instance = build(session: session) - receiver.public_send "#{attr_name}=", instance - end + def self.configure(receiver, session: nil, attr_name: nil) + attr_name ||= :write + instance = build(session: session) + receiver.public_send "#{attr_name}=", instance end - module Call - def call(message, stream_name, expected_version: nil, reply_stream_name: nil, session: nil) - instance = build(session: session) - instance.(message, stream_name, expected_version: expected_version, reply_stream_name: reply_stream_name) - end + def self.call(message, stream_name, expected_version: nil, reply_stream_name: nil, session: nil) + instance = build(session: session) + instance.(message, stream_name, expected_version: expected_version, reply_stream_name: reply_stream_name) end def call(message_or_batch, stream_name, expected_version: nil, reply_stream_name: nil) diff --git a/lib/messaging/write/substitute.rb b/lib/messaging/write/substitute.rb index 3ba34d2..116515f 100644 --- a/lib/messaging/write/substitute.rb +++ b/lib/messaging/write/substitute.rb @@ -1,5 +1,5 @@ module Messaging - module Write + class Write module Substitute def self.build Substitute::Write.build.tap do |substitute_writer| @@ -10,9 +10,7 @@ def self.build Error = Class.new(RuntimeError) - class Write - include Messaging::Write - + class Write < Messaging::Write attr_accessor :sink def raise_expected_version_error @@ -32,7 +30,6 @@ def call(*args, **keyword_args) end alias :write :call - ## Should never have had an exclamation mark - Scott, Tue Jan 17 2023 def raise_expected_version_error! self.raise_expected_version_error = true nil diff --git a/messaging.gemspec b/messaging.gemspec index 75abc9d..f27eefb 100644 --- a/messaging.gemspec +++ b/messaging.gemspec @@ -2,7 +2,7 @@ Gem::Specification.new do |s| s.name = 'evt-messaging' s.version = '2.7.0.1' - s.summary = 'Common primitives for platform-specific messaging implementations for Eventide' + s.summary = 'Eventide messaging for Postgres' s.description = ' ' s.authors = ['The Eventide Project'] @@ -15,6 +15,9 @@ Gem::Specification.new do |s| s.platform = Gem::Platform::RUBY s.required_ruby_version = '>= 2.4.0' + ## Remove after message store is published w/ pg dependency + s.add_runtime_dependency 'pg' + s.add_runtime_dependency 'evt-message_store' s.add_runtime_dependency 'evt-settings' diff --git a/settings/message_store_postgres.json b/settings/message_store_postgres.json new file mode 100644 index 0000000..96139f8 --- /dev/null +++ b/settings/message_store_postgres.json @@ -0,0 +1,14 @@ +{ + "dbname": "message_store", + "host": "localhost", + "hostaddr": "127.0.0.1", + "port": 5432, + "user": "message_store", + "password": null, + "connect_timeout": null, + "options": null, + "sslmode": null, + "krbsrvname": null, + "gsslib": null, + "service": null +} diff --git a/test/automated/write/batch/no_id.rb b/test/automated/write/batch/no_id.rb new file mode 100644 index 0000000..610ca90 --- /dev/null +++ b/test/automated/write/batch/no_id.rb @@ -0,0 +1,29 @@ +require_relative '../../automated_init' + +context "Write" do + context "Batch" do + context "No Message IDs" do + stream_name = Controls::StreamName.example + + batch, values = Controls::Batch.example(id: :none) + + batch.each do |message| + assert(message.id.nil?) + end + + position = Write.(batch, stream_name) + + context "Individual Events are Written" do + 2.times do |i| + read_message = MessageStore::Get.(stream_name, position: i, batch_size: 1).first + + context "Assigns an ID" do + test "Event #{i + 1}" do + refute(read_message.id.nil?) + end + end + end + end + end + end +end diff --git a/test/automated/write/batch/reply.rb b/test/automated/write/batch/reply.rb index 513b2d3..defd266 100644 --- a/test/automated/write/batch/reply.rb +++ b/test/automated/write/batch/reply.rb @@ -10,7 +10,7 @@ batch = [message_1, message_2] - write = Controls::Write.example + write = Write.new test "Is an error" do assert_raises(Messaging::Write::Error) do diff --git a/test/automated/write/batch/reply_is_an_error.rb b/test/automated/write/batch/reply_is_an_error.rb new file mode 100644 index 0000000..bf8f2d2 --- /dev/null +++ b/test/automated/write/batch/reply_is_an_error.rb @@ -0,0 +1,3 @@ +require_relative '../../automated_init' + +comment "Tested in Messaging generalization library" diff --git a/test/automated/write/batch/write.rb b/test/automated/write/batch/write.rb new file mode 100644 index 0000000..2e30787 --- /dev/null +++ b/test/automated/write/batch/write.rb @@ -0,0 +1,25 @@ +require_relative '../../automated_init' + +context "Write" do + context "Batch" do + stream_name = Controls::StreamName.example + + batch, values = Controls::Batch.example + + last_written_position = Write.(batch, stream_name) + + test "Last written position" do + assert(last_written_position == 1) + end + + context "Individual Events are Written" do + 2.times do |i| + read_message = MessageStore::Get.(stream_name, position: i, batch_size: 1).first + + test "Event #{i + 1}" do + assert(read_message.data[:some_attribute] == values[i]) + end + end + end + end +end diff --git a/test/automated/write/batch/write_initial.rb b/test/automated/write/batch/write_initial.rb new file mode 100644 index 0000000..545748e --- /dev/null +++ b/test/automated/write/batch/write_initial.rb @@ -0,0 +1,42 @@ +require_relative '../../automated_init' + +context "Write" do + context "Batch" do + context "Writing the initial message to a stream that has not been created yet" do + stream_name = Controls::StreamName.example + + batch, values = Controls::Batch.example + + write = Write.build + + write.initial(batch, stream_name) + + context "Individual Events are Written" do + 2.times do |i| + read_message = MessageStore::Get.(stream_name, position: i, batch_size: 1).first + + test "Event #{i + 1}" do + assert(read_message.data[:some_attribute] == values[i]) + end + end + end + end + + context "Writing the initial message to a stream that already exists" do + stream_name = Controls::StreamName.example + + batch = Controls::Batch::Messages.example + + write = Write.build + + message = Controls::Message.example + Write.(message, stream_name) + + test "Is an error" do + assert_raises(MessageStore::ExpectedVersion::Error) do + write.initial(batch, stream_name) + end + end + end + end +end diff --git a/test/automated/write/batch/write_with_reply_stream.rb b/test/automated/write/batch/write_with_reply_stream.rb new file mode 100644 index 0000000..980edbd --- /dev/null +++ b/test/automated/write/batch/write_with_reply_stream.rb @@ -0,0 +1,24 @@ +require_relative '../../automated_init' + +context "Write" do + context "Batch" do + context "With Reply Stream" do + stream_name = Controls::StreamName.example + reply_stream_name = Controls::StreamName.example + + batch, values = Controls::Batch.example + + Write.(batch, stream_name, reply_stream_name: reply_stream_name) + + context "Individual Events are Written" do + 2.times do |i| + read_message = MessageStore::Get.(stream_name, position: i, batch_size: 1).first + + test "Event #{i + 1}" do + assert(read_message.data[:some_attribute] == values[i]) + end + end + end + end + end +end diff --git a/test/automated/write/expected_version.rb b/test/automated/write/expected_version.rb new file mode 100644 index 0000000..0f6d0c2 --- /dev/null +++ b/test/automated/write/expected_version.rb @@ -0,0 +1,29 @@ +require_relative '../automated_init' + +context "Write" do + context "Single Message" do + stream_name = Controls::StreamName.example(category: 'testWrongVersion') + + message_1 = Controls::Message.example + + Write.(message_1, stream_name) + + message_2 = Controls::Message.example + + context "Right Version" do + test "Succeeds" do + refute_raises(MessageStore::ExpectedVersion::Error) do + Write.(message_2, stream_name, expected_version: 0) + end + end + end + + context "Wrong Version" do + test "Fails" do + assert_raises(MessageStore::ExpectedVersion::Error) do + Write.(message_2, stream_name, expected_version: 11) + end + end + end + end +end diff --git a/test/automated/write/message/no_id.rb b/test/automated/write/message/no_id.rb new file mode 100644 index 0000000..687319e --- /dev/null +++ b/test/automated/write/message/no_id.rb @@ -0,0 +1,21 @@ +require_relative '../../automated_init' + +context "Write" do + context "Message" do + context "No ID" do + stream_name = Controls::StreamName.example + + message = Controls::Message.example(id: :none) + + assert(message.id.nil?) + + position = Write.(message, stream_name) + + read_message = MessageStore::Get.(stream_name, position: position, batch_size: 1).first + + test "Assigns an ID" do + refute(read_message.id.nil?) + end + end + end +end diff --git a/test/automated/write/message/reply.rb b/test/automated/write/message/reply.rb new file mode 100644 index 0000000..bdc5f01 --- /dev/null +++ b/test/automated/write/message/reply.rb @@ -0,0 +1,26 @@ +require_relative '../../automated_init' + +context "Write" do + context "Reply" do + context "Message" do + message = Controls::Message.example + + reply_stream_name = Controls::StreamName.example + message.metadata.reply_stream_name = reply_stream_name + + write = Write.build + + position = write.reply(message) + + read_message = MessageStore::Get.(reply_stream_name, position: position, batch_size: 1).first + + test "Writes the message to the reply stream" do + assert(read_message.data == message.to_h) + end + + test "Clears the reply stream from the metadata" do + assert(read_message.metadata[:reply_stream_name].nil?) + end + end + end +end diff --git a/test/automated/write/message/reply_missing_reply_stream_name_error.rb b/test/automated/write/message/reply_missing_reply_stream_name_error.rb new file mode 100644 index 0000000..f4138ea --- /dev/null +++ b/test/automated/write/message/reply_missing_reply_stream_name_error.rb @@ -0,0 +1,20 @@ +require_relative '../../automated_init' + +context "Write" do + context "Reply" do + context "Missing Reply Stream Name" do + message = Controls::Message.example + message.metadata.reply_stream_name = nil + + reply_stream_name = Controls::StreamName.example + + write = Write.build + + test "Is an error" do + assert_raises(Messaging::Write::Error) do + write.reply(message) + end + end + end + end +end diff --git a/test/automated/write/message/write.rb b/test/automated/write/message/write.rb new file mode 100644 index 0000000..6c0814c --- /dev/null +++ b/test/automated/write/message/write.rb @@ -0,0 +1,17 @@ +require_relative '../../automated_init' + +context "Write" do + context "Message" do + stream_name = Controls::StreamName.example + + message = Controls::Message.example + + position = Write.(message, stream_name) + + read_message = MessageStore::Get.(stream_name, position: position, batch_size: 1).first + + test "Writes the message" do + assert(read_message.data == message.to_h) + end + end +end diff --git a/test/automated/write/message/write_initial.rb b/test/automated/write/message/write_initial.rb new file mode 100644 index 0000000..2d731c4 --- /dev/null +++ b/test/automated/write/message/write_initial.rb @@ -0,0 +1,37 @@ +require_relative '../../automated_init' + +context "Write" do + context "Message" do + context "Writing the initial message to a stream that has not been created yet" do + stream_name = Controls::StreamName.example + + message = Controls::Message.example + + write = Write.build + + write.initial(message, stream_name) + + read_message = MessageStore::Get.(stream_name, position: 0, batch_size: 1).first + + test "Writes the message" do + assert(read_message.data == message.to_h) + end + end + + context "Writing the initial message to a stream that already exists" do + stream_name = Controls::StreamName.example + + message = Controls::Message.example + + write = Write.build + + write.(message, stream_name) + + test "Is an error" do + assert_raises(MessageStore::ExpectedVersion::Error) do + write.initial(message, stream_name) + end + end + end + end +end diff --git a/test/automated/write/message/write_with_reply_stream.rb b/test/automated/write/message/write_with_reply_stream.rb new file mode 100644 index 0000000..8a01b1b --- /dev/null +++ b/test/automated/write/message/write_with_reply_stream.rb @@ -0,0 +1,20 @@ +require_relative '../../automated_init' + +context "Write" do + context "Message" do + context "With Reply Stream" do + message = Controls::Message.example + + stream_name = Controls::StreamName.example + reply_stream_name = Controls::StreamName.example + + position = Write.(message, stream_name, reply_stream_name: reply_stream_name) + + read_message = MessageStore::Get.(stream_name, position: position, batch_size: 1).first + + test "Sets the metadata reply stream name" do + assert(read_message.metadata[:reply_stream_name] == reply_stream_name) + end + end + end +end diff --git a/test/automated/write/telemetry/batch/write.rb b/test/automated/write/telemetry/batch/write.rb index a27e844..d38401d 100644 --- a/test/automated/write/telemetry/batch/write.rb +++ b/test/automated/write/telemetry/batch/write.rb @@ -11,7 +11,8 @@ batch = [message_1, message_2] - writer = Controls::Write.example + writer = Write.new + writer.telemetry = Telemetry.build sink = Write.register_telemetry_sink(writer) diff --git a/test/automated/write/telemetry/message/reply.rb b/test/automated/write/telemetry/message/reply.rb index e7137c7..28503d9 100644 --- a/test/automated/write/telemetry/message/reply.rb +++ b/test/automated/write/telemetry/message/reply.rb @@ -8,7 +8,8 @@ reply_stream_name = message.metadata.reply_stream_name - writer = Controls::Write.example + writer = Write.new + writer.telemetry = Telemetry.build sink = Write.register_telemetry_sink(writer) diff --git a/test/automated/write/telemetry/message/write.rb b/test/automated/write/telemetry/message/write.rb index d69b596..47cd28a 100644 --- a/test/automated/write/telemetry/message/write.rb +++ b/test/automated/write/telemetry/message/write.rb @@ -7,7 +7,8 @@ message = Controls::Message.example stream_name = Controls::StreamName.example(category: 'testTelemetryWrite') - writer = Controls::Write.example + writer = Write.new + writer.telemetry = Telemetry.build sink = Write.register_telemetry_sink(writer) diff --git a/test/interactive.rb b/test/interactive.rb new file mode 100644 index 0000000..06883ac --- /dev/null +++ b/test/interactive.rb @@ -0,0 +1,6 @@ +require_relative 'test_init' + +TestBench::Runner.( + 'interactive/**/*.rb', + exclude_pattern: %r{\/_|sketch|(_init\.rb|_tests\.rb)\z} +) or exit 1 diff --git a/test/interactive/consume.rb b/test/interactive/consume.rb new file mode 100644 index 0000000..03f1818 --- /dev/null +++ b/test/interactive/consume.rb @@ -0,0 +1,31 @@ +require_relative 'interactive_init' +require_relative 'controls' + +logger = Log.get('Consume') + +logger.level = :debug + +logger.info "Starting Consumer", tag: :test + +stream_name_file = File.expand_path('stream_name.tmp', File.dirname(__FILE__)) +stream_name = nil +begin + stream_name = File.read(stream_name_file) +rescue + raise "Stream name file is missing (#{stream_name_file}). It's created by the producer script, which must be run concurrently with #{__FILE__}." +end + +logger.info "Stream name: #{stream_name}", tag: :test + +logger.info "Starting reader", tag: :test + +handler = Handler.build + +MessageStore::Read.(stream_name, batch_size: 1, cycle_maximum_milliseconds: 10, cycle_timeout_milliseconds: 2000) do |message_data| + logger.debug(tags: [:test, :data, :message]) { message_data.pretty_inspect } + + message = handler.(message_data) + + logger.debug(tags: [:test, :data, :message]) { "Handled message: #{message.message_type}" } + logger.debug(tags: [:test, :data, :message]) { message.pretty_inspect } +end diff --git a/test/interactive/consume_missing_stream_indefinately.rb b/test/interactive/consume_missing_stream_indefinately.rb new file mode 100644 index 0000000..feab9ca --- /dev/null +++ b/test/interactive/consume_missing_stream_indefinately.rb @@ -0,0 +1,14 @@ +require_relative 'interactive_init' +require_relative 'controls' + +logger = Log.get('Consumer') + +logger.info "Starting Consumer - Read Indefinitely", tag: :test + +stream_name = SecureRandom.hex + +logger.info "Stream name: #{stream_name}", tag: :test + +logger.info "Starting reader", tag: :test + +MessageStore::Read.(stream_name, batch_size: 1, cycle_maximum_milliseconds: 10) {} diff --git a/test/interactive/controls.rb b/test/interactive/controls.rb new file mode 100644 index 0000000..71fc8bf --- /dev/null +++ b/test/interactive/controls.rb @@ -0,0 +1,16 @@ +require_relative 'interactive_init' + +class SomeMessage + include Messaging::Message + + attribute :written_time + attribute :handled_time +end + +class Handler + include Messaging::Handle + + handle SomeMessage do |some_message| + some_message.handled_time = "Handled at: #{Clock::UTC.iso8601(precision: 5)}" + end +end diff --git a/test/interactive/interactive_init.rb b/test/interactive/interactive_init.rb new file mode 100644 index 0000000..c63fc5e --- /dev/null +++ b/test/interactive/interactive_init.rb @@ -0,0 +1,4 @@ +require_relative '../test_init' + +ENV['LOG_LEVEL'] = '_max' +ENV['LOG_TAGS'] = 'test' diff --git a/test/interactive/measure.rb b/test/interactive/measure.rb new file mode 100644 index 0000000..5da7cd3 --- /dev/null +++ b/test/interactive/measure.rb @@ -0,0 +1,70 @@ +require_relative 'interactive_init' +require_relative 'controls' + +producer_logger = Log.get('Produce') +producer_logger.level = :info +producer_logger.info "Starting Producer", tag: :test + +stream_name = Controls::StreamName.example(category: 'testInteractive') +producer_logger.info "Stream name: #{stream_name}", tag: :test + +period = ENV['PERIOD'] +period ||= 0 +period_seconds = Rational(period, 1000) + +producer_count = 0 +producer_start_time = Time.now +1000.times do + message = SomeMessage.build(written_time: "Written at: #{Clock::UTC.iso8601(precision: 5)}") + producer_logger.debug message.pretty_inspect, tags: [:test, :data, :message] + + written_position = Write.(message, stream_name) + producer_count += 1 + producer_logger.debug "Wrote message ##{producer_count} at position: #{written_position}", tags: [:test, :data, :message] + + sleep period_seconds +end + +producer_stop_time = Time.now + + +consumer_logger = Log.get('Consume') +consumer_logger.level = :info +consumer_logger.info "Starting Consumer", tag: :test + +consumer_logger.info "Stream name: #{stream_name}", tag: :test + +consumer_logger.info "Starting reader", tag: :test + +handler = Handler.build + +consumer_count = 0 +consumer_start_time = Time.now +MessageStore::Read.(stream_name) do |message_data| + consumer_logger.debug(tags: [:test, :data, :message]) { message_data.pretty_inspect } + + message = handler.(message_data) + consumer_count += 1 + + consumer_logger.debug(tags: [:test, :data, :message]) { "Handled message: #{message.message_type}" } + consumer_logger.debug(tags: [:test, :data, :message]) { message.pretty_inspect } +end + +consumer_stop_time = Time.now + + +producer_duration = producer_stop_time - producer_start_time +producer_throughput = producer_count / producer_duration + +producer_logger.info "Messages: #{producer_count}", tags: [:test, :data, :message] +producer_logger.info "Duration: #{producer_duration}", tags: [:test, :data, :message] +producer_logger.info "Throughput: #{producer_throughput}", tags: [:test, :data, :message] + + +consumer_duration = consumer_stop_time - consumer_start_time +consumer_throughput = consumer_count / consumer_duration + +consumer_logger.info "Messages: #{consumer_count}", tags: [:test, :data, :message] +consumer_logger.info "Duration: #{consumer_duration}", tags: [:test, :data, :message] +consumer_logger.info "Throughput: #{consumer_throughput}", tags: [:test, :data, :message] + diff --git a/test/interactive/produce.rb b/test/interactive/produce.rb new file mode 100644 index 0000000..77341ce --- /dev/null +++ b/test/interactive/produce.rb @@ -0,0 +1,34 @@ +require_relative 'interactive_init' +require_relative 'controls' + +logger = Log.get('Produce') + +logger.level = :debug + +logger.info "Starting Producer", tag: :test + +stream_name = Controls::StreamName.example(category: 'testInteractive') +logger.info "Stream name: #{stream_name}", tag: :test + +stream_name_file = File.expand_path('stream_name.tmp', File.dirname(__FILE__)) + +File.write(stream_name_file, stream_name) +logger.debug "Wrote stream name file: #{stream_name_file}", tag: :test + +at_exit do + File.unlink stream_name_file +end + +period = ENV['PERIOD'] +period ||= 500 +period_seconds = Rational(period, 1000) + +loop do + message = SomeMessage.build(written_time: "Written at: #{Clock::UTC.iso8601(precision: 5)}") + logger.debug message.pretty_inspect, tags: [:test, :data, :message] + + written_position = Write.(message, stream_name) + logger.debug "Wrote message at position: #{written_position}", tags: [:test, :data, :message] + + sleep period_seconds +end diff --git a/test/test_init.rb b/test/test_init.rb index 7ad6716..c23c23e 100755 --- a/test/test_init.rb +++ b/test/test_init.rb @@ -10,6 +10,7 @@ require 'test_bench'; TestBench.activate require 'securerandom' +require 'pp' require 'messaging/controls' include Messaging