Class: EQ::Queueing::Backends::Sequel

Inherits:
Object
  • Object
show all
Includes:
Logging
Defined in:
lib/eq-queueing/backends/sequel.rb

Overview

this class provides a queueing backend via Sequel ORM mapper basically any database adapter known by Sequel is supported configure via EQ::conig

Constant Summary collapse

TABLE_NAME =
:jobs

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Logging

#debug, #info, #log_error

Constructor Details

#initialize(config) ⇒ Sequel

establishes the connection to the database and ensures that the jobs table is created



17
18
19
20
# File 'lib/eq-queueing/backends/sequel.rb', line 17

def initialize config
  connect config
  create_table_if_not_exists!
end

Instance Attribute Details

#dbObject (readonly)

Returns the value of attribute db.



13
14
15
# File 'lib/eq-queueing/backends/sequel.rb', line 13

def db
  @db
end

Instance Method Details

#clearObject



112
113
114
# File 'lib/eq-queueing/backends/sequel.rb', line 112

def clear
  jobs.delete
end

#count(name = nil) ⇒ Object



101
102
103
104
105
106
107
108
109
110
# File 'lib/eq-queueing/backends/sequel.rb', line 101

def count name=nil
  case name
  when :waiting
    waiting.count
  when :working
    working.count
  else
    jobs.count
  end
end

#iteratorObject



116
117
118
119
120
121
# File 'lib/eq-queueing/backends/sequel.rb', line 116

def iterator
  jobs.each do |job|
    job[:payload] = Marshal.load(job[:payload]) if job[:payload]
    yield job
  end
end

#jobsObject

list of all jobs



79
80
81
82
83
# File 'lib/eq-queueing/backends/sequel.rb', line 79

def jobs
  db[TABLE_NAME]
rescue ::Sequel::DatabaseError => e
  retry if on_error e
end

#pop(id) ⇒ TrueClass, FalseClass

finishes a job in the working queue

Parameters:

  • id (Fixnum)

    of the job

Returns:

  • (TrueClass, FalseClass)

    true, when there was a job that could be deleted



58
59
60
61
62
# File 'lib/eq-queueing/backends/sequel.rb', line 58

def pop id
  jobs.where(id: id).delete == 1
rescue ::Sequel::DatabaseError => e
  retry if on_error e
end

#push(eq_job) ⇒ Fixnum

Returns id of the job.

Parameters:

Returns:

  • (Fixnum)

    id of the job



24
25
26
27
28
29
30
31
32
33
34
35
# File 'lib/eq-queueing/backends/sequel.rb', line 24

def push eq_job
  job = {queue: eq_job.queue}
  job[:payload] = Marshal.dump(eq_job.payload).to_sequel_blob unless eq_job.payload.nil?
  if eq_job.unique? && jobs.where(job).count > 0
    false
  else
    job[:created_at] = Time.now
    jobs.insert job
  end
rescue ::Sequel::DatabaseError => e
  retry if on_error e
end

#requeue_timed_out_jobsFixnum

this re-enqueues jobs that timed out

Returns:

  • (Fixnum)

    number of jobs that were re-enqueued



95
96
97
98
99
# File 'lib/eq-queueing/backends/sequel.rb', line 95

def requeue_timed_out_jobs
  # older than x
  jobs.where{started_working_at <= (Time.now - EQ.config.job_timeout)}\
      .update(started_working_at: nil)
end

#reserveArray<Fixnum, String>

pulls a job from the waiting stack and moves it to the working stack. sets a timestamp :started_working_at so that the working duration can be tracked.

Parameters:

  • now (Time)

Returns:

  • (Array<Fixnum, String>)

    job data consisting of id and payload



42
43
44
45
46
47
48
49
50
51
52
53
# File 'lib/eq-queueing/backends/sequel.rb', line 42

def reserve
  db.transaction do
    if job = waiting.order(:id).last # asc
      job[:started_working_at] = Time.now
      update_job!(job)
      payload = job[:payload].nil? ? nil : Marshal.load(job[:payload])
      EQ::Job.new(job[:id], job[:queue], payload)
    end
  end
rescue ::Sequel::DatabaseError => e
  retry if on_error e
end

#update_job!(changed_job) ⇒ Object

updates a changed job object, uses the :id key to identify the job

Parameters:

  • changed (Hash)

    job



87
88
89
90
91
# File 'lib/eq-queueing/backends/sequel.rb', line 87

def update_job! changed_job
  jobs.where(id: changed_job[:id]).update(changed_job)
rescue ::Sequel::DatabaseError => e
  retry if on_error e
end

#waitingObject

list of jobs waiting to be worked on



65
66
67
68
69
# File 'lib/eq-queueing/backends/sequel.rb', line 65

def waiting
  jobs.where(started_working_at: nil)
rescue ::Sequel::DatabaseError => e
  retry if on_error e
end

#workingObject

list of jobs currentyl being worked on



72
73
74
75
76
# File 'lib/eq-queueing/backends/sequel.rb', line 72

def working
  waiting.invert
rescue ::Sequel::DatabaseError => e
  retry if on_error e
end