Class: Sqspoller::SqsPoller

Inherits:
Object
  • Object
show all
Defined in:
lib/sqspoller/sqs_poller.rb

Class Method Summary collapse

Class Method Details

.daemonize(filename) ⇒ Object



32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
# File 'lib/sqspoller/sqs_poller.rb', line 32

def daemonize(filename)
  raise 'Must run as root' if Process.euid != 0

  raise 'First fork failed' if (pid = fork) == -1
  exit unless pid.nil?

  Process.setsid
  raise 'Second fork failed' if (pid = fork) == -1
  exit unless pid.nil?
  puts "Daemon pid: #{Process.pid}" # Or save it somewhere, etc.

  Dir.chdir '/'
  File.umask 0000

  STDIN.reopen filename
  STDOUT.reopen '/dev/null', 'a'
  STDERR.reopen STDOUT
end

.initialize_worker(worker_configuration, total_poller_threads, logger_file) ⇒ Object



190
191
192
193
194
195
196
197
198
# File 'lib/sqspoller/sqs_poller.rb', line 190

def initialize_worker worker_configuration, total_poller_threads, logger_file
  worker_thread_count = worker_configuration[:concurrency] || total_poller_threads
  waiting_tasks_ratio = worker_configuration[:waiting_tasks_ratio] || 1

  klass = worker_configuration[:worker_class].split('::').reduce(Object, :const_get)
  worker_task = klass.new worker_configuration

  MessageDelegator.new worker_thread_count, waiting_tasks_ratio, worker_task, logger_file
end

.load_config_from_file(filename) ⇒ Object



85
86
87
88
# File 'lib/sqspoller/sqs_poller.rb', line 85

def load_config_from_file filename
  content = IO.read filename
  symbolize_from_content content
end

.load_config_from_redis(redis_key) ⇒ Object



90
91
92
93
# File 'lib/sqspoller/sqs_poller.rb', line 90

def load_config_from_redis redis_key
  content = REDIS.get redis_key
  symbolize_from_content content
end

.start_all_queues(queues_config, message_delegator, poller_args) ⇒ Object



157
158
159
160
161
162
# File 'lib/sqspoller/sqs_poller.rb', line 157

def start_all_queues queues_config, message_delegator, poller_args
  @logger.info "Start all queues " 
  queues_config.keys.each do |queue|
    start_queue_controller queues_config, queue, message_delegator, poller_args
  end
end

.start_all_queues_with_refresh(queues_config, message_delegator, poller_args, refresh_interval) ⇒ Object



130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
# File 'lib/sqspoller/sqs_poller.rb', line 130

def start_all_queues_with_refresh queues_config, message_delegator, poller_args, refresh_interval
  @logger.info "Start all queues with refresh"
  queues = {}
  loop do
    queue_names = queues_config.keys
    queues_config.keys.each do |queue|
      @logger.info "    Checking queue #{queue}"
      if queues[queue]
        if queues[queue].all_threads_alive?
          @logger.info "      Queue: #{queue} not created, already initialized and running"
        else
          @logger.info "      Queue: #{queue} previously created, however not all threads are running. Restarting."
          queues[queue] = start_queue_controller queues_config, queue, message_delegator, poller_args
        end
      else
        queues[queue] = start_queue_controller queues_config, queue, message_delegator, poller_args
      end
    end
    @logger.info "  Done creating queues, sleeping for #{refresh_interval} seconds"
    sleep refresh_interval
    @logger.info "  Refreshing config"
    config = load_config_from_redis poller_args[:content_name]
    queues_config = config[poller_args[:queue_config_name]]
    refresh_interval = config[:worker_configuration][:refresh_interval_in_seconds]
  end
end

.start_poller(content_name, queue_config_name, access_key_id, secret_access_key, region, log_filename = nil, redis_or_file = false) ⇒ Object



51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
# File 'lib/sqspoller/sqs_poller.rb', line 51

def start_poller content_name, queue_config_name, access_key_id, secret_access_key, region, log_filename=nil, redis_or_file=false
  puts "StartPoller"

  poller_args = {content_name:      content_name,
                 queue_config_name: queue_config_name.to_sym,
                 access_key_id:     access_key_id,
                 secret_access_key: secret_access_key,
                 region:            region,
                 log_filename:      log_filename,
                 redis_or_file:     redis_or_file
                }

  if redis_or_file
    if REDIS.get('random_key') == nil
      puts "Able to connect to Redis"
    else
      puts "*** Unable to connect to Redis"
      exit -1
    end
  end
  if log_filename.nil? || log_filename.empty?
    poller_args[:log_filename] = STDOUT
    puts "Did not receive log file name"
    fork do
      Process.daemon nil, :noclose
      start_queues_with_config poller_args
    end
  else
    puts "Did receive log file name"
    daemonize log_filename
    start_queues_with_config poller_args
  end
end

.start_queue_controller(queues_config, queue, message_delegator, poller_args) ⇒ Object



171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
# File 'lib/sqspoller/sqs_poller.rb', line 171

def start_queue_controller queues_config, queue, message_delegator, poller_args
  if queues_config[queue][:polling_threads] == 0
    @logger.info "  Polling disabled for queue: #{queue}"
    nil
  else
    @logger.info "  Creating QueueController object for queue: #{queue}"
    qc = QueueController.new queue_name: queue,
                             polling_threads_count: queues_config[queue][:polling_threads],
                             task_delegator: message_delegator,
                             access_key_id: poller_args[:access_key_id],
                             secret_access_key: poller_args[:secret_access_key],
                             region: poller_args[:region],
                             logger_file: poller_args[:log_filename],
                             logger: poller_args[:logger]
    qc.start
    qc
  end
end

.start_queues_with_config(poller_args) ⇒ Object



101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
# File 'lib/sqspoller/sqs_poller.rb', line 101

def start_queues_with_config poller_args
  @logger = Logger.new(poller_args[:log_filename])
  poller_args[:logger] = @logger
  @logger.info "Starting version: #{VERSION}"
  @logger.info "Get config"
  config = if poller_args[:redis_or_file]
             load_config_from_redis poller_args[:content_name]
           else
             load_config_from_file poller_args[:content_name]
           end
  @logger.info "Config: #{config.inspect}"
  queues_config = config[poller_args[:queue_config_name]]
  if queues_config
    @logger.info "QueuesConfig: #{queues_config.inspect}"
  else
    @logger.error "Unable to fetch Queue Config"
  end

  @logger.info "Started poller method"

  message_delegator = worker_pool_init config[:worker_configuration].merge(poller_args), queues_config, poller_args[:log_filename]

  if poller_args[:redis_or_file]
    start_all_queues_with_refresh queues_config, message_delegator, poller_args, config[:worker_configuration][:refresh_interval_in_seconds] || 3600
  else
    start_all_queues  queues_config, message_delegator, poller_args
  end
end

.symbolize(map) ⇒ Object



21
22
23
24
25
26
27
28
29
30
# File 'lib/sqspoller/sqs_poller.rb', line 21

def symbolize(map)
  if map.class == Hash
    map.inject({}) do |memo,(k,v)|
      memo[k.to_sym] = symbolize(v)
      memo
    end
  else
    map
  end
end

.symbolize_from_content(content) ⇒ Object



95
96
97
98
99
# File 'lib/sqspoller/sqs_poller.rb', line 95

def symbolize_from_content content
  erb = ERB.new content
  yaml = YAML.load erb.result
  symbolize yaml
end

.worker_pool_init(worker_config, queues_config, logger_file) ⇒ Object



164
165
166
167
168
169
# File 'lib/sqspoller/sqs_poller.rb', line 164

def worker_pool_init worker_config, queues_config, logger_file
  total_poller_threads = queues_config.keys.reduce(0) do |sum, queue|
                           sum += queues_config[queue][:polling_threads]
                         end
  initialize_worker worker_config, total_poller_threads, logger_file
end