Class: Hermes::RpcClient
- Inherits:
- Object
- Inheritance chain: Object → Hermes::RpcClient
- Extended by:
- Forwardable
- Defined in:
- lib/hermes/rpc_client.rb
Defined Under Namespace
Classes: ResponseEvent, RpcTimeoutError
Constant Summary collapse
- DIRECT_REPLY_TO =
"amq.rabbitmq.reply-to".freeze
Instance Attribute Summary collapse
-
#publisher ⇒ Object
readonly
Returns the value of attribute publisher.
Class Method Summary collapse
- .call(event) ⇒ Object
Instance Method Summary collapse
- #call(event) ⇒ Object
-
#initialize(publisher: Hermes::DependenciesContainer["publisher"], config: Hermes::DependenciesContainer["config"], distributed_trace_repository: Hermes::DependenciesContainer["distributed_trace_repository"], rpc_call_timeout: nil) ⇒ RpcClient
constructor
A new instance of RpcClient.
Constructor Details
#initialize(publisher: Hermes::DependenciesContainer["publisher"], config: Hermes::DependenciesContainer["config"], distributed_trace_repository: Hermes::DependenciesContainer["distributed_trace_repository"], rpc_call_timeout: nil) ⇒ RpcClient
Returns a new instance of RpcClient.
18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 |
# File 'lib/hermes/rpc_client.rb', line 18
#
# Builds an RPC client: opens a broker connection, creates the Mutex /
# ConditionVariable pair used to wait for a reply, and registers a consumer
# on the direct reply-to pseudo-queue.
#
# `instrumenter`, `broker`, `channel`, `consumer` and `handle_delivery` are
# defined outside this listing.
#
# @param publisher [Object] kept in @publisher and exposed via #publisher
# @param config [Object] must respond to #rpc_call_timeout; it is read when
#   no explicit timeout is supplied
# @param distributed_trace_repository [Object] kept for later use by the client
# @param rpc_call_timeout [Object, nil] explicit timeout; when nil (or false)
#   config.rpc_call_timeout is used instead
# @return [RpcClient] a new instance of RpcClient
def initialize(
  publisher: Hermes::DependenciesContainer["publisher"],
  config: Hermes::DependenciesContainer["config"],
  distributed_trace_repository: Hermes::DependenciesContainer["distributed_trace_repository"],
  rpc_call_timeout: nil
)
  # Keep the injected publisher so the #publisher reader actually returns it.
  @publisher = publisher
  @config = config
  @distributed_trace_repository = distributed_trace_repository
  @broker = Hutch::Broker.new
  instrumenter.instrument("Hermes.RpcClient.broker_connect") do
    @connection = broker.open_connection
  end
  @lock = Mutex.new
  @condition = ConditionVariable.new
  @rpc_call_timeout = rpc_call_timeout || config.rpc_call_timeout
  # A fresh UUID is passed as the third argument so each client gets its own
  # consumer identity on DIRECT_REPLY_TO.
  @consumer = Bunny::Consumer.new(channel, DIRECT_REPLY_TO, SecureRandom.uuid)
  consumer.on_delivery do |_delivery_info, properties, received_payload|
    # Only the message headers (as a plain Hash) and the raw payload are
    # forwarded; the delivery info is ignored.
    handle_delivery(properties[:headers].to_h, received_payload)
  end
end
Instance Attribute Details
#publisher ⇒ Object (readonly)
Returns the value of attribute publisher.
7 8 9 |
# File 'lib/hermes/rpc_client.rb', line 7
#
# Reader for the publisher attribute.
#
# @return [Object, nil] the current value of @publisher (nil when unset)
def publisher
  @publisher
end
Class Method Details
.call(event) ⇒ Object
9 10 11 |
# File 'lib/hermes/rpc_client.rb', line 9
#
# Convenience entry point: builds a client with the default constructor
# arguments and forwards +event+ to its instance-level #call.
#
# @param event [Object] passed through unchanged to the instance #call
# @return [Object] whatever the instance #call returns
def self.call(event)
  new.call(event)
end
Instance Method Details
#call(event) ⇒ Object
37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 |
# File 'lib/hermes/rpc_client.rb', line 37
#
# Publishes +event+ to the topic exchange with DIRECT_REPLY_TO as the reply
# address, waits on the condition variable for at most rpc_call_timeout,
# closes the connection and returns the result of handle_response.
#
# `instrumenter`, `channel`, `topic_exchange`, `close_connection` and
# `handle_response` are defined outside this listing.
#
# @param event [Object] must respond to #routing_key, #origin_headers,
#   #origin_headers=, #to_headers and #as_json
# @return [Object] the value returned by handle_response
# @raise [StandardError] any error raised while publishing/waiting is re-raised
#   after the connection has been closed
# NOTE(review): presumably handle_response raises RpcTimeoutError when no
#   reply arrived in time — confirm against its definition.
def call(event)
  begin
    instrumenter.instrument("Hermes.RpcClient.call") do
      # Subscribe the reply consumer before publishing.
      channel.basic_consume_with(consumer)
      # Only fill in origin headers when the event does not carry any yet.
      event.origin_headers ||= Hermes.origin_headers

      lock.synchronize do
        # NOTE(review): Bunny documents the delivery-mode option as
        # :persistent; confirm that :persistence is the intended key.
        options = {
          routing_key: event.routing_key,
          reply_to: DIRECT_REPLY_TO,
          persistence: false,
          headers: event.to_headers
        }
        topic_exchange.publish(event.as_json.to_json, options)
        distributed_trace_repository.create(event)
        # Releases the lock while waiting; returns after rpc_call_timeout
        # at the latest.
        condition.wait(lock, rpc_call_timeout)
      end
    end
  rescue StandardError => error
    close_connection
    raise error
  ensure
    # After the rescue branch has closed the connection this is skipped.
    consumer.cancel if connection.open?
  end
  close_connection

  handle_response
end