Class: Krakow::Consumer::Queue

Inherits:
Object
  • Object
show all
Includes:
Celluloid, Utils::Lazy
Defined in:
lib/krakow/consumer/queue.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Utils::Lazy

included

Methods included from Utils::Logging

level=, #log

Constructor Details

#initialize(consumer, *args) ⇒ self

Create new consumer queue instance

Parameters:



23
24
25
26
27
28
29
30
# File 'lib/krakow/consumer/queue.rb', line 23

def initialize(consumer, *args)
  opts = args.detect{|x| x.is_a?(Hash)}
  @consumer = consumer
  @removal_callback = opts[:removal_callback]
  @messages = {}
  @pop_order = []
  @cleaner = nil
end

Instance Attribute Details

#consumerConsumer (readonly)

Returns:



13
14
15
# File 'lib/krakow/consumer/queue.rb', line 13

def consumer
  @consumer
end

#pop_orderArray (readonly)

Returns order of message removal.

Returns:

  • (Array)

    order of message removal



15
16
17
# File 'lib/krakow/consumer/queue.rb', line 15

def pop_order
  @pop_order
end

#removal_callbackSymbol (readonly)

Returns callback method name.

Returns:

  • (Symbol)

    callback method name



17
18
19
# File 'lib/krakow/consumer/queue.rb', line 17

def removal_callback
  @removal_callback
end

Instance Method Details

#deregister_connection(identifier) ⇒ Array<FrameType::Message>

Remove connection registration and remove all messages

Parameters:

  • identifier (String)

    connection identifier

Returns:



59
60
61
62
63
64
65
# File 'lib/krakow/consumer/queue.rb', line 59

def deregister_connection(identifier)
  messages do |collection|
    removed = collection.delete(identifier)
    pop_order.delete(identifier)
    removed
  end
end

#messages {|messages| ... } ⇒ Hash

Message container

Yield Parameters:

  • messages (Hash)

Returns:

  • (Hash)

    messages or block result



36
37
38
39
40
41
42
# File 'lib/krakow/consumer/queue.rb', line 36

def messages
  if(block_given?)
    yield @messages
  else
    @messages
  end
end

#popObject Also known as: deq

Pop first item off the queue

Returns:

  • (Object)


92
93
94
95
96
97
98
99
100
101
102
103
104
105
# File 'lib/krakow/consumer/queue.rb', line 92

def pop
  message = nil
  until(message)
    wait(:new_message) if pop_order.empty?
    messages do |collection|
      key = pop_order.shift
      if(key)
        message = collection[key].shift
        message = validate_message(message)
      end
    end
  end
  message
end

#push(message) ⇒ self Also known as: <<, enq

Push new message into queue

Parameters:

Returns:

  • (self)


71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
# File 'lib/krakow/consumer/queue.rb', line 71

def push(message)
  unless(message.is_a?(FrameType::Message))
    abort TypeError.new "Expecting `FrameType::Message` but received `#{message.class}`!"
  end
  messages do |collection|
    begin
      collection[message.connection.identifier] << message
      pop_order << message.connection.identifier
    rescue Celluloid::DeadActorError
      abort Error::ConnectionUnavailable.new
    end
  end
  signal(:new_message)
  current_actor
end

#register_connection(connection) ⇒ TrueClass

Register a new connection

Parameters:

Returns:

  • (TrueClass)


48
49
50
51
52
53
# File 'lib/krakow/consumer/queue.rb', line 48

def register_connection(connection)
  messages do |collection|
    collection[connection.identifier] = []
  end
  true
end

#scrub_duplicate_message(message) ⇒ TrueClass, FalseClass

Remove duplicate message from queue if possible

Parameters:

Returns:

  • (TrueClass, FalseClass)


119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
# File 'lib/krakow/consumer/queue.rb', line 119

def scrub_duplicate_message(message)
  messages do |collection|
    idx = collection[message.connection.identifier].index do |msg|
      msg.message_id == message.message_id
    end
    if(idx)
      msg = collection[message.connection.identifier].delete_at(idx)
      if(removal_callback)
        consumer.send(removal_callback, [message])
      end
      true
    else
      false
    end
  end
end

#sizeInteger

Returns number of queued messages.

Returns:

  • (Integer)

    number of queued messages



109
110
111
112
113
# File 'lib/krakow/consumer/queue.rb', line 109

def size
  messages do |collection|
    collection.values.map(&:size).inject(&:+)
  end
end

#validate_message(message) ⇒ Object

Validate message



137
138
139
140
141
142
143
144
145
146
147
# File 'lib/krakow/consumer/queue.rb', line 137

def validate_message(message)
  if(message.instance_stamp > message.instance_stamp + (message.connection.endpoint_settings[:msg_timeout] / 1000.0))
    warn "Message exceeded timeout! Discarding. (#{message})"
    if(removal_callback)
      consumer.send(removal_callback, [message])
    end
    nil
  else
    message
  end
end