Class: Roby::Log::Server
- Extended by:
- Logger::Forward
- Defined in:
- lib/roby/log/server.rb
Constant Summary collapse
- RING_PORT =
48904- POLLING_TIMEOUT =
Default value for #polling_timeout
0.1
Class Attribute Summary collapse
-
.logger ⇒ Object
Returns the value of attribute logger.
Instance Attribute Summary collapse
-
#connections ⇒ Object
readonly
A
remote_server => [queue, thread]hash which contains the set of connection parameters for each connected peer. -
#polling_timeout ⇒ Object
readonly
Returns the value of attribute polling_timeout.
-
#ring_server ⇒ Object
readonly
The Distributed::RingServer object which publishes this display server on the network.
-
#subscriptions ⇒ Object
readonly
A
stream_id => [remote_server, ...]hash containing the set of subscribed remote peer for each stream.
Class Method Summary collapse
-
.available_servers ⇒ Object
Returns the set of servers that have been discovered by the discovery mechanism at this time.
-
.disable_discovery ⇒ Object
Stops the discovery thread if it is running.
-
.enable_discovery(broadcast, port = RING_PORT, period = 10) ⇒ Object
Start an asynchronous discovery mechanism.
- .synchronize ⇒ Object
Instance Method Summary collapse
-
#added_stream(stream) ⇒ Object
New stream.
- #connect(remote) ⇒ Object
- #disconnect(remote) ⇒ Object
-
#initialize(port = RING_PORT, polling_timeout = POLLING_TIMEOUT) ⇒ Server
constructor
A new instance of Server.
-
#polling ⇒ Object
Polls all data sources and pushes the samples to the subscribed clients.
- #quit ⇒ Object
-
#removed_stream(id) ⇒ Object
Stream
idhas stopped. -
#streams ⇒ Object
Returns a set of Roby::Log::DataStream objects describing the available data sources on this stream.
-
#subscribe(id, remote) ⇒ Object
Make
remotesubscribe to the stream identified byid. - #synchronize ⇒ Object
-
#unsubscribe(id, remote) ⇒ Object
Rmoves a subscription of
remoteonid.
Constructor Details
#initialize(port = RING_PORT, polling_timeout = POLLING_TIMEOUT) ⇒ Server
Returns a new instance of Server.
106 107 108 109 110 111 112 113 114 |
# File 'lib/roby/log/server.rb', line 106 def initialize(port = RING_PORT, polling_timeout = POLLING_TIMEOUT) @ring_server = Distributed::RingServer.new(DRbObject.new(self), :port => port) @mutex = Mutex.new @streams = Array.new @connections = Hash.new @subscriptions = Hash.new { |h, k| h[k] = Set.new } @polling_timeout = polling_timeout @polling = Thread.new(&method(:polling)) end |
Class Attribute Details
.logger ⇒ Object
Returns the value of attribute logger.
13 14 15 |
# File 'lib/roby/log/server.rb', line 13 def logger @logger end |
Instance Attribute Details
#connections ⇒ Object (readonly)
A remote_server => [queue, thread] hash which contains the set of connection parameters for each connected peer
95 96 97 |
# File 'lib/roby/log/server.rb', line 95 def connections @connections end |
#polling_timeout ⇒ Object (readonly)
Returns the value of attribute polling_timeout.
104 105 106 |
# File 'lib/roby/log/server.rb', line 104 def polling_timeout @polling_timeout end |
#ring_server ⇒ Object (readonly)
The Distributed::RingServer object which publishes this display server on the network
99 100 101 |
# File 'lib/roby/log/server.rb', line 99 def ring_server @ring_server end |
#subscriptions ⇒ Object (readonly)
A stream_id => [remote_server, ...] hash containing the set of subscribed remote peer for each stream
92 93 94 |
# File 'lib/roby/log/server.rb', line 92 def subscriptions @subscriptions end |
Class Method Details
.available_servers ⇒ Object
Returns the set of servers that have been discovered by the discovery mechanism at this time
See also enable_discovery and disable_discovery
30 31 32 33 34 |
# File 'lib/roby/log/server.rb', line 30 def self.available_servers synchronize do @available_servers.dup end end |
.disable_discovery ⇒ Object
Stops the discovery thread if it is running
77 78 79 80 81 82 83 84 85 86 87 88 |
# File 'lib/roby/log/server.rb', line 77 def self.disable_discovery Control.finalizers.delete(method(:disable_discovery)) if @discovery_thread @discovery_thread.raise Interrupt, "quitting" @discovery_thread.join @discovery_thread = nil synchronize do @available_servers.clear end end end |
.enable_discovery(broadcast, port = RING_PORT, period = 10) ⇒ Object
Start an asynchronous discovery mechanism. This will fill the #available_servers set of servers. broadcast is an array of addresses on which discovery should be done and period is the discovery period in seconds
40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 |
# File 'lib/roby/log/server.rb', line 40 def self.enable_discovery(broadcast, port = RING_PORT, period = 10) if @discovery_thread raise ArgumentError, "already enabled discovery" end finger = Rinda::RingFinger.new(broadcast, port) discovered_displays = Array.new @available_servers = Array.new # Add disable_discovery in the list of finalizers Control.finalizers << method(:disable_discovery) @discovery_thread = Thread.new do begin loop do finger.lookup_ring(period) do |remote| synchronize do unless @available_servers.include?(remote) @available_servers << remote end discovered_displays << remote end end sleep(period) synchronize do @available_servers, discovered_displays = discovered_displays, @available_servers discovered_displays.clear end end rescue Interrupt end end end |
.synchronize ⇒ Object
22 23 24 |
# File 'lib/roby/log/server.rb', line 22 def self.synchronize @mutex.synchronize { yield } end |
Instance Method Details
#added_stream(stream) ⇒ Object
New stream
208 209 210 211 212 213 214 215 |
# File 'lib/roby/log/server.rb', line 208 def added_stream(stream) synchronize do @streams << stream connections.each_value do |queue, _| queue.push [:added_stream, stream.class.name, stream.id, stream.name, stream.type] end end end |
#connect(remote) ⇒ Object
120 121 122 123 124 125 126 127 128 129 130 131 |
# File 'lib/roby/log/server.rb', line 120 def connect(remote) synchronize do queue = Queue.new receiver_thread = pushing_loop(remote, queue) connections[remote] = [queue, receiver_thread] Server.info "#{remote.__drburi} connected" end streams.map do |s| [s.class.name, s.id, s.name, s.type] end end |
#disconnect(remote) ⇒ Object
133 134 135 136 137 138 139 140 141 142 143 144 |
# File 'lib/roby/log/server.rb', line 133 def disconnect(remote) thread = synchronize do queue, thread = connections[remote] if thread thread.raise Interrupt, "quitting" thread end end thread.join if thread Server.info "#{remote.__drburi} disconnected" end |
#polling ⇒ Object
Polls all data sources and pushes the samples to the subscribed clients
148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 |
# File 'lib/roby/log/server.rb', line 148 def polling loop do s, data = nil done_sth = false synchronize do @streams.each do |s| done_sth ||= if s.reinit? Roby::Log::Server.info "reinitializing #{s}" s.reinit! reinit(s.id) true elsif s.has_sample? if Roby::Log::Server.logger.debug? Roby::Log::Server.debug "new sample for #{s} at #{s.current_time.to_hms}" end push(s.id, s.current_time, s.read) true end end end unless done_sth sleep(polling_timeout) end end rescue Interrupt end |
#quit ⇒ Object
295 296 297 298 299 300 301 302 303 304 305 |
# File 'lib/roby/log/server.rb', line 295 def quit if @polling @polling.raise Interrupt, "quitting" @polling.join end connections.each_value do |queue, thread| queue.push [:quit] thread.join end end |
#removed_stream(id) ⇒ Object
Stream id has stopped
218 219 220 221 222 223 224 225 226 227 228 229 230 231 |
# File 'lib/roby/log/server.rb', line 218 def removed_stream(id) synchronize do found = false @streams.delete_if { |s| found ||= (s.id == id) } unless found raise ArgumentError, "no such stream" end connections.each_value do |queue, _| queue.push [:removed_stream, id] end subscriptions.delete(id) end end |
#streams ⇒ Object
Returns a set of Roby::Log::DataStream objects describing the available data sources on this stream
235 236 237 238 239 |
# File 'lib/roby/log/server.rb', line 235 def streams synchronize do @streams.dup end end |
#subscribe(id, remote) ⇒ Object
Make remote subscribe to the stream identified by id. When new data is available, #push will be called on remote. The exact format of the pushed sample depends on the type of the stream
If the stream stop existing (because it source has quit for instance), #removed_stream will be called on the remote object
248 249 250 251 252 253 254 255 256 257 258 259 |
# File 'lib/roby/log/server.rb', line 248 def subscribe(id, remote) synchronize do if s = @streams.find { |s| s.id == id } subscriptions[id] << remote if data = s.read_all remote.init(id, data) end else raise ArgumentError, "no such stream" end end end |
#synchronize ⇒ Object
116 117 118 |
# File 'lib/roby/log/server.rb', line 116 def synchronize @mutex.synchronize { yield } end |
#unsubscribe(id, remote) ⇒ Object
Rmoves a subscription of remote on id
262 263 264 265 266 267 268 |
# File 'lib/roby/log/server.rb', line 262 def unsubscribe(id, remote) synchronize do if subscriptions.has_key?(id) subscriptions[id].delete(remote) end end end |