Class: Moleculer::Broker
- Inherits:
-
Object
- Object
- Moleculer::Broker
- Extended by:
- Forwardable
- Includes:
- Support
- Defined in:
- lib/moleculer/broker.rb
Overview
The Broker is the primary component of Moleculer. It handles actions, events, and communication with remote nodes. Only a single broker should be run in any given process; it is started automatically when Moleculer::start or Moleculer::run is called.
Instance Attribute Summary collapse
-
#config ⇒ Object
readonly
Returns the value of attribute config.
-
#logger ⇒ Object
readonly
Returns the value of attribute logger.
Instance Method Summary collapse
-
#call(action_name, params, meta: {}, node_id: nil, timeout: Moleculer.config.timeout) ⇒ Hash
Call the provided action.
- #emit(event_name, payload) ⇒ Object
-
#initialize(config) ⇒ Broker
constructor
A new instance of Broker.
- #local_node ⇒ Object
- #process_event(packet) ⇒ Object
-
#process_message(channel, message) ⇒ Object
Processes an incoming message and passes it to the appropriate channel for handling.
- #process_request(packet) ⇒ Object
- #process_response(packet) ⇒ Object
-
#rescue_action ⇒ Proc
Returns the rescue_action if defined on the configuration.
-
#rescue_event ⇒ Proc
Returns the rescue_event if defined on the configuration.
- #run ⇒ Object
- #start ⇒ Object
- #stop ⇒ Object
- #wait_for_services(*services) ⇒ Object
Constructor Details
#initialize(config) ⇒ Broker
Returns a new instance of Broker.
23 24 25 26 27 28 29 30 31 32 |
# File 'lib/moleculer/broker.rb', line 23 def initialize(config) @config = config @config.broker = self @logger = @config.logger.get_child("[BROKER]") @registry = Registry.new(@config) @transporter = Transporters.for(@config.transporter).new(@config) @contexts = Concurrent::Map.new end |
Instance Attribute Details
#config ⇒ Object (readonly)
Returns the value of attribute config.
17 18 19 |
# File 'lib/moleculer/broker.rb', line 17 def config @config end |
#logger ⇒ Object (readonly)
Returns the value of attribute logger.
17 18 19 |
# File 'lib/moleculer/broker.rb', line 17 def logger @logger end |
Instance Method Details
#call(action_name, params, meta: {}, node_id: nil, timeout: Moleculer.config.timeout) ⇒ Hash
Call the provided action.
42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 |
# File 'lib/moleculer/broker.rb', line 42
#
# Call the provided action.
#
# Looks the action up in the registry (restricted to +node_id+ when one is
# given), wraps the request in a Context, records the pending call under the
# context id, executes the action and then waits on a resolvable future for
# the result. The future is fulfilled by #process_response.
#
# @param action_name [String] name of the action to call
# @param params [Hash] parameters handed to the action through the context
# @param meta [Hash] metadata handed to the action through the context
# @param node_id [String, nil] when set, only an action on that node is used
# @param timeout [Numeric] stored on the context and forwarded to
#   `future.value!` as the maximum wait
# @return [Hash] the value the pending future was fulfilled with
def call(action_name, params, meta: {}, node_id: nil, timeout: Moleculer.config.timeout)
  action = node_id ? @registry.fetch_action_for_node_id(action_name, node_id) : @registry.fetch_action(action_name)

  context = Context.new(
    broker:  self,
    action:  action,
    params:  params,
    meta:    meta,
    timeout: timeout,
  )

  future = Concurrent::Promises.resolvable_future

  # Registered before executing so a response can find the future by context id.
  @contexts[context.id] = {
    context:   context,
    called_at: Time.now,
    future:    future,
  }

  action.execute(context, self)

  future.value!(context.timeout)
end
#emit(event_name, payload) ⇒ Object
66 67 68 69 70 71 |
# File 'lib/moleculer/broker.rb', line 66
#
# Emits an event: every event handler the registry returns for +event_name+
# is executed with the payload and this broker.
#
# @param event_name [String] name of the event to emit
# @param payload [Object] data handed to each handler
# @return [Object] the collection returned by the registry lookup
def emit(event_name, payload)
  @logger.debug("emitting event '#{event_name}'")

  @registry.fetch_events_for_emit(event_name).each do |event|
    event.execute(payload, self)
  end
end
#local_node ⇒ Object
120 121 122 |
# File 'lib/moleculer/broker.rb', line 120 def local_node @registry.local_node end |
#process_event(packet) ⇒ Object
140 141 142 143 144 145 146 147 |
# File 'lib/moleculer/broker.rb', line 140
#
# Handles an incoming event packet: each event handler the registry returns
# for the packet's event name is executed with the packet data and this
# broker. Any StandardError is passed to the configuration's error handler
# instead of propagating.
#
# @param packet [Object] must respond to `event` and `data`
# @return [Object] the registry's handler collection, or the error handler's
#   return value when an error was rescued
def process_event(packet)
  @logger.debug("processing event '#{packet.event}'")

  @registry.fetch_events_for_emit(packet.event).each do |handler|
    handler.execute(packet.data, self)
  end
rescue StandardError => error
  config.handle_error(error)
end
#process_message(channel, message) ⇒ Object
Processes an incoming message and passes it to the appropriate channel for handling
129 130 131 132 133 |
# File 'lib/moleculer/broker.rb', line 129
#
# Processes an incoming message and passes it to the subscriber registered
# for the channel. The packet class is chosen from the second dot-separated
# segment of the channel name. Messages on channels without a subscriber are
# ignored. Any StandardError is passed to the configuration's error handler.
#
# @param channel [String] name of the channel the message arrived on
# @param message [Object] raw message handed to the packet constructor
# @return [Object, nil] nil when no subscriber exists for the channel
def process_message(channel, message)
  subscriber = subscribers[channel]
  return unless subscriber

  packet_type = channel.split(".")[1]
  subscriber << Packets.for(packet_type).new(message)
rescue StandardError => e
  config.handle_error(e)
end
#process_request(packet) ⇒ Object
149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 |
# File 'lib/moleculer/broker.rb', line 149
#
# Handles an incoming request packet: resolves the requested action for this
# node, executes it inside a Context built from the packet, and publishes the
# result back to the sending node as a successful response.
#
# `node_id` and `publish_res` are not defined in this method; they are
# provided elsewhere in the class.
#
# @param packet [Object] must respond to `id`, `action`, `params`, `meta`
#   and `sender`
# @return [Object] whatever `publish_res` returns
def process_request(packet)
  @logger.debug "processing request #{packet.id}"
  action = @registry.fetch_action_for_node_id(packet.action, node_id)
  node   = @registry.fetch_node(packet.sender)

  # The context reuses the packet id, so the response carries the id the
  # caller is waiting on.
  context = Context.new(
    id:      packet.id,
    broker:  self,
    action:  action,
    params:  packet.params,
    meta:    packet.meta,
    timeout: @config.timeout,
  )

  response = action.execute(context, self)

  publish_res(
    id:      context.id,
    success: true,
    data:    response,
    error:   {},
    meta:    context.meta,
    stream:  false,
    node:    node,
  )
end
#process_response(packet) ⇒ Object
135 136 137 138 |
# File 'lib/moleculer/broker.rb', line 135
#
# Handles an incoming response packet by removing the pending call recorded
# under the packet id and fulfilling its future with the packet data.
#
# A response whose id has no pending call (for example a duplicate) is
# logged and ignored rather than raising NoMethodError on nil.
#
# @param packet [Object] must respond to `id` and `data`
# @return [Object, nil] the result of fulfilling the future, or nil when no
#   pending call matched
def process_response(packet)
  pending = @contexts.delete(packet.id)

  unless pending
    @logger.debug("ignoring response for unknown request #{packet.id}")
    return
  end

  pending[:future].fulfill(packet.data)
end
#rescue_action ⇒ Proc
Returns the rescue_action if defined on the configuration
178 179 180 |
# File 'lib/moleculer/broker.rb', line 178 def rescue_action config.rescue_action end |
#rescue_event ⇒ Proc
Returns the rescue_event if defined on the configuration
184 185 186 |
# File 'lib/moleculer/broker.rb', line 184 def rescue_event config.rescue_event end |
#run ⇒ Object
73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 |
# File 'lib/moleculer/broker.rb', line 73
#
# Starts the broker and then blocks, waiting for INT/TERM signals.
#
# The signal traps only write the signal name into a pipe; the loop below
# reads each name back from the pipe and hands it to `handle_signal`
# (defined elsewhere in the class). An Interrupt raised while starting or
# waiting stops the broker.
def run
  reader, writer = IO.pipe

  %w[INT TERM].each do |name|
    trap(name) { writer.puts(name) }
  end

  begin
    start

    while (ready = IO.select([reader]))
      readable = ready.first
      handle_signal(readable[0].gets.strip)
    end
  rescue Interrupt
    stop
  end
end
#start ⇒ Object
94 95 96 97 98 99 100 101 102 103 104 |
# File 'lib/moleculer/broker.rb', line 94 def start @logger.info "starting" @logger.info "using transporter '#{@config.transporter}'" @transporter.start register_local_node start_subscribers publish_discover publish_info start_heartbeat self end |
#stop ⇒ Object
106 107 108 109 110 111 |
# File 'lib/moleculer/broker.rb', line 106 def stop @logger.info "stopping" publish(:disconnect) @transporter.stop exit 0 end |
#wait_for_services(*services) ⇒ Object
113 114 115 116 117 118 |
# File 'lib/moleculer/broker.rb', line 113
#
# Blocks until the registry reports no missing services.
#
# Each pass asks the registry which of the still-pending services are
# missing; while any remain, their names are logged and the method sleeps
# 0.1 seconds before asking again with the reduced list.
#
# @param services [Array<String>] names of the services to wait for
# @return [nil]
def wait_for_services(*services)
  pending = services

  loop do
    pending = @registry.missing_services(*pending)
    break if pending && pending.empty?

    @logger.info "waiting for services '#{pending.join(', ')}'"
    sleep 0.1
  end
end