Class: Urabbit::RPC::Client
Inherits: Object
- Object
- Urabbit::RPC::Client
Defined in: lib/urabbit/rpc/client.rb
Overview
RPC::Client is a low-level RPC client for RabbitMQ. It makes no assumptions about the message format.
Usage:
begin
  client = RPC::Client.new
  result = client.call(routing_key, message)
rescue RPC::Client::Error => exception
  puts exception.message
  puts exception.cause
end
routing_key - a function name
message - a String with function params as JSON
result - a String with the result as JSON or nil in case of error
exception - an Exception with a message describing what went wrong; it can be raised during initialization and method calls, and can also contain a cause raised from Bunny itself
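Because the client leaves the message format to the caller, encoding and decoding happen outside of it. The following is a minimal sketch of that step, assuming JSON on both sides as in the usage above. `FakeClient` and `rpc_call` are hypothetical names introduced here so the example runs without a broker; only the `#call(routing_key, message, timeout)` signature is taken from this page.

```ruby
require "json"

# Hypothetical stand-in for Urabbit::RPC::Client (not part of the library).
# It mimics only the #call signature and answers locally instead of via RabbitMQ.
class FakeClient
  def call(routing_key, message, timeout = 10)
    params = JSON.parse(message)
    JSON.generate("function" => routing_key, "sum" => params["a"] + params["b"])
  end
end

# Hypothetical helper: serialize params to a JSON String, call the client,
# and parse the JSON String reply back into a Hash.
def rpc_call(client, routing_key, params)
  reply = client.call(routing_key, JSON.generate(params))
  JSON.parse(reply)
end

result = rpc_call(FakeClient.new, "add", { "a" => 2, "b" => 3 })
puts result.inspect
```

With a real client, the same helper would presumably be wrapped in the `begin`/`rescue` shown in the usage section.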
Defined Under Namespace
Classes: ServerError
Instance Method Summary
- #call(routing_key, message, timeout = 10) ⇒ Object
- #initialize ⇒ Client (constructor): A new instance of Client.
Constructor Details
#initialize ⇒ Client
Returns a new instance of Client.
# File 'lib/urabbit/rpc/client.rb', line 23

def initialize
  @channel = Urabbit.create_channel
  @exchange = @channel.default_exchange
  @reply_queue = @channel.queue("amq.rabbitmq.reply-to")
  @lock = Mutex.new
  @condition = ConditionVariable.new

  @reply_queue.subscribe do |delivery_info, properties, payload|
    if properties[:correlation_id] == @correlation_id
      # Headers are only present if explicitly set.
      if error = properties.to_hash.dig(:headers, 'error')
        @error = error
      else
        @result = payload
      end

      @lock.synchronize{@condition.signal}
    end
  end
rescue Bunny::Exception
  raise Urabbit::Error.new("Error connecting to queue")
end
Instance Method Details
#call(routing_key, message, timeout = 10) ⇒ Object
# File 'lib/urabbit/rpc/client.rb', line 47

def call(routing_key, message, timeout = 10)
  @correlation_id = SecureRandom.uuid

  @exchange.publish(message,
    routing_key: routing_key,
    correlation_id: @correlation_id,
    reply_to: "amq.rabbitmq.reply-to"
  )

  @lock.synchronize{@condition.wait(@lock, timeout)}

  if @error.nil? && @result.nil?
    raise Urabbit::Error.new("Timed out waiting for reply. "\
      "Make sure the RPC queue name is correct.")
  end

  if @error
    raise ServerError.new(@error['message'])
  else
    @result
  end
rescue Bunny::Exception
  raise Urabbit::Error.new("Error communicating with queue")
end
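The #call listing above blocks on a ConditionVariable until the subscriber signals a reply or the timeout elapses. The sketch below isolates that wait-with-timeout pattern using only the Ruby standard library. It is not the library's implementation: `ReplyWaiter`, `deliver`, and `wait_for` are hypothetical names, and the sketch is a variation that stores replies by correlation id and rechecks for the reply while holding the lock, so a reply delivered before the caller starts waiting is still picked up.

```ruby
require "securerandom"

# Hypothetical helper illustrating the Mutex + ConditionVariable handoff.
class ReplyWaiter
  def initialize
    @lock = Mutex.new
    @condition = ConditionVariable.new
    @replies = {}
  end

  # Called from the consumer thread when a reply arrives.
  def deliver(correlation_id, payload)
    @lock.synchronize do
      @replies[correlation_id] = payload
      @condition.broadcast
    end
  end

  # Blocks until a reply with this correlation id arrives.
  # Returns the payload, or nil if the timeout (in seconds) elapses first.
  def wait_for(correlation_id, timeout)
    deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
    @lock.synchronize do
      until @replies.key?(correlation_id)
        remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
        return nil if remaining <= 0
        # Releases the lock while sleeping and reacquires it on wakeup.
        @condition.wait(@lock, remaining)
      end
      @replies.delete(correlation_id)
    end
  end
end

waiter = ReplyWaiter.new
id = SecureRandom.uuid

# Simulate a reply arriving from another thread shortly after the request.
Thread.new { sleep 0.05; waiter.deliver(id, "pong") }

reply = waiter.wait_for(id, 1)
missing = waiter.wait_for(SecureRandom.uuid, 0.05)
```

Here `reply` holds the delivered payload, while `missing` is nil because nothing was ever delivered for that id. That nil case corresponds to the timeout branch in #call, which raises Urabbit::Error.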