Class: QC::Queue
- Inherits:
-
Object
- Object
- QC::Queue
- Defined in:
- lib/queue_classic/queue.rb
Overview
The queue class maps a queue abstraction onto a database table.
Instance Attribute Summary collapse
-
#name ⇒ Object
readonly
Returns the value of attribute name.
-
#top_bound ⇒ Object
readonly
Returns the value of attribute top_bound.
Instance Method Summary collapse
- #conn_adapter ⇒ Object
- #conn_adapter=(a) ⇒ Object
-
#count ⇒ Object
Count the number of jobs in a specific queue.
-
#count_ready ⇒ Object
Count the number of jobs in a specific queue, except ones scheduled in the future.
-
#count_scheduled ⇒ Object
Count the number of jobs in a specific queue scheduled in the future.
- #delete(id) ⇒ Object
- #delete_all ⇒ Object
-
#enqueue(method, *args) ⇒ Object
enqueue(m,a) inserts a row into the jobs table and trigger a notification.
-
#enqueue_at(timestamp, method, *args) ⇒ Object
enqueue_at(t,m,a) inserts a row into the jobs table representing a job to be executed not before the specified time.
-
#enqueue_in(seconds, method, *args) ⇒ Object
enqueue_in(t,m,a) inserts a row into the jobs table representing a job to be executed not before the specified time offset.
-
#initialize(name, top_bound = nil) ⇒ Queue
constructor
A new instance of Queue.
- #lock ⇒ Object
- #unlock(id) ⇒ Object
Constructor Details
Instance Attribute Details
#name ⇒ Object (readonly)
Returns the value of attribute name.
10 11 12 |
# File 'lib/queue_classic/queue.rb', line 10 def name @name end |
#top_bound ⇒ Object (readonly)
Returns the value of attribute top_bound.
10 11 12 |
# File 'lib/queue_classic/queue.rb', line 10 def top_bound @top_bound end |
Instance Method Details
#conn_adapter ⇒ Object
21 22 23 |
# File 'lib/queue_classic/queue.rb', line 21 def conn_adapter @adapter ||= QC.default_conn_adapter end |
#conn_adapter=(a) ⇒ Object
17 18 19 |
# File 'lib/queue_classic/queue.rb', line 17 def conn_adapter=(a) @adapter = a end |
#count ⇒ Object
Count the number of jobs in a specific queue. This returns all jobs, including ones that are scheduled in the future.
151 152 153 |
# File 'lib/queue_classic/queue.rb', line 151 def count _count('queue.count', "SELECT COUNT(*) FROM #{QC.table_name} WHERE q_name = $1") end |
#count_ready ⇒ Object
Count the number of jobs in a specific queue, except ones scheduled in the future
156 157 158 |
# File 'lib/queue_classic/queue.rb', line 156 def count_ready _count('queue.count_scheduled', "SELECT COUNT(*) FROM #{QC.table_name} WHERE q_name = $1 AND scheduled_at <= now()") end |
#count_scheduled ⇒ Object
Count the number of jobs in a specific queue scheduled in the future
161 162 163 |
# File 'lib/queue_classic/queue.rb', line 161 def count_scheduled _count('queue.count_scheduled', "SELECT COUNT(*) FROM #{QC.table_name} WHERE q_name = $1 AND scheduled_at > now()") end |
#delete(id) ⇒ Object
136 137 138 139 140 |
# File 'lib/queue_classic/queue.rb', line 136 def delete(id) QC.log_yield(:measure => 'queue.delete') do conn_adapter.execute("DELETE FROM #{QC.table_name} WHERE id = $1", id) end end |
#delete_all ⇒ Object
142 143 144 145 146 147 |
# File 'lib/queue_classic/queue.rb', line 142 def delete_all QC.log_yield(:measure => 'queue.delete_all') do s = "DELETE FROM #{QC.table_name} WHERE q_name = $1" conn_adapter.execute(s, name) end end |
#enqueue(method, *args) ⇒ Object
enqueue(m,a) inserts a row into the jobs table and trigger a notification. The job’s queue is represented by a name column in the row. There is a trigger on the table which will send a NOTIFY event on a channel which corresponds to the name of the queue. The method argument is a string encoded ruby expression. The expression will be separated by a ‘.` character and then `eval`d. Examples of the method argument include: `puts`, `Kernel.puts`, `MyObject.new.puts`. The args argument will be encoded as JSON and stored as a JSON datatype in the row. (If the version of PG does not support JSON, then the args will be stored as text. The args are stored as a collection and then splatted inside the worker. Examples of args include: `’hello world’‘, `[’hello world’]‘, `’hello’, ‘world’‘. This method returns a hash with the id of the enqueued job.
40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 |
# File 'lib/queue_classic/queue.rb', line 40 def enqueue(method, *args) QC.log_yield(:measure => 'queue.enqueue') do s = "INSERT INTO #{QC.table_name} (q_name, method, args) VALUES ($1, $2, $3) RETURNING id" begin retries ||= 0 conn_adapter.execute(s, name, method, JSON.dump(args)) rescue PG::Error if (retries += 1) < 2 retry else raise end end end end |
#enqueue_at(timestamp, method, *args) ⇒ Object
enqueue_at(t,m,a) inserts a row into the jobs table representing a job to be executed not before the specified time. The time argument must be a Time object or a float timestamp. The method and args argument must be in the form described in the documentation for the #enqueue method. This method returns a hash with the id of the enqueued job.
62 63 64 65 |
# File 'lib/queue_classic/queue.rb', line 62 def enqueue_at(, method, *args) offset = Time.at().to_i - Time.now.to_i enqueue_in(offset, method, *args) end |
#enqueue_in(seconds, method, *args) ⇒ Object
enqueue_in(t,m,a) inserts a row into the jobs table representing a job to be executed not before the specified time offset. The seconds argument must be an integer. The method and args argument must be in the form described in the documentation for the #enqueue method. This method returns a hash with the id of the enqueued job.
73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 |
# File 'lib/queue_classic/queue.rb', line 73 def enqueue_in(seconds, method, *args) QC.log_yield(:measure => 'queue.enqueue') do s = "INSERT INTO #{QC.table_name} (q_name, method, args, scheduled_at) VALUES ($1, $2, $3, now() + interval '#{seconds.to_i} seconds') RETURNING id" begin retries ||= 0 conn_adapter.execute(s, name, method, JSON.dump(args)) rescue PG::Error if (retries += 1) < 2 retry else raise end end end end |
#lock ⇒ Object
91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 |
# File 'lib/queue_classic/queue.rb', line 91 def lock QC.log_yield(:measure => 'queue.lock') do s = <<~SQL WITH selected_job AS ( SELECT id FROM queue_classic_jobs WHERE locked_at IS NULL AND q_name = $1 AND scheduled_at <= now() LIMIT 1 FOR NO KEY UPDATE SKIP LOCKED ) UPDATE queue_classic_jobs SET locked_at = now(), locked_by = pg_backend_pid() FROM selected_job WHERE queue_classic_jobs.id = selected_job.id RETURNING * SQL if r = conn_adapter.execute(s, name) {}.tap do |job| job[:id] = r["id"] job[:q_name] = r["q_name"] job[:method] = r["method"] job[:args] = JSON.parse(r["args"]) if r["scheduled_at"] job[:scheduled_at] = r["scheduled_at"].kind_of?(Time) ? r["scheduled_at"] : Time.parse(r["scheduled_at"]) ttl = Integer((Time.now - job[:scheduled_at]) * 1000) QC.measure("time-to-lock=#{ttl}ms source=#{name}") end end end end end |
#unlock(id) ⇒ Object
129 130 131 132 133 134 |
# File 'lib/queue_classic/queue.rb', line 129 def unlock(id) QC.log_yield(:measure => 'queue.unlock') do s = "UPDATE #{QC.table_name} SET locked_at = NULL WHERE id = $1" conn_adapter.execute(s, id) end end |