Class: QC::Database
- Inherits: Object
- Inheritance hierarchy: Object → QC::Database
- Defined in:
- lib/queue_classic/database.rb
Constant Summary collapse
- @@connection =
nil
Instance Attribute Summary collapse
-
#table_name ⇒ Object
readonly
Returns the value of attribute table_name.
-
#top_boundary ⇒ Object
readonly
Returns the value of attribute top_boundary.
Instance Method Summary collapse
- #connect ⇒ Object
- #connection ⇒ Object
- #disconnect ⇒ Object
- #drain_notify ⇒ Object
- #escape(string) ⇒ Object
- #execute(sql, *params) ⇒ Object
-
#initialize(queue_name = nil) ⇒ Database
constructor
A new instance of Database.
- #listen ⇒ Object
- #load_functions ⇒ Object
- #log(msg) ⇒ Object
- #notify ⇒ Object
- #set_application_name ⇒ Object
- #transaction ⇒ Object
- #transaction_idle? ⇒ Boolean
- #unlisten ⇒ Object
- #unload_functions ⇒ Object
- #wait_for_notify(t) ⇒ Object
Constructor Details
#initialize(queue_name = nil) ⇒ Database
Returns a new instance of Database.
9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 |
# File 'lib/queue_classic/database.rb', line 9
#
# Builds a Database handle for one queue table. No connection is opened here;
# the settings are only read, stored and logged.
#
# queue_name - name of the jobs table; "queue_classic_jobs" is used when nil.
#              The same value is stored as the channel name.
#
# Reads ENV["QC_TOP_BOUND"] (default 9, converted with to_i) and
# ENV["QC_DATABASE_URL"], falling back to ENV["DATABASE_URL"]; the URL is
# parsed with URI.parse and kept in @db_params.
def initialize(queue_name=nil)
  log("initialized")

  @top_boundary = (ENV["QC_TOP_BOUND"] || 9).to_i
  log("top_boundary=#{@top_boundary}")

  @table_name = queue_name || "queue_classic_jobs"
  log("table_name=#{@table_name}")

  @channel_name = @table_name
  log("channel_name=#{@channel_name}")

  db_url = (ENV["QC_DATABASE_URL"] || ENV["DATABASE_URL"])
  @db_params = URI.parse(db_url)

  # The URL may embed the database password; keep it out of the log output.
  secret = @db_params.password
  loggable_url = secret ? db_url.sub(":#{secret}@", ":[FILTERED]@") : db_url
  log("uri=#{loggable_url}")
end
Instance Attribute Details
#table_name ⇒ Object (readonly)
Returns the value of attribute table_name.
6 7 8 |
# File 'lib/queue_classic/database.rb', line 6
#
# Reader for the jobs table name held in @table_name.
def table_name
  @table_name
end
#top_boundary ⇒ Object (readonly)
Returns the value of attribute top_boundary.
7 8 9 |
# File 'lib/queue_classic/database.rb', line 7
#
# Reader for the value held in @top_boundary.
def top_boundary
  @top_boundary
end
Instance Method Details
#connect ⇒ Object
96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 |
# File 'lib/queue_classic/database.rb', line 96
#
# Opens a new connection with PGconn.connect using the host, port (5432 when
# the URL has none), path, user and password of @db_params, and returns it.
# A status other than PGconn::CONNECTION_OK is only logged; the connection
# object is returned either way.
def connect
  log("establishing connection")

  pg = PGconn.connect(
    @db_params.host,
    @db_params.port || 5432,
    nil, '', # opts, tty
    @db_params.path.gsub("/",""), # database name
    @db_params.user,
    @db_params.password
  )

  log("connection error=#{pg.error}") unless pg.status == PGconn::CONNECTION_OK
  pg
end
#connection ⇒ Object
87 88 89 |
# File 'lib/queue_classic/database.rb', line 87
#
# Returns the connection stored in the class variable @@connection, calling
# #connect to create and store it when none is set yet.
def connection
  @@connection ||= connect
end
#disconnect ⇒ Object
91 92 93 94 |
# File 'lib/queue_classic/database.rb', line 91
#
# Closes the stored connection, if any, and clears @@connection.
# Does nothing when no connection is stored. Always returns nil.
def disconnect
  # Going through #connection here would open a fresh connection just to
  # close it again, so check the stored handle directly.
  return unless @@connection

  begin
    @@connection.finish
  ensure
    # Drop the handle even if finish raises, so the next #connection call
    # reconnects instead of reusing a connection in an unknown state.
    @@connection = nil
  end
  nil
end
#drain_notify ⇒ Object
49 50 51 52 53 |
# File 'lib/queue_classic/database.rb', line 49
#
# Calls connection.notifies repeatedly until it returns nil, logging once for
# every non-nil result.
def drain_notify
  loop do
    break if connection.notifies.nil?
    log("draining notifications")
  end
end
#escape(string) ⇒ Object
30 31 32 |
# File 'lib/queue_classic/database.rb', line 30
#
# Passes +string+ to the connection's escape method and returns the result.
def escape(string)
  conn = connection
  conn.escape(string)
end
#execute(sql, *params) ⇒ Object
76 77 78 79 80 81 82 83 84 85 |
# File 'lib/queue_classic/database.rb', line 76
#
# Logs and runs +sql+ on the connection via exec.
#
# sql    - the SQL text.
# params - optional bind values; nil is passed to exec when none are given.
#
# Returns whatever connection.exec returns. A PGError is logged and re-raised.
def execute(sql, *params)
  log("executing #{sql.inspect}, #{params.inspect}")
  bind_values = params.empty? ? nil : params
  begin
    connection.exec(sql, bind_values)
  rescue PGError => err
    log("execute exception=#{err.inspect}")
    raise
  end
end
#listen ⇒ Object
39 40 41 42 |
# File 'lib/queue_classic/database.rb', line 39
#
# Logs "LISTEN" and executes a LISTEN statement for @channel_name.
def listen
  log("LISTEN")
  statement = "LISTEN #{@channel_name}"
  execute(statement)
end
#load_functions ⇒ Object
112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 |
# Defines (CREATE OR REPLACE) two overloads of the PL/pgSQL function lock_head
# by sending a single SQL script through #execute. Both overloads are declared
# to return SETOF queue_classic_jobs.
#
# lock_head(tname name, top_boundary integer):
#   1. Counts at most top_boundary rows of the table into job_count.
#   2. Sets relative_top = TRUNC(random() * top_boundary + 1), then resets it
#      to 0 when job_count < top_boundary.
#   3. Loops selecting one id WHERE locked_at IS NULL, ordered by id, at
#      OFFSET relative_top, FOR UPDATE NOWAIT; lock_not_available is caught
#      and the select is retried.
#   4. Sets locked_at = CURRENT_TIMESTAMP for that id (only while locked_at is
#      still NULL) and returns the updated row(s).
# lock_head(tname varchar): delegates to lock_head($1, 10).
#
# NOTE(review): the one-argument overload hard-codes 10, while the Ruby
# constructor defaults QC_TOP_BOUND to 9 -- confirm whether that is intended.
# NOTE(review): the count in step 1 has no locked_at filter, so it looks like
# the random offset can point past the last unlocked row, in which case no id
# is selected and an empty set is returned -- confirm against callers.
# File 'lib/queue_classic/database.rb', line 112 def load_functions execute(<<-EOD) -- We are declaring the return type to be queue_classic_jobs. -- This is ok since I am assuming that all of the users added queues will -- have identical columns to queue_classic_jobs. -- When QC supports queues with columns other than the default, we will have to change this. CREATE OR REPLACE FUNCTION lock_head(tname name, top_boundary integer) RETURNS SETOF queue_classic_jobs AS $$ DECLARE unlocked integer; relative_top integer; job_count integer; BEGIN -- The purpose is to release contention for the first spot in the table. -- The select count(*) is going to slow down dequeue performance but allow -- for more workers. Would love to see some optimization here... EXECUTE 'SELECT count(*) FROM ' || '(SELECT * FROM ' || quote_ident(tname) || ' LIMIT ' || quote_literal(top_boundary) || ') limited' INTO job_count; SELECT TRUNC(random() * top_boundary + 1) INTO relative_top; IF job_count < top_boundary THEN relative_top = 0; END IF; LOOP BEGIN EXECUTE 'SELECT id FROM ' || quote_ident(tname) || ' WHERE locked_at IS NULL' || ' ORDER BY id ASC' || ' LIMIT 1' || ' OFFSET ' || quote_literal(relative_top) || ' FOR UPDATE NOWAIT' INTO unlocked; EXIT; EXCEPTION WHEN lock_not_available THEN -- do nothing. loop again and hope we get a lock END; END LOOP; RETURN QUERY EXECUTE 'UPDATE ' || quote_ident(tname) || ' SET locked_at = (CURRENT_TIMESTAMP)' || ' WHERE id = $1' || ' AND locked_at is NULL' || ' RETURNING *' USING unlocked; RETURN; END; $$ LANGUAGE plpgsql; CREATE OR REPLACE FUNCTION lock_head(tname varchar) RETURNS SETOF queue_classic_jobs AS $$ BEGIN RETURN QUERY EXECUTE 'SELECT * FROM lock_head($1,10)' USING tname; END; $$ LANGUAGE plpgsql; EOD end |
#log(msg) ⇒ Object
183 184 185 |
# File 'lib/queue_classic/database.rb', line 183
#
# Writes +msg+ through Logger.puts, prefixed with "database" and a space.
def log(msg)
  line = ["database", msg].join(" ")
  Logger.puts(line)
end
#notify ⇒ Object
34 35 36 37 |
# File 'lib/queue_classic/database.rb', line 34
#
# Logs "NOTIFY" and executes a NOTIFY statement for @channel_name.
def notify
  log("NOTIFY")
  statement = "NOTIFY #{@channel_name}"
  execute(statement)
end
#set_application_name ⇒ Object
26 27 28 |
# File 'lib/queue_classic/database.rb', line 26
#
# Sets the session's application_name to 'queue_classic' via #execute.
def set_application_name
  statement = "SET application_name = 'queue_classic'"
  execute(statement)
end
#transaction ⇒ Object
61 62 63 64 65 66 67 68 69 70 |
# File 'lib/queue_classic/database.rb', line 61
#
# Runs the given block between BEGIN and COMMIT statements.
#
# Any exception raised along the way (Exception, not just StandardError)
# triggers a ROLLBACK statement and is then re-raised. On success the return
# value is that of the COMMIT #execute call, not of the block.
def transaction
  execute('BEGIN')
  yield
  execute('COMMIT')
rescue Exception
  execute('ROLLBACK')
  raise
end
#transaction_idle? ⇒ Boolean
72 73 74 |
# File 'lib/queue_classic/database.rb', line 72
#
# True when the connection's transaction_status equals PGconn::PQTRANS_IDLE.
def transaction_idle?
  status = connection.transaction_status
  status == PGconn::PQTRANS_IDLE
end
#unlisten ⇒ Object
44 45 46 47 |
# File 'lib/queue_classic/database.rb', line 44
#
# Logs "UNLISTEN" and executes an UNLISTEN statement for @channel_name.
def unlisten
  log("UNLISTEN")
  statement = "UNLISTEN #{@channel_name}"
  execute(statement)
end
#unload_functions ⇒ Object
176 177 178 179 180 181 |
# Drops both lock_head overloads -- lock_head(tname varchar) and
# lock_head(tname name, top_boundary integer) -- by sending two
# DROP FUNCTION IF EXISTS statements through #execute in one script.
# File 'lib/queue_classic/database.rb', line 176 def unload_functions execute(<<-EOD) DROP FUNCTION IF EXISTS lock_head(tname varchar); DROP FUNCTION IF EXISTS lock_head(tname name, top_boundary integer); EOD end |
#wait_for_notify(t) ⇒ Object
55 56 57 58 59 |
# File 'lib/queue_classic/database.rb', line 55
#
# Calls the connection's wait_for_notify with +t+ as its argument, logging
# before and after the call and logging the event name of a notification
# handed to the block.
#
# t - value forwarded to connection.wait_for_notify (logged as "timeout").
#
# Returns the result of the final #log call.
def wait_for_notify(t)
  log("waiting for notify timeout=#{t}")
  connection.wait_for_notify(t) do |event, _pid, _msg|
    log("received notification #{event}")
  end
  log("done waiting for notify")
end