Class: StompServer::ActiveRecordQueue
- Inherits:
-
Object
- Object
- StompServer::ActiveRecordQueue
- Defined in:
- lib/stomp_server_ng/queue/activerecord_queue.rb
Instance Attribute Summary collapse
-
#checkpoint_interval ⇒ Object
Returns the value of attribute checkpoint_interval.
Instance Method Summary collapse
-
#affect_msgid_and_store(frame, queue_name) ⇒ Object
store a frame (assigning it a message-id).
- #assign_id(frame, queue_name) ⇒ Object
-
#dequeue(queue_name, session_id) ⇒ Object
Get and remove a frame from the queue.
-
#enqueue(queue_name, frame) ⇒ Object
Add a frame to the queue.
-
#initialize(configdir, storagedir, db_ymlfile) ⇒ ActiveRecordQueue
constructor
A new instance of ActiveRecordQueue.
- #message_for?(queue_name, session_id) ⇒ Boolean
-
#remove_from_store(message_id) ⇒ Object
remove a frame from the store.
-
#requeue(queue_name, frame) ⇒ Object
Requeue the frame previously pending.
Constructor Details
#initialize(configdir, storagedir, db_ymlfile) ⇒ ActiveRecordQueue
Returns a new instance of ActiveRecordQueue.
14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 |
# File 'lib/stomp_server_ng/queue/activerecord_queue.rb', line 14
#
# Connects ActiveRecord to the message store and prepares the queue.
#
# Starts from a SQLite default located under +configdir+ and overlays the
# settings found in +db_ymlfile+ when that file exists and parses to a Hash.
#
# @param configdir [String] directory used for the default SQLite database path
# @param storagedir [String] not referenced by this constructor
# @param db_ymlfile [String] path to a YAML file holding connection settings
def initialize(configdir, storagedir, db_ymlfile)
  # Default configuration, use SQLite for simplicity
  db_params = {
    'adapter' => 'sqlite3',
    'database' => "#{configdir}/stompserver_development"
  }

  @@log = Logger.new(STDOUT)
  @@log.level = StompServer::LogHelper.get_loglevel

  # Load DB configuration
  @@log.debug "trying to read from #{db_ymlfile}"
  # File.exist? instead of File.exists?: the latter was removed in Ruby 3.2.
  if File.exist?(db_ymlfile)
    @@log.debug("File #{db_ymlfile} exists.")
    # Block form so the file handle is closed as soon as parsing finishes.
    overrides = File.open(db_ymlfile) { |io| YAML.load(io) }
    if overrides.is_a?(Hash)
      db_params.merge!(overrides)
    else
      # An empty or non-mapping YAML document cannot be merged; keep defaults.
      @@log.warn("File #{db_ymlfile} has no usable settings, using sqlite3 default.")
    end
  else
    @@log.warn("File #{db_ymlfile} not found, using sqlite3 default.")
  end
  @@log.debug("using DB params: #{db_params.inspect}")

  # Setup activerecord
  ActiveRecord::Base.establish_connection(db_params)
  @@log.debug("connection complete")

  # AR Logger
  ActiveRecord::Base.logger = Logger.new(STDOUT)
  ActiveRecord::Base.logger.level = StompServer::LogHelper.get_loglevel

  # we need the connection, it can't be done earlier
  ArMessage.reset_column_information
  reload_queues
  @stompid = StompServer::StompId.new
end
Instance Attribute Details
#checkpoint_interval ⇒ Object
Returns the value of attribute checkpoint_interval.
12 13 14 |
# File 'lib/stomp_server_ng/queue/activerecord_queue.rb', line 12 def checkpoint_interval @checkpoint_interval end |
Instance Method Details
#affect_msgid_and_store(frame, queue_name) ⇒ Object
store a frame (assigning it a message-id)
78 79 80 81 |
# File 'lib/stomp_server_ng/queue/activerecord_queue.rb', line 78
#
# Gives +frame+ a message-id through #assign_id, then persists it as an
# ArMessage row keyed by that id.
#
# @param frame [Object] frame to persist; passed to #assign_id first
# @param queue_name [Object] queue identifier forwarded to #assign_id
# @return [Object] whatever ArMessage.create! returns
def affect_msgid_and_store(frame, queue_name)
  ArMessage.create!(:stomp_id => assign_id(frame, queue_name), :frame => frame)
end
#assign_id(frame, queue_name) ⇒ Object
87 88 89 90 91 92 93 94 95 96 97 98 |
# File 'lib/stomp_server_ng/queue/activerecord_queue.rb', line 87
#
# Bumps the per-queue counter and writes the resulting id into the frame's
# 'message-id' header. A queue entry is created on first use.
#
# @param frame [#headers] frame whose headers receive the new id
# @param queue_name [Object] key of the queue in @frames
# @return [Object] the id looked up from @stompid for the new counter value
def assign_id(frame, queue_name)
  queue = (@frames[queue_name] ||= { :last_index => 0, :frames => [] })
  queue[:last_index] += 1
  frame.headers['message-id'] = @stompid[queue[:last_index]]
end
#dequeue(queue_name, session_id) ⇒ Object
Get and remove a frame from the queue
58 59 60 61 62 63 |
# File 'lib/stomp_server_ng/queue/activerecord_queue.rb', line 58
#
# Takes the oldest in-memory frame off the named queue and deletes its
# persisted copy via #remove_from_store.
#
# @param queue_name [Object] key of the queue in @frames
# @param session_id [Object] not used by this method
# @return [Object, nil] the removed frame, or nil when the queue is unknown
#   or has no frames
def dequeue(queue_name, session_id)
  queue = @frames[queue_name]
  return nil if !queue || queue[:frames].empty?

  head = queue[:frames].shift
  remove_from_store(head.headers['message-id'])
  head
end
#enqueue(queue_name, frame) ⇒ Object
Add a frame to the queue
46 47 48 49 50 51 52 53 54 55 |
# File 'lib/stomp_server_ng/queue/activerecord_queue.rb', line 46
#
# Persists +frame+ (which also assigns its message-id) and then appends it
# to the in-memory queue, creating the queue entry on first use.
#
# @param queue_name [Object] key of the queue in @frames
# @param frame [Object] frame to add
# @return [Array] the queue's in-memory frame list after the append
def enqueue(queue_name, frame)
  @frames[queue_name] ||= { :last_index => 0, :frames => [] }
  affect_msgid_and_store(frame, queue_name)

  pending = @frames[queue_name][:frames]
  pending << frame
end
#message_for?(queue_name, session_id) ⇒ Boolean
83 84 85 |
# File 'lib/stomp_server_ng/queue/activerecord_queue.rb', line 83
#
# Tells whether the named queue currently holds at least one in-memory frame.
#
# @param queue_name [Object] key of the queue in @frames
# @param session_id [Object] not used by this method
# @return [Boolean] true when the queue exists and is non-empty, false otherwise
def message_for?(queue_name, session_id)
  queue = @frames[queue_name]
  # Explicit false (rather than nil) for an unknown queue keeps the result a strict Boolean.
  queue ? !queue[:frames].empty? : false
end
#remove_from_store(message_id) ⇒ Object
remove a frame from the store
73 74 75 |
# File 'lib/stomp_server_ng/queue/activerecord_queue.rb', line 73
#
# Deletes the persisted ArMessage whose stomp_id equals +message_id+.
#
# @param message_id [Object] value of the frame's 'message-id' header
# @return [Object, nil] the result of destroying the record, or nil when no
#   record with that id exists
def remove_from_store(message_id)
  record = ArMessage.find_by_stomp_id(message_id)
  # The lookup yields nil for an unknown id; skip instead of calling destroy on nil.
  record.destroy if record
end
#requeue(queue_name, frame) ⇒ Object
Requeue the frame previously pending
66 67 68 69 70 |
# File 'lib/stomp_server_ng/queue/activerecord_queue.rb', line 66
#
# Puts a previously pending frame back: persists it again under its existing
# 'message-id' header and appends it to the in-memory queue.
#
# @param queue_name [Object] key of the queue in @frames
# @param frame [#headers] frame that already carries a 'message-id' header
# @return [Object] whatever ArMessage.create! returns
# @raise whatever ArMessage.create! raises; the in-memory queue is left
#   unchanged in that case
def requeue(queue_name, frame)
  # Same lazy initialisation as #enqueue, so an unknown queue does not raise on nil.
  queue = (@frames[queue_name] ||= { :last_index => 0, :frames => [] })

  # Persist before touching memory so a failed write cannot leave an
  # unpersisted frame in the queue.
  record = ArMessage.create!(:stomp_id => frame.headers['message-id'], :frame => frame)
  queue[:frames] << frame
  record
end