Class: Litequeue
- Inherits:
-
Object
- Object
- Litequeue
- Includes:
- Litesupport::Liteconnection
- Defined in:
- lib/litestack/litequeue.rb
Overview
Litequeue is a simple queueing system for Ruby applications that allows you to push and pop values from a queue. It provides a straightforward API for creating and managing named queues, and for adding and removing values from those queues. Additionally, it offers options for scheduling pops at a certain time in the future, which can be useful for delaying processing until a later time.
Litequeue is built on top of SQLite, which makes it very fast and efficient, even when handling large volumes of data. This lightweight and easy-to-use queueing system serves as a good foundation for building more advanced job processing frameworks that require basic queuing capabilities.
Direct Known Subclasses
Constant Summary collapse
- DEFAULT_OPTIONS =
{ path: Litesupport.root.join("queue.sqlite3"), mmap_size: 32 * 1024, sync: 0 }
Instance Method Summary collapse
-
#clear(queue = nil) ⇒ Object
deletes all the entries in all queues, or if a queue name is given, deletes all entries in that specific queue.
-
#count(queue = nil) ⇒ Object
returns a count of entries in all queues, or if a queue name is given, returns the count of entries in that queue.
-
#delete(id) ⇒ Object
delete an item from the queue queue = Litequeue.new id = queue.push(“somevalue”) queue.delete(id) # => “somevalue” queue.pop # => nil.
- #find(opts = {}) ⇒ Object
-
#initialize(options = {}) ⇒ Litequeue
constructor
create a new instance of the litequeue object accepts an optional options hash which will be merged with the DEFAULT_OPTIONS queue = Litequeue.new queue.push(“somevalue”, 2) # the value will be ready to pop in 2 seconds queue.pop # => nil sleep 2 queue.pop # => “somevalue”.
-
#pop(queue = "default", limit = 1) ⇒ Object
pop an item from the queue, optionally with a specific queue name (default queue name is ‘default’).
-
#push(value, delay = 0, queue = "default") ⇒ Object
(also: #<<)
push an item to the queue, optionally specifying the queue name (defaults to default) and after how many seconds it should be ready to pop (defaults to zero) a unique job id is returned from this method, can be used later to delete it before it fires.
-
#queues_info ⇒ Object
return the size of the queue file on disk def size run_sql(“SELECT size.page_size * count.page_count FROM pragma_page_size() AS size, pragma_page_count() AS count”)[0] end.
- #repush(id, value, delay = 0, queue = "default") ⇒ Object (also: #<<<)
- #snapshot ⇒ Object
Methods included from Litesupport::Liteconnection
#close, #journal_mode, #options, #path, #size, #synchronous
Methods included from Litesupport::Forkable
Constructor Details
#initialize(options = {}) ⇒ Litequeue
create a new instance of the litequeue object accepts an optional options hash which will be merged with the DEFAULT_OPTIONS
queue = Litequeue.new
queue.push("somevalue", 2) # the value will be ready to pop in 2 seconds
queue.pop # => nil
sleep 2
queue.pop # => "somevalue"
38 39 40 |
# File 'lib/litestack/litequeue.rb', line 38 def initialize( = {}) init() end |
Instance Method Details
#clear(queue = nil) ⇒ Object
deletes all the entries in all queues, or if a queue name is given, deletes all entries in that specific queue
79 80 81 |
# File 'lib/litestack/litequeue.rb', line 79 def clear(queue = nil) run_sql("DELETE FROM queue WHERE iif(?1 IS NOT NULL, name = ?1, TRUE)", queue) end |
#count(queue = nil) ⇒ Object
returns a count of entries in all queues, or if a queue name is given, returns the count of entries in that queue
84 85 86 |
# File 'lib/litestack/litequeue.rb', line 84 def count(queue = nil) run_sql("SELECT count(*) FROM queue WHERE iif(?1 IS NOT NULL, name = ?1, TRUE)", queue)[0][0] end |
#delete(id) ⇒ Object
74 75 76 |
# File 'lib/litestack/litequeue.rb', line 74 def delete(id) run_stmt(:delete, id)[0] end |
#find(opts = {}) ⇒ Object
115 116 117 |
# File 'lib/litestack/litequeue.rb', line 115 def find(opts = {}) run_stmt(:search, (opts)) end |
#pop(queue = "default", limit = 1) ⇒ Object
pop an item from the queue, optionally with a specific queue name (default queue name is ‘default’)
62 63 64 65 66 67 |
# File 'lib/litestack/litequeue.rb', line 62 def pop(queue = "default", limit = 1) res = run_stmt(:pop, queue, limit) return res[0] if res.length == 1 return nil if res.empty? res end |
#push(value, delay = 0, queue = "default") ⇒ Object Also known as: <<
push an item to the queue, optionally specifying the queue name (defaults to default) and after how many seconds it should be ready to pop (defaults to zero) a unique job id is returned from this method, can be used later to delete it before it fires. You can push string, integer, float, true, false or nil values
45 46 47 48 49 50 51 52 |
# File 'lib/litestack/litequeue.rb', line 45 def push(value, delay = 0, queue = "default") # @todo - check if queue is busy, back off if it is # also bring back the synchronize block, to prevent # a race condition if a thread hits the busy handler # before the current thread proceeds after a backoff # id = SecureRandom.uuid # this is somehow expensive, can we improve? run_stmt(:push, queue, delay, value)[0] end |
#queues_info ⇒ Object
return the size of the queue file on disk def size
run_sql("SELECT size.page_size * count.page_count FROM pragma_page_size() AS size, pragma_page_count() AS count")[0][0]
end
93 94 95 |
# File 'lib/litestack/litequeue.rb', line 93 def queues_info run_stmt(:info) end |
#repush(id, value, delay = 0, queue = "default") ⇒ Object Also known as: <<<
54 55 56 |
# File 'lib/litestack/litequeue.rb', line 54 def repush(id, value, delay = 0, queue = "default") run_stmt(:repush, id, queue, delay, value)[0] end |
#snapshot ⇒ Object
97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 |
# File 'lib/litestack/litequeue.rb', line 97 def snapshot queues = {} queues_info.each do |qc| # queues[qc[0]] = {count: qc[1], time_in_queue: {avg: qc[2], min: qc[3], max: qc[4]}} queues[qc[0]] = qc[1] end { summary: { path: path, journal_mode: journal_mode, synchronous: synchronous, size: size, jobs: count }, queues: queues } end |