Class: Discobolo::Actor
- Inherits:
-
Object
- Object
- Discobolo::Actor
- Includes:
- Celluloid
- Defined in:
- lib/discobolo/actor.rb
Instance Attribute Summary collapse
-
#queues ⇒ Object
Returns the value of attribute queues.
Instance Method Summary collapse
- #fetch ⇒ Object
-
#initialize(*args) ⇒ Actor
constructor
A new instance of Actor.
Constructor Details
#initialize(*args) ⇒ Actor
Returns a new instance of Actor.
7 8 9 10 11 12 |
# File 'lib/discobolo/actor.rb', line 7 def initialize(*args) args = Hash[*args.flatten] if args.is_a?(Array) Discobolo::Config.logger.info "Initialize actor with: #{args}" @queues = Discobolo::Config.queues async.fetch if args[:fetch] end |
Instance Attribute Details
#queues ⇒ Object
Returns the value of attribute queues.
5 6 7 |
# File 'lib/discobolo/actor.rb', line 5 def queues @queues end |
Instance Method Details
#fetch ⇒ Object
14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 |
# File 'lib/discobolo/actor.rb', line 14 def fetch client = Discobolo::Config.client Discobolo::Config.logger.info "Listen Disque queues: #{self.queues} with concurrency of #{Discobolo::Config.actor_concurrency} workers" = Discobolo::Config..merge({from: self.queues}) loop do jobs = client.fetch() jobs.to_a.each do |queue, job_id, | Discobolo::Config.logger.info "#{queue} queue: received #{job_id} received #{}" #since we are supervising the actor, let it crash #begin # Claims to be still working with the specified job #client.working(job_id) = JSON.parse() klass = Object.const_get(['class']) instance = klass.new instance.job_id = job_id instance.async.perform_async(*['args']) #rescue => e # Discobolo::Config.logger.error "Terrible error happened #{e}" #end end end end |