Class: Moleculer::Registry
- Inherits:
- Object
- Includes:
- Support
- Defined in:
- lib/moleculer/registry.rb
Overview
The Registry manages the available services on the network.
Instance Attribute Summary
-
#local_node ⇒ Object
readonly
Returns the value of attribute local_node.
Instance Method Summary
-
#expire_nodes ⇒ Object
Looks for nodes that have stopped reporting heartbeats and removes them from the registry.
-
#fetch_action(action_name) ⇒ Moleculer::Service::Action|Moleculer::RemoteService::Action
Gets the named action from the registry.
-
#fetch_action_for_node_id(action_name, node_id) ⇒ Moleculer::Service::Action
Gets the named action from the registry for the given node.
-
#fetch_events_for_emit(event_name) ⇒ Array<Moleculer::Service::Event>
Fetches all the events for the given event name that should be used to emit an event.
-
#fetch_node(node_id) ⇒ Moleculer::Node
Fetches the given node; raises Errors::NodeNotFound if the node does not exist.
-
#initialize(config) ⇒ Registry
constructor
A new instance of Registry.
-
#missing_services(*services) ⇒ Object
Returns the given service names that are not currently registered.
-
#register_node(node) ⇒ Moleculer::Node
Registers the node with the registry and updates the action/event handler lists.
-
#remove_node(node_id, reason = nil) ⇒ Object
Removes the node with the given id.
-
#safe_fetch_node(node_id) ⇒ Moleculer::Node
Fetches the given node, and returns nil if the node was not found.
Constructor Details
#initialize(config) ⇒ Registry
Returns a new instance of Registry.
# File 'lib/moleculer/registry.rb', line 153
def initialize(config)
  @config = config
  @nodes = NodeList.new(@config.heartbeat_interval)
  @actions = ActionList.new(@config.heartbeat_interval)
  @events = EventList.new(@config.heartbeat_interval)
  @services = Concurrent::Hash.new
  @logger = @config.logger.get_child("[REGISTRY]")
  @remove_semaphore = Concurrent::Semaphore.new(1)
end
Instance Attribute Details
#local_node ⇒ Object (readonly)
Returns the value of attribute local_node.
# File 'lib/moleculer/registry.rb', line 149
def local_node
  @local_node
end
Instance Method Details
#expire_nodes ⇒ Object
Looks for nodes that have stopped reporting heartbeats and removes them from the registry.
# File 'lib/moleculer/registry.rb', line 271
def expire_nodes
  @nodes.expired_nodes.each { |node| remove_node(node[:node].id, "it expired after 10 minutes") }
end
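The expiry idea can be illustrated without Moleculer's NodeList. The following is a minimal sketch only: `HeartbeatTable`, its `timeout` argument, and the `beat`, `expired`, and `expire!` methods are hypothetical names chosen for this example and are not part of the library.

```ruby
# Hypothetical stand-in for a node list; this is not Moleculer's NodeList.
class HeartbeatTable
  def initialize(timeout)
    @timeout = timeout # seconds without a heartbeat before a node counts as expired
    @last_seen = {}    # node id => time of the last heartbeat
  end

  # Record a heartbeat for the node at the given time.
  def beat(node_id, now = Time.now)
    @last_seen[node_id] = now
  end

  # Ids of nodes whose last heartbeat is older than the timeout.
  def expired(now = Time.now)
    @last_seen.select { |_id, seen| now - seen > @timeout }.keys
  end

  # Remove every expired node and return the removed ids.
  def expire!(now = Time.now)
    expired(now).each { |id| @last_seen.delete(id) }
  end
end

table = HeartbeatTable.new(30)
start = Time.at(0)
table.beat("node-a", start)
table.beat("node-b", start + 25)
removed = table.expire!(start + 40) # only node-a has been silent for more than 30 seconds
```

Passing the current time explicitly keeps the sketch deterministic; a real registry would more likely read the clock itself.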
#fetch_action(action_name) ⇒ Moleculer::Service::Action|Moleculer::RemoteService::Action
Gets the named action from the registry. If a local action exists it will return the local one instead of a remote action.
# File 'lib/moleculer/registry.rb', line 195
def fetch_action(action_name)
  @actions.fetch_action(action_name)
end
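The "local before remote" rule described above could be sketched as follows. This is an assumption-laden illustration, not the library's ActionList: `ActionEntry` and `PreferLocalActions` are hypothetical names, and falling back to the first remote entry is a simplification made for this example.

```ruby
# Hypothetical action record; `local` marks actions hosted on this node.
ActionEntry = Struct.new(:name, :node_id, :local)

# Hypothetical list that prefers local actions; not Moleculer's ActionList.
class PreferLocalActions
  def initialize
    @by_name = Hash.new { |hash, key| hash[key] = [] }
  end

  def add(entry)
    @by_name[entry.name] << entry
  end

  # Return a local entry when one exists, otherwise the first remote entry.
  def fetch(name)
    entries = @by_name.fetch(name) { raise KeyError, "action '#{name}' not found" }
    entries.find(&:local) || entries.first
  end
end

actions = PreferLocalActions.new
actions.add(ActionEntry.new("math.add", "remote-1", false))
actions.add(ActionEntry.new("math.add", "local-1", true))
chosen = actions.fetch("math.add") # the local entry wins even though it was added second
```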
#fetch_action_for_node_id(action_name, node_id) ⇒ Moleculer::Service::Action
Gets the named action from the registry for the given node. Raises an error if the node does not exist or the node does not have the specified action.
# File 'lib/moleculer/registry.rb', line 245
def fetch_action_for_node_id(action_name, node_id)
  node = fetch_node(node_id)
  fetch_action_from_node(action_name, node)
end
#fetch_events_for_emit(event_name) ⇒ Array<Moleculer::Service::Event>
Fetches all the events for the given event name that should be used to emit an event. The result is load balanced: when several nodes provide the same service/event, a different node may be selected on each call.
# File 'lib/moleculer/registry.rb', line 206
def fetch_events_for_emit(event_name)
  @events.fetch_events(event_name)
end
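One way to picture this kind of balancing is a round-robin cursor per event and service. The sketch below assumes round-robin selection, which the documentation above does not confirm; `RoundRobinEvents` and its methods are hypothetical and unrelated to the library's EventList.

```ruby
# Hypothetical event index that rotates across nodes offering the same service/event.
class RoundRobinEvents
  def initialize
    # event name => service name => list of node ids
    @handlers = Hash.new { |h, event| h[event] = Hash.new { |s, service| s[service] = [] } }
    @cursor = Hash.new(0) # [event, service] => index of the next node to use
  end

  def add(event, service, node_id)
    @handlers[event][service] << node_id
  end

  # Return one [service, node_id] pair per service, advancing each cursor.
  def fetch(event)
    return [] unless @handlers.key?(event)

    @handlers[event].map do |service, nodes|
      index = @cursor[[event, service]]
      @cursor[[event, service]] = (index + 1) % nodes.size
      [service, nodes[index]]
    end
  end
end

events = RoundRobinEvents.new
events.add("user.created", "mailer", "node-1")
events.add("user.created", "mailer", "node-2")
events.add("user.created", "audit", "node-3")
first_emit = events.fetch("user.created")
second_emit = events.fetch("user.created")
```

Each service receives the event once per emit, while duplicate providers of the same service take turns.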
#fetch_node(node_id) ⇒ Moleculer::Node
Fetches the given node; raises Errors::NodeNotFound if the node does not exist.
# File 'lib/moleculer/registry.rb', line 217
def fetch_node(node_id)
  @nodes.fetch_node(node_id)
rescue KeyError
  raise Errors::NodeNotFound, "The node with the id '#{node_id}' was not found."
end
#missing_services(*services) ⇒ Object
Returns the given service names that are not currently registered.
# File 'lib/moleculer/registry.rb', line 250
def missing_services(*services)
  services - @services.keys
end
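The listing relies on Ruby's array difference. A standalone sketch of the same computation, using a plain Hash in place of the registry's service table (the `registered` hash and the `missing` helper are hypothetical names for this example):

```ruby
# Plain Hash standing in for the registry's service table (service name => provider).
registered = { "users" => :node_a, "mailer" => :node_b }

# Return the requested names that have no entry in the table.
def missing(registered, *wanted)
  wanted - registered.keys
end

still_missing = missing(registered, "users", "billing")
all_present = missing(registered, "users", "mailer")
```

An empty result means every requested service is available, so a caller waiting on services could poll until the result is empty.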
#register_node(node) ⇒ Moleculer::Node
Registers the node with the registry and updates the action/event handler lists.
# File 'lib/moleculer/registry.rb', line 169
def register_node(node)
  return local_node if @local_node && node.id == @local_node.id

  if node.local?
    raise Errors::LocalNodeAlreadyRegistered, "A LOCAL node has already been registered" if @local_node

    @logger.info "registering LOCAL node '#{node.id}'"
    @local_node = node
  end
  @logger.info "registering node #{node.id}" unless node.local?
  @nodes.add_node(node)
  update_actions(node)
  update_events(node)
  # always call this last, as it will immediately cause processes waiting for services to show it as ready even if
  # the actions or events lists have not been updated.
  update_services(node)
  node
end
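The local-node guard in the listing can be exercised in isolation. The sketch below mirrors only that guard and leaves out logging and the action, event, and service updates; `SketchNode`, `TinyRegistry`, and the top-level `LocalNodeAlreadyRegistered` class are hypothetical stand-ins defined for this example.

```ruby
# Hypothetical error and node types used only in this sketch.
class LocalNodeAlreadyRegistered < StandardError; end

SketchNode = Struct.new(:id, :local) do
  def local?
    local
  end
end

# Hypothetical registry that keeps only the local-node guard from the listing.
class TinyRegistry
  attr_reader :local_node, :nodes

  def initialize
    @nodes = {}
    @local_node = nil
  end

  def register(node)
    # Re-registering the same local node is a no-op that returns the stored node.
    return @local_node if @local_node && node.id == @local_node.id

    if node.local?
      raise LocalNodeAlreadyRegistered, "A LOCAL node has already been registered" if @local_node

      @local_node = node
    end
    @nodes[node.id] = node
    node
  end
end

registry = TinyRegistry.new
local = SketchNode.new("local-1", true)
registry.register(local)
again = registry.register(SketchNode.new("local-1", true)) # returns the first local node
registry.register(SketchNode.new("remote-1", false))
```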
#remove_node(node_id, reason = nil) ⇒ Object
Removes the node with the given id. Because removal must update multiple indexes, it uses a semaphore to reduce the chance of race conditions.
# File 'lib/moleculer/registry.rb', line 259
def remove_node(node_id, reason = nil)
  @remove_semaphore.acquire
  @logger.info "removing node '#{node_id}'" unless reason
  @logger.info "removing node '#{node_id}' because #{reason}"
  @nodes.remove_node(node_id)
  @actions.remove_node(node_id)
  @events.remove_node(node_id)
  @remove_semaphore.release
end
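In the listing, the release call is the last statement, so an exception raised by any of the removals would leave the semaphore acquired. A common way to guarantee release is an `ensure` clause or a block-scoped lock. The sketch below shows the block-scoped form with Ruby's core Mutex instead of Concurrent::Semaphore; `GuardedRemoval` and `FailingIndex` are hypothetical names, and this is not how the library itself is written.

```ruby
# Hypothetical helper that removes a node id from several indexes under one lock.
class GuardedRemoval
  def initialize(indexes)
    @indexes = indexes # objects that respond to delete(node_id)
    @lock = Mutex.new
  end

  def remove(node_id)
    # synchronize releases the lock even when the block raises.
    @lock.synchronize do
      @indexes.each { |index| index.delete(node_id) }
    end
  end

  def locked?
    @lock.locked?
  end
end

# An index whose delete always fails, used to show the lock is still released.
class FailingIndex
  def delete(_node_id)
    raise "index unavailable"
  end
end

nodes = { "node-a" => :node }
actions = { "node-a" => [:add] }
GuardedRemoval.new([nodes, actions]).remove("node-a")

failing = GuardedRemoval.new([FailingIndex.new])
begin
  failing.remove("node-a")
rescue RuntimeError => error
  failure_message = error.message
end
```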
#safe_fetch_node(node_id) ⇒ Moleculer::Node
Fetches the given node, and returns nil if the node was not found.
# File 'lib/moleculer/registry.rb', line 229
def safe_fetch_node(node_id)
  fetch_node(node_id)
rescue Errors::NodeNotFound
  nil
end
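The strict and lenient lookups form a pair: one translates KeyError into a domain error, the other turns that error into nil. A self-contained sketch of the pattern, with `NodeIndex` and a top-level `NodeNotFound` class as hypothetical stand-ins for the library's types:

```ruby
# Hypothetical domain error; the library's own class is Errors::NodeNotFound.
class NodeNotFound < StandardError; end

# Hypothetical node store offering a strict and a lenient lookup.
class NodeIndex
  def initialize
    @nodes = {}
  end

  def add(id, node)
    @nodes[id] = node
  end

  # Strict lookup: translate Hash#fetch's KeyError into a domain error.
  def fetch(id)
    @nodes.fetch(id)
  rescue KeyError
    raise NodeNotFound, "The node with the id '#{id}' was not found."
  end

  # Lenient lookup: reuse the strict one and map the error to nil.
  def safe_fetch(id)
    fetch(id)
  rescue NodeNotFound
    nil
  end
end

index = NodeIndex.new
index.add("node-a", :node_a)
found = index.safe_fetch("node-a")
absent = index.safe_fetch("node-z")
```

The lenient form suits callers for whom a missing node is routine; the strict form suits callers for whom it signals a fault.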