Class: SidekiqUniqueJobs::Middleware::Client::UniqueJobs

Inherits:
Object
  • Object
show all
Defined in:
lib/sidekiq-unique-jobs/middleware/client/unique_jobs.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#item ⇒ Object (readonly)

Returns the value of attribute item.



7
8
9
# File 'lib/sidekiq-unique-jobs/middleware/client/unique_jobs.rb', line 7

# Reader for the @item instance variable, which #call assigns from its
# `item` argument on every invocation.
#
# @return [Object] the current value of @item
def item
  @item
end

#redis_pool ⇒ Object (readonly)

Returns the value of attribute redis_pool.



7
8
9
# File 'lib/sidekiq-unique-jobs/middleware/client/unique_jobs.rb', line 7

# Reader for the @redis_pool instance variable, which #call assigns from its
# optional `redis_pool` argument (nil when the caller omits it).
#
# @return [Object, nil] the current value of @redis_pool
def redis_pool
  @redis_pool
end

#worker_class ⇒ Object (readonly)

Returns the value of attribute worker_class.



7
8
9
# File 'lib/sidekiq-unique-jobs/middleware/client/unique_jobs.rb', line 7

# Reader for the @worker_class instance variable, which #call assigns with
# the result of worker_class_constantize(worker_class).
#
# @return [Object] the current value of @worker_class
def worker_class
  @worker_class
end

Instance Method Details

#call(worker_class, item, queue, redis_pool = nil) ⇒ Object



9
10
11
12
13
14
15
16
17
18
19
# File 'lib/sidekiq-unique-jobs/middleware/client/unique_jobs.rb', line 9

# Middleware entry point. Records the arguments on the instance, then decides
# whether to run the given block:
#
# * uniqueness disabled (unique_enabled? is falsy) -> always yields;
# * uniqueness enabled -> yields only when unique? is truthy.
#
# @param worker_class [Object] handed to worker_class_constantize; the result
#   is stored in @worker_class
# @param item [Object] stored in @item (presumably the job payload hash --
#   TODO confirm against callers)
# @param queue [Object] accepted but not used by this method
# @param redis_pool [Object, nil] stored in @redis_pool
# @yield runs the rest of the chain when the job is allowed through
# @return [Object, nil] the block's value, or nil when the block was skipped
def call(worker_class, item, queue, redis_pool = nil)
  @redis_pool = redis_pool
  @worker_class = worker_class_constantize(worker_class)
  @item = item

  # No uniqueness constraint for this job: pass straight through.
  return yield unless unique_enabled?

  yield if unique?
end

#unique? ⇒ Boolean

Returns:

  • (Boolean)


21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# File 'lib/sidekiq-unique-jobs/middleware/client/unique_jobs.rb', line 21

# Runs the uniqueness check against the appropriate Redis connection and
# returns whatever unique_for_connection? returns for it.
#
# Connection selection, in order:
# 1. testing_enabled? -> SidekiqUniqueJobs.redis_mock
# 2. redis_pool set   -> a connection checked out via redis_pool.with
# 3. otherwise        -> a connection yielded by Sidekiq.redis
#
# @return [Object] the result of unique_for_connection? (as passed back by
#   the pool's block method in cases 2 and 3)
def unique?
  return unique_for_connection?(SidekiqUniqueJobs.redis_mock) if testing_enabled?

  if redis_pool
    redis_pool.with { |conn| unique_for_connection?(conn) }
  else
    Sidekiq.redis { |conn| unique_for_connection?(conn) }
  end
end

#unique_for_connection?(conn) ⇒ Boolean

Returns:

  • (Boolean)


37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
# File 'lib/sidekiq-unique-jobs/middleware/client/unique_jobs.rb', line 37

# Checks, on the given connection, whether the current job may be enqueued,
# and if so records a marker under payload_hash so later duplicates are
# rejected.
#
# Marker values stored under payload_hash:
#   1 - the job is queued
#   2 - the job is scheduled (item['at'] is set)
#
# The job is rejected when it is already queued, or when it is already
# scheduled and is being scheduled again. A previously scheduled job that is
# now being queued, or a job never seen before, is accepted.
#
# @param conn [Object] Redis-like connection responding to watch, unwatch,
#   get, multi and setex
# @return [false, Object] false when the job is rejected; otherwise the
#   result of conn.multi (presumably nil if the watched key changed before
#   the transaction ran -- confirm against the Redis client in use)
def unique_for_connection?(conn)
  conn.watch(payload_hash)

  # Read the marker a single time: this saves a second GET round-trip and
  # guarantees both halves of the condition look at the same value.
  marker = conn.get(payload_hash).to_i

  if marker == 1 || (marker == 2 && item['at'])
    # Already queued, or already scheduled and being scheduled again: abort.
    conn.unwatch
    return false
  end

  expires_at = unique_job_expiration || SidekiqUniqueJobs::Config.default_expiration
  # For scheduled jobs the marker must outlive the delay until the job runs.
  expires_at = ((Time.at(item['at']) - Time.now.utc) + expires_at).to_i if item['at']

  conn.multi do
    # set value of 2 for scheduled jobs, 1 for queued jobs.
    conn.setex(payload_hash, expires_at, item['at'] ? 2 : 1)
  end
end