Class: Sqew::Backend::LevelDB
- Inherits:
-
Qu::Backend::Base
- Object
- Qu::Backend::Base
- Sqew::Backend::LevelDB
- Defined in:
- lib/sqew/backend/leveldb.rb
Instance Attribute Summary collapse
-
#db ⇒ Object
Returns the value of attribute db.
Instance Method Summary collapse
- #clear(*queues) ⇒ Object
- #clear_running ⇒ Object
- #close ⇒ Object
- #completed(payload) ⇒ Object
- #delete(id) ⇒ Object
- #enqueue(payload) ⇒ Object
- #failed(payload, error) ⇒ Object
- #failed_jobs ⇒ Object
- #length ⇒ Object
- #queued_jobs ⇒ Object
- #register_worker ⇒ Object
- #release ⇒ Object
- #reserve(_, options = {block:false}) ⇒ Object
- #running_jobs ⇒ Object
- #unregister_worker ⇒ Object
Instance Attribute Details
#db ⇒ Object
Returns the value of attribute db.
4 5 6 |
# File 'lib/sqew/backend/leveldb.rb', line 4 def db @db end |
Instance Method Details
#clear(*queues) ⇒ Object
15 16 17 18 |
# File 'lib/sqew/backend/leveldb.rb', line 15 def clear(*queues) drop_all(queue) if queues.include?("queue") || queues.empty? drop_all(errors) if queues.include?("failed") || queues.empty? end |
#clear_running ⇒ Object
20 21 22 |
# File 'lib/sqew/backend/leveldb.rb', line 20 def clear_running drop_all(running) end |
#close ⇒ Object
79 80 81 82 83 84 |
# File 'lib/sqew/backend/leveldb.rb', line 79 def close queue.close running.close errors.close @queue, @running, @errors = nil end |
#completed(payload) ⇒ Object
49 50 51 |
# File 'lib/sqew/backend/leveldb.rb', line 49 def completed(payload) running.delete(payload.id, :sync => true) end |
#delete(id) ⇒ Object
24 25 26 27 28 29 30 |
# File 'lib/sqew/backend/leveldb.rb', line 24 def delete(id) if queue.exists?(id) queue.delete(id) elsif errors.exists?(id) errors.delete(id) end end |
#enqueue(payload) ⇒ Object
6 7 8 9 |
# File 'lib/sqew/backend/leveldb.rb', line 6 def enqueue(payload) id = Time.now.to_f.to_s queue.put(id, MultiJson.encode(klass:payload.klass.to_s, args:payload.args), :sync => true) end |
#failed(payload, error) ⇒ Object
53 54 55 56 |
# File 'lib/sqew/backend/leveldb.rb', line 53 def failed(payload, error) running.delete(payload.id, :sync => true) errors.put(payload.id, MultiJson.encode("klass" => payload.klass.to_s, "args" => payload.args, "error" => error., "backtrace" => error.backtrace.join("\n")), :sync => true) end |
#failed_jobs ⇒ Object
58 59 60 |
# File 'lib/sqew/backend/leveldb.rb', line 58 def failed_jobs errors.to_a.map {|k,v| MultiJson.decode(v).update("id" => k) } end |
#length ⇒ Object
11 12 13 |
# File 'lib/sqew/backend/leveldb.rb', line 11 def length(*) queue.keys.length end |
#queued_jobs ⇒ Object
66 67 68 |
# File 'lib/sqew/backend/leveldb.rb', line 66 def queued_jobs queue.to_a.map {|k,v| MultiJson.decode(v).update("id" => k) } end |
#register_worker ⇒ Object
73 74 |
# File 'lib/sqew/backend/leveldb.rb', line 73 def register_worker(*) end |
#release ⇒ Object
70 71 |
# File 'lib/sqew/backend/leveldb.rb', line 70 def release(*) end |
#reserve(_, options = {block:false}) ⇒ Object
32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 |
# File 'lib/sqew/backend/leveldb.rb', line 32 def reserve(_, = {block:false}) loop do if raw = queue.first id, job = raw queue.delete(id, :sync => true) running.put(id, job, :sync => true) return Sqew::Payload.new(MultiJson.decode(job).update(id:id)) end if [:block] sleep 3 else break end end end |
#running_jobs ⇒ Object
62 63 64 |
# File 'lib/sqew/backend/leveldb.rb', line 62 def running_jobs running.to_a.map {|k,v| MultiJson.decode(v).update("id" => k) } end |
#unregister_worker ⇒ Object
76 77 |
# File 'lib/sqew/backend/leveldb.rb', line 76 def unregister_worker(*) end |