Module: QueueingRabbit
- Extended by:
- MonitorMixin, QueueingRabbit, Callbacks, Configuration, Logging
- Included in:
- QueueingRabbit
- Defined in:
- lib/queueing_rabbit.rb,
lib/queueing_rabbit/bus.rb,
lib/queueing_rabbit/job.rb,
lib/queueing_rabbit/worker.rb,
lib/queueing_rabbit/logging.rb,
lib/queueing_rabbit/version.rb,
lib/queueing_rabbit/callbacks.rb,
lib/queueing_rabbit/serializer.rb,
lib/queueing_rabbit/client/amqp.rb,
lib/queueing_rabbit/client/bunny.rb,
lib/queueing_rabbit/configuration.rb,
lib/queueing_rabbit/jobs/json_job.rb,
lib/queueing_rabbit/buses/json_bus.rb,
lib/queueing_rabbit/client/callbacks.rb,
lib/queueing_rabbit/jobs/abstract_job.rb,
lib/queueing_rabbit/buses/abstract_bus.rb,
lib/queueing_rabbit/extensions/threaded.rb,
lib/queueing_rabbit/extensions/new_relic.rb,
lib/queueing_rabbit/extensions/retryable.rb,
lib/queueing_rabbit/extensions/direct_exchange.rb,
lib/queueing_rabbit/misc/inheritable_class_variables.rb
Defined Under Namespace
Modules: Bus, Callbacks, Client, Configuration, InheritableClassVariables, Job, JobExtensions, Logging, Serializer
Classes: AbstractBus, AbstractJob, JSONBus, JSONJob, JobNotFoundError, JobNotPresentError, QueueingRabbitError, Worker
Constant Summary
collapse
- VERSION =
"0.4.0"
Instance Attribute Summary collapse
#amqp_uri, #heartbeat, #tcp_timeout
Instance Method Summary
collapse
Methods included from Callbacks
after_consuming, before_consuming, on_event_machine_start, setup_callback, trigger_event
configure, default_client
Instance Attribute Details
#client ⇒ Object
Returns the value of attribute client.
32
33
34
|
# File 'lib/queueing_rabbit.rb', line 32
def client
@client
end
|
#logger ⇒ Object
Returns the value of attribute logger.
32
33
34
|
# File 'lib/queueing_rabbit.rb', line 32
def logger
@logger
end
|
Instance Method Details
#begin_worker_loop ⇒ Object
86
87
88
89
90
|
# File 'lib/queueing_rabbit.rb', line 86
def begin_worker_loop
conn.begin_worker_loop do
yield
end
end
|
#connect ⇒ Object
Also known as:
conn, connection
34
35
36
37
38
|
# File 'lib/queueing_rabbit.rb', line 34
def connect
synchronize do
@connection ||= client.connect
end
end
|
#connected? ⇒ Boolean
51
52
53
|
# File 'lib/queueing_rabbit.rb', line 51
def connected?
@connection && @connection.open?
end
|
#disconnect ⇒ Object
42
43
44
45
46
47
48
49
|
# File 'lib/queueing_rabbit.rb', line 42
def disconnect
synchronize do
if connected?
@connection.close
end
drop_connection
end
end
|
#drop_connection ⇒ Object
55
56
57
|
# File 'lib/queueing_rabbit.rb', line 55
def drop_connection
@connection = nil
end
|
#enqueue(job, payload = nil, options = {}) ⇒ Object
59
60
61
62
63
64
65
66
67
68
|
# File 'lib/queueing_rabbit.rb', line 59
def enqueue(job, payload = nil, options = {})
info "enqueueing job #{job}"
follow_job_requirements(job) do |channel, exchange, _|
publish_to_exchange(exchange, payload, options)
channel.close
end
true
end
|
#follow_bus_requirements(bus) ⇒ Object
104
105
106
107
108
109
110
|
# File 'lib/queueing_rabbit.rb', line 104
def follow_bus_requirements(bus)
conn.open_channel(bus.channel_options) do |ch, _|
conn.define_exchange(ch, bus.exchange_name, bus.exchange_options) do |ex|
yield ch, ex
end
end
end
|
#follow_job_requirements(job) ⇒ Object
92
93
94
95
96
97
98
99
100
101
102
|
# File 'lib/queueing_rabbit.rb', line 92
def follow_job_requirements(job)
follow_bus_requirements(job) do |ch, ex|
conn.define_queue(ch, job.queue_name, job.queue_options) do |q|
if job.bind_queue?
job.binding_declarations.each { |o| conn.bind_queue(q, ex, o) }
end
yield ch, ex, q
end
end
end
|
#publish(bus, payload = nil, options = {}) ⇒ Object
70
71
72
73
74
75
76
77
78
79
|
# File 'lib/queueing_rabbit.rb', line 70
def publish(bus, payload = nil, options = {})
info "publishing to event bus #{bus}"
follow_bus_requirements(bus) do |channel, exchange|
publish_to_exchange(exchange, payload, options)
channel.close
end
true
end
|
#publish_to_exchange(exchange, payload = nil, options = {}) ⇒ Object
81
82
83
84
|
# File 'lib/queueing_rabbit.rb', line 81
def publish_to_exchange(exchange, payload = nil, options = {})
conn.publish(exchange, payload, options)
true
end
|
#purge_queue(job) ⇒ Object
122
123
124
125
126
127
128
129
|
# File 'lib/queueing_rabbit.rb', line 122
def purge_queue(job)
connection.open_channel(job.channel_options) do |c, _|
connection.define_queue(c, job.queue_name, job.queue_options) do |q|
connection.purge_queue(q) { c.close }
end
end
true
end
|
#queue_size(job) ⇒ Object
112
113
114
115
116
117
118
119
120
|
# File 'lib/queueing_rabbit.rb', line 112
def queue_size(job)
size = 0
connection.open_channel(job.channel_options) do |c, _|
queue = connection.define_queue(c, job.queue_name, job.queue_options)
size = connection.queue_size(queue)
c.close
end
size
end
|