Class: Thrift::AMQPClientTransport

Inherits:
BaseTransport
  • Object
show all
Defined in:
lib/thrift/amqp/client.rb

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(amqp_uri, exchange_name, routing_key, opts = {}) ⇒ AMQPClientTransport

Returns a new instance of AMQPClientTransport.



16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
# File 'lib/thrift/amqp/client.rb', line 16

# Builds a client transport that publishes to +exchange_name+ using
# +routing_key+.
#
# @param amqp_uri [String, nil] handed to Bunny.new when no channel is
#   supplied; ignored otherwise
# @param exchange_name [String] stored for later use by the transport
# @param routing_key [String] stored for later use by the transport
# @param opts [Hash] recognised keys:
#   :channel - an existing channel to use instead of creating a connection
#   :oneway  - defaults to false
def initialize(amqp_uri, exchange_name, routing_key, opts = {})
  supplied_channel = opts[:channel]

  # Outgoing bytes accumulate in @outbuf; incoming bytes go through a pipe.
  @outbuf = Bytes.empty_byte_buffer
  @inbuf_r, @inbuf_w = IO.pipe(binmode: true)
  @inbuf_w.set_encoding('binary')

  # Either adopt the caller's channel or prepare (but do not start) our own
  # connection.
  if supplied_channel
    @channel = supplied_channel
  else
    @conn = Bunny.new(amqp_uri)
  end

  @exchange_name = exchange_name
  @routing_key = routing_key
  @oneway = opts.fetch(:oneway, false)
  # The transport only manages the channel lifecycle when none was passed in.
  @handle_conn_lifecycle = supplied_channel.nil?
  @opened = false
end

Class Method Details

.from_channel(channel, exchange_name, routing_key) ⇒ Object



11
12
13
# File 'lib/thrift/amqp/client.rb', line 11

# Builds a transport on top of an already existing channel.
#
# No AMQP URI is passed (nil); the channel is forwarded through the
# :channel option of the constructor.
#
# @param channel [Object] the channel to reuse
# @param exchange_name [String] forwarded to the constructor
# @param routing_key [String] forwarded to the constructor
# @return [Object] whatever +new+ returns
def from_channel(channel, exchange_name, routing_key)
  no_uri = nil
  new(no_uri, exchange_name, routing_key, channel: channel)
end

Instance Method Details

#close ⇒ Object



64
65
66
67
68
69
70
# File 'lib/thrift/amqp/client.rb', line 64

# Closes the transport if it is currently open; otherwise does nothing.
#
# Deletes the reply queue unless the transport is one-way, and closes the
# channel only when this transport manages the channel lifecycle itself.
#
# NOTE(review): the underlying connection (@conn) is not closed here —
# confirm whether that is intentional (e.g. to allow reopening).
#
# @return [false, nil] false after closing, nil when nothing was open
def close
  return unless open?

  @reply_queue.delete unless @oneway
  @channel.close if @handle_conn_lifecycle
  @opened = false
end

#flush ⇒ Object



84
85
86
87
88
89
90
91
92
93
94
95
# File 'lib/thrift/amqp/client.rb', line 84

# Publishes everything written so far as a single message and starts a
# fresh output buffer. Opens the transport first if it is not open yet.
#
# The message carries a freshly generated correlation id. Its reply_to is
# the reply queue's name, or an empty string for one-way transports.
#
# @return [Object] the new, empty output buffer
def flush
  open unless open?

  message = @outbuf
  correlation_id = generate_uuid
  reply_to = @oneway ? '' : @reply_queue.name

  @service_exchange.publish(
    message,
    routing_key: @routing_key,
    correlation_id: correlation_id,
    reply_to: reply_to
  )

  @outbuf = Bytes.empty_byte_buffer
end

#open ⇒ Object



34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
# File 'lib/thrift/amqp/client.rb', line 34

# Opens the transport. Does nothing when it is already open.
#
# When there is no usable channel, one is created from the transport's own
# connection (which is started first). The service exchange is then looked
# up by name and, unless the transport is one-way, a server-named,
# exclusive, auto-delete reply queue is declared and subscribed to.
#
# @raise [TransportException] with NOT_OPEN when the channel is missing or
#   closed and there is no connection to create a new one from
# @return [true, nil] true once opened, nil when it was already open
def open
  return if open?

  if @channel.nil? || !@channel.open?
    unless @conn
      raise TransportException.new(
        TransportException::NOT_OPEN, 'channel closed'
      )
    end

    @conn.start
    @channel = @conn.create_channel
  end

  @service_exchange = @channel.exchange(@exchange_name)

  unless @oneway
    @reply_queue = @channel.queue('', auto_delete: true, exclusive: true)

    # Non-blocking subscription: each reply payload is appended to the write
    # end of the inbound pipe and then acknowledged individually.
    @reply_queue.subscribe(
      block: false, manual_ack: true
    ) do |delivery_info, _properties, payload|
      @inbuf_w << Bytes.force_binary_encoding(payload)
      @channel.acknowledge(delivery_info.delivery_tag, false)
    end
  end

  @opened = true
end

#open? ⇒ Boolean

Returns:

  • (Boolean)


72
73
74
# File 'lib/thrift/amqp/client.rb', line 72

# Tells whether the transport is usable: it must have been opened, hold a
# channel, and that channel must itself report being open.
#
# @return [Boolean, nil] the channel's own open? result, or the first falsy
#   value encountered (the opened flag or the missing channel)
def open?
  channel = @opened && @channel
  channel && channel.open?
end

#read(sz) ⇒ Object



76
77
78
# File 'lib/thrift/amqp/client.rb', line 76

# Reads +sz+ bytes of reply data from the inbound buffer's read end.
#
# @param sz [Integer] number of bytes requested
# @return [String, nil] whatever the underlying reader returns for +sz+
def read(sz)
  reader = @inbuf_r
  reader.read(sz)
end

#write(buf) ⇒ Object



80
81
82
# File 'lib/thrift/amqp/client.rb', line 80

# Appends +buf+ to the outgoing buffer after forcing it to binary encoding.
# Nothing is sent here; the buffer is published by #flush.
#
# @param buf [String] bytes to queue for sending
# @return [Object] the outgoing buffer
def write(buf)
  chunk = Bytes.force_binary_encoding(buf)
  @outbuf << chunk
end