Class: QC::ConnAdapter
- Inherits:
-
Object
- Object
- QC::ConnAdapter
- Defined in:
- lib/queue_classic/conn_adapter.rb
Instance Method Summary collapse
- #connection ⇒ Object
- #disconnect ⇒ Object
- #execute(stmt, *params) ⇒ Object
-
#initialize(args = {}) ⇒ ConnAdapter
constructor
A new instance of ConnAdapter.
- #server_version ⇒ Object
- #wait(time, *channels) ⇒ Object
Constructor Details
#initialize(args = {}) ⇒ ConnAdapter
Returns a new instance of ConnAdapter.
9 10 11 12 13 |
# File 'lib/queue_classic/conn_adapter.rb', line 9 def initialize(args={}) @active_record_connection_share = args[:active_record_connection_share] @_connection = args[:connection] @mutex = Mutex.new end |
Instance Method Details
#connection ⇒ Object
15 16 17 18 19 20 21 |
# File 'lib/queue_classic/conn_adapter.rb', line 15 def connection if @active_record_connection_share && Object.const_defined?('ActiveRecord') ActiveRecord::Base.connection.raw_connection else @_connection ||= establish_new end end |
#disconnect ⇒ Object
51 52 53 54 55 56 57 58 59 |
# File 'lib/queue_classic/conn_adapter.rb', line 51 def disconnect @mutex.synchronize do begin connection.close rescue => e QC.log(:at => 'disconnect', :error => e.) end end end |
#execute(stmt, *params) ⇒ Object
23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 |
# File 'lib/queue_classic/conn_adapter.rb', line 23 def execute(stmt, *params) @mutex.synchronize do QC.log(:at => "exec_sql", :sql => stmt.inspect) begin params = nil if params.empty? r = connection.exec(stmt, params) result = [] r.each {|t| result << t} result.length > 1 ? result : result.pop rescue PG::Error => e QC.log(:error => e.inspect) connection.reset raise end end end |
#server_version ⇒ Object
61 62 63 64 65 66 |
# File 'lib/queue_classic/conn_adapter.rb', line 61 def server_version @server_version ||= begin version = execute("SHOW server_version_num;")["server_version_num"] version && version.to_i end end |
#wait(time, *channels) ⇒ Object
40 41 42 43 44 45 46 47 48 49 |
# File 'lib/queue_classic/conn_adapter.rb', line 40 def wait(time, *channels) @mutex.synchronize do listen_cmds = channels.map {|c| 'LISTEN "' + c.to_s + '"'} connection.exec(listen_cmds.join(';')) wait_for_notify(time) unlisten_cmds = channels.map {|c| 'UNLISTEN "' + c.to_s + '"'} connection.exec(unlisten_cmds.join(';')) drain_notify end end |