Class: StompServer::ActiveRecordQueue

Inherits:
Object
  • Object
show all
Defined in:
lib/stomp_server_ng/queue/activerecord_queue.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(configdir, storagedir, db_ymlfile) ⇒ ActiveRecordQueue

Returns a new instance of ActiveRecordQueue.


14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
# File 'lib/stomp_server_ng/queue/activerecord_queue.rb', line 14

def initialize(configdir, storagedir, db_ymlfile)
  # Default configuration, use SQLite for simplicity
  db_params = {
    'adapter' => 'sqlite3',
    'database' => "#{configdir}/stompserver_development"
  }
  @@log = Logger::new(STDOUT)
  @@log.level = StompServer::LogHelper.get_loglevel()
  # Load DB configuration
  @@log.debug "trying to read from #{db_ymlfile}"
  if File.exists? db_ymlfile
    @@log.debug("File #{db_ymlfile} exists.")
    db_params.merge! YAML::load(File.open(db_ymlfile))
  else
    @@log.warn("File #{db_ymlfile} not found, using sqlite3 default.")
  end
  @@log.debug("using DB params: #{db_params.inspect}")
  # Setup activerecord
  ActiveRecord::Base.establish_connection(db_params)
  @@log.debug("connection complete")

  # AR Logger
  ActiveRecord::Base.logger = Logger.new(STDOUT)
  ActiveRecord::Base.logger.level = StompServer::LogHelper.get_loglevel()

  # we need the connection, it can't be done earlier
  ArMessage.reset_column_information
  reload_queues
  @stompid = StompServer::StompId.new
end

Instance Attribute Details

#checkpoint_intervalObject

Returns the value of attribute checkpoint_interval.


12
13
14
# File 'lib/stomp_server_ng/queue/activerecord_queue.rb', line 12

def checkpoint_interval
  @checkpoint_interval
end

Instance Method Details

#affect_msgid_and_store(frame, queue_name) ⇒ Object

store a frame (assigning it a message-id)


78
79
80
81
# File 'lib/stomp_server_ng/queue/activerecord_queue.rb', line 78

def affect_msgid_and_store(frame, queue_name)
  msgid = assign_id(frame, queue_name)
  ArMessage.create!(:stomp_id => msgid, :frame => frame)
end

#assign_id(frame, queue_name) ⇒ Object


87
88
89
90
91
92
93
94
95
96
97
98
# File 'lib/stomp_server_ng/queue/activerecord_queue.rb', line 87

def assign_id(frame, queue_name)

  unless @frames[queue_name]
    @frames[queue_name] = {
      :last_index => 0,
      :frames => [],
    }
  end

  msgid = @stompid[@frames[queue_name][:last_index] += 1]
  frame.headers['message-id'] = msgid
end

#dequeue(queue_name, session_id) ⇒ Object

Get and remove a frame from the queue


58
59
60
61
62
63
# File 'lib/stomp_server_ng/queue/activerecord_queue.rb', line 58

def dequeue(queue_name, session_id)
  return nil unless @frames[queue_name] && !@frames[queue_name][:frames].empty?
  frame = @frames[queue_name][:frames].shift
  remove_from_store(frame.headers['message-id'])
  return frame
end

#enqueue(queue_name, frame) ⇒ Object

Add a frame to the queue


46
47
48
49
50
51
52
53
54
55
# File 'lib/stomp_server_ng/queue/activerecord_queue.rb', line 46

def enqueue(queue_name, frame)
  unless @frames[queue_name]
    @frames[queue_name] = {
      :last_index => 0,
      :frames => [],
    }
  end
  affect_msgid_and_store(frame, queue_name)
  @frames[queue_name][:frames] << frame
end

#message_for?(queue_name, session_id) ⇒ Boolean

Returns:

  • (Boolean)

83
84
85
# File 'lib/stomp_server_ng/queue/activerecord_queue.rb', line 83

def message_for?(queue_name, session_id)
  @frames[queue_name] && !@frames[queue_name][:frames].empty?
end

#remove_from_store(message_id) ⇒ Object

remove a frame from the store


73
74
75
# File 'lib/stomp_server_ng/queue/activerecord_queue.rb', line 73

def remove_from_store(message_id)
  ArMessage.find_by_stomp_id(message_id).destroy
end

#requeue(queue_name, frame) ⇒ Object

Requeue the frame previously pending


66
67
68
69
70
# File 'lib/stomp_server_ng/queue/activerecord_queue.rb', line 66

def requeue(queue_name, frame)
  @frames[queue_name][:frames] << frame
  ArMessage.create!(:stomp_id => frame.headers['message-id'],
                    :frame => frame)
end