Class: ZMachine::Connection
- Inherits:
-
Object
- Object
- ZMachine::Connection
- Extended by:
- Forwardable
- Defined in:
- lib/zmachine/connection.rb
Instance Attribute Summary collapse
-
#channel ⇒ Object
Returns the value of attribute channel.
-
#timer ⇒ Object
readonly
Returns the value of attribute timer.
Class Method Summary collapse
Instance Method Summary collapse
-
#acceptable! ⇒ Object
triggers.
-
#bind(address, port_or_type, &block) ⇒ Object
channel type dispatch.
- #close! ⇒ Object
- #close_connection(after_writing = false) ⇒ Object (also: #close)
- #close_connection_after_writing ⇒ Object (also: #close_after_writing)
- #comm_inactivity_timeout ⇒ Object
- #comm_inactivity_timeout=(value) ⇒ Object (also: #set_comm_inactivity_timeout)
- #connect(address, port_or_type) {|_self| ... } ⇒ Object
- #connectable! ⇒ Object
-
#connection_accepted ⇒ Object
callbacks.
- #connection_completed ⇒ Object
- #current_events ⇒ Object
- #get_idle_time ⇒ Object
- #get_peername ⇒ Object
- #mark_active! ⇒ Object
- #notify_readable? ⇒ Boolean
- #notify_writable? ⇒ Boolean
- #pending_connect_timeout=(value) ⇒ Object (also: #set_pending_connect_timeout)
- #post_init ⇒ Object
- #process_events ⇒ Object
- #readable! ⇒ Object
- #receive_data(data) ⇒ Object
- #reconnect(server, port_or_type) ⇒ Object
-
#register(selector) ⇒ Object
selector registration.
- #renew_timer ⇒ Object
- #send_data(data) ⇒ Object
- #unbind ⇒ Object
- #update_events ⇒ Object
- #valid? ⇒ Boolean
- #writable! ⇒ Object
Instance Attribute Details
#channel ⇒ Object
Returns the value of attribute channel.
12 13 14 |
# File 'lib/zmachine/connection.rb', line 12 def channel @channel end |
#timer ⇒ Object (readonly)
Returns the value of attribute timer.
13 14 15 |
# File 'lib/zmachine/connection.rb', line 13 def timer @timer end |
Class Method Details
.new(*args) ⇒ Object
15 16 17 18 19 20 21 22 |
# File 'lib/zmachine/connection.rb', line 15 def self.new(*args) allocate.instance_eval do initialize(*args) @args = args post_init self end end |
Instance Method Details
#acceptable! ⇒ Object
triggers
134 135 136 137 138 139 140 141 142 |
# File 'lib/zmachine/connection.rb', line 134 def acceptable! client = @channel.accept ZMachine.logger.debug("zmachine:connection:#{__method__}", connection: self, client: client) if ZMachine.debug connection = self.class.new(*@args) connection.channel = client @block.call(connection) if @block connection.connection_accepted connection end |
#bind(address, port_or_type, &block) ⇒ Object
channel type dispatch
26 27 28 29 30 31 32 33 34 |
# File 'lib/zmachine/connection.rb', line 26 def bind(address, port_or_type, &block) ZMachine.logger.debug("zmachine:connection:#{__method__}", connection: self) if ZMachine.debug klass = (address =~ %r{\w+://}) ? ZMQChannel : TCPChannel @channel = klass.new @channel.bind(address, port_or_type) @block = block @block.call(self) if @block && @channel.is_a?(ZMQChannel) self end |
#close! ⇒ Object
82 83 84 85 |
# File 'lib/zmachine/connection.rb', line 82 def close! @timer.cancel if @timer @channel.close! end |
#close_connection(after_writing = false) ⇒ Object Also known as: close
70 71 72 |
# File 'lib/zmachine/connection.rb', line 70 def close_connection(after_writing = false) ZMachine.close_connection(self, after_writing) end |
#close_connection_after_writing ⇒ Object Also known as: close_after_writing
76 77 78 |
# File 'lib/zmachine/connection.rb', line 76 def close_connection_after_writing close_connection(true) end |
#comm_inactivity_timeout ⇒ Object
87 88 89 |
# File 'lib/zmachine/connection.rb', line 87 def comm_inactivity_timeout @inactivity_timeout end |
#comm_inactivity_timeout=(value) ⇒ Object Also known as: set_comm_inactivity_timeout
91 92 93 |
# File 'lib/zmachine/connection.rb', line 91 def comm_inactivity_timeout=(value) @inactivity_timeout = value end |
#connect(address, port_or_type) {|_self| ... } ⇒ Object
36 37 38 39 40 41 42 43 44 |
# File 'lib/zmachine/connection.rb', line 36 def connect(address, port_or_type, &block) ZMachine.logger.debug("zmachine:connection:#{__method__}", connection: self) if ZMachine.debug klass = (address.nil? || address =~ %r{\w+://}) ? ZMQChannel : TCPChannel @channel = klass.new @channel.connect(address, port_or_type) if address yield self if block_given? renew_timer self end |
#connectable! ⇒ Object
144 145 146 147 148 149 150 151 152 |
# File 'lib/zmachine/connection.rb', line 144 def connectable! ZMachine.logger.debug("zmachine:connection:#{__method__}", connection: self) if ZMachine.debug @channel.finish_connecting @timer.cancel if @timer # cancel pending connect timer mark_active! connection_completed if @channel.connected? update_events nil end |
#connection_accepted ⇒ Object
callbacks
47 48 |
# File 'lib/zmachine/connection.rb', line 47 def connection_accepted end |
#connection_completed ⇒ Object
50 51 |
# File 'lib/zmachine/connection.rb', line 50 def connection_completed end |
#current_events ⇒ Object
188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 |
# File 'lib/zmachine/connection.rb', line 188 def current_events if @channel.is_a?(ZMQChannel) return SelectionKey::OP_READ end if bound? return SelectionKey::OP_ACCEPT end if connection_pending? return SelectionKey::OP_CONNECT end events = 0 events |= SelectionKey::OP_READ if notify_readable? events |= SelectionKey::OP_WRITE if notify_writable? return events end |
#get_idle_time ⇒ Object
97 98 99 |
# File 'lib/zmachine/connection.rb', line 97 def get_idle_time (System.nano_time - @last_activity) / 1_000_000 end |
#get_peername ⇒ Object
101 102 103 104 105 |
# File 'lib/zmachine/connection.rb', line 101 def get_peername if peer = @channel.peer ::Socket.pack_sockaddr_in(*peer) end end |
#mark_active! ⇒ Object
224 225 226 227 |
# File 'lib/zmachine/connection.rb', line 224 def mark_active! @last_activity = System.nano_time renew_timer if @inactivity_timeout end |
#notify_readable? ⇒ Boolean
107 108 109 |
# File 'lib/zmachine/connection.rb', line 107 def notify_readable? true end |
#notify_writable? ⇒ Boolean
111 112 113 |
# File 'lib/zmachine/connection.rb', line 111 def notify_writable? @channel.can_send? end |
#pending_connect_timeout=(value) ⇒ Object Also known as: set_pending_connect_timeout
115 116 117 |
# File 'lib/zmachine/connection.rb', line 115 def pending_connect_timeout=(value) @connect_timeout = value end |
#post_init ⇒ Object
53 54 |
# File 'lib/zmachine/connection.rb', line 53 def post_init end |
#process_events ⇒ Object
209 210 211 212 213 214 215 216 217 218 219 220 221 222 |
# File 'lib/zmachine/connection.rb', line 209 def process_events return unless valid? ZMachine.logger.debug("zmachine:connection:#{__method__}", connection: self) if ZMachine.debug if @channel_key.connectable? connectable! elsif @channel_key.acceptable? acceptable! else writable! if @channel_key.writable? readable! if @channel_key.readable? end rescue Java::JavaNioChannels::CancelledKeyException ZMachine.close_connection(self) end |
#readable! ⇒ Object
154 155 156 157 158 159 160 |
# File 'lib/zmachine/connection.rb', line 154 def readable! ZMachine.logger.debug("zmachine:connection:#{__method__}", connection: self) if ZMachine.debug mark_active! data = @channel.read_inbound_data receive_data(data) if data nil end |
#receive_data(data) ⇒ Object
56 57 |
# File 'lib/zmachine/connection.rb', line 56 def receive_data(data) end |
#reconnect(server, port_or_type) ⇒ Object
121 122 123 |
# File 'lib/zmachine/connection.rb', line 121 def reconnect(server, port_or_type) ZMachine.reconnect(server, port_or_type, self) end |
#register(selector) ⇒ Object
selector registration
172 173 174 175 |
# File 'lib/zmachine/connection.rb', line 172 def register(selector) ZMachine.logger.debug("zmachine:connection:#{__method__}", connection: self, fd: @channel.selectable_fd) if ZMachine.debug @channel_key = @channel.selectable_fd.register(selector, current_events, self) end |
#renew_timer ⇒ Object
229 230 231 232 233 234 235 236 |
# File 'lib/zmachine/connection.rb', line 229 def renew_timer @timer.cancel if @timer if connection_pending? && @connect_timeout @timer = ZMachine.add_timer(@connect_timeout) { ZMachine.close_connection(self, true, Errno::ETIMEDOUT) } elsif @inactivity_timeout @timer = ZMachine.add_timer(@inactivity_timeout) { ZMachine.close_connection(self, true, Errno::ETIMEDOUT) } end end |
#send_data(data) ⇒ Object
125 126 127 128 129 130 |
# File 'lib/zmachine/connection.rb', line 125 def send_data(data) ZMachine.logger.debug("zmachine:connection:#{__method__}", connection: self) if ZMachine.debug data = data.to_java_bytes if data.is_a?(String) # EM compat @channel.send_data(data) update_events end |
#unbind ⇒ Object
59 60 |
# File 'lib/zmachine/connection.rb', line 59 def unbind end |
#update_events ⇒ Object
182 183 184 185 186 |
# File 'lib/zmachine/connection.rb', line 182 def update_events return unless valid? ZMachine.logger.debug("zmachine:connection:#{__method__}", connection: self) if ZMachine.debug @channel_key.interest_ops(current_events) end |
#valid? ⇒ Boolean
177 178 179 180 |
# File 'lib/zmachine/connection.rb', line 177 def valid? @channel_key && @channel_key.valid? end |