Class: PikaQue::Broker

Inherits:
Object
  • Object
show all
Defined in:
lib/pika_que/broker.rb

Instance Method Summary collapse

Constructor Details

#initialize(processor = nil, opts = {}) ⇒ Broker

Returns a new instance of Broker.



7
8
9
10
11
# File 'lib/pika_que/broker.rb', line 7

def initialize(processor = nil, opts = {})
  @opts = PikaQue.config.merge(opts)
  @processor = processor
  @handlers = {}
end

Instance Method Details

#channelObject



64
65
66
# File 'lib/pika_que/broker.rb', line 64

def channel
  @channel ||= init_channel
end

#cleanup(force = false) ⇒ Object



74
75
76
77
78
79
80
81
82
83
84
85
# File 'lib/pika_que/broker.rb', line 74

def cleanup(force = false)
  if (@processor && force) || !@processor
    @channel.close unless @channel.closed?
    @channel = nil
    @exchange = nil
    if @default_handler
      @default_handler.close
      @default_handler = nil
    end
    @handlers.values.each(&:close)
  end
end

#default_handlerObject



56
57
58
# File 'lib/pika_que/broker.rb', line 56

def default_handler
  @default_handler ||= @opts[:handler_class] ? PikaQue::Util.constantize(@opts[:handler_class]).new(@opts[:handler_options].merge({ connection: @connection })) : PikaQue::Handlers::DefaultHandler.new
end

#exchangeObject



60
61
62
# File 'lib/pika_que/broker.rb', line 60

def exchange
  @exchange ||= channel.exchange(@opts[:exchange], @opts[:exchange_options])
end

#handler(handler_class, handler_opts = {}) ⇒ Object



42
43
44
45
46
47
48
49
50
51
52
53
54
# File 'lib/pika_que/broker.rb', line 42

def handler(handler_class, handler_opts = {})
  if handler_class
    h_key = "#{handler_class}-#{handler_opts.hash}"
    _handler = @handlers[h_key]
    unless _handler
      _handler = handler_class.new(handler_opts.merge({ connection: @connection }))
      @handlers[h_key] = _handler
    end
    _handler
  else
    default_handler
  end
end

#init_channelObject



68
69
70
71
72
# File 'lib/pika_que/broker.rb', line 68

def init_channel
  @connection.create_channel(nil, @opts[:channel_options][:consumer_pool_size]).tap do |ch|
    ch.prefetch(@opts[:channel_options][:prefetch])
  end
end

#local_connection?Boolean

Returns:

  • (Boolean)


22
23
24
# File 'lib/pika_que/broker.rb', line 22

def local_connection?
  @opts[:connection_options] || @processor.nil?
end

#queue(queue_name, queue_opts = {}) ⇒ Object



26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
# File 'lib/pika_que/broker.rb', line 26

def queue(queue_name, queue_opts = {})
  begin
    queue = channel.queue(queue_name, queue_opts)
    routing_key = queue_opts[:routing_key] || queue_name
    routing_keys = [routing_key, *queue_opts[:routing_keys]]

    routing_keys.each do |key|
      queue.bind(exchange, routing_key: key)
    end
    queue
  rescue => e
    PikaQue.logger.fatal e.message
    raise SetupError.new e.message
  end
end

#startObject



13
14
15
16
# File 'lib/pika_que/broker.rb', line 13

def start
  @connection ||= @opts[:connection_options] ? PikaQue::Connection.create(@opts[:connection_options]) : PikaQue.connection
  @connection.ensure_connection
end

#stopObject



18
19
20
# File 'lib/pika_que/broker.rb', line 18

def stop
  @connection.disconnect! if local_connection?
end