Class: Chillout::Worker

Inherits:
Object
  • Object
show all
Defined in:
lib/chillout/worker.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(max_queue, dispatcher, queue, logger, container_class = CreationsContainer) ⇒ Worker

Returns a new instance of Worker.



5
6
7
8
9
10
11
# File 'lib/chillout/worker.rb', line 5

# Stores the worker's collaborators; no work is started here.
#
# @param max_queue [Integer] upper bound on the number of containers
#   collected into one batch by #get_all_containers_to_process
# @param dispatcher [#send_measurements, #send_startup_message] object the
#   worker delegates sending to
# @param queue [#pop] source of containers to process
# @param logger [#debug, #info, #error] sink for progress messages
# @param container_class [Class] class used by #merge_containers both to
#   recognise mergeable containers and to build the merged one
def initialize(max_queue, dispatcher, queue, logger, container_class = CreationsContainer)
  # Where containers come from and how many are taken per batch.
  @queue = queue
  @max_queue = max_queue

  # How containers are combined.
  @container_class = container_class

  # Outbound collaborators.
  @dispatcher = dispatcher
  @logger = logger
end

Instance Attribute Details

#dispatcher ⇒ Object (readonly)

Returns the value of attribute dispatcher.



3
4
5
# File 'lib/chillout/worker.rb', line 3

# Read-only accessor for the dispatcher given to the constructor; the worker
# calls #send_startup_message and #send_measurements on it.
#
# @return [Object] the value of @dispatcher
def dispatcher
  @dispatcher
end

#logger ⇒ Object (readonly)

Returns the value of attribute logger.



3
4
5
# File 'lib/chillout/worker.rb', line 3

# Read-only accessor for the logger given to the constructor; the worker
# calls #debug, #info and #error on it.
#
# @return [Object] the value of @logger
def logger
  @logger
end

#queue ⇒ Object (readonly)

Returns the value of attribute queue.



3
4
5
# File 'lib/chillout/worker.rb', line 3

# Read-only accessor for the queue given to the constructor; containers are
# taken from it with #pop.
#
# @return [Object] the value of @queue
def queue
  @queue
end

Instance Method Details

#get_all_containers_to_process ⇒ Object



13
14
15
16
17
18
19
20
21
22
23
24
25
26
# File 'lib/chillout/worker.rb', line 13

# Collects the next batch of containers from the queue.
#
# The first container comes from a plain `queue.pop`; up to `@max_queue - 1`
# more are then taken with `queue.pop(true)`, stopping early as soon as that
# call raises ThreadError.
# NOTE(review): presumably `queue` is a ::Queue, so the first pop blocks
# until something is available and the later ones are non-blocking — confirm.
#
# @return [Array] the collected containers, in the order they were popped
def get_all_containers_to_process
  logger.debug "Waiting for at least one container."
  batch = [queue.pop]
  logger.debug "Received at least one container."

  begin
    (@max_queue - 1).times { batch << queue.pop(true) }
  rescue ThreadError
    # Nothing more to take right now; go with what has been collected.
  end

  logger.debug "Received containers to process: #{batch.size}"
  batch
end

#merge_containers(containers_to_merge) ⇒ Object



28
29
30
31
32
33
34
35
36
# File 'lib/chillout/worker.rb', line 28

# Folds every container matched by `@container_class ===` into a single
# freshly built `@container_class` instance and leaves the others untouched.
#
# @param containers_to_merge [Array] items collected from the queue
# @return [Array] the non-matching items in their original order, preceded
#   by the merged container unless that container reports `empty?`
def merge_containers(containers_to_merge)
  mergeable, passthrough = containers_to_merge.partition { |item| @container_class === item }

  # #merge is used for its effect on the accumulator; its return value is ignored.
  merged = mergeable.each_with_object(@container_class.new) do |item, accumulator|
    accumulator.merge(item)
  end

  return passthrough if merged.empty?

  passthrough.unshift(merged)
end

#run ⇒ Object



51
52
53
54
55
56
57
58
59
60
# File 'lib/chillout/worker.rb', line 51

# Main loop of the worker.
#
# Logs that the worker started, sends the startup message, then repeats:
# collect a batch, merge it, send the result, sleep for 5 seconds.
# The loop has no exit condition of its own.
def run
  logger.info("Worker started")
  send_startup_message

  loop do
    batch = get_all_containers_to_process
    send_measurements(merge_containers(batch))
    sleep(5)
  end
end

#send_measurements(measurements) ⇒ Object



38
39
40
41
42
43
44
# File 'lib/chillout/worker.rb', line 38

# Hands the measurements to the dispatcher, logging before and after.
#
# A Dispatcher::SendCreationsFailed raised along the way is logged as an
# error and not re-raised; any other exception propagates to the caller.
#
# @param measurements [Object] passed unchanged to dispatcher.send_measurements
# @return [Object] whatever the last logger call returns
def send_measurements(measurements)
  logger.debug("Trying to send creations")
  dispatcher.send_measurements(measurements)
  logger.info("Metrics sent")
rescue Dispatcher::SendCreationsFailed
  logger.error("Sending metrics failed")
end

#send_startup_message ⇒ Object



46
47
48
49
# File 'lib/chillout/worker.rb', line 46

# Asks the dispatcher to send the startup message.
#
# The debug line is written before the dispatcher is called, so the log
# still shows the attempt when the dispatcher raises or never returns.
# Exceptions from the dispatcher are not rescued here.
#
# @return [Object] whatever dispatcher.send_startup_message returns
def send_startup_message
  logger.debug "Sending startup message"
  dispatcher.send_startup_message
end