Class: Pipeline::Client

Inherits:
Object
  • Object
show all
Defined in:
lib/pipeline/client.rb

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initializeClient

Returns a new instance of Client.



28
29
30
# File 'lib/pipeline/client.rb', line 28

def initialize
  @on_open_callbacks = Array.new
end

Instance Attribute Details

#channelObject (readonly)

Returns the value of attribute channel.



32
33
34
# File 'lib/pipeline/client.rb', line 32

def channel
  @channel
end

#connectionObject (readonly)

Returns the value of attribute connection.



32
33
34
# File 'lib/pipeline/client.rb', line 32

def connection
  @connection
end

#exchangeObject (readonly)

Returns the value of attribute exchange.



32
33
34
# File 'lib/pipeline/client.rb', line 32

def exchange
  @exchange
end

#on_open_callbacksObject (readonly)

Returns the value of attribute on_open_callbacks.



32
33
34
# File 'lib/pipeline/client.rb', line 32

def on_open_callbacks
  @on_open_callbacks
end

Class Method Details

.boot(config) ⇒ Object



9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# File 'lib/pipeline/client.rb', line 9

def self.boot(config)
  client = self.new

  # Next tick, so we can use it with Thin.
  EM.next_tick do
    client.connect(config.merge(adapter: 'eventmachine'))

    # Set up signals.
    ['INT', 'TERM'].each do |signal|
      Signal.trap(signal) do
        puts "~ Received #{signal} signal, terminating."
        client.disconnect { EM.stop }
      end
    end
  end

  client
end

Instance Method Details

#connect(opts) ⇒ Object



33
34
35
36
37
38
39
40
41
42
43
44
45
46
# File 'lib/pipeline/client.rb', line 33

def connect(opts)
  @connection = AMQ::Client.connect(opts)
  @channel = AMQ::Client::Channel.new(@connection, 1)

  @connection.on_open do
    puts "~ Connected to RabbitMQ."

    @channel.open do
      self.on_open_callbacks.each do |callback|
        callback.call
      end
    end
  end
end

#consumer(name, routing_key = name, &block) ⇒ Object



68
69
70
71
72
73
74
75
76
77
78
# File 'lib/pipeline/client.rb', line 68

def consumer(name, routing_key = name, &block)
  queue = self.declare_queue(name, routing_key)

  queue.consume(true) do |consume_ok|
    puts "Subscribed for messages routed to #{queue.name}, consumer tag is #{consume_ok.consumer_tag}, using no-ack mode"

    queue.on_delivery do |basic_deliver, header, payload|
      block.call(payload, header, basic_deliver)
    end
  end
end

#declare_queue(name, routing_key) ⇒ Object



52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
# File 'lib/pipeline/client.rb', line 52

def declare_queue(name, routing_key)
  queue = AMQ::Client::Queue.new(@connection, @channel, name)

  self.on_open do
    queue.declare(false, true, false, true) do
      # puts "~ Queue #{queue.name.inspect} is ready"
    end

    queue.bind(self.exchange.name, routing_key) do
      puts "~ Queue #{queue.name} is now bound to #{self.exchange.name} with routing key #{routing_key}"
    end
  end

  queue
end

#disconnect(&block) ⇒ Object



94
95
96
# File 'lib/pipeline/client.rb', line 94

def disconnect(&block)
  @connection.disconnect(&block)
end

#on_open(&block) ⇒ Object

This runs after the channel is open. TODO: Why amq-client doesn’t support adding multiple callbacks?



82
83
84
85
86
87
88
# File 'lib/pipeline/client.rb', line 82

def on_open(&block)
  if @channel.status == :opening
    self.on_open_callbacks << block
  else
    block.call
  end
end

#publish(*args) ⇒ Object



90
91
92
# File 'lib/pipeline/client.rb', line 90

def publish(*args)
  self.exchange.publish(*args)
end