Class: Esse::AsyncIndexing::Adapters::Sidekiq
Defined in: lib/esse/async_indexing/adapters/sidekiq.rb
Overview
This is a Sidekiq adapter that converts an Esse::AsyncIndexing::Worker object into a Sidekiq-readable format and then pushes the job to the service.
Instance Attribute Summary
- #queue ⇒ Object (readonly): Returns the value of attribute queue.
- #worker ⇒ Object (readonly): Returns the value of attribute worker.
Class Method Summary
- .coerce_to_worker(payload, **options) ⇒ Esse::AsyncIndexing::Worker: Coerces the raw payload into an instance of Worker.
- .push(worker) ⇒ Hash: Initializes the adapter and pushes the job to the Sidekiq service.
Instance Method Summary
- #initialize(worker) ⇒ Sidekiq (constructor): A new instance of Sidekiq.
- #push ⇒ Hash: Pushes the job to Sidekiq (in practice, to Redis).
Constructor Details
#initialize(worker) ⇒ Sidekiq
Returns a new instance of Sidekiq.
    # File 'lib/esse/async_indexing/adapters/sidekiq.rb', line 10
    def initialize(worker)
      @worker = worker
      @queue = worker.options.fetch(:queue, "default")
      @payload = worker.payload.merge(
        "class" => worker.worker_class,
        "retry" => worker.options.fetch(:retry, true),
        "queue" => @queue
      )
      @payload["created_at"] ||= Time.now.to_f
    end
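The payload assembly done by the constructor can be tried in isolation. The following is a minimal sketch, not the library's code: `FakeWorker`, `build_sidekiq_payload`, and the job name `MyIndexJob` are hypothetical, and the `options` reader on the worker is an assumption made for illustration.

```ruby
# Hypothetical stand-in for Esse::AsyncIndexing::Worker (illustration only).
FakeWorker = Struct.new(:worker_class, :options, :payload)

# Mirrors the merge performed in the constructor: the job class, retry
# setting and queue are layered on top of the worker's own payload.
def build_sidekiq_payload(worker, now: Time.now.to_f)
  queue = worker.options.fetch(:queue, "default")
  payload = worker.payload.merge(
    "class" => worker.worker_class,
    "retry" => worker.options.fetch(:retry, true),
    "queue" => queue
  )
  # Only stamped when the payload does not already carry a creation time.
  payload["created_at"] ||= now
  payload
end

worker = FakeWorker.new("MyIndexJob", {queue: "indexing"}, {"args" => [1, 2]})
payload = build_sidekiq_payload(worker, now: 1_700_000_000.0)
```

With no `:retry` option given, the sketch falls back to `true`, and a missing `:queue` would fall back to `"default"`, matching the defaults in the listing above.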
Instance Attribute Details
#queue ⇒ Object (readonly)
Returns the value of attribute queue.
    # File 'lib/esse/async_indexing/adapters/sidekiq.rb', line 8
    def queue
      @queue
    end
#worker ⇒ Object (readonly)
Returns the value of attribute worker.
    # File 'lib/esse/async_indexing/adapters/sidekiq.rb', line 8
    def worker
      @worker
    end
Class Method Details
.coerce_to_worker(payload, **options) ⇒ Esse::AsyncIndexing::Worker
Coerces the raw payload into an instance of Worker.
    # File 'lib/esse/async_indexing/adapters/sidekiq.rb', line 26
    def self.coerce_to_worker(payload, **options)
      raise(Error, "invalid payload") unless payload.is_a?(Hash)
      raise(Error, "invalid payload") unless payload["class"].is_a?(String)

      options[:retry] ||= payload["retry"] if payload.key?("retry")
      options[:queue] ||= payload["queue"] if payload.key?("queue")

      Esse::AsyncIndexing.worker(payload["class"], **options, service: :sidekiq).tap do |worker|
        worker.with_args(*Array(payload["args"])) if payload.key?("args")
        worker.with_job_jid(payload["jid"]) if payload.key?("jid")
        worker.created_at(payload["created_at"]) if payload.key?("created_at")
        worker.enqueued_at(payload["enqueued_at"]) if payload.key?("enqueued_at")
        worker.at(payload["at"]) if payload.key?("at")
      end
    end
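The validation and option defaulting in coerce_to_worker can be illustrated without building a real worker. This is a rough sketch under stated assumptions: `InvalidPayload`, `extract_worker_options`, and the sample job hash are hypothetical names, and the sketch stops before the call to Esse::AsyncIndexing.worker.

```ruby
# Hypothetical error class standing in for the adapter's Error constant.
class InvalidPayload < StandardError; end

# Reproduces only the guard clauses and the option defaulting.
def extract_worker_options(payload, **options)
  raise InvalidPayload, "invalid payload" unless payload.is_a?(Hash)
  raise InvalidPayload, "invalid payload" unless payload["class"].is_a?(String)

  # Explicit keyword options win; payload values only fill the gaps.
  options[:retry] ||= payload["retry"] if payload.key?("retry")
  options[:queue] ||= payload["queue"] if payload.key?("queue")
  options
end

raw_job = {"class" => "MyIndexJob", "args" => [1], "queue" => "low", "retry" => 3}

from_payload = extract_worker_options(raw_job)                    # queue and retry taken from the payload
overridden   = extract_worker_options(raw_job, queue: "critical") # explicit queue is kept
```

One consequence of `||=` worth knowing: an explicit falsy option such as `retry: false` counts as unset, so the payload's "retry" value replaces it whenever the payload has that key.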
.push(worker) ⇒ Hash
Initializes the adapter and pushes the job to the Sidekiq service.
    # File 'lib/esse/async_indexing/adapters/sidekiq.rb', line 47
    def self.push(worker)
      new(worker).push
    end
Instance Method Details
#push ⇒ Hash
Pushes the job to Sidekiq (in practice, to Redis).
* If the job has an 'at' key set to a time in the future, it is scheduled.
* Otherwise it is enqueued for immediate execution.
    # File 'lib/esse/async_indexing/adapters/sidekiq.rb', line 56
    def push
      normalize_before_push
      # Optimization to enqueue something now that is scheduled to go out now or in the past
      if (timestamp = @payload.delete("at")) && (timestamp > Time.now.to_f)
        Esse.config.async_indexing.sidekiq.redis_pool.with do |redis|
          redis.zadd(scheduled_queue_name, timestamp.to_f.to_s, to_json(@payload))
        end
      else
        Esse.config.async_indexing.sidekiq.redis_pool.with do |redis|
          redis.lpush(immediate_queue_name, to_json(@payload))
        end
      end
      @payload
    end
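The schedule-or-enqueue decision can be exercised against an in-memory stand-in for Redis. The sketch below is illustrative only: `FakeRedis` and `push_job` are hypothetical, and the key names "schedule" and "queue:<name>" are assumptions about what scheduled_queue_name and immediate_queue_name return, based on Sidekiq's usual Redis layout.

```ruby
require "json"

# Hypothetical in-memory stand-in for a Redis connection (illustration only).
class FakeRedis
  attr_reader :lists, :zsets

  def initialize
    @lists = Hash.new { |hash, key| hash[key] = [] }
    @zsets = Hash.new { |hash, key| hash[key] = [] }
  end

  # Prepends to a list, like Redis LPUSH.
  def lpush(key, value)
    @lists[key].unshift(value)
  end

  # Records a scored member, like Redis ZADD (ordering omitted for brevity).
  def zadd(key, score, member)
    @zsets[key] << [score, member]
  end
end

# Jobs with a future "at" go to the sorted set; everything else, including
# jobs whose "at" is now or in the past, goes straight to the queue list.
def push_job(redis, payload, now: Time.now.to_f)
  payload = payload.dup
  timestamp = payload.delete("at")
  if timestamp && timestamp > now
    redis.zadd("schedule", timestamp.to_f.to_s, JSON.generate(payload))
  else
    redis.lpush("queue:#{payload["queue"]}", JSON.generate(payload))
  end
  payload
end

redis = FakeRedis.new
push_job(redis, {"class" => "MyIndexJob", "queue" => "default", "at" => 200.0}, now: 100.0)
push_job(redis, {"class" => "MyIndexJob", "queue" => "default", "at" => 50.0}, now: 100.0)
```

In both branches the "at" key is removed before serialization, so the stored JSON never carries it; for scheduled jobs the timestamp survives only as the sorted-set score.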