Class: Moleculer::Broker

Inherits:
Object
  • Object
show all
Extended by:
Forwardable
Includes:
Support
Defined in:
lib/moleculer/broker.rb

Overview

The Broker is the primary component of Moleculer. It handles actions, events, and communication with remote nodes. Only a single broker should be run for any given process, and it is automatically started when Moleculer::start or Moleculer::run is called.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(config) ⇒ Broker

Returns a new instance of Broker.

Parameters:

  • config (Moleculer::Config)

    the broker configuration



23
24
25
26
27
28
29
30
31
32
# File 'lib/moleculer/broker.rb', line 23

# Wires the broker to its configuration and builds its collaborators.
#
# @param config [Moleculer::Config] the broker configuration
def initialize(config)
  @config = config

  # Register this broker on the configuration before building the
  # collaborators below, each of which is handed the same configuration.
  @config.broker = self

  @logger      = @config.logger.get_child("[BROKER]")
  @registry    = Registry.new(@config)
  # The transporter class is looked up from the configured transporter setting.
  @transporter = Transporters.for(@config.transporter).new(@config)
  # Pending outbound calls keyed by context id (filled by #call, drained by
  # #process_response).
  @contexts    = Concurrent::Map.new
end

Instance Attribute Details

#config ⇒ Object (readonly)

Returns the value of attribute config.



17
18
19
# File 'lib/moleculer/broker.rb', line 17

# @return [Moleculer::Config] the configuration passed to the constructor
def config
  @config
end

#logger ⇒ Object (readonly)

Returns the value of attribute logger.



17
18
19
# File 'lib/moleculer/broker.rb', line 17

# @return [Object] the broker's logger, created in the constructor as the
#   "[BROKER]" child of the configuration's logger
def logger
  @logger
end

Instance Method Details

#call(action_name, params, meta: {}, node_id: nil, timeout: Moleculer.config.timeout) ⇒ Hash

Call the provided action.

Parameters:

  • action_name (String)

    the action to call.

  • params (Hash)

    the params with which to call the action

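  • meta (Hash) (defaults to: {})

    the metadata of the request

  • node_id (defaults to: nil)

    when given, the action is looked up for the node with this id instead of using the registry's default lookup

  • timeout (defaults to: Moleculer.config.timeout)

    the timeout given to the call's context and used when waiting for the result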

Returns:

  • (Hash)

    the return result of the action call



42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
# File 'lib/moleculer/broker.rb', line 42

# Calls the named action and waits for its result.
#
# A context is built for the call and registered, together with a resolvable
# future, under the context id before the action is executed; the method then
# waits on that future for at most the context's timeout. The registered entry
# is not removed here: in the code visible in this file only #process_response
# deletes it.
#
# @param action_name [String] the action to call
# @param params [Hash] the params with which to call the action
# @param meta [Hash] the metadata of the request
# @param node_id [Object, nil] when given, the action is fetched for that node id
# @param timeout [Object] the timeout handed to the context and used for the wait
# @return [Hash] the return result of the action call
def call(action_name, params, meta: {}, node_id: nil, timeout: Moleculer.config.timeout)
  action =
    if node_id
      @registry.fetch_action_for_node_id(action_name, node_id)
    else
      @registry.fetch_action(action_name)
    end

  context = Context.new(broker: self, action: action, params: params, meta: meta, timeout: timeout)
  pending = Concurrent::Promises.resolvable_future

  # Register before executing so a response can find the future by context id.
  @contexts[context.id] = { context: context, called_at: Time.now, future: pending }

  action.execute(context, self)
  pending.value!(context.timeout)
end

#emit(event_name, payload) ⇒ Object



66
67
68
69
70
71
# File 'lib/moleculer/broker.rb', line 66

# Emits an event: every event the registry returns for the given name is
# executed with the payload and this broker.
#
# @param event_name [String] the name of the event to emit
# @param payload [Object] the data handed to each event
# @return [Object] the collection of events returned by the registry
def emit(event_name, payload)
  @logger.debug("emitting event '#{event_name}'")

  @registry.fetch_events_for_emit(event_name).each do |event|
    event.execute(payload, self)
  end
end

#local_node ⇒ Object



120
121
122
# File 'lib/moleculer/broker.rb', line 120

# @return [Object] the local node as reported by the registry
def local_node
  @registry.local_node
end

#process_event(packet) ⇒ Object



140
141
142
143
144
145
146
147
# File 'lib/moleculer/broker.rb', line 140

# Handles an incoming event packet by executing every event the registry
# returns for the packet's event name with the packet's data.
#
# Any StandardError raised along the way is passed to the configuration's
# error handler instead of propagating.
#
# @param packet [#event, #data] the event packet
# @return [Object] the events collection, or the error handler's result
def process_event(packet)
  event_name = packet.event
  @logger.debug("processing event '#{event_name}'")

  @registry.fetch_events_for_emit(event_name).each do |event|
    event.execute(packet.data, self)
  end
rescue StandardError => error
  config.handle_error(error)
end

#process_message(channel, message) ⇒ Object

Processes an incoming message and passes it to the appropriate channel for handling

Parameters:

  • channel (String)

    the channel on which the message arrived

  • message (Hash)

    the raw deserialized message



129
130
131
132
133
# File 'lib/moleculer/broker.rb', line 129

# Processes an incoming message by wrapping it in the packet class for its
# channel and appending it to that channel's subscriber.
#
# The packet type is the second dot-separated segment of the channel name.
# Channels without a subscriber are ignored, and any StandardError is handed
# to the configuration's error handler.
#
# @param channel [String] the channel on which the message arrived
# @param message [Hash] the raw deserialized message
# @return [Object, nil] the result of the append, nil when the channel has no
#   subscriber, or the error handler's result
def process_message(channel, message)
  return unless subscribers[channel]

  packet_type = channel.split(".")[1]
  subscribers[channel] << Packets.for(packet_type).new(message)
rescue StandardError => error
  config.handle_error(error)
end

#process_request(packet) ⇒ Object



149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
# File 'lib/moleculer/broker.rb', line 149

# Handles an incoming request packet: executes the requested action in a
# context built from the packet and publishes the result back as a successful
# response addressed to the sending node.
#
# @param packet [#id, #action, #sender, #params, #meta] the request packet
# @return [Object] whatever publish_res returns
def process_request(packet)
  @logger.debug "processing request #{packet.id}"

  # Both lookups happen before the action runs, so a failed lookup prevents
  # execution.
  action = @registry.fetch_action_for_node_id(packet.action, node_id)
  sender = @registry.fetch_node(packet.sender)

  request_context = Context.new(
    id: packet.id, broker: self, action: action,
    params: packet.params, meta: packet.meta, timeout: @config.timeout,
  )
  result = action.execute(request_context, self)

  publish_res(
    id: request_context.id, success: true, data: result, error: {},
    meta: request_context.meta, stream: false, node: sender,
  )
end

#process_response(packet) ⇒ Object



135
136
137
138
# File 'lib/moleculer/broker.rb', line 135

# Completes the pending call that matches an incoming response packet.
#
# The entry registered under the packet's id is removed from the pending
# contexts and its future is fulfilled with the packet's data. A response
# whose id has no pending entry (for example one that was already processed)
# is logged at debug level and ignored, rather than raising NoMethodError on
# nil.
#
# @param packet [#id, #data] the response packet
# @return [Object, nil] the result of fulfilling the future, or nil when no
#   pending call matches the packet's id
def process_response(packet)
  pending = @contexts.delete(packet.id)

  unless pending
    @logger.debug("ignoring response for unknown request '#{packet.id}'")
    return
  end

  pending[:future].fulfill(packet.data)
end

#rescue_action ⇒ Proc

Returns the rescue_action if defined on the configuration

Returns:

  • (Proc)

    returns the rescue_action if defined on the configuration



178
179
180
# File 'lib/moleculer/broker.rb', line 178

# Delegates to the configuration.
#
# @return [Proc] the rescue_action if defined on the configuration
def rescue_action
  config.rescue_action
end

#rescue_event ⇒ Proc

Returns the rescue_event if defined on the configuration

Returns:

  • (Proc)

    returns the rescue_event if defined on the configuration



184
185
186
# File 'lib/moleculer/broker.rb', line 184

# Delegates to the configuration.
#
# @return [Proc] the rescue_event if defined on the configuration
def rescue_event
  config.rescue_event
end

#run ⇒ Object



73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
# File 'lib/moleculer/broker.rb', line 73

# Starts the broker and blocks, dispatching INT/TERM signals as they arrive.
#
# The signal handlers only write the signal name to a pipe; the main loop
# reads the names back and passes each one to handle_signal, so the handling
# itself runs outside the trap context. An Interrupt raised while starting or
# looping leads to #stop.
def run
  signal_reader, signal_writer = IO.pipe

  %w[INT TERM].each do |name|
    trap(name) { signal_writer.puts(name) }
  end

  begin
    start

    while (ready = IO.select([signal_reader]))
      readers = ready.first
      handle_signal(readers[0].gets.strip)
    end
  rescue Interrupt
    stop
  end
end

#start ⇒ Object



94
95
96
97
98
99
100
101
102
103
104
# File 'lib/moleculer/broker.rb', line 94

# Starts the broker.
#
# Logs the start and the configured transporter, starts the transporter, and
# then calls the registration, subscriber, discover, info and heartbeat
# helpers in the order shown. Those helpers are defined elsewhere in the
# class, so their exact effects are not described here.
#
# @return [Moleculer::Broker] self
def start
  @logger.info "starting"
  @logger.info "using transporter '#{@config.transporter}'"
  @transporter.start
  register_local_node
  start_subscribers
  publish_discover
  publish_info
  start_heartbeat
  self
end

#stop ⇒ Object



106
107
108
109
110
111
# File 'lib/moleculer/broker.rb', line 106

# Stops the broker and terminates the current process.
#
# Calls publish(:disconnect), stops the transporter, and then calls `exit 0`,
# so this method does not return normally to its caller.
def stop
  @logger.info "stopping"
  publish(:disconnect)
  @transporter.stop
  exit 0
end

#wait_for_services(*services) ⇒ Object



113
114
115
116
117
118
# File 'lib/moleculer/broker.rb', line 113

# Blocks until the registry reports none of the given services as missing.
#
# Each round asks the registry which of the still-awaited services are
# missing, logs them, and sleeps for 0.1 seconds before asking again. There is
# no upper bound on the wait.
#
# @param services [Array<String>] the names of the services to wait for
# @return [nil]
def wait_for_services(*services)
  loop do
    # Only the services still missing are asked about on the next round.
    services = @registry.missing_services(*services)
    break if services && services.empty?

    @logger.info "waiting for services '#{services.join(', ')}'"
    sleep 0.1
  end
end