Class: Moleculer::Registry

Inherits:
Object
  • Object
show all
Includes:
Support
Defined in:
lib/moleculer/registry.rb

Overview

The Registry manages the available services on the network

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(config) ⇒ Registry

Returns a new instance of Registry.

Parameters:

  • config (Moleculer::Config)

    the moleculer configuration



153
154
155
156
157
158
159
160
161
# File 'lib/moleculer/registry.rb', line 153

def initialize(config)
  @config           = config
  @nodes            = NodeList.new(@config.heartbeat_interval)
  @actions          = ActionList.new(@config.heartbeat_interval)
  @events           = EventList.new(@config.heartbeat_interval)
  @services         = Concurrent::Hash.new
  @logger           = @config.logger.get_child("[REGISTRY]")
  @remove_semaphore = Concurrent::Semaphore.new(1)
end

Instance Attribute Details

#local_nodeObject (readonly)

Returns the value of attribute local_node.



149
150
151
# File 'lib/moleculer/registry.rb', line 149

def local_node
  @local_node
end

Instance Method Details

#expire_nodesObject

Looks for nodes that have stopped reporting heartbeats and removes them from the registry



271
272
273
# File 'lib/moleculer/registry.rb', line 271

def expire_nodes
  @nodes.expired_nodes.each { |node| remove_node(node[:node].id, "it expired after 10 minutes") }
end

#fetch_action(action_name) ⇒ Moleculer::Service::Action|Moleculer::RemoteService::Action

Gets the named action from the registry. If a local action exists it will return the local one instead of a remote action.

Parameters:

  • action_name (String)

    the name of the action

Returns:



195
196
197
# File 'lib/moleculer/registry.rb', line 195

def fetch_action(action_name)
  @actions.fetch_action(action_name)
end

#fetch_action_for_node_id(action_name, node_id) ⇒ Moleculer::Service::Action

Gets the named action from the registry for the given node. Raises an error if the node does not exist or the node does not have the specified action.

Parameters:

  • action_name (String)

    the name of the action

  • node_id (String)

    the id of the node from which to get the action

Returns:

Raises:

  • (Moleculer::NodeNotFound)

    raised when the specified node_id was not found

  • (Moleculer::ActionNotFound)

    raised when the specified action was not found on the specified node



245
246
247
248
# File 'lib/moleculer/registry.rb', line 245

def fetch_action_for_node_id(action_name, node_id)
  node = fetch_node(node_id)
  fetch_action_from_node(action_name, node)
end

#fetch_events_for_emit(event_name) ⇒ Array<Moleculer::Service::Event>

Fetches all the events for the given event name that should be used to emit an event. This is load balanced, and will fetch a different node for nodes that duplicate the service/event

Parameters:

  • event_name (String)

    the name of the even to fetch

Returns:



206
207
208
# File 'lib/moleculer/registry.rb', line 206

def fetch_events_for_emit(event_name)
  @events.fetch_events(event_name)
end

#fetch_node(node_id) ⇒ Moleculer::Node

It fetches the given node, and raises an exception if the node does not exist

Parameters:

  • node_id (String)

    the id of the node to fetch

Returns:

Raises:



217
218
219
220
221
# File 'lib/moleculer/registry.rb', line 217

def fetch_node(node_id)
  @nodes.fetch_node(node_id)
rescue KeyError
  raise Errors::NodeNotFound, "The node with the id '#{node_id}' was not found."
end

#missing_services(*services) ⇒ Object



250
251
252
# File 'lib/moleculer/registry.rb', line 250

def missing_services(*services)
  services - @services.keys
end

#register_node(node) ⇒ Moleculer::Node

Registers the node with the registry and updates the action/event handler lists.

Parameters:

Returns:



169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
# File 'lib/moleculer/registry.rb', line 169

def register_node(node)
  return local_node if @local_node && node.id == @local_node.id

  if node.local?
    raise Errors::LocalNodeAlreadyRegistered, "A LOCAL node has already been registered" if @local_node

    @logger.info "registering LOCAL node '#{node.id}'"
    @local_node = node
  end
  @logger.info "registering node #{node.id}" unless node.local?
  @nodes.add_node(node)
  update_actions(node)
  update_events(node)
  # always call this last, as it will immediately cause processes waiting for services to show it as ready even if
  # the actions or events lists have not been updated.
  update_services(node)
  node
end

#remove_node(node_id, reason = nil) ⇒ Object

Removes the node with the given id. Because this must act across multiple indexes this action uses a semaphore to reduce the chance of race conditions.

Parameters:

  • node_id (String)

    the node to remove



259
260
261
262
263
264
265
266
267
# File 'lib/moleculer/registry.rb', line 259

def remove_node(node_id, reason = nil)
  @remove_semaphore.acquire
  @logger.info "removing node '#{node_id}'" unless reason
  @logger.info "removing node '#{node_id}' because #{reason}"
  @nodes.remove_node(node_id)
  @actions.remove_node(node_id)
  @events.remove_node(node_id)
  @remove_semaphore.release
end

#safe_fetch_node(node_id) ⇒ Moleculer::Node

Fetches the given node, and returns nil if the node was not found

Parameters:

  • node_id (String)

    the id of the node to fetch

Returns:



229
230
231
232
233
# File 'lib/moleculer/registry.rb', line 229

def safe_fetch_node(node_id)
  fetch_node(node_id)
rescue Errors::NodeNotFound
  nil
end