Class: Longleaf::SequelIndexDriver

Inherits:
Object
  • Object
show all
Includes:
Logging
Defined in:
lib/longleaf/indexing/sequel_index_driver.rb

Overview

Driver for interacting with RDBM based metadata index using the Sequel ORM gem. Users must create the database and credentials for connecting to it in advance, if using a database application that requires creation of databases (ie, not sqlite). The default database name is ‘longleaf_metadata_index’ but may be overridden.

See the Sequel documentation for details about accepted connection parameters: github.com/jeremyevans/sequel/blob/master/doc/opening_databases.rdoc

Instance Method Summary collapse

Methods included from Logging

#initialize_logger, initialize_logger, logger, #logger

Constructor Details

#initialize(app_config, adapter, conn_details, page_size: nil) ⇒ SequelIndexDriver

Initialize the index driver

Parameters:

  • app_config (ApplicationConfigManager)

    the application configuration manager

  • adapter (String)

    name of the database adapter to use.

  • conn_details

    Details about the configuration and connection to the database used for the index. If a string is provided, it will be used as the connection URL and must identify the adapter. If a hash is provided, it used as the parameters for the database connection.

  • page_size (Integer) (defaults to: nil)

    number of results to retrieve per query when getting candidates



33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
# File 'lib/longleaf/indexing/sequel_index_driver.rb', line 33

def initialize(app_config, adapter, conn_details, page_size: nil)
  Sequel.default_timezone = :utc
  @app_config = app_config
  @adapter = adapter
  @conn_details = conn_details
  # Digest of the app config file so we can tell if it changes
  @config_md5 = app_config.config_md5
  @page_size = page_size.nil? || page_size <= 0 ? DEFAULT_PAGE_SIZE : page_size

  if @conn_details.is_a?(Hash)
    # Add in the adapter name
    @conn_details['adapter'] = adapter unless @conn_details.key?('adapter')
    # Add in default database name if none was specified
    @conn_details['database'] = INDEX_DB_NAME unless @conn_details.key?('database')
  end
end

Instance Method Details

#clear_index(older_than = nil) ⇒ Object

Remove all entries from the index

Parameters:

  • older_than (Time) (defaults to: nil)

    Optional. If provided, only entries that have not been indexed since before the provided time will be deleted.



153
154
155
156
157
158
159
160
# File 'lib/longleaf/indexing/sequel_index_driver.rb', line 153

def clear_index(older_than = nil)
  if older_than.nil?
    preserve_tbl.delete
  else
    older_than_timestamp = older_than.utc.strftime(TIMESTAMP_FORMAT)
    preserve_tbl.where { updated < older_than_timestamp }.delete
  end
end

#delay_until_timestamp(md_rec) ⇒ Object

Returns The first failure timestamp for any service, or nil if there were none.

Returns:

  • The first failure timestamp for any service, or nil if there were none.



126
127
128
129
130
131
132
133
# File 'lib/longleaf/indexing/sequel_index_driver.rb', line 126

def delay_until_timestamp(md_rec)
  md_rec.list_services.each do |service_name|
    service_rec = md_rec.service(service_name)
    return service_rec.failure_timestamp unless service_rec.failure_timestamp.nil?
  end
  # return lowest possible date
  return minimum_timestamp
end

#each_registered_path(file_selector, older_than: nil, &block) ⇒ Object

Calls the provided block once per each registered file path registered. Must be passed a block.

Parameters:

  • file_selector (FileSelector)

    selector for what paths to search for files

  • older_than (Time) (defaults to: nil)

    Optional. If provided, only files that have not been indexed since before this timestamp will be returned.



250
251
252
253
254
255
256
257
258
259
260
261
# File 'lib/longleaf/indexing/sequel_index_driver.rb', line 250

def each_registered_path(file_selector, older_than: nil, &block)
  dataset = add_path_restrictions(registered_dataset, file_selector)
      .select(:file_path)
  if !older_than.nil?
    older_than_timestamp = older_than.utc.strftime(TIMESTAMP_FORMAT)
    dataset = dataset.where { updated < older_than_timestamp }
  end
  # Yield to the provided block once per row return
  dataset.paged_each(:rows_per_fetch => @page_size) do |row|
    block.call(row[:file_path])
  end
end

#first_service_execution_timestamp(expected_services, md_rec) ⇒ Object

Find the earliest service execution time for any services expected to be run for the specified file.

Parameters:

  • expected_services (Array)

    list of ServiceDefinition objects expected for specified file.

  • md_rec (MetadataRecord)

    metadata record for the file being evaluated

Returns:

  • The timestamp of the earliest service execution time for the file described by md_rec, in iso8601 format. Returns nil if no services are expected all services have already run and do not have a next occurrence, or the file is deregistered.



105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
# File 'lib/longleaf/indexing/sequel_index_driver.rb', line 105

def first_service_execution_timestamp(expected_services, md_rec)
  current_time = Time.now.utc.iso8601(3)
  if md_rec.deregistered?
    return nil
  end

  service_times = Array.new

  present_services = md_rec.list_services

  expected_services.each do |service_def|
    service_name = service_def.name

    next_run = ServiceDateHelper.next_run_needed(md_rec, service_def)
    service_times << next_run unless next_run.nil?
  end
  # Return the lowest service execution time
  service_times.min
end

#index(file_rec) ⇒ Object

Index the provided file_rec and its metadata

Parameters:



59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
# File 'lib/longleaf/indexing/sequel_index_driver.rb', line 59

def index(file_rec)
  file_path = file_rec.path
  md_rec = file_rec.
  storage_loc = file_rec.storage_location
  service_manager = @app_config.service_manager

  # Produce a list of service definitions which should apply to the file
  expected_services = service_manager.list_service_definitions(
      location: storage_loc.name)

  first_timestamp = first_service_execution_timestamp(expected_services, md_rec)
  delay_until_timestamp = delay_until_timestamp(md_rec)

  first_timestamp = convert_iso8601_to_timestamp(first_timestamp)
  delay_until_timestamp = convert_iso8601_to_timestamp(delay_until_timestamp)
  now_stamp = Time.now.utc.strftime(TIMESTAMP_FORMAT)

  if @adapter == :mysql || @adapter == :mysql2
    preserve_tbl.on_duplicate_key_update
        .insert(file_path: file_path,
            storage_location: storage_loc.name,
            service_time: first_timestamp,
            delay_until_time: delay_until_timestamp,
            updated: now_stamp)
  else
    preserve_tbl.insert_conflict(target: :file_path,
        update: {
            storage_location: storage_loc.name,
            service_time: first_timestamp,
            delay_until_time: delay_until_timestamp,
            updated: now_stamp } )
        .insert(file_path: file_path,
            storage_location: storage_loc.name,
            service_time: first_timestamp,
            delay_until_time: delay_until_timestamp,
            updated: now_stamp)
  end
end

#is_stale?Boolean

Returns true if the application configuration does not match the configuration used for the last reindex.

Returns:

  • (Boolean)


52
53
54
# File 'lib/longleaf/indexing/sequel_index_driver.rb', line 52

def is_stale?
  db_conn[INDEX_STATE_TBL].where(config_md5: @config_md5).count == 0
end

#paths_with_stale_services(file_selector, stale_datetime) ⇒ Array

Retrieves page of file paths which have one or more services which need to run.

Parameters:

  • file_selector (FileSelector)

    selector for what paths to search for files

  • stale_datetime (DateTime)

    find file_paths with services needing to be run before this value

Returns:

  • (Array)

    array of file paths that need one or more services run.



220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
# File 'lib/longleaf/indexing/sequel_index_driver.rb', line 220

def paths_with_stale_services(file_selector, stale_datetime)
  if @preserve_dataset.nil?
    @preserve_dataset = db_conn
        .from(PRESERVE_TBL)
        .exclude(service_time: nil)
        .limit(@page_size)
        .order(Sequel.asc(:service_time))
  end

  # retrieve and return a page of results
  ds = add_path_restrictions(@preserve_dataset, file_selector)
      .where { service_time <= stale_datetime }
      .where { delay_until_time < stale_datetime }
      .select_map(:file_path)
end

#registered_paths(file_selector) ⇒ Array

Retrieves a page of paths for registered files.

Parameters:

  • file_selector (FileSelector)

    selector for what paths to search for files

Returns:

  • (Array)

    array of file paths that are registered



239
240
241
242
243
# File 'lib/longleaf/indexing/sequel_index_driver.rb', line 239

def registered_paths(file_selector)
  # retrieve and return a page of results
  add_path_restrictions(registered_dataset, file_selector)
      .select_map(:file_path)
end

#remove(remove_me) ⇒ Object

Remove an entry from the index

Parameters:

  • remove_me

    The record to remove from the index. May be a FileRecord or a String.



137
138
139
140
141
142
143
144
145
146
147
148
# File 'lib/longleaf/indexing/sequel_index_driver.rb', line 137

def remove(remove_me)
  if remove_me.is_a?(FileRecord)
    path = remove_me.path
  else
    path = remove_me
  end

  result = preserve_tbl.where(file_path: path).delete
  if result == 0
    logger.warn("Could not remove #{path} from the index, path was not present.")
  end
end

#setup_indexObject

Initialize the index’s database using the provided configuration



163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
# File 'lib/longleaf/indexing/sequel_index_driver.rb', line 163

def setup_index
  # Create the table for tracking when files will need preservation services run on them.
  case @adapter
  when :mysql, :mysql2
    # mysql does not support 'text' fields as primary keys
    db_conn.create_table!(PRESERVE_TBL) do
      String :file_path, primary_key: true, size: 768
      column :storage_location, 'varchar(128)'
      column :service_time, 'timestamp(3)', { :null => true }
      column :delay_until_time, 'timestamp(3)'
      column :updated, 'timestamp(3)'
    end
  else
    db_conn.create_table!(PRESERVE_TBL) do
      String :file_path, primary_key: true, text: true
      column :storage_location, 'varchar(128)'
      column :service_time, 'timestamp(3)', { :null => true }
      column :delay_until_time, 'timestamp(3)'
      column :updated, 'timestamp(3)'
    end
  end

  # Setup database indexes
  case @adapter
  when :postgres
    db_conn.run("CREATE INDEX service_times_file_path_text_index ON preserve_service_times (file_path text_pattern_ops)")
  when :sqlite, :amalgalite
    db_conn.run("CREATE INDEX service_times_file_path_text_index ON preserve_service_times (file_path collate nocase)")
  end
  db_conn.run("CREATE INDEX service_times_storage_location_index ON preserve_service_times (storage_location)")

  # Create table for tracking the state of the index
  db_conn.create_table!(INDEX_STATE_TBL) do
    String :config_md5
    DateTime :last_reindexed
    String :longleaf_version
  end

  # Prepopulate the index state information
  update_index_state
end

#update_index_stateObject

Updates the state information for the index to indicate that the index has been refreshed or is in sync with the application’s configuration.



207
208
209
210
211
212
213
214
# File 'lib/longleaf/indexing/sequel_index_driver.rb', line 207

def update_index_state
  index_state_tbl = db_conn[INDEX_STATE_TBL]
  index_state_tbl.delete
  index_state_tbl.insert(
      config_md5: @config_md5,
      last_reindexed: Time.now.utc,
      longleaf_version: Longleaf::VERSION)
end