Class: Legion::Extensions::Actors::Subscription

Inherits:
Object
  • Object
show all
Includes:
Concurrent::Async, Base, Helpers::Transport
Defined in:
lib/legion/extensions/actors/subscription.rb

Instance Method Summary collapse

Methods included from Helpers::Transport

#build_default_exchange, #default_exchange, #exchanges, #messages, #queues, #transport_class, #transport_path

Methods included from Helpers::Base

#actor_class, #actor_const, #actor_name, #calling_class, #calling_class_array, #from_json, #full_path, #lex_class, #lex_const, #lex_name, #normalize, #runner_class, #runner_const, #runner_name, #to_dotted_hash

Methods included from Base

#args, #check_subtask?, #enabled?, #function, #generate_task?, #manual, #runner, #use_runner?

Methods included from Helpers::Lex

#default_settings, #function_desc, #function_example, #function_options, #function_set, included, #runner_desc

Methods included from Helpers::Logger

#handle_exception, #log

Methods included from Helpers::Core

#find_setting, #settings

Constructor Details

#initialize(**_options) ⇒ Subscription

Returns a new instance of Subscription.



12
13
14
15
16
17
18
19
# File 'lib/legion/extensions/actors/subscription.rb', line 12

def initialize(**_options)
  super()
  @queue = queue.new
  @queue.channel.prefetch(prefetch) if defined? prefetch
rescue StandardError => e
  log.fatal e.message
  log.fatal e.backtrace
end

Instance Method Details

#blockObject



48
49
50
# File 'lib/legion/extensions/actors/subscription.rb', line 48

def block
  false
end

#cancelObject



39
40
41
42
43
44
45
46
# File 'lib/legion/extensions/actors/subscription.rb', line 39

def cancel
  return true unless @queue.channel.active

  log.debug "Closing subscription to #{@queue.name}"
  @consumer.cancel
  @queue.channel.close
  true
end

#consumersObject



52
53
54
# File 'lib/legion/extensions/actors/subscription.rb', line 52

def consumers
  1
end

#create_queueObject



21
22
23
24
25
26
27
28
# File 'lib/legion/extensions/actors/subscription.rb', line 21

def create_queue
  queues.const_set(actor_const, Class.new(Legion::Transport::Queue))
  exchange_object = default_exchange.new
  queue_object = Kernel.const_get(queue_string).new

  queue_object.bind(exchange_object, routing_key: actor_name)
  queue_object.bind(exchange_object, routing_key: "#{lex_name}.#{actor_name}.#")
end

#delay_startObject



60
61
62
# File 'lib/legion/extensions/actors/subscription.rb', line 60

def delay_start
  0
end

#find_function(message = {}) ⇒ Object



96
97
98
99
100
101
102
103
# File 'lib/legion/extensions/actors/subscription.rb', line 96

def find_function(message = {})
  return runner_function if actor_class.instance_methods(false).include?(:runner_function)
  return function if actor_class.instance_methods(false).include?(:function)
  return action if actor_class.instance_methods(false).include?(:action)
  return message[:function] if message.key? :function

  function
end

#include_metadata_in_message?Boolean

Returns:

  • (Boolean)


64
65
66
# File 'lib/legion/extensions/actors/subscription.rb', line 64

def 
  true
end

#manual_ackObject



56
57
58
# File 'lib/legion/extensions/actors/subscription.rb', line 56

def manual_ack
  true
end

#process_message(message, metadata, delivery_info) ⇒ Object



68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
# File 'lib/legion/extensions/actors/subscription.rb', line 68

def process_message(message, , delivery_info)
  payload = if .content_encoding && .content_encoding == 'encrypted/cs'
              Legion::Crypt.decrypt(message, .headers['iv'])
            elsif .content_encoding && .content_encoding == 'encrypted/pk'
              Legion::Crypt.decrypt_from_keypair(.headers[:public_key], message)
            else
              message
            end

  message = if .content_type == 'application/json'
              Legion::JSON.load(payload)
            else
              { value: payload }
            end
  if 
    message = message.merge(.headers.transform_keys(&:to_sym)) unless .headers.nil?
    message[:routing_key] = if Legion::Transport::TYPE == 'march_hare'
                              .routing_key
                            else
                              delivery_info[:routing_key]
                            end
  end

  message[:timestamp] = (message[:timestamp_in_ms] / 1000).round if message.key?(:timestamp_in_ms) && !message.key?(:timestamp)
  message[:datetime] = Time.at(message[:timestamp].to_i).to_datetime.to_s if message.key?(:timestamp)
  message
end

#queueObject



30
31
32
33
# File 'lib/legion/extensions/actors/subscription.rb', line 30

def queue
  create_queue unless queues.const_defined?(actor_const)
  Kernel.const_get queue_string
end

#queue_stringObject



35
36
37
# File 'lib/legion/extensions/actors/subscription.rb', line 35

def queue_string
  @queue_string ||= "#{queues}::#{actor_const}"
end

#subscribeObject



105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
# File 'lib/legion/extensions/actors/subscription.rb', line 105

def subscribe
  sleep(delay_start)
  consumer_tag = "#{Legion::Settings[:client][:name]}_#{lex_name}_#{runner_name}_#{Thread.current.object_id}"
  on_cancellation = block { cancel }

  @consumer = @queue.subscribe(manual_ack: manual_ack, block: false, consumer_tag: consumer_tag, on_cancellation: on_cancellation) do |*rmq_message|
    payload = rmq_message.pop
     = rmq_message.last
    delivery_info = rmq_message.first

    message = process_message(payload, , delivery_info)
    if use_runner?
      Legion::Runner.run(**message,
                         runner_class:  runner_class,
                         function:      find_function(message),
                         check_subtask: check_subtask?,
                         generate_task: generate_task?)
    else
      runner_class.send(find_function(message), **message)
    end
    @queue.acknowledge(delivery_info.delivery_tag) if manual_ack

    cancel if Legion::Settings[:client][:shutting_down]
  rescue StandardError => e
    Legion::Logging.error e.message
    Legion::Logging.error e.backtrace
    Legion::Logging.error message
    Legion::Logging.error function
    @queue.reject(delivery_info.delivery_tag) if manual_ack
  end
end