Class: Sweatshop::Worker

Inherits:
Object
  • Object
show all
Defined in:
lib/sweatshop/worker.rb

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.after_task(&block) ⇒ Object



147
148
149
150
151
152
153
# File 'lib/sweatshop/worker.rb', line 147

def self.after_task(&block)
  if block
    @after_task = block
  else
    @after_task
  end
end

.async?Boolean

Returns:

  • (Boolean)


30
31
32
# File 'lib/sweatshop/worker.rb', line 30

def self.async?
  Sweatshop.enabled?
end

.before_task(&block) ⇒ Object



139
140
141
142
143
144
145
# File 'lib/sweatshop/worker.rb', line 139

def self.before_task(&block)
  if block
    @before_task = block
  else
    @before_task
  end
end

.call_after_task(task) ⇒ Object



129
130
131
132
# File 'lib/sweatshop/worker.rb', line 129

def self.call_after_task(task)
  superclass.call_after_task(task) if superclass.respond_to?(:call_after_task)
  after_task.call(task) if after_task
end

.call_before_task(task) ⇒ Object



124
125
126
127
# File 'lib/sweatshop/worker.rb', line 124

def self.call_before_task(task)
  superclass.call_before_task(task) if superclass.respond_to?(:call_before_task)
  before_task.call(task) if before_task
end

.call_exception_handler(exception) ⇒ Object



134
135
136
137
# File 'lib/sweatshop/worker.rb', line 134

def self.call_exception_handler(exception)
  superclass.call_exception_handler(exception) if superclass.respond_to?(:call_exception_handler)
  on_exception.call(exception) if on_exception
end

.configObject



38
39
40
# File 'lib/sweatshop/worker.rb', line 38

def self.config
  Sweatshop.config
end

.confirmObject



66
67
68
# File 'lib/sweatshop/worker.rb', line 66

def self.confirm
  queue.confirm(queue_name)
end

.delete_queueObject



50
51
52
# File 'lib/sweatshop/worker.rb', line 50

def self.delete_queue
  queue.delete(queue_name)
end

.dequeueObject



62
63
64
# File 'lib/sweatshop/worker.rb', line 62

def self.dequeue
  queue.dequeue(queue_name)
end

.do_task(task) ⇒ Object



76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
# File 'lib/sweatshop/worker.rb', line 76

def self.do_task(task)
  begin
    call_before_task(task)

    queued_at = task[:queued_at] ? "(queued #{Time.at(task[:queued_at]).strftime('%Y/%m/%d %H:%M:%S')})" : ''
    log("Dequeuing #{queue_name}::#{task[:method]} #{queued_at}")
    task[:result] = instance.send(task[:method], *task[:args])

    call_after_task(task)
    confirm
  rescue SystemExit
    exit
  rescue Exception => e
    log("Caught Exception: #{e.message}, \n#{e.backtrace.join("\n")}")
    call_exception_handler(e)

    # the only way to re-queue messages with rabbitmq is to close and reopen the connection
    # putting a 'sleep 2' in here to give the administrator to fix peristent problems, otherwise
    # we'll hit an infinite loop
    #
    # THIS CODE IS PROBLEMATIC --- we need to put these tasks into a 'failed' queue so we don't run into infinite loops
    # will just 'confirm' for now
    #queue.stop
    #sleep 2
    confirm
  end
end

.do_tasksObject



70
71
72
73
74
# File 'lib/sweatshop/worker.rb', line 70

def self.do_tasks
  while task = dequeue
    do_task(task)
  end
end

.enqueue(task) ⇒ Object



58
59
60
# File 'lib/sweatshop/worker.rb', line 58

def self.enqueue(task)
  queue.enqueue(queue_name, task)
end

.flush_queueObject



46
47
48
# File 'lib/sweatshop/worker.rb', line 46

def self.flush_queue
  queue.flush_all(queue_name)
end

.inherited(subclass) ⇒ Object



5
6
7
# File 'lib/sweatshop/worker.rb', line 5

def self.inherited(subclass)
  self.workers << subclass
end

.instanceObject



34
35
36
# File 'lib/sweatshop/worker.rb', line 34

def self.instance
  @instance ||= new
end

.log(msg) ⇒ Object



120
121
122
# File 'lib/sweatshop/worker.rb', line 120

def self.log(msg)
  Sweatshop.log(msg)
end

.method_missing(method, *args, &block) ⇒ Object



9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# File 'lib/sweatshop/worker.rb', line 9

def self.method_missing(method, *args, &block)
  if method.to_s =~ /^async_(.*)/
    method = $1
    check_arity!(instance.method(method), args)

    return instance.send(method, *args) unless async?

    uid  = ::Digest::MD5.hexdigest("#{name}:#{method}:#{args}:#{Time.now.to_f}")
    task = {:args => args, :method => method, :uid => uid, :queued_at => Time.now.to_i}

    log("Putting #{uid} on #{queue_name}")
    enqueue(task)

    uid
  elsif instance.respond_to?(method)
    instance.send(method, *args)
  else
    super
  end
end

.on_exception(&block) ⇒ Object



155
156
157
158
159
160
161
# File 'lib/sweatshop/worker.rb', line 155

def self.on_exception(&block)
  if block
    @on_exception = block
  else
    @on_exception
  end
end

.queueObject



108
109
110
# File 'lib/sweatshop/worker.rb', line 108

def self.queue
  @queue ||= Sweatshop.queue(queue_group.to_s)
end

.queue=(queue) ⇒ Object



104
105
106
# File 'lib/sweatshop/worker.rb', line 104

def self.queue=(queue)
  @queue = queue
end

.queue_group(group = nil) ⇒ Object



171
172
173
# File 'lib/sweatshop/worker.rb', line 171

def self.queue_group(group=nil)
  group ? meta_def(:_queue_group){ group } : _queue_group
end

.queue_nameObject



42
43
44
# File 'lib/sweatshop/worker.rb', line 42

def self.queue_name
  @queue_name ||= self.to_s
end

.queue_sizeObject



54
55
56
# File 'lib/sweatshop/worker.rb', line 54

def self.queue_size
  queue.queue_size(queue_name)
end

.stopObject



163
164
165
# File 'lib/sweatshop/worker.rb', line 163

def self.stop
  instance.stop
end

.workersObject



112
113
114
# File 'lib/sweatshop/worker.rb', line 112

def self.workers
  Sweatshop.workers
end

Instance Method Details

#stopObject

called before we exit – subclass can implement this method



168
# File 'lib/sweatshop/worker.rb', line 168

def stop; end