Class: ZMachine::ConnectionManager

Inherits:
Object
  • Object
show all
Defined in:
lib/zmachine/connection_manager.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(selector) ⇒ ConnectionManager

Returns a new instance of ConnectionManager.



12
13
14
15
16
17
18
19
# File 'lib/zmachine/connection_manager.rb', line 12

def initialize(selector)
  ZMachine.logger.debug("zmachine:connection_manager:#{__method__}") if ZMachine.debug
  @selector = selector
  @connections = Set.new
  @zmq_connections = Set.new
  @new_connections = Set.new
  @closing_connections = []
end

Instance Attribute Details

#connectionsObject (readonly)

Returns the value of attribute connections.



10
11
12
# File 'lib/zmachine/connection_manager.rb', line 10

def connections
  @connections
end

Instance Method Details

#add_new_connectionsObject



84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
# File 'lib/zmachine/connection_manager.rb', line 84

def add_new_connections
  @new_connections.each do |connection|
    ZMachine.logger.debug("zmachine:connection_manager:#{__method__}", connection: connection) if ZMachine.debug
    begin
      connection.register(@selector)
      @connections << connection
      if connection.channel.is_a?(ZMQChannel)
        @zmq_connections << connection
        connection.connection_completed
      end
    rescue ClosedChannelException => e
      @closing_connections << [connection, false, e]
    end
  end
  @new_connections.clear
end

#bind(address, port_or_type, handler, *args, &block) ⇒ Object



32
33
34
35
36
37
38
# File 'lib/zmachine/connection_manager.rb', line 32

def bind(address, port_or_type, handler, *args, &block)
  ZMachine.logger.debug("zmachine:connection_manager:#{__method__}", address: address, port_or_type: port_or_type) if ZMachine.debug
  connection = build_connection(handler, *args)
  connection.bind(address, port_or_type, &block)
  @new_connections << connection
  connection
end

#cleanupObject



105
106
107
108
109
110
111
112
113
# File 'lib/zmachine/connection_manager.rb', line 105

def cleanup
  return if @closing_connections.empty?
  ZMachine.logger.debug("zmachine:connection_manager:#{__method__}") if ZMachine.debug
  closing_connections = @closing_connections
  @closing_connections = []
  closing_connections.each do |connection|
    unbind_connection(connection)
  end
end

#close_connection(connection, after_writing = false, reason = nil) ⇒ Object



79
80
81
82
# File 'lib/zmachine/connection_manager.rb', line 79

def close_connection(connection, after_writing = false, reason = nil)
  ZMachine.logger.debug("zmachine:connection_manager:#{__method__}", connection: connection, after_writing: after_writing, reason: reason.inspect) if ZMachine.debug
  @closing_connections << [connection, after_writing, reason]
end

#connect(address, port_or_type, handler, *args, &block) ⇒ Object



40
41
42
43
44
45
46
47
48
# File 'lib/zmachine/connection_manager.rb', line 40

def connect(address, port_or_type, handler, *args, &block)
  ZMachine.logger.debug("zmachine:connection_manager:#{__method__}", address: address, port_or_type: port_or_type) if ZMachine.debug
  connection = build_connection(handler, *args)
  connection.connect(address, port_or_type, &block)
  @new_connections << connection
  connection
rescue java.nio.channels.UnresolvedAddressException
  raise ZMachine::ConnectionError.new('unable to resolve server address')
end

#idle?Boolean

Returns:

  • (Boolean)


21
22
23
24
# File 'lib/zmachine/connection_manager.rb', line 21

def idle?
  @new_connections.size == 0 and
  @zmq_connections.none? {|c| c.channel.can_recv? } # see comment in #process
end

#is_connected?(connection) ⇒ Boolean

Returns:

  • (Boolean)


101
102
103
# File 'lib/zmachine/connection_manager.rb', line 101

def is_connected?(connection)
  @connections.include?(connection)
end

#processObject



50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
# File 'lib/zmachine/connection_manager.rb', line 50

def process
  ZMachine.logger.debug("zmachine:connection_manager:#{__method__}") if ZMachine.debug
  add_new_connections
  it = @selector.selected_keys.iterator
  while it.has_next
    process_connection(it.next.attachment)
    it.remove
  end
  # super ugly, but ZMQ only triggers the FD if and only if you
  # have read every message from the socket. under load however
  # there will always be new messages in the mailbox between last
  # recv and next select, which causes the FD never to be
  # triggered again.
  # the only mitigation strategy i came up with is iterating over all
  # channels. performance impact shouldn't be too huge, since ZMQ takes
  # care of all the multiplexing and we only have a small amount of ZMQ
  # connections in the reactor
  @zmq_connections.each do |connection|
    connection.readable! if connection.channel.can_recv?
  end
end

#process_connection(connection) ⇒ Object



72
73
74
75
76
77
# File 'lib/zmachine/connection_manager.rb', line 72

def process_connection(connection)
  new_connection = connection.process_events
  @new_connections << new_connection if new_connection
rescue IOException => e
  close_connection(connection, false, e)
end

#shutdownObject



26
27
28
29
30
# File 'lib/zmachine/connection_manager.rb', line 26

def shutdown
  ZMachine.logger.debug("zmachine:connection_manager:#{__method__}") if ZMachine.debug
  @closing_connections += @connections.to_a
  cleanup
end

#unbind_connection(connection) ⇒ Object



115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
# File 'lib/zmachine/connection_manager.rb', line 115

def unbind_connection(connection)
  after_writing = false
  reason = nil
  connection, after_writing, reason = *connection if connection.is_a?(Array)
  if connection.method(:unbind).arity != 0
    connection.unbind(reason)
  else
    connection.unbind
  end
  ZMachine.logger.debug("zmachine:connection_manager:#{__method__}", connection: connection, after_writing: after_writing, can_send: connection.can_send?) if ZMachine.debug
  if after_writing && connection.can_send?
    ZMachine.close_connection(connection, true)
  else
    connection.close!
    @connections.delete(connection)
    @zmq_connections.delete(connection)
  end
rescue Exception => e
  ZMachine.logger.exception(e, "failed to unbind connection") if ZMachine.debug
end