Class: Wire::RabbitMQTransport
- Inherits:
-
Object
- Object
- Wire::RabbitMQTransport
- Defined in:
- lib/wire.rb
Instance Method Summary
-
#initialize(host, port) ⇒ RabbitMQTransport
constructor
A new instance of RabbitMQTransport.
- #initialize_service(service_name) ⇒ Object
- #transmit(service_name, version, invocation_signal, timeout_seconds) ⇒ Object
- #transmit_in_only(service_name, version, invocation_signal) ⇒ Object
Constructor Details
#initialize(host, port) ⇒ RabbitMQTransport
Returns a new instance of RabbitMQTransport.
103 104 105 106 107 108 109 110 111 112 113 114 |
# File 'lib/wire.rb', line 103
# Connects to the broker and prepares the per-instance state used by the
# other transport methods.
#
# @param host broker host, interpolated into the amqp:// URL
# @param port broker port, interpolated into the amqp:// URL
def initialize(host, port)
  broker_url = "amqp://#{host}:#{port}"
  @connection = Bunny.new(broker_url)
  @connection.start

  # Guards the body of initialize_service.
  @semaphore = Mutex.new

  # One generator serves every id; the first id it yields identifies this
  # caller and is used as the response routing key / 'x-opt-callerId' header.
  @uuid = UUID.new
  @caller_id = @uuid.generate

  @topic_hash = {}   # looked up by service name in initialize_service
  @message_hash = {} # looked up by correlation id when a response arrives
end
Instance Method Details
#initialize_service(service_name) ⇒ Object
116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 |
# File 'lib/wire.rb', line 116
# Returns the request topic exchange for +service_name+, creating it (and the
# matching response subscription) the first time the service is seen.
#
# On first use this opens one channel for the "<service>.request" topic and a
# second channel on which an exclusive, auto-deleted queue is bound to the
# "<service>.response" topic with this caller's id as routing key. Responses
# arriving on that queue are JSON-parsed and handed to the callback stored in
# @message_hash under the response's correlation id; responses with no
# matching entry are ignored.
#
# The topic is stored in @topic_hash so later calls for the same service reuse
# it instead of opening new channels and registering another subscriber.
#
# NOTE(review): the subscriber block's second parameter and the callback local
# were blank in the listing this was restored from; `properties` and
# `callback` are reconstructed names - confirm against lib/wire.rb.
#
# @param service_name name used to derive the request/response topic names
# @return the request topic exchange for the service
def initialize_service(service_name)
  @semaphore.synchronize do
    cached_topic = @topic_hash[service_name]
    return cached_topic if cached_topic

    request_channel = @connection.create_channel
    topic = request_channel.topic("#{service_name}.request")

    response_queue_name = @uuid.generate
    response_channel = @connection.create_channel
    response_exchange = response_channel.topic("#{service_name}.response")
    response_queue = response_channel.queue(response_queue_name,
                                            { :durable => false, :exclusive => true, :auto_delete => true })

    response_queue.bind(response_exchange, :routing_key => "#{@caller_id}").subscribe do |_delivery_info, properties, payload|
      callback = @message_hash[properties[:correlation_id]]
      # Parse only when someone is waiting for this correlation id.
      callback.set_result(JSON.parse(payload)) if callback
    end

    # Remember the topic; without this every call would set up the channels,
    # queue and subscriber again.
    @topic_hash[service_name] = topic
  end
end
#transmit(service_name, version, invocation_signal, timeout_seconds) ⇒ Object
145 146 147 148 149 150 151 |
# File 'lib/wire.rb', line 145
# Publishes an invocation signal to the service's request topic and registers
# a callback so the matching response can be delivered.
#
# A fresh message id is generated, a MessageCallback wrapping a new Message is
# stored in @message_hash under that id, and the signal is published as JSON
# with routing key "<version>.2", this caller's id in the 'x-opt-callerId'
# header, and the generated id as the AMQP message id.
#
# NOTE(review): the locals `message_id` and `message` were blank in the
# listing this was restored from, and the listing shows no expression after
# the publish call. Returning `message` is reconstructed on the assumption
# that callers read the response from it - confirm against lib/wire.rb.
#
# @param service_name name of the target service
# @param version service version, used as the routing key prefix
# @param invocation_signal object serialised with #to_json as the payload
# @param timeout_seconds passed through to MessageCallback.new
# @return [Message] the message object handed to the registered callback
def transmit(service_name, version, invocation_signal, timeout_seconds)
  # Frozen so Hash#[]= keeps this exact String as the key instead of a copy.
  message_id = "#{@uuid.generate}".freeze
  message = Message.new
  @message_hash[message_id] = MessageCallback.new(@message_hash, message_id, message, timeout_seconds)

  initialize_service(service_name).publish(invocation_signal.to_json,
                                           :routing_key => "#{version}.2",
                                           :headers => { 'x-opt-callerId' => "#{@caller_id}" },
                                           :content_type => 'application/json',
                                           :message_id => "#{message_id}",
                                           :timestamp => Time.new.to_i)
  message
end
#transmit_in_only(service_name, version, invocation_signal) ⇒ Object
141 142 143 |
# File 'lib/wire.rb', line 141
# Publishes an invocation signal to the service's request topic without
# registering anything in @message_hash for a reply.
#
# The payload is the signal's JSON form; it is routed with "<version>.2",
# tagged with this caller's id in the 'x-opt-callerId' header, and given a
# freshly generated message id and the current epoch-second timestamp.
#
# @param service_name name of the target service
# @param version service version, used as the routing key prefix
# @param invocation_signal object serialised with #to_json as the payload
# @return whatever the topic's #publish returns
def transmit_in_only(service_name, version, invocation_signal)
  request_topic = initialize_service(service_name)
  payload = invocation_signal.to_json

  request_topic.publish(
    payload,
    routing_key: "#{version}.2",
    headers: { 'x-opt-callerId' => "#{@caller_id}" },
    content_type: 'application/json',
    message_id: "#{@uuid.generate}",
    timestamp: Time.new.to_i
  )
end