Class: Legion::Extensions::Actors::Subscription

Inherits:
Object
  • Object
show all
Includes:
Celluloid::IO
Defined in:
lib/legion/extensions/actors/subscription.rb

Instance Method Summary collapse

Constructor Details

#initializeSubscription

Returns a new instance of Subscription.



8
9
10
11
12
# File 'lib/legion/extensions/actors/subscription.rb', line 8

def initialize
  require class_path
  @queue = queue.new
  async.subscribe
end

Instance Method Details

#action(payload) ⇒ Object



19
20
21
# File 'lib/legion/extensions/actors/subscription.rb', line 19

def action(payload)
  Legion::Logging.warn "Payload from default action was #{payload}"
end

#cancelObject



14
15
16
17
# File 'lib/legion/extensions/actors/subscription.rb', line 14

def cancel
  Legion::Logging.debug "Closing subscription to #{@queue.name}"
  @queue.close
end

#subscribe(manual_ack = true) ⇒ Object



23
24
25
26
27
28
29
30
31
32
33
34
35
36
# File 'lib/legion/extensions/actors/subscription.rb', line 23

def subscribe(manual_ack = true)
  require 'legion/extensions/tasker/runners/task_updater'
  @queue.subscribe(manual_ack: manual_ack) do |delivery_info, , payload|
    begin
      message = Legion::JSON.load(payload)
      Legion::Runner::Runner.new(runner_class, runner_method, message)
      @queue.acknowledge(delivery_info.delivery_tag) if manual_ack
    rescue StandardError => ex
      Legion::Logging.error(ex.message)
      Legion::Logging.error(ex.backtrace)
      @queue.reject(delivery_info.delivery_tag) if manual_ack
    end
  end
end