Class: Beetle::Publisher

Inherits:
Base
  • Object
show all
Defined in:
lib/beetle/publisher.rb

Overview

Provides the publishing logic implementation.

Constant Summary collapse

RPC_DEFAULT_TIMEOUT =

:nodoc:

10

Instance Attribute Summary collapse

Attributes inherited from Base

#options, #server, #servers

Instance Method Summary collapse

Methods included from Logging

#logger

Constructor Details

#initialize(client, options = {}) ⇒ Publisher

:nodoc:



7
8
9
10
11
12
13
14
15
16
# File 'lib/beetle/publisher.rb', line 7

# Builds a publisher: runs the superclass initializer with the same
# arguments, then resets all publisher-local state (throttling switched off,
# every bookkeeping container empty) and registers a process exit hook that
# calls #stop.
def initialize(client, options = {}) #:nodoc:
  super
  # throttling state
  @throttled = false
  @throttling_options = {}
  @next_throttle_refresh = Time.now
  # bookkeeping containers, all empty at start
  @bunnies = {}
  @dead_servers = {}
  @exchanges_with_bound_queues = {}
  at_exit { stop }
end

Instance Attribute Details

#dead_servers ⇒ Object (readonly)

Returns the value of attribute dead_servers.



5
6
7
# File 'lib/beetle/publisher.rb', line 5

# Reader for @dead_servers, the container that is initialised empty by the
# constructor and checked before publishing to decide whether dead servers
# should be recycled.
def dead_servers; @dead_servers; end

Instance Method Details

#bunny_exceptions ⇒ Object

List of exceptions potentially raised by Bunny. These need to be resolved lazily, because Qrack exceptions are only defined after a connection has been established.



32
33
34
35
36
37
38
39
# File 'lib/beetle/publisher.rb', line 32

# Returns a freshly built array of the exception classes that the publishing
# methods rescue. The constants are looked up on every call rather than stored
# once, so they are only resolved when the list is actually needed.
def bunny_exceptions
  bunny_errors = [
    Bunny::ConnectionError,
    Bunny::ForcedChannelCloseError,
    Bunny::ForcedConnectionCloseError,
    Bunny::MessageError,
    Bunny::ProtocolError,
    Bunny::ServerDownError,
    Bunny::UnsubscribeError,
    Bunny::AcknowledgementError
  ]
  qrack_errors = [Qrack::BufferOverflowError, Qrack::InvalidTypeError]
  network_errors = [Errno::EHOSTUNREACH, Errno::ECONNRESET, Errno::ETIMEDOUT, Timeout::Error]

  bunny_errors + qrack_errors + network_errors
end

#publish(message_name, data, opts = {}) ⇒ Object

:nodoc:



41
42
43
44
45
46
47
48
49
50
51
52
53
54
# File 'lib/beetle/publisher.rb', line 41

# Publishes +data+ as the message called +message_name+, instrumented under
# the 'publish.beetle' notification.
#
# The message's configured settings are merged with the caller's +opts+ (keys
# symbolized, caller values win). :exchange is extracted, :queue is dropped.
# Dead servers are recycled if any exist, throttling is applied, and the
# message is then sent redundantly when :redundant is set, otherwise with
# failover. Returns whatever the chosen publishing method returns.
def publish(message_name, data, opts={}) #:nodoc:
  ActiveSupport::Notifications.instrument('publish.beetle') do
    settings = @client.messages[message_name].merge(opts.symbolize_keys)
    exchange_name = settings.delete(:exchange)
    settings.delete(:queue)

    recycle_dead_servers unless @dead_servers.empty?
    throttle!

    next publish_with_redundancy(exchange_name, message_name, data, settings) if settings[:redundant]

    publish_with_failover(exchange_name, message_name, data, settings)
  end
end

#publish_with_failover(exchange_name, message_name, data, opts) ⇒ Object

:nodoc:



56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
# File 'lib/beetle/publisher.rb', line 56

# Sends the message to a single server, failing over to the next one when
# publishing raises one of the #bunny_exceptions.
#
# Every server gets two attempts: the attempt budget is twice the number of
# servers, a new server is selected whenever the remaining budget is even, and
# the first failure on a server triggers an immediate retry on that server.
# After the second failure the server is marked dead.
#
# Returns 1 once the message has been published.
# Raises NoMessageSent when the attempt budget is exhausted.
def publish_with_failover(exchange_name, message_name, data, opts) #:nodoc:
  attempts_left = 2 * @servers.size
  logger.debug "Beetle: sending #{message_name}"
  opts = Message.publishing_options(opts)
  begin
    # an even budget means the current server has used up both of its attempts
    select_next_server if attempts_left.even?
    bind_queues_for_exchange(exchange_name)
    logger.debug "Beetle: trying to send message #{message_name}:#{opts[:message_id]} to #{@server}"
    exchange(exchange_name).publish(data, opts)
    logger.debug "Beetle: message sent!"
  rescue *bunny_exceptions => error
    logger.warn("Beetle: publishing exception #{error} #{error.backtrace.first(5).join("\n")}")
    stop!(error)
    attempts_left -= 1
    # first failure for this server: try it once more, it might just have
    # been restarted (seen as a broken pipe or a forced connection shutdown)
    retry if attempts_left.odd?
    mark_server_dead
    retry if attempts_left > 0
    logger.error "Beetle: message could not be delivered: #{message_name}"
    raise NoMessageSent.new
  end
  # only reached after a successful publish
  1
end

#publish_with_redundancy(exchange_name, message_name, data, opts) ⇒ Object

:nodoc:



83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
# File 'lib/beetle/publisher.rb', line 83

# Tries to publish the message to two different servers.
#
# With fewer than two servers available it delegates to
# #publish_with_failover (warning first if some servers are marked dead).
# Otherwise it keeps selecting the next server until two distinct servers
# accepted the message, no servers remain, or every remaining server already
# has it. A failing server is retried once and then marked dead.
#
# Returns the number of servers the message was published to (1 or 2).
# Raises NoMessageSent when no server accepted the message.
def publish_with_redundancy(exchange_name, message_name, data, opts) #:nodoc:
  if @servers.size < 2
    logger.warn "Beetle: at least two active servers are required for redundant publishing" if @dead_servers.size > 0
    return publish_with_failover(exchange_name, message_name, data, opts)
  end

  delivered_to = []
  opts = Message.publishing_options(opts)
  until delivered_to.size == 2 || @servers.empty? || delivered_to == @servers
    failures = 0
    select_next_server
    begin
      # skip servers that already hold a copy
      next if delivered_to.include?(@server)
      bind_queues_for_exchange(exchange_name)
      logger.debug "Beetle: trying to send #{message_name}:#{opts[:message_id]} to #{@server}"
      exchange(exchange_name).publish(data, opts)
      delivered_to << @server
      logger.debug "Beetle: message sent (#{delivered_to})!"
    rescue *bunny_exceptions => error
      logger.warn("Beetle: publishing exception #{error} #{error.backtrace.first(5).join("\n")}")
      stop!(error)
      failures += 1
      # one immediate retry on the same server before giving up on it
      retry if failures == 1
      mark_server_dead
    end
  end

  copies = delivered_to.size
  if copies == 0
    logger.error "Beetle: message could not be delivered: #{message_name}"
    raise NoMessageSent.new
  elsif copies == 1
    logger.warn "Beetle: failed to send message redundantly"
  end

  copies
end

#purge(queue_names) ⇒ Object

:nodoc:



166
167
168
169
170
171
172
# File 'lib/beetle/publisher.rb', line 166

# Purges every queue named in +queue_names+ on each server.
#
# Purging stays best-effort: a failure for one queue never aborts the
# remaining queues or servers. Failures are logged as warnings instead of
# being discarded silently.
def purge(queue_names) #:nodoc:
  each_server do
    queue_names.each do |name|
      begin
        queue(name).purge
      rescue StandardError => e
        logger.warn "Beetle: failed to purge queue #{name} on #{@server}: #{e}"
      end
    end
  end
end

#rpc(message_name, data, opts = {}) ⇒ Object

:nodoc:



121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
# File 'lib/beetle/publisher.rb', line 121

# Performs a synchronous RPC: publishes +data+ as +message_name+ with a
# :reply_to pointing at a temporary reply queue and waits for one reply.
#
# Each available server is tried once; a server that raises one of the
# #bunny_exceptions is stopped, marked dead and the next one is tried.
#
# Returns [status, result]. +status+ is "TIMEOUT" and +result+ is nil unless a
# reply arrived, in which case they are the reply's :status header and its
# payload.
def rpc(message_name, data, opts={}) #:nodoc:
  opts = @client.messages[message_name].merge(opts.symbolize_keys)
  exchange_name = opts.delete(:exchange)
  opts.delete(:queue)
  recycle_dead_servers unless @dead_servers.empty?
  tries = @servers.size
  logger.debug "Beetle: performing rpc with message #{message_name}"
  result = nil
  status = "TIMEOUT"
  begin
    select_next_server
    bind_queues_for_exchange(exchange_name)
    # create non durable, autodeleted temporary queue with a server assigned name
    queue = bunny.queue
    # Keep the merged message options untouched: this block is re-entered on
    # retry, and every attempt must derive its publishing options from the
    # same input rather than from the previous attempt's output.
    publish_opts = Message.publishing_options(opts.merge(:reply_to => queue.name))
    # NOTE(review): falls back to the caller's :timeout in case
    # Message.publishing_options does not pass that key through -- confirm.
    timeout = publish_opts[:timeout] || opts[:timeout] || RPC_DEFAULT_TIMEOUT
    logger.debug "Beetle: trying to send #{message_name}:#{publish_opts[:message_id]} to #{@server}"
    exchange(exchange_name).publish(data, publish_opts)
    logger.debug "Beetle: message sent!"
    logger.debug "Beetle: listening on reply queue #{queue.name}"
    queue.subscribe(:message_max => 1, :timeout => timeout) do |msg|
      logger.debug "Beetle: received reply!"
      result = msg[:payload]
      status = msg[:header].properties[:headers][:status]
    end
    logger.debug "Beetle: rpc complete!"
  rescue *bunny_exceptions => e
    # log the failure the same way the publish methods do
    logger.warn("Beetle: rpc exception #{e} #{e.backtrace[0..4].join("\n")}")
    stop!(e)
    mark_server_dead
    tries -= 1
    retry if tries > 0
    logger.error "Beetle: message could not be delivered: #{message_name}"
  end
  [status, result]
end

#setup_queues_and_policies ⇒ Object



174
175
176
177
178
179
180
181
182
183
184
# File 'lib/beetle/publisher.rb', line 174

# Visits every server and calls #queue once for each queue name known to the
# client. A failure on one server is logged as a warning and does not stop
# the remaining servers from being processed.
def setup_queues_and_policies
  each_server do
    begin
      # snapshot of the configured names, taken before any #queue call
      queue_names = @client.queues.keys
      queue_names.each { |queue_name| queue(queue_name) }
    rescue StandardError => error
      logger.warn "Beetle: failed setting up queues and policies on #{@server}: #{error}"
    end
  end
end

#stop ⇒ Object

:nodoc:



186
187
188
# File 'lib/beetle/publisher.rb', line 186

# Calls #stop! once per server, as enumerated by #each_server.
def stop #:nodoc:
  each_server do
    stop!
  end
end

#throttle(queue_options) ⇒ Object



156
157
158
# File 'lib/beetle/publisher.rb', line 156

# Stores the throttling configuration used by #throttling? and #throttle!.
#
# queue_options - the throttling settings; presumably keyed by queue name
#                 (TODO confirm against refresh_throttling!). Passing nil
#                 disables throttling: it is stored as an empty Hash so that
#                 #throttling? keeps working instead of raising on nil.
#
# Returns the stored options.
def throttle(queue_options)
  @throttling_options = queue_options || {}
end

#throttle! ⇒ Object



160
161
162
163
164
# File 'lib/beetle/publisher.rb', line 160

# Applies throttling before a publish. Does nothing unless throttling is
# configured; otherwise refreshes the throttling state and pauses for one
# second when the publisher is currently throttled.
def throttle!
  if throttling?
    refresh_throttling!
    sleep 1 if throttled?
  end
end

#throttled? ⇒ Boolean

Returns:

  • (Boolean)


18
19
20
# File 'lib/beetle/publisher.rb', line 18

# Reports the current throttled flag (false right after construction); the
# stored value is returned as is.
def throttled?; @throttled; end

#throttling? ⇒ Boolean

Returns:

  • (Boolean)


22
23
24
# File 'lib/beetle/publisher.rb', line 22

# True when throttling options have been configured, i.e. the stored options
# are not empty.
def throttling?
  return false if @throttling_options.empty?

  true
end

#throttling_status ⇒ Object



26
27
28
# File 'lib/beetle/publisher.rb', line 26

# Human-readable form of the throttled flag: 'throttled' when the flag is
# set, 'unthrottled' otherwise.
def throttling_status
  return 'throttled' if @throttled

  'unthrottled'
end