Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 9 additions & 7 deletions lib/que/unique.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,23 +14,25 @@ module Unique

included do
singleton_class.class_eval do
def enqueue_before_unique(*args)
def enqueue_before_unique(*args, **kwargs)
thread_local_hash = Thread.current[Que::Unique::THREAD_LOCAL_KEY]
unless thread_local_hash
raise "UniqueQueJob #{self} being scheduled outside a transaction"
end

# Once the args are canonicalised, we convert it to a JSON string to match against.
canonicalised_args = args.map { |arg| Que::Unique.canonicalise_que_unique_arg(arg) }
args_key = { self => canonicalised_args }.to_json
canonicalised_kwargs = Que::Unique.canonicalise_que_unique_arg(kwargs)
cache_key = { self => [canonicalised_args, canonicalised_kwargs] }.to_json

# If this is already known then don't enqueue it again. Otherwise, add it to the last
# element of the array.
if thread_local_hash.key?(args_key)
::Rails.logger.debug "Que::Unique - #{self} - Already scheduled: #{args_key}"
if thread_local_hash.key?(cache_key)
::Rails.logger.debug "Que::Unique - #{self} - Already scheduled: #{cache_key}"
else
::Rails.logger.debug "Que::Unique - #{self} - Enqueuing #{args_key}"
thread_local_hash[args_key] = true
enqueue_after_unique(*canonicalised_args)
::Rails.logger.debug "Que::Unique - #{self} - Enqueuing #{cache_key}"
thread_local_hash[cache_key] = true
enqueue_after_unique(*args, **kwargs)
end
end

Expand Down
79 changes: 54 additions & 25 deletions spec/que/unique_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,20 @@ def que_job_count
select_jobs.to_a.count
end

def que_version
# The constant holding Que's version was renamed in Que 1
Gem::Version.new(defined?(::Que::Version) ? ::Que::Version : ::Que::VERSION)
end

def run_at_kwargs(run_at)
job_options = { run_at: run_at }

# Que 1.2. introduced a separate job_options kwarg
return job_options if que_version < Gem::Version.new("1.2")

{ job_options: job_options }
end

context "when checking the thread locals" do
after(:each) do
expect(Thread.current[Que::Unique::THREAD_LOCAL_KEY]).to eq({})
Expand All @@ -33,26 +47,26 @@ def que_job_count

it "has the right thread locals during nested transactions" do
ActiveRecord::Base.transaction do
TestUniqueJob.enqueue("foo", bar: :baz)
TestUniqueJob.enqueue("qux", bar: :bob)
TestUniqueJob.enqueue("foo", { bar: :baz }, my: :kwarg)
TestUniqueJob.enqueue("qux", { bar: :bob }, my: :kwarg)

expected = {
{ TestUniqueJob => ["foo", { bar: :baz }] }.to_json => true,
{ TestUniqueJob => ["qux", { bar: :bob }] }.to_json => true,
{ TestUniqueJob => [["foo", { bar: :baz }], { my: :kwarg }] }.to_json => true,
{ TestUniqueJob => [["qux", { bar: :bob }], { my: :kwarg }] }.to_json => true,
}
expect(Thread.current[Que::Unique::THREAD_LOCAL_KEY]).to eq(expected)
expect(Thread.current[Que::Unique::THREAD_LOCAL_DEPTH_KEY]).to eq(1)
expect(que_job_count).to eq(2)

expected_inner = {
{ TestUniqueJob => ["foo", { bar: :baz }] }.to_json => true,
{ TestUniqueJob => ["qux", { bar: :bob }] }.to_json => true,
{ TestUniqueJob => ["bip", { bar: :baz }] }.to_json => true,
{ TestUniqueJob => [["foo", { bar: :baz }], { my: :kwarg }] }.to_json => true,
{ TestUniqueJob => [["qux", { bar: :bob }], { my: :kwarg }] }.to_json => true,
{ TestUniqueJob => [["bip", { bar: :baz }], { my: :kwarg }] }.to_json => true,
}

ActiveRecord::Base.transaction do
TestUniqueJob.enqueue("foo", bar: :baz) # Should be ignored
TestUniqueJob.enqueue("bip", bar: :baz) # Should be added
TestUniqueJob.enqueue("foo", { bar: :baz }, my: :kwarg) # Should be ignored
TestUniqueJob.enqueue("bip", { bar: :baz }, my: :kwarg) # Should be added
expect(Thread.current[Que::Unique::THREAD_LOCAL_KEY]).to eq(expected_inner)
expect(Thread.current[Que::Unique::THREAD_LOCAL_DEPTH_KEY]).to eq(2)
expect(que_job_count).to eq(3)
Expand All @@ -70,23 +84,23 @@ def que_job_count

it "has the right thread locals when a rollback occurs" do
ActiveRecord::Base.transaction do
TestUniqueJob.enqueue("foo", bar: :baz)
TestUniqueJob.enqueue("foo", { bar: :baz }, my: :kwarg)
expected_outer = {
{ TestUniqueJob => ["foo", { bar: :baz }] }.to_json => true,
{ TestUniqueJob => [["foo", { bar: :baz }], { my: :kwarg }] }.to_json => true,
}
expect(Thread.current[Que::Unique::THREAD_LOCAL_KEY]).to eq(expected_outer)
expect(Thread.current[Que::Unique::THREAD_LOCAL_DEPTH_KEY]).to eq(1)
expect(que_job_count).to eq(1)

expected_inner = {
{ TestUniqueJob => ["foo", { bar: :baz }] }.to_json => true,
{ TestUniqueJob => ["bip", { bar: :baz }] }.to_json => true,
{ TestUniqueJob => [["foo", { bar: :baz }], { my: :kwarg }] }.to_json => true,
{ TestUniqueJob => [["bip", { bar: :baz }], { my: :kwarg }] }.to_json => true,
}

expect do
ActiveRecord::Base.transaction do
TestUniqueJob.enqueue("foo", bar: :baz) # Should be ignored
TestUniqueJob.enqueue("bip", bar: :baz) # Should be added
TestUniqueJob.enqueue("foo", { bar: :baz }, my: :kwarg) # Should be ignored
TestUniqueJob.enqueue("bip", { bar: :baz }, my: :kwarg) # Should be added

expect(Thread.current[Que::Unique::THREAD_LOCAL_KEY]).to eq(expected_inner)
expect(Thread.current[Que::Unique::THREAD_LOCAL_DEPTH_KEY]).to eq(2)
Expand All @@ -111,36 +125,51 @@ def que_job_count

it "enqueues multiple of the same as 1" do
ActiveRecord::Base.transaction do
3.times { TestUniqueJob.enqueue("foo", bar: :baz) }
3.times { TestUniqueJob.enqueue("foo", { bar: :baz }) }
end
expect(que_job_count).to eq(1)
end

it "enqueues differently ordered hashes as 1" do
ActiveRecord::Base.transaction do
TestUniqueJob.enqueue("foo", bar: :baz, foo: :qux)
TestUniqueJob.enqueue("foo", foo: :qux, bar: :baz)
TestUniqueJob.enqueue("foo", { bar: :baz, foo: :qux }, my: :kwarg, another: :kwarg)
TestUniqueJob.enqueue("foo", { foo: :qux, bar: :baz }, another: :kwarg, my: :kwarg)
expected = {
{ TestUniqueJob => ["foo", { bar: :baz, foo: :qux }] }.to_json => true,
{
TestUniqueJob => [["foo", { bar: :baz, foo: :qux }], { another: :kwarg, my: :kwarg }]
}.to_json => true,
}
expect(Thread.current[Que::Unique::THREAD_LOCAL_KEY]).to eq(expected)
end
expect(que_job_count).to eq(1)
end

it "enqueues jobs with the same args that run_at different times as different jobs" do
ActiveRecord::Base.transaction do
run_at = Time.current + 1.hour

TestUniqueJob.enqueue("foo", { foo: :qux, bar: :baz }, **run_at_kwargs(run_at), my: :kwarg)
TestUniqueJob.enqueue("foo", { foo: :qux, bar: :baz }, my: :kwarg, **run_at_kwargs(run_at + 5.minutes))
end

run_ats = select_jobs.to_a.map { |h| h.fetch("run_at") }
expect(run_ats.size).to eq(2)
expect(run_ats.uniq.size).to eq(2)
end

it "enqueues different strings as different calls" do
ActiveRecord::Base.transaction do
TestUniqueJob.enqueue("foo", bar: :baz)
TestUniqueJob.enqueue("qux", bar: :baz)
TestUniqueJob.enqueue("foo", { bar: :baz }, my: :kwarg)
TestUniqueJob.enqueue("qux", { bar: :baz }, my: :kwarg)
end
expect(que_job_count).to eq(2)
end

it "enqueues different hashes as different calls" do
ActiveRecord::Base.transaction do
TestUniqueJob.enqueue("foo", bar: :baz)
TestUniqueJob.enqueue("foo", qux: :baz)
TestUniqueJob.enqueue("foo", bar: :qux)
TestUniqueJob.enqueue("foo", { bar: :baz }, my: :kwarg)
TestUniqueJob.enqueue("foo", { qux: :baz }, my: :kwarg)
TestUniqueJob.enqueue("foo", { bar: :qux }, my: :kwarg)
end
expect(que_job_count).to eq(3)
end
Expand All @@ -149,7 +178,7 @@ def que_job_count
ActiveRecord::Base.transaction do
TestUniqueJob.enqueue("Test string")
expected = {
{ TestUniqueJob => ["Test string"] }.to_json => true,
{ TestUniqueJob => [["Test string"], {}] }.to_json => true,
}
expect(Thread.current[Que::Unique::THREAD_LOCAL_KEY]).to eq(expected)
end
Expand Down