Class: IProto::EMConnection
Defined Under Namespace
Modules: FixedLengthProtocol
Constant Summary
ConnectionAPI::BINARY, ConnectionAPI::DEFAULT_RECONNECT, ConnectionAPI::EMPTY_STR, ConnectionAPI::HEADER_SIZE, ConnectionAPI::PING, ConnectionAPI::PING_ID
Instance Attribute Summary collapse
Instance Method Summary
collapse
#buffer_reset, #post_init, #receive_data
#next_request_id, #pack_request, #send_request
Constructor Details
#initialize(host, port, reconnect = true) ⇒ EMConnection
Returns a new instance of EMConnection.
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
|
# File 'lib/iproto/em.rb', line 40
# Stores the endpoint and reconnect policy, resets all bookkeeping, then
# initialises the protocol state and registers the shutdown hook.
#
# @param host [Object] remote host, kept as-is and later passed to +reconnect+
# @param port [Object] remote port, kept as-is and later passed to +reconnect+
# @param reconnect [Boolean, Numeric] a falsy value disables reconnecting; a
#   Numeric is used as the reconnect timeout; any other truthy value selects
#   DEFAULT_RECONNECT
def initialize(host, port, reconnect = true)
  @host, @port = host, port

  # Reconnect policy: a Numeric doubles as "enabled" and as the timeout value.
  @should_reconnect = reconnect ? true : false
  @reconnect_timeout =
    case reconnect
    when Numeric then reconnect
    else DEFAULT_RECONNECT
    end
  @reconnect_timer = nil

  # Connection state and request bookkeeping.
  @connected = :init_waiting
  @waiting_requests = {}
  @waiting_for_connect = []
  @ping_timer = nil
  @inactivity_timeout = 0

  # The flag must be false before shutdown_hook runs so the hook is registered.
  @shutdown_hook = false
  init_protocol
  shutdown_hook
end
|
Instance Attribute Details
#_needed_size ⇒ Object
Returns the value of attribute _needed_size.
99
100
101
|
# File 'lib/iproto/em.rb', line 99
# Reader for @_needed_size. Within this class the value is set to HEADER_SIZE
# by init_protocol and receive_chunk, and to the announced body size by
# receive_chunk once a header with a non-empty body has been parsed.
# NOTE(review): presumably consumed by FixedLengthProtocol to decide how many
# bytes to buffer before calling receive_chunk — confirm against that module.
def _needed_size
@_needed_size
end
|
#host ⇒ Object
Returns the value of attribute host.
38
39
40
|
# File 'lib/iproto/em.rb', line 38
# Reader for the host given to the constructor; the same value is passed to
# +reconnect+ by _setup_reconnect_timer.
def host
@host
end
|
#port ⇒ Object
Returns the value of attribute port.
38
39
40
|
# File 'lib/iproto/em.rb', line 38
# Reader for the port given to the constructor; the same value is passed to
# +reconnect+ by _setup_reconnect_timer.
def port
@port
end
|
Instance Method Details
#_do_send_request(request_type, body, request) ⇒ Object
191
192
193
194
195
|
# File 'lib/iproto/em.rb', line 191
# Writes one request to the wire and remembers its callback.
#
# A fresh id is drawn from next_request_id until one is found that is not
# already a key of @waiting_requests; the packed request is sent, and only
# then is +request+ registered under that id.
#
# @param request_type [Object] passed through to pack_request
# @param body [Object] passed through to pack_request
# @param request [#call] callback later handed to do_response
# @return [Object] +request+
def _do_send_request(request_type, body, request)
  request_id = next_request_id
  request_id = next_request_id while @waiting_requests.key?(request_id)

  packet = pack_request(request_type, request_id, body)
  send_data(packet)
  @waiting_requests[request_id] = request
end
|
#_perform_waiting_for_connect(real) ⇒ Object
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
|
# File 'lib/iproto/em.rb', line 174
# Drains the queue of requests that were issued before the connection was up.
#
# @param real [Boolean] when truthy, each queued request is scheduled for
#   sending via ::EM.next_tick; otherwise the callbacks are parked in
#   @waiting_requests under synthetic negative ids (-1, -2, ...) so that
#   discard_requests can answer them together with the in-flight ones
# @return [Array] the (now empty) @waiting_for_connect array
def _perform_waiting_for_connect(real)
  queued = @waiting_for_connect
  if real
    queued.each do |request_type, body, request|
      ::EM.next_tick { _do_send_request(request_type, body, request) }
    end
  else
    queued.each_with_index do |(_request_type, _body, request), position|
      @waiting_requests[-(position + 1)] = request
    end
  end
  queued.clear
end
|
#_ping ⇒ Object
106
107
108
109
|
# File 'lib/iproto/em.rb', line 106
# Sends a ping packet (type PING, id PING_ID, empty body) and then clears
# @ping_timer, which lets receive_chunk schedule the next ping once the
# reply arrives.
#
# @return [nil]
def _ping
  ping_packet = pack_request(PING, PING_ID, EMPTY_STR)
  send_data(ping_packet)
  @ping_timer = nil
end
|
#_send_request(request_type, body, request) ⇒ Object
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
|
# File 'lib/iproto/em.rb', line 157
# Routes a request according to the current connection state.
#
# * connected (@connected == true): the request is sent immediately;
# * could_be_connected?: it is queued in @waiting_for_connect, and when the
#   state is :force an immediate reconnect is triggered;
# * otherwise the callback receives an IProto::Disconnected, deferred to the
#   next reactor tick when the reactor is running and delivered synchronously
#   when it is not.
#
# @param request_type [Object] forwarded to _do_send_request
# @param body [Object] forwarded to _do_send_request
# @param request [#call] callback handed to do_response
def _send_request(request_type, body, request)
  # Built lazily so the exception object is only created when it is delivered.
  reject = proc do
    do_response(request, IProto::Disconnected.new("connection is closed"))
  end

  if @connected == true
    _do_send_request(request_type, body, request)
  elsif could_be_connected?
    @waiting_for_connect << [request_type, body, request]
    _setup_reconnect_timer(0) if @connected == :force
  elsif ::EM.reactor_running?
    EM.next_tick(&reject)
  else
    reject.call
  end
end
|
#_setup_reconnect_timer(timeout) ⇒ Object
142
143
144
145
146
147
148
149
150
151
152
153
154
155
|
# File 'lib/iproto/em.rb', line 142
# Starts a reconnect attempt unless one is already pending.
#
# @reconnect_timer doubles as the "pending" flag: it is nil when idle,
# :waiting for an immediate attempt, or the value returned by ::EM.add_timer
# for a delayed one.
#
# @param timeout [Numeric] 0 reconnects right away (and marks the connection
#   as :waiting); any other value schedules the reconnect after that delay
# @return [Object, nil] nil when an attempt was already pending
def _setup_reconnect_timer(timeout)
  return unless @reconnect_timer.nil?

  @reconnect_timer = :waiting
  shutdown_hook
  attempt = proc { reconnect @host, @port }

  if timeout == 0
    @connected = :waiting
    attempt.call
  else
    @reconnect_timer = ::EM.add_timer(timeout, &attempt)
  end
end
|
#_stop_pinger ⇒ Object
56
57
58
59
60
61
|
# File 'lib/iproto/em.rb', line 56
# Cancels the pending ping timer, if any, and forgets it.
#
# @return [nil]
def _stop_pinger
  return unless @ping_timer

  EM.cancel_timer(@ping_timer)
  @ping_timer = nil
end
|
#close ⇒ Object
197
198
199
|
# File 'lib/iproto/em.rb', line 197
# Convenience wrapper: closes the connection by calling close_connection(false),
# which also disables reconnecting and discards outstanding requests.
# NOTE(review): the +false+ is forwarded to the superclass close_connection
# when currently connected; presumably EventMachine's "after_writing" flag —
# confirm.
def close
close_connection(false)
end
|
#close_connection(*args) ⇒ Object
201
202
203
204
205
206
207
208
209
210
211
212
213
|
# File 'lib/iproto/em.rb', line 201
# Closes the connection for good.
#
# Reconnecting is switched off (@should_reconnect becomes nil, which unbind
# treats as "do nothing"), a scheduled reconnect timer is cancelled, the
# superclass implementation is invoked only when the connection is actually
# established, and finally every outstanding request is discarded.
#
# @param args [Array] forwarded unchanged to the superclass close_connection
# @return [Object] whatever discard_requests returns
def close_connection(*args)
  @should_reconnect = nil

  # Only a real timer signature (Integer) can be cancelled; :waiting cannot.
  ::EM.cancel_timer(@reconnect_timer) if @reconnect_timer.is_a?(Integer)
  @reconnect_timer = nil

  super(*args) if @connected == true
  @connected = false
  discard_requests
end
|
#comm_inactivity_timeout=(t) ⇒ Object
63
64
65
66
67
|
# File 'lib/iproto/em.rb', line 63
# Remembers the inactivity timeout, forwards it to the superclass setter and
# immediately sends a ping.
#
# The stored value is reused by connection_completed (to re-apply the timeout
# after a reconnect) and by receive_chunk (to schedule the next ping at a
# quarter of the timeout).
# NOTE(review): the ping is sent unconditionally, including when +t+ is 0 or
# when the connection is not established yet — confirm this is intended.
def comm_inactivity_timeout=(t)
@inactivity_timeout = t
# bare super re-passes +t+ to the inherited setter
super
_ping
end
|
#connected? ⇒ Boolean
69
70
71
|
# File 'lib/iproto/em.rb', line 69
# True only when the connection is fully established. The explicit comparison
# with +true+ matters because @connected also takes truthy symbol states
# (:init_waiting, :waiting, :force) elsewhere in this class.
def connected?
@connected == true
end
|
#connection_completed ⇒ Object
91
92
93
94
95
96
97
|
# File 'lib/iproto/em.rb', line 91
# Marks the connection as established and brings it into a usable state.
# NOTE(review): presumably the EventMachine callback fired once the TCP
# connect succeeds — confirm.
def connection_completed
# a pending reconnect attempt is over
@reconnect_timer = nil
@connected = true
# start parsing from a fresh header
init_protocol
# re-apply the stored timeout; the setter also sends a ping
self.comm_inactivity_timeout= @inactivity_timeout
# schedule sending of everything queued while disconnected
_perform_waiting_for_connect(true)
end
|
#could_be_connected? ⇒ Boolean
73
74
75
|
# File 'lib/iproto/em.rb', line 73
# Tells whether queuing a request makes sense, i.e. whether the connection is
# up or may still come up.
#
# @return [Object] @connected itself when it is falsy; true for any truthy
#   state other than :force; for :force, the result of ::EM.reactor_running?
def could_be_connected?
  return @connected unless @connected
  return true unless @connected == :force

  # :force can only be revived by a running reactor.
  ::EM.reactor_running?
end
|
#discard_requests ⇒ Object
215
216
217
218
219
220
221
222
|
# File 'lib/iproto/em.rb', line 215
# Answers every outstanding request with a single IProto::Disconnected.
#
# Requests still queued for a connection are first moved into
# @waiting_requests (via _perform_waiting_for_connect(false)); then a snapshot
# of the ids is walked, removing each callback before it is invoked.
#
# @return [Array] the snapshot of request ids that were processed
def discard_requests
  failure = IProto::Disconnected.new("discarded cause of disconnect")
  _perform_waiting_for_connect(false)

  pending_ids = @waiting_requests.keys
  pending_ids.each do |request_id|
    callback = @waiting_requests.delete(request_id)
    do_response(callback, failure)
  end
end
|
#do_response(request, data) ⇒ Object
138
139
140
|
# File 'lib/iproto/em.rb', line 138
# Delivers +data+ to a request callback by invoking +request.call(data)+.
# Within this class +data+ is either the response body passed on by
# receive_chunk or an IProto::Disconnected instance.
def do_response(request, data)
request.call data
end
|
#init_protocol ⇒ Object
100
101
102
103
104
|
# File 'lib/iproto/em.rb', line 100
# Resets the parser to its initial state: expect a header of HEADER_SIZE bytes
# next, then reset the receive buffer.
# NOTE(review): buffer_reset is defined outside this listing (it appears among
# the included methods); whether it depends on @_needed_size being set first is
# not visible here — keep the order.
def init_protocol
@_needed_size = HEADER_SIZE
@_state = :receive_header
buffer_reset
end
|
#receive_chunk(chunk) ⇒ Object
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
|
# File 'lib/iproto/em.rb', line 111
# Consumes one complete protocol chunk: a header, or the body that a previous
# header announced.
#
# In the :receive_header state the body size is read from offset 4 and the
# request id from offset 8 (both little-endian int32). A non-empty body
# switches the parser to :receive_body and returns; an empty body is treated
# as a complete message with an empty payload.
#
# For a complete message the parser is switched back to header mode *before*
# the message is dispatched, so that an unexpected response (or a failing
# callback) cannot leave the framing state stuck in :receive_body and corrupt
# the parsing of everything that follows.
#
# @param chunk [String] header bytes or body bytes, depending on the state
# @return [Object, nil] the result of do_response for a regular response, nil
#   for headers with a pending body and for ping replies
# @raise [IProto::UnexpectedResponse] when no waiting request has the received id
def receive_chunk(chunk)
  if @_state == :receive_header
    body_size = ::BinUtils.get_int32_le(chunk, 4)
    @request_id = ::BinUtils.get_int32_le(chunk, 8)
    if body_size > 0
      @_needed_size = body_size
      @_state = :receive_body
      return
    end
    # Header-only message: the payload is empty.
    chunk = ''
  end

  # A full message has been received; the next chunk is a header again.
  @_needed_size = HEADER_SIZE
  @_state = :receive_header

  if @request_id == PING_ID
    # Ping reply: schedule the next ping at a quarter of the inactivity
    # timeout, unless one is already scheduled or pinging is disabled.
    if @ping_timer.nil? && @inactivity_timeout > 0
      @ping_timer = ::EM.add_timer(@inactivity_timeout / 4.0, method(:_ping))
    end
    return
  end

  request = @waiting_requests.delete(@request_id)
  raise IProto::UnexpectedResponse, "For request id #{@request_id}" unless request

  do_response(request, chunk)
end
|
#shutdown_hook ⇒ Object
77
78
79
80
81
82
83
84
85
86
87
88
89
|
# File 'lib/iproto/em.rb', line 77
# Registers (at most once at a time) an EventMachine shutdown hook for this
# connection.
#
# When the hook fires it moves the connection to :force (if reconnecting is
# enabled) or false, cancels a scheduled reconnect timer, and clears the
# @shutdown_hook flag so that a later call can register a new hook.
#
# @return [true, nil] true when a hook was registered, nil when one already was
def shutdown_hook
  return if @shutdown_hook

  ::EM.add_shutdown_hook do
    @connected = @should_reconnect ? :force : false
    # Only a real timer signature (Integer) can be cancelled.
    ::EM.cancel_timer(@reconnect_timer) if @reconnect_timer.is_a?(Integer)
    @reconnect_timer = nil
    @shutdown_hook = false
  end
  @shutdown_hook = true
end
|
#unbind ⇒ Object
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
|
# File 'lib/iproto/em.rb', line 224
# Handles the loss of the connection.
# NOTE(review): presumably the EventMachine callback invoked when the socket
# is closed or a connect attempt fails — confirm.
#
# The pinger is stopped and all outstanding requests are discarded while
# @connected is temporarily false; the previous state is then restored.
# What follows depends on @should_reconnect:
# * true  - unless the state is :force, mark as disconnected and schedule a
#           reconnect after @reconnect_timeout;
# * false - raise, distinguishing a connection that never came up;
# * nil   - (set by close_connection) nothing more to do.
#
# @raise [IProto::CouldNotConnect] reconnect disabled and state was :init_waiting
# @raise [IProto::Disconnected] reconnect disabled and any other state
def unbind
  _stop_pinger

  # Discard with @connected == false, then put the previous state back.
  state_before = @connected
  @connected = false
  discard_requests
  @connected = state_before

  if @should_reconnect == true
    @reconnect_timer = nil
    unless @connected == :force
      @connected = false
      _setup_reconnect_timer(@reconnect_timeout)
    end
  elsif @should_reconnect == false
    raise(@connected == :init_waiting ? IProto::CouldNotConnect : IProto::Disconnected)
  end
end
|