Class: Que::Connection
- Inherits:
-
Object
- Object
- Que::Connection
- Extended by:
- Forwardable
- Defined in:
- lib/que/connection.rb
Instance Attribute Summary collapse
-
#wrapped_connection ⇒ Object
readonly
Returns the value of attribute wrapped_connection.
Class Method Summary collapse
Instance Method Summary collapse
- #drain_notifications ⇒ Object
- #execute(command, params = []) ⇒ Object
- #execute_prepared(command, params = nil) ⇒ Object
- #in_transaction? ⇒ Boolean
-
#initialize(connection) ⇒ Connection
constructor
A new instance of Connection.
- #next_notification ⇒ Object
- #server_version ⇒ Object
Constructor Details
#initialize(connection) ⇒ Connection
Returns a new instance of Connection.
44 45 46 47 |
# File 'lib/que/connection.rb', line 44 def initialize(connection) @wrapped_connection = connection @prepared_statements = Set.new end |
Instance Attribute Details
#wrapped_connection ⇒ Object (readonly)
Returns the value of attribute wrapped_connection.
23 24 25 |
# File 'lib/que/connection.rb', line 23 def wrapped_connection @wrapped_connection end |
Class Method Details
.wrap(conn) ⇒ Object
28 29 30 31 32 33 34 35 36 37 38 39 40 41 |
# File 'lib/que/connection.rb', line 28 def wrap(conn) case conn when self conn when PG::Connection if conn.instance_variable_defined?(:@que_wrapper) conn.instance_variable_get(:@que_wrapper) else conn.instance_variable_set(:@que_wrapper, new(conn)) end else raise Error, "Unsupported input for Connection.wrap: #{conn.class}" end end |
Instance Method Details
#drain_notifications ⇒ Object
118 119 120 |
# File 'lib/que/connection.rb', line 118 def drain_notifications loop { break if next_notification.nil? } end |
#execute(command, params = []) ⇒ Object
49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 |
# File 'lib/que/connection.rb', line 49 def execute(command, params = []) sql = case command when Symbol then SQL[command] when String then command else raise Error, "Bad command! #{command.inspect}" end params = convert_params(params) result = Que.run_sql_middleware(sql, params) do # Some versions of the PG gem dislike an empty/nil params argument. if params.empty? wrapped_connection.async_exec(sql) else wrapped_connection.async_exec_params(sql, params) end end Que.internal_log :connection_execute, self do { backend_pid: backend_pid, command: command, params: params, ntuples: result.ntuples, } end convert_result(result) end |
#execute_prepared(command, params = nil) ⇒ Object
81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 |
# File 'lib/que/connection.rb', line 81 def execute_prepared(command, params = nil) Que.assert(Symbol, command) if !Que.use_prepared_statements || in_transaction? return execute(command, params) end name = "que_#{command}" begin unless @prepared_statements.include?(command) wrapped_connection.prepare(name, SQL[command]) @prepared_statements.add(command) prepared_just_now = true end convert_result( wrapped_connection.exec_prepared(name, params) ) rescue ::PG::InvalidSqlStatementName => error # Reconnections on ActiveRecord can cause the same connection # objects to refer to new backends, so recover as well as we can. unless prepared_just_now Que.log level: :warn, event: :reprepare_statement, command: command @prepared_statements.delete(command) retry end raise error end end |
#in_transaction? ⇒ Boolean
126 127 128 |
# File 'lib/que/connection.rb', line 126 def in_transaction? wrapped_connection.transaction_status != ::PG::PQTRANS_IDLE end |
#next_notification ⇒ Object
114 115 116 |
# File 'lib/que/connection.rb', line 114 def next_notification wrapped_connection.notifies end |
#server_version ⇒ Object
122 123 124 |
# File 'lib/que/connection.rb', line 122 def server_version wrapped_connection.server_version end |