From dd54ba9949a9dba886c22befeb7690365c5846c0 Mon Sep 17 00:00:00 2001 From: Owen Stephens Date: Thu, 11 Jul 2024 14:56:54 +0100 Subject: [PATCH 1/2] Support Que >= 1 --- lib/que/testing.rb | 4 +-- .../{adapter.rb => connection_pool.rb} | 14 ++------- lib/que/testing/job_params.rb | 30 +++++++++++++++++++ lib/que/testing/que_ext.rb | 2 +- lib/que/testing/version.rb | 2 +- que-testing.gemspec | 2 +- 6 files changed, 38 insertions(+), 16 deletions(-) rename lib/que/testing/{adapter.rb => connection_pool.rb} (65%) create mode 100644 lib/que/testing/job_params.rb diff --git a/lib/que/testing.rb b/lib/que/testing.rb index b78b8aa..e5bd645 100644 --- a/lib/que/testing.rb +++ b/lib/que/testing.rb @@ -1,6 +1,6 @@ require "que" require "que/testing/que_ext" -require "que/testing/adapter" +require "que/testing/connection_pool" require "que/testing/version" -Que.adapter = Que::Testing::Adapter.new +Que.pool = Que::Testing::ConnectionPool.new diff --git a/lib/que/testing/adapter.rb b/lib/que/testing/connection_pool.rb similarity index 65% rename from lib/que/testing/adapter.rb rename to lib/que/testing/connection_pool.rb index 614e042..040139c 100644 --- a/lib/que/testing/adapter.rb +++ b/lib/que/testing/connection_pool.rb @@ -1,12 +1,8 @@ +require_relative "job_params" + module Que module Testing - class JobParams < Struct.new(:queue, :priority, :run_at, :job_class, :args) - end - - class Adapter < Que::Adapters::Base - def checkout(&block) - end - + class ConnectionPool < Que::ConnectionPool def execute(command, params = []) return [] unless command == :insert_job @@ -16,10 +12,6 @@ def execute(command, params = []) params end - def wake_worker_after_commit - false - end - def class_for(str) str.split('::').reduce(Object, &:const_get) end diff --git a/lib/que/testing/job_params.rb b/lib/que/testing/job_params.rb new file mode 100644 index 0000000..1b71b32 --- /dev/null +++ b/lib/que/testing/job_params.rb @@ -0,0 +1,30 @@ +module Que + module Testing + if Que::VERSION >= Gem::Version.new("2") + # kwargs added in Que 2.0.0 + class JobParams < Struct.new(:queue, :priority, :run_at, :job_class, :args, :kwargs, :data) + def args + Que.deserialize_json(super) + end + + def kwargs + Que.deserialize_json(super) + end + + def data + Que.deserialize_json(super) + end + end + else + class JobParams < Struct.new(:queue, :priority, :run_at, :job_class, :args, :data) + def args + Que.deserialize_json(super) + end + + def data + Que.deserialize_json(super) + end + end + end + end +end diff --git a/lib/que/testing/que_ext.rb b/lib/que/testing/que_ext.rb index 0939f8c..352decf 100644 --- a/lib/que/testing/que_ext.rb +++ b/lib/que/testing/que_ext.rb @@ -1,7 +1,7 @@ module Que class Job def self.jobs - Que.adapter.jobs[self] + Que.pool.jobs[self] end end end diff --git a/lib/que/testing/version.rb b/lib/que/testing/version.rb index e2b38d8..3eadba1 100644 --- a/lib/que/testing/version.rb +++ b/lib/que/testing/version.rb @@ -1,5 +1,5 @@ module Que module Testing - VERSION = "0.2.0" + VERSION = "1.0.0" end end diff --git a/que-testing.gemspec b/que-testing.gemspec index f287f0e..64ce23f 100644 --- a/que-testing.gemspec +++ b/que-testing.gemspec @@ -18,7 +18,7 @@ Gem::Specification.new do |spec| spec.test_files = spec.files.grep(%r{^(test|spec|features)/}) spec.require_paths = ["lib"] - spec.add_runtime_dependency "que" + spec.add_runtime_dependency "que", ">= 1" spec.add_development_dependency "bundler", "~> 1.6" spec.add_development_dependency "rake", "~> 10.0" From b7879eabf836a02a467b301c42b17b972c828c04 Mon Sep 17 00:00:00 2001 From: Owen Stephens Date: Thu, 27 Nov 2025 13:21:37 +0000 Subject: [PATCH 2/2] Support Que.bulk_enqueue --- lib/que/testing/connection_pool.rb | 31 +++++++++++++++++++++++++++--- 1 file changed, 28 insertions(+), 3 deletions(-) diff --git a/lib/que/testing/connection_pool.rb b/lib/que/testing/connection_pool.rb index 040139c..cc05502 100644 --- a/lib/que/testing/connection_pool.rb +++ b/lib/que/testing/connection_pool.rb @@ -3,16 +3,41 @@ module Que module Testing class ConnectionPool < Que::ConnectionPool + def checkout + yield + end + + def in_transaction? + true + end + def execute(command, params = []) - return [] unless command == :insert_job + if command == :insert_job + insert_job(params) + + params + elsif command == :bulk_insert_jobs + queue, priority, run_at, job_class, args_and_kwargs, data = params + + JSON.parse(args_and_kwargs).each do |hash| + args, kwargs = hash.values_at("args", "kwargs").map { |x| JSON.dump(x) } + + insert_job([queue, priority, run_at, job_class, args, kwargs, data]) + end + + params + else + [] + end + end + private def insert_job(params) job = JobParams.new(*params) klass = class_for(job.job_class) jobs[klass] << job - params end - def class_for(str) + private def class_for(str) str.split('::').reduce(Object, &:const_get) end