Class: Euston::Daemons::Pipeline::MessageBuffer::Buffer
- Inherits: Object
- Inheritance hierarchy:
  - Object
  - Euston::Daemons::Pipeline::MessageBuffer::Buffer
- Defined in:
- lib/euston-daemons/pipeline/lib/message_buffer/buffer.rb
Instance Attribute Summary
- #name ⇒ Object (readonly): Returns the value of attribute name.
Instance Method Summary
- #delete_dispatched_messages(component_id) ⇒ Object
- #enqueue(exchange, message, dispatch_at = nil) ⇒ Object
- #find_dispatchable_messages(component_id) ⇒ Object
- #get_by_id(id) ⇒ Object
- #initialize(mongodb) ⇒ Buffer (constructor): A new instance of Buffer.
- #take_ownership_of_dispatchable_messages(component_id) ⇒ Object
Constructor Details
#initialize(mongodb) ⇒ Buffer
Returns a new instance of Buffer.
6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 |
# File 'lib/euston-daemons/pipeline/lib/message_buffer/buffer.rb', line 6
#
# Binds the buffer to its backing collection. The collection is created when
# the database does not list it yet, and two non-unique indexes are ensured.
#
# @param mongodb [Object] database handle; must respond to #collection_names,
#   #create_collection and #collection (presumably a Mongo::DB — confirm
#   against the caller)
def initialize(mongodb)
  name = 'message_buffer'
  mongodb.create_collection(name) unless mongodb.collection_names.include?(name)

  @name = name
  @collection = mongodb.collection(name)

  # Index on the key that #get_by_id queries.
  @collection.ensure_index(
    [ ['message_id', Mongo::ASCENDING] ],
    :unique => false,
    :name => "#{name}_message_id_index"
  )

  # Compound index on the keys used by the component_id / dispatch_at queries.
  @collection.ensure_index(
    [ ['component_id', Mongo::ASCENDING], ['dispatch_at', Mongo::ASCENDING] ],
    :unique => false,
    :name => "#{name}_component_id_dispatch_at_index"
  )
end
Instance Attribute Details
#name ⇒ Object (readonly)
Returns the value of attribute name.
23 24 25 |
# File 'lib/euston-daemons/pipeline/lib/message_buffer/buffer.rb', line 23
#
# Returns the value of attribute name.
#
# @return [String] the collection name stored in @name by #initialize
#   ('message_buffer')
def name
  @name
end
Instance Method Details
#delete_dispatched_messages(component_id) ⇒ Object
25 26 27 |
# File 'lib/euston-daemons/pipeline/lib/message_buffer/buffer.rb', line 25
#
# Removes every buffered document whose 'component_id' equals the given id.
#
# @param component_id [Object] value matched against the 'component_id' key
# @return [Object] whatever the collection's #remove returns
def delete_dispatched_messages(component_id)
  @collection.remove({ 'component_id' => component_id }, :multi => true)
end
#enqueue(exchange, message, dispatch_at = nil) ⇒ Object
29 30 31 32 33 34 35 36 37 38 39 40 |
# File 'lib/euston-daemons/pipeline/lib/message_buffer/buffer.rb', line 29
#
# Converts one message, or an array of messages, into documents via
# map_to_document and inserts them into the collection in a single call.
#
# NOTE(review): several identifiers in this listing were lost in extraction.
# The `message` parameter comes from the documented signature; the local
# names below are reconstructed — confirm against the real source file.
#
# @param exchange [Object] passed through to map_to_document unchanged
# @param message [Object, Array] a single message, or an array whose items
#   are either raw messages or hashes already shaped as
#   { :hash => ..., :dispatch_at => ... }
# @param dispatch_at [Object, nil] dispatch time applied to every item that
#   does not carry its own :dispatch_at
# @return [Object, nil] the result of the insert, or nil when there was
#   nothing to insert
def enqueue(exchange, message, dispatch_at = nil)
  messages = message
  messages = [{ :hash => message, :dispatch_at => dispatch_at }] unless message.is_a? Array

  documents = messages.map do |m|
    # Items that already carry both keys keep their own dispatch time.
    wrapped = m.is_a?(Hash) && m.has_key?(:hash) && m.has_key?(:dispatch_at)
    m = { :hash => m, :dispatch_at => dispatch_at } unless wrapped
    map_to_document exchange, m
  end

  # Skip the insert entirely when an empty array was passed in.
  @collection.insert(documents) unless documents.empty?
end
#find_dispatchable_messages(component_id) ⇒ Object
42 43 44 45 46 47 48 |
# File 'lib/euston-daemons/pipeline/lib/message_buffer/buffer.rb', line 42
#
# Queries the collection for documents whose 'component_id' equals the given
# id, requesting only the 'exchange', 'type' and 'json' fields, sorted by
# 'dispatch_at' ascending.
#
# @param component_id [Object] value matched against the 'component_id' key
# @return [Object] whatever the collection's #find returns (presumably a
#   cursor — confirm against the driver in use)
def find_dispatchable_messages(component_id)
  query = { 'component_id' => component_id }
  fields = ['exchange', 'type', 'json']
  sort = [ 'dispatch_at', Mongo::ASCENDING ]

  @collection.find query, :fields => fields, :sort => sort
end
#get_by_id(id) ⇒ Object
50 51 52 |
# File 'lib/euston-daemons/pipeline/lib/message_buffer/buffer.rb', line 50
#
# Looks up a single buffered document by its 'message_id'.
#
# @param id [Object] value matched against the 'message_id' key
# @return [Object] whatever the collection's #find_one returns
def get_by_id(id)
  @collection.find_one('message_id' => id)
end
#take_ownership_of_dispatchable_messages(component_id) ⇒ Object
54 55 56 57 58 59 60 61 62 63 64 65 |
# File 'lib/euston-daemons/pipeline/lib/message_buffer/buffer.rb', line 54
#
# Sets 'component_id' to the given id on every document that is either
#   * unowned ('component_id' is '') with 'dispatch_at' at or before now, or
#   * owned ('component_id' is not '') with 'dispatch_at' at least 60 seconds
#     before now (presumably a dispatch that stalled — confirm intent).
#
# 'dispatch_at' is compared against Time.now.to_f, i.e. epoch seconds.
#
# NOTE(review): the names of the two sub-query locals were lost in
# extraction and are reconstructed here.
#
# @param component_id [Object] id written into each matching document
# @return [Object] whatever the collection's #update returns
def take_ownership_of_dispatchable_messages(component_id)
  # Read the clock once so both clauses are evaluated against the same instant.
  now = Time.now.to_f
  reclaim_after = 60 # seconds past dispatch_at before an owned message is re-claimed

  unowned = { 'component_id' => '', 'dispatch_at' => { '$lte' => now } }
  overdue = { 'component_id' => { '$ne' => '' }, 'dispatch_at' => { '$lte' => now - reclaim_after } }

  query = { '$or' => [ unowned, overdue ] }
  doc = { '$set' => { 'component_id' => component_id } }

  @collection.update query, doc, :multi => true
end