Class: Rinda::RingServer
- Inherits:
- Object
- Rinda::RingServer
- Includes:
- DRbUndumped
- Defined in:
- lib/rinda/ring.rb
Overview
A RingServer allows a Rinda::TupleSpace to be located via UDP broadcasts. Default service location uses the following steps:
-
A RingServer begins listening on the network broadcast UDP address.
-
A RingFinger sends a UDP packet containing the DRb URI where it will listen for a reply.
-
The RingServer receives the UDP packet and connects back to the provided DRb URI with the DRb service.
A RingServer requires a TupleSpace:
ts = Rinda::TupleSpace.new
rs = Rinda::RingServer.new ts
RingServer can also listen on multicast addresses for announcements. This allows multiple RingServers to run on the same host. To use network broadcast and multicast:
ts = Rinda::TupleSpace.new
rs = Rinda::RingServer.new ts, [Socket::INADDR_ANY, '239.0.0.1', 'ff02::1']
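The discovery steps above can be tried inside a single process. The following is a minimal sketch rather than a recommended deployment: it assumes loopback UDP is available, uses the unicast address 127.0.0.1 in place of a broadcast address so that no packet leaves the host, and picks port 17647 arbitrarily so it does not clash with a ring on the default port. Rinda::RingFinger is the client-side class from the same library.

```ruby
require 'rinda/ring'
require 'rinda/tuplespace'

# The finger needs a running DRb service so the RingServer can call back.
# Port 0 asks the OS for any free TCP port.
DRb.start_service('druby://127.0.0.1:0')

port = 17_647 # assumption: any free UDP port will do
ts = Rinda::TupleSpace.new
ring_server = Rinda::RingServer.new(ts, ['127.0.0.1'], port)

# Send the lookup packet to the same address and wait up to 5 seconds
# for the first RingServer to answer.
finger = Rinda::RingFinger.new(['127.0.0.1'], port)
found = finger.lookup_ring_any(5)

# Use the located TupleSpace, then check the tuple arrived in the local one.
found.write([:greeting, 'hello'])
greeting = ts.read([:greeting, nil])

ring_server.shutdown
DRb.stop_service
```

If no RingServer answers within the timeout, lookup_ring_any raises instead of returning, so real callers may want to rescue around it.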
Defined Under Namespace
Classes: Renewer
Instance Method Summary
-
#do_reply ⇒ Object
Pulls lookup tuples out of the TupleSpace and sends their DRb object the address of the local TupleSpace.
-
#do_write(msg) ⇒ Object
Extracts the response URI from msg and adds it to TupleSpace where it will be picked up by reply_service for notification.
-
#initialize(ts, addresses = [Socket::INADDR_ANY], port = Ring_PORT) ⇒ RingServer
constructor
Advertises ts on the given addresses at port.
-
#make_socket(address, interface_address = nil, multicast_interface = 0) ⇒ Object
Creates a socket at address.
-
#reply_service ⇒ Object
Creates a thread that notifies waiting clients from the TupleSpace.
-
#shutdown ⇒ Object
Shuts down the RingServer.
-
#write_services ⇒ Object
Creates threads that pick up UDP packets and pass them to do_write for decoding.
Constructor Details
#initialize(ts, addresses = [Socket::INADDR_ANY], port = Ring_PORT) ⇒ RingServer
Advertises ts on the given addresses at port.
If addresses is omitted only the UDP broadcast address is used.
addresses can contain multiple addresses. If a multicast address is given in addresses then the RingServer will listen for multicast queries.
If you use IPv4 multicast you may need to set an address of the inbound interface which joins a multicast group.
ts = Rinda::TupleSpace.new
rs = Rinda::RingServer.new(ts, [['239.0.0.1', '9.5.1.1']])
Each entry in addresses can itself be an Array. The first element of the Array is a multicast address and the second is an inbound interface address. If the second is omitted then ‘0.0.0.0’ is used.
If you use IPv6 multicast you may need to set both the local interface address and the inbound interface index:
rs = Rinda::RingServer.new(ts, [['ff02::1', '::1', 1]])
The first element is a multicast address and the second is an inbound interface address. The third is an inbound interface index.
At this time there is no easy way to get an interface index by name.
If the second is omitted then ‘::1’ is used. If the third is omitted then 0 (default interface) is used.
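The accepted entry forms and their defaults can be summarised in a small sketch. The helper expand_entry below is hypothetical and not part of Rinda; it only mirrors the documented defaults to show which arguments each kind of entry ends up with. The port 7647 is used purely so that Addrinfo can classify the address.

```ruby
require 'socket'

# Hypothetical helper (not part of Rinda): returns the
# [address, interface_address, multicast_interface] triple that one entry
# of +addresses+ corresponds to once the documented defaults are applied.
def expand_entry(entry)
  address, interface_address, multicast_interface = *entry
  info = Addrinfo.udp(address, 7647)

  if info.ipv4_multicast?
    interface_address ||= '0.0.0.0'
  elsif info.ipv6_multicast?
    interface_address ||= '::1'
  end

  [address, interface_address, multicast_interface || 0]
end

ipv4_default  = expand_entry('239.0.0.1')
ipv4_explicit = expand_entry(['239.0.0.1', '9.5.1.1'])
ipv6_default  = expand_entry('ff02::1')
ipv6_explicit = expand_entry(['ff02::1', '::1', 1])
```

A bare String and a one-element Array are treated the same way here, which matches the constructor splatting Array entries into make_socket.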
# File 'lib/rinda/ring.rb', line 94
def initialize(ts, addresses=[Socket::INADDR_ANY], port=Ring_PORT)
  @port = port

  if Integer === addresses then
    addresses, @port = [Socket::INADDR_ANY], addresses
  end

  @renewer = Renewer.new

  @ts = ts
  @sockets = []
  addresses.each do |address|
    if Array === address
      make_socket(*address)
    else
      make_socket(address)
    end
  end

  @w_services = write_services
  @r_service = reply_service
end
Instance Method Details
#do_reply ⇒ Object
Pulls lookup tuples out of the TupleSpace and sends their DRb object the address of the local TupleSpace.
# File 'lib/rinda/ring.rb', line 218
def do_reply
  tuple = @ts.take([:lookup_ring, nil], @renewer)
  Thread.new { tuple[1].call(@ts) rescue nil}
rescue
end
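The take-then-call pattern in do_reply can be observed without any networking. In this sketch a plain lambda stands in for the remote DRb object that a RingFinger would normally supply; that substitution is an assumption made to keep the example local.

```ruby
require 'rinda/tuplespace'

ts = Rinda::TupleSpace.new
notified = []

# Stand-in for the client's DRb callback (assumption: a local lambda).
callback = ->(space) { notified << space }

# What do_write stores: a :lookup_ring tuple that expires after 5 seconds.
ts.write([:lookup_ring, callback], 5)

# What do_reply does: take any :lookup_ring tuple (nil is a wildcard)
# and hand the TupleSpace to the object in the second slot.
tuple = ts.take([:lookup_ring, nil])
tuple[1].call(ts)
```

Because take removes the tuple, each lookup request is answered at most once by a given RingServer.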
#do_write(msg) ⇒ Object
Extracts the response URI from msg and adds it to TupleSpace where it will be picked up by reply_service for notification.
# File 'lib/rinda/ring.rb', line 193
def do_write(msg)
  Thread.new do
    begin
      tuple, sec = Marshal.load(msg)
      @ts.write(tuple, sec)
    rescue
    end
  end
end
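As the listing shows, msg is a marshalled pair of a tuple and a lifetime in seconds. The sketch below builds and decodes such a packet by hand. In real traffic the second tuple element is a DRb object; a plain String with a made-up URI stands in for it here (an assumption) so that no DRb service is required.

```ruby
# Build a packet shaped like the one do_write expects.
# 'druby://example:12345' is a placeholder, not a real service.
packet = Marshal.dump([[:lookup_ring, 'druby://example:12345'], 5])

# Decode it the same way do_write does.
tuple, sec = Marshal.load(packet)

# A packet that is not a Marshal stream raises TypeError, which the bare
# rescue in do_write swallows, so such packets are silently dropped.
dropped = begin
  Marshal.load('not a marshal stream')
  false
rescue TypeError
  true
end
```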
#make_socket(address, interface_address = nil, multicast_interface = 0) ⇒ Object
Creates a socket at address.
If address is a multicast address then interface_address and multicast_interface may optionally be set.
For a multicast address, the created socket is bound to interface_address. If you use IPv4 multicast then the interface of interface_address is used as the inbound interface. If interface_address is omitted or nil then ‘0.0.0.0’ (IPv4) or ‘::1’ (IPv6) is used.
If you use IPv6 multicast then multicast_interface is used as the inbound interface. multicast_interface is a network interface index. If multicast_interface is omitted then 0 (default interface) is used.
# File 'lib/rinda/ring.rb', line 132
def make_socket(address, interface_address=nil, multicast_interface=0)
  addrinfo = Addrinfo.udp(address, @port)

  socket = Socket.new(addrinfo.pfamily, addrinfo.socktype,
                      addrinfo.protocol)

  if addrinfo.ipv4_multicast? or addrinfo.ipv6_multicast? then
    if Socket.const_defined?(:SO_REUSEPORT) then
      socket.setsockopt(:SOCKET, :SO_REUSEPORT, true)
    else
      socket.setsockopt(:SOCKET, :SO_REUSEADDR, true)
    end

    if addrinfo.ipv4_multicast? then
      interface_address = '0.0.0.0' if interface_address.nil?
      socket.bind(Addrinfo.udp(interface_address, @port))

      mreq = IPAddr.new(addrinfo.ip_address).hton +
        IPAddr.new(interface_address).hton

      socket.setsockopt(:IPPROTO_IP, :IP_ADD_MEMBERSHIP, mreq)
    else
      interface_address = '::1' if interface_address.nil?
      socket.bind(Addrinfo.udp(interface_address, @port))

      mreq = IPAddr.new(addrinfo.ip_address).hton +
        [multicast_interface].pack('I')

      socket.setsockopt(:IPPROTO_IPV6, :IPV6_JOIN_GROUP, mreq)
    end
  else
    socket.bind(addrinfo)
  end

  socket
rescue
  socket = socket.close if socket
  raise
ensure
  @sockets << socket if socket
end
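The two mreq values in the listing are packed membership requests whose layout differs between IPv4 and IPv6. The sketch below rebuilds both without opening a socket, to make the byte layout visible. The addresses and the port 7647 are only illustrative, and the 4-byte size of pack('I') assumes a platform where a native unsigned int is 4 bytes.

```ruby
require 'ipaddr'
require 'socket'

# IPv4: 4-byte group address followed by the 4-byte interface address.
group4 = Addrinfo.udp('239.0.0.1', 7647)
mreq4 = IPAddr.new(group4.ip_address).hton + IPAddr.new('0.0.0.0').hton

# IPv6: 16-byte group address followed by a native unsigned int that
# holds the interface index (0 selects the default interface).
group6 = Addrinfo.udp('ff02::1', 7647)
mreq6 = IPAddr.new(group6.ip_address).hton + [0].pack('I')

is_multicast4 = group4.ipv4_multicast?
is_multicast6 = group6.ipv6_multicast?
```

This is why the IPv4 form of an addresses entry identifies the inbound interface by address while the IPv6 form needs an interface index.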
#reply_service ⇒ Object
Creates a thread that notifies waiting clients from the TupleSpace.
# File 'lib/rinda/ring.rb', line 206
def reply_service
  Thread.new do
    loop do
      do_reply
    end
  end
end
#shutdown ⇒ Object
Shuts down the RingServer.
# File 'lib/rinda/ring.rb', line 227
def shutdown
  @renewer.renew = false

  @w_services.each do |thread|
    thread.kill
    thread.join
  end

  @sockets.each do |socket|
    socket.close
  end

  @r_service.kill
  @r_service.join
end
#write_services ⇒ Object
Creates threads that pick up UDP packets and pass them to do_write for decoding.
# File 'lib/rinda/ring.rb', line 178
def write_services
  @sockets.map do |s|
    Thread.new(s) do |socket|
      loop do
        msg = socket.recv(1024)
        do_write(msg)
      end
    end
  end
end
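The one-thread-per-socket receive loop can be reproduced on the loopback interface. This is a rough sketch under a few assumptions: it uses UDPSocket for brevity where RingServer builds its sockets with Socket.new, it binds to an OS-chosen port, and it pushes each datagram onto a Queue where RingServer would call do_write.

```ruby
require 'socket'

server = UDPSocket.new
server.bind('127.0.0.1', 0) # port 0: let the OS choose a free port
port = server.addr[1]

received = Queue.new

# Same shape as write_services: one thread per socket, reading datagrams
# of up to 1024 bytes in an endless loop.
listener = Thread.new(server) do |socket|
  loop do
    msg = socket.recv(1024)
    received << msg # RingServer calls do_write(msg) at this point
  end
end

client = UDPSocket.new
client.send('ping', 0, '127.0.0.1', port)
first = received.pop

# Mirror shutdown: stop the thread first, then close the sockets.
listener.kill
listener.join
server.close
client.close
```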