Class: Thrift::AMQPServer

Inherits:
BaseServer
  • Object
show all
Defined in:
lib/thrift/amqp/server.rb

Constant Summary collapse

DEFAULT_TIMEOUT =

15s

15_000

Instance Method Summary collapse

Constructor Details

#initialize(processor, iprot_factory, oprot_factory = nil, opts = {}) ⇒ AMQPServer

Returns a new instance of AMQPServer.



13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# File 'lib/thrift/amqp/server.rb', line 13

# Stores the Thrift processor, the protocol factories and the AMQP settings
# read from +opts+ and the environment.
#
# @param processor [Object] kept in @processor
# @param iprot_factory [Object] kept in @iprot_factory
# @param oprot_factory [Object, nil] kept in @oprot_factory; when nil the
#   input factory is used instead
# @param opts [Hash] recognised keys: :amqp_uri, :exchange_name,
#   :routing_key, :queue_name, :consumer_tag, :prefetch, :timeout and
#   :queue_declare_args
def initialize(processor, iprot_factory, oprot_factory = nil, opts = {})
  @processor, @iprot_factory = processor, iprot_factory
  @oprot_factory = oprot_factory || iprot_factory

  @amqp_uri, @exchange_name, @routing_key, @queue_name, @consumer_tag =
    opts.values_at(:amqp_uri, :exchange_name, :routing_key, :queue_name, :consumer_tag)

  # The QOS_SIZE environment variable wins over opts[:prefetch]; a missing
  # value becomes 0 through nil.to_i.
  requested_prefetch = ENV['QOS_SIZE'] || opts[:prefetch]
  @prefetch = requested_prefetch.to_i

  # opts[:timeout] is scaled by 1000 (presumably seconds to milliseconds,
  # matching DEFAULT_TIMEOUT - confirm with callers).
  requested_timeout = opts[:timeout]
  @timeout = if requested_timeout
               requested_timeout * 1000
             else
               DEFAULT_TIMEOUT
             end

  @fetching_disabled = (ENV['RABBITMQ_QOS'] == '0')
  @queue_declare_args = opts[:queue_declare_args] || { durable: true }
end

Instance Method Details

#handle(delivery_info, properties, payload) ⇒ Object



29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
# File 'lib/thrift/amqp/server.rb', line 29

# Runs one AMQP message through the Thrift processor and, when the processor
# wrote a response, publishes it on the channel's default exchange using the
# message's reply_to as routing key.
#
# The request is decoded with the input protocol factory and the response is
# encoded with the output protocol factory, both over one in-memory transport.
# Failures raised while processing or publishing are logged, not re-raised.
#
# @param delivery_info [Object] not used by this method
# @param properties [#reply_to] supplies the routing key for the response
# @param payload [String] serialized request bytes
# @return [Object]
def handle(delivery_info, properties, payload)
  input = StringIO.new payload
  out = StringIO.new
  transport = IOStreamTransport.new input, out
  iprot = @iprot_factory.get_protocol transport
  # Honour a distinct output factory; the constructor falls back to the input
  # factory when none is given, so the default behaviour is unchanged.
  oprot = @oprot_factory.get_protocol transport

  begin
    @processor.process iprot, oprot

    # Nothing written means there is no response to send back.
    if out.length > 0
      out.rewind
      @channel.default_exchange.publish(
        out.read(out.length),
        routing_key: properties.reply_to
      )
    end
  rescue => e
    LOGGER.error("Processor failure #{e}")
  end
end

#serve ⇒ Object



50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
# File 'lib/thrift/amqp/server.rb', line 50

# Connects to the broker, declares and binds the queue, then consumes
# messages forever, passing each one to #handle and acknowledging it
# afterwards. When the connection cannot be established or is closed, waits
# five seconds and starts over.
#
# @timeout holds milliseconds (the constructor multiplies opts[:timeout] by
# 1000 and DEFAULT_TIMEOUT is 15_000). It is handed to Bunny unchanged, since
# Bunny's continuation_timeout is expressed in milliseconds, and converted to
# seconds for Kernel#sleep and Timeout.timeout.
#
# @return [void] does not return under normal operation
def serve
  timeout_seconds = @timeout / 1000.0

  @conn = Bunny.new(@amqp_uri, continuation_timeout: @timeout)

  @conn.start
  @channel = @conn.create_channel(nil, @prefetch == 0 ? 1 : @prefetch)

  exchange = @channel.direct(@exchange_name)
  queue = @channel.queue(@queue_name, @queue_declare_args)
  queue.bind exchange, routing_key: @routing_key
  @consumer_tag ||= @channel.generate_consumer_tag

  @channel.prefetch @prefetch

  loop do
    if @fetching_disabled
      LOGGER.info("Fetching disabled")
      sleep timeout_seconds
      next
    end

    LOGGER.info("Fetching message from #{@queue_name}")
    queue.subscribe(
      manual_ack: true,
      block: true,
      consumer_tag: @consumer_tag
    ) do |delivery_info, properties, payload|
      begin
        Timeout.timeout(timeout_seconds) do
          handle(delivery_info, properties, payload)
        end
      rescue Timeout::Error
        LOGGER.info("Timeout raised")
      rescue => e
        LOGGER.info("Error happened: #{e}")
      end
      # Acknowledge even after a timeout or failure, so the message is not
      # left unacknowledged on the channel.
      @channel.acknowledge(delivery_info.delivery_tag, false)
    end
  end
rescue Bunny::TCPConnectionFailedForAllHosts, Bunny::ConnectionClosedError
  LOGGER.error("Can't establish the connection")
  sleep 5

  retry
end