Class: Monque::Worker

Inherits:
Object
  • Object
show all
Defined in:
lib/monque.rb

Instance Method Summary collapse

Constructor Details

#initialize(queues) ⇒ Worker

Returns a new instance of Worker.



74
75
76
77
# File 'lib/monque.rb', line 74

def initialize(queues)
  @queues = queues
  @jobs = Monque.jobs_collection
end

Instance Method Details

#mark_finished(job) ⇒ Object



123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
# File 'lib/monque.rb', line 123

def mark_finished(job)
  ok = @jobs.find_and_modify(
    :query => {
      '_id' => job['_id'],
      'proc_attempts' => job['proc_attempts']
    },
    :update => job.merge({'finished' => Time.now.to_f})
  )
rescue Mongo::OperationFailure => e
  if e.message =~ /No matching object found/
    raise "This shouldn't happen - job was marked as finished though I had it reserved."
  else
    raise e
  end
end

#reserveObject



83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
# File 'lib/monque.rb', line 83

def reserve
  @queues.each do |q|
    speculative_job = @jobs.find(
      'queue' => q.to_s,
      'started' => {'$lte' => (Time.now.to_f - RETRY_DELAY)},
      'finished' => {'$exists' => false}
    ).sort('added', :ascending).limit(1).first

    next unless speculative_job
    
    old_procid = speculative_job['procid']
            
    gotted_job = @jobs.find_and_modify(
      :query => {
        '_id' => speculative_job['_id'],
        'proc_attempts' => speculative_job['proc_attempts']
      },
      :update => {
        '$set' =>  {'started' => Time.now.to_f},
        '$push' => {
          'proc_attempts' => {
              'id' => sprintf("%x", rand(1024**3)),
              'by' => worker_id,
              'time' => Time.now.to_f
          }
        }
      },
      :new => true
    )
    
    if gotted_job
      return gotted_job
    else
      nil
    end
  end
  
  nil
end

#workObject



139
140
141
142
143
144
145
146
147
148
149
150
151
152
# File 'lib/monque.rb', line 139

def work
  loop do
    job = reserve
    
    if job         
      cls = Kernel.fetch_class(job['class'])
      $stderr.puts "#{worker_id}: processing #{job.inspect}"
      cls.send(:perform, *JSON.load(job['data']))
      mark_finished(job)
    end
    
    sleep 5
  end
end

#worker_idObject



79
80
81
# File 'lib/monque.rb', line 79

def worker_id
  @worker_id ||= "#{`hostname`.strip}-#{$$}-#{Time.now.to_f}"
end