Class: Discobolo::Actor

Inherits:
Object
  • Object
show all
Includes:
Celluloid
Defined in:
lib/discobolo/actor.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(*args) ⇒ Actor

Returns a new instance of Actor.



7
8
9
10
11
12
# File 'lib/discobolo/actor.rb', line 7

# Builds the actor and, when requested, starts polling immediately.
#
# The splatted arguments are flattened and folded into a Hash, so both
# +new(fetch: true)+ and +new(:fetch, true)+ are accepted.
#
# @param args [Array] a single options Hash or alternating key/value pairs;
#   only the +:fetch+ key is read here
def initialize(*args)
  # A splat parameter is always an Array, so the conversion is unconditional.
  settings = Hash[*args.flatten]
  Discobolo::Config.logger.info "Initialize actor with: #{settings}"
  @queues = Discobolo::Config.queues
  return unless settings[:fetch]

  # Kick off the fetch loop through the async proxy.
  async.fetch
end

Instance Attribute Details

#queues ⇒ Object

Returns the value of attribute queues.



5
6
7
# File 'lib/discobolo/actor.rb', line 5

# Reader for the +queues+ attribute.
#
# @return [Object] the current value of +@queues+ (nil if it has not been assigned)
def queues
  @queues
end

Instance Method Details

#fetch ⇒ Object



14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
# File 'lib/discobolo/actor.rb', line 14

# Polls Disque in an endless loop and hands every fetched job to its worker.
#
# Each fetched entry is destructured as +[queue, job_id, payload]+, where
# +payload+ is a JSON document. Its +"class"+ entry names the worker class to
# instantiate and its +"args"+ entry (if present) is splatted into
# +perform_async+ on the worker's async proxy.
#
# Errors are deliberately not rescued here: the actor is supervised, so a
# failure (bad JSON, unknown class, client error) is allowed to crash it.
#
# There is no +break+ in the loop, so the method does not return during
# normal operation.
#
# @raise [JSON::ParserError] when a payload is not valid JSON
# @raise [NameError] when the payload's class name cannot be resolved
def fetch
  config = Discobolo::Config
  client = config.client
  config.logger.info "Listen Disque queues: #{queues} with concurrency of #{config.actor_concurrency} workers"
  # Built once; the per-job payload below uses its own names so this hash is
  # never confused with (or shadowed by) job data.
  fetch_options = config.fetch_options.merge(from: queues)

  loop do
    # +to_a+ turns a nil result into an empty list, so nothing is iterated.
    client.fetch(fetch_options).to_a.each do |queue, job_id, payload|
      config.logger.info "#{queue} queue: received #{job_id} received #{payload}"

      job = JSON.parse(payload)
      # NOTE(review): the class name comes straight from the queue payload and
      # const_get resolves any loaded constant. If producers are not fully
      # trusted, restrict this to an allow-list of worker classes.
      worker = Object.const_get(job['class']).new
      worker.job_id = job_id
      worker.async.perform_async(*job['args'])
    end
  end
end