Class: ScoutAgent::Database::Queue

Inherits:
ScoutAgent::Database show all
Defined in:
lib/scout_agent/database/queue.rb

Overview

This database is used to store messages queued from external processes. Such messages can be data for missions or full reports ready for submission to the Scout server.

Constant Summary collapse

QUEUE_LIMIT =

A size limit for the queue to prevent data from building up.

3000

Instance Attribute Summary

Attributes inherited from ScoutAgent::Database

#log

Instance Method Summary collapse

Methods inherited from ScoutAgent::Database

#initialize, load, #locked?, #maintain, #migrate, #path, path, #prepare_connection, #query, #read_from_sqlite, #read_locked?, #schema_version, #write_locked?, #write_to_sqlite

Constructor Details

This class inherits a constructor from ScoutAgent::Database

Instance Method Details

#dequeue(*ids) ⇒ Object

Removes queued messages from the database by their ids. Returns true if the removal succeeded, or false otherwise.



146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
# File 'lib/scout_agent/database/queue.rb', line 146

# Deletes the queued messages whose ROWIDs are listed in +ids+.
#
# Returns +true+ when the delete ran (or when no ids were given) and +false+
# when SQLite raised an error, which is logged.
def dequeue(*ids)
  return true if ids.empty?  # nothing to delete
  placeholders = Array.new(ids.size, '?').join(', ')  # one bind slot per id
  write_to_sqlite { |db|
    db.execute(<<-END_DELETE_QUEUED.trim, *ids)
    DELETE FROM queue WHERE ROWID IN (#{placeholders})
    END_DELETE_QUEUED
  }
  true
rescue Amalgalite::SQLite3::Error => failure  # the delete did not happen
  #
  # nothing more to do:  the messages stay queued and will be delivered
  #                      again, the mission can block the duplicate
  #
  log.error("Database dequeuing error:  #{failure.message}.")
  false
end

#enqueue(mission_id, fields) ⇒ Object

Adds a message to the queue. The passed mission_id needs to be an Integer ID for a mission or one of the Strings 'report', 'hint', 'alert', or 'error' for a full report. The fields parameter is expected to be a Hash of fields and should include a 'plugin_id' key identifying what the data is for when the message is a report for the server.



44
45
46
47
48
49
50
51
52
53
54
# File 'lib/scout_agent/database/queue.rb', line 44

# Inserts one message into the queue table.
#
# +mission_id+ is bound as-is and +fields+ is serialized with to_json before
# being bound. Returns +true+ when the insert ran and +false+ when SQLite
# raised an error, which is logged.
def enqueue(mission_id, fields)
  write_to_sqlite { |db|
    payload = fields.to_json  # stored as a JSON document
    db.execute(<<-END_ENQUEUE.trim, mission_id, payload)
    INSERT INTO queue(mission_id, fields) VALUES(?, ?)
    END_ENQUEUE
  }
  true
rescue Amalgalite::SQLite3::Error => failure  # the insert was refused
  log.error("Database queuing error:  #{failure.message}.")
  false  # the bad message is rejected
end

#peek(mission_id) ⇒ Object

Returns a message (id, fields, and created_at) queued for mission_id. The fields are JSON parsed if possible and created_at is converted to a Time object. This method will return nil if no messages are queued for mission_id.



62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
# File 'lib/scout_agent/database/queue.rb', line 62

# Looks up one queued message for +mission_id+ without removing it.
#
# Returns the row (id, fields, created_at) or +nil+ when the lookup comes
# back empty or SQLite raises an error. The fields are replaced by their
# parsed JSON when they parse, and created_at by the result of
# Time.from_db_s when that returns a value; otherwise the raw column value
# is left in place and a warning is logged.
def peek(mission_id)
  row = read_from_sqlite do |db|
    db.first_row_from(<<-END_FIND_QUEUED.trim, mission_id.to_s)
    SELECT ROWID AS id, fields, created_at FROM queue WHERE mission_id = ?
    END_FIND_QUEUED
  end
  return nil if row.empty?  # nothing queued for this mission

  begin
    row[:fields] = JSON.parse(row[:fields].to_s)
  rescue JSON::ParserError  # not valid JSON
    # hand the raw text to the mission and let it decode it
    log.warn("Queued fields malformed.")
  end

  timestamp = Time.from_db_s(row[:created_at])
  if timestamp
    row[:created_at] = timestamp
  else
    log.warn("Queued timestamp missing.")
  end

  row
rescue Amalgalite::SQLite3::Error => failure  # the lookup itself failed
  log.error("Database peeking error:  #{failure.message}.")
  nil  # treated the same as not found
end

#queued_reports ⇒ Object

This method returns queued reports intended for the Scout server.

The process is pretty much identical to how mission generated reports are pulled. See ScoutAgent::Database::MissionLog#current_reports() for details.



96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
# File 'lib/scout_agent/database/queue.rb', line 96

# Reads up to 500 queued server reports (mission_id of 'report', 'hint',
# 'alert', or 'error'), ordered by created_at, and removes them from the
# queue.
#
# Each row is reshaped before it is returned: fields are JSON parsed, a
# "plugin_id" entry is moved out of the fields and onto the row, created_at
# is rewritten as a UTC database string, and the row's id is stripped off
# (the ids are only used to dequeue what was read).
#
# Always returns an Array. It is empty when nothing is queued, when reading
# fails, when the rows could not be dequeued (the write is rolled back so
# nothing is submitted twice), or when the write lock could not be obtained.
def queued_reports
  write_to_sqlite { |sqlite|
    # read the current reports
    begin
      report_ids = Array.new
      reports    = query(<<-END_FIND_REPORTS.trim) { |row|
      SELECT     ROWID AS id, mission_id AS type, fields, created_at
      FROM       queue
      WHERE      mission_id IN ('report', 'hint', 'alert', 'error')
      ORDER BY   created_at
      LIMIT      500
      END_FIND_REPORTS
        begin
          row[:fields] = JSON.parse(row[:fields].to_s)
          if row[:fields].include? "plugin_id"
            row[:plugin_id] = row[:fields].delete("plugin_id")
          end
        rescue JSON::ParserError  # failed to parse
          # skip the transform since we can't parse it
          log.warn("Queued fields malformed.")
        end
        if created = Time.from_db_s(row[:created_at])
          row[:created_at] = created.utc.to_db_s
        else
          log.warn("Queued timestamp missing.")
        end
        # remember the id for the delete below and drop it from the row
        report_ids << row.delete_at(:id)
      }
    rescue Amalgalite::SQLite3::Error => error  # failed to find reports
      log.error("Database queued reports error:  #{error.message}.")
      return Array.new  # return empty results
    end
    return reports if reports.empty?
    # delete the reports we read
    unless dequeue(*report_ids)
      # cancel sending this batch
      sqlite.rollback   # we can't submit unless we're sure they are gone
      return Array.new  # return empty results
    end
    reports  # the reports ready for sending
  }
rescue Amalgalite::SQLite3::Error => error  # failed to get a write lock
  # try again to read reports later
  log.error("Database queued reports locking error:  #{error.message}.")
  Array.new  # return empty results instead of the logger's return value
end

#update_schema(version = schema_version) ⇒ Object

Builds a schema for the queue table. This table is size controlled by a trigger to prevent infinite data growth.



19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# File 'lib/scout_agent/database/queue.rb', line 19

# Returns the schema statements that create the queue table when +version+
# is 0, and +nil+ for any other version.
#
# The schema restricts mission_id to the report types or a positive integer
# and attaches the created_at default and table size limit (QUEUE_LIMIT)
# trigger directives.
def update_schema(version = schema_version)
  return unless 0 == version  # only the initial schema is defined
  <<-END_INITIAL_SCHEMA.trim
    CREATE TABLE queue (
      mission_id TEXT NOT NULL
        CHECK( mission_id IN ('report', 'hint', 'alert', 'error') OR
               CAST(mission_id AS 'integer') > 0 ),
      fields     REQUIRED_TEXT_TYPE,
      created_at DATETIME_TYPE
    );
    DEFAULT_LOCALTIME_TRIGGER queue created_at
    LIMIT_TABLE_SIZE_TRIGGER  queue #{QUEUE_LIMIT}
  END_INITIAL_SCHEMA
end