Class: Esse::AsyncIndexing::Adapters::Sidekiq

Inherits:
Adapter
  • Object
Defined in:
lib/esse/async_indexing/adapters/sidekiq.rb

Overview

This is a Sidekiq adapter that converts an Esse::AsyncIndexing::Worker object into a Sidekiq-readable format and then pushes the job into the service.

Constructor Details

#initialize(worker) ⇒ Sidekiq

Returns a new instance of Sidekiq.



# File 'lib/esse/async_indexing/adapters/sidekiq.rb', line 10

def initialize(worker)
  @worker = worker
  @queue = worker.options.fetch(:queue, "default")

  @payload = worker.payload.merge(
    "class" => worker.worker_class,
    "retry" => worker.options.fetch(:retry, true),
    "queue" => @queue
  )
  @payload["created_at"] ||= Time.now.to_f
end
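
To make the constructor's merge concrete, here is a minimal sketch that reproduces it outside the gem. `FakeWorker`, `build_payload`, and the `now:` parameter are hypothetical names introduced for illustration; only the merge logic mirrors the source above.

```ruby
# Hypothetical stand-in for Esse::AsyncIndexing::Worker (illustration only).
FakeWorker = Struct.new(:worker_class, :options, :payload)

# Mirrors the merge performed in #initialize; not part of the gem's API.
def build_payload(worker, now: Time.now.to_f)
  queue = worker.options.fetch(:queue, "default")
  payload = worker.payload.merge(
    "class" => worker.worker_class,
    "retry" => worker.options.fetch(:retry, true),
    "queue" => queue
  )
  # An existing created_at is kept; otherwise the current time is used.
  payload["created_at"] ||= now
  payload
end

worker = FakeWorker.new(
  "MyIndexWorker",
  {queue: "indexing"},
  {"args" => [1], "jid" => "abc123"}
)
payload = build_payload(worker, now: 1_700_000_000.0)
```

The worker's own payload keys ("args", "jid") survive the merge, while "class", "retry", and "queue" are always set from the worker class and its options, with "retry" defaulting to true and "queue" to "default".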

Instance Attribute Details

#queue ⇒ Object (readonly)

Returns the value of attribute queue.



# File 'lib/esse/async_indexing/adapters/sidekiq.rb', line 8

def queue
  @queue
end

#worker ⇒ Object (readonly)

Returns the value of attribute worker.



# File 'lib/esse/async_indexing/adapters/sidekiq.rb', line 8

def worker
  @worker
end

Class Method Details

.coerce_to_worker(payload, **options) ⇒ Esse::AsyncIndexing::Worker

Coerces the raw payload into an instance of Worker

Parameters:

  • payload (Hash)

    The job as JSON from Redis

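Returns:

  • (Esse::AsyncIndexing::Worker)

Raises:

  • (Error)

    If the payload is not a Hash or its "class" entry is not a String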



# File 'lib/esse/async_indexing/adapters/sidekiq.rb', line 26

def self.coerce_to_worker(payload, **options)
  raise(Error, "invalid payload") unless payload.is_a?(Hash)
  raise(Error, "invalid payload") unless payload["class"].is_a?(String)

  options[:retry] ||= payload["retry"] if payload.key?("retry")
  options[:queue] ||= payload["queue"] if payload.key?("queue")

  Esse::AsyncIndexing.worker(payload["class"], **options, service: :sidekiq).tap do |worker|
    worker.with_args(*Array(payload["args"])) if payload.key?("args")
    worker.with_job_jid(payload["jid"]) if payload.key?("jid")
    worker.created_at(payload["created_at"]) if payload.key?("created_at")
    worker.enqueued_at(payload["enqueued_at"]) if payload.key?("enqueued_at")
    worker.at(payload["at"]) if payload.key?("at")
  end
end
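
The validation and option precedence above can be sketched in isolation. `extract_options` is a hypothetical helper, and `StandardError` stands in for the adapter's `Error` class, whose definition is not shown on this page. The sketch illustrates one consequence of `||=`: an explicit option wins only when it is truthy.

```ruby
# Hypothetical helper mirroring the validation and option merging in
# .coerce_to_worker; StandardError stands in for the adapter's Error class.
def extract_options(payload, **options)
  raise(StandardError, "invalid payload") unless payload.is_a?(Hash)
  raise(StandardError, "invalid payload") unless payload["class"].is_a?(String)

  # ||= keeps a truthy explicit option and falls back to the payload otherwise.
  options[:retry] ||= payload["retry"] if payload.key?("retry")
  options[:queue] ||= payload["queue"] if payload.key?("queue")
  options
end

job = {"class" => "MyIndexWorker", "retry" => 5, "queue" => "low", "args" => [42]}

from_payload = extract_options(job)                 # values come from the payload
explicit     = extract_options(job, queue: "high")  # explicit queue is kept
overridden   = extract_options(job, retry: false)   # false is falsy, so ||= replaces it
```

Note the last call: because `false` is falsy, passing `retry: false` does not survive when the payload carries its own "retry" value.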

.push(worker) ⇒ Hash

Initializes the adapter and pushes the job into the Sidekiq service

Parameters:

  • worker (Esse::AsyncIndexing::Worker)

Returns:

  • (Hash)

    Job payload



# File 'lib/esse/async_indexing/adapters/sidekiq.rb', line 47

def self.push(worker)
  new(worker).push
end

Instance Method Details

#push ⇒ Hash

Pushes the job to Sidekiq (in practice, to Redis).

* If the job has an 'at' key with a timestamp in the future, schedule it
* Otherwise enqueue it for immediate execution

Returns:

  • (Hash)

    Payload that was sent to Redis



# File 'lib/esse/async_indexing/adapters/sidekiq.rb', line 56

def push
  normalize_before_push
  # Optimization to enqueue something now that is scheduled to go out now or in the past
  if (timestamp = @payload.delete("at")) && (timestamp > Time.now.to_f)
    Esse.config.async_indexing.sidekiq.redis_pool.with do |redis|
      redis.zadd(scheduled_queue_name, timestamp.to_f.to_s, to_json(@payload))
    end
  else
    Esse.config.async_indexing.sidekiq.redis_pool.with do |redis|
      redis.lpush(immediate_queue_name, to_json(@payload))
    end
  end
  @payload
end