Class: Hermes::RpcClient

Inherits:
Object
  • Object
show all
Extended by:
Forwardable
Defined in:
lib/hermes/rpc_client.rb

Defined Under Namespace

Classes: ResponseEvent, RpcTimeoutError

Constant Summary collapse

DIRECT_REPLY_TO =
"amq.rabbitmq.reply-to".freeze

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(publisher: Hermes::DependenciesContainer["publisher"], config: Hermes::DependenciesContainer["config"], distributed_trace_repository: Hermes::DependenciesContainer["distributed_trace_repository"], rpc_call_timeout: nil) ⇒ RpcClient

Returns a new instance of RpcClient.



18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# File 'lib/hermes/rpc_client.rb', line 18

def initialize(publisher: Hermes::DependenciesContainer["publisher"],
config: Hermes::DependenciesContainer["config"],
distributed_trace_repository: Hermes::DependenciesContainer["distributed_trace_repository"],
rpc_call_timeout: nil)
  @config = config
  @distributed_trace_repository = distributed_trace_repository
  @broker = Hutch::Broker.new
  instrumenter.instrument("Hermes.RpcClient.broker_connect") do
    @connection = broker.open_connection
  end
  @lock = Mutex.new
  @condition = ConditionVariable.new
  @rpc_call_timeout = rpc_call_timeout || config.rpc_call_timeout
  @consumer = Bunny::Consumer.new(channel, DIRECT_REPLY_TO, SecureRandom.uuid)
  consumer.on_delivery do |_delivery_info, , received_payload|
    handle_delivery([:headers].to_h, received_payload)
  end
end

Instance Attribute Details

#publisherObject (readonly)

Returns the value of attribute publisher.



7
8
9
# File 'lib/hermes/rpc_client.rb', line 7

def publisher
  @publisher
end

Class Method Details

.call(event) ⇒ Object



9
10
11
# File 'lib/hermes/rpc_client.rb', line 9

def self.call(event)
  new.call(event)
end

Instance Method Details

#call(event) ⇒ Object



37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
# File 'lib/hermes/rpc_client.rb', line 37

def call(event)
  begin
    instrumenter.instrument("Hermes.RpcClient.call") do
      channel.basic_consume_with(consumer)
      event.origin_headers ||= Hermes.origin_headers

      lock.synchronize do
        options = {
          routing_key: event.routing_key,
          reply_to: DIRECT_REPLY_TO,
          persistence: false,
          headers: event.to_headers
        }
        topic_exchange.publish(event.as_json.to_json, options)
        distributed_trace_repository.create(event)
        condition.wait(lock, rpc_call_timeout)
      end
    end
  rescue StandardError => error
    close_connection
    raise error
  ensure
    consumer.cancel if connection.open?
  end

  close_connection
  handle_response
end