Class: Krakow::Consumer

Inherits:
Object
  • Object
show all
Includes:
Celluloid, Utils::Lazy
Defined in:
lib/krakow/consumer.rb

Overview

Consume messages from a server

Instance Attribute Summary collapse

Attributes collapse

Instance Method Summary collapse

Methods included from Utils::Lazy

included

Methods included from Utils::Logging

#level=, #log

Constructor Details

#initialize(args = {}) ⇒ Consumer

Returns a new instance of Consumer.



39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
# File 'lib/krakow/consumer.rb', line 39

# Create a new consumer and establish its connection(s).
#
# Connections come from one of two sources: when `nsqlookupd` is set they
# are found via lookup (see #discover); otherwise a single direct
# connection is made to `host`:`port`. With neither, setup is aborted.
#
# @param args [Hash] attribute values, handed to `super`
#   (NOTE(review): presumably consumed by Utils::Lazy to populate
#   `arguments` - confirm)
# @return [Consumer]
# @raise [Error::ConnectionFailure] passed to `abort` when the direct
#   connection cannot be built or subscribed
# @raise [Error::ConfigurationError] passed to `abort` when neither
#   lookup nor host/port information is provided
def initialize(args={})
  super
  # Guarantee the :features and :config keys exist; caller values win.
  arguments[:connection_options] = {:features => {}, :config => {}}.merge(
    arguments[:connection_options] || {}
  )
  @connections = {}
  @distribution = Distribution::Default.new(
    :max_in_flight => max_in_flight,
    :backoff_interval => backoff_interval,
    :consumer => current_actor
  )
  @queue = Queue.new
  if(nsqlookupd)
    debug "Connections will be established via lookup #{nsqlookupd.inspect}"
    @discovery = Discovery.new(:nsqlookupd => nsqlookupd)
    discover
  elsif(host && port)
    debug "Connection will be established via direct connection #{host}:#{port}"
    connection = build_connection(host, port, queue)
    # build_connection returns nil when construction fails; treat that the
    # same as a failed registration instead of calling methods on nil.
    if(connection && register(connection))
      info "Registered new connection #{connection}"
      distribution.redistribute!
    else
      abort Error::ConnectionFailure.new("Failed to establish subscription at provided end point (#{host}:#{port})")
    end
  else
    abort Error::ConfigurationError.new('No connection information provided!')
  end
end

Instance Attribute Details

#connections ⇒ Object (readonly)

Returns the value of attribute connections.



16
17
18
# File 'lib/krakow/consumer.rb', line 16

# Registered connections. Initialized to an empty Hash in #initialize and
# filled by #register, which stores each connection under its identifier.
#
# @return [Hash] connection identifier => connection
def connections
  @connections
end

#discovery ⇒ Object (readonly)

Returns the value of attribute discovery.



16
17
18
# File 'lib/krakow/consumer.rb', line 16

# Lookup helper used by #init!. Only assigned in #initialize when the
# consumer is configured with `nsqlookupd`.
#
# @return [Discovery, nil]
def discovery
  @discovery
end

#distribution ⇒ Object (readonly)

Returns the value of attribute distribution.



16
17
18
# File 'lib/krakow/consumer.rb', line 16

# Message distribution created in #initialize from `max_in_flight` and
# `backoff_interval`.
#
# @return [Distribution::Default]
def distribution
  @distribution
end

#queue ⇒ Object (readonly)

Returns the value of attribute queue.



16
17
18
# File 'lib/krakow/consumer.rb', line 16

# Queue created in #initialize and handed to every connection this
# consumer builds (see #build_connection).
#
# @return [Queue]
def queue
  @queue
end

Instance Method Details

#backoff_interval ⇒ Numeric

Returns the backoff_interval attribute.

Returns:

  • (Numeric)

    the backoff_interval attribute



31
# File 'lib/krakow/consumer.rb', line 31

# Declares the `backoff_interval` attribute (type Numeric, no default given).
# Its value is handed to the distribution in #initialize.
attribute :backoff_interval, Numeric

#backoff_interval? ⇒ TrueClass, FalseClass

Returns truthiness of the backoff_interval attribute.

Returns:

  • (TrueClass, FalseClass)

    truthiness of the backoff_interval attribute



31
# File 'lib/krakow/consumer.rb', line 31

# The `backoff_interval` declaration also backs the `backoff_interval?`
# predicate, which reports the truthiness of the attribute.
attribute :backoff_interval, Numeric

#build_connection(host, port, queue) ⇒ Krakow::Connection?

Build a new [Krakow::Connection]

Parameters:

  • host (String)

    remote host

  • port (String, Integer)

    remote port

  • queue (Queue)

    queue for messages

Returns:



101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
# File 'lib/krakow/consumer.rb', line 101

# Build a new [Krakow::Connection]
#
# The connection is wired back to this consumer: incoming frames are
# handled by #process_message and reconnects by #connection_reconnect.
# Any error raised while building is logged and swallowed.
#
# @param host [String] remote host
# @param port [String, Integer] remote port
# @param queue [Queue] queue for messages
# @return [Krakow::Connection, nil] nil when construction fails
def build_connection(host, port, queue)
  # Both callbacks target this actor and differ only in the method name.
  callback_to = lambda do |method_name|
    {:actor => current_actor, :method => method_name}
  end
  Connection.new(
    :host => host,
    :port => port,
    :queue => queue,
    :topic => topic,
    :channel => channel,
    :notifier => notifier,
    :features => connection_options[:features],
    :features_args => connection_options[:config],
    :callbacks => {
      :handle => callback_to.call(:process_message),
      :reconnect => callback_to.call(:connection_reconnect)
    }
  )
rescue => failure
  error "Failed to build connection (host: #{host} port: #{port} queue: #{queue}) - #{failure.class}: #{failure}"
  debug "#{failure.class}: #{failure}\n#{failure.backtrace.join("\n")}"
  nil
end

#channel ⇒ String

Returns the channel attribute.

Returns:

  • (String)

    the channel attribute



26
# File 'lib/krakow/consumer.rb', line 26

# Declares the required `channel` attribute (type String). It is used as the
# channel name when subscribing (#register, #connection_reconnect).
attribute :channel, String, :required => true

#channel? ⇒ TrueClass, FalseClass

Returns truthiness of the channel attribute.

Returns:

  • (TrueClass, FalseClass)

    truthiness of the channel attribute



26
# File 'lib/krakow/consumer.rb', line 26

# The `channel` declaration also backs the `channel?` predicate, which
# reports the truthiness of the attribute.
attribute :channel, String, :required => true

#confirm(message_id) ⇒ TrueClass Also known as: finish

Confirm message has been processed

Parameters:

Returns:

  • (TrueClass)

Raises:

  • (KeyError)

    connection not found



237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
# File 'lib/krakow/consumer.rb', line 237

# Confirm message has been processed
#
# Looks up the connection holding the in-flight message, unregisters the
# message, sends FIN, records the success with the distribution and then
# refreshes the connection's ready state.
#
# @param message_id [String, #message_id] message ID, or any object that
#   responds to `message_id`
# @return [TrueClass]
# @raise [KeyError] connection not found
# @raise [Error::LookupFailed] message is not known to the distribution
# @raise [Error::ConnectionUnavailable] connection is still unavailable
#   after the retry budget is spent
def confirm(message_id)
  message_id = message_id.message_id if message_id.respond_to?(:message_id)
  # Bounded on purpose: an unbounded `retry` spins forever (and blocks the
  # caller) when the connection never becomes available again.
  retries_remaining = 3
  begin
    distribution.in_flight_lookup(message_id) do |connection|
      distribution.unregister_message(message_id)
      connection.transmit(Command::Fin.new(:message_id => message_id))
      distribution.success(connection.identifier)
      update_ready!(connection)
    end
    true
  rescue KeyError => e
    error "Message confirmation failed: #{e}"
    abort e
  rescue Error::LookupFailed => e
    error "Lookup of message for confirmation failed! <Message ID: #{message_id} - Error: #{e}>"
    abort e
  rescue Error::ConnectionUnavailable => e
    if(retries_remaining > 0)
      retries_remaining -= 1
      retry
    end
    error "Connection unavailable for message confirmation! <Message ID: #{message_id} - Error: #{e}>"
    abort e
  end
end

#connection(key) ⇒ Krakow::Connection

Returns the [Krakow::Connection] associated with key.

Parameters:

  • key (Object)

    identifier

Returns:



73
74
75
# File 'lib/krakow/consumer.rb', line 73

# Look up a registered connection.
#
# @param key [Object] identifier; #register stores each connection under
#   `connection.identifier`
# @return [Krakow::Connection, nil] value stored under `key` in the
#   connections hash, nil when nothing is stored there
def connection(key)
  @connections[key]
end

#connection_failure(actor, reason) ⇒ nil

Remove connection references when connection is terminated

Parameters:

  • actor (Object)

    terminated actor

  • reason (Exception)

    reason for termination

Returns:

  • (nil)


216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
# File 'lib/krakow/consumer.rb', line 216

# Remove connection references when connection is terminated
#
# The matching connection is dropped from `connections` and from the
# distribution, after which the distribution is rebalanced. Removal
# happens whether or not a termination reason was supplied: the previous
# `reason.nil?` condition left connections that terminated with an
# exception registered forever.
# NOTE(review): presumably installed as the actor exit handler - confirm.
#
# @param actor [Object] terminated actor
# @param reason [Exception, nil] reason for termination, if any
# @return [nil]
def connection_failure(actor, reason)
  connections.delete_if do |key, value|
    next false unless value == actor
    warn "Connection failure detected. Removing connection: #{key} - #{reason || 'no reason provided'}"
    begin
      distribution.remove_connection(key)
    rescue Error::ConnectionUnavailable, Error::ConnectionFailure
      # The distribution could not reach the connection; it is dropped
      # from `connections` regardless.
      warn 'Caught connection unavailability'
    end
    distribution.redistribute!
    true
  end
  nil
end

#connection_options ⇒ Hash

Returns the connection_options attribute.

Returns:

  • (Hash)

    the connection_options attribute



35
# File 'lib/krakow/consumer.rb', line 35

# Declares the `connection_options` attribute (type Hash); the default is a
# lambda returning a new Hash. #build_connection reads its :features and
# :config keys.
attribute :connection_options, Hash, :default => ->{ Hash.new }

#connection_options? ⇒ TrueClass, FalseClass

Returns truthiness of the connection_options attribute.

Returns:

  • (TrueClass, FalseClass)

    truthiness of the connection_options attribute



35
# File 'lib/krakow/consumer.rb', line 35

# The `connection_options` declaration also backs the `connection_options?`
# predicate, which reports the truthiness of the attribute.
attribute :connection_options, Hash, :default => ->{ Hash.new }

#connection_reconnect(connection) ⇒ nil

Action to take when a connection has reconnected

Parameters:

Returns:

  • (nil)


147
148
149
150
151
# File 'lib/krakow/consumer.rb', line 147

# Action to take when a connection has reconnected
#
# Subscribes the connection to this consumer's topic/channel again and
# asks the distribution to set its ready state.
#
# @param connection [Krakow::Connection] connection that reconnected
# @return [nil]
def connection_reconnect(connection)
  subscription = Command::Sub.new(
    :topic_name => topic,
    :channel_name => channel
  )
  connection.transmit(subscription)
  distribution.set_ready_for(connection)
  nil
end

#discover ⇒ nil

Start the discovery interval lookup

Returns:

  • (nil)


187
188
189
190
# File 'lib/krakow/consumer.rb', line 187

# Start the discovery interval lookup
#
# Runs #init! immediately, then schedules itself again via `after`. The
# delay is `discovery_interval` plus a random share of `discovery_jitter`.
#
# @return [Object] whatever `after` returns
def discover
  init!
  delay = discovery_interval + (discovery_jitter * rand)
  after(delay) do
    discover
  end
end

#discovery_interval ⇒ Numeric

Returns the discovery_interval attribute.

Returns:

  • (Numeric)

    the discovery_interval attribute



32
# File 'lib/krakow/consumer.rb', line 32

# Declares the `discovery_interval` attribute (type Numeric, default 30).
# #discover uses it as the base delay passed to `after`.
attribute :discovery_interval, Numeric, :default => 30

#discovery_interval? ⇒ TrueClass, FalseClass

Returns truthiness of the discovery_interval attribute.

Returns:

  • (TrueClass, FalseClass)

    truthiness of the discovery_interval attribute



32
# File 'lib/krakow/consumer.rb', line 32

# The `discovery_interval` declaration also backs the `discovery_interval?`
# predicate, which reports the truthiness of the attribute.
attribute :discovery_interval, Numeric, :default => 30

#discovery_jitter ⇒ Numeric

Returns the discovery_jitter attribute.

Returns:

  • (Numeric)

    the discovery_jitter attribute



33
# File 'lib/krakow/consumer.rb', line 33

# Declares the `discovery_jitter` attribute (type Numeric, default 10.0).
# #discover multiplies it by `rand` and adds the result to the interval.
attribute :discovery_jitter, Numeric, :default => 10.0

#discovery_jitter? ⇒ TrueClass, FalseClass

Returns truthiness of the discovery_jitter attribute.

Returns:

  • (TrueClass, FalseClass)

    truthiness of the discovery_jitter attribute



33
# File 'lib/krakow/consumer.rb', line 33

# The `discovery_jitter` declaration also backs the `discovery_jitter?`
# predicate, which reports the truthiness of the attribute.
attribute :discovery_jitter, Numeric, :default => 10.0

#goodbye_my_love! ⇒ nil

Instance destructor

Returns:

  • (nil)


85
86
87
88
89
90
91
92
93
# File 'lib/krakow/consumer.rb', line 85

# Instance destructor
#
# Terminates every connection that is still alive, then the distribution.
#
# @return [nil]
def goodbye_my_love!
  debug 'Tearing down consumer'
  # Guard against `connections` being nil, mirroring the nil guard on
  # `distribution` below.
  # NOTE(review): presumably reachable when teardown runs for an instance
  # whose #initialize did not complete - confirm.
  if(connections)
    connections.values.each do |con|
      con.terminate if con.alive?
    end
  end
  distribution.terminate if distribution && distribution.alive?
  info 'Consumer torn down'
  nil
end

#host ⇒ String

Returns the host attribute.

Returns:

  • (String)

    the host attribute



27
# File 'lib/krakow/consumer.rb', line 27

# Declares the `host` attribute (type String). #initialize uses it, with
# `port`, for a direct connection when `nsqlookupd` is not set.
attribute :host, String

#host? ⇒ TrueClass, FalseClass

Returns truthiness of the host attribute.

Returns:

  • (TrueClass, FalseClass)

    truthiness of the host attribute



27
# File 'lib/krakow/consumer.rb', line 27

# The `host` declaration also backs the `host?` predicate, which reports the
# truthiness of the attribute.
attribute :host, String

#init! ⇒ nil

Initialize the consumer by starting lookup and adding connections

Returns:

  • (nil)


165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
# File 'lib/krakow/consumer.rb', line 165

# Initialize the consumer by starting lookup and adding connections
#
# Every node returned by the discovery lookup that is not already present
# in `connections` gets a connection built and registered. The
# distribution is rebalanced only when at least one new connection was
# actually registered.
#
# @return [nil]
def init!
  debug 'Running consumer `init!` connection builds'
  found = discovery.lookup(topic)
  debug "Discovery results: #{found.inspect}"
  registered_any = false
  found.each do |node|
    debug "Processing discovery result: #{node.inspect}"
    key = Connection.identifier(node[:broadcast_address], node[:tcp_port], topic, channel)
    if(connections[key])
      debug "Discovery result already registered: #{node.inspect}"
      next
    end
    connection = build_connection(node[:broadcast_address], node[:tcp_port], queue)
    # build_connection returns nil (after logging) when construction fails.
    # Skip the node; it is still absent from `connections`, so a later
    # `init!` run will try it again.
    next unless connection
    if(register(connection))
      info "Registered new connection #{connection}"
      registered_any = true
    end
  end
  distribution.redistribute! if registered_any
  nil
end

#max_in_flight ⇒ Integer

Returns the max_in_flight attribute.

Returns:

  • (Integer)

    the max_in_flight attribute



30
# File 'lib/krakow/consumer.rb', line 30

# Declares the `max_in_flight` attribute (type Integer, default 1). Its value
# is handed to the distribution in #initialize.
attribute :max_in_flight, Integer, :default => 1

#max_in_flight? ⇒ TrueClass, FalseClass

Returns truthiness of the max_in_flight attribute.

Returns:

  • (TrueClass, FalseClass)

    truthiness of the max_in_flight attribute



30
# File 'lib/krakow/consumer.rb', line 30

# The `max_in_flight` declaration also backs the `max_in_flight?` predicate,
# which reports the truthiness of the attribute.
attribute :max_in_flight, Integer, :default => 1

#notifier ⇒ [Celluloid::Signals, Celluloid::Condition, Celluloid::Actor]

Returns the notifier attribute.

Returns:

  • ([Celluloid::Signals, Celluloid::Condition, Celluloid::Actor])

    the notifier attribute



34
# File 'lib/krakow/consumer.rb', line 34

# Declares the `notifier` attribute; accepted types are listed in the array.
# #build_connection forwards its value to each new connection.
attribute :notifier, [Celluloid::Signals, Celluloid::Condition, Celluloid::Actor]

#notifier? ⇒ TrueClass, FalseClass

Returns truthiness of the notifier attribute.

Returns:

  • (TrueClass, FalseClass)

    truthiness of the notifier attribute



34
# File 'lib/krakow/consumer.rb', line 34

# The `notifier` declaration also backs the `notifier?` predicate, which
# reports the truthiness of the attribute.
attribute :notifier, [Celluloid::Signals, Celluloid::Condition, Celluloid::Actor]

#nsqlookupd ⇒ [Array, String]

Returns the nsqlookupd attribute.

Returns:

  • ([Array, String])

    the nsqlookupd attribute



29
# File 'lib/krakow/consumer.rb', line 29

# Declares the `nsqlookupd` attribute (Array or String). When it is set,
# #initialize creates a Discovery with it and starts #discover.
attribute :nsqlookupd, [Array, String]

#nsqlookupd? ⇒ TrueClass, FalseClass

Returns truthiness of the nsqlookupd attribute.

Returns:

  • (TrueClass, FalseClass)

    truthiness of the nsqlookupd attribute



29
# File 'lib/krakow/consumer.rb', line 29

# The `nsqlookupd` declaration also backs the `nsqlookupd?` predicate, which
# reports the truthiness of the attribute.
attribute :nsqlookupd, [Array, String]

#port ⇒ [String, Integer]

Returns the port attribute.

Returns:

  • ([String, Integer])

    the port attribute



28
# File 'lib/krakow/consumer.rb', line 28

# Declares the `port` attribute (String or Integer). #initialize uses it,
# with `host`, for a direct connection when `nsqlookupd` is not set.
attribute :port, [String, Integer]

#port? ⇒ TrueClass, FalseClass

Returns truthiness of the port attribute.

Returns:

  • (TrueClass, FalseClass)

    truthiness of the port attribute



28
# File 'lib/krakow/consumer.rb', line 28

# The `port` declaration also backs the `port?` predicate, which reports the
# truthiness of the attribute.
attribute :port, [String, Integer]

#process_message(message, connection) ⇒ Krakow::FrameType

Process a given message if required

Parameters:

Returns:



135
136
137
138
139
140
141
# File 'lib/krakow/consumer.rb', line 135

# Process a given message if required
#
# Only FrameType::Message instances are touched: they are registered with
# the distribution under the connection's identifier and have their
# origin set to this actor. Anything else is returned unchanged.
#
# @param message [Krakow::FrameType] received frame
# @param connection [Krakow::Connection] connection the frame arrived on
# @return [Krakow::FrameType] the same object that was passed in
def process_message(message, connection)
  return message unless message.is_a?(FrameType::Message)

  distribution.register_message(message, connection.identifier)
  message.origin = current_actor
  message
end

#register(connection) ⇒ TrueClass, FalseClass

Register connection with distribution

Parameters:

Returns:

  • (TrueClass, FalseClass)

    true if subscription was successful



196
197
198
199
200
201
202
203
204
205
206
207
208
209
# File 'lib/krakow/consumer.rb', line 196

# Register connection with distribution
#
# Initializes the connection, subscribes it to this consumer's
# topic/channel, links it to this actor, records it in `connections` and
# adds it to the distribution.
#
# @param connection [Krakow::Connection, nil] connection to register; nil
#   (what #build_connection returns on failure) is rejected
# @return [TrueClass, FalseClass] true if subscription was successful
def register(connection)
  # #build_connection returns nil on failure and its result is passed
  # straight in here; report failure instead of raising NoMethodError.
  return false unless connection
  begin
    connection.init!
    connection.transmit(Command::Sub.new(:topic_name => topic, :channel_name => channel))
    self.link connection
    connections[connection.identifier] = connection
    distribution.add_connection(connection)
    true
  rescue Error::BadResponse => e
    debug "Failed to establish connection: #{e.result ? e.result.error : '<No Response!>'}"
    connection.terminate
    false
  end
end

#requeue(message_id, timeout = 0) ⇒ TrueClass

Requeue message (generally due to processing failure)

Parameters:

Returns:

  • (TrueClass)


264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
# File 'lib/krakow/consumer.rb', line 264

# Requeue message (generally due to processing failure)
#
# Looks up the connection holding the in-flight message, unregisters the
# message, sends REQ with the given timeout, records the failure with the
# distribution and refreshes the connection's ready state.
#
# @param message_id [String, #message_id] message ID, or any object that
#   responds to `message_id`
# @param timeout [Numeric] timeout value sent with the REQ command
# @return [TrueClass]
def requeue(message_id, timeout=0)
  message_id = message_id.message_id if message_id.respond_to?(:message_id)
  distribution.in_flight_lookup(message_id) do |target|
    distribution.unregister_message(message_id)
    requeue_command = Command::Req.new(:message_id => message_id, :timeout => timeout)
    target.transmit(requeue_command)
    distribution.failure(target.identifier)
    update_ready!(target)
  end
  true
end

#to_s ⇒ String

Returns the stringified object.

Returns:

  • (String)

    the stringified object



78
79
80
# File 'lib/krakow/consumer.rb', line 78

# @return [String] class name, object id, topic and channel in the form
#   `<ClassName:object_id T:topic C:channel>`
def to_s
  format('<%s:%s T:%s C:%s>', self.class.name, object_id, topic, channel)
end

#topic ⇒ String

Returns the topic attribute.

Returns:

  • (String)

    the topic attribute



25
# File 'lib/krakow/consumer.rb', line 25

# Declares the required `topic` attribute (type String). It is used for the
# discovery lookup (#init!) and as the topic name when subscribing.
attribute :topic, String, :required => true

#topic? ⇒ TrueClass, FalseClass

Returns truthiness of the topic attribute.

Returns:

  • (TrueClass, FalseClass)

    truthiness of the topic attribute



25
# File 'lib/krakow/consumer.rb', line 25

# The `topic` declaration also backs the `topic?` predicate, which reports
# the truthiness of the attribute.
attribute :topic, String, :required => true

#touch(message_id) ⇒ TrueClass

Touch message (to extend timeout)

Parameters:

Returns:

  • (TrueClass)


284
285
286
287
288
289
290
291
292
293
294
295
296
297
# File 'lib/krakow/consumer.rb', line 284

# Touch message (to extend timeout)
#
# Looks up the connection holding the in-flight message and sends TOUCH
# for it. A failed lookup is logged and handed to `abort`.
#
# @param message_id [String, #message_id] message ID, or any object that
#   responds to `message_id`
# @return [TrueClass]
# @raise [Error::LookupFailed] message is not known to the distribution
def touch(message_id)
  message_id = message_id.respond_to?(:message_id) ? message_id.message_id : message_id
  distribution.in_flight_lookup(message_id) do |target|
    target.transmit(Command::Touch.new(:message_id => message_id))
  end
  true
rescue Error::LookupFailed => lookup_error
  error "Lookup of message for touch failed! <Message ID: #{message_id} - Error: #{lookup_error}>"
  abort lookup_error
end

#update_ready!(connection) ⇒ nil

Send RDY for connection based on distribution rules

Parameters:

Returns:

  • (nil)


157
158
159
160
# File 'lib/krakow/consumer.rb', line 157

# Send RDY for connection based on distribution rules
#
# Delegates to `distribution.set_ready_for` and discards its result.
#
# @param connection [Krakow::Connection] connection to update
# @return [nil]
def update_ready!(connection)
  distribution.set_ready_for(connection)
  nil
end