Class: Clamour::Bus

Inherits:
Object
  • Object
show all
Defined in:
lib/clamour/bus.rb

Defined Under Namespace

Classes: WrongContentTypeError

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(configuration = Clamour.configuration) ⇒ Bus

Returns a new instance of Bus.

Parameters:



25
26
27
28
29
30
# File 'lib/clamour/bus.rb', line 25

def initialize(configuration = Clamour.configuration)
  @configuration = configuration
  @connection_settings = configuration.rabbit_mq.to_hash
  @exchange_name = configuration.exchange
  @logger = configuration.logger
end

Instance Attribute Details

#configurationClamour::Configuration (readonly)



13
14
15
# File 'lib/clamour/bus.rb', line 13

def configuration
  @configuration
end

#connection_settingsHash (readonly)

Returns:

  • (Hash)


16
17
18
# File 'lib/clamour/bus.rb', line 16

def connection_settings
  @connection_settings
end

#exchange_nameString (readonly)

Returns:

  • (String)


19
20
21
# File 'lib/clamour/bus.rb', line 19

def exchange_name
  @exchange_name
end

#loggerLogger (readonly)

Returns:

  • (Logger)


22
23
24
# File 'lib/clamour/bus.rb', line 22

def logger
  @logger
end

Instance Method Details

#before_shutdown(&block) ⇒ Object

Do something gentle on SIGINT



116
117
118
119
120
121
# File 'lib/clamour/bus.rb', line 116

def before_shutdown(&block)
  Signal.trap('INT') do
    logger.info 'Shutting down on SIGINT...'
    block.call if block_given?
  end
end

#dump_json(message) ⇒ String

Parameters:

Returns:

  • (String)


125
126
127
# File 'lib/clamour/bus.rb', line 125

def dump_json(message)
  Oj.dump(message, mode: :compat)
end

#em_publish(message, &block) ⇒ Object

Parameters:



56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
# File 'lib/clamour/bus.rb', line 56

def em_publish(message, &block)
  logger.debug "Message #{message.inspect} is going to be published"
  if configuration.enable_connection?
    AMQP.connect(connection_settings) do |connection|
      AMQP::Channel.new(connection) do |channel|
        channel.fanout(exchange_name, durable: true) do |exchange|
          options = { content_type: 'application/json' }
          exchange.publish(dump_json(message), options) do
            logger.debug "Message #{message.inspect} is published to #{exchange_name}"
            connection.disconnect do
              block.call if block_given?
            end
          end
        end
      end
    end
  else
    logger.debug "Connection is disabled. Message #{message.inspect} is not really published"
    block.call if block_given?
  end
end

#em_subscribe(&block) ⇒ Object

Raises:

  • (ArgumentError)


78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
# File 'lib/clamour/bus.rb', line 78

def em_subscribe(&block)
  raise ArgumentError.new('You have to provide a block') unless block_given?

  if configuration.enable_connection?
    AMQP.connect(connection_settings) do |connection|
      before_shutdown do
        connection.close do
          EM.stop
        end
      end

      AMQP::Channel.new(connection) do |channel|
        channel.fanout(exchange_name, durable: true) do |exchange|
          EM.schedule do
            channel.queue('', exclusive: true) do |queue|
              queue.bind(exchange).subscribe do |header, delivery|
                message_hash =
                    case header.content_type
                      when 'application/json'
                        ActiveSupport::HashWithIndifferentAccess.new(load_json(delivery))
                      else
                        raise WrongContentTypeError.new("Got #{delivery.inspect} for content type #{header.content_type}")
                    end
                logger.debug "Got hash #{message_hash}"
                block.call(message_hash)
              end
            end
          end
        end
      end
    end
  else
    logger.info 'Connection is disabled. Doing nothing...'
    before_shutdown { EM.stop }
  end
end

#load_json(json) ⇒ Hash

Parameters:

  • json (String)

Returns:

  • (Hash)


131
132
133
# File 'lib/clamour/bus.rb', line 131

def load_json(json)
  Oj.load(json)
end

#publish(message) ⇒ Object

Parameters:



33
34
35
36
37
38
39
40
41
42
43
# File 'lib/clamour/bus.rb', line 33

def publish(message)
  if EM.reactor_running?
    em_publish(message)
  else
    EM.run do
      em_publish(message) do
        EM.stop
      end
    end
  end
end

#subscribe(&block) ⇒ Object



45
46
47
48
49
50
51
52
53
# File 'lib/clamour/bus.rb', line 45

def subscribe(&block)
  if EM.reactor_running?
    em_subscribe(&block)
  else
    EM.run do
      em_subscribe(&block)
    end
  end
end