Module: QueueingRabbit

Extended by:
MonitorMixin, QueueingRabbit, Callbacks, Configuration, Logging
Included in:
QueueingRabbit
Defined in:
lib/queueing_rabbit.rb,
lib/queueing_rabbit/bus.rb,
lib/queueing_rabbit/job.rb,
lib/queueing_rabbit/worker.rb,
lib/queueing_rabbit/logging.rb,
lib/queueing_rabbit/version.rb,
lib/queueing_rabbit/callbacks.rb,
lib/queueing_rabbit/serializer.rb,
lib/queueing_rabbit/client/amqp.rb,
lib/queueing_rabbit/client/bunny.rb,
lib/queueing_rabbit/configuration.rb,
lib/queueing_rabbit/jobs/json_job.rb,
lib/queueing_rabbit/buses/json_bus.rb,
lib/queueing_rabbit/client/callbacks.rb,
lib/queueing_rabbit/jobs/abstract_job.rb,
lib/queueing_rabbit/buses/abstract_bus.rb,
lib/queueing_rabbit/extensions/threaded.rb,
lib/queueing_rabbit/extensions/new_relic.rb,
lib/queueing_rabbit/extensions/retryable.rb,
lib/queueing_rabbit/extensions/direct_exchange.rb,
lib/queueing_rabbit/misc/inheritable_class_variables.rb

Defined Under Namespace

Modules: Bus, Callbacks, Client, Configuration, InheritableClassVariables, Job, JobExtensions, Logging, Serializer
Classes: AbstractBus, AbstractJob, JSONBus, JSONJob, JobNotFoundError, JobNotPresentError, QueueingRabbitError, Worker

Constant Summary

VERSION =
"0.6.0"

Instance Attribute Summary

Attributes included from Configuration

#amqp_uri, #heartbeat, #tcp_timeout

Instance Method Summary

Methods included from Callbacks

after_consuming, before_consuming, on_consumer_error, on_event_machine_start, setup_callback, trigger_event

Methods included from Configuration

configure, default_client, jobs_wait_timeout

Instance Attribute Details

#client ⇒ Object

Returns the value of attribute client.



32
33
34
# File 'lib/queueing_rabbit.rb', line 32

# Reader for the +@client+ instance variable.
#
# @return [Object, nil] the current value of +@client+ (nil when it has
#   never been assigned)
def client
  @client
end

#logger ⇒ Object

Returns the value of attribute logger.



32
33
34
# File 'lib/queueing_rabbit.rb', line 32

# Reader for the +@logger+ instance variable.
#
# @return [Object, nil] the current value of +@logger+ (nil when it has
#   never been assigned)
def logger
  @logger
end

Instance Method Details

#begin_worker_loop ⇒ Object



86
87
88
89
90
# File 'lib/queueing_rabbit.rb', line 86

# Hands control to the connection's own +begin_worker_loop+, running the
# caller's block (with no arguments) each time the connection invokes the
# loop body.
#
# @yield the work to perform inside the connection's worker loop
# @return [Object] whatever the connection's +begin_worker_loop+ returns
def begin_worker_loop
  conn.begin_worker_loop { yield }
end

#connect ⇒ Object Also known as: conn, connection



34
35
36
37
38
# File 'lib/queueing_rabbit.rb', line 34

# Returns the memoized connection, asking +client+ to create one the first
# time (or after the stored connection has been reset to a falsy value).
# The lookup-and-assign runs inside +synchronize+ so only one caller at a
# time can populate +@connection+.
#
# @return [Object] the value stored in +@connection+
def connect
  synchronize do
    @connection = client.connect unless @connection
    @connection
  end
end

#connected? ⇒ Boolean

Returns:

  • (Boolean)


51
52
53
# File 'lib/queueing_rabbit.rb', line 51

# Tells whether a connection object is currently stored and reports itself
# as open.
#
# The result is coerced to a strict Boolean so that the method honours its
# documented return type: with no stored connection it answers +false+
# rather than leaking +nil+.
#
# @return [Boolean] true only when +@connection+ is set and its +open?+
#   is truthy
def connected?
  !!(@connection && @connection.open?)
end

#disconnect ⇒ Object



42
43
44
45
46
47
48
49
# File 'lib/queueing_rabbit.rb', line 42

# Closes the stored connection (when +connected?+ says it is open) and
# forgets it, all inside +synchronize+.
#
# The reference is dropped in an +ensure+ clause: if +close+ raises, the
# error still propagates to the caller, but the broken connection is no
# longer memoized, so a later +connect+ builds a fresh one instead of
# handing back the stale handle.
#
# @return [nil]
# @raise [StandardError] whatever +@connection.close+ raises
def disconnect
  synchronize do
    begin
      @connection.close if connected?
    ensure
      drop_connection
    end
    nil
  end
end

#drop_connection ⇒ Object



55
56
57
# File 'lib/queueing_rabbit.rb', line 55

# Forgets the stored connection by resetting +@connection+ to nil.
# It does not call +close+ on the connection; it only discards the reference.
#
# @return [nil]
def drop_connection
  @connection = nil
end

#enqueue(job, payload = nil, options = {}) ⇒ Object



59
60
61
62
63
64
65
66
67
68
# File 'lib/queueing_rabbit.rb', line 59

# Publishes +payload+ for +job+: logs the attempt, lets
# +follow_job_requirements+ prepare the channel/exchange/queue, publishes
# to the yielded exchange and then closes the yielded channel.
#
# @param job [Object] job descriptor handed to +follow_job_requirements+
#   and interpolated into the log line
# @param payload [Object, nil] message body forwarded to +publish_to_exchange+
# @param options [Hash] publishing options forwarded to +publish_to_exchange+
# @return [true]
def enqueue(job, payload = nil, options = {})
  info "enqueueing job #{job}"

  follow_job_requirements(job) do |ch, ex, _queue|
    publish_to_exchange(ex, payload, options)
    ch.close
  end

  true
end

#follow_bus_requirements(bus) ⇒ Object



106
107
108
109
110
111
112
113
114
# File 'lib/queueing_rabbit.rb', line 106

# Opens a channel using the bus's channel options, declares the bus's
# exchange on it, and yields the channel and exchange to the caller.
#
# @param bus [#channel_options, #exchange_name, #exchange_options]
# @yield [channel, exchange] invoked from inside the exchange-definition
#   callback
# @return [Object, nil] the block's result, or nil if the block was never
#   invoked before this method returned
def follow_bus_requirements(bus)
  result = nil

  conn.open_channel(bus.channel_options) do |channel, _open_ok|
    conn.define_exchange(channel, bus.exchange_name, bus.exchange_options) do |exchange|
      result = yield(channel, exchange)
    end
  end

  result
end

#follow_job_requirements(job) ⇒ Object



92
93
94
95
96
97
98
99
100
101
102
103
104
# File 'lib/queueing_rabbit.rb', line 92

# Builds on +follow_bus_requirements+: once the channel and exchange are
# available it declares the job's queue, applies each of the job's binding
# declarations when +job.bind_queue?+ is truthy, and finally yields the
# channel, exchange and queue to the caller.
#
# @param job [#queue_name, #queue_options, #bind_queue?, #binding_declarations]
#   also passed unchanged to +follow_bus_requirements+
# @yield [channel, exchange, queue] invoked from inside the queue-definition
#   callback, after any bindings were requested
# @return [Object, nil] the block's result, or nil if the block was never
#   invoked before this method returned
def follow_job_requirements(job)
  result = nil

  follow_bus_requirements(job) do |channel, exchange|
    conn.define_queue(channel, job.queue_name, job.queue_options) do |queue|
      if job.bind_queue?
        job.binding_declarations.each do |declaration|
          conn.bind_queue(queue, exchange, declaration)
        end
      end

      result = yield(channel, exchange, queue)
    end
  end

  result
end

#publish(bus, payload = nil, options = {}) ⇒ Object



70
71
72
73
74
75
76
77
78
79
# File 'lib/queueing_rabbit.rb', line 70

# Publishes +payload+ to an event bus: logs the attempt, lets
# +follow_bus_requirements+ prepare the channel and exchange, publishes to
# the yielded exchange and then closes the yielded channel.
#
# @param bus [Object] bus descriptor handed to +follow_bus_requirements+
#   and interpolated into the log line
# @param payload [Object, nil] message body forwarded to +publish_to_exchange+
# @param options [Hash] publishing options forwarded to +publish_to_exchange+
# @return [true]
def publish(bus, payload = nil, options = {})
  info "publishing to event bus #{bus}"

  follow_bus_requirements(bus) do |ch, ex|
    publish_to_exchange(ex, payload, options)
    ch.close
  end

  true
end

#publish_to_exchange(exchange, payload = nil, options = {}) ⇒ Object



81
82
83
84
# File 'lib/queueing_rabbit.rb', line 81

# Forwards a publish request to the current connection.
#
# @param exchange [Object] exchange to publish to
# @param payload [Object, nil] message body, passed through unchanged
# @param options [Hash] publishing options, passed through unchanged
# @return [true] regardless of what the connection's +publish+ returns
def publish_to_exchange(exchange, payload = nil, options = {})
  link = conn
  link.publish(exchange, payload, options)

  true
end

#purge_queue(job) ⇒ Object



126
127
128
129
130
131
132
133
# File 'lib/queueing_rabbit.rb', line 126

# Purges the queue that belongs to +job+: opens a channel with the job's
# channel options, declares the job's queue on it, asks the connection to
# purge that queue and closes the channel from the purge callback.
#
# @param job [#channel_options, #queue_name, #queue_options]
# @return [true]
def purge_queue(job)
  connection.open_channel(job.channel_options) do |channel, _open_ok|
    connection.define_queue(channel, job.queue_name, job.queue_options) do |queue|
      connection.purge_queue(queue) do
        channel.close
      end
    end
  end

  true
end

#queue_size(job) ⇒ Object



116
117
118
119
120
121
122
123
124
# File 'lib/queueing_rabbit.rb', line 116

# Reports the size of the queue that belongs to +job+: opens a channel with
# the job's channel options, declares the job's queue, asks the connection
# for that queue's size and closes the channel.
#
# @param job [#channel_options, #queue_name, #queue_options]
# @return [Object] the value returned by the connection's +queue_size+, or
#   0 if the channel callback had not run by the time this method returns
def queue_size(job)
  count = 0

  connection.open_channel(job.channel_options) do |channel, _open_ok|
    declared = connection.define_queue(channel, job.queue_name, job.queue_options)
    count = connection.queue_size(declared)
    channel.close
  end

  count
end