Module: Sweatshop
- Extended by:
- Sweatshop
- Included in:
- Sweatshop
- Defined in:
- lib/sweatshop.rb,
lib/sweatshop/sweatd.rb,
lib/sweatshop/worker.rb
Defined Under Namespace
Instance Method Summary collapse
- #cluster_info ⇒ Object
- #config ⇒ Object
- #constantize(str) ⇒ Object
- #daemon? ⇒ Boolean
- #daemonize ⇒ Object
- #do_all_tasks ⇒ Object
- #do_default_tasks ⇒ Object
- #do_tasks(workers) ⇒ Object
- #enabled? ⇒ Boolean
- #flush_all_queues ⇒ Object
- #log(msg) ⇒ Object
- #logger ⇒ Object
- #logger=(logger) ⇒ Object
- #pp_sizes ⇒ Object
- #queue(type = 'default') ⇒ Object
- #queue=(queue, type = 'default') ⇒ Object
- #queue_groups ⇒ Object
- #queue_sizes ⇒ Object
- #queues ⇒ Object
- #stop(&block) ⇒ Object
- #stop? ⇒ Boolean
- #workers ⇒ Object
- #workers=(workers) ⇒ Object
- #workers_in_group(groups) ⇒ Object
Instance Method Details
#cluster_info ⇒ Object
146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 |
# File 'lib/sweatshop.rb', line 146 def cluster_info servers = [] queue_groups.each do |group| qconfig = config[group] next unless qconfig next unless qconfig['cluster'] servers << qconfig['cluster'] end servers = servers.flatten.uniq servers.each do |server| puts "\nQueue sizes on #{server}" queue = MessageQueue::Rabbit.new('host' => server) workers.each do |worker| worker.queue = queue end pp_sizes puts end workers.each{|w| w.queue = nil} nil end |
#config ⇒ Object
66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 |
# File 'lib/sweatshop.rb', line 66 def config @config ||= begin defaults = YAML.load_file(File.dirname(__FILE__) + '/../config/defaults.yml') if defined?(RAILS_ROOT) file = RAILS_ROOT + '/config/sweatshop.yml' if File.exist?(file) YAML.load_file(file)[RAILS_ENV || 'development'] else defaults['enable'] = false defaults end else defaults end end end |
#constantize(str) ⇒ Object
186 187 188 |
# File 'lib/sweatshop.rb', line 186 def constantize(str) Object.module_eval("#{str}", __FILE__, __LINE__) end |
#daemon? ⇒ Boolean
83 84 85 |
# File 'lib/sweatshop.rb', line 83 def daemon? @daemon end |
#daemonize ⇒ Object
87 88 89 |
# File 'lib/sweatshop.rb', line 87 def daemonize @daemon = true end |
#do_all_tasks ⇒ Object
54 55 56 57 58 |
# File 'lib/sweatshop.rb', line 54 def do_all_tasks do_tasks( workers_in_group(:all) ) end |
#do_default_tasks ⇒ Object
60 61 62 63 64 |
# File 'lib/sweatshop.rb', line 60 def do_default_tasks do_tasks( workers_in_group(:default) ) end |
#do_tasks(workers) ⇒ Object
33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 |
# File 'lib/sweatshop.rb', line 33 def do_tasks(workers) loop do wait = true workers.each do |worker| if task = worker.dequeue worker.do_task(task) wait = false end end if stop? workers.each do |worker| worker.stop end queue.stop @stop.call if @stop.kind_of?(Proc) exit end sleep 1 if wait end end |
#enabled? ⇒ Boolean
169 170 171 |
# File 'lib/sweatshop.rb', line 169 def enabled? !!config['enable'] end |
#flush_all_queues ⇒ Object
133 134 135 136 137 |
# File 'lib/sweatshop.rb', line 133 def flush_all_queues workers.each do |worker| worker.flush_queue end end |
#log(msg) ⇒ Object
173 174 175 176 |
# File 'lib/sweatshop.rb', line 173 def log(msg) return if logger == :silent logger ? logger.debug(msg) : puts(msg) end |
#logger ⇒ Object
178 179 180 |
# File 'lib/sweatshop.rb', line 178 def logger @logger end |
#logger=(logger) ⇒ Object
182 183 184 |
# File 'lib/sweatshop.rb', line 182 def logger=(logger) @logger = logger end |
#pp_sizes ⇒ Object
139 140 141 142 143 144 |
# File 'lib/sweatshop.rb', line 139 def pp_sizes max_width = workers.collect{|w| w.to_s.size}.max puts '-' * (max_width + 10) puts queue_sizes.collect{ |p| sprintf("%-#{max_width}s %2s", p.first, p.last) }.join("\n") puts '-' * (max_width + 10) end |
#queue(type = 'default') ⇒ Object
110 111 112 113 114 115 116 117 118 119 |
# File 'lib/sweatshop.rb', line 110 def queue(type = 'default') type = config[type] ? type : 'default' return queues[type] if queues[type] qconfig = config[type] qtype = qconfig['queue'] || 'rabbit' queue = constantize("MessageQueue::#{qtype.capitalize}") queues[type] = queue.new(qconfig) end |
#queue=(queue, type = 'default') ⇒ Object
121 122 123 |
# File 'lib/sweatshop.rb', line 121 def queue=(queue, type = 'default') queues[type] = queue end |
#queue_groups ⇒ Object
129 130 131 |
# File 'lib/sweatshop.rb', line 129 def queue_groups @queue_groups ||= (workers.collect{|w| w.queue_group.to_s} << 'default').uniq end |
#queue_sizes ⇒ Object
103 104 105 106 107 108 |
# File 'lib/sweatshop.rb', line 103 def queue_sizes workers.inject([]) do |all, worker| all << [worker, worker.queue_size] all end end |
#queues ⇒ Object
125 126 127 |
# File 'lib/sweatshop.rb', line 125 def queues @queues ||= {} end |
#stop(&block) ⇒ Object
91 92 93 94 95 96 97 |
# File 'lib/sweatshop.rb', line 91 def stop(&block) if block_given? @stop = block else @stop = true end end |
#stop? ⇒ Boolean
99 100 101 |
# File 'lib/sweatshop.rb', line 99 def stop? @stop end |
#workers ⇒ Object
14 15 16 |
# File 'lib/sweatshop.rb', line 14 def workers @workers ||= [] end |
#workers=(workers) ⇒ Object
18 19 20 |
# File 'lib/sweatshop.rb', line 18 def workers=(workers) @workers = workers end |
#workers_in_group(groups) ⇒ Object
22 23 24 25 26 27 28 29 30 31 |
# File 'lib/sweatshop.rb', line 22 def workers_in_group(groups) groups = [groups] unless groups.is_a?(Array) if groups.include?(:all) workers else workers.select do |worker| groups.include?(worker.queue_group) end end end |