Class: StompServer::ActiveRecordQueue

Inherits:
Object
  • Object
show all
Defined in:
lib/stomp_server/queue/activerecord_queue.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(configdir, storagedir) ⇒ ActiveRecordQueue

Returns a new instance of ActiveRecordQueue.



14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
# File 'lib/stomp_server/queue/activerecord_queue.rb', line 14

def initialize(configdir, storagedir)
  # Default configuration, use SQLite for simplicity
  db_params = {
    'adapter' => 'sqlite3',
    'database' => "#{configdir}/stompserver_development"
  }
  # Load DB configuration
  db_config = "#{configdir}/database.yml"
  puts "reading from #{db_config}"
  if File.exists? db_config
    db_params.merge! YAML::load(File.open(db_config))
  end

  puts "using #{db_params['database']} DB"

  # Setup activerecord
  ActiveRecord::Base.establish_connection(db_params)
  # Development <TODO> fix this
  ActiveRecord::Base.logger = Logger.new(STDERR)
  ActiveRecord::Base.logger.level = Logger::INFO
  # we need the connection, it can't be done earlier
  ArMessage.reset_column_information
  reload_queues
  @stompid = StompServer::StompId.new
end

Instance Attribute Details

#checkpoint_intervalObject

Returns the value of attribute checkpoint_interval.



12
13
14
# File 'lib/stomp_server/queue/activerecord_queue.rb', line 12

def checkpoint_interval
  @checkpoint_interval
end

Instance Method Details

#affect_msgid_and_store(frame, queue_name) ⇒ Object

store a frame (assigning it a message-id)



73
74
75
76
# File 'lib/stomp_server/queue/activerecord_queue.rb', line 73

def affect_msgid_and_store(frame, queue_name)
  msgid = assign_id(frame, queue_name)
  ArMessage.create!(:stomp_id => msgid, :frame => frame)
end

#assign_id(frame, queue_name) ⇒ Object



82
83
84
85
# File 'lib/stomp_server/queue/activerecord_queue.rb', line 82

def assign_id(frame, queue_name)
  msgid = @stompid[@frames[queue_name][:last_index] += 1]
  frame.headers['message-id'] = msgid
end

#dequeue(queue_name) ⇒ Object

Get and remove a frame from the queue



53
54
55
56
57
58
# File 'lib/stomp_server/queue/activerecord_queue.rb', line 53

def dequeue(queue_name)
  return nil unless @frames[queue_name] && !@frames[queue_name][:frames].empty?
  frame = @frames[queue_name][:frames].shift
  remove_from_store(frame.headers['message-id'])
  return frame
end

#enqueue(queue_name, frame) ⇒ Object

Add a frame to the queue



41
42
43
44
45
46
47
48
49
50
# File 'lib/stomp_server/queue/activerecord_queue.rb', line 41

def enqueue(queue_name, frame)
  unless @frames[queue_name]
    @frames[queue_name] = {
      :last_index => 0,
      :frames => [],
    }
  end
  affect_msgid_and_store(frame, queue_name)
  @frames[queue_name][:frames] << frame
end

#message_for?(queue_name) ⇒ Boolean

Returns:

  • (Boolean)


78
79
80
# File 'lib/stomp_server/queue/activerecord_queue.rb', line 78

def message_for?(queue_name)
  @frames[queue_name] && !@frames[queue_name][:frames].empty?
end

#remove_from_store(message_id) ⇒ Object

remove a frame from the store



68
69
70
# File 'lib/stomp_server/queue/activerecord_queue.rb', line 68

def remove_from_store(message_id)
  ArMessage.find_by_stomp_id(message_id).destroy
end

#requeue(queue_name, frame) ⇒ Object

Requeue the frame previously pending



61
62
63
64
65
# File 'lib/stomp_server/queue/activerecord_queue.rb', line 61

def requeue(queue_name, frame)
  @frames[queue_name][:frames] << frame
  ArMessage.create!(:stomp_id => frame.headers['message-id'],
                    :frame => frame)
end