Class: ScoutAgent::Database::Queue
- Inherits: ScoutAgent::Database
- Inheritance chain: Object, ScoutAgent::Database, ScoutAgent::Database::Queue
- Defined in: lib/scout_agent/database/queue.rb
Overview
This database is used to store messages queued from external processes. Such messages can be data for missions or full reports ready for submission to the Scout server.
Constant Summary
- QUEUE_LIMIT = 3000: A size limit for the queue to prevent data from building up.
Instance Attribute Summary
Attributes inherited from ScoutAgent::Database
Instance Method Summary
- #dequeue(*ids) ⇒ Object: Removes queued messages from the database by their ids.
- #enqueue(mission_id, fields) ⇒ Object: Adds a message to the queue.
- #peek(mission_id) ⇒ Object: Returns a message (id, fields, and created_at) queued for mission_id.
- #queued_reports ⇒ Object: Returns queued reports intended for the Scout server.
- #update_schema(version = schema_version) ⇒ Object: Builds a schema for the queue table.
Methods inherited from ScoutAgent::Database
#initialize, load, #locked?, #maintain, #migrate, #path, path, #prepare_connection, #query, #read_from_sqlite, #read_locked?, #schema_version, #write_locked?, #write_to_sqlite
Constructor Details
This class inherits a constructor from ScoutAgent::Database
Instance Method Details
#dequeue(*ids) ⇒ Object
Removes queued messages from the database by their ids. Returns true if the removal succeeded, or false otherwise.
    # File 'lib/scout_agent/database/queue.rb', line 146

    def dequeue(*ids)
      return true if ids.empty?
      write_to_sqlite do |sqlite|
        sqlite.execute(<<-END_DELETE_QUEUED.trim, *ids)
        DELETE FROM queue WHERE ROWID IN (#{(['?'] * ids.size).join(', ')})
        END_DELETE_QUEUED
      end
      true
    rescue Amalgalite::SQLite3::Error => error  # failed to remove messages
      #
      # do nothing: messages will be delivered again,
      # mission can block duplicate
      #
      log.error("Database dequeuing error: #{error.message}.")
      false
    end
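The DELETE statement above is built with one "?" placeholder per id, so the ids themselves are bound as parameters rather than interpolated into the SQL. A minimal sketch of that step in isolation follows; the helper name delete_queued_sql is hypothetical and not part of ScoutAgent.

```ruby
# Hypothetical helper, not part of ScoutAgent: builds the DELETE statement
# with one "?" placeholder per id so the values can be bound separately.
def delete_queued_sql(ids)
  placeholders = (['?'] * ids.size).join(', ')
  "DELETE FROM queue WHERE ROWID IN (#{placeholders})"
end

puts delete_queued_sql([4, 8, 15])
```

Only the number of ids affects the generated SQL, which is presumably why #dequeue returns early when no ids are given: an empty IN () list would not be a useful statement.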
#enqueue(mission_id, fields) ⇒ Object
Adds a message to the queue. The passed mission_id needs to be an Integer ID for a mission or one of the Strings 'report', 'hint', 'alert', or 'error' for a full report. The fields parameter is expected to be a Hash of fields and should include a 'plugin_id' key identifying what the data is for when the message is a report for the server.
    # File 'lib/scout_agent/database/queue.rb', line 44

    def enqueue(mission_id, fields)
      write_to_sqlite do |sqlite|
        sqlite.execute(<<-END_ENQUEUE.trim, mission_id, fields.to_json)
        INSERT INTO queue(mission_id, fields) VALUES(?, ?)
        END_ENQUEUE
      end
      true
    rescue Amalgalite::SQLite3::Error => error  # failed to enqueue message
      log.error("Database queuing error: #{error.message}.")
      false  # reject bad message
    end
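The rules for mission_id and fields can be checked before anything reaches the database. The sketch below is an illustration only: REPORT_TYPES and queue_bindings are hypothetical names, and the "positive Integer" test is an assumption taken from the CHECK constraint in the table schema. It returns the two values that would be bound to the INSERT placeholders.

```ruby
require 'json'

# Hypothetical constant and helper, for illustration only.
REPORT_TYPES = %w[report hint alert error].freeze

# Returns the two values that would be bound to the INSERT statement,
# or nil when mission_id is neither a positive Integer nor a report type.
def queue_bindings(mission_id, fields)
  valid = (mission_id.is_a?(Integer) && mission_id > 0) ||
          REPORT_TYPES.include?(mission_id)
  return nil unless valid
  [mission_id, fields.to_json]
end

p queue_bindings(42, { 'cpu' => 0.5 })
p queue_bindings('report', { 'plugin_id' => 7, 'load' => 1.2 })
p queue_bindings('bogus', {})
```

In the real method an invalid mission_id is rejected by the database instead, which surfaces as the rescued Amalgalite::SQLite3::Error and a false return value.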
#peek(mission_id) ⇒ Object
Returns a message (id, fields, and created_at) queued for mission_id. The fields are JSON parsed if possible and created_at is converted to a Time object. This method will return nil if no messages are queued for mission_id.
    # File 'lib/scout_agent/database/queue.rb', line 62

    def peek(mission_id)
      queued = read_from_sqlite { |sqlite|
        sqlite.first_row_from(<<-END_FIND_QUEUED.trim, mission_id.to_s)
        SELECT ROWID AS id, fields, created_at FROM queue WHERE mission_id = ?
        END_FIND_QUEUED
      }
      if queued.empty?
        nil  # not found
      else
        begin
          queued[:fields] = JSON.parse(queued[:fields].to_s)
        rescue JSON::ParserError  # failed to parse
          # leave for mission to decode it
          log.warn("Queued fields malformed.")
        end
        if created = Time.from_db_s(queued[:created_at])
          queued[:created_at] = created
        else
          log.warn("Queued timestamp missing.")
        end
        queued
      end
    rescue Amalgalite::SQLite3::Error => error  # failed to retrieve message
      log.error("Database peeking error: #{error.message}.")
      nil  # not found
    end
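The "JSON parsed if possible" behavior means a caller may receive either a parsed structure or the original string. A minimal sketch of that fallback follows, using only the standard json library; decode_fields is a hypothetical helper name, not a ScoutAgent method.

```ruby
require 'json'

# Hypothetical helper: parse the stored fields, falling back to the raw
# value when it is not valid JSON (peek leaves it for the mission to decode).
def decode_fields(raw)
  JSON.parse(raw.to_s)
rescue JSON::ParserError
  raw
end

p decode_fields('{"disk": 81}')
p decode_fields('not json')
```

Code consuming #peek should therefore be prepared for fields to be a String when the stored data was malformed, and for the whole result to be nil when nothing is queued.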
#queued_reports ⇒ Object
This method returns queued reports intended for the Scout server.
The process is nearly identical to how mission-generated reports are pulled. See ScoutAgent::Database::MissionLog#current_reports() for details.
    # File 'lib/scout_agent/database/queue.rb', line 96

    def queued_reports
      write_to_sqlite { |sqlite|
        # read the current reports
        begin
          report_ids = Array.new
          reports    = query(<<-END_FIND_REPORTS.trim) { |row|
          SELECT   ROWID AS id, mission_id AS type, fields, created_at
          FROM     queue
          WHERE    mission_id IN ('report', 'hint', 'alert', 'error')
          ORDER BY created_at
          LIMIT    500
          END_FIND_REPORTS
            begin
              row[:fields] = JSON.parse(row[:fields].to_s)
              if row[:fields].include? "plugin_id"
                row[:plugin_id] = row[:fields].delete("plugin_id")
              end
            rescue JSON::ParserError  # failed to parse
              # skip the transform since we can't parse it
              log.warn("Queued fields malformed.")
            end
            if created = Time.from_db_s(row[:created_at])
              row[:created_at] = created.utc.to_db_s
            else
              log.warn("Queued timestamp missing.")
            end
            report_ids << row.delete_at(:id)
          }
        rescue Amalgalite::SQLite3::Error => error  # failed to find reports
          log.error("Database queued reports error: #{error.message}.")
          return Array.new  # return empty results
        end
        return reports if reports.empty?
        # delete the reports we read
        unless dequeue(*report_ids)
          # cancel sending this batch
          sqlite.rollback   # we can't submit unless we're sure they are gone
          return Array.new  # return empty results
        end
        reports  # the reports ready for sending
      }
    rescue Amalgalite::SQLite3::Error => error  # failed to get a write lock
      # try again to read reports later
      log.error("Database queued reports locking error: #{error.message}.")
    end
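Each row in the listing above is reshaped before it is returned: the fields are parsed, and a plugin_id key, when present, is lifted out of fields to the top level of the row. The sketch below reproduces only that transform on a plain Hash. prepare_report is a hypothetical name, and the Hash stands in for the database row object, whose real class is not shown on this page.

```ruby
require 'json'

# Hypothetical stand-in for the per-row transform, using a plain Hash as the row.
def prepare_report(row)
  fields = JSON.parse(row[:fields].to_s)
  if fields.is_a?(Hash) && fields.include?('plugin_id')
    row[:plugin_id] = fields.delete('plugin_id')  # lift plugin_id out of fields
  end
  row[:fields] = fields
  row
rescue JSON::ParserError
  row  # leave the row untouched when fields cannot be parsed
end

row = { type: 'report', fields: '{"plugin_id": 7, "load": 1.2}' }
p prepare_report(row)
```

Note that the real method can return three different things: an Array of reports, an empty Array when reading or deleting fails, or the result of the final log.error call when the write lock cannot be obtained.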
#update_schema(version = schema_version) ⇒ Object
Builds a schema for the queue table. The table's size is capped by a trigger to prevent unbounded data growth.
    # File 'lib/scout_agent/database/queue.rb', line 19

    def update_schema(version = schema_version)
      case version
      when 0
        <<-END_INITIAL_SCHEMA.trim
        CREATE TABLE queue (
          mission_id TEXT NOT NULL CHECK( mission_id IN ('report', 'hint', 'alert', 'error') OR
                                          CAST(mission_id AS 'integer') > 0 ),
          fields     REQUIRED_TEXT_TYPE,
          created_at DATETIME_TYPE
        );

        DEFAULT_LOCALTIME_TRIGGER queue created_at
        LIMIT_TABLE_SIZE_TRIGGER  queue #{QUEUE_LIMIT}
        END_INITIAL_SCHEMA
      end
    end
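The effect of a size-limiting trigger can be pictured with an in-memory model. This is a sketch under a stated assumption: this page does not say which rows LIMIT_TABLE_SIZE_TRIGGER removes, so the example assumes the oldest rows are evicted first. QUEUE_LIMIT_SKETCH and push_limited are hypothetical names.

```ruby
# Hypothetical in-memory model; the real limit is enforced by a SQLite trigger.
QUEUE_LIMIT_SKETCH = 3000

# Appends a message, then drops rows from the front until the queue fits.
# Assumption: the oldest rows are removed first; the page does not confirm this.
def push_limited(queue, message, limit = QUEUE_LIMIT_SKETCH)
  queue << message
  queue.shift while queue.size > limit
  queue
end

queue = []
5.times { |i| push_limited(queue, i, 3) }
p queue
```

With a limit of 3, pushing five messages leaves only the last three in the queue, which is the kind of bounded growth the schema comment describes.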