Module: Sweatshop

Extended by:
Sweatshop
Included in:
Sweatshop
Defined in:
lib/sweatshop.rb,
lib/sweatshop/sweatd.rb,
lib/sweatshop/worker.rb

Defined Under Namespace

Classes: Sweatd, Worker

Instance Method Summary

Instance Method Details

#cluster_info ⇒ Object



146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
# File 'lib/sweatshop.rb', line 146

# Prints the queue sizes reported by every clustered queue server.
#
# Gathers the 'cluster' entry configured for each queue group, then, for each
# distinct server, points every worker at a MessageQueue::Rabbit created with
# that server as 'host' and prints the sizes through pp_sizes.
#
# Each worker's queue is set back to nil afterwards. The reset runs in an
# ensure clause so that an error raised while talking to one server does not
# leave the workers bound to that temporary per-server queue.
#
# Returns nil.
def cluster_info
  servers = queue_groups.map { |group| (config[group] || {})['cluster'] }
  # Groups without a config entry, or without a 'cluster' key, are skipped.
  servers = servers.select { |entry| entry }.flatten.uniq

  begin
    servers.each do |server|
      puts "\nQueue sizes on #{server}"
      cluster_queue = MessageQueue::Rabbit.new('host' => server)
      workers.each { |worker| worker.queue = cluster_queue }
      pp_sizes
      puts
    end
  ensure
    workers.each { |worker| worker.queue = nil }
  end
  nil
end

#config ⇒ Object



66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
# File 'lib/sweatshop.rb', line 66

# Loads and memoizes the configuration.
#
# Outside Rails (RAILS_ROOT undefined) the bundled config/defaults.yml is
# returned. Inside Rails, RAILS_ROOT/config/sweatshop.yml is read when it
# exists and the section for the current environment is returned as-is (the
# bundled defaults are not merged into it). When that file is missing, the
# bundled defaults are returned with 'enable' forced to false.
#
# The environment name is RAILS_ENV when that constant is defined and truthy,
# otherwise 'development'. The constant is checked with defined? because
# referencing an undefined constant raises NameError before the `||`
# fallback could ever apply.
def config
  @config ||= begin
    defaults = YAML.load_file(File.dirname(__FILE__) + '/../config/defaults.yml')
    if defined?(RAILS_ROOT)
      file = RAILS_ROOT + '/config/sweatshop.yml'
      if File.exist?(file)
        env = (defined?(RAILS_ENV) && RAILS_ENV) || 'development'
        YAML.load_file(file)[env]
      else
        defaults['enable'] = false
        defaults
      end
    else
      defaults
    end
  end
end

#constantize(str) ⇒ Object



186
187
188
# File 'lib/sweatshop.rb', line 186

# Resolves a constant path such as "MessageQueue::Rabbit" to the constant.
#
# The path is walked one segment at a time with const_get, starting from
# Object, rather than being evaluated as Ruby source: the input can only
# name a constant and can never execute arbitrary code. A leading "::" is
# accepted.
#
# str - a String (or anything responding to to_s) holding the constant path.
#
# Returns the constant, or nil for a blank path.
# Raises NameError when a segment is not a valid or defined constant name.
def constantize(str)
  names = str.to_s.split('::').reject { |name| name.empty? }
  return nil if names.empty?
  names.inject(Object) { |scope, name| scope.const_get(name) }
end

#daemon? ⇒ Boolean

Returns:

  • (Boolean)


83
84
85
# File 'lib/sweatshop.rb', line 83

# Reports whether daemonize has been called.
#
# Always returns true or false (never nil), matching how enabled? coerces
# its result.
def daemon?
  !!@daemon
end

#daemonize ⇒ Object



87
88
89
# File 'lib/sweatshop.rb', line 87

# Sets the daemon flag that daemon? reads. Only records the flag; nothing is
# forked or detached here.
#
# Returns true.
def daemonize
  @daemon = true
end

#do_all_tasks ⇒ Object



54
55
56
57
58
# File 'lib/sweatshop.rb', line 54

# Runs the task loop over every registered worker (the :all group).
def do_all_tasks
  everyone = workers_in_group(:all)
  do_tasks(everyone)
end

#do_default_tasks ⇒ Object



60
61
62
63
64
# File 'lib/sweatshop.rb', line 60

# Runs the task loop over the workers whose queue group is :default.
def do_default_tasks
  default_workers = workers_in_group(:default)
  do_tasks(default_workers)
end

#do_tasks(workers) ⇒ Object



33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
# File 'lib/sweatshop.rb', line 33

# Polls the given workers in an endless loop, running at most one task per
# worker on each pass.
#
# After every pass the stop flag is checked. Once it is set, each worker is
# stopped, the default queue is stopped, the stop callback (if one was
# registered as a block) is invoked, and the process exits. When a pass
# finds no task at all, the loop sleeps for one second before polling again.
#
# workers - the collection of workers to poll; each must respond to dequeue,
#           do_task and stop.
def do_tasks(workers)
  loop do
    idle = true

    workers.each do |worker|
      task = worker.dequeue
      next unless task

      worker.do_task(task)
      idle = false
    end

    if stop?
      workers.each(&:stop)
      queue.stop
      # @stop holds either `true` or the block handed to #stop.
      @stop.call if @stop.kind_of?(Proc)
      exit
    end

    sleep 1 if idle
  end
end

#enabled? ⇒ Boolean

Returns:

  • (Boolean)


169
170
171
# File 'lib/sweatshop.rb', line 169

# Reports whether the 'enable' setting in the configuration is truthy.
#
# Always returns true or false.
def enabled?
  config['enable'] ? true : false
end

#flush_all_queues ⇒ Object



133
134
135
136
137
# File 'lib/sweatshop.rb', line 133

# Calls flush_queue on every registered worker.
#
# Returns the workers collection.
def flush_all_queues
  workers.each(&:flush_queue)
end

#log(msg) ⇒ Object



173
174
175
176
# File 'lib/sweatshop.rb', line 173

# Emits a log message.
#
# Nothing is written when the logger is the symbol :silent. With a logger
# set, the message goes to its debug method; with no logger it is printed
# with puts.
#
# msg - the message to log.
def log(msg)
  sink = logger
  return if sink == :silent

  if sink
    sink.debug(msg)
  else
    puts(msg)
  end
end

#logger ⇒ Object



178
179
180
# File 'lib/sweatshop.rb', line 178

# Returns the logger assigned through logger=, or nil when none has been set.
# log treats the symbol :silent as "write nothing" and otherwise calls debug
# on this object.
def logger
  @logger
end

#logger=(logger) ⇒ Object



182
183
184
# File 'lib/sweatshop.rb', line 182

# Assigns the logger that log writes to.
#
# logger - an object responding to debug, the symbol :silent to suppress
#          output, or nil to make log fall back to puts.
def logger=(logger)
  @logger = logger
end

#pp_sizes ⇒ Object



139
140
141
142
143
144
# File 'lib/sweatshop.rb', line 139

# Prints a small table of queue sizes, one "worker size" row per entry of
# queue_sizes, framed by two dashed rules.
#
# The first column is left-justified to the widest worker name. With no
# workers registered the width falls back to 0 (Array#max is nil for an
# empty array, which previously raised NoMethodError on `nil + 10`), so an
# empty table is printed instead.
def pp_sizes
  max_width = workers.collect { |w| w.to_s.size }.max || 0
  rule = '-' * (max_width + 10)

  puts rule
  # '*' takes the column width from the argument list.
  puts queue_sizes.collect { |p| format('%-*s %2s', max_width, p.first, p.last) }.join("\n")
  puts rule
end

#queue(type = 'default') ⇒ Object



110
111
112
113
114
115
116
117
118
119
# File 'lib/sweatshop.rb', line 110

# Returns the message queue for a queue group, building and caching it on
# first use.
#
# type - the queue group name; when the configuration has no truthy entry
#        under that name, the 'default' group is used instead.
#
# The queue class is MessageQueue::<Name>, where <Name> is the capitalized
# 'queue' setting of the group's configuration ('rabbit' when unset). The
# instance is constructed with the group's configuration and stored in
# queues under the resolved group name.
def queue(type = 'default')
  type = 'default' unless config[type]

  cached = queues[type]
  return cached if cached

  settings     = config[type]
  adapter_name = settings['queue'] || 'rabbit'
  adapter      = constantize("MessageQueue::#{adapter_name.capitalize}")

  queues[type] = adapter.new(settings)
end

#queue=(queue, type = 'default') ⇒ Object



121
122
123
# File 'lib/sweatshop.rb', line 121

# Registers a queue instance for a queue group, replacing any cached one.
#
# queue - the queue object to use.
# type  - the queue group name (default: 'default'). Ruby's assignment
#         syntax passes only one argument, so a non-default type can only be
#         supplied by invoking the method through send.
def queue=(queue, type = 'default')
  queues.store(type, queue)
end

#queue_groups ⇒ Object



129
130
131
# File 'lib/sweatshop.rb', line 129

# Returns the distinct queue group names (as strings) of all workers, always
# including 'default'.
#
# The list is memoized on first call, so workers registered afterwards are
# not reflected.
def queue_groups
  @queue_groups ||= begin
    names = workers.map { |worker| worker.queue_group.to_s }
    (names + ['default']).uniq
  end
end

#queue_sizes ⇒ Object



103
104
105
106
107
108
# File 'lib/sweatshop.rb', line 103

# Returns an array of [worker, queue_size] pairs, one per registered worker,
# in registration order.
def queue_sizes
  workers.map { |worker| [worker, worker.queue_size] }
end

#queues ⇒ Object



125
126
127
# File 'lib/sweatshop.rb', line 125

# Returns the hash of queue instances keyed by queue group name, creating an
# empty hash on first access.
def queues
  return @queues if @queues

  @queues = {}
end

#stop(&block) ⇒ Object



91
92
93
94
95
96
97
# File 'lib/sweatshop.rb', line 91

# Requests that the task loop stop.
#
# With a block, the block is stored as the stop callback (do_tasks calls it
# just before exiting); without one, the stop flag is simply set to true.
#
# Returns the stored value: the block as a Proc, or true.
def stop(&block)
  @stop = block_given? ? block : true
end

#stop? ⇒ Boolean

Returns:

  • (Boolean)


99
100
101
# File 'lib/sweatshop.rb', line 99

# Reports whether a stop has been requested through #stop.
#
# Always returns true or false: the stored stop value may be nil, true or a
# callback Proc, and is coerced here the same way enabled? coerces its
# result.
def stop?
  !!@stop
end

#workers ⇒ Object



14
15
16
# File 'lib/sweatshop.rb', line 14

# Returns the list of registered workers, starting as an empty array on
# first access.
def workers
  @workers = [] unless @workers
  @workers
end

#workers=(workers) ⇒ Object



18
19
20
# File 'lib/sweatshop.rb', line 18

# Replaces the list of registered workers.
#
# workers - the new collection returned by #workers from now on.
def workers=(workers)
  @workers = workers 
end

#workers_in_group(groups) ⇒ Object



22
23
24
25
26
27
28
29
30
31
# File 'lib/sweatshop.rb', line 22

# Selects the workers belonging to the given queue group(s).
#
# groups - a single group or an Array of groups. When :all is among them,
#          every registered worker is returned.
#
# Returns the workers whose queue_group is included in groups. The
# comparison is exact, so a Symbol group does not match a String
# queue_group.
def workers_in_group(groups)
  wanted = groups.is_a?(Array) ? groups : [groups]
  return workers if wanted.include?(:all)

  workers.select { |worker| wanted.include?(worker.queue_group) }
end