Class: EQ::Queueing::Backends::Sequel
- Inherits:
-
Object
- Object
- EQ::Queueing::Backends::Sequel
- Includes:
- Logging
- Defined in:
- lib/eq-queueing/backends/sequel.rb
Overview
this class provides a queueing backend via Sequel ORM mapper basically any database adapter known by Sequel is supported configure via EQ::conig
Constant Summary collapse
- TABLE_NAME =
:jobs
Instance Attribute Summary collapse
-
#db ⇒ Object
readonly
Returns the value of attribute db.
Instance Method Summary collapse
- #clear ⇒ Object
- #count(name = nil) ⇒ Object
-
#initialize(config) ⇒ Sequel
constructor
establishes the connection to the database and ensures that the jobs table is created.
- #iterator ⇒ Object
-
#jobs ⇒ Object
list of all jobs.
-
#pop(id) ⇒ TrueClass, FalseClass
finishes a job in the working queue.
-
#push(eq_job) ⇒ Fixnum
Id of the job.
-
#requeue_timed_out_jobs ⇒ Fixnum
this re-enqueues jobs that timed out.
-
#reserve ⇒ Array<Fixnum, String>
pulls a job from the waiting stack and moves it to the working stack.
-
#update_job!(changed_job) ⇒ Object
updates a changed job object, uses the :id key to identify the job.
-
#waiting ⇒ Object
list of jobs waiting to be worked on.
-
#working ⇒ Object
list of jobs currentyl being worked on.
Methods included from Logging
Constructor Details
#initialize(config) ⇒ Sequel
establishes the connection to the database and ensures that the jobs table is created
17 18 19 20 |
# File 'lib/eq-queueing/backends/sequel.rb', line 17 def initialize config connect config create_table_if_not_exists! end |
Instance Attribute Details
#db ⇒ Object (readonly)
Returns the value of attribute db.
13 14 15 |
# File 'lib/eq-queueing/backends/sequel.rb', line 13 def db @db end |
Instance Method Details
#clear ⇒ Object
112 113 114 |
# File 'lib/eq-queueing/backends/sequel.rb', line 112 def clear jobs.delete end |
#count(name = nil) ⇒ Object
101 102 103 104 105 106 107 108 109 110 |
# File 'lib/eq-queueing/backends/sequel.rb', line 101 def count name=nil case name when :waiting waiting.count when :working working.count else jobs.count end end |
#iterator ⇒ Object
116 117 118 119 120 121 |
# File 'lib/eq-queueing/backends/sequel.rb', line 116 def iterator jobs.each do |job| job[:payload] = Marshal.load(job[:payload]) if job[:payload] yield job end end |
#jobs ⇒ Object
list of all jobs
79 80 81 82 83 |
# File 'lib/eq-queueing/backends/sequel.rb', line 79 def jobs db[TABLE_NAME] rescue ::Sequel::DatabaseError => e retry if on_error e end |
#pop(id) ⇒ TrueClass, FalseClass
finishes a job in the working queue
58 59 60 61 62 |
# File 'lib/eq-queueing/backends/sequel.rb', line 58 def pop id jobs.where(id: id).delete == 1 rescue ::Sequel::DatabaseError => e retry if on_error e end |
#push(eq_job) ⇒ Fixnum
Returns id of the job.
24 25 26 27 28 29 30 31 32 33 34 35 |
# File 'lib/eq-queueing/backends/sequel.rb', line 24 def push eq_job job = {queue: eq_job.queue} job[:payload] = Marshal.dump(eq_job.payload).to_sequel_blob unless eq_job.payload.nil? if eq_job.unique? && jobs.where(job).count > 0 false else job[:created_at] = Time.now jobs.insert job end rescue ::Sequel::DatabaseError => e retry if on_error e end |
#requeue_timed_out_jobs ⇒ Fixnum
this re-enqueues jobs that timed out
95 96 97 98 99 |
# File 'lib/eq-queueing/backends/sequel.rb', line 95 def requeue_timed_out_jobs # older than x jobs.where{started_working_at <= (Time.now - EQ.config.job_timeout)}\ .update(started_working_at: nil) end |
#reserve ⇒ Array<Fixnum, String>
pulls a job from the waiting stack and moves it to the working stack. sets a timestamp :started_working_at so that the working duration can be tracked.
42 43 44 45 46 47 48 49 50 51 52 53 |
# File 'lib/eq-queueing/backends/sequel.rb', line 42 def reserve db.transaction do if job = waiting.order(:id).last # asc job[:started_working_at] = Time.now update_job!(job) payload = job[:payload].nil? ? nil : Marshal.load(job[:payload]) EQ::Job.new(job[:id], job[:queue], payload) end end rescue ::Sequel::DatabaseError => e retry if on_error e end |
#update_job!(changed_job) ⇒ Object
updates a changed job object, uses the :id key to identify the job
87 88 89 90 91 |
# File 'lib/eq-queueing/backends/sequel.rb', line 87 def update_job! changed_job jobs.where(id: changed_job[:id]).update(changed_job) rescue ::Sequel::DatabaseError => e retry if on_error e end |
#waiting ⇒ Object
list of jobs waiting to be worked on
65 66 67 68 69 |
# File 'lib/eq-queueing/backends/sequel.rb', line 65 def waiting jobs.where(started_working_at: nil) rescue ::Sequel::DatabaseError => e retry if on_error e end |
#working ⇒ Object
list of jobs currentyl being worked on
72 73 74 75 76 |
# File 'lib/eq-queueing/backends/sequel.rb', line 72 def working waiting.invert rescue ::Sequel::DatabaseError => e retry if on_error e end |