Class: CarrotRpc::RpcClient

Inherits:
Object
Extended by:
ClientServer
Includes:
ClientActions
Defined in:
lib/carrot_rpc/rpc_client.rb

Overview

Generic class for all RPC Consumers. Use it as a base class to build other RPC Consumers for related functionality. Let’s define a naming convention here for subclasses because I don’t want to write a Confluence doc. All subclasses should follow the naming convention <Name>RpcConsumer, for example PostRpcConsumer.

Instance Attribute Summary

Class Method Summary

Instance Method Summary

Methods included from ClientServer

queue_name, queue_options, test_queue_name

Methods included from ClientActions

#create, #index, #show, #update

Constructor Details

#initialize(config = nil) ⇒ RpcClient

Use defaults for application level connection to RabbitMQ.

Examples:

Pass a custom Configuration instance as an argument to override the defaults.

config = CarrotRpc::Configuration.new
config.rpc_client_timeout = 10
CarrotRpc::RpcClient.new(config)


# File 'lib/carrot_rpc/rpc_client.rb', line 28

def initialize(config = nil)
  @config = config || CarrotRpc.configuration
  @logger = @config.logger
end

Instance Attribute Details

#channel ⇒ Object (readonly)

Returns the value of attribute channel.



# File 'lib/carrot_rpc/rpc_client.rb', line 7

def channel
  @channel
end

#logger ⇒ Object (readonly)

Returns the value of attribute logger.



# File 'lib/carrot_rpc/rpc_client.rb', line 7

def logger
  @logger
end

#server_queue ⇒ Object (readonly)

Returns the value of attribute server_queue.



# File 'lib/carrot_rpc/rpc_client.rb', line 7

def server_queue
  @server_queue
end

Class Method Details

.before_request(*proc) ⇒ Object



# File 'lib/carrot_rpc/rpc_client.rb', line 12

def self.before_request(*proc)
  if proc.length == 0
    @before_request
  elsif proc.length == 1
    @before_request = proc.first || CarrotRpc.configuration.before_request
  else
    fail ArgumentError
  end
end
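
The combined reader/writer pattern above can be tried in isolation. The following is a minimal sketch, not CarrotRpc code: ExampleClient and DEFAULT_HOOK are hypothetical names, with DEFAULT_HOOK standing in for CarrotRpc.configuration.before_request.

```ruby
# Hypothetical stand-in for the configured default hook; not part of CarrotRpc.
DEFAULT_HOOK = ->(params) { params }

class ExampleClient
  # No argument: return the current hook.
  # One argument: store it, falling back to the default when nil is passed.
  # More than one argument: reject the call.
  def self.before_request(*proc)
    if proc.length == 0
      @before_request
    elsif proc.length == 1
      @before_request = proc.first || DEFAULT_HOOK
    else
      fail ArgumentError
    end
  end
end

# Register a hook that adds a key to every outgoing params hash.
ExampleClient.before_request(->(params) { params.merge(locale: "en") })

hook   = ExampleClient.before_request
result = hook.call({ id: 1 })
```

Because the hook is stored in a class-level instance variable, a subclass of ExampleClient would not see a hook set on its parent; whether that matters for real RpcClient subclasses depends on how they are configured.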

Instance Method Details

#message(correlation_id:, method:, params:) ⇒ Object



# File 'lib/carrot_rpc/rpc_client.rb', line 139

def message(correlation_id:, method:, params:)
  {
    id:      correlation_id,
    jsonrpc: "2.0",
    method:  method,
    params:  params.except(:controller, :action)
  }
end
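
To see the shape of the resulting JSON-RPC 2.0 request, here is a standalone sketch. build_message is a hypothetical free function mirroring #message; it uses Hash#reject in place of except so that it runs without ActiveSupport on older Rubies.

```ruby
require "json"

# Build a JSON-RPC 2.0 request hash. The routing keys :controller and :action
# are dropped from params, mirroring the method above.
def build_message(correlation_id:, method:, params:)
  {
    id:      correlation_id,
    jsonrpc: "2.0",
    method:  method,
    params:  params.reject { |key, _| %i[controller action].include?(key) }
  }
end

payload = build_message(
  correlation_id: "abc-123",
  method:         :show,
  params:         { id: 1, controller: "posts", action: "show" }
).to_json

puts payload
# prints {"id":"abc-123","jsonrpc":"2.0","method":"show","params":{"id":1}}
```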

#publish(correlation_id:, method:, params:) ⇒ Object

A @reply_queue is deleted when the channel is closed. Closing the channel accounts for cleanup of the client @reply_queue.



# File 'lib/carrot_rpc/rpc_client.rb', line 130

def publish(correlation_id:, method:, params:)
  message = message(
    correlation_id: correlation_id,
    params:         params,
    method:         method
  )
  publish_payload(message.to_json, correlation_id: correlation_id)
end

#remote_call(remote_method, params) ⇒ Object

params holds the method argument values (documented below as a Hash). The programmer implementing this class must know about the remote service, and the remote service must have documented its methods and arguments for this pattern to work. TODO: change to a hash to account for keyword arguments???

Parameters:

  • remote_method (String, Symbol)

    the method to be called on current receiver

  • params (Hash)

    the arguments for the method being called.

Returns:

  • (Object)

    the result of the method call.



# File 'lib/carrot_rpc/rpc_client.rb', line 76

def remote_call(remote_method, params)
  start
  subscribe
  server_queue_name = server_queue.name
  correlation_id = SecureRandom.uuid
  extra = { correlation_id: correlation_id }

  ActiveSupport::Notifications.instrument("client.#{server_queue_name}.remote_call", extra: extra) {
    logger.tagged("client", "server_queue=#{server_queue_name}", "correlation_id=#{correlation_id}") {
      params = self.class.before_request.call(params) if self.class.before_request
      publish(correlation_id: correlation_id, method: remote_method, params: request_key_formatter(params))
      wait_for_result(correlation_id)
    }
  }
end

#request_key_formatter(params) ⇒ Hash

Formats keys in the request data.

Parameters:

  • params (Hash)

    request data to be sent to the remote server.

Returns:

  • (Hash)

    formatted data structure.



# File 'lib/carrot_rpc/rpc_client.rb', line 124

def request_key_formatter(params)
  CarrotRpc::Format.keys @config.rpc_client_request_key_format, params
end

#response_key_formatter(payload) ⇒ Hash

Formats keys in the response data.

Parameters:

  • payload (Hash)

    response data received from the remote server.

Returns:

  • (Hash)

    formatted data structure.



# File 'lib/carrot_rpc/rpc_client.rb', line 117

def response_key_formatter(payload)
  CarrotRpc::Format.keys @config.rpc_client_response_key_format, payload
end
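
Both formatters delegate to CarrotRpc::Format.keys, whose behavior is not shown on this page. The sketch below only illustrates the general idea of rewriting hash keys recursively. format_keys and format_key are hypothetical helpers, and the format names :dasherize and :underscore are assumptions chosen for illustration, not confirmed configuration values.

```ruby
# Hypothetical helper: recursively rewrite the keys of nested hashes and arrays.
def format_keys(format, data)
  case data
  when Hash
    data.each_with_object({}) do |(key, value), out|
      out[format_key(format, key)] = format_keys(format, value)
    end
  when Array
    data.map { |item| format_keys(format, item) }
  else
    data
  end
end

# Hypothetical helper: rewrite a single key. Unknown formats leave it untouched.
def format_key(format, key)
  case format
  when :dasherize  then key.to_s.tr("_", "-")
  when :underscore then key.to_s.tr("-", "_")
  else key
  end
end

# Outgoing data: snake_case keys become dashed keys.
request = format_keys(:dasherize, { "first_name" => "Ada", "tags" => [{ "tag_id" => 1 }] })

# Incoming data: dashed keys become snake_case keys.
response = format_keys(:underscore, { "first-name" => "Ada" })
```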

#start ⇒ Object

Starts the connection to listen for messages.

All RpcClient requests go to a single @server_queue. Responses come back over a uniquely named queue.



# File 'lib/carrot_rpc/rpc_client.rb', line 37

def start
  # Create a new channel on each request because the channel should be closed after each request.
  @channel = @config.bunny.create_channel

  queue_name = self.class.test_queue_name(self.class.queue_name, @config.client_test_mode)
  # auto_delete => false keeps the queue around until RabbitMQ restarts or explicitly deleted
  options = { auto_delete: false }.merge(self.class.queue_options)
  @server_queue = @channel.queue(queue_name, options)

  # Setup a direct exchange.
  @exchange = @channel.default_exchange
end

#subscribe ⇒ Object



# File 'lib/carrot_rpc/rpc_client.rb', line 50

def subscribe
  # Empty queue name ends up creating a randomly named queue by RabbitMQ
  # Exclusive => queue will be deleted when connection closes. Allows for automatic "cleanup".
  @reply_queue = @channel.queue("", exclusive: true, auto_delete: true, durable: false)

  # setup a hash for results with a Queue object as a value
  @results = Hash.new { |h, k| h[k] = Queue.new }

  # setup subscribe block to Service
  # block => false is a non blocking IO option.
  @reply_queue.subscribe(block: false) do |delivery_info, properties, payload|
    consume(delivery_info, properties, payload)
  end
end
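
The @results hash is what hands a response from the subscriber thread back to the caller. The sketch below shows that handoff with plain Ruby threads and no RabbitMQ; the consumer thread is a stand-in for the subscribe block, and the correlation id and response value are made up.

```ruby
# Hash whose default block lazily creates one Queue per correlation id.
results = Hash.new { |hash, key| hash[key] = Queue.new }

correlation_id = "req-1"

# Stand-in for the subscriber callback: it runs on another thread and
# pushes the response onto the queue for the matching correlation id.
consumer = Thread.new do
  sleep 0.05
  results[correlation_id].push({ "result" => "ok" })
end

# Queue#pop blocks the calling thread until the consumer pushes a value.
response = results[correlation_id].pop
consumer.join

# Remove the entry once the response has been read.
results.delete(correlation_id)
```

Since each correlation id gets its own Queue, a caller only ever wakes up for its own response.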

#wait_for_result(correlation_id) ⇒ Object



# File 'lib/carrot_rpc/rpc_client.rb', line 94

def wait_for_result(correlation_id)
  # Should be good to timeout here because we're blocking in the main thread here.
  Timeout.timeout(@config.rpc_client_timeout, CarrotRpc::Exception::RpcClientTimeout) do
    # `pop` is `Queue#pop`, so it blocks the calling thread until a result arrives.
    # This must happen before the `Hash#delete` in the `ensure` clause, or
    # the receiving thread won't be able to find the correlation_id in @results.
    result = @results[correlation_id].pop

    # If we get an exception, raise it in this thread, so the application can deal with it.
    if result.is_a? Exception
      fail result
    end

    result
  end
ensure
  @results.delete correlation_id # remove item from hash. prevents memory leak.
  @channel.close
end
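
The timeout-and-cleanup behavior can be exercised without a broker. This is a minimal sketch under stated assumptions: ExampleTimeout is a hypothetical class standing in for CarrotRpc::Exception::RpcClientTimeout, wait_for is a free-function version of the method above, and channel handling is omitted.

```ruby
require "timeout"

# Hypothetical error class standing in for the library's timeout exception.
class ExampleTimeout < StandardError; end

# Block until a result arrives for correlation_id or the timeout elapses.
# The ensure clause removes the entry in both cases.
def wait_for(results, correlation_id, seconds)
  Timeout.timeout(seconds, ExampleTimeout) do
    result = results[correlation_id].pop
    # Re-raise exceptions delivered as results so the caller can handle them.
    fail result if result.is_a?(Exception)
    result
  end
ensure
  results.delete(correlation_id)
end

results = Hash.new { |hash, key| hash[key] = Queue.new }

# A response that is already waiting is returned immediately.
results["a"].push("pong")
first = wait_for(results, "a", 1)

# No response ever arrives for "b", so the call times out.
outcome = begin
  wait_for(results, "b", 0.1)
rescue ExampleTimeout
  :timed_out
end
```

After both calls the results hash is empty again, which is the leak prevention the ensure clause is there for.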