Class: Munificent::KeyAssignment::RequestProcessor

Inherits:
Object
  • Object
show all
Defined in:
app/services/munificent/key_assignment/request_processor.rb

Overview

rubocop:disable Metrics/ClassLength

Defined Under Namespace

Classes: PingMismatchError, PingTimeoutError

Constant Summary collapse

# Every Redis key owned by the key-assignment processor. `.clear_all_queues`
# deletes each key in this list.
QUEUES =
[
  # `.send_command` pushes "<command> <client_id> <args>" strings onto this list.
  COMMAND_QUEUE = "key_assignment:command_queue".freeze,
  # `.queue_fulfillment` pushes DonatorBundleTier ids onto this list.
  FULFILLMENT_QUEUE = "key_assignment:fulfillment_queue".freeze,
  # NOTE(review): not referenced by any method shown on this page — confirm usage.
  RESPONSE_QUEUE = "key_assignment:response_queue".freeze,
].freeze
# Command names sent through `.send_command`. Only PING, RECHECK_DATABASE and
# STATUS_REPORT have senders visible on this page; the others are presumably
# handled by the processor loop — confirm against the full source.
PAUSE_COMMAND =
"pause".freeze
# Sent by `.ping_processor!` together with a nonce.
PING_COMMAND =
"ping".freeze
# Sent by `.recheck_database`.
RECHECK_DATABASE_COMMAND =
"recheck_database".freeze
# Sent by `.status_report`; the reply is parsed as JSON.
STATUS_REPORT_COMMAND =
"status_report".freeze
STOP_COMMAND =
"stop".freeze
UNPAUSE_COMMAND =
"unpause".freeze

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize ⇒ RequestProcessor

Returns a new instance of RequestProcessor.



105
106
107
# File 'app/services/munificent/key_assignment/request_processor.rb', line 105

# Creates the processor and the KeyAssignment::KeyAssigner it keeps in
# @key_assigner for later use.
def initialize
  @key_assigner = KeyAssignment::KeyAssigner.new
end

Class Method Details

.clear_all_queues ⇒ Object



85
86
87
88
89
# File 'app/services/munificent/key_assignment/request_processor.rb', line 85

# Deletes every Redis key listed in QUEUES, one DEL call per key.
#
# @return [Array<String>] QUEUES itself (the receiver of `each`)
def clear_all_queues
  QUEUES.each { |key| redis.del(key) }
end

.finished_backlog? ⇒ Boolean

Returns:

  • (Boolean)


43
44
45
46
47
# File 'app/services/munificent/key_assignment/request_processor.rb', line 43

# Reports whether the processor has explicitly said its backlog is done.
#
# @return [Boolean] true only when the status report's :processing_backlog
#   entry is exactly `false`
def finished_backlog?
  backlog_flag = status_report[:processing_backlog]

  # Deliberately a strict comparison rather than a negation: a missing (nil)
  # flag must not count as "finished", because we want to see the backlog
  # start processing and then finish.
  backlog_flag == false
end

.ping_processor! ⇒ Object



33
34
35
36
37
38
39
40
41
# File 'app/services/munificent/key_assignment/request_processor.rb', line 33

# Sends a ping carrying a random nonce and checks that the reply echoes it.
#
# @return [true] when the reply equals the nonce
# @raise [PingMismatchError] when the reply differs from the nonce
# @raise [PingTimeoutError] when send_command raises Redis::TimeoutError
def ping_processor!
  Rails.logger.debug("Pinging key assignment processor")

  expected = SecureRandom.hex
  reply = send_command(PING_COMMAND, expected, await_response: true)
  return true if reply == expected

  raise PingMismatchError, "Response `#{reply}` != `#{expected}`"
rescue Redis::TimeoutError => timeout_error
  # Re-raise under this class's own error type; the original exception is the message.
  raise PingTimeoutError, timeout_error
end

.queue_fulfillment(donator_bundle_tier) ⇒ Object



20
21
22
23
24
25
26
27
# File 'app/services/munificent/key_assignment/request_processor.rb', line 20

# Queues a donator bundle tier for fulfillment by pushing its id onto
# FULFILLMENT_QUEUE.
#
# @param donator_bundle_tier [DonatorBundleTier] the tier to fulfil
# @return [Object] whatever `redis.lpush` returns
# @raise [ArgumentError] when the argument is not a DonatorBundleTier
def queue_fulfillment(donator_bundle_tier)
  unless donator_bundle_tier.is_a?(DonatorBundleTier)
    raise ArgumentError, "Expected DonatorBundleTier, got #{donator_bundle_tier.class}"
  end

  redis.lpush(FULFILLMENT_QUEUE, donator_bundle_tier.id)
end

.recheck_database ⇒ Object



29
30
31
# File 'app/services/munificent/key_assignment/request_processor.rb', line 29

# Queues RECHECK_DATABASE_COMMAND via send_command without waiting for a
# reply (await_response keeps its default of false).
def recheck_database
  send_command(RECHECK_DATABASE_COMMAND)
end

.send_command(command, *args, await_response: false, json: false, **kwargs) ⇒ Object

rubocop:disable Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity



53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
# File 'app/services/munificent/key_assignment/request_processor.rb', line 53

# Pushes a command onto COMMAND_QUEUE and, optionally, waits for the matching
# reply on the pub/sub channel named by `client_id`.
#
# The queued message is "<command> <client_id> <args...>", joined by single
# spaces. Replies are expected in the form "<command> <body>"; messages that
# do not have that form, or that answer a different command, are ignored.
#
# NOTE(review): the command is queued before the subscription is opened, so a
# reply published in between would presumably be missed — confirm whether the
# processor can answer that quickly.
#
# @param command [String] command name; also used to recognise the reply
# @param args [Array] positional arguments appended to the message
#   (replaced by the JSON payload when `json` is true)
# @param await_response [Boolean, Numeric] falsy: return right after queueing;
#   `true`: wait using a timeout of 10; any other truthy value is passed to
#   `subscribe_with_timeout` as the timeout (presumably seconds — confirm)
# @param json [Boolean] when true, `kwargs` are sent as one JSON document and
#   the reply body is parsed as JSON with symbolized keys
# @param kwargs [Hash] payload, only used when `json` is true
# @return [nil] when no response is awaited
# @return [Object] the reply body: parsed JSON, an Array of words, or a String;
#   a one-element Array is unwrapped to its only element
def send_command(command, *args, await_response: false, json: false, **kwargs) # rubocop:disable Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity
  # Compared against `true` on purpose: a numeric timeout must pass through untouched.
  await_response = 10 if await_response == true

  args = [JSON.dump(kwargs)] if json

  redis.lpush(COMMAND_QUEUE, [command, client_id, *args].join(" "))

  return unless await_response

  redis.subscribe_with_timeout(await_response, client_id) do |on|
    on.message do |channel, response|
      # Skip anything that is not "<word> <body>" instead of crashing on a nil match.
      match = response.match(/\A(\w+) (.*)\z/)
      next unless match

      response_command, response_body = match.captures

      # Check the command first so unrelated replies are never parsed.
      next unless response_command == command

      response_args = if json
        JSON.parse(response_body, symbolize_names: true)
      elsif response_body.include?(" ")
        response_body.split
      else
        response_body
      end

      redis.unsubscribe(channel)

      # Only Arrays are unwrapped: a single-key Hash must stay a Hash, and
      # JSON scalars/null do not respond to `length`.
      response_args = response_args.first if response_args.is_a?(Array) && response_args.length == 1
      return response_args
    end
  end
end

.status_report ⇒ Object



49
50
51
# File 'app/services/munificent/key_assignment/request_processor.rb', line 49

# Sends STATUS_REPORT_COMMAND and waits for the reply, which send_command
# parses as JSON (json: true). Returns whatever send_command returns;
# `.finished_backlog?` reads the :processing_backlog entry from it.
def status_report
  send_command(STATUS_REPORT_COMMAND, await_response: true, json: true)
end

Instance Method Details

#start ⇒ Object



109
110
111
112
# File 'app/services/munificent/key_assignment/request_processor.rb', line 109

# Entry point for a processor instance: first calls
# process_backlog_from_database, then begin_reading_from_redis_queue.
# NOTE(review): both helpers are defined outside this view; presumably the
# second one blocks while consuming the Redis queues — confirm.
def start
  process_backlog_from_database
  begin_reading_from_redis_queue
end