Module: PG::EM::Client::Watcher

Defined in:
lib/pg/em.rb

Instance Method Summary collapse

Instance Method Details

#initialize(client, deferrable, send_proc) ⇒ Object



186
187
188
189
190
191
192
193
194
195
196
197
# File 'lib/pg/em.rb', line 186

def initialize(client, deferrable, send_proc)
  @last_result = nil
  @client = client
  @deferrable = deferrable
  @send_proc = send_proc
  if (timeout = client.query_timeout) > 0
    @notify_timestamp = Time.now
    setup_timer timeout
  else
    @timer = nil
  end
end

#notify_readableObject



213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
# File 'lib/pg/em.rb', line 213

def notify_readable
  result = false
  @client.consume_input
  until @client.is_busy
    if (single_result = @client.get_result).nil?
      result = @last_result
      Result.check_result(@client, result)
      detach
      @timer.cancel if @timer
      break
    end
    @last_result.clear if @last_result
    @last_result = single_result
  end
rescue Exception => e
  detach
  @timer.cancel if @timer
  if e.is_a?(PG::Error)
    @client.async_autoreconnect!(@deferrable, e, &@send_proc)
  else
    @deferrable.fail(e)
  end
else
  if result == false
    @notify_timestamp = Time.now if @timer
  else
    @deferrable.succeed(result) 
  end
end

#setup_timer(timeout, adjustment = 0) ⇒ Object



199
200
201
202
203
204
205
206
207
208
209
210
211
# File 'lib/pg/em.rb', line 199

def setup_timer(timeout, adjustment = 0)
  @timer = ::EM::Timer.new(timeout - adjustment) do
    if (last_interval = Time.now - @notify_timestamp) >= timeout
      detach
      @client.async_command_aborted = true
      @deferrable.protect do
        raise PG::Error, "query timeout expired (async)"
      end
    else
      setup_timer timeout, last_interval
    end
  end
end