Module: PubSub::Publisher

Includes:
BunnyConfig
Defined in:
lib/pub_sub/publisher.rb

Constant Summary collapse

RETRY_WAIT =
5

Instance Method Summary collapse

Methods included from BunnyConfig

#bunny_config

Instance Method Details

#bunny ⇒ Object



23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
# File 'lib/pub_sub/publisher.rb', line 23

# Returns the memoized Bunny connection, creating and starting it on
# first use.
#
# The connection is built from +bunny_config+. When +@transactional+ is
# truthy, +tx_select+ is called on the new connection before it is cached.
#
# @return [Object] the started object returned by +Bunny.new+
def bunny
  return @bunny if @bunny

  logger.info("PubSub:Publisher#bunny: initializing and starting new bunny instance")

  connection = Bunny.new(bunny_config)
  connection.start

  if @transactional
    logger.info("PubSub:Publisher#bunny: publisher is transactional, calling tx_select")
    connection.tx_select
  end

  # Cache only after the connection is fully set up, so a failure above
  # leaves @bunny unset and the next call starts from scratch.
  @bunny = connection
end

#logger ⇒ Object



11
12
13
14
15
16
17
# File 'lib/pub_sub/publisher.rb', line 11

# Returns the logger used by the publisher.
#
# If none has been assigned, a +Logger+ writing to STDOUT at WARN level
# is created and memoized.
#
# @return [Object] the assigned logger, or the default +Logger+
def logger
  @logger ||= Logger.new(STDOUT).tap { |default| default.level = Logger::WARN }
end

#logger=(logger) ⇒ Object



7
8
9
# File 'lib/pub_sub/publisher.rb', line 7

# Replaces the logger used by the publisher.
#
# @param logger [Object] stored as-is in +@logger+ and returned by
#   +#logger+ from then on
def logger=(logger)
  @logger = logger
end

#publish(msg, exchange_name, routing_key) ⇒ Object



39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
# File 'lib/pub_sub/publisher.rb', line 39

# Publishes +msg+ to the exchange named +exchange_name+.
#
# The exchange is declared through +bunny+ with <tt>:type => :topic</tt>
# and <tt>:durable => true</tt>, and the message is sent with
# <tt>:persistent => true</tt>. When +@transactional+ is truthy,
# +tx_commit+ is called on the connection after the publish.
#
# On any +StandardError+ or +Timeout::Error+ the method logs, sleeps for
# +RETRY_WAIT+ seconds, calls +reset+ and tries again.
#
# @param msg [Object] payload handed to the exchange's +publish+
# @param exchange_name [String] name of the exchange to declare/publish to
# @param routing_key [String] passed to the exchange as +:key+
# @param max_retries [Integer, nil] maximum number of retries across both
#   failure paths; +nil+ (the default) retries forever
# @return [Object, nil] the result of +tx_commit+ when transactional,
#   otherwise +nil+
# @raise [StandardError, Timeout::Error] the last error, once
#   +max_retries+ retries have been used up
def publish(msg, exchange_name, routing_key, max_retries = nil)
  # Shared by both rescue clauses so the bound covers the whole call.
  attempts = 0

  begin
    begin
      exchange = bunny.exchange(exchange_name, :type => :topic, :durable => true)
    rescue StandardError, Timeout::Error => error
      # Re-raise once the retry budget is spent; the outer rescue applies
      # the same check and lets the error propagate to the caller.
      raise if max_retries && attempts >= max_retries

      attempts += 1
      logger.info("PubSub:Publisher#publish: resetting and reconnecting to #{exchange_name}, because - #{error}")
      sleep RETRY_WAIT
      reset
      retry
    end

    exchange.publish(msg, :key => routing_key, :persistent => true)

    if @transactional
      logger.debug("PubSub:Publisher#publish: publisher is transactional, calling tx_commit")
      bunny.tx_commit
    end
  rescue StandardError, Timeout::Error => error
    raise if max_retries && attempts >= max_retries

    attempts += 1
    logger.info("PubSub:Publisher#publish: error encountered while publishing to #{exchange_name} with routing_key #{routing_key}, retrying - #{error}")
    sleep RETRY_WAIT
    reset
    retry
  end
end

#reset ⇒ Object



66
67
68
69
70
71
# File 'lib/pub_sub/publisher.rb', line 66

# Stops the current connection, if any, and forgets it so that the next
# call to +bunny+ builds a new one.
#
# Stopping is best-effort: any +StandardError+ raised by +stop+
# (including the +NoMethodError+ when no connection exists yet) is
# discarded.
#
# @return [nil]
def reset
  logger.info("PubSub:Publisher#reset: stopping bunny")

  begin
    @bunny.stop
  rescue StandardError
    # Deliberately ignored; the connection is dropped regardless.
  end

  @bunny = nil
end

#transactional=(transactional) ⇒ Object



19
20
21
# File 'lib/pub_sub/publisher.rb', line 19

# Sets the transactional flag.
#
# The flag is read by +bunny+ (to call +tx_select+ on a new connection)
# and by +publish+ (to call +tx_commit+ after each message).
#
# @param transactional [Object] stored as-is; only its truthiness is used
def transactional=(transactional)
  @transactional = transactional
end