Class: Sqspoller::SqsPoller

Inherits:
Object
  • Object
show all
Defined in:
lib/sqspoller/sqs_poller.rb

Class Method Summary collapse

Class Method Details

.daemonize(filename) ⇒ Object



22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
# File 'lib/sqspoller/sqs_poller.rb', line 22

def daemonize(filename)
  raise 'Must run as root' if Process.euid != 0

  raise 'First fork failed' if (pid = fork) == -1
  exit unless pid.nil?

  Process.setsid
  raise 'Second fork failed' if (pid = fork) == -1
  exit unless pid.nil?
  puts "Daemon pid: #{Process.pid}" # Or save it somewhere, etc.

  Dir.chdir '/'
  File.umask 0000

  STDIN.reopen filename
  STDOUT.reopen '/dev/null', 'a'
  STDERR.reopen STDOUT
end

.get_wait_time_seconds(wait_time_seconds) ⇒ Object



69
70
71
72
73
74
75
# File 'lib/sqspoller/sqs_poller.rb', line 69

def get_wait_time_seconds(wait_time_seconds)
  default_wait_time_seconds = 20
  if wait_time_seconds && wait_time_seconds >=0 && wait_time_seconds <=20
    return wait_time_seconds
  end
  default_wait_time_seconds
end

.initialize_worker(worker_configuration, total_poller_threads, logger_file) ⇒ Object



95
96
97
98
99
100
101
102
103
104
105
106
# File 'lib/sqspoller/sqs_poller.rb', line 95

def initialize_worker(worker_configuration, total_poller_threads, logger_file)
  worker_thread_count = worker_configuration[:concurrency]
  worker_task = worker_configuration[:worker_class].split('::').inject(Object) {|o,c| o.const_get c}.new(worker_configuration, logger_file)
  waiting_tasks_ratio = worker_configuration[:waiting_tasks_ratio]
  waiting_tasks_ratio = 1 if waiting_tasks_ratio.nil?
  if worker_thread_count.nil?
    message_delegator = MessageDelegator.new total_poller_threads, waiting_tasks_ratio, worker_task, logger_file
  else
    message_delegator = MessageDelegator.new worker_thread_count, waiting_tasks_ratio, worker_task, logger_file
  end
  return message_delegator
end

.start_poller(filename, queue_config_name, access_key_id, secret_access_key, region, log_filename = nil) ⇒ Object



77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
# File 'lib/sqspoller/sqs_poller.rb', line 77

def start_poller(filename, queue_config_name, access_key_id, secret_access_key, region, log_filename=nil)
  puts "Starting poller"
  config = YAML.load(ERB.new(IO.read(filename)).result)
  config = sym(config)

  if log_filename.nil? || log_filename.empty?
    puts "Did not receive log file name"
    fork do
      Process.daemon
      start_poller_with_config config, queue_config_name, access_key_id, secret_access_key, region, STDOUT
    end
  else
    puts "Did receive log file name"
    daemonize log_filename
    start_poller_with_config config, queue_config_name, access_key_id, secret_access_key, region, log_filename
  end
end

.start_poller_with_config(config, queue_config_name, access_key_id, secret_access_key, region, logger_file) ⇒ Object



41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
# File 'lib/sqspoller/sqs_poller.rb', line 41

def start_poller_with_config(config, queue_config_name, access_key_id, secret_access_key, region, logger_file)
  puts "Started poller method"
  @logger = Logger.new(logger_file)

  total_poller_threads = 0
  qcs = []
  queues_config = config[queue_config_name] || config[queue_config_name.to_sym]
  queues_config.keys.each { |queue|
    total_poller_threads += queues_config[queue][:polling_threads]
  }
  message_delegator = initialize_worker config[:worker_configuration], total_poller_threads, logger_file
  queues_config.keys.each { |queue|
    if queues_config[queue][:polling_threads] == 0
      @logger.info "Polling disabled for queue: #{queue}"
      next
    end
    @logger.info "Creating QueueController object for queue: #{queue}"
    qc = QueueController.new queue, queues_config[queue][:polling_threads], get_wait_time_seconds(queues_config[queue][:wait_time_seconds]), message_delegator, access_key_id, secret_access_key, region, logger_file
    qcs << qc
  }

  qcs.each { |qc|
    qc.start
  }

  qcs.each{ |qc| qc.threads.each { |thread| thread.join } }
end

.sym(map) ⇒ Object



15
16
17
18
19
20
# File 'lib/sqspoller/sqs_poller.rb', line 15

def sym(map)
  if map.class == Hash
    map = map.inject({}){|memo,(k,v)| memo[k.to_sym] = sym(v); memo}
  end
  return map
end