Module: QueueingRabbit::Job

Includes:
Bus
Included in:
AbstractJob
Defined in:
lib/queueing_rabbit/job.rb

Instance Attribute Summary

Attributes included from Bus

#shared_exchange

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Bus

#batch_publishing?, #channel, #channel_options, #exchange, #exchange_name, #exchange_options, #publish, #publish_with

Class Method Details

.extended(othermod) ⇒ Object



6
7
8
9
10
11
12
13
14
15
# File 'lib/queueing_rabbit/job.rb', line 6

# Hook that runs when an object extends this module.
#
# Extends +othermod+ with QueueingRabbit::InheritableClassVariables and then,
# evaluated in the context of +othermod+, registers the job configuration
# attribute names through +inheritable_variables+.
#
# NOTE(review): +inheritable_variables+ is presumably supplied by
# InheritableClassVariables; its definition is not visible here.
#
# @param othermod [Object] the object extending this module
# @return [Object] whatever the +inheritable_variables+ call returns
def self.extended(othermod)
  othermod.extend(QueueingRabbit::InheritableClassVariables)

  attribute_names = [
    :queue_name, :queue_options, :channel_options,
    :exchange_name, :exchange_options,
    :binding_declarations, :listening_options,
    :publishing_defaults
  ]

  othermod.class_eval { inheritable_variables(*attribute_names) }
end

Instance Method Details

#bind(options = {}) ⇒ Object



36
37
38
39
# File 'lib/queueing_rabbit/job.rb', line 36

# Records one binding declaration.
#
# The declarations list is created on first use and +options+ is appended to
# it as-is (no copy is made).
#
# @param options [Hash] the binding options to remember
# @return [Array] the list of all declarations recorded so far
def bind(options = {})
  (@binding_declarations ||= []) << options
end

#bind_queue? ⇒ Boolean

Returns:

  • (Boolean)


45
46
47
48
49
# File 'lib/queueing_rabbit/job.rb', line 45

# Tells whether the queue should be bound to an exchange.
#
# The answer is truthy only when +exchange_options+ carries a +:type+ that is
# not +:default+ and at least one binding declaration exists.
#
# @return [Boolean, nil] +true+ when binding is required; +false+ or +nil+
#   (when no +:type+ is set) otherwise
def bind_queue?
  # A missing type is returned as-is (nil/false) rather than coerced.
  return exchange_options[:type] unless exchange_options[:type]
  return false if exchange_options[:type] == :default

  !binding_declarations.empty?
end

#binding_declarations ⇒ Object



41
42
43
# File 'lib/queueing_rabbit/job.rb', line 41

# Reads the binding declarations recorded so far.
#
# When nothing has been recorded, a fresh empty array is returned; that array
# is not stored, so mutating it has no lasting effect.
#
# @return [Array] the stored declarations, or a new empty array
def binding_declarations
  return @binding_declarations if @binding_declarations

  []
end

#demand_batch_publishing! ⇒ Object



66
67
68
69
70
# File 'lib/queueing_rabbit/job.rb', line 66

# Passes this job to QueueingRabbit.follow_job_requirements and stores the
# second value yielded to the block in +@shared_exchange+.
#
# NOTE(review): the first and third yielded values are ignored here; their
# meaning is defined by +follow_job_requirements+, which is not visible.
#
# @return [Object] whatever +follow_job_requirements+ returns
def demand_batch_publishing!
  QueueingRabbit.follow_job_requirements(self) { |_first, exchange, _third| @shared_exchange = exchange }
end

#enqueue(payload, options = {}) ⇒ Object



72
73
74
# File 'lib/queueing_rabbit/job.rb', line 72

# Publishes +payload+ by delegating to +publish+, passing +:enqueue+ as the
# third argument.
#
# NOTE(review): +publish+ is listed among the methods included from Bus; what
# the +:enqueue+ argument selects is not visible here — confirm there.
#
# @param payload [Object] the message, handed to +publish+ unchanged
# @param options [Hash] publishing options, handed to +publish+ unchanged
# @return [Object] whatever +publish+ returns
def enqueue(payload, options = {})
  publish(payload, options, :enqueue)
end

#listen(options = {}) ⇒ Object Also known as: subscribe



55
56
57
58
# File 'lib/queueing_rabbit/job.rb', line 55

# Merges +options+ into the stored listening options.
#
# The options hash is created on first use and updated in place, so later
# calls add to or overwrite earlier keys.
#
# @param options [Hash] listening options to merge in
# @return [Hash] the stored listening options after the merge
def listen(options = {})
  (@listening_options ||= {}).update(options)
end

#listening_options ⇒ Object



51
52
53
# File 'lib/queueing_rabbit/job.rb', line 51

# Reads the stored listening options.
#
# When none have been stored, a fresh empty hash is returned; that hash is not
# stored.
#
# @return [Hash] the stored options, or a new empty hash
def listening_options
  if @listening_options
    @listening_options
  else
    {}
  end
end

#publishing_defaults ⇒ Object



61
62
63
64
# File 'lib/queueing_rabbit/job.rb', line 61

# Builds the default publishing options.
#
# Starts from a +:routing_key+ equal to the queue name (as a string) and lets
# any stored publishing defaults override it. As a side effect,
# +@publishing_defaults+ is initialised to an empty hash when unset.
#
# @return [Hash] a new hash; the stored defaults are not modified
def publishing_defaults
  overrides = (@publishing_defaults ||= {})
  base = {:routing_key => queue_name.to_s}
  base.merge(overrides)
end

#queue(*args) ⇒ Object



17
18
19
20
21
22
# File 'lib/queueing_rabbit/job.rb', line 17

# Configures the queue name and/or queue options.
#
# The arguments are split by +extract_name_and_options+ into a name and an
# options hash. A truthy name replaces the stored queue name; truthy options
# are merged into the stored queue options (created on first use).
#
# NOTE(review): +extract_name_and_options+ is defined outside this listing;
# the accepted argument shapes are determined there.
#
# @param args [Array] arguments forwarded to +extract_name_and_options+
# @return [Hash, nil] the stored queue options after merging, or +nil+ when
#   no options were given
def queue(*args)
  stored_options = (@queue_options ||= {})
  given_name, given_options = extract_name_and_options(*args)
  @queue_name = given_name if given_name
  return nil unless given_options

  stored_options.update(given_options)
end

#queue_name ⇒ Object



24
25
26
# File 'lib/queueing_rabbit/job.rb', line 24

# Resolves the queue name.
#
# An explicitly stored name wins. Otherwise the name is derived from
# +self.name+ by taking the last '::'-separated segment; when +self.name+ is
# not set, +nil+ is returned.
#
# @return [Object, nil] the stored name, the last segment of +self.name+, or
#   +nil+
def queue_name
  return @queue_name if @queue_name
  return nil unless self.name

  self.name.split('::').last
end

#queue_options ⇒ Object



28
29
30
# File 'lib/queueing_rabbit/job.rb', line 28

# Reads the stored queue options.
#
# When none have been stored, a fresh empty hash is returned; that hash is not
# stored.
#
# @return [Hash] the stored options, or a new empty hash
def queue_options
  return @queue_options if @queue_options

  {}
end

#queue_size ⇒ Object



32
33
34
# File 'lib/queueing_rabbit/job.rb', line 32

# Looks up the size of this job's queue by passing +self+ to
# QueueingRabbit.queue_size.
#
# @return [Object] whatever QueueingRabbit.queue_size returns
#   (presumably a message count — confirm against that method)
def queue_size
  QueueingRabbit.queue_size(self)
end