40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
|
# File 'lib/queue.rb', line 40
# Reads messages from the queue collection.
#
# Builds a query for documents whose `_meta.expired` is false and whose
# `_meta.visible_after` is earlier than the time captured at call entry.
# The query is restricted to the given routing key, or, when no routing key
# is given, to documents that have no `_meta.routing_key` field at all.
#
# When :visibility_timeout is greater than -1.0, messages are fetched one at
# a time with find_and_modify, which sets each message's
# `_meta.visible_after` to (call time + timeout). Otherwise messages are
# fetched with a plain find limited to :messages_read documents and are not
# modified.
#
# @param opts [Hash] options, merged over the defaults below
# @option opts [Integer] :messages_read (10) maximum number of messages to fetch
# @option opts [Numeric] :visibility_timeout (-1.0) offset added to the call
#   time to form the new `_meta.visible_after`; values <= -1.0 disable the update
# @option opts [Object, nil] :routing_key (nil) routing key to match
# @option opts [Boolean] :block (false) when truthy, calls
#   block_until_message_available before fetching
# @option opts [Numeric] :polling_interval (1) forwarded to
#   block_until_message_available (presumably seconds -- confirm there)
# @option opts [Numeric, nil] :polling_timeout (nil) forwarded to
#   block_until_message_available (presumably seconds -- confirm there)
# @return [Array<Karait::Message>] wrapped messages for which expired? is false
# @raise [Mongo::OperationFailure] any find_and_modify failure whose message
#   does not match Queue::NO_OBJECT_FOUND_ERROR
def read(opts={})
  opts = {
    :messages_read => 10,
    :visibility_timeout => -1.0,
    :routing_key => nil,
    :block => false,
    :polling_interval => 1,
    :polling_timeout => nil
  }.merge(opts)

  # NOTE(review): captured before the optional blocking wait below, so the
  # visibility deadline is relative to call entry, not to message arrival.
  current_time = Time.new.to_f

  query = {
    '_meta.expired' => false,
    '_meta.visible_after' => { '$lt' => current_time }
  }
  query['_meta.routing_key'] =
    if opts[:routing_key].nil?
      { '$exists' => false }
    else
      opts[:routing_key]
    end

  update = nil
  if opts[:visibility_timeout] > -1.0
    update = {
      '$set' => {
        '_meta.visible_after' => current_time + opts[:visibility_timeout]
      }
    }
  end

  if opts[:block]
    block_until_message_available(query, opts[:polling_interval], opts[:polling_timeout])
  end

  raw_messages = []
  if update
    # Exactly :messages_read attempts; the previous inclusive range
    # (0..messages_read) made one attempt too many.
    opts[:messages_read].times do
      begin
        raw_message = @queue_collection.find_and_modify(:query => query, :update => update)
      rescue Mongo::OperationFailure => operation_failure
        # "No object found" is reported as an error here; treat it like an
        # empty result. Anything else is a genuine failure.
        raise unless operation_failure.to_s.match(Queue::NO_OBJECT_FOUND_ERROR)
        raw_message = nil
      end

      # Nothing left that matches the query: stop asking.
      break unless raw_message
      raw_messages << raw_message
    end
  else
    @queue_collection.find(query).limit(opts[:messages_read]).each do |raw_message|
      raw_messages << raw_message
    end
  end

  wrapped = raw_messages.map { |raw| Karait::Message.new(raw, @queue_collection) }
  wrapped.reject { |message| message.expired? }
end
|