Class: DCell::Node
- Inherits:
-
Object
- Object
- DCell::Node
- Extended by:
- Enumerable
- Includes:
- Celluloid, Celluloid::FSM
- Defined in:
- lib/dcell/node.rb
Overview
A node in a DCell cluster
Class Attribute Summary collapse
-
.heartbeat_rate ⇒ Object
readonly
Returns the value of attribute heartbeat_rate.
-
.heartbeat_timeout ⇒ Object
readonly
Returns the value of attribute heartbeat_timeout.
Instance Attribute Summary collapse
-
#addr ⇒ Object
readonly
Returns the value of attribute addr.
-
#id ⇒ Object
readonly
Returns the value of attribute id.
Class Method Summary collapse
-
.[] ⇒ Object
Find a node by its node ID.
-
.all ⇒ Object
Return all available nodes in the cluster.
-
.each ⇒ Object
Iterate across all available nodes.
-
.find(id) ⇒ Object
Find a node by its node ID.
Instance Method Summary collapse
-
#actors ⇒ Object
(also: #all)
List all registered actors on this node.
- #finalize ⇒ Object
-
#find(name) ⇒ Object
(also: #[])
Find an actor registered with a given name on this node.
-
#handle_heartbeat ⇒ Object
Handle an incoming heartbeat for this node.
-
#initialize(id, addr) ⇒ Node
constructor
A new instance of Node.
-
#inspect ⇒ Object
Friendlier inspection.
-
#send_heartbeat ⇒ Object
Send a heartbeat message after the given interval.
-
#send_message(message) ⇒ Object
(also: #<<)
Send a message to another DCell node.
-
#socket ⇒ Object
Obtain the node’s 0MQ socket.
Constructor Details
#initialize(id, addr) ⇒ Node
Returns a new instance of Node.
69 70 71 72 73 74 75 76 |
# File 'lib/dcell/node.rb', line 69 def initialize(id, addr) @id, @addr = id, addr @socket = nil @heartbeat = nil # Total hax to accommodate the new Celluloid::FSM API attach self end |
Class Attribute Details
.heartbeat_rate ⇒ Object (readonly)
Returns the value of attribute heartbeat_rate.
30 31 32 |
# File 'lib/dcell/node.rb', line 30 def heartbeat_rate @heartbeat_rate end |
.heartbeat_timeout ⇒ Object (readonly)
Returns the value of attribute heartbeat_timeout.
30 31 32 |
# File 'lib/dcell/node.rb', line 30 def heartbeat_timeout @heartbeat_timeout end |
Instance Attribute Details
#addr ⇒ Object (readonly)
Returns the value of attribute addr.
6 7 8 |
# File 'lib/dcell/node.rb', line 6 def addr @addr end |
#id ⇒ Object (readonly)
Returns the value of attribute id.
6 7 8 |
# File 'lib/dcell/node.rb', line 6 def id @id end |
Class Method Details
.[] ⇒ Object
Find a node by its node ID
66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 |
# File 'lib/dcell/node.rb', line 66 def find(id) node = @lock.synchronize { @nodes[id] } return node if node addr = Directory[id] if addr if id == DCell.id node = DCell.me else node = Node.new(id, addr) end @lock.synchronize do @nodes[id] ||= node @nodes[id] end end end |
.all ⇒ Object
Return all available nodes in the cluster
33 34 35 36 37 |
# File 'lib/dcell/node.rb', line 33 def all Directory.all.map do |node_id| find node_id end end |
.each ⇒ Object
Iterate across all available nodes
40 41 42 43 44 |
# File 'lib/dcell/node.rb', line 40 def each Directory.all.each do |node_id| yield find node_id end end |
.find(id) ⇒ Object
Find a node by its node ID
47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 |
# File 'lib/dcell/node.rb', line 47 def find(id) node = @lock.synchronize { @nodes[id] } return node if node addr = Directory[id] if addr if id == DCell.id node = DCell.me else node = Node.new(id, addr) end @lock.synchronize do @nodes[id] ||= node @nodes[id] end end end |
Instance Method Details
#actors ⇒ Object Also known as: all
List all registered actors on this node
113 114 115 116 117 118 119 120 121 122 123 |
# File 'lib/dcell/node.rb', line 113 def actors request = Message::List.new(Thread.mailbox) request response = receive do |msg| msg.respond_to?(:request_id) && msg.request_id == request.id end abort response.value if response.is_a? ErrorResponse response.value end |
#finalize ⇒ Object
78 79 80 81 |
# File 'lib/dcell/node.rb', line 78 def finalize transition :shutdown @socket.close if socket end |
#find(name) ⇒ Object Also known as: []
Find an actor registered with a given name on this node
99 100 101 102 103 104 105 106 107 108 109 |
# File 'lib/dcell/node.rb', line 99 def find(name) request = Message::Find.new(Thread.mailbox, name) request response = receive do |msg| msg.respond_to?(:request_id) && msg.request_id == request.id end abort response.value if response.is_a? ErrorResponse response.value end |
#handle_heartbeat ⇒ Object
Handle an incoming heartbeat for this node
153 154 155 156 |
# File 'lib/dcell/node.rb', line 153 def handle_heartbeat transition :connected transition :partitioned, :delay => self.class.heartbeat_timeout end |
#inspect ⇒ Object
Friendlier inspection
159 160 161 |
# File 'lib/dcell/node.rb', line 159 def inspect "#<DCell::Node[#{@id}] @addr=#{@addr.inspect}>" end |
#send_heartbeat ⇒ Object
Send a heartbeat message after the given interval
147 148 149 150 |
# File 'lib/dcell/node.rb', line 147 def send_heartbeat DCell::Message::Heartbeat.new @heartbeat = after(self.class.heartbeat_rate) { send_heartbeat } end |
#send_message(message) ⇒ Object Also known as: <<
Send a message to another DCell node
127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 |
# File 'lib/dcell/node.rb', line 127 def () begin string = Marshal.dump() rescue => ex abort ex end if ::ZMQ::Util.resultcode_ok? socket.send_string string # Ideally we could reset the heartbeat counter now because we've sent # a message. Heartbeats could work off all messages rather than just # DCell::Message::Heartbeat. Unfortunately this functionality is not # yet implemented, sorry! # @heartbeat.reset else raise "error sending 0MQ message: #{::ZMQ::Util.error_string}" end end |
#socket ⇒ Object
Obtain the node’s 0MQ socket
84 85 86 87 88 89 90 91 92 93 94 95 96 |
# File 'lib/dcell/node.rb', line 84 def socket return @socket if @socket @socket = DCell.zmq_context.socket(::ZMQ::PUSH) unless ::ZMQ::Util.resultcode_ok? @socket.connect @addr @socket.close @socket = nil raise "error connecting to #{addr}: #{::ZMQ::Util.error_string}" end transition :connected @socket end |