Module: SweatShop
- Extended by:
- SweatShop
- Included in:
- SweatShop
- Defined in:
- lib/sweat_shop.rb,
lib/sweat_shop/sweatd.rb,
lib/sweat_shop/worker.rb
Defined Under Namespace
Instance Method Summary
- #cluster_info ⇒ Object
- #config ⇒ Object
- #constantize(str) ⇒ Object
- #do_all_tasks ⇒ Object
- #do_default_tasks ⇒ Object
- #do_tasks(workers) ⇒ Object
- #log(msg) ⇒ Object
- #logger ⇒ Object
- #logger=(logger) ⇒ Object
- #pp_sizes ⇒ Object
- #queue(type = 'default') ⇒ Object
- #queue=(queue, type = 'default') ⇒ Object
- #queue_groups ⇒ Object
- #queue_sizes ⇒ Object
- #queues ⇒ Object
- #stop ⇒ Object
- #stop? ⇒ Boolean
- #workers ⇒ Object
- #workers=(workers) ⇒ Object
- #workers_in_group(groups) ⇒ Object
Instance Method Details
#cluster_info ⇒ Object
136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 |
# File 'lib/sweat_shop.rb', line 136
# Prints the queue sizes reported by every clustered server named in the
# configuration.
#
# The 'cluster' entry of each queue group's config section (when present) is
# collected. Each distinct server is then inspected in turn: every queue
# group is temporarily pointed at a MessageQueue::Rabbit connection to that
# server and #pp_sizes prints the result.
#
# The queue cache is cleared afterwards, even when a server cannot be
# inspected, so later lookups are not left pointing at the inspected server.
#
# @return [nil]
def cluster_info
  # queue_groups holds one entry per worker, so the same group can repeat.
  groups = queue_groups.uniq

  servers = []
  groups.each do |group|
    cluster = (config[group] || {})['cluster']
    servers << cluster if cluster
  end
  # A 'cluster' entry may be a single host or a list, and groups may share
  # hosts; report each server only once.
  servers = servers.flatten.uniq

  begin
    servers.each do |server|
      puts "\nQueue sizes on #{server}"
      connection = MessageQueue::Rabbit.new('host' => server)
      groups.each { |group| queues[group] = connection }
      pp_sizes
      puts
    end
  ensure
    # Drop the temporary per-server connections.
    @queues = {}
  end

  nil
end
#config ⇒ Object
73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 |
# File 'lib/sweat_shop.rb', line 73
# Loads and memoizes the SweatShop configuration.
#
# Without RAILS_ROOT the bundled config/defaults.yml (relative to this file)
# is returned. With RAILS_ROOT, config/sweatshop.yml under it is used when it
# exists, narrowed to the section for the current environment; when that
# file is missing the defaults are returned with 'enable' forced to false.
#
# @return [Object] the parsed YAML for the selected file/section
def config
  @config ||= begin
    defaults = YAML.load_file(File.dirname(__FILE__) + '/../config/defaults.yml')
    if defined?(RAILS_ROOT)
      file = RAILS_ROOT + '/config/sweatshop.yml'
      if File.exist?(file)
        # Referencing an undefined RAILS_ENV raises NameError before `||`
        # can supply the fallback, so check that the constant exists first.
        env = (defined?(RAILS_ENV) && RAILS_ENV) || 'development'
        YAML.load_file(file)[env]
      else
        defaults['enable'] = false
        defaults
      end
    else
      defaults
    end
  end
end
#constantize(str) ⇒ Object
172 173 174 |
# File 'lib/sweat_shop.rb', line 172
# Resolves a constant name such as "MessageQueue::Rabbit" to the object it
# names by evaluating the text in the context of Object.
#
# NOTE(review): the text is evaluated as Ruby source, so any expression is
# executed, not just constant paths. Only pass trusted values.
#
# @param str [#to_s] the constant path to resolve
# @return [Object] the result of evaluating +str+
def constantize(str)
  source = str.to_s
  Object.module_eval(source, __FILE__, __LINE__)
end
#do_all_tasks ⇒ Object
61 62 63 64 65 |
# File 'lib/sweat_shop.rb', line 61
# Runs the task loop for every registered worker (group :all).
#
# @return [Object] whatever #do_tasks returns
def do_all_tasks
  everyone = workers_in_group(:all)
  do_tasks(everyone)
end
#do_default_tasks ⇒ Object
67 68 69 70 71 |
# File 'lib/sweat_shop.rb', line 67
# Runs the task loop for the workers selected by the group :default.
#
# NOTE(review): the group is passed as the symbol :default; confirm that
# workers report their queue_group as a symbol rather than the string
# 'default', otherwise no worker will match.
#
# @return [Object] whatever #do_tasks returns
def do_default_tasks
  default_workers = workers_in_group(:default)
  do_tasks(default_workers)
end
#do_tasks(workers) ⇒ Object
33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 |
# File 'lib/sweat_shop.rb', line 33
# Processes tasks for the given workers.
#
# When the default queue reports subscribe?, every worker is subscribed
# inside EM.run. Otherwise the workers are polled in an endless loop: each
# pass dequeues at most one task per worker and runs it. Once #stop? is
# true, every worker and the queue are stopped and the process exits. A pass
# that found no task sleeps for one second before polling again.
#
# @param workers [Array] objects responding to subscribe, dequeue, do_task and stop
# @return [Object] the value of EM.run in subscribe mode; the polling loop ends via exit
def do_tasks(workers)
  if queue.subscribe?
    return EM.run { workers.each { |worker| worker.subscribe } }
  end

  loop do
    idle = true

    workers.each do |worker|
      task = worker.dequeue
      next unless task

      worker.do_task(task)
      idle = false
    end

    if stop?
      workers.each { |worker| worker.stop }
      queue.stop
      exit
    end

    # Back off only when this pass found nothing to do.
    sleep 1 if idle
  end
end
#log(msg) ⇒ Object
159 160 161 162 |
# File 'lib/sweat_shop.rb', line 159
# Emits a message through the configured logger.
#
# Nothing is written when the logger is :silent. With a logger set, the
# message goes to its debug method; with none, it is printed with puts.
#
# @param msg [Object] the message to emit
# @return [Object, nil] the logger's debug result, or nil when silent or printed
def log(msg)
  target = logger
  return if target == :silent

  if target
    target.debug(msg)
  else
    puts(msg)
  end
end
#logger ⇒ Object
164 165 166 |
# File 'lib/sweat_shop.rb', line 164
# Returns the logger assigned through #logger=.
#
# @return [Object, nil] the stored logger, or nil when none has been set
def logger
  defined?(@logger) ? @logger : nil
end
#logger=(logger) ⇒ Object
168 169 170 |
# File 'lib/sweat_shop.rb', line 168
# Stores the logger used by #log. #log treats :silent as "write nothing"
# and nil as "print with puts".
#
# @param logger [Object, nil] the logger to store
# @return [Object, nil] the stored value
def logger=(logger)
  instance_variable_set(:@logger, logger)
end
#pp_sizes ⇒ Object
129 130 131 132 133 134 |
# File 'lib/sweat_shop.rb', line 129
# Prints a table of queue sizes, one "worker size" row per entry of
# #queue_sizes, framed by dashed rules.
#
# The name column is as wide as the longest worker name. With no workers
# the column has zero width and only the frame (with an empty body line) is
# printed.
#
# @return [nil]
def pp_sizes
  # `max` is nil for an empty worker list; fall back to a zero-width column
  # instead of failing on nil + 10.
  max_width = workers.map { |w| w.to_s.size }.max || 0
  rule = '-' * (max_width + 10)

  puts rule
  puts queue_sizes.map { |pair| sprintf("%-#{max_width}s %2s", pair.first, pair.last) }.join("\n")
  puts rule
end
#queue(type = 'default') ⇒ Object
106 107 108 109 110 111 112 113 114 115 |
# File 'lib/sweat_shop.rb', line 106
# Returns the message queue for a queue type, creating and caching it on
# first use.
#
# A type with no config section falls back to 'default'. The queue class is
# MessageQueue::<Name>, where the name is the capitalized 'queue' setting of
# the section ('rabbit' when unset); it is instantiated with the section.
#
# @param type [Object] key of the config section to use
# @return [Object] the cached or newly built queue
def queue(type = 'default')
  type = 'default' unless config[type]

  cached = queues[type]
  return cached if cached

  settings = config[type]
  adapter = settings['queue'] || 'rabbit'
  klass = constantize("MessageQueue::#{adapter.capitalize}")
  queues[type] = klass.new(settings)
end
#queue=(queue, type = 'default') ⇒ Object
117 118 119 |
# File 'lib/sweat_shop.rb', line 117
# Registers a queue object for a queue type, replacing any cached one.
#
# Plain assignment syntax (`self.queue = q`) can only supply +queue+; the
# +type+ argument is reachable only through an explicit call such as send.
#
# @param queue [Object] the queue to register
# @param type [Object] the key to register it under
# @return [Object] the registered queue
def queue=(queue, type = 'default')
  registry = queues
  registry[type] = queue
end
#queue_groups ⇒ Object
125 126 127 |
# File 'lib/sweat_shop.rb', line 125
# Returns the queue group of every worker followed by 'default'.
#
# The list is computed once and memoized; it has one entry per worker, so
# groups shared by several workers appear more than once.
#
# @return [Array] worker queue groups plus 'default'
def queue_groups
  @queue_groups ||= begin
    groups = workers.map { |worker| worker.queue_group }
    groups.push('default')
  end
end
#queue_sizes ⇒ Object
99 100 101 102 103 104 |
# File 'lib/sweat_shop.rb', line 99
# Pairs every worker with its current queue size.
#
# @return [Array<Array>] one [worker, worker.queue_size] pair per worker, in worker order
def queue_sizes
  workers.map { |worker| [worker, worker.queue_size] }
end
#queues ⇒ Object
121 122 123 |
# File 'lib/sweat_shop.rb', line 121
# Returns the cache of queue objects keyed by queue type, creating an empty
# hash on first access.
#
# @return [Hash] the queue cache
def queues
  @queues = {} unless @queues
  @queues
end
#stop ⇒ Object
90 91 92 93 |
# File 'lib/sweat_shop.rb', line 90
# Flags the task loop to stop. When the default queue reports subscribe?,
# the queue itself is stopped as well.
#
# @return [Object, nil] the result of stopping the queue, or nil when it is not subscribing
def stop
  @stop = true

  active = queue
  active.stop if active.subscribe?
end
#stop? ⇒ Boolean
95 96 97 |
# File 'lib/sweat_shop.rb', line 95
# Reports whether #stop has been requested.
#
# @return [Boolean, nil] the stop flag; nil until #stop sets it
def stop?
  defined?(@stop) ? @stop : nil
end
#workers ⇒ Object
14 15 16 |
# File 'lib/sweat_shop.rb', line 14
# Returns the list of registered workers, creating an empty list on first
# access.
#
# @return [Array] the registered workers
def workers
  @workers = [] unless @workers
  @workers
end
#workers=(workers) ⇒ Object
18 19 20 |
# File 'lib/sweat_shop.rb', line 18
# Replaces the list of registered workers.
#
# @param workers [Array] the new worker list
# @return [Array] the stored list
def workers=(workers)
  instance_variable_set(:@workers, workers)
end
#workers_in_group(groups) ⇒ Object
22 23 24 25 26 27 28 29 30 31 |
# File 'lib/sweat_shop.rb', line 22
# Selects the workers belonging to one or more queue groups.
#
# A single group is treated as a one-element list. If the list contains
# :all, the complete worker list is returned as-is; otherwise only workers
# whose queue_group is in the list are returned.
#
# @param groups [Object, Array] a group or a list of groups
# @return [Array] the matching workers
def workers_in_group(groups)
  wanted = groups.is_a?(Array) ? groups : [groups]
  return workers if wanted.include?(:all)

  workers.select { |worker| wanted.include?(worker.queue_group) }
end