Module: RabbitJobs::Publisher

Extended by:
AmqpHelpers, Publisher
Included in:
Publisher
Defined in:
lib/rabbit_jobs/publisher.rb

Instance Method Summary collapse

Methods included from AmqpHelpers

amqp_with_exchange, amqp_with_queue, make_queue

Instance Method Details

#publish(klass, opts = {}, *params) ⇒ Object



12
13
14
15
# File 'lib/rabbit_jobs/publisher.rb', line 12

def publish(klass, opts = {}, *params)
  key = RabbitJobs.config.routing_keys.first
  publish_to(key, klass, opts, *params)
end

#publish_job_to(routing_key, job) ⇒ Object



27
28
29
30
31
32
33
34
35
36
37
38
39
# File 'lib/rabbit_jobs/publisher.rb', line 27

def publish_job_to(routing_key, job)
  amqp_with_exchange do |connection, exchange|

    queue = make_queue(exchange, routing_key)

    job.opts['created_at'] = Time.now.to_s

    payload = job.payload
    exchange.publish(job.payload, Configuration::DEFAULT_MESSAGE_PARAMS.merge({routing_key: routing_key})) {
      connection.close { EM.stop }
    }
  end
end

#publish_to(routing_key, klass, opts = {}, *params) ⇒ Object

Raises:

  • (ArgumentError)


17
18
19
20
21
22
23
24
25
# File 'lib/rabbit_jobs/publisher.rb', line 17

def publish_to(routing_key, klass, opts = {}, *params)
  raise ArgumentError unless klass && routing_key
  opts ||= {}

  job = klass.new(*params)
  job.opts = opts

  publish_job_to(routing_key, job)
end

#purge_queue(routing_key) ⇒ Object

Raises:

  • (ArgumentError)


41
42
43
44
45
46
47
48
49
50
51
52
53
54
# File 'lib/rabbit_jobs/publisher.rb', line 41

def purge_queue(routing_key)
  raise ArgumentError unless routing_key

  amqp_with_queue(routing_key) do |connection, queue|
    queue.status do |number_of_messages, number_of_consumers|
      queue.purge {
        connection.close {
          EM.stop
          return number_of_messages
        }
      }
    end
  end
end