Class: Thrift::AMQPClientTransport
- Inherits:
-
BaseTransport
- Object
- BaseTransport
- Thrift::AMQPClientTransport
- Defined in:
- lib/thrift/amqp/client.rb
Class Method Summary collapse
Instance Method Summary collapse
- #close ⇒ Object
- #flush ⇒ Object
-
#initialize(amqp_uri, exchange_name, routing_key, opts = {}) ⇒ AMQPClientTransport
constructor
A new instance of AMQPClientTransport.
- #open ⇒ Object
- #open? ⇒ Boolean
- #read(sz) ⇒ Object
- #write(buf) ⇒ Object
Constructor Details
#initialize(amqp_uri, exchange_name, routing_key, opts = {}) ⇒ AMQPClientTransport
Returns a new instance of AMQPClientTransport.
16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 |
# File 'lib/thrift/amqp/client.rb', line 16 def initialize(amqp_uri, exchange_name, routing_key, opts = {}) @outbuf = Bytes.empty_byte_buffer @inbuf_r, @inbuf_w = IO.pipe(binmode: true) @inbuf_w.set_encoding('binary') if opts[:channel] @channel = opts[:channel] else @conn = Bunny.new(amqp_uri) end @opened = false @handle_conn_lifecycle = opts[:channel].nil? @exchange_name = exchange_name @routing_key = routing_key @oneway = opts.fetch(:oneway, false) end |
Class Method Details
.from_channel(channel, exchange_name, routing_key) ⇒ Object
11 12 13 |
# File 'lib/thrift/amqp/client.rb', line 11 def from_channel(channel, exchange_name, routing_key) new(nil, exchange_name, routing_key, channel: channel) end |
Instance Method Details
#close ⇒ Object
64 65 66 67 68 69 70 |
# File 'lib/thrift/amqp/client.rb', line 64 def close if open? @reply_queue.delete unless @oneway @channel.close if @handle_conn_lifecycle @opened = false end end |
#flush ⇒ Object
84 85 86 87 88 89 90 91 92 93 94 95 |
# File 'lib/thrift/amqp/client.rb', line 84 def flush open unless open? @service_exchange.publish( @outbuf, routing_key: @routing_key, correlation_id: generate_uuid, reply_to: @oneway ? '' : @reply_queue.name ) @outbuf = Bytes.empty_byte_buffer end |
#open ⇒ Object
34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 |
# File 'lib/thrift/amqp/client.rb', line 34 def open return if open? if @channel.nil? || !@channel.open? unless @conn raise TransportException.new( TransportException::NOT_OPEN, 'channel cosed' ) end @conn.start @channel = @conn.create_channel end @service_exchange = @channel.exchange(@exchange_name) unless @oneway @reply_queue = @channel.queue('', auto_delete: true, exclusive: true) @reply_queue.subscribe( block: false, manual_ack: true ) do |delivery_info, properties, payload| @inbuf_w << Bytes.force_binary_encoding(payload) @channel.acknowledge(delivery_info.delivery_tag, false) end end @opened = true end |
#open? ⇒ Boolean
72 73 74 |
# File 'lib/thrift/amqp/client.rb', line 72 def open? @opened && @channel && @channel.open? end |
#read(sz) ⇒ Object
76 77 78 |
# File 'lib/thrift/amqp/client.rb', line 76 def read(sz) @inbuf_r.read(sz) end |
#write(buf) ⇒ Object
80 81 82 |
# File 'lib/thrift/amqp/client.rb', line 80 def write(buf) @outbuf << Bytes.force_binary_encoding(buf) end |