Module: Wires::Cluster

Defined in:
lib/wires/cluster.rb,
lib/wires/cluster/udp.rb,
lib/wires/cluster/json.rb

Defined Under Namespace

Modules: UDP

Constant Summary collapse

PORT =
4567
GROUP =
"224.0.1.33"

Class Method Summary collapse

Class Method Details

._load_json(str) ⇒ Object

Perform rudimentary JSON verification to make sure str is not

a partial payload due to missing or yet-to-come packets,
then actually load in the JSON objects and return them


71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
# File 'lib/wires/cluster/json.rb', line 71

def _load_json(str)
  ref_parser = JSON::Pure::Parser
  
  # Ignore json comments and strings for open/close count verification
  stripped_str = str.gsub(ref_parser::IGNORE, '')
                  .gsub(ref_parser::STRING, '')
  
  # Missing tail if any quote symbols remain after string purge
  raise JSON::MissingTailError if stripped_str.match /(?<!\\)"/
  
  # Make sure open/close symbol counts match
  [[ref_parser::OBJECT_OPEN, ref_parser::OBJECT_CLOSE],
   [ref_parser::ARRAY_OPEN,  ref_parser::ARRAY_CLOSE]].each do |a, z|
    case (stripped_str.scan(a).size) <=> (stripped_str.scan(z).size)
    when  1; raise JSON::MissingTailError
    when -1; raise JSON::MissingHeadError
    end
  end
  
  # Try to load objects from the string and return the result
  JSON.load(str)
end

._rx_loopObject

Loop through incoming messages and deploy valid firable events



48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
# File 'lib/wires/cluster.rb', line 48

def _rx_loop
  ongoing = {}
  loop do
    msg = @rx.gets
    ongoing[msg.source] ||= ''
    ongoing[msg.source]  += msg
    
    begin
      data = _load_json(ongoing[msg.source])
      if (data[0].is_a? Event) and (data[1].is_a? Channel)
        event, chan = data
        event.instance_variable_set(:@cluster_source, msg.source)
        Convenience::fire *data
      end
      ongoing[msg.source] = nil
    rescue JSON::MissingTailError
    rescue JSON::ParserError
      ongoing[msg.source] = nil
    end
    
  end
end

.listen(action = :start) ⇒ Object

Start the UDP receiving thread (or stop it, if action==:stop)



21
22
23
24
25
26
27
28
29
30
31
# File 'lib/wires/cluster.rb', line 21

def listen(action=:start)
  case action
  when :start
    @rx ||= UDP::RX.new GROUP, PORT
    raise IOError, "Probably firewalled..." unless @rx.test!
    @rx_thread ||= Thread.new { _rx_loop }
  when :stop
    @rx_thread.kill; @rx_thread = nil
    @rx.close;       @rx        = nil
  end
end

.listen!Object

Block indefinitely while receiving messages



15
16
17
18
# File 'lib/wires/cluster.rb', line 15

def listen!
  listen
  @rx_thread.join
end

.spout(action = :start) ⇒ Object

Enable UDP sending of fired events (or disable, if action==:stop)



34
35
36
37
38
39
40
41
42
43
44
45
# File 'lib/wires/cluster.rb', line 34

def spout(action=:start)
  case action
  when :start
    @tx ||= UDP::TX.new GROUP, PORT
    @tx_proc = Wires::Channel.after_fire(true) do |e,c|
      @tx.puts JSON.dump [e,c] unless e.cluster_source
    end
  when :stop
    Wires::Channel.remove_hook :@after_fire, @tx_proc
    @tx.close; @tx = nil
  end
end