Class: RelaxedJob::Queue

Inherits:
Object
  • Object
show all
Defined in:
lib/relaxed_job/queue.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(couchdb_url) ⇒ Queue

Returns a new instance of Queue.



8
9
10
11
# File 'lib/relaxed_job/queue.rb', line 8

# Builds a queue that remembers the given CouchDB URL and options.
#
# The previous body assigned `@options = options` without declaring an
# `options` parameter, so the name resolved to the attribute reader and
# the instance variable always ended up nil. The optional second argument
# makes the assignment meaningful while one-argument callers keep working.
#
# @param couchdb_url [Object] value exposed through #couchdb_url
#   (presumably a URL string; how it is consumed is not visible here)
# @param options [Hash] extra settings exposed through #options
def initialize(couchdb_url, options = {})
  @couchdb_url = couchdb_url
  @options = options
end

Instance Attribute Details

#couchdb_url ⇒ Object (readonly)

Returns the value of attribute couchdb_url.



5
6
7
# File 'lib/relaxed_job/queue.rb', line 5

# Reader for the @couchdb_url instance variable, which #initialize sets
# from its first argument.
#
# @return [Object] the stored value (nil if it was never assigned)
def couchdb_url
  @couchdb_url
end

#options ⇒ Object (readonly)

Returns the value of attribute options.



6
7
8
# File 'lib/relaxed_job/queue.rb', line 6

# Reader for the @options instance variable assigned in #initialize.
#
# @return [Object] the stored value (nil when nothing was assigned)
def options
  @options
end

Instance Method Details

#clear_locks! ⇒ Object



36
37
38
39
40
41
42
# File 'lib/relaxed_job/queue.rb', line 36

# Releases every job returned by #locked_jobs: the 'locked_by' marker is
# removed, the state is reset to 'pending', and the document is written
# back through couchdb.save_doc.
#
# @return [Object] whatever `each` on the #locked_jobs result returns
def clear_locks!
  locked_jobs.each do |doc|
    doc.delete('locked_by')
    doc['state'] = 'pending'
    couchdb.save_doc(doc)
  end
end

#completed_jobs ⇒ Object

Returns the jobs that `jobs_by_type(:completed)` selects. (In the source file a "job types" banner comment introduces this group of query methods.)



80
81
82
# File 'lib/relaxed_job/queue.rb', line 80

# Fetches the jobs that jobs_by_type selects for the :completed type.
#
# @return [Object] the result of jobs_by_type(:completed)
def completed_jobs
  jobs_by_type :completed
end

#enqueue(object) ⇒ Object



13
14
15
# File 'lib/relaxed_job/queue.rb', line 13

# Queues +object+ so that its `perform` method is the one invoked later;
# shorthand for #enqueue_with_method with :perform and no arguments.
#
# @param object [Object] the object to queue
# @return [Object] whatever #enqueue_with_method returns
def enqueue(object)
  enqueue_with_method(object, :perform)
end

#enqueue_with_method(object, method, *args) ⇒ Object



17
18
19
20
21
22
23
24
25
26
# File 'lib/relaxed_job/queue.rb', line 17

# Saves a new job document through couchdb.save_doc.
#
# The document records the marshalled +object+, the name of the method to
# call on it, the arguments for that call, the state 'pending' and the UTC
# time at which it was queued.
#
# @param object [Object] anything Marshal.dump can serialise
# @param method [Symbol, String] method name, stored as a string
# @param args [Array] arguments stored alongside the method name
# @return [Object] whatever couchdb.save_doc returns
def enqueue_with_method(object, method, *args)
  payload = Marshal.dump(object)

  document = {
    'class'     => 'job',
    'state'     => 'pending',
    'method'    => method.to_s,
    'arguments' => args,
    'object'    => payload,
    'queued_at' => Time.now.utc
  }

  couchdb.save_doc(document)
end

#errored_jobs ⇒ Object



84
85
86
# File 'lib/relaxed_job/queue.rb', line 84

# Fetches the jobs that jobs_by_type selects for the :errored type.
#
# @return [Object] the result of jobs_by_type(:errored)
def errored_jobs
  jobs_by_type :errored
end

#lock(count) ⇒ Object



28
29
30
31
32
33
34
# File 'lib/relaxed_job/queue.rb', line 28

# Claims jobs for this worker: each job returned by pending_jobs(count) is
# marked 'locked', tagged with #worker_name under 'locked_by', and saved
# through couchdb.save_doc.
#
# @param count [Object] passed straight to #pending_jobs
# @return [Object] whatever `each` on the #pending_jobs result returns
def lock(count)
  candidates = pending_jobs(count)

  candidates.each do |doc|
    doc['state'] = 'locked'
    doc['locked_by'] = worker_name
    couchdb.save_doc(doc)
  end
end

#locked_jobs ⇒ Object



88
89
90
# File 'lib/relaxed_job/queue.rb', line 88

# Fetches the :locked jobs, passing the current #worker_name to
# jobs_by_type as the :key option.
#
# @return [Object] the result of jobs_by_type
def locked_jobs
  jobs_by_type :locked, :key => worker_name
end

#pending_jobs(count) ⇒ Object



92
93
94
# File 'lib/relaxed_job/queue.rb', line 92

# Fetches :pending jobs, passing +count+ to jobs_by_type as the :limit
# option.
#
# @param count [Object] value forwarded as :limit
# @return [Object] the result of jobs_by_type
def pending_jobs(count)
  jobs_by_type :pending, :limit => count
end

#work(worker_name = worker_name) ⇒ Object



44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
# File 'lib/relaxed_job/queue.rb', line 44

# Locks up to three pending jobs via #lock, then runs every job returned by
# #locked_jobs and records the outcome on the job document.
#
# For each job the stored object is restored with Marshal.load and the
# stored method is invoked with the stored arguments. On success the job is
# saved with state 'complete' and 'completed_at'; when a StandardError is
# raised the job is saved with state 'error', the marshalled exception and
# 'errored_at'.
#
# NOTE(review): Marshal.load runs on data read back from the database; it
# must only be used when everything able to write job documents is trusted.
#
# NOTE(review): +worker_name+ is accepted for interface compatibility but is
# not forwarded — #lock and #locked_jobs take no name argument and call the
# #worker_name method themselves.
#
# @param worker_name [String] defaults to the value of #worker_name (the
#   former `worker_name=worker_name` default was a circular reference that
#   evaluates to nil on Ruby 2.2 and later)
# @return [Hash] counts keyed by :complete and :error
def work(worker_name = self.worker_name)
  lock 3

  counts = { :complete => 0, :error => 0 }

  locked_jobs.each do |job|
    begin
      object = Marshal.load(job['object'])
      object.send(job['method'], *(job['arguments']))

      job['state']        = 'complete'
      job['completed_at'] = Time.now.utc
      couchdb.save_doc(job)

      # Count only once the save has succeeded, so a job whose save raises
      # is tallied a single time (as an error) rather than as both.
      counts[:complete] += 1
    rescue StandardError => ex
      counts[:error] += 1

      job['state']      = 'error'
      job['exception']  = Marshal.dump(ex)
      job['errored_at'] = Time.now.utc
      couchdb.save_doc(job)
    end
  end

  counts
end

#worker_name ⇒ Object



72
73
74
75
76
# File 'lib/relaxed_job/queue.rb', line 72

# Builds the identifier used to tag jobs locked by this process.
#
# The name combines the host name with the process id; if obtaining the
# host name raises a StandardError, only the process id part is returned.
#
# @return [String] "host:<hostname> pid:<pid>" or "pid:<pid>"
def worker_name
  pid_tag = "pid:#{Process.pid}"

  begin
    "host:#{Socket.gethostname} #{pid_tag}"
  rescue StandardError
    pid_tag
  end
end