Class: Esse::AsyncIndexing::Adapters::Faktory
- Defined in:
- lib/esse/async_indexing/adapters/faktory.rb
Overview
This is a Faktory adapter that converts an Esse::AsyncIndexing::Worker object into a Faktory-readable format and then pushes the job to the service.
Instance Attribute Summary
- #queue ⇒ Object (readonly)
  Returns the value of attribute queue.
- #worker ⇒ Object (readonly)
  Returns the value of attribute worker.
Class Method Summary
- .coerce_to_worker(payload, **options) ⇒ Esse::AsyncIndexing::Worker
  Coerces the raw payload into an instance of Worker.
- .push(worker) ⇒ Hash
  Initializes the adapter and pushes the job to the Faktory service.
Instance Method Summary
- #initialize(worker) ⇒ Faktory (constructor)
  Returns a new instance of Faktory.
- #push ⇒ Hash
  Pushes the job to Faktory, scheduling it if the job has the 'at' key.
Constructor Details
#initialize(worker) ⇒ Faktory
Returns a new instance of Faktory.
    # File 'lib/esse/async_indexing/adapters/faktory.rb', line 10
    def initialize(worker)
      @worker = worker
      @queue = worker.options.fetch(:queue, "default")
      @payload = worker.payload.merge(
        "jobtype" => worker.worker_class,
        "queue" => @queue,
        "retry" => parse_retry(worker.options[:retry])
      )
      @payload["created_at"] ||= Time.now.to_f
    end
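The payload assembly in the constructor can be sketched in isolation. This is a minimal illustration, not the adapter itself: `StubWorker` and `build_faktory_payload` are hypothetical names, and the retry handling is omitted because the body of `parse_retry` is not shown on this page.

```ruby
# Hypothetical stand-in for Esse::AsyncIndexing::Worker; only the three
# readers used by the constructor are modeled here.
StubWorker = Struct.new(:worker_class, :options, :payload, keyword_init: true)

# Mirrors the merge performed in #initialize, minus the retry handling.
def build_faktory_payload(worker, now: Time.now)
  # Fall back to the "default" queue when no :queue option was given.
  queue = worker.options.fetch(:queue, "default")
  payload = worker.payload.merge(
    "jobtype" => worker.worker_class,
    "queue" => queue
  )
  # Keep an existing creation timestamp; otherwise stamp the job now.
  payload["created_at"] ||= now.to_f
  payload
end

worker = StubWorker.new(
  worker_class: "ReindexWorker",
  options: {},
  payload: { "jid" => "abc123", "args" => [1] }
)
job = build_faktory_payload(worker, now: Time.at(1_700_000_000))
```

Because `merge` returns a new Hash, the worker's own payload is left untouched; only the adapter's copy gains the `jobtype`, `queue`, and `created_at` keys.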
Instance Attribute Details
#queue ⇒ Object (readonly)
Returns the value of attribute queue.
    # File 'lib/esse/async_indexing/adapters/faktory.rb', line 8
    def queue
      @queue
    end
#worker ⇒ Object (readonly)
Returns the value of attribute worker.
    # File 'lib/esse/async_indexing/adapters/faktory.rb', line 8
    def worker
      @worker
    end
Class Method Details
.coerce_to_worker(payload, **options) ⇒ Esse::AsyncIndexing::Worker
Coerces the raw payload into an instance of Worker.
    # File 'lib/esse/async_indexing/adapters/faktory.rb', line 26
    def self.coerce_to_worker(payload, **options)
      raise(Error, "invalid payload") unless payload.is_a?(Hash)
      raise(Error, "invalid payload") unless payload["jobtype"].is_a?(String)

      options[:retry] ||= payload["retry"] if payload.key?("retry")
      options[:queue] ||= payload["queue"] if payload.key?("queue")
      Esse::AsyncIndexing.worker(payload["jobtype"], **options, service: :faktory).tap do |worker|
        worker.with_args(*Array(payload["args"])) if payload.key?("args")
        worker.with_job_jid(payload["jid"]) if payload.key?("jid")
        worker.created_at(payload["created_at"]) if payload.key?("created_at")
        worker.enqueued_at(payload["enqueued_at"]) if payload.key?("enqueued_at")
        worker.at(payload["at"]) if payload.key?("at")
      end
    end
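The validation and option-precedence steps of coerce_to_worker can be tried without the gem loaded. The sketch below is an assumption-laden illustration: `extract_worker_options` and `InvalidPayload` are hypothetical names, and no real Worker is built.

```ruby
# Hypothetical error class standing in for the adapter's Error constant.
class InvalidPayload < StandardError; end

# Hypothetical helper isolating the validation and option-merging steps.
def extract_worker_options(payload, **options)
  raise InvalidPayload, "invalid payload" unless payload.is_a?(Hash)
  raise InvalidPayload, "invalid payload" unless payload["jobtype"].is_a?(String)

  # Explicit keyword options win; the payload only fills in missing ones.
  options[:retry] ||= payload["retry"] if payload.key?("retry")
  options[:queue] ||= payload["queue"] if payload.key?("queue")
  options
end

payload = { "jobtype" => "ReindexWorker", "queue" => "low", "retry" => 5 }
from_payload = extract_worker_options(payload)
overridden = extract_worker_options(payload, queue: "critical")
```

Here `from_payload` takes both values from the payload, while `overridden` keeps the explicit `queue: "critical"`. Since `||=` treats `false` and `nil` alike, an explicit `retry: false` would be replaced by the payload's value in this sketch.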
.push(worker) ⇒ Hash
Initializes the adapter and pushes the job to the Faktory service.
    # File 'lib/esse/async_indexing/adapters/faktory.rb', line 47
    def self.push(worker)
      new(worker).push
    end
Instance Method Details
#push ⇒ Hash
Pushes the job to Faktory.
* If the job has the 'at' key, it is scheduled.
* Otherwise it is enqueued for immediate execution.
    # File 'lib/esse/async_indexing/adapters/faktory.rb', line 57
    def push
      unless Object.const_defined?(:Faktory)
        raise Esse::AsyncIndexing::Error, <<~ERR
          Faktory client for ruby is not loaded.
          You must install and require https://github.com/contribsys/faktory_worker_ruby.
        ERR
      end

      normalize_before_push
      pool = Thread.current[:faktory_via_pool] || ::Faktory.server_pool
      ::Faktory.client_middleware.invoke(@payload, pool) do
        pool.with do |c|
          c.push(@payload)
        end
      end
      @payload
    end
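The schedule-or-enqueue rule described above can be illustrated with a small, self-contained sketch. It is not the adapter's normalize_before_push, whose body is not shown on this page; `normalize_schedule` is a hypothetical helper. It assumes 'at' arrives as a numeric epoch timestamp and that Faktory expects an RFC 3339 string for scheduled jobs.

```ruby
require "time"

# Hypothetical normalization step, written only to illustrate the rule.
def normalize_schedule(payload, now: Time.now)
  job = payload.dup
  at = job.delete("at")
  # Only a future timestamp schedules the job; anything else enqueues it now.
  if at && at.to_f > now.to_f
    # Assumption: scheduled jobs carry "at" as an RFC 3339 string in UTC.
    job["at"] = Time.at(at.to_f).utc.iso8601(6)
  end
  job
end

now = Time.at(1_700_000_000)
scheduled = normalize_schedule({ "jid" => "a", "at" => now.to_f + 60 }, now: now)
immediate = normalize_schedule({ "jid" => "b" }, now: now)
past = normalize_schedule({ "jid" => "c", "at" => now.to_f - 60 }, now: now)
```

In this sketch a job whose 'at' lies in the past loses the key and is treated like an immediate job, which is one plausible design choice rather than documented behavior.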