Class: Capacitor::CommandsFetcher

Inherits:
Object
  • Object
show all
Defined in:
lib/capacitor/commands_fetcher.rb

Instance Method Summary collapse

Instance Method Details

#block_on_incoming_signal_listObject



25
26
27
28
# File 'lib/capacitor/commands_fetcher.rb', line 25

# Waits for a signal, then clears the signal list.
#
# Keeps calling +incoming_signal_list+ until it returns a truthy value,
# then returns whatever +flush_incoming_signal_list+ returns.
def block_on_incoming_signal_list
  until incoming_signal_list
    # Nothing arrived on this attempt; try again.
  end
  flush_incoming_signal_list
end

#blocking_timeoutObject



9
10
11
# File 'lib/capacitor/commands_fetcher.rb', line 9

# Timeout value handed to the blocking pop on the signal list.
#
# NOTE(review): presumably seconds, as Redis BLPOP defines its timeout;
# confirm against the redis client version in use.
#
# @return [Integer] always 60
def blocking_timeout
  60
end

#fetchObject



5
6
7
# File 'lib/capacitor/commands_fetcher.rb', line 5

# Convenience entry point: builds a fresh object with +new+ and returns
# the result of its +retrieve_batch+.
#
# NOTE(review): this relies on +new+ being callable on the receiver, so it
# is presumably meant to run at class level; confirm how it is defined.
def fetch
  fetcher = new
  fetcher.retrieve_batch
end

#flush_batchObject



72
73
74
75
# File 'lib/capacitor/commands_fetcher.rb', line 72

# Deletes the "processing_hash" and "retry_hash" keys from redis in a
# single DEL call. Intended to run once a batch has been safely processed.
#
# @return whatever +redis.del+ returns
def flush_batch
  finished_keys = ["processing_hash", "retry_hash"]
  redis.del(*finished_keys)
end

#flush_incoming_signal_listObject



30
31
32
# File 'lib/capacitor/commands_fetcher.rb', line 30

# Deletes the "incoming_signal_list" key from redis, discarding any
# signals still queued on it.
#
# @return whatever +redis.del+ returns
def flush_incoming_signal_list
  redis.del("incoming_signal_list")
end

#flush_retried_batchObject



77
78
79
80
81
82
83
84
# File 'lib/capacitor/commands_fetcher.rb', line 77

# Parks a non-empty "retry_hash" under a timestamped failure key.
#
# When "retry_hash" has at least one field it is renamed to
# "failure:<utc epoch float>", that key name is pushed onto the
# "failed_hash_keys" list, and the move is logged at error level.
# Does nothing (and returns nil) when "retry_hash" is empty.
def flush_retried_batch
  return unless redis.hlen("retry_hash") > 0

  failed_key = "failure:#{Time.new.utc.to_f}"
  redis.rename("retry_hash", failed_key)
  redis.lpush("failed_hash_keys", failed_key)
  logger.error("retry_hash moved to #{failed_key}")
end

#incoming_signal_listObject



21
22
23
# File 'lib/capacitor/commands_fetcher.rb', line 21

# Performs a blocking pop on the "incoming_signal_list" key, passing
# +blocking_timeout+ as the timeout argument.
#
# @return whatever +redis.blpop+ returns (callers treat a falsy result
#   as "no signal yet")
def incoming_signal_list
  redis.blpop("incoming_signal_list", blocking_timeout)
end

#loggerObject



17
18
19
# File 'lib/capacitor/commands_fetcher.rb', line 17

# Shortcut to the logger exposed by the Capacitor namespace.
#
# @return the object returned by +Capacitor.logger+
def logger
  Capacitor.logger
end

#redisObject



13
14
15
# File 'lib/capacitor/commands_fetcher.rb', line 13

# Shortcut to the redis connection exposed by the Capacitor namespace.
#
# @return the object returned by +Capacitor.redis+
def redis
  Capacitor.redis
end

#retrieve_batchObject

When things are working well: :incoming_hash -> :processing_hash -> flush()

If a batch fails once: :processing_hash -> :retry_hash -> flush()

If a batch fails again: :retry_hash -> :failed_hash_keys -> flush(), then start over with :incoming_hash



67
68
69
70
# File 'lib/capacitor/commands_fetcher.rb', line 67

# Returns the next batch to process.
#
# First runs +flush_retried_batch+, then returns +retrieve_existing_batch+
# if it yields a batch. Otherwise it returns the result of
# +retrieve_current_batch+.
def retrieve_batch
  flush_retried_batch

  interrupted_batch = retrieve_existing_batch
  return interrupted_batch if interrupted_batch

  retrieve_current_batch
end

#retrieve_current_batchObject



45
46
47
48
49
50
51
52
53
54
55
56
# File 'lib/capacitor/commands_fetcher.rb', line 45

# Claims the incoming batch and returns its contents.
#
# Renames "incoming_hash" to "processing_hash" and returns every field of
# "processing_hash". If the rename raises a StandardError, a warning is
# logged and an empty hash is returned instead.
#
# Only StandardError is rescued, so process-level exceptions (Interrupt,
# SystemExit, NoMemoryError, ...) propagate instead of being reported as
# an empty batch.
#
# @return [Hash] the batch contents, or {} when the rename failed
def retrieve_current_batch
  begin
    redis.rename "incoming_hash", "processing_hash"
  rescue StandardError
    # This means we got a signal without getting data, which is
    # probably okay due to the harmless race condition, but might
    # warrant investigation later, so let's log it and move on.
    logger.warn "empty incoming_hash in retrieve_batch"
    return {}
  end
  redis.hgetall "processing_hash"
end

#retrieve_existing_batchObject



34
35
36
37
38
39
40
41
42
43
# File 'lib/capacitor/commands_fetcher.rb', line 34

# Picks up a batch left behind in "processing_hash", if any.
#
# Reads every field of "processing_hash". When it is non-empty, the key is
# renamed to "retry_hash", the move is logged at error level, and the
# fields that were read are returned.
#
# @return [Hash, nil] the leftover batch, or nil when "processing_hash"
#   is empty
def retrieve_existing_batch
  leftover = redis.hgetall("processing_hash")
  return nil if leftover.empty?

  redis.rename("processing_hash", "retry_hash")
  logger.error("processing_hash moved to retry_hash")
  leftover
end