diff --git a/lib/que/unique.rb b/lib/que/unique.rb index 92c6f93..96cbe13 100644 --- a/lib/que/unique.rb +++ b/lib/que/unique.rb @@ -14,7 +14,7 @@ module Unique included do singleton_class.class_eval do - def enqueue_before_unique(*args) + def enqueue_before_unique(*args, **kwargs) thread_local_hash = Thread.current[Que::Unique::THREAD_LOCAL_KEY] unless thread_local_hash raise "UniqueQueJob #{self} being scheduled outside a transaction" @@ -22,15 +22,17 @@ def enqueue_before_unique(*args) # Once the args are canonicalised, we convert it to a JSON string to match against. canonicalised_args = args.map { |arg| Que::Unique.canonicalise_que_unique_arg(arg) } - args_key = { self => canonicalised_args }.to_json + canonicalised_kwargs = Que::Unique.canonicalise_que_unique_arg(kwargs) + cache_key = { self => [canonicalised_args, canonicalised_kwargs] }.to_json + # If this is already known then don't enqueue it again. Otherwise, add it to the last # element of the array. - if thread_local_hash.key?(args_key) - ::Rails.logger.debug "Que::Unique - #{self} - Already scheduled: #{args_key}" + if thread_local_hash.key?(cache_key) + ::Rails.logger.debug "Que::Unique - #{self} - Already scheduled: #{cache_key}" else - ::Rails.logger.debug "Que::Unique - #{self} - Enqueuing #{args_key}" - thread_local_hash[args_key] = true - enqueue_after_unique(*canonicalised_args) + ::Rails.logger.debug "Que::Unique - #{self} - Enqueuing #{cache_key}" + thread_local_hash[cache_key] = true + enqueue_after_unique(*args, **kwargs) end end diff --git a/spec/que/unique_spec.rb b/spec/que/unique_spec.rb index d31a5e8..7fce12e 100644 --- a/spec/que/unique_spec.rb +++ b/spec/que/unique_spec.rb @@ -18,6 +18,20 @@ def que_job_count select_jobs.to_a.count end + def que_version + # The constant holding Que's version was renamed in Que 1 + Gem::Version.new(defined?(::Que::Version) ? ::Que::Version : ::Que::VERSION) + end + + def run_at_kwargs(run_at) + job_options = { run_at: run_at } + + # Que 1.2. introduced a separate job_options kwarg + return job_options if que_version < Gem::Version.new("1.2") + + { job_options: job_options } + end + context "when checking the thread locals" do after(:each) do expect(Thread.current[Que::Unique::THREAD_LOCAL_KEY]).to eq({}) @@ -33,26 +47,26 @@ def que_job_count it "has the right thread locals during nested transactions" do ActiveRecord::Base.transaction do - TestUniqueJob.enqueue("foo", bar: :baz) - TestUniqueJob.enqueue("qux", bar: :bob) + TestUniqueJob.enqueue("foo", { bar: :baz }, my: :kwarg) + TestUniqueJob.enqueue("qux", { bar: :bob }, my: :kwarg) expected = { - { TestUniqueJob => ["foo", { bar: :baz }] }.to_json => true, - { TestUniqueJob => ["qux", { bar: :bob }] }.to_json => true, + { TestUniqueJob => [["foo", { bar: :baz }], { my: :kwarg }] }.to_json => true, + { TestUniqueJob => [["qux", { bar: :bob }], { my: :kwarg }] }.to_json => true, } expect(Thread.current[Que::Unique::THREAD_LOCAL_KEY]).to eq(expected) expect(Thread.current[Que::Unique::THREAD_LOCAL_DEPTH_KEY]).to eq(1) expect(que_job_count).to eq(2) expected_inner = { - { TestUniqueJob => ["foo", { bar: :baz }] }.to_json => true, - { TestUniqueJob => ["qux", { bar: :bob }] }.to_json => true, - { TestUniqueJob => ["bip", { bar: :baz }] }.to_json => true, + { TestUniqueJob => [["foo", { bar: :baz }], { my: :kwarg }] }.to_json => true, + { TestUniqueJob => [["qux", { bar: :bob }], { my: :kwarg }] }.to_json => true, + { TestUniqueJob => [["bip", { bar: :baz }], { my: :kwarg }] }.to_json => true, } ActiveRecord::Base.transaction do - TestUniqueJob.enqueue("foo", bar: :baz) # Should be ignored - TestUniqueJob.enqueue("bip", bar: :baz) # Should be added + TestUniqueJob.enqueue("foo", { bar: :baz }, my: :kwarg) # Should be ignored + TestUniqueJob.enqueue("bip", { bar: :baz }, my: :kwarg) # Should be added expect(Thread.current[Que::Unique::THREAD_LOCAL_KEY]).to eq(expected_inner) expect(Thread.current[Que::Unique::THREAD_LOCAL_DEPTH_KEY]).to eq(2) expect(que_job_count).to eq(3) @@ -70,23 +84,23 @@ def que_job_count it "has the right thread locals when a rollback occurs" do ActiveRecord::Base.transaction do - TestUniqueJob.enqueue("foo", bar: :baz) + TestUniqueJob.enqueue("foo", { bar: :baz }, my: :kwarg) expected_outer = { - { TestUniqueJob => ["foo", { bar: :baz }] }.to_json => true, + { TestUniqueJob => [["foo", { bar: :baz }], { my: :kwarg }] }.to_json => true, } expect(Thread.current[Que::Unique::THREAD_LOCAL_KEY]).to eq(expected_outer) expect(Thread.current[Que::Unique::THREAD_LOCAL_DEPTH_KEY]).to eq(1) expect(que_job_count).to eq(1) expected_inner = { - { TestUniqueJob => ["foo", { bar: :baz }] }.to_json => true, - { TestUniqueJob => ["bip", { bar: :baz }] }.to_json => true, + { TestUniqueJob => [["foo", { bar: :baz }], { my: :kwarg }] }.to_json => true, + { TestUniqueJob => [["bip", { bar: :baz }], { my: :kwarg }] }.to_json => true, } expect do ActiveRecord::Base.transaction do - TestUniqueJob.enqueue("foo", bar: :baz) # Should be ignored - TestUniqueJob.enqueue("bip", bar: :baz) # Should be added + TestUniqueJob.enqueue("foo", { bar: :baz }, my: :kwarg) # Should be ignored + TestUniqueJob.enqueue("bip", { bar: :baz }, my: :kwarg) # Should be added expect(Thread.current[Que::Unique::THREAD_LOCAL_KEY]).to eq(expected_inner) expect(Thread.current[Que::Unique::THREAD_LOCAL_DEPTH_KEY]).to eq(2) @@ -111,36 +125,51 @@ def que_job_count it "enqueues multiple of the same as 1" do ActiveRecord::Base.transaction do - 3.times { TestUniqueJob.enqueue("foo", bar: :baz) } + 3.times { TestUniqueJob.enqueue("foo", { bar: :baz }) } end expect(que_job_count).to eq(1) end it "enqueues differently ordered hashes as 1" do ActiveRecord::Base.transaction do - TestUniqueJob.enqueue("foo", bar: :baz, foo: :qux) - TestUniqueJob.enqueue("foo", foo: :qux, bar: :baz) + TestUniqueJob.enqueue("foo", { bar: :baz, foo: :qux }, my: :kwarg, another: :kwarg) + TestUniqueJob.enqueue("foo", { foo: :qux, bar: :baz }, another: :kwarg, my: :kwarg) expected = { - { TestUniqueJob => ["foo", { bar: :baz, foo: :qux }] }.to_json => true, + { + TestUniqueJob => [["foo", { bar: :baz, foo: :qux }], { another: :kwarg, my: :kwarg }] + }.to_json => true, } expect(Thread.current[Que::Unique::THREAD_LOCAL_KEY]).to eq(expected) end expect(que_job_count).to eq(1) end + it "enqueues jobs with the same args that run_at different times as different jobs" do + ActiveRecord::Base.transaction do + run_at = Time.current + 1.hour + + TestUniqueJob.enqueue("foo", { foo: :qux, bar: :baz }, **run_at_kwargs(run_at), my: :kwarg) + TestUniqueJob.enqueue("foo", { foo: :qux, bar: :baz }, my: :kwarg, **run_at_kwargs(run_at + 5.minutes)) + end + + run_ats = select_jobs.to_a.map { |h| h.fetch("run_at") } + expect(run_ats.size).to eq(2) + expect(run_ats.uniq.size).to eq(2) + end + it "enqueues different strings as different calls" do ActiveRecord::Base.transaction do - TestUniqueJob.enqueue("foo", bar: :baz) - TestUniqueJob.enqueue("qux", bar: :baz) + TestUniqueJob.enqueue("foo", { bar: :baz }, my: :kwarg) + TestUniqueJob.enqueue("qux", { bar: :baz }, my: :kwarg) end expect(que_job_count).to eq(2) end it "enqueues different hashes as different calls" do ActiveRecord::Base.transaction do - TestUniqueJob.enqueue("foo", bar: :baz) - TestUniqueJob.enqueue("foo", qux: :baz) - TestUniqueJob.enqueue("foo", bar: :qux) + TestUniqueJob.enqueue("foo", { bar: :baz }, my: :kwarg) + TestUniqueJob.enqueue("foo", { qux: :baz }, my: :kwarg) + TestUniqueJob.enqueue("foo", { bar: :qux }, my: :kwarg) end expect(que_job_count).to eq(3) end @@ -149,7 +178,7 @@ def que_job_count ActiveRecord::Base.transaction do TestUniqueJob.enqueue("Test string") expected = { - { TestUniqueJob => ["Test string"] }.to_json => true, + { TestUniqueJob => [["Test string"], {}] }.to_json => true, } expect(Thread.current[Que::Unique::THREAD_LOCAL_KEY]).to eq(expected) end