Class: Legion::Extensions::Actors::Subscription
- Inherits:
- Object
- Legion::Extensions::Actors::Subscription
- Includes:
- Celluloid::IO
- Defined in:
- lib/legion/extensions/actors/subscription.rb
Instance Method Summary
- #action(payload) ⇒ Object
- #cancel ⇒ Object
- #initialize ⇒ Subscription (constructor)
  A new instance of Subscription.
- #subscribe(manual_ack = true) ⇒ Object
Constructor Details
#initialize ⇒ Subscription
Returns a new instance of Subscription.
    # File 'lib/legion/extensions/actors/subscription.rb', line 8
    def initialize
      require class_path
      @queue = queue.new
      async.subscribe
    end
Instance Method Details
#action(payload) ⇒ Object
    # File 'lib/legion/extensions/actors/subscription.rb', line 19
    def action(payload)
      Legion::Logging.warn "Payload from default action was #{payload}"
    end
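The default #action only logs a warning, which suggests that subclasses are expected to override it. The following is a minimal sketch of that pattern, not Legion's own code: SubscriptionStandIn and EchoSubscription are hypothetical names, the stand-in reproduces only the default #action, and Kernel#warn takes the place of Legion::Logging.warn so the sketch runs without Legion installed.

```ruby
# Hypothetical stand-in for the documented base class. Only the default
# #action is reproduced; Kernel#warn replaces Legion::Logging.warn.
class SubscriptionStandIn
  def action(payload)
    warn "Payload from default action was #{payload}"
  end
end

# Hypothetical subclass that replaces the logging default with real handling.
class EchoSubscription < SubscriptionStandIn
  attr_reader :handled

  def initialize
    @handled = []
  end

  # Record the payload instead of warning about it.
  def action(payload)
    @handled << payload
  end
end

actor = EchoSubscription.new
actor.action({ 'task_id' => 42 })
```

This page does not show where #action is invoked, so treat the override as an illustration of the method's shape only.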
#cancel ⇒ Object
    # File 'lib/legion/extensions/actors/subscription.rb', line 14
    def cancel
      Legion::Logging.debug "Closing subscription to #{@queue.name}"
      @queue.close
    end
#subscribe(manual_ack = true) ⇒ Object
    # File 'lib/legion/extensions/actors/subscription.rb', line 23
    def subscribe(manual_ack = true)
      require 'legion/extensions/tasker/runners/task_updater'
      @queue.subscribe(manual_ack: manual_ack) do |delivery_info, _metadata, payload|
        begin
          message = Legion::JSON.load(payload)
          Legion::Runner::Runner.new(runner_class, runner_method, message)
          # Acknowledge only after the runner was created without raising.
          @queue.acknowledge(delivery_info.delivery_tag) if manual_ack
        rescue StandardError => ex
          Legion::Logging.error(ex.message)
          Legion::Logging.error(ex.backtrace)
          # Reject the delivery when parsing or the runner raised.
          @queue.reject(delivery_info.delivery_tag) if manual_ack
        end
      end
    end
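The acknowledge-on-success, reject-on-error flow of #subscribe can be tried in isolation. The sketch below is an approximation under stated assumptions: FakeQueue and handle_delivery are hypothetical names, the standard library's JSON.parse stands in for Legion::JSON.load, and a block stands in for the Legion::Runner::Runner call.

```ruby
require 'json'

# Hypothetical in-memory stand-in for the real queue. It only records which
# delivery tags were acknowledged or rejected.
class FakeQueue
  attr_reader :acked, :rejected

  def initialize
    @acked = []
    @rejected = []
  end

  def acknowledge(tag)
    @acked << tag
  end

  def reject(tag)
    @rejected << tag
  end
end

# Mirrors the control flow of #subscribe for one delivery: parse the payload,
# hand it to the caller's block, then acknowledge. Any StandardError along the
# way is logged and the delivery is rejected instead.
def handle_delivery(queue, tag, payload, manual_ack: true)
  message = JSON.parse(payload, symbolize_names: true)
  yield message
  queue.acknowledge(tag) if manual_ack
rescue StandardError => e
  warn e.message
  queue.reject(tag) if manual_ack
end

queue = FakeQueue.new
seen = []
handle_delivery(queue, 1, '{"task_id": 42}') { |msg| seen << msg }
handle_delivery(queue, 2, 'not json') { |msg| seen << msg }
```

After both calls, tag 1 has been acknowledged and tag 2 rejected, because the second payload fails to parse before the block ever runs. With manual_ack set to false, neither list changes and settling the delivery is left to the broker.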