Class: Wire::RabbitMQTransport

Inherits:
Object
  • Object
show all
Defined in:
lib/wire.rb

Instance Method Summary collapse

Constructor Details

#initialize(host, port) ⇒ RabbitMQTransport

Returns a new instance of RabbitMQTransport.



103
104
105
106
107
108
109
110
111
112
113
114
# File 'lib/wire.rb', line 103

def initialize (host, port)
  @connection = Bunny.new("amqp://#{host}:#{port}")
  @connection.start

  @semaphore = Mutex.new

  @uuid = UUID.new
  @caller_id = @uuid.generate

  @topic_hash = {}
  @message_hash = {}
end

Instance Method Details

#initialize_service(service_name) ⇒ Object



116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
# File 'lib/wire.rb', line 116

def initialize_service (service_name)
  @semaphore.synchronize do
    topic = @topic_hash[service_name]

    if topic == nil
      request_channel = @connection.create_channel
      topic = request_channel.topic("#{service_name}.request")

      response_queue_name = @uuid.generate
      response_channel = @connection.create_channel
      response_exchange = response_channel.topic("#{service_name}.response")
      response_queue = response_channel.queue("#{response_queue_name}", {:durable => false, :exclusive => true, :auto_delete => true})

      response_queue.bind(response_exchange, :routing_key => "#{@caller_id}").subscribe do |delivery_info, , payload|
        if (message_callback = @message_hash[[:correlation_id]]) != nil
          result_body = JSON.parse(payload)
          message_callback.set_result(result_body)
        end
      end
    end

    return topic
  end
end

#transmit(service_name, version, invocation_signal, timeout_seconds) ⇒ Object



145
146
147
148
149
150
151
# File 'lib/wire.rb', line 145

def transmit(service_name, version, invocation_signal, timeout_seconds)
  message_id = "#{@uuid.generate}".freeze
  @message_hash[message_id] = MessageCallback.new(@message_hash, message_id, message = Message.new, timeout_seconds)
  initialize_service(service_name).publish(invocation_signal.to_json, :routing_key => "#{version}.2", :headers => {'x-opt-callerId' => "#{@caller_id}"}, :content_type => 'application/json', :message_id => "#{message_id}", :timestamp => Time.new.to_i)

  message
end

#transmit_in_only(service_name, version, invocation_signal) ⇒ Object



141
142
143
# File 'lib/wire.rb', line 141

def transmit_in_only(service_name, version, invocation_signal)
  initialize_service(service_name).publish(invocation_signal.to_json, :routing_key => "#{version}.2", :headers => {'x-opt-callerId' => "#{@caller_id}"}, :content_type => 'application/json', :message_id => "#{@uuid.generate}", :timestamp => Time.new.to_i)
end