diff --git a/lib/que/testing.rb b/lib/que/testing.rb index b78b8aa..e5bd645 100644 --- a/lib/que/testing.rb +++ b/lib/que/testing.rb @@ -1,6 +1,6 @@ require "que" require "que/testing/que_ext" -require "que/testing/adapter" +require "que/testing/connection_pool" require "que/testing/version" -Que.adapter = Que::Testing::Adapter.new +Que.pool = Que::Testing::ConnectionPool.new diff --git a/lib/que/testing/adapter.rb b/lib/que/testing/adapter.rb deleted file mode 100644 index 614e042..0000000 --- a/lib/que/testing/adapter.rb +++ /dev/null @@ -1,32 +0,0 @@ -module Que - module Testing - class JobParams < Struct.new(:queue, :priority, :run_at, :job_class, :args) - end - - class Adapter < Que::Adapters::Base - def checkout(&block) - end - - def execute(command, params = []) - return [] unless command == :insert_job - - job = JobParams.new(*params) - klass = class_for(job.job_class) - jobs[klass] << job - params - end - - def wake_worker_after_commit - false - end - - def class_for(str) - str.split('::').reduce(Object, &:const_get) - end - - def jobs - @jobs ||= Hash.new { |h,k| h[k] = [] } - end - end - end -end diff --git a/lib/que/testing/connection_pool.rb b/lib/que/testing/connection_pool.rb new file mode 100644 index 0000000..cc05502 --- /dev/null +++ b/lib/que/testing/connection_pool.rb @@ -0,0 +1,49 @@ +require_relative "job_params" + +module Que + module Testing + class ConnectionPool < Que::ConnectionPool + def checkout + yield + end + + def in_transaction? + true + end + + def execute(command, params = []) + if command == :insert_job + insert_job(params) + + params + elsif command == :bulk_insert_jobs + queue, priority, run_at, job_class, args_and_kwargs, data = params + + JSON.parse(args_and_kwargs).each do |hash| + args, kwargs = hash.values_at("args", "kwargs").map { |x| JSON.dump(x) } + + insert_job([queue, priority, run_at, job_class, args, kwargs, data]) + end + + params + else + [] + end + end + + private def insert_job(params) + job = JobParams.new(*params) + klass = class_for(job.job_class) + jobs[klass] << job + end + + private def class_for(str) + str.split('::').reduce(Object, &:const_get) + end + + def jobs + @jobs ||= Hash.new { |h,k| h[k] = [] } + end + end + end +end diff --git a/lib/que/testing/job_params.rb b/lib/que/testing/job_params.rb new file mode 100644 index 0000000..1b71b32 --- /dev/null +++ b/lib/que/testing/job_params.rb @@ -0,0 +1,30 @@ +module Que + module Testing + if Que::VERSION >= Gem::Version.new("2") + # kwargs added in Que 2.0.0 + class JobParams < Struct.new(:queue, :priority, :run_at, :job_class, :args, :kwargs, :data) + def args + Que.deserialize_json(super) + end + + def kwargs + Que.deserialize_json(super) + end + + def data + Que.deserialize_json(super) + end + end + else + class JobParams < Struct.new(:queue, :priority, :run_at, :job_class, :args, :data) + def args + Que.deserialize_json(super) + end + + def data + Que.deserialize_json(super) + end + end + end + end +end diff --git a/lib/que/testing/que_ext.rb b/lib/que/testing/que_ext.rb index 0939f8c..352decf 100644 --- a/lib/que/testing/que_ext.rb +++ b/lib/que/testing/que_ext.rb @@ -1,7 +1,7 @@ module Que class Job def self.jobs - Que.adapter.jobs[self] + Que.pool.jobs[self] end end end diff --git a/lib/que/testing/version.rb b/lib/que/testing/version.rb index e2b38d8..3eadba1 100644 --- a/lib/que/testing/version.rb +++ b/lib/que/testing/version.rb @@ -1,5 +1,5 @@ module Que module Testing - VERSION = "0.2.0" + VERSION = "1.0.0" end end diff --git a/que-testing.gemspec b/que-testing.gemspec index f287f0e..64ce23f 100644 --- a/que-testing.gemspec +++ b/que-testing.gemspec @@ -18,7 +18,7 @@ Gem::Specification.new do |spec| spec.test_files = spec.files.grep(%r{^(test|spec|features)/}) spec.require_paths = ["lib"] - spec.add_runtime_dependency "que" + spec.add_runtime_dependency "que", ">= 1" spec.add_development_dependency "bundler", "~> 1.6" spec.add_development_dependency "rake", "~> 10.0"