Class: QC::ConnAdapter

Inherits:
Object
  • Object
show all
Defined in:
lib/queue_classic/conn_adapter.rb

Instance Method Summary collapse

Constructor Details

#initialize(args = {}) ⇒ ConnAdapter

Returns a new instance of ConnAdapter.



9
10
11
12
13
# File 'lib/queue_classic/conn_adapter.rb', line 9

def initialize(args={})
  @active_record_connection_share = args[:active_record_connection_share]
  @_connection = args[:connection]
  @mutex = Mutex.new
end

Instance Method Details

#connectionObject



15
16
17
18
19
20
21
# File 'lib/queue_classic/conn_adapter.rb', line 15

def connection
  if @active_record_connection_share && Object.const_defined?('ActiveRecord')
    ActiveRecord::Base.connection.raw_connection
  else
    @_connection ||= establish_new
  end
end

#disconnectObject



51
52
53
54
55
56
57
58
59
# File 'lib/queue_classic/conn_adapter.rb', line 51

def disconnect
  @mutex.synchronize do
    begin
      connection.close
    rescue => e
      QC.log(:at => 'disconnect', :error => e.message)
    end
  end
end

#execute(stmt, *params) ⇒ Object



23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
# File 'lib/queue_classic/conn_adapter.rb', line 23

def execute(stmt, *params)
  @mutex.synchronize do
    QC.log(:at => "exec_sql", :sql => stmt.inspect)
    begin
      params = nil if params.empty?
      r = connection.exec(stmt, params)
      result = []
      r.each {|t| result << t}
      result.length > 1 ? result : result.pop
    rescue PG::Error => e
      QC.log(:error => e.inspect)
      connection.reset
      raise
    end
  end
end

#server_versionObject



61
62
63
64
65
66
# File 'lib/queue_classic/conn_adapter.rb', line 61

def server_version
  @server_version ||= begin
                        version = execute("SHOW server_version_num;")["server_version_num"]
                        version && version.to_i
                      end
end

#wait(time, *channels) ⇒ Object



40
41
42
43
44
45
46
47
48
49
# File 'lib/queue_classic/conn_adapter.rb', line 40

def wait(time, *channels)
  @mutex.synchronize do
    listen_cmds = channels.map {|c| 'LISTEN "' + c.to_s + '"'}
    connection.exec(listen_cmds.join(';'))
    wait_for_notify(time)
    unlisten_cmds = channels.map {|c| 'UNLISTEN "' + c.to_s + '"'}
    connection.exec(unlisten_cmds.join(';'))
    drain_notify
  end
end