Class: Sweatshop::Worker
- Inherits:
-
Object
- Object
- Sweatshop::Worker
- Defined in:
- lib/sweatshop/worker.rb
Class Method Summary collapse
- .after_task(&block) ⇒ Object
- .async? ⇒ Boolean
- .before_task(&block) ⇒ Object
- .call_after_task(task) ⇒ Object
- .call_before_task(task) ⇒ Object
- .call_exception_handler(exception) ⇒ Object
- .config ⇒ Object
- .confirm ⇒ Object
- .delete_queue ⇒ Object
- .dequeue ⇒ Object
- .do_task(task) ⇒ Object
- .do_tasks ⇒ Object
- .enqueue(task) ⇒ Object
- .flush_queue ⇒ Object
- .inherited(subclass) ⇒ Object
- .instance ⇒ Object
- .log(msg) ⇒ Object
- .method_missing(method, *args, &block) ⇒ Object
- .on_exception(&block) ⇒ Object
- .queue ⇒ Object
- .queue=(queue) ⇒ Object
- .queue_group(group = nil) ⇒ Object
- .queue_name ⇒ Object
- .queue_size ⇒ Object
- .stop ⇒ Object
- .workers ⇒ Object
Instance Method Summary collapse
-
#stop ⇒ Object
called before we exit – subclass can implement this method.
Class Method Details
.after_task(&block) ⇒ Object
147 148 149 150 151 152 153 |
# File 'lib/sweatshop/worker.rb', line 147 def self.after_task(&block) if block @after_task = block else @after_task end end |
.async? ⇒ Boolean
30 31 32 |
# File 'lib/sweatshop/worker.rb', line 30 def self.async? Sweatshop.enabled? end |
.before_task(&block) ⇒ Object
139 140 141 142 143 144 145 |
# File 'lib/sweatshop/worker.rb', line 139 def self.before_task(&block) if block @before_task = block else @before_task end end |
.call_after_task(task) ⇒ Object
129 130 131 132 |
# File 'lib/sweatshop/worker.rb', line 129 def self.call_after_task(task) superclass.call_after_task(task) if superclass.respond_to?(:call_after_task) after_task.call(task) if after_task end |
.call_before_task(task) ⇒ Object
124 125 126 127 |
# File 'lib/sweatshop/worker.rb', line 124 def self.call_before_task(task) superclass.call_before_task(task) if superclass.respond_to?(:call_before_task) before_task.call(task) if before_task end |
.call_exception_handler(exception) ⇒ Object
134 135 136 137 |
# File 'lib/sweatshop/worker.rb', line 134 def self.call_exception_handler(exception) superclass.call_exception_handler(exception) if superclass.respond_to?(:call_exception_handler) on_exception.call(exception) if on_exception end |
.config ⇒ Object
38 39 40 |
# File 'lib/sweatshop/worker.rb', line 38 def self.config Sweatshop.config end |
.confirm ⇒ Object
66 67 68 |
# File 'lib/sweatshop/worker.rb', line 66 def self.confirm queue.confirm(queue_name) end |
.delete_queue ⇒ Object
50 51 52 |
# File 'lib/sweatshop/worker.rb', line 50 def self.delete_queue queue.delete(queue_name) end |
.dequeue ⇒ Object
62 63 64 |
# File 'lib/sweatshop/worker.rb', line 62 def self.dequeue queue.dequeue(queue_name) end |
.do_task(task) ⇒ Object
76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 |
# File 'lib/sweatshop/worker.rb', line 76 def self.do_task(task) begin call_before_task(task) queued_at = task[:queued_at] ? "(queued #{Time.at(task[:queued_at]).strftime('%Y/%m/%d %H:%M:%S')})" : '' log("Dequeuing #{queue_name}::#{task[:method]} #{queued_at}") task[:result] = instance.send(task[:method], *task[:args]) call_after_task(task) confirm rescue SystemExit exit rescue Exception => e log("Caught Exception: #{e.}, \n#{e.backtrace.join("\n")}") call_exception_handler(e) # the only way to re-queue messages with rabbitmq is to close and reopen the connection # putting a 'sleep 2' in here to give the administrator to fix peristent problems, otherwise # we'll hit an infinite loop # # THIS CODE IS PROBLEMATIC --- we need to put these tasks into a 'failed' queue so we don't run into infinite loops # will just 'confirm' for now #queue.stop #sleep 2 confirm end end |
.do_tasks ⇒ Object
70 71 72 73 74 |
# File 'lib/sweatshop/worker.rb', line 70 def self.do_tasks while task = dequeue do_task(task) end end |
.enqueue(task) ⇒ Object
58 59 60 |
# File 'lib/sweatshop/worker.rb', line 58 def self.enqueue(task) queue.enqueue(queue_name, task) end |
.flush_queue ⇒ Object
46 47 48 |
# File 'lib/sweatshop/worker.rb', line 46 def self.flush_queue queue.flush_all(queue_name) end |
.inherited(subclass) ⇒ Object
5 6 7 |
# File 'lib/sweatshop/worker.rb', line 5 def self.inherited(subclass) self.workers << subclass end |
.instance ⇒ Object
34 35 36 |
# File 'lib/sweatshop/worker.rb', line 34 def self.instance @instance ||= new end |
.log(msg) ⇒ Object
120 121 122 |
# File 'lib/sweatshop/worker.rb', line 120 def self.log(msg) Sweatshop.log(msg) end |
.method_missing(method, *args, &block) ⇒ Object
9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 |
# File 'lib/sweatshop/worker.rb', line 9 def self.method_missing(method, *args, &block) if method.to_s =~ /^async_(.*)/ method = $1 check_arity!(instance.method(method), args) return instance.send(method, *args) unless async? uid = ::Digest::MD5.hexdigest("#{name}:#{method}:#{args}:#{Time.now.to_f}") task = {:args => args, :method => method, :uid => uid, :queued_at => Time.now.to_i} log("Putting #{uid} on #{queue_name}") enqueue(task) uid elsif instance.respond_to?(method) instance.send(method, *args) else super end end |
.on_exception(&block) ⇒ Object
155 156 157 158 159 160 161 |
# File 'lib/sweatshop/worker.rb', line 155 def self.on_exception(&block) if block @on_exception = block else @on_exception end end |
.queue ⇒ Object
108 109 110 |
# File 'lib/sweatshop/worker.rb', line 108 def self.queue @queue ||= Sweatshop.queue(queue_group.to_s) end |
.queue=(queue) ⇒ Object
104 105 106 |
# File 'lib/sweatshop/worker.rb', line 104 def self.queue=(queue) @queue = queue end |
.queue_group(group = nil) ⇒ Object
171 172 173 |
# File 'lib/sweatshop/worker.rb', line 171 def self.queue_group(group=nil) group ? (:_queue_group){ group } : _queue_group end |
.queue_name ⇒ Object
42 43 44 |
# File 'lib/sweatshop/worker.rb', line 42 def self.queue_name @queue_name ||= self.to_s end |
.queue_size ⇒ Object
54 55 56 |
# File 'lib/sweatshop/worker.rb', line 54 def self.queue_size queue.queue_size(queue_name) end |
.stop ⇒ Object
163 164 165 |
# File 'lib/sweatshop/worker.rb', line 163 def self.stop instance.stop end |
Instance Method Details
#stop ⇒ Object
called before we exit – subclass can implement this method
168 |
# File 'lib/sweatshop/worker.rb', line 168 def stop; end |