Module: SweatShop

Extended by:
SweatShop
Included in:
SweatShop
Defined in:
lib/sweat_shop.rb,
lib/sweat_shop/sweatd.rb,
lib/sweat_shop/worker.rb

Defined Under Namespace

Classes: Sweatd, Worker

Instance Method Summary collapse

Instance Method Details

#configObject



73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
# File 'lib/sweat_shop.rb', line 73

def config
  @config ||= begin
    defaults = YAML.load_file(File.dirname(__FILE__) + '/../config/defaults.yml')
    if defined?(RAILS_ROOT)
      file = RAILS_ROOT + '/config/sweatshop.yml'
      if File.exist?(file)
        YAML.load_file(file)[RAILS_ENV || 'development']
      else
        defaults['enable'] = false
        defaults
      end
    else
      defaults
    end
  end
end

#constantize(str) ⇒ Object



140
141
142
# File 'lib/sweat_shop.rb', line 140

def constantize(str)
  Object.module_eval("#{str}", __FILE__, __LINE__)
end

#do_all_tasksObject



61
62
63
64
65
# File 'lib/sweat_shop.rb', line 61

def do_all_tasks
  do_tasks(
    workers_in_group(:all)
  )
end

#do_default_tasksObject



67
68
69
70
71
# File 'lib/sweat_shop.rb', line 67

def do_default_tasks
  do_tasks(
    workers_in_group(:default)
  )
end

#do_tasks(workers) ⇒ Object



33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
# File 'lib/sweat_shop.rb', line 33

def do_tasks(workers)
  if queue.subscribe?
    EM.run do
      workers.each do |worker|
        worker.subscribe
      end
    end
  else
    loop do
      wait = true
      workers.each do |worker|
        if task = worker.dequeue
          worker.do_task(task)
          wait = false
        end
      end
      if stop?
        workers.each do |worker|
          worker.stop
        end
        queue.stop
        exit 
      end
      sleep 1 if wait
    end
  end
end

#log(msg) ⇒ Object



127
128
129
130
# File 'lib/sweat_shop.rb', line 127

def log(msg)
  return if logger == :silent
  logger ? logger.debug(msg) : puts(msg)
end

#loggerObject



132
133
134
# File 'lib/sweat_shop.rb', line 132

def logger
  @logger
end

#logger=(logger) ⇒ Object



136
137
138
# File 'lib/sweat_shop.rb', line 136

def logger=(logger)
  @logger = logger
end

#pp_sizesObject



106
107
108
109
110
111
# File 'lib/sweat_shop.rb', line 106

def pp_sizes
  max_width = workers.collect{|w| w.to_s.size}.max
  puts '-' * (max_width + 10)
  puts queue_sizes.collect{|p| sprintf("%-#{max_width}s %2s", p.first, p.last)}.join("\n")
  puts '-' * (max_width + 10)
end

#queue(type = 'default') ⇒ Object



113
114
115
116
117
118
119
120
121
# File 'lib/sweat_shop.rb', line 113

def queue(type = 'default')
  @queues ||= {}
  @queues[type] ||= begin 
    qconfig = config[type] || config['default']
    qtype   = qconfig['queue'] || 'rabbit'
    queue   = constantize("MessageQueue::#{qtype.capitalize}")
    queue.new(qconfig)
  end
end

#queue=(queue, type = 'default') ⇒ Object



123
124
125
# File 'lib/sweat_shop.rb', line 123

def queue=(queue, type = 'default')
  @queues[type] = queue
end

#queue_sizesObject



99
100
101
102
103
104
# File 'lib/sweat_shop.rb', line 99

def queue_sizes
  workers.inject([]) do |all, worker|
    all << [worker, worker.queue_size]
    all
  end
end

#stopObject



90
91
92
93
# File 'lib/sweat_shop.rb', line 90

def stop
  @stop = true
  queue.stop if queue.subscribe?
end

#stop?Boolean

Returns:

  • (Boolean)


95
96
97
# File 'lib/sweat_shop.rb', line 95

def stop?
  @stop
end

#workersObject



14
15
16
# File 'lib/sweat_shop.rb', line 14

def workers
  @workers ||= []
end

#workers=(workers) ⇒ Object



18
19
20
# File 'lib/sweat_shop.rb', line 18

def workers=(workers)
  @workers = workers 
end

#workers_in_group(groups) ⇒ Object



22
23
24
25
26
27
28
29
30
31
# File 'lib/sweat_shop.rb', line 22

def workers_in_group(groups)
  groups = [groups] unless groups.is_a?(Array)
  if groups.include?(:all)
    workers
  else
    workers.select do |worker|
      groups.include?(worker.queue_group)
    end
  end
end