Class: RelaxedJob::Queue

Inherits:
Object
  • Object
show all
Defined in:
lib/relaxed_job/queue.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(couchdb_url) ⇒ Queue

Returns a new instance of Queue.



8
9
10
11
# File 'lib/relaxed_job/queue.rb', line 8

# Builds a queue and remembers the CouchDB location plus optional settings.
#
# @param couchdb_url [Object] value later returned by #couchdb_url
#   (presumably the URL of the CouchDB database -- confirm with callers)
# @param options [Hash] optional settings later returned by #options;
#   defaults to an empty hash so existing one-argument callers keep working
def initialize(couchdb_url, options = {})
  @couchdb_url = couchdb_url
  # `options` used to resolve to the attribute reader (there was no such
  # parameter), which meant @options was always assigned nil.
  @options = options
end

Instance Attribute Details

#couchdb_url ⇒ Object (readonly)

Returns the value of attribute couchdb_url.



5
6
7
# File 'lib/relaxed_job/queue.rb', line 5

# Reader for the value the constructor stored in @couchdb_url.
#
# @return [Object] the current @couchdb_url (nil when never assigned)
def couchdb_url; @couchdb_url; end

#options ⇒ Object (readonly)

Returns the value of attribute options.



6
7
8
# File 'lib/relaxed_job/queue.rb', line 6

# Reader for the value held in @options.
#
# @return [Object] the current @options (nil when never assigned)
def options; @options; end

Instance Method Details

#clear_locks! ⇒ Object



40
41
42
43
44
45
46
# File 'lib/relaxed_job/queue.rb', line 40

# Releases every job returned by #locked_jobs: the 'locked_by' marker is
# removed, the state goes back to 'pending', and the document is saved
# through couchdb.save_doc.
#
# @return [Object] whatever `locked_jobs.each` returns
def clear_locks!
  locked_jobs.each do |doc|
    doc.delete('locked_by')
    doc['state'] = 'pending'
    couchdb.save_doc(doc)
  end
end

#completed_jobs ⇒ Object

job types #################################################################



85
86
87
# File 'lib/relaxed_job/queue.rb', line 85

# Fetches jobs via jobs_by_type using the :completed type.
#
# @return [Object] the result of jobs_by_type(:completed)
def completed_jobs
  jobs_by_type :completed
end

#enqueue(object, *args) ⇒ Object



17
18
19
# File 'lib/relaxed_job/queue.rb', line 17

# Queues +object+ so that its :perform method is the one recorded for
# execution; a thin wrapper around #enqueue_with_method.
#
# @param object [Object] forwarded unchanged to #enqueue_with_method
# @param args [Array] extra arguments forwarded after :perform
# @return [Object] the result of #enqueue_with_method
def enqueue(object, *args)
  enqueue_with_method(object, :perform, *args)
end

#enqueue_with_method(object, method, *args) ⇒ Object



21
22
23
24
25
26
27
28
29
30
# File 'lib/relaxed_job/queue.rb', line 21

# Saves a new pending job document through couchdb.save_doc.
#
# The document records the method name as a string, the argument list, the
# Marshal dump of +object+, and the UTC time of queuing.
#
# @param object [Object] receiver to serialize with Marshal.dump
# @param method [#to_s] name of the method recorded for the job
# @param args [Array] arguments recorded for the job
# @return [Object] whatever couchdb.save_doc returns
def enqueue_with_method(object, method, *args)
  store = couchdb

  doc = { 'class' => 'job', 'state' => 'pending' }
  doc['method']    = method.to_s
  doc['arguments'] = args
  doc['object']    = Marshal.dump(object)
  doc['queued_at'] = Time.now.utc

  store.save_doc(doc)
end

#errored_jobs ⇒ Object



89
90
91
# File 'lib/relaxed_job/queue.rb', line 89

# Fetches jobs via jobs_by_type using the :errored type.
#
# @return [Object] the result of jobs_by_type(:errored)
def errored_jobs
  jobs_by_type :errored
end

#job(id) ⇒ Object



13
14
15
# File 'lib/relaxed_job/queue.rb', line 13

# Retrieves a single document by asking couchdb for +id+.
#
# @param id [Object] identifier handed straight to couchdb.get
# @return [Object] whatever couchdb.get returns
def job(id)
  store = couchdb
  store.get(id)
end

#lock(count) ⇒ Object



32
33
34
35
36
37
38
# File 'lib/relaxed_job/queue.rb', line 32

# Claims jobs for this worker: each job returned by pending_jobs(count) is
# marked 'locked', tagged with #worker_name under 'locked_by', and saved.
#
# @param count [Object] passed to #pending_jobs (used there as the limit)
# @return [Object] whatever `pending_jobs(count).each` returns
def lock(count)
  claimed = pending_jobs(count)
  claimed.each do |doc|
    doc['state'] = 'locked'
    doc['locked_by'] = worker_name
    couchdb.save_doc(doc)
  end
end

#locked_jobs ⇒ Object



93
94
95
# File 'lib/relaxed_job/queue.rb', line 93

# Fetches jobs via jobs_by_type using the :locked type, keyed by the
# current #worker_name.
#
# @return [Object] the result of jobs_by_type(:locked, :key => worker_name)
def locked_jobs
  owner = worker_name
  jobs_by_type(:locked, :key => owner)
end

#pending_jobs(count) ⇒ Object



97
98
99
# File 'lib/relaxed_job/queue.rb', line 97

# Fetches jobs via jobs_by_type using the :pending type, passing +count+ as
# the :limit option.
#
# @param count [Object] value supplied as :limit
# @return [Object] the result of jobs_by_type(:pending, :limit => count)
def pending_jobs(count)
  query = { :limit => count }
  jobs_by_type(:pending, query)
end

#work(worker_name = worker_name) ⇒ Object



48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
# File 'lib/relaxed_job/queue.rb', line 48

# Runs one batch of work: calls `lock 3`, then executes every job returned
# by #locked_jobs and records the outcome on each job document.
#
# For each job the stored object is restored with Marshal.load and the
# stored method is invoked with the stored arguments. On success the return
# value, the 'complete' state and a UTC completion time are saved. When a
# StandardError is raised (by the job or by the save), the 'error' state,
# the Marshal dump of the exception and a UTC error time are saved instead.
#
# @param worker_name [Object] kept for interface compatibility; this body
#   never reads it (#lock and #locked_jobs call #worker_name themselves).
#   The default now explicitly calls the #worker_name method -- the former
#   `worker_name=worker_name` is a circular argument reference that modern
#   Ruby evaluates to nil.
# @return [Hash{Symbol => Integer}] tallies under :complete and :error
# @raise [StandardError] if saving the error record itself fails
def work(worker_name = worker_name())
  lock 3

  counts = { :complete => 0, :error => 0 }

  locked_jobs.each do |job|
    begin
      # NOTE(review): Marshal.load and #send run whatever the job document
      # contains; only safe if everything that can write to the database is
      # trusted -- confirm.
      object = Marshal.load(job['object'])
      retval = object.send(job['method'], *job['arguments'])

      job['retval']       = retval
      job['state']        = 'complete'
      job['completed_at'] = Time.now.utc
      couchdb.save_doc(job)

      # Tally only after the save succeeded; counting earlier made a failed
      # save show up as both a completion and an error.
      counts[:complete] += 1
    rescue StandardError => ex
      counts[:error] += 1

      job['state']      = 'error'
      job['exception']  = Marshal.dump(ex)
      job['errored_at'] = Time.now.utc
      couchdb.save_doc(job)
    end
  end

  counts
end

#worker_name ⇒ Object



77
78
79
80
81
# File 'lib/relaxed_job/queue.rb', line 77

# Identifies this worker process as "host:<hostname> pid:<pid>".
#
# If building that string raises a StandardError (for example the hostname
# lookup fails), the shorter "pid:<pid>" form is returned instead.
#
# @return [String] the worker identifier
def worker_name
  begin
    "host:#{Socket.gethostname} pid:#{Process.pid}"
  rescue StandardError
    "pid:#{Process.pid}"
  end
end