Class: QC::Queue
- Inherits: Object
  - Object
  - QC::Queue
- Defined in: lib/queue_classic/queue.rb
Overview
The queue class maps a queue abstraction onto a database table.
Instance Attribute Summary
- #name ⇒ Object (readonly)
  Returns the value of attribute name.
- #top_bound ⇒ Object (readonly)
  Returns the value of attribute top_bound.
Instance Method Summary
- #conn_adapter ⇒ Object
- #conn_adapter=(a) ⇒ Object
- #count ⇒ Object
- #delete(id) ⇒ Object
- #delete_all ⇒ Object
- #enqueue(method, *args) ⇒ Object
  enqueue(m,a) inserts a row into the jobs table and triggers a notification.
- #initialize(name, top_bound = nil) ⇒ Queue (constructor)
  A new instance of Queue.
- #lock ⇒ Object
- #unlock(id) ⇒ Object
Constructor Details
Instance Attribute Details
#name ⇒ Object (readonly)
Returns the value of attribute name.
    # File 'lib/queue_classic/queue.rb', line 9
    def name
      @name
    end
#top_bound ⇒ Object (readonly)
Returns the value of attribute top_bound.
    # File 'lib/queue_classic/queue.rb', line 9
    def top_bound
      @top_bound
    end
Instance Method Details
#conn_adapter ⇒ Object
    # File 'lib/queue_classic/queue.rb', line 19
    def conn_adapter
      @adapter ||= QC.default_conn_adapter
    end
#conn_adapter=(a) ⇒ Object
    # File 'lib/queue_classic/queue.rb', line 15
    def conn_adapter=(a)
      @adapter = a
    end
#count ⇒ Object
    # File 'lib/queue_classic/queue.rb', line 83
    def count
      QC.log_yield(:measure => 'queue.count') do
        s = "SELECT COUNT(*) FROM #{TABLE_NAME} WHERE q_name = $1"
        r = conn_adapter.execute(s, name)
        r["count"].to_i
      end
    end
#delete(id) ⇒ Object
    # File 'lib/queue_classic/queue.rb', line 70
    def delete(id)
      QC.log_yield(:measure => 'queue.delete') do
        conn_adapter.execute("DELETE FROM #{TABLE_NAME} where id = $1", id)
      end
    end
#delete_all ⇒ Object
    # File 'lib/queue_classic/queue.rb', line 76
    def delete_all
      QC.log_yield(:measure => 'queue.delete_all') do
        s = "DELETE FROM #{TABLE_NAME} WHERE q_name = $1"
        conn_adapter.execute(s, name)
      end
    end
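Because every query goes through `conn_adapter.execute`, a test double can be injected with `conn_adapter=` to see which statements `#count` and `#delete_all` issue. The following is a minimal sketch, not the library's own test setup: `RecordingAdapter` and `SketchQueue` are hypothetical stand-ins, the table name `queue_classic_jobs` is an assumption, and the logging wrapper `QC.log_yield` is left out.

```ruby
# Hypothetical stand-in adapter: records each statement and returns a canned row.
class RecordingAdapter
  attr_reader :calls

  def initialize(row = {})
    @row = row
    @calls = []
  end

  # Same call shape as conn_adapter.execute(sql, *params) in the listings above.
  def execute(sql, *params)
    @calls << [sql, params]
    @row
  end
end

# Hypothetical, trimmed-down stand-in for QC::Queue (adapter plumbing only).
class SketchQueue
  TABLE_NAME = 'queue_classic_jobs' # assumed value, not confirmed by this page

  attr_reader :name

  def initialize(name)
    @name = name
  end

  def conn_adapter=(adapter)
    @adapter = adapter
  end

  # The real class falls back to QC.default_conn_adapter; this sketch raises instead.
  def conn_adapter
    @adapter || raise(ArgumentError, 'no adapter injected')
  end

  def count
    sql = "SELECT COUNT(*) FROM #{TABLE_NAME} WHERE q_name = $1"
    conn_adapter.execute(sql, name)["count"].to_i
  end

  def delete_all
    conn_adapter.execute("DELETE FROM #{TABLE_NAME} WHERE q_name = $1", name)
  end
end

adapter = RecordingAdapter.new("count" => "3")
queue = SketchQueue.new("default")
queue.conn_adapter = adapter

pending = queue.count # the canned "3" is converted with to_i
queue.delete_all      # recorded as the second statement
```

Both statements are scoped by `q_name`, so in this sketch `adapter.calls` holds two entries whose parameters are each `["default"]`.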
#enqueue(method, *args) ⇒ Object
enqueue(m,a) inserts a row into the jobs table and triggers a notification. The job's queue is represented by the `q_name` column in the row. A trigger on the table sends a NOTIFY event on a channel that corresponds to the name of the queue.
The method argument is a string-encoded Ruby expression. The expression is split on the `.` character and then `eval`d. Examples of the method argument include: `puts`, `Kernel.puts`, `MyObject.new.puts`.
The args argument is encoded as JSON and stored as a JSON datatype in the row. (If the version of PG does not support JSON, the args are stored as text.) The args are stored as a collection and then splatted inside the worker. Examples of args include: `'hello world'`, `['hello world']`, `'hello', 'world'`.
    # File 'lib/queue_classic/queue.rb', line 37
    def enqueue(method, *args)
      QC.log_yield(:measure => 'queue.enqueue') do
        s="INSERT INTO #{TABLE_NAME} (q_name, method, args) VALUES ($1, $2, $3)"
        res = conn_adapter.execute(s, name, method, JSON.dump(args))
      end
    end
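To make the encoding concrete, the sketch below shows what `JSON.dump(args)` produces for the three example argument shapes, and one way a worker could split the method string and splat the decoded args. `encode_args` and `dispatch` are hypothetical helpers; in particular, the use of `rpartition` is an assumption about how the split might be done, since this page only states that the expression is split on `.` and `eval`d.

```ruby
require 'json'

# Hypothetical helper mirroring the JSON.dump(args) call in #enqueue.
def encode_args(*args)
  JSON.dump(args)
end

# Hypothetical worker-side counterpart (an assumption, not the library's code):
# split "Receiver.message" at the last '.', eval the receiver, splat the args.
def dispatch(method_string, encoded_args)
  receiver_str, _, message = method_string.rpartition('.')
  receiver = eval(receiver_str) # evaluates to nil when there is no receiver part
  receiver.send(message, *JSON.parse(encoded_args))
end

single = encode_args('hello world')    # one string argument
nested = encode_args(['hello world'])  # one array argument
pair   = encode_args('hello', 'world') # two string arguments

# Round trip: the two decoded strings are splatted into Kernel.format.
result = dispatch('Kernel.format', encode_args('%s, %s', 'hello', 'world'))
```

Because `*args` always collects into an array, a single string is stored as a one-element JSON array, while an explicit array argument ends up nested one level deeper. Since the receiver part is `eval`d, method strings should only ever come from trusted code.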
#lock ⇒ Object
    # File 'lib/queue_classic/queue.rb', line 44
    def lock
      QC.log_yield(:measure => 'queue.lock') do
        s = "SELECT * FROM lock_head($1, $2)"
        if r = conn_adapter.execute(s, name, top_bound)
          {}.tap do |job|
            job[:id] = r["id"]
            job[:q_name] = r["q_name"]
            job[:method] = r["method"]
            job[:args] = JSON.parse(r["args"])
            if r["created_at"]
              job[:created_at] = Time.parse(r["created_at"])
              ttl = Integer((Time.now - job[:created_at]) * 1000)
              QC.measure("time-to-lock=#{ttl}ms source=#{name}")
            end
          end
        end
      end
    end
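The row-to-job conversion in `#lock` can be tried in isolation. This is a minimal sketch assuming the adapter returns a hash with string keys and string values, as the listing above suggests. `job_from_row`, the `now:` parameter, and the `:ttl_ms` key are hypothetical; the library logs the time-to-lock through `QC.measure` rather than storing it on the job.

```ruby
require 'json'
require 'time'

# Hypothetical helper: builds the job hash the same way #lock does.
def job_from_row(row, now: Time.now)
  job = {
    id: row["id"],
    q_name: row["q_name"],
    method: row["method"],
    args: JSON.parse(row["args"]) # args come back as a JSON-encoded array
  }
  if row["created_at"]
    job[:created_at] = Time.parse(row["created_at"])
    # Milliseconds between enqueue and lock; kept on the hash here for illustration.
    job[:ttl_ms] = Integer((now - job[:created_at]) * 1000)
  end
  job
end

# Example row with made-up values.
row = {
  "id" => "1",
  "q_name" => "default",
  "method" => "Kernel.puts",
  "args" => '["hello world"]',
  "created_at" => "2024-01-01 00:00:00 UTC"
}

job = job_from_row(row, now: Time.utc(2024, 1, 1, 0, 0, 2))
```

With the fixed `now` used here, the job was locked two seconds after it was created, so `job[:ttl_ms]` is 2000 and `job[:args]` is the decoded array `["hello world"]`.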
#unlock(id) ⇒ Object
    # File 'lib/queue_classic/queue.rb', line 63
    def unlock(id)
      QC.log_yield(:measure => 'queue.unlock') do
        s = "UPDATE #{TABLE_NAME} set locked_at = null where id = $1"
        conn_adapter.execute(s, id)
      end
    end