Class: CarrotRpc::RpcClient
- Inherits:
-
Object
- Object
- CarrotRpc::RpcClient
- Extended by:
- ClientServer
- Includes:
- ClientActions
- Defined in:
- lib/carrot_rpc/rpc_client.rb
Overview
Generic class for all RPC Consumers. Use as a base class to build other RPC Consumers for related functionality. Let’s define a naming convention here for subclasses becuase I don’t want to write a Confluence doc. All subclasses should have the following naming convention: <Name>RpcConsumer ex: PostRpcConsumer
Instance Attribute Summary collapse
-
#channel ⇒ Object
readonly
Returns the value of attribute channel.
-
#logger ⇒ Object
readonly
Returns the value of attribute logger.
-
#server_queue ⇒ Object
readonly
Returns the value of attribute server_queue.
Class Method Summary collapse
Instance Method Summary collapse
-
#initialize(config = nil) ⇒ RpcClient
constructor
Use defaults for application level connection to RabbitMQ.
- #message(correlation_id:, method:, params:) ⇒ Object
-
#publish(correlation_id:, method:, params:) ⇒ Object
A @reply_queue is deleted when the channel is closed.
-
#remote_call(remote_method, params) ⇒ Object
params is an array of method argument values programmer implementing this class must know about the remote service the remote service must have documented the methods and arguments in order for this pattern to work.
-
#request_key_formatter(params) ⇒ Hash
Formats keys in the request data.
-
#response_key_formatter(payload) ⇒ Hash
Formats keys in the response data.
-
#start ⇒ Object
Starts the connection to listen for messages.
- #subscribe ⇒ Object
-
#wait_for_result(correlation_id) ⇒ Object
rubocop:enable Metrics/AbcSize rubocop:enable Metrics/MethodLength.
Methods included from ClientServer
queue_name, queue_options, test_queue_name
Methods included from ClientActions
#create, #index, #show, #update
Constructor Details
#initialize(config = nil) ⇒ RpcClient
Use defaults for application level connection to RabbitMQ.
28 29 30 31 |
# File 'lib/carrot_rpc/rpc_client.rb', line 28 def initialize(config = nil) @config = config || CarrotRpc.configuration @logger = @config.logger end |
Instance Attribute Details
#channel ⇒ Object (readonly)
Returns the value of attribute channel.
7 8 9 |
# File 'lib/carrot_rpc/rpc_client.rb', line 7 def channel @channel end |
#logger ⇒ Object (readonly)
Returns the value of attribute logger.
7 8 9 |
# File 'lib/carrot_rpc/rpc_client.rb', line 7 def logger @logger end |
#server_queue ⇒ Object (readonly)
Returns the value of attribute server_queue.
7 8 9 |
# File 'lib/carrot_rpc/rpc_client.rb', line 7 def server_queue @server_queue end |
Class Method Details
.before_request(*proc) ⇒ Object
12 13 14 15 16 17 18 19 20 |
# File 'lib/carrot_rpc/rpc_client.rb', line 12 def self.before_request(*proc) if proc.length == 0 @before_request elsif proc.length == 1 @before_request = proc.first || CarrotRpc.configuration.before_request else fail ArgumentError end end |
Instance Method Details
#message(correlation_id:, method:, params:) ⇒ Object
139 140 141 142 143 144 145 146 |
# File 'lib/carrot_rpc/rpc_client.rb', line 139 def (correlation_id:, method:, params:) { id: correlation_id, jsonrpc: "2.0", method: method, params: params.except(:controller, :action) } end |
#publish(correlation_id:, method:, params:) ⇒ Object
A @reply_queue is deleted when the channel is closed. Closing the channel accounts for cleanup of the client @reply_queue.
130 131 132 133 134 135 136 137 |
# File 'lib/carrot_rpc/rpc_client.rb', line 130 def publish(correlation_id:, method:, params:) = ( correlation_id: correlation_id, params: params, method: method ) publish_payload(.to_json, correlation_id: correlation_id) end |
#remote_call(remote_method, params) ⇒ Object
params is an array of method argument values programmer implementing this class must know about the remote service the remote service must have documented the methods and arguments in order for this pattern to work. TODO: change to a hash to account for keyword arguments???
rubocop:disable Metrics/AbcSize rubocop:disable Metrics/MethodLength
76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 |
# File 'lib/carrot_rpc/rpc_client.rb', line 76 def remote_call(remote_method, params) start subscribe server_queue_name = server_queue.name correlation_id = SecureRandom.uuid extra = { correlation_id: correlation_id } ActiveSupport::Notifications.instrument("client.#{server_queue_name}.remote_call", extra: extra) { logger.tagged("client", "server_queue=#{server_queue_name}", "correlation_id=#{correlation_id}") { params = self.class.before_request.call(params) if self.class.before_request publish(correlation_id: correlation_id, method: remote_method, params: request_key_formatter(params)) wait_for_result(correlation_id) } } end |
#request_key_formatter(params) ⇒ Hash
Formats keys in the request data.
124 125 126 |
# File 'lib/carrot_rpc/rpc_client.rb', line 124 def request_key_formatter(params) CarrotRpc::Format.keys @config.rpc_client_request_key_format, params end |
#response_key_formatter(payload) ⇒ Hash
Formats keys in the response data.
117 118 119 |
# File 'lib/carrot_rpc/rpc_client.rb', line 117 def response_key_formatter(payload) CarrotRpc::Format.keys @config.rpc_client_response_key_format, payload end |
#start ⇒ Object
Starts the connection to listen for messages.
All RpcClient requests go to the a single @server_queue Responses come back over a unique queue name.
37 38 39 40 41 42 43 44 45 46 47 48 |
# File 'lib/carrot_rpc/rpc_client.rb', line 37 def start # Create a new channel on each request because the channel should be closed after each request. @channel = @config.bunny.create_channel queue_name = self.class.test_queue_name(self.class.queue_name, @config.client_test_mode) # auto_delete => false keeps the queue around until RabbitMQ restarts or explicitly deleted = { auto_delete: false }.merge(self.class.) @server_queue = @channel.queue(queue_name, ) # Setup a direct exchange. @exchange = @channel.default_exchange end |
#subscribe ⇒ Object
50 51 52 53 54 55 56 57 58 59 60 61 62 63 |
# File 'lib/carrot_rpc/rpc_client.rb', line 50 def subscribe # Empty queue name ends up creating a randomly named queue by RabbitMQ # Exclusive => queue will be deleted when connection closes. Allows for automatic "cleanup". @reply_queue = @channel.queue("", exclusive: true, auto_delete: true, durable: false) # setup a hash for results with a Queue object as a value @results = Hash.new { |h, k| h[k] = Queue.new } # setup subscribe block to Service # block => false is a non blocking IO option. @reply_queue.subscribe(block: false) do |delivery_info, properties, payload| consume(delivery_info, properties, payload) end end |
#wait_for_result(correlation_id) ⇒ Object
rubocop:enable Metrics/AbcSize rubocop:enable Metrics/MethodLength
94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 |
# File 'lib/carrot_rpc/rpc_client.rb', line 94 def wait_for_result(correlation_id) # Should be good to timeout here because we're blocking in the main thread here. Timeout.timeout(@config.rpc_client_timeout, CarrotRpc::Exception::RpcClientTimeout) do # `pop` is `Queue#pop`, so it is blocking on the receiving thread # and this must happend before the `Hash.delete` or # the receiving thread won't be able to find the correlation_id in @results result = @results[correlation_id].pop # If we get an exception, raise it in this thread, so the application can deal with it. if result.is_a? Exception fail result end result end ensure @results.delete correlation_id # remove item from hash. prevents memory leak. @channel.close end |