Class: Telegraph::Switchboard

Inherits:
Object
  • Object
show all
Includes:
Logging
Defined in:
lib/telegraph/switchboard.rb

Instance Method Summary collapse

Methods included from Logging

#debug, logger

Instance Method Details

#add_wire(wire) ⇒ Object



38
39
40
# File 'lib/telegraph/switchboard.rb', line 38

def add_wire(wire)
  using_wires {|w| w << wire }
end

#any_live_wires?Boolean

Returns:

  • (Boolean)


30
31
32
# File 'lib/telegraph/switchboard.rb', line 30

def any_live_wires?
  live_wires.any?
end

#close_all_wiresObject



42
43
44
45
# File 'lib/telegraph/switchboard.rb', line 42

def close_all_wires
  debug { "Closing all wires" }
  using_wires {|w| w.each { |wire| wire.close rescue nil } }
end

#drop_wire(wire) ⇒ Object



34
35
36
# File 'lib/telegraph/switchboard.rb', line 34

def drop_wire(wire)
  using_wires {|w| w.delete wire }
end

#live_wiresObject



47
48
49
# File 'lib/telegraph/switchboard.rb', line 47

def live_wires
  using_wires {|w| w.select {|wire| !wire.closed?}}
end

#next_message(options = {:timeout => 0}) ⇒ Object



11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# File 'lib/telegraph/switchboard.rb', line 11

def next_message(options = {:timeout => 0})
  debug { "Waiting for next message on any of #{live_wires.size} wires for #{options[:timeout]} seconds" }

  if live_wires.empty?
    sleep 0.01
    Thread.pass 
    raise NoMessageAvailable 
  end

  readers, = IO.select live_wires.map {|w| w.stream}, nil, nil, options[:timeout]
  raise NoMessageAvailable unless readers

  wire = using_wires {|wires| wires.detect {|w| w.stream == readers.first} }
  return wire.next_message(options.merge(:timeout => 0)), wire
rescue LineDead => e
  debug { "LineDead: #{e.message} while reading message from wire" }
  raise NoMessageAvailable
end

#process_messages(options = {:timeout => 0}) ⇒ Object



5
6
7
8
9
# File 'lib/telegraph/switchboard.rb', line 5

def process_messages(options = {:timeout => 0})
  yield next_message(options) while true
rescue NoMessageAvailable
  retry
end

#using_wiresObject



51
52
53
54
55
# File 'lib/telegraph/switchboard.rb', line 51

def using_wires
  @wires ||= []
  @wires_mutex ||= Mutex.new
  @wires_mutex.synchronize { yield @wires }
end