Class: Munificent::KeyAssignment::RequestProcessor
- Inherits:
-
Object
- Object
- Munificent::KeyAssignment::RequestProcessor
show all
- Defined in:
- app/services/munificent/key_assignment/request_processor.rb
Overview
rubocop:disable Metrics/ClassLength
Defined Under Namespace
Classes: PingMismatchError, PingTimeoutError
Constant Summary
collapse
- QUEUES =
[
COMMAND_QUEUE = "key_assignment:command_queue".freeze,
FULFILLMENT_QUEUE = "key_assignment:fulfillment_queue".freeze,
RESPONSE_QUEUE = "key_assignment:response_queue".freeze,
].freeze
- PAUSE_COMMAND =
"pause".freeze
- PING_COMMAND =
"ping".freeze
- RECHECK_DATABASE_COMMAND =
"recheck_database".freeze
- STATUS_REPORT_COMMAND =
"status_report".freeze
- STOP_COMMAND =
"stop".freeze
- UNPAUSE_COMMAND =
"unpause".freeze
Class Method Summary
collapse
Instance Method Summary
collapse
Constructor Details
Returns a new instance of RequestProcessor.
105
106
107
|
# File 'app/services/munificent/key_assignment/request_processor.rb', line 105
def initialize
@key_assigner = KeyAssignment::KeyAssigner.new
end
|
Class Method Details
.clear_all_queues ⇒ Object
85
86
87
88
89
|
# File 'app/services/munificent/key_assignment/request_processor.rb', line 85
def clear_all_queues
QUEUES.each do |queue|
redis.del(queue)
end
end
|
.finished_backlog? ⇒ Boolean
43
44
45
46
47
|
# File 'app/services/munificent/key_assignment/request_processor.rb', line 43
def finished_backlog?
status_report[:processing_backlog] == false
end
|
.ping_processor! ⇒ Object
33
34
35
36
37
38
39
40
41
|
# File 'app/services/munificent/key_assignment/request_processor.rb', line 33
def ping_processor!
Rails.logger.debug("Pinging key assignment processor")
nonce = SecureRandom.hex
response = send_command(PING_COMMAND, nonce, await_response: true)
response == nonce or raise PingMismatchError, "Response `#{response}` != `#{nonce}`"
rescue Redis::TimeoutError => e
raise PingTimeoutError, e
end
|
.queue_fulfillment(donator_bundle_tier) ⇒ Object
20
21
22
23
24
25
26
27
|
# File 'app/services/munificent/key_assignment/request_processor.rb', line 20
def queue_fulfillment(donator_bundle_tier)
case donator_bundle_tier
when DonatorBundleTier
redis.lpush(FULFILLMENT_QUEUE, donator_bundle_tier.id)
else
raise ArgumentError, "Expected DonatorBundleTier, got #{donator_bundle_tier.class}"
end
end
|
.recheck_database ⇒ Object
29
30
31
|
# File 'app/services/munificent/key_assignment/request_processor.rb', line 29
def recheck_database
send_command(RECHECK_DATABASE_COMMAND)
end
|
.send_command(command, *args, await_response: false, json: false, **kwargs) ⇒ Object
rubocop:disable Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
|
# File 'app/services/munificent/key_assignment/request_processor.rb', line 53
def send_command(command, *args, await_response: false, json: false, **kwargs) await_response = 10 if await_response == true
args = [JSON.dump(kwargs)] if json
redis.lpush(COMMAND_QUEUE, args.unshift(client_id).unshift(command).join(" "))
if await_response
redis.subscribe_with_timeout(await_response, client_id) do |on|
on.message do |channel, response|
match = response.match(/\A(\w+) (.*)\z/)
response_command, response_body = match[1, 2]
response_args = if json
JSON.parse(response_body, symbolize_names: true)
elsif response_body.include?(" ")
response_body.split
else
response_body
end
if response_command == command
redis.unsubscribe(channel)
response_args = response_args.first if response_args.length == 1
return response_args
end
end
end
end
end
|
.status_report ⇒ Object
49
50
51
|
# File 'app/services/munificent/key_assignment/request_processor.rb', line 49
def status_report
send_command(STATUS_REPORT_COMMAND, await_response: true, json: true)
end
|
Instance Method Details
#start ⇒ Object
109
110
111
112
|
# File 'app/services/munificent/key_assignment/request_processor.rb', line 109
def start
process_backlog_from_database
begin_reading_from_redis_queue
end
|