Module: Packet::Core::CommonMethods
- Includes:
- NbioHelper
- Defined in:
- lib/packet/packet_core.rb
Overview
end of module#ClassMethods
Instance Method Summary
collapse
-
#accept_connection(sock_opts) ⇒ Object
-
#add_periodic_timer(interval, &block) ⇒ Object
-
#add_timer(elapsed_time, &block) ⇒ Object
-
#binding_str ⇒ Object
-
#cancel_timer(t_timer) ⇒ Object
-
#cancel_write(t_sock) ⇒ Object
-
#check_for_timer_events ⇒ Object
-
#close_connection(sock = nil) ⇒ Object
close the connection with internal specified socket.
-
#complete_connection(t_sock, sock_opts) ⇒ Object
-
#connect(ip, port, t_module, &block) ⇒ Object
-
#decorate_handler(t_socket, actually_connected, sock_addr, t_module, &block) ⇒ Object
-
#handle_external_messages(t_sock) ⇒ Object
-
#handle_internal_messages(t_sock) ⇒ Object
-
#handle_read_event(p_ready_fds) ⇒ Object
-
#handle_write_event(p_ready_fds) ⇒ Object
-
#immediate_complete(t_socket, sock_addr, t_module, &block) ⇒ Object
-
#initialize ⇒ Object
-
#initialize_handler(p_module) ⇒ Object
-
#next_turn(&block) ⇒ Object
-
#read_external_socket(t_sock) ⇒ Object
-
#reconnect(server, port, handler) ⇒ Object
-
#remove_connection(t_sock) ⇒ Object
method removes the connection and closes the socket.
-
#schedule_write(t_sock, internal_instance = nil) ⇒ Object
-
#shutdown ⇒ Object
-
#start_reactor ⇒ Object
method starts event loop in the process.
-
#start_server(ip, port, t_module, &block) ⇒ Object
method opens a socket for listening.
-
#terminate_me ⇒ Object
-
#unix? ⇒ Boolean
-
#windows? ⇒ Boolean
Methods included from NbioHelper
#dump_object, #gen_worker_key, #object_dump, #packet_classify, #read_data, #write_and_schedule, #write_once
Instance Method Details
#accept_connection(sock_opts) ⇒ Object
69
70
71
72
73
74
75
76
77
78
79
|
# File 'lib/packet/packet_core.rb', line 69
def accept_connection(sock_opts)
sock_io = sock_opts[:socket]
begin
client_socket,client_sockaddr = sock_io.accept_nonblock
client_socket.setsockopt(Socket::IPPROTO_TCP,Socket::TCP_NODELAY,1)
rescue Errno::EAGAIN, Errno::ECONNABORTED, Errno::EPROTO, Errno::EINTR
return
end
read_ios << client_socket
decorate_handler(client_socket,true,client_sockaddr,sock_opts[:module],&sock_opts[:block])
end
|
#add_periodic_timer(interval, &block) ⇒ Object
237
238
239
240
241
|
# File 'lib/packet/packet_core.rb', line 237
def add_periodic_timer(interval,&block)
t_timer = PeriodicEvent.new(interval,&block)
@timer_hash[t_timer.timer_signature] = t_timer
return t_timer
end
|
#add_timer(elapsed_time, &block) ⇒ Object
243
244
245
246
247
248
|
# File 'lib/packet/packet_core.rb', line 243
def add_timer(elapsed_time,&block)
t_timer = Event.new(elapsed_time,&block)
@timer_hash[t_timer.timer_signature] = t_timer
return t_timer
end
|
#binding_str ⇒ Object
254
255
256
257
|
# File 'lib/packet/packet_core.rb', line 254
def binding_str
@binding += 1
"BIND_#{@binding}"
end
|
#cancel_timer(t_timer) ⇒ Object
250
251
252
|
# File 'lib/packet/packet_core.rb', line 250
def cancel_timer(t_timer)
@timer_hash.delete(t_timer.timer_signature)
end
|
#cancel_write(t_sock) ⇒ Object
165
166
167
168
169
170
171
172
173
174
175
|
# File 'lib/packet/packet_core.rb', line 165
def cancel_write(t_sock)
if !t_sock.closed?
fileno = t_sock.fileno
if UNIXSocket === t_sock
internal_scheduled_write.delete(fileno)
else
write_scheduled.delete(fileno)
end
end
write_ios.delete(t_sock)
end
|
#check_for_timer_events ⇒ Object
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
|
# File 'lib/packet/packet_core.rb', line 288
def check_for_timer_events
ready_timers = @timer_hash.collect { |key,timer| timer if timer.run_now? }.compact
ready_timers.each { |timer| timer.run }
@timer_hash.delete_if { |key,timer|
timer.cancel_flag || (!timer.respond_to?(:interval) && ready_timers.include?(timer)) || false
}
end
|
#close_connection(sock = nil) ⇒ Object
close the connection with internal specified socket
308
309
310
311
312
313
314
|
# File 'lib/packet/packet_core.rb', line 308
def close_connection(sock = nil)
begin
read_ios.delete(sock.fileno)
write_ios.delete(sock.fileno)
sock.close
rescue; end
end
|
#complete_connection(t_sock, sock_opts) ⇒ Object
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
|
# File 'lib/packet/packet_core.rb', line 81
def complete_connection(t_sock,sock_opts)
actually_connected = true
begin
t_sock.connect_nonblock(sock_opts[:sock_addr])
rescue Errno::EISCONN
puts "Socket already connected"
rescue Errno::ECONNREFUSED
actually_connected = false
end
connection_completion_awaited.delete(t_sock.fileno)
read_ios << t_sock if actually_connected
write_ios.delete(t_sock)
decorate_handler(t_sock,actually_connected,sock_opts[:sock_addr],\
sock_opts[:module],&sock_opts[:block])
end
|
#connect(ip, port, t_module, &block) ⇒ Object
39
40
41
42
43
44
45
46
47
48
49
50
51
52
|
# File 'lib/packet/packet_core.rb', line 39
def connect(ip,port,t_module,&block)
t_socket = Socket.new(Socket::AF_INET,Socket::SOCK_STREAM,0)
t_sock_addr = Socket.sockaddr_in(port,ip)
t_socket.setsockopt(Socket::IPPROTO_TCP,Socket::TCP_NODELAY,1)
connection_completion_awaited[t_socket.fileno] =
{ :sock_addr => t_sock_addr, :module => t_module,:block => block }
begin
t_socket.connect_nonblock(t_sock_addr)
immediate_complete(t_socket,t_sock_addr,t_module,&block)
rescue Errno::EINPROGRESS
write_ios << t_socket
end
end
|
#decorate_handler(t_socket, actually_connected, sock_addr, t_module, &block) ⇒ Object
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
|
# File 'lib/packet/packet_core.rb', line 327
def decorate_handler(t_socket,actually_connected,sock_addr,t_module,&block)
handler_instance = initialize_handler(t_module)
after_connection_callbacks = connection_callbacks ? connection_callbacks[:after_connection] : nil
after_connection_callbacks && after_connection_callbacks.each { |t_callback| self.send(t_callback,handler_instance,t_socket)}
handler_instance.worker = self
handler_instance.connection = t_socket
handler_instance.reactor = self
handler_instance.invoke_init unless handler_instance.initialized
unless actually_connected
handler_instance.unbind
remove_connection(t_socket)
return
end
handler_instance.signature = binding_str
connection_data = { :socket => t_socket,:instance => handler_instance,:signature => binding_str,:sock_addr => sock_addr }
connections[t_socket.fileno] = connection_data
block.call(handler_instance) if block
handler_instance.connection_completed handler_instance
end
|
#handle_external_messages(t_sock) ⇒ Object
217
218
219
220
221
222
223
224
|
# File 'lib/packet/packet_core.rb', line 217
def handle_external_messages(t_sock)
sock_fd = t_sock.fileno
if sock_opts = listen_sockets[sock_fd]
accept_connection(sock_opts)
else
read_external_socket(t_sock)
end
end
|
#handle_internal_messages(t_sock) ⇒ Object
213
214
215
|
# File 'lib/packet/packet_core.rb', line 213
def handle_internal_messages(t_sock)
raise "Method should be implemented by concerned classes"
end
|
#handle_read_event(p_ready_fds) ⇒ Object
190
191
192
193
194
195
196
197
198
199
|
# File 'lib/packet/packet_core.rb', line 190
def handle_read_event(p_ready_fds)
ready_fds = p_ready_fds.flatten.compact
ready_fds.each do |t_sock|
if(t_sock.is_a?(UNIXSocket))
handle_internal_messages(t_sock)
else
handle_external_messages(t_sock)
end
end
end
|
#handle_write_event(p_ready_fds) ⇒ Object
177
178
179
180
181
182
183
184
185
186
187
188
|
# File 'lib/packet/packet_core.rb', line 177
def handle_write_event(p_ready_fds)
p_ready_fds.each do |sock_fd|
fileno = sock_fd.fileno
if UNIXSocket === sock_fd && (internal_instance = internal_scheduled_write[fileno])
internal_instance.write_and_schedule(sock_fd)
elsif extern_opts = connection_completion_awaited[fileno]
complete_connection(sock_fd,extern_opts)
elsif handler_instance = write_scheduled[fileno]
handler_instance.write_and_schedule(sock_fd)
end
end
end
|
62
63
64
65
66
67
|
# File 'lib/packet/packet_core.rb', line 62
def immediate_complete(t_socket,sock_addr,t_module,&block)
read_ios << t_socket
write_ios.delete(t_socket)
decorate_handler(t_socket,true,sock_addr,t_module,&block)
connection_completion_awaited.delete(t_socket.fileno)
end
|
#initialize ⇒ Object
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
|
# File 'lib/packet/packet_core.rb', line 259
def initialize
@read_ios ||= []
@write_ios ||= []
@connection_completion_awaited ||= {}
@write_scheduled ||= {}
@internal_scheduled_write ||= {}
@outbound_data = []
@connections ||= {}
@listen_sockets ||= {}
@binding = 0
@on_next_tick = nil
@timer_hash ||= {}
@windows_flag = windows?
@reactor = self
end
|
#initialize_handler(p_module) ⇒ Object
316
317
318
319
320
321
322
323
324
325
|
# File 'lib/packet/packet_core.rb', line 316
def initialize_handler(p_module)
return p_module if(!p_module.is_a?(Class) and !p_module.is_a?(Module))
handler =
if(p_module and p_module.is_a?(Class))
p_module and p_module.send(:include,Connection)
else
Class.new { include Connection; include p_module; }
end
return handler.new
end
|
#next_turn(&block) ⇒ Object
112
113
114
|
# File 'lib/packet/packet_core.rb', line 112
def next_turn &block
@on_next_tick = block
end
|
#read_external_socket(t_sock) ⇒ Object
226
227
228
229
230
231
232
233
234
235
|
# File 'lib/packet/packet_core.rb', line 226
def read_external_socket(t_sock)
handler_instance = connections[t_sock.fileno][:instance]
begin
t_data = read_data(t_sock)
handler_instance.receive_data(t_data)
rescue DisconnectError => sock_error
handler_instance.receive_data(sock_error.data) unless (sock_error.data).empty?
handler_instance.close_connection
end
end
|
#reconnect(server, port, handler) ⇒ Object
54
55
56
57
58
59
60
|
# File 'lib/packet/packet_core.rb', line 54
def reconnect(server,port,handler)
raise "invalid handler" unless handler.respond_to?(:connection_completed)
if !handler.connection.closed? && connections.keys.include?(handler.connection.fileno)
return handler
end
connect(server,port,handler)
end
|
#remove_connection(t_sock) ⇒ Object
method removes the connection and closes the socket
99
100
101
102
103
104
105
106
107
108
109
110
|
# File 'lib/packet/packet_core.rb', line 99
def remove_connection(t_sock)
read_ios.delete(t_sock)
write_ios.delete(t_sock)
begin
unless t_sock.closed?
connections.delete(t_sock.fileno)
t_sock.close
end
rescue
puts "#{$!.message}"
end
end
|
#schedule_write(t_sock, internal_instance = nil) ⇒ Object
154
155
156
157
158
159
160
161
162
163
|
# File 'lib/packet/packet_core.rb', line 154
def schedule_write(t_sock,internal_instance = nil)
fileno = t_sock.fileno
if UNIXSocket === t_sock && internal_scheduled_write[fileno].nil?
write_ios << t_sock
internal_scheduled_write[t_sock.fileno] ||= internal_instance
elsif write_scheduled[fileno].nil? && !(t_sock.is_a?(UNIXSocket))
write_ios << t_sock
write_scheduled[fileno] ||= connections[fileno][:instance]
end
end
|
#shutdown ⇒ Object
207
208
209
210
211
|
# File 'lib/packet/packet_core.rb', line 207
def shutdown
exit
end
|
#start_reactor ⇒ Object
method starts event loop in the process
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
|
# File 'lib/packet/packet_core.rb', line 136
def start_reactor
Signal.trap("TERM") { terminate_me }
Signal.trap("INT") { shutdown }
loop do
check_for_timer_events
@on_next_tick.call if @on_next_tick
ready_read_fds,ready_write_fds,read_error_fds = select(read_ios,write_ios,[],0.005)
if ready_read_fds && !ready_read_fds.empty?
handle_read_event(ready_read_fds)
elsif ready_write_fds && !ready_write_fds.empty?
handle_write_event(ready_write_fds)
end
end
end
|
#start_server(ip, port, t_module, &block) ⇒ Object
method opens a socket for listening
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
|
# File 'lib/packet/packet_core.rb', line 117
def start_server(ip,port,t_module,&block)
BasicSocket.do_not_reverse_lookup = true
t_socket = Socket.new(Socket::AF_INET,Socket::SOCK_STREAM,0)
t_socket.setsockopt(Socket::SOL_SOCKET,Socket::SO_REUSEADDR,true)
sockaddr = Socket.sockaddr_in(port.to_i,ip)
t_socket.bind(sockaddr)
t_socket.listen(50)
t_socket.setsockopt(Socket::IPPROTO_TCP,Socket::TCP_NODELAY,1)
listen_sockets[t_socket.fileno] = { :socket => t_socket,:block => block,:module => t_module }
@read_ios << t_socket
end
|
#terminate_me ⇒ Object
201
202
203
204
205
|
# File 'lib/packet/packet_core.rb', line 201
def terminate_me
exit
end
|
#unix? ⇒ Boolean
284
285
286
|
# File 'lib/packet/packet_core.rb', line 284
def unix?
!@windows_flag
end
|
#windows? ⇒ Boolean
279
280
281
282
|
# File 'lib/packet/packet_core.rb', line 279
def windows?
return true if RUBY_PLATFORM =~ /win32/i
return false
end
|