Class: QC::ConnAdapter
- Inherits:
-
Object
- Object
- QC::ConnAdapter
- Defined in:
- lib/queue_classic/conn_adapter.rb
Instance Attribute Summary collapse
-
#connection ⇒ Object
Returns the value of attribute connection.
Instance Method Summary collapse
- #disconnect ⇒ Object
- #execute(stmt, *params) ⇒ Object
-
#initialize(c = nil) ⇒ ConnAdapter
constructor
A new instance of ConnAdapter.
- #wait(time, *channels) ⇒ Object
Constructor Details
#initialize(c = nil) ⇒ ConnAdapter
Returns a new instance of ConnAdapter.
8 9 10 11 |
# File 'lib/queue_classic/conn_adapter.rb', line 8 def initialize(c=nil) @connection = c.nil? ? establish_new : validate!(c) @mutex = Mutex.new end |
Instance Attribute Details
#connection ⇒ Object
Returns the value of attribute connection.
7 8 9 |
# File 'lib/queue_classic/conn_adapter.rb', line 7 def connection @connection end |
Instance Method Details
#disconnect ⇒ Object
41 42 43 44 45 46 47 48 49 |
# File 'lib/queue_classic/conn_adapter.rb', line 41 def disconnect @mutex.synchronize do begin @connection.close rescue => e QC.log(:at => 'disconnect', :error => e.) end end end |
#execute(stmt, *params) ⇒ Object
13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 |
# File 'lib/queue_classic/conn_adapter.rb', line 13 def execute(stmt, *params) @mutex.synchronize do QC.log(:at => "exec_sql", :sql => stmt.inspect) begin params = nil if params.empty? r = @connection.exec(stmt, params) result = [] r.each {|t| result << t} result.length > 1 ? result : result.pop rescue PGError => e QC.log(:error => e.inspect) @connection.reset raise end end end |
#wait(time, *channels) ⇒ Object
30 31 32 33 34 35 36 37 38 39 |
# File 'lib/queue_classic/conn_adapter.rb', line 30 def wait(time, *channels) @mutex.synchronize do listen_cmds = channels.map {|c| 'LISTEN "' + c.to_s + '"'} @connection.exec(listen_cmds.join(';')) wait_for_notify(time) unlisten_cmds = channels.map {|c| 'UNLISTEN "' + c.to_s + '"'} @connection.exec(unlisten_cmds.join(';')) drain_notify end end |