Class: Jackhammer::Topic

Inherits:
Object
  • Object
show all
Defined in:
lib/jackhammer/topic.rb

Constant Summary collapse

QUEUE_NAME_KEY =
'queue_name'.freeze
ROUTING_KEY_KEY =
'routing_key'.freeze

Instance Method Summary collapse

Constructor Details

#initialize(name:, queue_config:, options: {}) ⇒ Topic

Returns a new instance of Topic.



6
7
8
9
# File 'lib/jackhammer/topic.rb', line 6

# Sets up the topic handle and the queue configuration for this instance.
#
# @param name passed as the first argument to Jackhammer.channel.topic
# @param queue_config handed to normalize_queue_config; the result is stored
#   in @queue_config
# @param options passed as the second argument to Jackhammer.channel.topic
#   (defaults to an empty hash)
def initialize(name:, queue_config:, options: {})
  channel = Jackhammer.channel
  # NOTE(review): presumably this declares a topic exchange on the broker
  # channel -- confirm against Jackhammer.channel.
  @topic = channel.topic(name, options)
  @queue_config = normalize_queue_config(queue_config)
end

Instance Method Details

#publish(message, options) ⇒ Object

We’re expecting the client to specify at least the routing_key in options for each message published.



17
18
19
20
21
# File 'lib/jackhammer/topic.rb', line 17

# Publishes +message+ on the topic by way of the client middleware.
#
# The message and the result of Jackhammer.publish_options(options) are
# handed to Jackhammer.client_middleware; the block given to the middleware
# performs the actual topic.publish with whatever message/options the
# middleware yields.
#
# @param message the payload to publish
# @param options per-message publish options; callers are expected to supply
#   at least the routing key here
# @return whatever the middleware's +call+ returns
def publish(message, options)
  middleware = Jackhammer.client_middleware
  publish_opts = Jackhammer.publish_options(options)

  middleware.call(message, publish_opts) do |payload, final_opts|
    topic.publish(payload, final_opts)
  end
end

#queues ⇒ Object



23
24
25
26
27
28
29
30
31
32
33
34
# File 'lib/jackhammer/topic.rb', line 23

# Builds one Queue per entry in queue_config and memoizes the list in @queues.
#
# For each entry: the 'handler' value is wrapped in a MessageReceiver, the
# routing key is taken via fetch_and_delete_key, and the queue name is the
# entry's QUEUE_NAME_KEY value or, when that is absent, one derived with
# QueueName.from_routing_key. What is left of the entry is passed as the
# options to Jackhammer.channel.queue.
#
# NOTE: the 'handler' and QUEUE_NAME_KEY keys are deleted from each config
# entry in place (and presumably the routing key too, going by the helper's
# name -- confirm against fetch_and_delete_key).
#
# @return the memoized collection of Queue objects
def queues
  @queues ||= queue_config.map do |settings|
    receiver = MessageReceiver.new(settings.delete('handler'))
    binding_key = fetch_and_delete_key(settings, ROUTING_KEY_KEY)
    resolved_name = settings.delete(QUEUE_NAME_KEY) || QueueName.from_routing_key(binding_key)
    declared_queue = Jackhammer.channel.queue(resolved_name, settings)
    Log.info { "'#{resolved_name}' configured to subscribe on '#{binding_key}'" }

    Queue.new(topic: topic, queue: declared_queue, handler: receiver, routing_key: binding_key)
  end
end

#subscribe_queues ⇒ Object



11
12
13
# File 'lib/jackhammer/topic.rb', line 11

# Calls +subscribe+ on every queue returned by #queues, in order.
#
# @return the collection returned by #queues
def subscribe_queues
  queues.each do |queue|
    queue.subscribe
  end
end