Module: PG::EM::Client::Watcher
- Defined in:
- lib/pg/em.rb
Instance Method Summary collapse
- #initialize(client, deferrable, send_proc) ⇒ Object
- #notify_readable ⇒ Object
- #setup_timer(timeout, adjustment = 0) ⇒ Object
Instance Method Details
#initialize(client, deferrable, send_proc) ⇒ Object
186 187 188 189 190 191 192 193 194 195 196 197 |
# File 'lib/pg/em.rb', line 186 def initialize(client, deferrable, send_proc) @last_result = nil @client = client @deferrable = deferrable @send_proc = send_proc if (timeout = client.query_timeout) > 0 @notify_timestamp = Time.now setup_timer timeout else @timer = nil end end |
#notify_readable ⇒ Object
213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 |
# File 'lib/pg/em.rb', line 213 def notify_readable result = false @client.consume_input until @client.is_busy if (single_result = @client.get_result).nil? result = @last_result Result.check_result(@client, result) detach @timer.cancel if @timer break end @last_result.clear if @last_result @last_result = single_result end rescue Exception => e detach @timer.cancel if @timer if e.is_a?(PG::Error) @client.async_autoreconnect!(@deferrable, e, &@send_proc) else @deferrable.fail(e) end else if result == false @notify_timestamp = Time.now if @timer else @deferrable.succeed(result) end end |
#setup_timer(timeout, adjustment = 0) ⇒ Object
199 200 201 202 203 204 205 206 207 208 209 210 211 |
# File 'lib/pg/em.rb', line 199 def setup_timer(timeout, adjustment = 0) @timer = ::EM::Timer.new(timeout - adjustment) do if (last_interval = Time.now - @notify_timestamp) >= timeout detach @client.async_command_aborted = true @deferrable.protect do raise PG::Error, "query timeout expired (async)" end else setup_timer timeout, last_interval end end end |