Class: Capacitor::CommandsFetcher
- Inherits: Object
- Defined in: lib/capacitor/commands_fetcher.rb
Instance Method Summary
- #block_on_incoming_signal_list ⇒ Object
- #blocking_timeout ⇒ Object
- #fetch ⇒ Object
- #flush_batch ⇒ Object
- #flush_incoming_signal_list ⇒ Object
- #flush_retried_batch ⇒ Object
- #incoming_signal_list ⇒ Object
- #logger ⇒ Object
- #redis ⇒ Object
- #retrieve_batch ⇒ Object
  When things are working well, :incoming_hash -> :processing_hash -> flush().
- #retrieve_current_batch ⇒ Object
- #retrieve_existing_batch ⇒ Object
Instance Method Details
#block_on_incoming_signal_list ⇒ Object
    # File 'lib/capacitor/commands_fetcher.rb', line 25
    def block_on_incoming_signal_list
      false until incoming_signal_list
      flush_incoming_signal_list
    end
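The `false until ...` idiom above may be easier to follow in isolation. The sketch below assumes that the blocking pop returns `nil` on timeout and a `[key, value]` pair once a signal arrives, which is how redis-rb's `blpop` behaves. `block_until_signal` and the lambda standing in for Redis are hypothetical names used only for illustration.

```ruby
# Hypothetical helper mirroring `false until incoming_signal_list`.
# `blpop` is any callable that returns nil on timeout or a truthy value
# (for example a [key, value] pair) once a signal is available.
def block_until_signal(blpop)
  # The condition is evaluated first; the no-op body `false` only runs
  # while the pop keeps timing out, so this polls until a signal arrives.
  false until blpop.call
end

# Stand-in for Redis: two simulated timeouts, then one signal.
responses = [nil, nil, ["incoming_signal_list", "1"]]
polls = 0
fake_blpop = lambda do
  polls += 1
  responses.shift
end

block_until_signal(fake_blpop)
puts "signal received after #{polls} polls"
```

Because `#flush_incoming_signal_list` deletes the whole list afterwards, several queued signals appear to collapse into a single wake-up.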
#blocking_timeout ⇒ Object
    # File 'lib/capacitor/commands_fetcher.rb', line 9
    def blocking_timeout
      60
    end
#fetch ⇒ Object
    # File 'lib/capacitor/commands_fetcher.rb', line 5
    def fetch
      new.retrieve_batch
    end
#flush_batch ⇒ Object
    # File 'lib/capacitor/commands_fetcher.rb', line 72
    def flush_batch
      # Safely processed now, kill the batch in redis
      redis.del "processing_hash", "retry_hash"
    end
#flush_incoming_signal_list ⇒ Object
    # File 'lib/capacitor/commands_fetcher.rb', line 30
    def flush_incoming_signal_list
      redis.del "incoming_signal_list"
    end
#flush_retried_batch ⇒ Object
    # File 'lib/capacitor/commands_fetcher.rb', line 77
    def flush_retried_batch
      if redis.hlen("retry_hash") > 0
        failure = 'failure:' + Time.new.utc.to_f.to_s
        redis.rename "retry_hash", failure
        redis.lpush "failed_hash_keys", failure
        logger.error "retry_hash moved to #{failure}"
      end
    end
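Since each twice-failed batch is parked under its own `failure:<timestamp>` key and that key name is pushed onto `failed_hash_keys`, the parked batches can be listed afterwards. The following is a minimal sketch of such an inspection, not part of Capacitor: `failed_batches` and `StubRedis` are hypothetical names, and the only client calls assumed are `lrange(key, start, stop)` and `hgetall(key)`, both of which redis-rb provides.

```ruby
# Hypothetical helper: collect every parked batch, keyed by its failure key.
# `redis` is any client responding to lrange and hgetall.
def failed_batches(redis)
  redis.lrange("failed_hash_keys", 0, -1).each_with_object({}) do |key, out|
    out[key] = redis.hgetall(key)
  end
end

# Tiny in-memory stand-in so the sketch runs without a Redis server.
class StubRedis
  def initialize(lists, hashes)
    @lists = lists
    @hashes = hashes
  end

  def lrange(key, start, stop)
    @lists.fetch(key, [])[start..stop]
  end

  def hgetall(key)
    @hashes.fetch(key, {})
  end
end

stub = StubRedis.new(
  { "failed_hash_keys" => ["failure:2.0", "failure:1.0"] },
  { "failure:1.0" => { "user:1" => "3" }, "failure:2.0" => { "user:2" => "5" } }
)

failed_batches(stub).each do |key, batch|
  puts "#{key}: #{batch.inspect}"
end
```

With a real client the same helper could be called as `failed_batches(Capacitor.redis)`, assuming that accessor returns a redis-rb connection.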
#incoming_signal_list ⇒ Object
    # File 'lib/capacitor/commands_fetcher.rb', line 21
    def incoming_signal_list
      redis.blpop "incoming_signal_list", blocking_timeout
    end
#logger ⇒ Object
    # File 'lib/capacitor/commands_fetcher.rb', line 17
    def logger
      Capacitor.logger
    end
#redis ⇒ Object
    # File 'lib/capacitor/commands_fetcher.rb', line 13
    def redis
      Capacitor.redis
    end
#retrieve_batch ⇒ Object
When things are working well, :incoming_hash -> :processing_hash -> flush().
If a batch fails once, :processing_hash -> :retry_hash -> flush().
If a batch fails again, :retry_hash -> :failed_hash_keys -> flush(), then start over with :incoming_hash.
    # File 'lib/capacitor/commands_fetcher.rb', line 67
    def retrieve_batch
      flush_retried_batch
      return retrieve_existing_batch || retrieve_current_batch
    end
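The three transitions described for `#retrieve_batch` can be traced with a small pure-Ruby model. This is only a sketch: a plain Hash stands in for Redis, and `retrieve_batch_model` and `flush_batch_model` are hypothetical names that mirror the key movements in the listings on this page rather than the library's actual code.

```ruby
# Model of retrieve_batch. `store` is a plain Hash standing in for Redis and
# `timestamp` replaces Time.new.utc.to_f so the run is deterministic.
def retrieve_batch_model(store, timestamp)
  # Second failure: a leftover retry_hash is parked under failure:<timestamp>
  # and its key name is pushed onto the failed_hash_keys list.
  if store.key?("retry_hash")
    failure = "failure:#{timestamp}"
    store[failure] = store.delete("retry_hash")
    (store["failed_hash_keys"] ||= []).unshift(failure)
  end

  # First failure: a leftover processing_hash becomes retry_hash and is replayed.
  if store.key?("processing_hash")
    store["retry_hash"] = store.delete("processing_hash")
    return store["retry_hash"].dup
  end

  # Normal path: incoming_hash becomes processing_hash (empty batch if absent).
  return {} unless store.key?("incoming_hash")
  store["processing_hash"] = store.delete("incoming_hash")
  store["processing_hash"].dup
end

# Model of flush_batch: the batch was processed, so drop both working keys.
def flush_batch_model(store)
  store.delete("processing_hash")
  store.delete("retry_hash")
end

store  = { "incoming_hash" => { "user:1" => "3" } }
first  = retrieve_batch_model(store, 1.0) # normal path
second = retrieve_batch_model(store, 2.0) # no flush happened: replayed via retry_hash
third  = retrieve_batch_model(store, 3.0) # still no flush: parked as a failure

puts first.inspect
puts second.inspect
puts third.inspect
puts store.keys.inspect
```

In this run no call to `flush_batch_model` is ever made, which simulates a worker that crashes twice on the same batch. Calling it after the first or second retrieval would clear the working keys, and the next retrieval would start again from `incoming_hash`.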
#retrieve_current_batch ⇒ Object
    # File 'lib/capacitor/commands_fetcher.rb', line 45
    def retrieve_current_batch
      begin
        result = redis.rename "incoming_hash", "processing_hash"
      rescue Exception => e
        # This means we got a signal without getting data, which is
        # probably okay due to the harmless race condition, but might
        # warrant investigation later, so let's log it and move on.
        logger.warn "empty incoming_hash in retrieve_batch"
        return {}
      end
      redis.hgetall "processing_hash"
    end
#retrieve_existing_batch ⇒ Object
    # File 'lib/capacitor/commands_fetcher.rb', line 34
    def retrieve_existing_batch
      batch = redis.hgetall "processing_hash"
      if !batch.empty?
        redis.rename "processing_hash", "retry_hash"
        logger.error "processing_hash moved to retry_hash"
        batch
      else
        nil
      end
    end