Class: Envoi::WatchFolderUtility::WatchFolder

Inherits:
Object
  • Object
show all
Defined in:
lib/envoi/watch_folder_utility/watch_folder.rb,
lib/envoi/watch_folder_utility/watch_folder/handler/listen.rb,
lib/cantemo/portal/agent/cli/commands/watch_folders-working.rb

Defined Under Namespace

Classes: Foreman, Handler, State, Worker

Constant Summary collapse

DEFAULT_POLL_INTERVAL =
15
DEFAULT_PROCESSOR_COUNT_LIMIT =
10

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(args = { }) ⇒ WatchFolder

Returns a new instance of WatchFolder.



23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
# File 'lib/envoi/watch_folder_utility/watch_folder.rb', line 23

# Builds a watch folder from a definition hash: stores the logger, normalizes
# the definition, creates the handler and then resolves the agent.
#
# @param args [Hash]
# @option args [Object] :logger logger handed to #initialize_logger
# @option args [Hash] :definition watch folder definition; a shallow copy
#   (`dup`) is stored and then normalized by #process_watch_folder_def
# @option args [Boolean] :threaded (true) stored in @threaded; read by
#   #process_stable_files to decide whether files are processed in threads
# @option args [Object] :default_agent agent used when the definition does not
#   produce one (see #process_agent_defs)
# @option args [Class] :default_agent_class class instantiated by
#   #initialize_agent; defaults to the class of :default_agent
def initialize(args = { })
  initialize_logger(args)
  @definition = args[:definition].dup

  # Log the id of this instance. `Object.__id__` would be the id of the Object
  # class itself, i.e. the same value for every watch folder.
  logger.debug { "Initializing Watch Folder. #{__id__}" }

  @ignored_file_paths = [ ]
  @ignored_file_paths_lock = Mutex.new

  @threaded = args.fetch(:threaded, true)

  @default_maximum_active_processors = DEFAULT_PROCESSOR_COUNT_LIMIT
  @processors = { }
  # @processors = Hash.new { |h, k| h[k] = {} }


  @default_handler_class = Envoi::WatchFolderUtility::WatchFolder::Handler::Listen
  # NOTE(review): the return value (false when no path is found) is not
  # checked here, so initialization continues regardless.
  process_watch_folder_def

  initialize_handler

  @default_agent = args[:default_agent]
  @default_agent_class = args[:default_agent_class] || @default_agent.class

  @last_poll_time = nil

  process_agent_defs
  logger.debug { "Watch Folder Initialized. #{__id__}" }
end

Instance Attribute Details

#agent ⇒ Object

Returns the value of attribute agent.



14
15
16
# File 'lib/envoi/watch_folder_utility/watch_folder.rb', line 14

# Reader for @agent.
#
# @return [Object, nil] the agent created by #initialize_agent, or the default
#   agent assigned at the end of #process_agent_defs when none was created
def agent
  @agent
end

#definition ⇒ Object

Returns the value of attribute definition.



14
15
16
# File 'lib/envoi/watch_folder_utility/watch_folder.rb', line 14

# Reader for @definition.
#
# @return [Hash] the shallow copy of the :definition argument taken in
#   #initialize; #process_watch_folder_def rewrites its keys in place
def definition
  @definition
end

#handler ⇒ Object

Returns the value of attribute handler.



14
15
16
# File 'lib/envoi/watch_folder_utility/watch_folder.rb', line 14

# Reader for @handler.
#
# @return [Object] the handler instance created by #initialize_handler
def handler
  @handler
end

#last_poll_time ⇒ Object

Returns the value of attribute last_poll_time.



18
19
20
# File 'lib/envoi/watch_folder_utility/watch_folder.rb', line 18

# Reader for @last_poll_time.
#
# @return [Time, nil] the time recorded by the most recent #poll call; nil
#   after #initialize until #poll has run
def last_poll_time
  @last_poll_time
end

#logger ⇒ Object

Returns the value of attribute logger.



14
15
16
# File 'lib/envoi/watch_folder_utility/watch_folder.rb', line 14

# Reader for @logger.
#
# @return [Object, nil] whatever was passed as :logger to #initialize_logger;
#   nil when the caller supplied none (no default logger is built)
def logger
  @logger
end

#min_stable_poll_count ⇒ Object

Returns the value of attribute min_stable_poll_count.



18
19
20
# File 'lib/envoi/watch_folder_utility/watch_folder.rb', line 18

# Reader for @min_stable_poll_count.
#
# NOTE(review): nothing in the visible methods of this class assigns
# @min_stable_poll_count — presumably it is set through a writer or by code
# elsewhere; confirm before relying on a non-nil value.
#
# @return [Object, nil]
def min_stable_poll_count
  @min_stable_poll_count
end

#min_stable_time ⇒ Object

Returns the value of attribute min_stable_time.



18
19
20
# File 'lib/envoi/watch_folder_utility/watch_folder.rb', line 18

# Reader for @min_stable_time.
#
# NOTE(review): nothing in the visible methods of this class assigns
# @min_stable_time — presumably it is set through a writer or by code
# elsewhere; confirm before relying on a non-nil value.
#
# @return [Object, nil]
def min_stable_time
  @min_stable_time
end

#poll_interval ⇒ Object

Returns the value of attribute poll_interval.



18
19
20
# File 'lib/envoi/watch_folder_utility/watch_folder.rb', line 18

# Reader for @poll_interval.
#
# @return [Object, nil] the value assigned at the end of
#   #process_watch_folder_def: the definition's 'poll_interval', or
#   DEFAULT_POLL_INTERVAL when the definition has none. Compared against an
#   elapsed number of seconds in #poll_interval_elapsed?.
def poll_interval
  @poll_interval
end

#processors ⇒ Object

Returns the value of attribute processors.



16
17
18
# File 'lib/envoi/watch_folder_utility/watch_folder.rb', line 16

# Reader for @processors.
#
# @return [Hash] starts empty in #initialize; #process_stable_files stores the
#   processing Thread for a file under that file object and prunes entries
#   whose file is no longer processing
def processors
  @processors
end

Instance Method Details

#add_file_to_ignored_file_paths(file) ⇒ Object

Parameters:

  • file (Object)


96
97
98
99
100
101
102
# File 'lib/envoi/watch_folder_utility/watch_folder.rb', line 96

# Records a file as ignored: remembers its path in this watch folder's ignore
# list and, when the file object supports it, asks the file to mark itself
# ignored.
#
# A path is stored only once. Files that cannot mark themselves ignored are
# re-evaluated on every pass of #process_stable_files, so an unconditional
# append would grow the list by one duplicate per poll.
#
# @param file [#path] the file to ignore; `ignore` is called on it if it
#   responds to that method
# @return [Object, nil] the result of `file.ignore`, or nil when the file does
#   not respond to `ignore`
def add_file_to_ignored_file_paths(file)
  logger.debug { "Adding File to Ignore Cache: '#{file.path}'" }
  @ignored_file_paths_lock.synchronize do
    path = file.path
    @ignored_file_paths << path unless @ignored_file_paths.include?(path)
    file.ignore if file.respond_to?(:ignore)
  end
end

#find_in_patterns(patterns, file) ⇒ Object

Used to compare file to patterns



105
106
107
108
109
110
111
# File 'lib/envoi/watch_folder_utility/watch_folder.rb', line 105

# Returns the first pattern that matches the file's path.
#
# Regexp patterns are tested with `Regexp#match`; every other pattern is
# treated as a glob and tested with `File.fnmatch`. The outcome of each
# comparison is logged at debug level.
#
# @param patterns [Enumerable] Regexp and/or glob-string patterns
# @param file [#path] the file whose path is tested
# @return [Object, nil] the matching pattern, or nil when none matches
def find_in_patterns(patterns, file)
  patterns.find do |candidate|
    hit = case candidate
          when Regexp then candidate.match(file.path)
          else File.fnmatch(candidate, file.path)
          end
    logger.debug do
      verdict = hit ? 'matched' : "didn't match"
      "#{candidate} #{verdict} #{file.path}"
    end
    hit
  end
end

#ignored_file_paths ⇒ Object



113
114
115
# File 'lib/envoi/watch_folder_utility/watch_folder.rb', line 113

# Lists every path currently treated as ignored.
#
# Combines the keys of the handler's ignored-files map with the paths that
# #add_file_to_ignored_file_paths recorded in @ignored_file_paths. Without the
# second source, a file that does not respond to `ignore` is stored in
# @ignored_file_paths but never reported here, so the
# `ignored_file_paths.include?(file.path)` fallback in #process_stable_files
# could never match it.
#
# The local list is read without taking @ignored_file_paths_lock. That lock is
# held while `file.ignore` runs, and a non-reentrant Mutex would raise if that
# call led back here on the same thread.
#
# @return [Array] unique ignored paths, handler entries first
def ignored_file_paths
  handler.ignored_files_map.keys | (@ignored_file_paths || [])
end

#initialize_agent(args = {}) ⇒ Object



75
76
77
78
79
80
81
82
# File 'lib/envoi/watch_folder_utility/watch_folder.rb', line 75

# Creates the agent for this watch folder once and caches it in @agent.
#
# When @agent is already set it is returned untouched and `args` is ignored.
#
# @param args [Hash] configuration passed to the agent class as `config:`
# @return [Object] the cached or newly created agent
def initialize_agent(args = {})
  return @agent if @agent

  logger.debug { "Initializing Agent. #{@default_agent_class} #{args}" }
  created = @default_agent_class.new(config: args, logger: logger, default_preserve_file_path: false)
  logger.debug { "Agent Instance created." }
  @agent = created
end

#initialize_handler(watch_folder_def = @definition) ⇒ Object



84
85
86
87
88
89
90
91
92
93
# File 'lib/envoi/watch_folder_utility/watch_folder.rb', line 84

# Instantiates the handler class held in @default_handler_class and stores the
# instance in @handler.
#
# The handler receives a copy (`dup`) of this watch folder's logger together
# with the definition.
#
# @param watch_folder_def [Hash] definition handed to the handler
#   (defaults to @definition)
# @return [Object] the result of the final `logger.debug` call
def initialize_handler(watch_folder_def = @definition)
  klass        = @default_handler_class
  handler_args = { logger: logger.dup, definition: watch_folder_def }

  logger.debug { "Creating Watch Folder Handler Instance. #{klass.name}" }
  @handler = klass.new(handler_args)
  logger.debug { "Watch Folder Handler Instance Created. #{klass.name}" }
end

#initialize_logger(args = {}) ⇒ Object



53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
# File 'lib/envoi/watch_folder_utility/watch_folder.rb', line 53

# Stores the caller-supplied logger in @logger.
#
# No default logger is created. The original source carried a disabled
# (commented-out) draft that would have built one from STDOUT plus an optional
# :log_to file, with :log_age rotation and a :log_level that defaulted to
# Logger::INFO. None of that is active, so @logger is nil when :logger is
# omitted.
#
# @param args [Hash]
# @option args [Object] :logger the logger to use
# @return [Object, nil] the stored logger
def initialize_logger(args = {})
  @logger = args[:logger]
end

#name ⇒ Object



138
139
140
# File 'lib/envoi/watch_folder_utility/watch_folder.rb', line 138

# Display name of this watch folder, memoized in @name.
#
# Uses the first value present among the definition's 'name', 'paths' and
# 'path' entries. When none is present it falls back to this instance's own
# object id; `Object.__id__` would be the id of the Object class and therefore
# identical for every unnamed watch folder.
#
# @return [Object] the name, the paths value, or this instance's object id
def name
  @name ||= definition['name'] || definition['paths'] || definition['path'] || __id__
end

#poll ⇒ Object



117
118
119
120
121
122
# File 'lib/envoi/watch_folder_utility/watch_folder.rb', line 117

# Records the current time as the last poll time (keeping the previous value
# in @previous_poll_time) and forwards the poll to the handler when the
# handler responds to `poll`.
#
# @return [Object, nil] the handler's `poll` result, or nil when the handler
#   does not respond to `poll`
def poll
  @previous_poll_time, @last_poll_time = @last_poll_time, Time.now

  return unless handler.respond_to?(:poll)

  handler.poll
end

#poll_interval_elapsed? ⇒ Boolean

Returns:

  • (Boolean)


124
125
126
# File 'lib/envoi/watch_folder_utility/watch_folder.rb', line 124

# Tells whether a new poll is due.
#
# @return [Boolean] true when no poll has been recorded yet, or when the time
#   elapsed since #last_poll_time is at least #poll_interval
def poll_interval_elapsed?
  previous = last_poll_time
  return true unless previous

  Time.now - previous >= poll_interval
end

#process_agent_def(agent_def) ⇒ Object



128
129
130
131
132
133
134
135
136
# File 'lib/envoi/watch_folder_utility/watch_folder.rb', line 128

# Prepares one agent definition and creates the agent from it.
#
# When a default agent is configured, its storages are used as a base and the
# definition's own 'storages' entries override them; the merged hash is
# written back into `agent_def` (the argument is modified in place).
#
# The default agent is read from @default_agent directly because this class
# exposes no `default_agent` reader in its visible methods; calling one raised
# NameError whenever a default agent was present.
#
# @param agent_def [Hash] agent definition with String keys
# @return [Object] the agent returned by #initialize_agent
def process_agent_def(agent_def)
  if @default_agent
    own_storages = agent_def['storages'] || {}
    agent_def['storages'] = @default_agent.agent_config_storages.merge(own_storages)
  end
  initialize_agent(agent_def)
end

#process_agent_defs ⇒ Object



142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
# File 'lib/envoi/watch_folder_utility/watch_folder.rb', line 142

# Runs #process_agent_def for every agent listed under the definition's
# 'agents' key, then falls back to the default agent when no agent was set.
#
# A Hash is read as name => definition; each definition gets that name unless
# it already has one. An Array is read as a list of definitions. Any other
# value is skipped.
#
# @return [Object, nil] the resulting value of @agent
def process_agent_defs
  configured = definition['agents']

  case configured
  when Hash
    configured.each do |agent_name, agent_def|
      agent_def['name'] ||= agent_name
      process_agent_def(agent_def)
    end
  when Array
    configured.each { |agent_def| process_agent_def(agent_def) }
  end

  @agent ||= @default_agent
end

#process_stable_file(file) ⇒ Object



357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
# File 'lib/envoi/watch_folder_utility/watch_folder.rb', line 357

# Uploads one stable file through the agent and then disposes of the local
# copy according to the outcome.
#
# On success the file is moved to the completed directory when one is
# configured and exists, added to the ignore list when it is configured but
# missing, and deleted when none is configured. On failure it is moved to the
# quarantine directory when one is configured and exists, otherwise it is
# added to the ignore list.
#
# `file.processing` is true for the duration of the call and always reset to
# false afterwards. `file.processed` is set to true once the upload outcome
# has been handled.
#
# @param file [Object] must respond to `path`, `name`, `processing=`,
#   `processed=` and `exception=`
# @return [Hash, Object] `{ success: false, message: ... }` when no storage id
#   is configured; otherwise the agent's response, with a bare true/false/nil
#   wrapped as `{ success: ... }`
# @raise [StandardError] any error raised while processing, after it has been
#   stored in `file.exception`
def process_stable_file(file)
  file.processing = true
  file_name       = file.name || file.path
  logger.debug { "Processing File '#{file_name}'" }

  storage_id = definition['upload_to_storage_id']

  unless storage_id
    logger.warn { "Skipping processing of file because of missing storage ID." }
    return { success: false, message: 'Missing storage ID.' }
  end

  quarantine_directory_path = definition['quarantine_directory_path']
  completed_directory_path  = definition['completed_directory_path']
  watch_folder_upload_args  = definition['upload_args']

  # full_file_path = File.join(watch_folder.path, file.path)
  full_file_path = file.path

  upload_args = {
      file_path: full_file_path,
      storage_id: storage_id
  }
  upload_args.merge!(watch_folder_upload_args) if watch_folder_upload_args.is_a?(Hash)

  logger.debug { "Executing Upload. #{upload_args}" }
  _response = agent.upload(upload_args)
  # Treat nil like false. Indexing nil below raised NoMethodError, which
  # skipped the quarantine/ignore handling and left the file to be retried on
  # every poll.
  if _response.nil? || _response == true || _response == false
    _response = { success: _response ? true : false }
  end

  if _response[:success]
    if completed_directory_path
      if Dir.exist?(completed_directory_path)
        logger.debug { "Moving '#{full_file_path}' to completed directory path '#{completed_directory_path}'" }
        FileUtils.mv full_file_path, completed_directory_path
      else
        logger.warn { "Completed directory path not found: '#{completed_directory_path}'" }
        add_file_to_ignored_file_paths(file)
      end
    else
      FileUtils.rm full_file_path
    end
  else
    if quarantine_directory_path && Dir.exist?(quarantine_directory_path)
      logger.warn { "Moving '#{full_file_path}' to quarantine directory path '#{quarantine_directory_path}'" }
      FileUtils.mv full_file_path, quarantine_directory_path
    else
      logger.warn { "Adding '#{full_file_path}' to the temporary ignore list." }
      add_file_to_ignored_file_paths(file)
    end
  end

  file.processed = true

  _response
rescue => e
  file.exception = e
  raise e
ensure
  file.processing = false
end

#process_stable_files ⇒ Object



419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
# File 'lib/envoi/watch_folder_utility/watch_folder.rb', line 419

# Walks the handler's stable files and processes each one that is not
# ignored, not already processing/processed, and passes the include/exclude
# patterns. Files rejected by the patterns are added to the ignore list.
#
# In threaded mode each file is processed in its own thread, tracked in
# #processors. The loop stops for this pass once the number of files still
# processing reaches the definition's 'maximum_active_processors'; a nil or
# false limit means no limit is enforced. In non-threaded mode files are
# processed inline.
#
# @return [Object, nil] the value of `stable_files.each` (nil when the loop
#   was left early because the processor limit was reached)
def process_stable_files
  maximum_active_processors = definition['maximum_active_processors']

  includes = definition['includes']
  excludes = definition['excludes']

  stable_files.each do |file|
    file.watch_folder ||= self
    next if file.respond_to?(:ignore?) ? file.ignore? : ignored_file_paths.include?(file.path)
    next if file.processing || file.processed

    if includes && !includes.empty? && !find_in_patterns(includes, file)
      add_file_to_ignored_file_paths(file)
      next
    end

    # Excludes may be absent (e.g. the definition was never fully normalized),
    # so guard the same way includes are guarded.
    if excludes && find_in_patterns(excludes, file)
      add_file_to_ignored_file_paths(file)
      next
    end

    unless @threaded
      process_stable_file(file)
      next
    end

    processors.keep_if { |tracked_file, _thread| tracked_file.processing }
    # Only compare against a real limit; `Integer >= false` raises.
    if maximum_active_processors && processors.length >= maximum_active_processors
      logger.debug { "Maximum number of active processors reached for watch folder. #{name}" }
      break
    end

    # Mark the file before the thread starts. The new thread may not have run
    # yet when control returns here, so waiting for it to set the flag meant
    # the thread was usually never tracked and the limit never enforced.
    file.processing = true
    begin
      processors[file] = Thread.new(file) do |stable_file|
        begin
          process_stable_file(stable_file)
        rescue => e
          logger.error { "Exception '#{e.message}' in thread for `#{name}` `#{stable_file.path}`. " }
          raise e
        ensure
          # Best effort, as in the original: never let the reset itself fail.
          stable_file.processing = false rescue nil
        end
      end
    rescue ThreadError
      # The thread could not be created; do not leave the file flagged.
      file.processing = false
      raise
    end
  end
end

#process_watch_folder_def(watch_folder_def = @definition) ⇒ Object

Parameters:

  • watch_folder_def (Hash) (defaults to: @definition)

Options Hash (watch_folder_def):

  • path (String)
  • upload_to_storage_id (String)
  • name (String) — default: path
  • paths (Array<String>) — default: [path]
  • exclude (String) — default: '**/.*'
  • excludes (Array<String>) — default: [exclude]
  • include (String)
  • includes (Array<String>) — default: [include]
  • quarantine_directory_path (String)
  • completed_directory_path (String)
  • import_args (Hash) — default: {}
  • import_options (Hash) — default: {}
  • maximum_active_processors (Integer|False) — default: @default_maximum_active_processors
  • logging (Hash)


176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
# File 'lib/envoi/watch_folder_utility/watch_folder.rb', line 176

# Normalizes a watch folder definition in place and records the poll interval.
#
# Singular keys are folded into their plural/canonical form and removed:
# 'path' into 'paths', 'include' into 'includes', 'exclude' into 'excludes',
# 'quarantine_path' into 'quarantine_directory_path', 'completed_path' into
# 'completed_directory_path', 'storage_id' into 'upload_to_storage_id' and
# 'agent' into 'agents'. Collection, metadata, field group, ingest group and
# import settings are folded into 'upload_args'.
#
# @param watch_folder_def [Hash] definition with String keys
#   (defaults to @definition); modified in place
# @option watch_folder_def [String] 'name' also tried as a directory path when
#   no path is given
# @option watch_folder_def [String, Array<String>] 'path'
# @option watch_folder_def [String, Array<String>] 'paths'
# @option watch_folder_def [String, Regexp] 'include'
# @option watch_folder_def [Array] 'includes'
# @option watch_folder_def [String, Regexp] 'exclude' ('**/.*')
# @option watch_folder_def [Array] 'excludes'
# @option watch_folder_def [String] 'quarantine_directory_path'
# @option watch_folder_def [String] 'completed_directory_path'
# @option watch_folder_def [String] 'upload_to_storage_id'
# @option watch_folder_def [Hash] 'upload_args'
# @option watch_folder_def [Hash] 'item_add_args'
# @option watch_folder_def [Hash] 'item_add_options'
# @option watch_folder_def [Hash] 'import_args'
# @option watch_folder_def [Hash] 'import_options'
# @option watch_folder_def [Integer, false] 'maximum_active_processors'
#   (@default_maximum_active_processors when nil)
# @option watch_folder_def [Hash, Array] 'agent'
# @option watch_folder_def [Hash, Array] 'agents'
# @option watch_folder_def [Object] 'poll_interval' (DEFAULT_POLL_INTERVAL)
# @return [Object, false] the effective poll interval, or false when no path
#   could be determined (nothing after the paths step is normalized then)
def process_watch_folder_def(watch_folder_def = @definition)
  logger.debug { "Initializing Watch Folder #{watch_folder_def.inspect}" }

  logger.debug { "Initializing parameter 'paths'." }
  name = watch_folder_def['name']
  path = watch_folder_def['path']

  paths = watch_folder_def['paths'] ||= []
  paths = [ paths ] if paths.is_a?(String)
  paths.concat [*path] if path
  paths.map! { |p| File.expand_path(p) }
  # Fall back to the name when it happens to be an existing directory. Guard
  # against a missing name (File.expand_path(nil) raises) and append with <<,
  # because Array#concat rejects a String argument.
  if paths.empty? && name.is_a?(String)
    name_as_path = File.expand_path(name)
    paths << name_as_path if Dir.exist?(name_as_path)
  end
  paths.uniq!
  watch_folder_def['paths'] = paths
  # watch_folder_def['path'] ||= paths.first if paths.length == 1
  watch_folder_def.delete('path')

  if paths.empty?
    logger.error { "Failed to initialize watch folder. No path found in watch folder definition." }
    return false
  end
  logger.debug { "Parameter 'paths' initialized." }

  logger.debug { "Initializing parameter 'includes'." }
  single_include = watch_folder_def['include']
  includes       = (watch_folder_def['includes'] ||= [])
  includes.concat [*single_include] if single_include
  includes.uniq!
  includes.map! { |e| Regexp.try_convert(e) || e }
  watch_folder_def['includes'] = includes
  watch_folder_def.delete('include')
  logger.debug { "Parameter `includes` initialized." }

  logger.debug { "Initializing parameter 'excludes'." }
  # The default pattern is appended whenever no singular 'exclude' is given,
  # even if an 'excludes' list is present.
  single_exclude = watch_folder_def['exclude'] || '**/.*'
  excludes       = (watch_folder_def['excludes'] ||= [])
  excludes.concat [*single_exclude]
  excludes.uniq!
  excludes.map! { |e| Regexp.try_convert(e) || e }
  watch_folder_def['excludes'] = excludes
  watch_folder_def.delete('exclude')
  logger.debug { "Parameter `excludes` initialized." }

  logger.debug { "Initializing parameter `quarantine directory path`." }
  quarantine_directory_path = watch_folder_def['quarantine_directory_path'] || watch_folder_def['quarantine_path']
  if quarantine_directory_path
    quarantine_directory_path                     = File.expand_path(quarantine_directory_path)
    watch_folder_def['quarantine_directory_path'] = quarantine_directory_path

    unless Dir.exist?(quarantine_directory_path)
      logger.warn { "Quarantine directory path '#{quarantine_directory_path}' does not exist. Files will be ignored instead." }
    end
  end
  watch_folder_def.delete('quarantine_path')
  logger.debug { "Parameter `quarantine directory path` initialized." }

  logger.debug { "Initializing parameter 'completed directory path'." }
  completed_directory_path = watch_folder_def['completed_directory_path'] || watch_folder_def['completed_path']
  if completed_directory_path
    completed_directory_path                     = File.expand_path(completed_directory_path)
    watch_folder_def['completed_directory_path'] = completed_directory_path

    unless Dir.exist?(completed_directory_path)
      logger.warn { "Completed directory path '#{completed_directory_path}' does not exist. File will be ignored instead." }
    end
  end
  watch_folder_def.delete('completed_path')
  logger.debug { "Parameter 'completed directory path' initialized." }

  logger.debug { "Initializing parameter `upload to storage id`." }
  storage_id = watch_folder_def['upload_to_storage_id'] || watch_folder_def['storage_id']
  watch_folder_def['upload_to_storage_id'] ||= storage_id
  watch_folder_def.delete('storage_id')
  unless storage_id
    logger.warn { "No `upload to storage id` specified. Uploading will be skipped for this watch folder." }
  end
  logger.debug { "Parameter 'upload to storage id' initialized." }

  logger.debug { "Initializing parameter 'upload/import arguments'." }
  upload_args      = watch_folder_def['upload_args']
  item_add_args    = watch_folder_def['item_add_args'] || { }
  item_add_options = watch_folder_def['item_add_options'] || { }
  import_args      = watch_folder_def['import_args'] || { }
  import_options   = watch_folder_def['import_options'] || { }

  # Import keys are normalized to lower-case symbols.
  import_args = Hash[import_args.map { |k,v| [ k.respond_to?(:to_sym) ? k.to_sym.downcase : k, v ] }]
  import_options = Hash[import_options.map { |k,v| [ k.respond_to?(:to_sym) ? k.to_sym.downcase : k, v ] }]

  # Allow adding to collection to be overridden
  # NOTE(review): item_add_args/item_add_options are looked up with Symbol
  # keys here but are only symbolized further down — confirm whether
  # String-keyed input is expected to be honored at this point.
  add_item_to_collection = item_add_args.fetch(:add_item_to_collection,
                                               item_add_options.fetch(:add_item_to_collection,
                                                                      watch_folder_def['add_item_to_collection']))
  if add_item_to_collection.nil? || add_item_to_collection
    _add_item_to_collection = false
    collection_id = watch_folder_def['collection_id']
    if collection_id
      item_add_args[:collection_id] ||= collection_id
      _add_item_to_collection = true
      watch_folder_def.delete('collection_id')
    else
      collection_name = watch_folder_def['collection_name']
      if collection_name
        item_add_args[:collection_name] ||= collection_name
        _add_item_to_collection = true
        watch_folder_def.delete('collection_name')
      else
        file_path_collection_name_position = watch_folder_def['file_path_collection_name_position']
        if file_path_collection_name_position
          item_add_args[:file_path_collection_name_position] = file_path_collection_name_position
          _add_item_to_collection = true
          watch_folder_def.delete('file_path_collection_name_position')
        end
      end
    end
    import_options[:add_item_to_collection] ||= _add_item_to_collection
  end

  # NOTE(review): the local variable names in this and the ingest-group
  # section were missing from the source as received; `metadata` and
  # `job_metadata` are reconstructions based on the keys being read/written.
  metadata = watch_folder_def['metadata']
  if metadata
    item_add_args[:metadata] = metadata
    watch_folder_def.delete('metadata')
  end

  field_group = watch_folder_def['field_group']
  if field_group
    item_add_args[:field_group] = field_group
    watch_folder_def.delete('field_group')
  end

  ingest_group = watch_folder_def['ingest_group']
  if ingest_group
    # Append the group to any job metadata already present, comma separated.
    job_metadata = (import_args[:jobmetadata] ||= '')
    job_metadata += ',' unless job_metadata.empty?
    job_metadata += "portal_groups:StringArray=#{ingest_group}"
    import_args[:jobmetadata] = job_metadata
    watch_folder_def.delete('ingest_group')
  end

  item_add_args = symbolize_keys(item_add_args)
  (item_add_args[:import_args] ||= {}).merge! import_args if import_args.is_a?(Hash)
  (item_add_args[:import_options] ||= {}).merge! import_options if import_options.is_a?(Hash)

  upload_args = symbolize_keys(upload_args)
  ((upload_args ||= {})[:item_add_args] ||= {}).merge! item_add_args if item_add_args.is_a?(Hash)
  ((upload_args ||= {})[:item_add_options] ||= {}).merge! symbolize_keys(item_add_options) if item_add_options.is_a?(Hash)

  watch_folder_def.delete('import_args')
  watch_folder_def.delete('import_options')

  watch_folder_def['upload_args'] = upload_args
  logger.debug { "Parameter 'upload/import arguments' initialized. #{upload_args}" }

  maximum_active_processors = watch_folder_def['maximum_active_processors']
  if maximum_active_processors.nil?
    maximum_active_processors = @default_maximum_active_processors
    watch_folder_def['maximum_active_processors'] = maximum_active_processors
  end

  logger.debug { "Initializing parameter 'agents'." }
  agent  = watch_folder_def['agent']
  agents = watch_folder_def['agents'] ||= [ ]
  if agents.is_a?(Hash)
    # name => definition; carry the name into each definition.
    agents = agents.map { |k,v| v['name'] ||= k;  v }
  end
  if agent
    # A single-key hash whose value is itself a hash is read as
    # { name => definition }. Anything else is taken as the definition itself.
    if agent.is_a?(Hash) && agent.length == 1 && agent.values.first.is_a?(Hash)
      agent = agent.map { |k,v| v['name'] ||= k; v }
    end
    # Wrap explicitly: splatting a Hash (`[*agent]`) would break it up into
    # [key, value] pairs.
    agents.concat(agent.is_a?(Array) ? agent : [ agent ])
    # When 'agents' was a Hash, `agents` is a new Array, so store it back or
    # the singular agent would be lost.
    watch_folder_def['agents'] = agents
    watch_folder_def.delete('agent')
  end
  logger.debug { "Parameter 'agent' initialized." }

  @poll_interval = watch_folder_def['poll_interval'] ||= DEFAULT_POLL_INTERVAL
end

#run ⇒ Object



469
470
471
# File 'lib/envoi/watch_folder_utility/watch_folder.rb', line 469

# Starts the handler when it supports being run.
#
# @return [Object, nil] the handler's `run` result, or nil when the handler
#   does not respond to `run`
def run
  return unless handler.respond_to?(:run)

  handler.run
end

#stable_files ⇒ Object



473
474
475
# File 'lib/envoi/watch_folder_utility/watch_folder.rb', line 473

# Delegates to the handler's `stable_files`.
#
# @return [Object] whatever the handler returns; #process_stable_files
#   iterates it with `each` and expects file objects responding to `path`,
#   `processing`, `processed` and `watch_folder`
def stable_files
  handler.stable_files
end

#symbolize_keys(value, recursive = true) ⇒ Object

Converts hash keys to symbols

Parameters:

  • value (Hash)

    hash

  • recursive (Boolean) (defaults to: true)

    Will recurse into any values that are hashes or arrays



481
482
483
484
485
486
487
488
489
490
# File 'lib/envoi/watch_folder_utility/watch_folder.rb', line 481

# Returns a copy of `value` with Hash keys converted to symbols.
#
# Keys that do not respond to `to_sym` are kept as they are. Arrays are always
# walked element by element; `recursive` only controls whether the values of a
# Hash are converted as well. Any other object is returned unchanged.
#
# @param value [Hash, Array, Object] the structure to convert
# @param recursive [Boolean] convert Hash values too (defaults to true)
# @return [Hash, Array, Object] the converted copy, or `value` itself when it
#   is neither a Hash nor an Array
def symbolize_keys(value, recursive = true)
  if value.is_a?(Hash)
    value.each_with_object({}) do |(key, item), converted|
      new_key = key.respond_to?(:to_sym) ? key.to_sym : key
      converted[new_key] = recursive ? symbolize_keys(item, true) : item
    end
  elsif value.is_a?(Array)
    value.map { |item| symbolize_keys(item, recursive) }
  else
    value
  end
end