Class: ZMachine::Connection
- Inherits:
-
Object
- Object
- ZMachine::Connection
- Extended by:
- Forwardable
- Defined in:
- lib/zmachine/connection.rb
Instance Attribute Summary collapse
-
#channel ⇒ Object
Returns the value of attribute channel.
Class Method Summary collapse
Instance Method Summary collapse
-
#acceptable! ⇒ Object
triggers.
-
#bind(address, port_or_type, &block) ⇒ Object
channel type dispatch.
- #close_connection(after_writing = false) ⇒ Object (also: #close)
- #close_connection_after_writing ⇒ Object (also: #close_after_writing)
- #comm_inactivity_timeout ⇒ Object
- #comm_inactivity_timeout=(value) ⇒ Object (also: #set_comm_inactivity_timeout)
- #connect(address, port_or_type) {|_self| ... } ⇒ Object
- #connectable! ⇒ Object
-
#connection_accepted ⇒ Object
callbacks.
- #connection_completed ⇒ Object
- #current_events ⇒ Object
- #get_idle_time ⇒ Object
- #get_peername ⇒ Object
- #mark_active! ⇒ Object
- #notify_readable? ⇒ Boolean
- #notify_writable? ⇒ Boolean
- #pending_connect_timeout=(value) ⇒ Object (also: #set_pending_connect_timeout)
- #post_init ⇒ Object
- #process_events ⇒ Object
- #readable! ⇒ Object
- #receive_data(data) ⇒ Object
- #reconnect(server, port_or_type) ⇒ Object
-
#register(selector) ⇒ Object
selector registration.
- #renew_timer ⇒ Object
- #send_data(data) ⇒ Object
- #unbind ⇒ Object
- #update_events ⇒ Object
- #valid? ⇒ Boolean
- #writable! ⇒ Object
Instance Attribute Details
#channel ⇒ Object
Returns the value of attribute channel.
12 13 14 |
# File 'lib/zmachine/connection.rb', line 12 def channel @channel end |
Class Method Details
.new(*args) ⇒ Object
14 15 16 17 18 19 20 21 |
# File 'lib/zmachine/connection.rb', line 14 def self.new(*args) allocate.instance_eval do initialize(*args) @args = args post_init self end end |
Instance Method Details
#acceptable! ⇒ Object
triggers
129 130 131 132 133 134 135 136 137 |
# File 'lib/zmachine/connection.rb', line 129 def acceptable! client = @channel.accept ZMachine.logger.debug("zmachine:connection:#{__method__}", connection: self, client: client) if ZMachine.debug connection = self.class.new(*@args) connection.channel = client @block.call(connection) if @block connection.connection_accepted connection end |
#bind(address, port_or_type, &block) ⇒ Object
channel type dispatch
25 26 27 28 29 30 31 32 33 |
# File 'lib/zmachine/connection.rb', line 25 def bind(address, port_or_type, &block) ZMachine.logger.debug("zmachine:connection:#{__method__}", connection: self) if ZMachine.debug klass = (address =~ %r{\w+://}) ? ZMQChannel : TCPChannel @channel = klass.new @channel.bind(address, port_or_type) @block = block @block.call(self) if @block && @channel.is_a?(ZMQChannel) self end |
#close_connection(after_writing = false) ⇒ Object Also known as: close
68 69 70 71 72 |
# File 'lib/zmachine/connection.rb', line 68 def close_connection(after_writing = false) @channel.close(after_writing) do ZMachine.close_connection(self) end end |
#close_connection_after_writing ⇒ Object Also known as: close_after_writing
76 77 78 |
# File 'lib/zmachine/connection.rb', line 76 def close_connection_after_writing close_connection(true) end |
#comm_inactivity_timeout ⇒ Object
82 83 84 |
# File 'lib/zmachine/connection.rb', line 82 def comm_inactivity_timeout @inactivity_timeout end |
#comm_inactivity_timeout=(value) ⇒ Object Also known as: set_comm_inactivity_timeout
86 87 88 |
# File 'lib/zmachine/connection.rb', line 86 def comm_inactivity_timeout=(value) @inactivity_timeout = value end |
#connect(address, port_or_type) {|_self| ... } ⇒ Object
35 36 37 38 39 40 41 42 43 |
# File 'lib/zmachine/connection.rb', line 35 def connect(address, port_or_type, &block) ZMachine.logger.debug("zmachine:connection:#{__method__}", connection: self) if ZMachine.debug klass = (address.nil? || address =~ %r{\w+://}) ? ZMQChannel : TCPChannel @channel = klass.new @channel.connect(address, port_or_type) if address yield self if block_given? renew_timer self end |
#connectable! ⇒ Object
139 140 141 142 143 144 145 146 147 |
# File 'lib/zmachine/connection.rb', line 139 def connectable! ZMachine.logger.debug("zmachine:connection:#{__method__}", connection: self) if ZMachine.debug @channel.finish_connecting @timer.cancel if @timer # cancel pending connect timer mark_active! connection_completed if @channel.connected? update_events nil end |
#connection_accepted ⇒ Object
callbacks
46 47 |
# File 'lib/zmachine/connection.rb', line 46 def connection_accepted end |
#connection_completed ⇒ Object
49 50 |
# File 'lib/zmachine/connection.rb', line 49 def connection_completed end |
#current_events ⇒ Object
183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 |
# File 'lib/zmachine/connection.rb', line 183 def current_events if @channel.is_a?(ZMQChannel) return SelectionKey::OP_READ end if bound? return SelectionKey::OP_ACCEPT end if connection_pending? return SelectionKey::OP_CONNECT end events = 0 events |= SelectionKey::OP_READ if notify_readable? events |= SelectionKey::OP_WRITE if notify_writable? return events end |
#get_idle_time ⇒ Object
92 93 94 |
# File 'lib/zmachine/connection.rb', line 92 def get_idle_time (System.nano_time - @last_activity) / 1_000_000 end |
#get_peername ⇒ Object
96 97 98 99 100 |
# File 'lib/zmachine/connection.rb', line 96 def get_peername if peer = @channel.peer ::Socket.pack_sockaddr_in(*peer) end end |
#mark_active! ⇒ Object
220 221 222 223 |
# File 'lib/zmachine/connection.rb', line 220 def mark_active! @last_activity = System.nano_time renew_timer if @inactivity_timeout end |
#notify_readable? ⇒ Boolean
102 103 104 |
# File 'lib/zmachine/connection.rb', line 102 def notify_readable? true end |
#notify_writable? ⇒ Boolean
106 107 108 |
# File 'lib/zmachine/connection.rb', line 106 def notify_writable? @channel.can_send? end |
#pending_connect_timeout=(value) ⇒ Object Also known as: set_pending_connect_timeout
110 111 112 |
# File 'lib/zmachine/connection.rb', line 110 def pending_connect_timeout=(value) @connect_timeout = value end |
#post_init ⇒ Object
52 53 |
# File 'lib/zmachine/connection.rb', line 52 def post_init end |
#process_events ⇒ Object
204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 |
# File 'lib/zmachine/connection.rb', line 204 def process_events return unless valid? ZMachine.logger.debug("zmachine:connection:#{__method__}", connection: self) if ZMachine.debug if @channel_key.connectable? connectable! elsif @channel_key.acceptable? acceptable! else writable! if @channel_key.writable? readable! if @channel_key.readable? end rescue Java::JavaNioChannels::CancelledKeyException # channel may have been closed by write handler. ignore exception and # wait for cleanup end |
#readable! ⇒ Object
149 150 151 152 153 154 155 |
# File 'lib/zmachine/connection.rb', line 149 def readable! ZMachine.logger.debug("zmachine:connection:#{__method__}", connection: self) if ZMachine.debug mark_active! data = @channel.read_inbound_data receive_data(data) if data nil end |
#receive_data(data) ⇒ Object
55 56 |
# File 'lib/zmachine/connection.rb', line 55 def receive_data(data) end |
#reconnect(server, port_or_type) ⇒ Object
116 117 118 |
# File 'lib/zmachine/connection.rb', line 116 def reconnect(server, port_or_type) ZMachine.reconnect(server, port_or_type, self) end |
#register(selector) ⇒ Object
selector registration
167 168 169 170 |
# File 'lib/zmachine/connection.rb', line 167 def register(selector) ZMachine.logger.debug("zmachine:connection:#{__method__}", connection: self, fd: @channel.selectable_fd) if ZMachine.debug @channel_key = @channel.selectable_fd.register(selector, current_events, self) end |
#renew_timer ⇒ Object
225 226 227 228 229 230 231 232 |
# File 'lib/zmachine/connection.rb', line 225 def renew_timer @timer.cancel if @timer if connection_pending? && @connect_timeout @timer = ZMachine.add_timer(@connect_timeout) { ZMachine.close_connection(self, Errno::ETIMEDOUT) } elsif @inactivity_timeout @timer = ZMachine.add_timer(@inactivity_timeout) { ZMachine.close_connection(self, Errno::ETIMEDOUT) } end end |
#send_data(data) ⇒ Object
120 121 122 123 124 125 |
# File 'lib/zmachine/connection.rb', line 120 def send_data(data) ZMachine.logger.debug("zmachine:connection:#{__method__}", connection: self) if ZMachine.debug data = data.to_java_bytes if data.is_a?(String) # EM compat @channel.send_data(data) update_events end |
#unbind ⇒ Object
58 59 |
# File 'lib/zmachine/connection.rb', line 58 def unbind end |
#update_events ⇒ Object
177 178 179 180 181 |
# File 'lib/zmachine/connection.rb', line 177 def update_events return unless valid? ZMachine.logger.debug("zmachine:connection:#{__method__}", connection: self) if ZMachine.debug @channel_key.interest_ops(current_events) end |
#valid? ⇒ Boolean
172 173 174 175 |
# File 'lib/zmachine/connection.rb', line 172 def valid? @channel_key && @channel_key.valid? end |