Module: SweatShop

Extended by:
SweatShop
Included in:
SweatShop
Defined in:
lib/sweat_shop.rb,
lib/sweat_shop/sweatd.rb,
lib/sweat_shop/worker.rb

Defined Under Namespace

Classes: Sweatd, Worker

Instance Method Summary collapse

Instance Method Details

#cluster_info ⇒ Object



136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
# File 'lib/sweat_shop.rb', line 136

# Prints the queue sizes for every server named under a 'cluster' key in the
# configuration of any queue group.
#
# For each server, every queue group is temporarily pointed at a
# MessageQueue::Rabbit connected to that host before pp_sizes is called, so
# the printed sizes presumably come from that server (pp_sizes is defined
# elsewhere; confirm). Each server is contacted once, even when several
# groups list it.
#
# The queue cache is cleared afterwards -- also when reporting raises -- so
# later calls do not keep using the per-server connections.
#
# @return [nil]
def cluster_info
  servers = []
  queue_groups.each do |group|
    cluster = (config[group] || {})['cluster']
    servers << cluster if cluster
  end
  # 'cluster' may be a single host or a list of hosts; a host shared by
  # several groups must only be reported once.
  servers = servers.flatten.uniq

  begin
    servers.each do |server|
      puts "\nQueue sizes on #{server}"
      server_queue = MessageQueue::Rabbit.new('host' => server)
      queue_groups.each { |group| queues[group] = server_queue }
      pp_sizes
      puts
    end
  ensure
    # Never leave the cache pointing at a single cluster member.
    @queues = {}
  end
  nil
end

#config ⇒ Object



73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
# File 'lib/sweat_shop.rb', line 73

# Loads and memoizes the SweatShop configuration.
#
# The bundled config/defaults.yml is always read. Inside a Rails app
# (RAILS_ROOT defined) the section of config/sweatshop.yml for the current
# environment is used instead; when that file does not exist the defaults are
# returned with 'enable' forced to false. Outside Rails the defaults are
# returned unchanged.
#
# The environment is RAILS_ENV, falling back to 'development' when RAILS_ENV
# is undefined or nil.
#
# NOTE(review): when sweatshop.yml has no section for the environment this
# returns nil, and nil is not memoized, so the files are re-read on each call.
#
# @return [Hash, nil] the active configuration
def config
  @config ||= begin
    defaults = YAML.load_file(File.join(File.dirname(__FILE__), '..', 'config', 'defaults.yml'))
    if defined?(RAILS_ROOT)
      file = "#{RAILS_ROOT}/config/sweatshop.yml"
      if File.exist?(file)
        # A bare `RAILS_ENV || ...` raises NameError when the constant is
        # missing, so check for it before reading it.
        env = (defined?(RAILS_ENV) && RAILS_ENV) || 'development'
        YAML.load_file(file)[env]
      else
        defaults['enable'] = false
        defaults
      end
    else
      defaults
    end
  end
end

#constantize(str) ⇒ Object



172
173
174
# File 'lib/sweat_shop.rb', line 172

# Resolves a constant from its (optionally namespaced) name.
#
# The name is looked up with const_get, one segment at a time starting at
# Object, rather than being evaluated as Ruby code. Part of the name comes
# from configuration (see #queue), so it must never be executed.
#
# @param str [#to_s] a constant name such as "MessageQueue::Rabbit";
#   a leading "::" is accepted
# @return [Object] the constant's value
# @raise [NameError] if the name is empty, malformed, or not defined
def constantize(str)
  names = str.to_s.split('::')
  names.shift if names.first && names.first.empty? # leading "::"
  raise NameError, "#{str.inspect} is not a valid constant name" if names.empty?

  names.inject(Object) { |scope, name| scope.const_get(name) }
end

#do_all_tasks ⇒ Object



61
62
63
64
65
# File 'lib/sweat_shop.rb', line 61

# Runs do_tasks with the workers selected by the :all group.
#
# @return [Object] whatever do_tasks returns
def do_all_tasks
  everyone = workers_in_group(:all)
  do_tasks(everyone)
end

#do_default_tasks ⇒ Object



67
68
69
70
71
# File 'lib/sweat_shop.rb', line 67

# Runs do_tasks with the workers selected by the :default group.
#
# @return [Object] whatever do_tasks returns
def do_default_tasks
  default_workers = workers_in_group(:default)
  do_tasks(default_workers)
end

#do_tasks(workers) ⇒ Object



33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
# File 'lib/sweat_shop.rb', line 33

# Runs the given workers until the process is told to stop.
#
# When the default queue answers true to subscribe?, every worker is
# subscribed inside EM.run. Otherwise the workers are polled in a loop: each
# pass dequeues at most one task per worker and runs it, and the loop sleeps
# for one second after a pass in which no worker had a task.
#
# In polling mode, once stop? is true at the end of a pass, every worker and
# the queue are stopped and the process exits.
#
# @param workers [Array] objects responding to subscribe, dequeue, do_task
#   and stop
# @return [Object] the result of EM.run in subscribe mode; polling mode only
#   leaves via exit
def do_tasks(workers)
  return EM.run { workers.each { |worker| worker.subscribe } } if queue.subscribe?

  loop do
    idle = true
    workers.each do |worker|
      task = worker.dequeue
      next unless task

      worker.do_task(task)
      idle = false
    end

    if stop?
      workers.each { |worker| worker.stop }
      queue.stop
      exit
    end

    sleep 1 if idle
  end
end

#log(msg) ⇒ Object



159
160
161
162
# File 'lib/sweat_shop.rb', line 159

# Writes a message to the configured logger.
#
# Nothing is written when the logger is :silent. With a logger set, the
# message goes to its debug method; with no logger it is printed with puts.
#
# @param msg [Object] the message to record
# @return [Object, nil] the result of logger.debug, or nil otherwise
def log(msg)
  target = logger
  return if target == :silent

  if target
    target.debug(msg)
  else
    puts(msg)
  end
end

#logger ⇒ Object



164
165
166
# File 'lib/sweat_shop.rb', line 164

# Returns the logger assigned through logger=.
#
# @return [Object, nil] the stored logger, or nil when none has been set
def logger
  instance_variable_get(:@logger)
end

#logger=(logger) ⇒ Object



168
169
170
# File 'lib/sweat_shop.rb', line 168

# Stores the logger that log reads.
#
# @param logger [Object, nil] the logger to store
# @return [Object, nil] the value that was stored
def logger=(logger)
  instance_variable_set(:@logger, logger)
end

#pp_sizes ⇒ Object



129
130
131
132
133
134
# File 'lib/sweat_shop.rb', line 129

# Prints a table of queue sizes, one row per entry of queue_sizes, framed by
# two dashed rules.
#
# The name column is as wide as the longest worker name (worker.to_s). With
# no workers the width falls back to 0, so an empty table is printed instead
# of raising on a nil width.
#
# @return [nil]
def pp_sizes
  # [].max is nil, so default the width for the "no workers" case.
  max_width = workers.map { |worker| worker.to_s.size }.max || 0
  rule = '-' * (max_width + 10)
  rows = queue_sizes.map { |pair| sprintf("%-#{max_width}s %2s", pair.first, pair.last) }

  puts rule
  puts rows.join("\n")
  puts rule
end

#queue(type = 'default') ⇒ Object



106
107
108
109
110
111
112
113
114
115
# File 'lib/sweat_shop.rb', line 106

# Returns the message queue for a queue group, building and caching it on
# first use.
#
# A type with no entry in config falls back to 'default'. The queue class is
# MessageQueue::<Name>, where Name is the capitalized 'queue' setting of that
# config entry ('rabbit' when unset), and it is instantiated with the entry.
#
# @param type [String] queue group name
# @return [Object] the cached or newly created queue
def queue(type = 'default')
  type = 'default' unless config[type]
  cached = queues[type]
  return cached if cached

  settings = config[type]
  adapter  = settings['queue'] || 'rabbit'
  klass    = constantize("MessageQueue::#{adapter.capitalize}")

  queues[type] = klass.new(settings)
end

#queue=(queue, type = 'default') ⇒ Object



117
118
119
# File 'lib/sweat_shop.rb', line 117

# Registers a queue object for a queue group in the queue cache.
#
# Assignment syntax (`self.queue = q`) can only supply the queue, so it
# always targets 'default'; pass a type by calling through send.
#
# @param queue [Object] the queue to register
# @param type [String] queue group name
# @return [Object] the queue that was stored
def queue=(queue, type = 'default')
  queues.store(type, queue)
end

#queue_groups ⇒ Object



125
126
127
# File 'lib/sweat_shop.rb', line 125

# Returns the distinct queue group names in use: the queue_group of every
# worker, plus 'default'.
#
# The list is de-duplicated so that callers iterating over it handle each
# group once, even when several workers share a group or a worker already
# uses 'default'. The result is memoized on first call, so workers
# registered afterwards are not reflected.
#
# @return [Array] unique group names, in first-seen order
def queue_groups
  @queue_groups ||= (workers.map { |worker| worker.queue_group } << 'default').uniq
end

#queue_sizes ⇒ Object



99
100
101
102
103
104
# File 'lib/sweat_shop.rb', line 99

# Pairs every worker with its current queue size.
#
# @return [Array<Array>] one [worker, worker.queue_size] pair per worker, in
#   the order of workers
def queue_sizes
  workers.map { |worker| [worker, worker.queue_size] }
end

#queues ⇒ Object



121
122
123
# File 'lib/sweat_shop.rb', line 121

# Returns the cache of queue objects keyed by queue group, creating an empty
# hash on first use.
#
# @return [Hash] the queue cache
def queues
  @queues = {} unless @queues
  @queues
end

#stop ⇒ Object



90
91
92
93
# File 'lib/sweat_shop.rb', line 90

# Requests shutdown by setting the flag that stop? reports.
#
# The default queue is additionally stopped when it answers true to
# subscribe?.
#
# @return [Object, nil] the result of queue.stop when the queue was stopped,
#   nil otherwise
def stop
  @stop = true
  active_queue = queue
  active_queue.stop if active_queue.subscribe?
end

#stop? ⇒ Boolean

Returns:

  • (Boolean)


95
96
97
# File 'lib/sweat_shop.rb', line 95

# Reports whether shutdown has been requested via stop.
#
# Always returns a strict boolean; before stop has been called the flag is
# unset and this returns false rather than nil.
#
# @return [Boolean] true once stop has been called
def stop?
  @stop ? true : false
end

#workers ⇒ Object



14
15
16
# File 'lib/sweat_shop.rb', line 14

# Returns the list of registered workers, creating an empty list on first
# use.
#
# @return [Array] the registered workers
def workers
  @workers = [] unless @workers
  @workers
end

#workers=(workers) ⇒ Object



18
19
20
# File 'lib/sweat_shop.rb', line 18

# Replaces the list of registered workers.
#
# @param workers [Array] the new worker list
# @return [Array] the list that was stored
def workers=(workers)
  instance_variable_set(:@workers, workers)
end

#workers_in_group(groups) ⇒ Object



22
23
24
25
26
27
28
29
30
31
# File 'lib/sweat_shop.rb', line 22

# Selects the workers that belong to the given queue group(s).
#
# A single group is treated as a one-element list. When the list contains
# :all, every worker is returned; otherwise only workers whose queue_group is
# in the list.
#
# @param groups [Object, Array] one group or a list of groups
# @return [Array] the matching workers
def workers_in_group(groups)
  wanted = groups.is_a?(Array) ? groups : [groups]
  return workers if wanted.include?(:all)

  workers.select { |worker| wanted.include?(worker.queue_group) }
end