Class: Pipeline::Client
- Inherits:
-
Object
- Object
- Pipeline::Client
- Defined in:
- lib/pipeline/client.rb
Instance Attribute Summary collapse
-
#channel ⇒ Object
readonly
Returns the value of attribute channel.
-
#connection ⇒ Object
readonly
Returns the value of attribute connection.
-
#exchange ⇒ Object
readonly
Returns the value of attribute exchange.
-
#on_open_callbacks ⇒ Object
readonly
Returns the value of attribute on_open_callbacks.
Class Method Summary collapse
Instance Method Summary collapse
- #connect(opts) ⇒ Object
- #consumer(name, routing_key = name, &block) ⇒ Object
- #declare_queue(name, routing_key) ⇒ Object
- #disconnect(&block) ⇒ Object
-
#initialize ⇒ Client
constructor
A new instance of Client.
-
#on_open(&block) ⇒ Object
This runs after the channel is open.
- #publish(*args) ⇒ Object
Constructor Details
#initialize ⇒ Client
Returns a new instance of Client.
28 29 30 |
# File 'lib/pipeline/client.rb', line 28 def initialize @on_open_callbacks = Array.new end |
Instance Attribute Details
#channel ⇒ Object (readonly)
Returns the value of attribute channel.
32 33 34 |
# File 'lib/pipeline/client.rb', line 32 def channel @channel end |
#connection ⇒ Object (readonly)
Returns the value of attribute connection.
32 33 34 |
# File 'lib/pipeline/client.rb', line 32 def connection @connection end |
#exchange ⇒ Object (readonly)
Returns the value of attribute exchange.
32 33 34 |
# File 'lib/pipeline/client.rb', line 32 def exchange @exchange end |
#on_open_callbacks ⇒ Object (readonly)
Returns the value of attribute on_open_callbacks.
32 33 34 |
# File 'lib/pipeline/client.rb', line 32 def on_open_callbacks @on_open_callbacks end |
Class Method Details
.boot(config) ⇒ Object
9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 |
# File 'lib/pipeline/client.rb', line 9 def self.boot(config) client = self.new # Next tick, so we can use it with Thin. EM.next_tick do client.connect(config.merge(adapter: 'eventmachine')) # Set up signals. ['INT', 'TERM'].each do |signal| Signal.trap(signal) do puts "~ Received #{signal} signal, terminating." client.disconnect { EM.stop } end end end client end |
Instance Method Details
#connect(opts) ⇒ Object
33 34 35 36 37 38 39 40 41 42 43 44 45 46 |
# File 'lib/pipeline/client.rb', line 33 def connect(opts) @connection = AMQ::Client.connect(opts) @channel = AMQ::Client::Channel.new(@connection, 1) @connection.on_open do puts "~ Connected to RabbitMQ." @channel.open do self.on_open_callbacks.each do |callback| callback.call end end end end |
#consumer(name, routing_key = name, &block) ⇒ Object
68 69 70 71 72 73 74 75 76 77 78 |
# File 'lib/pipeline/client.rb', line 68 def consumer(name, routing_key = name, &block) queue = self.declare_queue(name, routing_key) queue.consume(true) do |consume_ok| puts "Subscribed for messages routed to #{queue.name}, consumer tag is #{consume_ok.consumer_tag}, using no-ack mode" queue.on_delivery do |basic_deliver, header, payload| block.call(payload, header, basic_deliver) end end end |
#declare_queue(name, routing_key) ⇒ Object
52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 |
# File 'lib/pipeline/client.rb', line 52 def declare_queue(name, routing_key) queue = AMQ::Client::Queue.new(@connection, @channel, name) self.on_open do queue.declare(false, true, false, true) do # puts "~ Queue #{queue.name.inspect} is ready" end queue.bind(self.exchange.name, routing_key) do puts "~ Queue #{queue.name} is now bound to #{self.exchange.name} with routing key #{routing_key}" end end queue end |
#disconnect(&block) ⇒ Object
94 95 96 |
# File 'lib/pipeline/client.rb', line 94 def disconnect(&block) @connection.disconnect(&block) end |
#on_open(&block) ⇒ Object
This runs after the channel is open. TODO: Why amq-client doesn’t support adding multiple callbacks?
82 83 84 85 86 87 88 |
# File 'lib/pipeline/client.rb', line 82 def on_open(&block) if @channel.status == :opening self.on_open_callbacks << block else block.call end end |
#publish(*args) ⇒ Object
90 91 92 |
# File 'lib/pipeline/client.rb', line 90 def publish(*args) self.exchange.publish(*args) end |