Module: Wires::Cluster
- Defined in:
- lib/wires/cluster.rb,
lib/wires/cluster/udp.rb,
lib/wires/cluster/json.rb
Defined Under Namespace
Modules: UDP
Constant Summary collapse
- PORT =
4567
- GROUP =
"224.0.1.33"
Class Method Summary collapse
-
._load_json(str) ⇒ Object
Perform rudimentary JSON verification to make sure str is not a partial payload due to missing or yet-to-come packets, then actually load in the JSON objects and return them.
-
._rx_loop ⇒ Object
Loop through incoming messages and deploy valid firable events.
-
.listen(action = :start) ⇒ Object
Start the UDP receiving thread (or stop it, if action==:stop).
-
.listen! ⇒ Object
Block indefinitely while receiving messages.
-
.spout(action = :start) ⇒ Object
Enable UDP sending of fired events (or disable, if action==:stop).
Class Method Details
._load_json(str) ⇒ Object
Perform rudimentary JSON verification to make sure str is not
a partial payload due to missing or yet-to-come packets,
then actually load in the JSON objects and return them
71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 |
# File 'lib/wires/cluster/json.rb', line 71 def _load_json(str) ref_parser = JSON::Pure::Parser # Ignore json comments and strings for open/close count verification stripped_str = str.gsub(ref_parser::IGNORE, '') .gsub(ref_parser::STRING, '') # Missing tail if any quote symbols remain after string purge raise JSON::MissingTailError if stripped_str.match /(?<!\\)"/ # Make sure open/close symbol counts match [[ref_parser::OBJECT_OPEN, ref_parser::OBJECT_CLOSE], [ref_parser::ARRAY_OPEN, ref_parser::ARRAY_CLOSE]].each do |a, z| case (stripped_str.scan(a).size) <=> (stripped_str.scan(z).size) when 1; raise JSON::MissingTailError when -1; raise JSON::MissingHeadError end end # Try to load objects from the string and return the result JSON.load(str) end |
._rx_loop ⇒ Object
Loop through incoming messages and deploy valid firable events
48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 |
# File 'lib/wires/cluster.rb', line 48 def _rx_loop ongoing = {} loop do msg = @rx.gets ongoing[msg.source] ||= '' ongoing[msg.source] += msg begin data = _load_json(ongoing[msg.source]) if (data[0].is_a? Event) and (data[1].is_a? Channel) event, chan = data event.instance_variable_set(:@cluster_source, msg.source) Convenience::fire *data end ongoing[msg.source] = nil rescue JSON::MissingTailError rescue JSON::ParserError ongoing[msg.source] = nil end end end |
.listen(action = :start) ⇒ Object
Start the UDP receiving thread (or stop it, if action==:stop)
21 22 23 24 25 26 27 28 29 30 31 |
# File 'lib/wires/cluster.rb', line 21 def listen(action=:start) case action when :start @rx ||= UDP::RX.new GROUP, PORT raise IOError, "Probably firewalled..." unless @rx.test! @rx_thread ||= Thread.new { _rx_loop } when :stop @rx_thread.kill; @rx_thread = nil @rx.close; @rx = nil end end |
.listen! ⇒ Object
Block indefinitely while receiving messages
15 16 17 18 |
# File 'lib/wires/cluster.rb', line 15 def listen! listen @rx_thread.join end |
.spout(action = :start) ⇒ Object
Enable UDP sending of fired events (or disable, if action==:stop)
34 35 36 37 38 39 40 41 42 43 44 45 |
# File 'lib/wires/cluster.rb', line 34 def spout(action=:start) case action when :start @tx ||= UDP::TX.new GROUP, PORT @tx_proc = Wires::Channel.after_fire(true) do |e,c| @tx.puts JSON.dump [e,c] unless e.cluster_source end when :stop Wires::Channel.remove_hook :@after_fire, @tx_proc @tx.close; @tx = nil end end |