Class: Emque::Consuming::Adapters::RabbitMq::DelayedMessageWorker
Instance Method Summary
collapse
#delay_ms_time, #retry_error, #retryable_error_limit, #retryable_errors
included
Constructor Details
Returns a new instance of DelayedMessageWorker.
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
|
# File 'lib/emque/consuming/adapters/rabbit_mq/delayed_message_worker.rb', line 18
def initialize(connection)
self.channel = connection.create_channel
self.delayed_message_exchange = channel.exchange(
"emque.#{config.app_name}.delayed_message",
{
:type => "x-delayed-message",
:durable => true,
:auto_delete => false,
:arguments => {
"x-delayed-type" => "direct",
}
}
)
self.queue = channel.queue(
"emque.#{config.app_name}.delayed_message",
:durable => config.adapter.options[:durable],
:auto_delete => config.adapter.options[:auto_delete],
:arguments => {
"x-dead-letter-exchange" => "#{config.app_name}.error"
}
).bind(delayed_message_exchange)
end
|
Instance Method Details
#actor_died(actor, reason) ⇒ Object
12
13
14
15
16
|
# File 'lib/emque/consuming/adapters/rabbit_mq/delayed_message_worker.rb', line 12
def actor_died(actor, reason)
unless shutdown
logger.error "#{log_prefix} actor_died - died: #{reason}"
end
end
|
#start ⇒ Object
43
44
45
46
47
|
# File 'lib/emque/consuming/adapters/rabbit_mq/delayed_message_worker.rb', line 43
def start
logger.info "#{log_prefix} starting..."
queue.subscribe(:manual_ack => true, &method(:process_message))
logger.debug "#{log_prefix} started"
end
|
#stop ⇒ Object
49
50
51
52
53
54
55
56
|
# File 'lib/emque/consuming/adapters/rabbit_mq/delayed_message_worker.rb', line 49
def stop
logger.debug "#{log_prefix} stopping..."
super do
logger.debug "#{log_prefix} closing channel"
channel.close
end
logger.debug "#{log_prefix} stopped"
end
|