Class: StompServer::ActiveRecordQueue
- Inherits:
-
Object
- Object
- StompServer::ActiveRecordQueue
- Defined in:
- lib/stomp_server/queue/activerecord_queue.rb
Instance Attribute Summary collapse
-
#checkpoint_interval ⇒ Object
Returns the value of attribute checkpoint_interval.
Instance Method Summary collapse
-
#affect_msgid_and_store(frame, queue_name) ⇒ Object
Store a frame, assigning it a message-id.
- #assign_id(frame, queue_name) ⇒ Object
-
#dequeue(queue_name) ⇒ Object
Get and remove a frame from the queue.
-
#enqueue(queue_name, frame) ⇒ Object
Add a frame to the queue.
-
#initialize(configdir, storagedir) ⇒ ActiveRecordQueue
constructor
A new instance of ActiveRecordQueue.
- #message_for?(queue_name) ⇒ Boolean
-
#remove_from_store(message_id) ⇒ Object
Remove a frame from the store.
-
#requeue(queue_name, frame) ⇒ Object
Requeue the frame previously pending.
Constructor Details
#initialize(configdir, storagedir) ⇒ ActiveRecordQueue
Returns a new instance of ActiveRecordQueue.
14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 |
# File 'lib/stomp_server/queue/activerecord_queue.rb', line 14
#
# Connects ActiveRecord to the message database and prepares the queue state.
#
# Starts from a SQLite default whose database path is under +configdir+ and,
# when "#{configdir}/database.yml" exists, merges the settings loaded from
# that file over the default.
#
# @param configdir [String] directory holding database.yml and the default
#   SQLite database path
# @param storagedir [Object] accepted but not referenced by this method
def initialize(configdir, storagedir)
  # Default configuration, use SQLite for simplicity
  db_params = {
    'adapter' => 'sqlite3',
    'database' => "#{configdir}/stompserver_development"
  }

  # Load DB configuration
  db_config = "#{configdir}/database.yml"
  puts "reading from #{db_config}"
  # File.exist? is used because File.exists? was removed in Ruby 3.2.
  if File.exist? db_config
    # The block form closes the file handle once the YAML has been parsed.
    db_params.merge!(File.open(db_config) { |io| YAML::load(io) })
  end
  puts "using #{db_params['database']} DB"

  # Setup activerecord
  ActiveRecord::Base.establish_connection(db_params)
  # Development <TODO> fix this
  ActiveRecord::Base.logger = Logger.new(STDERR)
  ActiveRecord::Base.logger.level = Logger::INFO

  # we need the connection, it can't be done earlier
  ArMessage.reset_column_information
  reload_queues
  @stompid = StompServer::StompId.new
end
Instance Attribute Details
#checkpoint_interval ⇒ Object
Returns the value of attribute checkpoint_interval.
12 13 14 |
# File 'lib/stomp_server/queue/activerecord_queue.rb', line 12
#
# Reader for the checkpoint_interval attribute.
#
# @return [Object] the current value of @checkpoint_interval (nil when it has
#   never been assigned)
def checkpoint_interval
  @checkpoint_interval
end
Instance Method Details
#affect_msgid_and_store(frame, queue_name) ⇒ Object
Store a frame, assigning it a message-id.
73 74 75 76 |
# File 'lib/stomp_server/queue/activerecord_queue.rb', line 73
#
# Assigns a message-id to the frame (via assign_id) and persists the frame
# as an ArMessage row keyed by that id.
#
# @param frame [Object] frame to store; its 'message-id' header is set by
#   assign_id
# @param queue_name [Object] key of the queue the frame belongs to
# @return [Object] whatever ArMessage.create! returns
# @raise whatever ArMessage.create! raises when the row cannot be created
def affect_msgid_and_store(frame, queue_name)
  msgid = assign_id(frame, queue_name)
  ArMessage.create!(:stomp_id => msgid, :frame => frame)
end
#assign_id(frame, queue_name) ⇒ Object
82 83 84 85 |
# File 'lib/stomp_server/queue/activerecord_queue.rb', line 82
#
# Bumps the queue's :last_index counter, derives a message-id from the new
# index through @stompid, and writes it into the frame's 'message-id' header.
#
# The queue entry must already exist in @frames; enqueue creates it before
# calling down to this method.
#
# @param frame [Object] frame whose headers receive the new 'message-id'
# @param queue_name [Object] key into @frames
# @return [Object] the message-id that was assigned
def assign_id(frame, queue_name)
  msgid = @stompid[@frames[queue_name][:last_index] += 1]
  frame.headers['message-id'] = msgid
end
#dequeue(queue_name) ⇒ Object
Get and remove a frame from the queue
53 54 55 56 57 58 |
# File 'lib/stomp_server/queue/activerecord_queue.rb', line 53
#
# Get and remove a frame from the queue.
#
# Takes the oldest in-memory frame for the queue and deletes its persisted
# copy through remove_from_store, using the frame's 'message-id' header.
#
# @param queue_name [Object] key into @frames
# @return [Object, nil] the dequeued frame, or nil when the queue is unknown
#   or has no frames
def dequeue(queue_name)
  queue = @frames[queue_name]
  return nil if !queue || queue[:frames].empty?

  frame = queue[:frames].shift
  remove_from_store(frame.headers['message-id'])
  frame
end
#enqueue(queue_name, frame) ⇒ Object
Add a frame to the queue
41 42 43 44 45 46 47 48 49 50 |
# File 'lib/stomp_server/queue/activerecord_queue.rb', line 41
#
# Add a frame to the queue.
#
# Creates the in-memory queue entry on first use, assigns a message-id and
# persists the frame through affect_msgid_and_store, then appends the frame
# to the in-memory list.
#
# @param queue_name [Object] key into @frames
# @param frame [Object] frame to enqueue
# @return [Array] the queue's in-memory frame list after the append
def enqueue(queue_name, frame)
  # First frame for this queue: start its counter and frame list.
  @frames[queue_name] ||= { :last_index => 0, :frames => [] }

  affect_msgid_and_store(frame, queue_name)
  @frames[queue_name][:frames] << frame
end
#message_for?(queue_name) ⇒ Boolean
78 79 80 |
# File 'lib/stomp_server/queue/activerecord_queue.rb', line 78
#
# Tells whether the named queue currently holds at least one in-memory frame.
#
# @param queue_name [Object] key into @frames
# @return [Boolean] true when the queue exists and its frame list is not
#   empty; false otherwise (including for an unknown queue)
def message_for?(queue_name)
  queue = @frames[queue_name]
  # Explicit nil check keeps the result a strict boolean for unknown queues.
  !queue.nil? && !queue[:frames].empty?
end
#remove_from_store(message_id) ⇒ Object
Remove a frame from the store.
68 69 70 |
# File 'lib/stomp_server/queue/activerecord_queue.rb', line 68
#
# Remove a frame from the store.
#
# Looks up the ArMessage row whose stomp_id matches and destroys it. When no
# row matches, nothing is destroyed and nil is returned instead of failing on
# a nil record.
#
# @param message_id [Object] the stomp id of the stored frame
# @return [Object, nil] the result of destroy, or nil when no row was found
def remove_from_store(message_id)
  record = ArMessage.find_by_stomp_id(message_id)
  record.destroy if record
end
#requeue(queue_name, frame) ⇒ Object
Requeue the frame previously pending
61 62 63 64 65 |
# File 'lib/stomp_server/queue/activerecord_queue.rb', line 61
#
# Requeue the frame previously pending.
#
# Re-persists the frame under its existing 'message-id' header and appends it
# to the queue's in-memory frame list. The row is created before the
# in-memory list is touched, so a failing create! leaves the list unchanged.
#
# @param queue_name [Object] key into @frames; the queue entry must exist
# @param frame [Object] frame carrying a 'message-id' header
# @return [Object] whatever ArMessage.create! returns
# @raise whatever ArMessage.create! raises when the row cannot be created
def requeue(queue_name, frame)
  # Resolve the list first so an unknown queue fails before anything is stored.
  pending = @frames[queue_name][:frames]
  record = ArMessage.create!(:stomp_id => frame.headers['message-id'], :frame => frame)
  pending << frame
  record
end