Module: QueueingRabbit::Bus

Included in:
AbstractBus, Job
Defined in:
lib/queueing_rabbit/bus.rb

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#shared_exchange ⇒ Object (readonly)

Returns the value of attribute shared_exchange.



13
14
15
# File 'lib/queueing_rabbit/bus.rb', line 13

# Reader for the +@shared_exchange+ instance variable.
#
# @return [Object, nil] the value currently held in +@shared_exchange+,
#   or +nil+ when nothing has been assigned to it.
def shared_exchange
  instance_variable_get(:@shared_exchange)
end

Class Method Details

.extended(othermod) ⇒ Object



4
5
6
7
8
9
10
11
# File 'lib/queueing_rabbit/bus.rb', line 4

# Hook invoked with the object that has just extended this module.
#
# Extends +othermod+ with QueueingRabbit::InheritableClassVariables and then,
# evaluated in the context of +othermod+, declares the bus settings
# (+channel_options+, +exchange_name+, +exchange_options+ and
# +publishing_defaults+) through +inheritable_variables+.
#
# @param othermod [Module] the class or module extending this module
# @return [Object] whatever the +inheritable_variables+ call returns
def self.extended(othermod)
  othermod.extend(QueueingRabbit::InheritableClassVariables)

  bus_settings = [:channel_options, :exchange_name,
                  :exchange_options, :publishing_defaults]
  othermod.class_eval { inheritable_variables(*bus_settings) }
end

Instance Method Details

#batch_publishing? ⇒ Boolean

Returns:

  • (Boolean)


54
55
56
# File 'lib/queueing_rabbit/bus.rb', line 54

# Tells whether a shared exchange has been stored on this bus.
#
# @return [Boolean] +true+ when +@shared_exchange+ holds a truthy value,
#   +false+ when it is +nil+, +false+ or unset.
def batch_publishing?
  @shared_exchange ? true : false
end

#channel(options = {}) ⇒ Object



15
16
17
18
# File 'lib/queueing_rabbit/bus.rb', line 15

# Merges +options+ into the stored channel options, creating the backing
# hash on first use.
#
# @param options [Hash] entries to add to (or overwrite in) the stored options
# @return [Hash] the updated hash held in +@channel_options+
def channel(options = {})
  (@channel_options ||= {}).merge!(options)
end

#channel_options ⇒ Object



20
21
22
# File 'lib/queueing_rabbit/bus.rb', line 20

# Returns the stored channel options, initialising them to an empty hash
# (and keeping that hash in +@channel_options+) when none are set yet.
#
# @return [Hash] the hash held in +@channel_options+
def channel_options
  @channel_options = {} unless @channel_options
  @channel_options
end

#demand_batch_publishing! ⇒ Object



48
49
50
51
52
# File 'lib/queueing_rabbit/bus.rb', line 48

# Passes this bus to QueueingRabbit.follow_bus_requirements and stores the
# second value yielded back by it in +@shared_exchange+; the first yielded
# value is ignored.
#
# @return [Object] whatever QueueingRabbit.follow_bus_requirements returns
def demand_batch_publishing!
  QueueingRabbit.follow_bus_requirements(self) { |_channel, bus_exchange| @shared_exchange = bus_exchange }
end

#exchange(*args) ⇒ Object



24
25
26
27
28
29
# File 'lib/queueing_rabbit/bus.rb', line 24

# Records the exchange name and/or exchange options for this bus.
#
# The arguments are handed to +extract_name_and_options+, which is expected
# to return a +[name, options]+ pair. A truthy name replaces +@exchange_name+;
# truthy options are merged into +@exchange_options+ (created on first use).
#
# @param args [Array] forwarded unchanged to +extract_name_and_options+
# @return [Hash, nil] the updated +@exchange_options+ hash when options were
#   given, otherwise +nil+
def exchange(*args)
  # Initialise the store before parsing, so it exists even if parsing raises.
  @exchange_options = {} unless @exchange_options
  given_name, given_options = extract_name_and_options(*args)
  @exchange_name = given_name if given_name
  return nil unless given_options

  @exchange_options.merge!(given_options)
end

#exchange_name ⇒ Object



31
32
33
# File 'lib/queueing_rabbit/bus.rb', line 31

# Returns the configured exchange name.
#
# @return [Object] the value of +@exchange_name+ when it is truthy,
#   otherwise an empty string
def exchange_name
  return @exchange_name if @exchange_name

  ''
end

#exchange_options ⇒ Object



35
36
37
# File 'lib/queueing_rabbit/bus.rb', line 35

# Returns the configured exchange options.
#
# When +@exchange_options+ is unset, a new empty hash is returned on every
# call and is NOT stored, so mutating that result has no lasting effect.
#
# @return [Hash] the value of +@exchange_options+, or a fresh empty hash
def exchange_options
  @exchange_options ? @exchange_options : {}
end

#publish(payload, options = {}, method = :publish) ⇒ Object



58
59
60
61
62
63
64
65
66
# File 'lib/queueing_rabbit/bus.rb', line 58

# Publishes +payload+ using the publishing defaults overlaid with +options+.
#
# When batch publishing is active the message goes to
# QueueingRabbit.publish_to_exchange with +@shared_exchange+, and +method+
# is not used. Otherwise the QueueingRabbit method named by +method+ is
# invoked with this bus, the payload and the merged options.
#
# @param payload [Object] the message body to publish
# @param options [Hash] per-call options; they override +publishing_defaults+
# @param method [Symbol] name of the QueueingRabbit method used outside of
#   batch publishing
# @return [Object] the result of the QueueingRabbit call that was made
def publish(payload, options = {}, method = :publish)
  message = [payload, publishing_defaults.merge(options)]
  return QueueingRabbit.send(method, self, *message) unless batch_publishing?

  QueueingRabbit.publish_to_exchange(@shared_exchange, *message)
end

#publish_with(options = {}) ⇒ Object



39
40
41
42
# File 'lib/queueing_rabbit/bus.rb', line 39

# Merges +options+ into the stored publishing defaults, creating the backing
# hash on first use.
#
# @param options [Hash] entries to add to (or overwrite in) the defaults
# @return [Hash] the updated hash held in +@publishing_defaults+
def publish_with(options = {})
  defaults = (@publishing_defaults ||= {})
  defaults.merge!(options)
end

#publishing_defaults ⇒ Object



44
45
46
# File 'lib/queueing_rabbit/bus.rb', line 44

# Returns the configured publishing defaults.
#
# When +@publishing_defaults+ is unset, a new empty hash is returned on every
# call and is NOT stored.
#
# @return [Hash] the value of +@publishing_defaults+, or a fresh empty hash
def publishing_defaults
  return @publishing_defaults if @publishing_defaults

  {}
end