Class: TinyQ::Subscriber

Inherits:
Object
  • Object
show all
Defined in:
lib/tinyq/subscriber.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(c, n = 1) ⇒ Subscriber

Returns a new instance of Subscriber.



10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
# File 'lib/tinyq/subscriber.rb', line 10

def initialize(c,n = 1)
    @connection = c
    @requested = @count = n
    @queue = EventMachine::Queue.new
    @messages = []
    @message_ids = {}

    cb = Proc.new do |event|
        $LOG.debug("Subscriber #{@connection.ip}:#{@connection.port} - Queue callback")

        response = { :Messages => @messages }

        connection.reply(response)
        
        # Only at that point should the message be removed from the bucket!
        @message_ids.each do |message_id,info|
            bucket = info[:Bucket]
            funnel = info[:Funnel]

            bucket.message_sent(funnel, message_id)
        end


        @queue.pop &cb
    end

    @queue.pop &cb
end

Instance Attribute Details

#connectionObject

Returns the value of attribute connection.



3
4
5
# File 'lib/tinyq/subscriber.rb', line 3

def connection
  @connection
end

#countObject

Returns the value of attribute count.



6
7
8
# File 'lib/tinyq/subscriber.rb', line 6

def count
  @count
end

#message_idsObject (readonly)

Returns the value of attribute message_ids.



8
9
10
# File 'lib/tinyq/subscriber.rb', line 8

def message_ids
  @message_ids
end

#messagesObject (readonly)

Returns the value of attribute messages.



7
8
9
# File 'lib/tinyq/subscriber.rb', line 7

def messages
  @messages
end

#queueObject (readonly)

Returns the value of attribute queue.



4
5
6
# File 'lib/tinyq/subscriber.rb', line 4

def queue
  @queue
end

#requestedObject (readonly)

Returns the value of attribute requested.



5
6
7
# File 'lib/tinyq/subscriber.rb', line 5

def requested
  @requested
end

Instance Method Details

#put_message(bucket, funnel, message, message_id) ⇒ Object



39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
# File 'lib/tinyq/subscriber.rb', line 39

def put_message(bucket, funnel, message, message_id)
    @messages.push(message)
    @message_ids[message_id] = {
        :Funnel => funnel,
        :Bucket => bucket
    }

    if @messages.size == @requested
        # We have everything now, so we can send all messages to subscriber
        event = {
            :Event => "New Message",
            :Funnel => funnel,
            :Bucket => bucket,
            :MessageID => message_id
        }
        @queue.push(event)
        true
    else
        false
    end
end