Class: ZMachine::ConnectionManager

Inherits:
Object
  • Object
show all
Defined in:
lib/zmachine/connection_manager.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(selector) ⇒ ConnectionManager

Returns a new instance of ConnectionManager.



12
13
14
15
16
17
18
19
# File 'lib/zmachine/connection_manager.rb', line 12

def initialize(selector)
  ZMachine.logger.debug("zmachine:connection_manager:#{__method__}") if ZMachine.debug
  @selector = selector
  @connections = Set.new
  @zmq_connections = Set.new
  @new_connections = Set.new
  @unbound_connections = Set.new
end

Instance Attribute Details

#connectionsObject (readonly)

Returns the value of attribute connections.



10
11
12
# File 'lib/zmachine/connection_manager.rb', line 10

def connections
  @connections
end

Instance Method Details

#add_new_connectionsObject



84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
# File 'lib/zmachine/connection_manager.rb', line 84

def add_new_connections
  @new_connections.each do |connection|
    ZMachine.logger.debug("zmachine:connection_manager:#{__method__}", connection: connection) if ZMachine.debug
    begin
      connection.register(@selector)
      @connections << connection
      if connection.channel.is_a?(ZMQChannel)
        @zmq_connections << connection
        connection.connection_completed
      end
    rescue ClosedChannelException => e
      @unbound_connections << [connection, e]
    end
  end
  @new_connections.clear
end

#bind(address, port_or_type, handler, *args, &block) ⇒ Object



32
33
34
35
36
37
38
# File 'lib/zmachine/connection_manager.rb', line 32

def bind(address, port_or_type, handler, *args, &block)
  ZMachine.logger.debug("zmachine:connection_manager:#{__method__}", address: address, port_or_type: port_or_type) if ZMachine.debug
  connection = build_connection(handler, *args)
  connection.bind(address, port_or_type, &block)
  @new_connections << connection
  connection
end

#cleanupObject



105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
# File 'lib/zmachine/connection_manager.rb', line 105

def cleanup
  return if @unbound_connections.empty?
  ZMachine.logger.debug("zmachine:connection_manager:#{__method__}") if ZMachine.debug
  @unbound_connections.each do |connection|
    reason = nil
    connection, reason = *connection if connection.is_a?(Array)
    begin
      @connections.delete(connection)
      @zmq_connections.delete(connection)
      if connection.method(:unbind).arity != 0
        connection.unbind(reason)
      else
        connection.unbind
      end
      connection.channel.close!
    rescue Exception => e
      ZMachine.logger.exception(e, "failed to unbind connection") if ZMachine.debug
    end
  end
  @unbound_connections.clear
end

#close_connection(connection, reason = nil) ⇒ Object



79
80
81
82
# File 'lib/zmachine/connection_manager.rb', line 79

def close_connection(connection, reason = nil)
  ZMachine.logger.debug("zmachine:connection_manager:#{__method__}", connection: connection, reason: reason.inspect) if ZMachine.debug
  @unbound_connections << [connection, reason]
end

#connect(address, port_or_type, handler, *args, &block) ⇒ Object



40
41
42
43
44
45
46
47
48
# File 'lib/zmachine/connection_manager.rb', line 40

def connect(address, port_or_type, handler, *args, &block)
  ZMachine.logger.debug("zmachine:connection_manager:#{__method__}", address: address, port_or_type: port_or_type) if ZMachine.debug
  connection = build_connection(handler, *args)
  connection.connect(address, port_or_type, &block)
  @new_connections << connection
  connection
rescue java.nio.channels.UnresolvedAddressException
  raise ZMachine::ConnectionError.new('unable to resolve server address')
end

#idle?Boolean

Returns:

  • (Boolean)


21
22
23
24
# File 'lib/zmachine/connection_manager.rb', line 21

def idle?
  @new_connections.size == 0 and
  @zmq_connections.none? {|c| c.channel.can_recv? } # see comment in #process
end

#is_connected?(connection) ⇒ Boolean

Returns:

  • (Boolean)


101
102
103
# File 'lib/zmachine/connection_manager.rb', line 101

def is_connected?(connection)
  @connections.include?(connection)
end

#processObject



50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
# File 'lib/zmachine/connection_manager.rb', line 50

def process
  ZMachine.logger.debug("zmachine:connection_manager:#{__method__}") if ZMachine.debug
  add_new_connections
  it = @selector.selected_keys.iterator
  while it.has_next
    process_connection(it.next.attachment)
    it.remove
  end
  # super ugly, but ZMQ only triggers the FD if and only if you
  # have read every message from the socket. under load however
  # there will always be new messages in the mailbox between last
  # recv and next select, which causes the FD never to be
  # triggered again.
  # the only mitigation strategy i came up with is iterating over all
  # channels. performance impact shouldn't be too huge, since ZMQ takes
  # care of all the multiplexing and we only have a small amount of ZMQ
  # connections in the reactor
  @zmq_connections.each do |connection|
    connection.readable! if connection.channel.can_recv?
  end
end

#process_connection(connection) ⇒ Object



72
73
74
75
76
77
# File 'lib/zmachine/connection_manager.rb', line 72

def process_connection(connection)
  new_connection = connection.process_events
  @new_connections << new_connection if new_connection
rescue IOException => e
  close_connection(connection, e)
end

#shutdownObject



26
27
28
29
30
# File 'lib/zmachine/connection_manager.rb', line 26

def shutdown
  ZMachine.logger.debug("zmachine:connection_manager:#{__method__}") if ZMachine.debug
  @unbound_connections += @connections
  cleanup
end