Class: IProto::EMConnection

Inherits:
EM::Connection
  • Object
show all
Includes:
ConnectionAPI, FixedLengthProtocol
Defined in:
lib/iproto/em.rb

Direct Known Subclasses

EMCallbackConnection, EMFiberedConnection

Defined Under Namespace

Modules: FixedLengthProtocol

Constant Summary

Constants included from ConnectionAPI

ConnectionAPI::BINARY, ConnectionAPI::DEFAULT_RECONNECT, ConnectionAPI::EMPTY_STR, ConnectionAPI::HEADER_SIZE, ConnectionAPI::PING, ConnectionAPI::PING_ID

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from FixedLengthProtocol

#buffer_reset, #post_init, #receive_data

Methods included from ConnectionAPI

#next_request_id, #pack_request, #send_request

Constructor Details

#initialize(host, port, reconnect = true) ⇒ EMConnection

Returns a new instance of EMConnection.



40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
# File 'lib/iproto/em.rb', line 40

def initialize(host, port, reconnect = true)
  @host = host
  @port = port
  @reconnect_timeout = Numeric === reconnect ? reconnect : DEFAULT_RECONNECT
  @should_reconnect = !!reconnect
  @reconnect_timer = nil
  @ping_timer = nil
  @connected = :init_waiting
  @waiting_requests = {}
  @waiting_for_connect = []
  @shutdown_hook = false
  @inactivity_timeout = 0
  init_protocol
  shutdown_hook
end

Instance Attribute Details

#_needed_sizeObject (readonly)

Returns the value of attribute _needed_size.



99
100
101
# File 'lib/iproto/em.rb', line 99

def _needed_size
  @_needed_size
end

#hostObject (readonly)

Returns the value of attribute host.



38
39
40
# File 'lib/iproto/em.rb', line 38

def host
  @host
end

#portObject (readonly)

Returns the value of attribute port.



38
39
40
# File 'lib/iproto/em.rb', line 38

def port
  @port
end

Instance Method Details

#_do_send_request(request_type, body, request) ⇒ Object



191
192
193
194
195
# File 'lib/iproto/em.rb', line 191

def _do_send_request(request_type, body, request)
  while @waiting_requests.include?(request_id = next_request_id); end
  send_data pack_request(request_type, request_id, body)
  @waiting_requests[request_id] = request
end

#_perform_waiting_for_connect(real) ⇒ Object



174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
# File 'lib/iproto/em.rb', line 174

def _perform_waiting_for_connect(real)
  if real
    @waiting_for_connect.each do |request_type, body, request|
      ::EM.next_tick{
      _do_send_request(request_type, body, request)
      }
    end
  else
    i = -1
    @waiting_for_connect.each do |request_type, body, request|
      @waiting_requests[i] = request
      i -= 1
    end
  end
  @waiting_for_connect.clear
end

#_pingObject



106
107
108
109
# File 'lib/iproto/em.rb', line 106

def _ping
  send_data pack_request(PING, PING_ID, EMPTY_STR)
  @ping_timer = nil
end

#_send_request(request_type, body, request) ⇒ Object



157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
# File 'lib/iproto/em.rb', line 157

def _send_request(request_type, body, request)
  if @connected == true
    _do_send_request(request_type, body, request)
  elsif could_be_connected?
    @waiting_for_connect << [request_type, body, request]
    if @connected == :force
      _setup_reconnect_timer(0)
    end
  elsif ::EM.reactor_running?
    EM.next_tick{
      do_response(request, IProto::Disconnected.new("connection is closed"))
    }
  else
    do_response(request, IProto::Disconnected.new("connection is closed"))
  end
end

#_setup_reconnect_timer(timeout) ⇒ Object



142
143
144
145
146
147
148
149
150
151
152
153
154
155
# File 'lib/iproto/em.rb', line 142

def _setup_reconnect_timer(timeout)
  if @reconnect_timer.nil?
    @reconnect_timer = :waiting
    shutdown_hook
    if timeout == 0
      @connected = :waiting
      reconnect @host, @port
    else
      @reconnect_timer = ::EM.add_timer(timeout) do
        reconnect @host, @port
      end
    end
  end
end

#_stop_pingerObject



56
57
58
59
60
61
# File 'lib/iproto/em.rb', line 56

def _stop_pinger
  if @ping_timer
    EM.cancel_timer @ping_timer
    @ping_timer = nil
  end
end

#closeObject



197
198
199
# File 'lib/iproto/em.rb', line 197

def close
  close_connection(false)
end

#close_connection(*args) ⇒ Object



201
202
203
204
205
206
207
208
209
210
211
212
213
# File 'lib/iproto/em.rb', line 201

def close_connection(*args)
  @should_reconnect = nil
  if Integer === @reconnect_timer
    ::EM.cancel_timer @reconnect_timer
  end
  @reconnect_timer = nil

  if @connected == true
    super(*args)
  end
  @connected = false
  discard_requests
end

#comm_inactivity_timeout=(t) ⇒ Object



63
64
65
66
67
# File 'lib/iproto/em.rb', line 63

def comm_inactivity_timeout=(t)
  @inactivity_timeout = t
  super
  _ping
end

#connected?Boolean

Returns:

  • (Boolean)


69
70
71
# File 'lib/iproto/em.rb', line 69

def connected?
  @connected == true
end

#connection_completedObject



91
92
93
94
95
96
97
# File 'lib/iproto/em.rb', line 91

def connection_completed
  @reconnect_timer = nil
  @connected = true
  init_protocol
  self.comm_inactivity_timeout= @inactivity_timeout
  _perform_waiting_for_connect(true)
end

#could_be_connected?Boolean

Returns:

  • (Boolean)


73
74
75
# File 'lib/iproto/em.rb', line 73

def could_be_connected?
  @connected && (@connected != :force || ::EM.reactor_running?)
end

#discard_requestsObject



215
216
217
218
219
220
221
222
# File 'lib/iproto/em.rb', line 215

def discard_requests
  exc = IProto::Disconnected.new("discarded cause of disconnect")
  _perform_waiting_for_connect(false)
  @waiting_requests.keys.each do |req|
    request = @waiting_requests.delete req
    do_response request, exc
  end
end

#do_response(request, data) ⇒ Object



138
139
140
# File 'lib/iproto/em.rb', line 138

def do_response(request, data)
  request.call data
end

#init_protocolObject



100
101
102
103
104
# File 'lib/iproto/em.rb', line 100

def init_protocol
  @_needed_size = HEADER_SIZE
  @_state = :receive_header
  buffer_reset
end

#receive_chunk(chunk) ⇒ Object



111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
# File 'lib/iproto/em.rb', line 111

def receive_chunk(chunk)
  if @_state == :receive_header
    body_size = ::BinUtils.get_int32_le(chunk, 4)
    @request_id = ::BinUtils.get_int32_le(chunk, 8)
    if body_size > 0
      @_needed_size = body_size
      @_state = :receive_body
      return
    else
      chunk = ''
    end
  end
  if @request_id == PING_ID
    @_needed_size = HEADER_SIZE
    @_state = :receive_header
    if @ping_timer == nil && @inactivity_timeout > 0
      @ping_timer = ::EM.add_timer(@inactivity_timeout / 4.0, method(:_ping))
    end
    return
  end
  request = @waiting_requests.delete @request_id
  raise IProto::UnexpectedResponse.new("For request id #{@request_id}") unless request
  @_needed_size = HEADER_SIZE
  @_state = :receive_header
  do_response(request, chunk)
end

#shutdown_hookObject



77
78
79
80
81
82
83
84
85
86
87
88
89
# File 'lib/iproto/em.rb', line 77

def shutdown_hook
  unless @shutdown_hook
    ::EM.add_shutdown_hook {
      @connected = @should_reconnect ? :force : false
      if Integer === @reconnect_timer
        ::EM.cancel_timer @reconnect_timer
      end
      @reconnect_timer = nil
      @shutdown_hook = false
    }
    @shutdown_hook = true
  end
end

#unbindObject



224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
# File 'lib/iproto/em.rb', line 224

def unbind
  _stop_pinger
  prev_connected = @connected
  @connected = false
  discard_requests
  @connected = prev_connected

  case @should_reconnect
  when true
    @reconnect_timer = nil
    unless @connected == :force
      @connected = false
      _setup_reconnect_timer(@reconnect_timeout)
    end
  when false
    if @connected == :init_waiting
      raise IProto::CouldNotConnect
    else
      raise IProto::Disconnected
    end
  when nil
    # do nothing cause we explicitely disconnected
  end
end