Class: Envoi::WatchFolderUtility::WatchFolder
- Inherits:
-
Object
- Object
- Envoi::WatchFolderUtility::WatchFolder
show all
- Defined in:
- lib/envoi/watch_folder_utility/watch_folder.rb,
lib/envoi/watch_folder_utility/watch_folder/handler/listen.rb,
lib/cantemo/portal/agent/cli/commands/watch_folders-working.rb
Defined Under Namespace
Classes: Foreman, Handler, State, Worker
Constant Summary
collapse
- DEFAULT_POLL_INTERVAL =
15
- DEFAULT_PROCESSOR_COUNT_LIMIT =
10
Instance Attribute Summary collapse
Instance Method Summary
collapse
Constructor Details
#initialize(args = { }) ⇒ WatchFolder
Returns a new instance of WatchFolder.
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
|
# File 'lib/envoi/watch_folder_utility/watch_folder.rb', line 23
# Builds a watch folder from a definition hash and wires up its logger,
# handler and agent.
#
# @param args [Hash]
# @option args [Object] :logger logger used by this watch folder
# @option args [Hash] :definition watch folder definition; a shallow copy is
#   stored and then normalized in place by #process_watch_folder_def
# @option args [Boolean] :threaded (true) whether stable files are processed
#   in their own threads
# @option args [Object] :default_agent agent used when the definition does
#   not produce one
# @option args [Class] :default_agent_class class instantiated by
#   #initialize_agent; defaults to the class of :default_agent
# @return [WatchFolder]
def initialize(args = { })
  initialize_logger(args)
  @definition = args[:definition].dup

  # Log this instance's id. `Object.__id__` would be the id of the Object
  # class itself and therefore identical for every watch folder.
  logger.debug { "Initializing Watch Folder. #{__id__}" }

  @ignored_file_paths = [ ]
  @ignored_file_paths_lock = Mutex.new

  @threaded = args.fetch(:threaded, true)

  @default_maximum_active_processors = DEFAULT_PROCESSOR_COUNT_LIMIT
  @processors = { }

  @default_handler_class = Envoi::WatchFolderUtility::WatchFolder::Handler::Listen

  # The definition has to be normalized before the handler is built from it.
  process_watch_folder_def
  initialize_handler

  @default_agent = args[:default_agent]
  @default_agent_class = args[:default_agent_class] || @default_agent.class

  @last_poll_time = nil

  process_agent_defs
  logger.debug { "Watch Folder Initialized. #{__id__}" }
end
|
Instance Attribute Details
#agent ⇒ Object
Returns the value of attribute agent.
14
15
16
|
# File 'lib/envoi/watch_folder_utility/watch_folder.rb', line 14
# Reader for the agent this watch folder uploads through. @agent is assigned
# by #initialize_agent, or falls back to the default agent at the end of
# #process_agent_defs.
def agent
@agent
end
|
#definition ⇒ Object
Returns the value of attribute definition.
14
15
16
|
# File 'lib/envoi/watch_folder_utility/watch_folder.rb', line 14
# Reader for the watch folder definition hash. It is a copy of the
# :definition constructor argument, normalized in place by
# #process_watch_folder_def.
def definition
@definition
end
|
#handler ⇒ Object
Returns the value of attribute handler.
14
15
16
|
# File 'lib/envoi/watch_folder_utility/watch_folder.rb', line 14
# Reader for the handler instance created by #initialize_handler.
def handler
@handler
end
|
#last_poll_time ⇒ Object
Returns the value of attribute last_poll_time.
18
19
20
|
# File 'lib/envoi/watch_folder_utility/watch_folder.rb', line 18
# Reader for the Time of the most recent #poll call; nil until the first poll.
def last_poll_time
@last_poll_time
end
|
#logger ⇒ Object
Returns the value of attribute logger.
14
15
16
|
# File 'lib/envoi/watch_folder_utility/watch_folder.rb', line 14
# Reader for the logger stored by #initialize_logger.
def logger
@logger
end
|
#min_stable_poll_count ⇒ Object
Returns the value of attribute min_stable_poll_count.
18
19
20
|
# File 'lib/envoi/watch_folder_utility/watch_folder.rb', line 18
# Reader for @min_stable_poll_count.
# NOTE(review): nothing in the visible source assigns this ivar; presumably
# it is set through a writer elsewhere -- confirm.
def min_stable_poll_count
@min_stable_poll_count
end
|
#min_stable_time ⇒ Object
Returns the value of attribute min_stable_time.
18
19
20
|
# File 'lib/envoi/watch_folder_utility/watch_folder.rb', line 18
# Reader for @min_stable_time.
# NOTE(review): nothing in the visible source assigns this ivar; presumably
# it is set through a writer elsewhere -- confirm.
def min_stable_time
@min_stable_time
end
|
#poll_interval ⇒ Object
Returns the value of attribute poll_interval.
18
19
20
|
# File 'lib/envoi/watch_folder_utility/watch_folder.rb', line 18
# Reader for the poll interval taken from the definition's 'poll_interval'
# entry (DEFAULT_POLL_INTERVAL when absent) by #process_watch_folder_def.
# Compared against elapsed seconds in #poll_interval_elapsed?.
def poll_interval
@poll_interval
end
|
#processors ⇒ Object
Returns the value of attribute processors.
16
17
18
|
# File 'lib/envoi/watch_folder_utility/watch_folder.rb', line 16
# Reader for the hash of active processors (file object => Thread) that
# #process_stable_files maintains in threaded mode.
def processors
@processors
end
|
Instance Method Details
#add_file_to_ignored_file_paths(file) ⇒ Object
96
97
98
99
100
101
102
|
# File 'lib/envoi/watch_folder_utility/watch_folder.rb', line 96
# Records a file's path in the local ignore list and, when the file object
# supports it, tells the file to mark itself as ignored.
#
# @param file [#path] file object; `ignore` is called on it when available
# @return [Object, nil] whatever `file.ignore` returns, or nil
def add_file_to_ignored_file_paths(file)
  logger.debug { "Adding File to Ignore Cache: '#{file.path}'" }

  # The list is shared between processor threads, so mutate it under the lock.
  @ignored_file_paths_lock.synchronize do
    @ignored_file_paths.push(file.path)
    next unless file.respond_to?(:ignore)

    file.ignore
  end
end
|
#find_in_patterns(patterns, file) ⇒ Object
Used to compare file to patterns
105
106
107
108
109
110
111
|
# File 'lib/envoi/watch_folder_utility/watch_folder.rb', line 105
# Returns the first pattern that matches the file's path.
#
# Regexp patterns are matched with Regexp#match; anything else is treated as
# a glob and matched with File.fnmatch. Every comparison is logged at debug
# level.
#
# @param patterns [Array<Regexp, String>] patterns to try, in order
# @param file [#path] file whose path is compared
# @return [Regexp, String, nil] the matching pattern, or nil when none match
def find_in_patterns(patterns, file)
  patterns.find do |pattern|
    matched =
      case pattern
      when Regexp then pattern.match(file.path)
      else File.fnmatch(pattern, file.path)
      end
    outcome = matched ? 'matched' : "didn't match"
    logger.debug { "#{pattern} #{outcome} #{file.path}" }
    matched
  end
end
|
#ignored_file_paths ⇒ Object
113
114
115
|
# File 'lib/envoi/watch_folder_utility/watch_folder.rb', line 113
# Lists every path this watch folder currently treats as ignored.
#
# The result combines the handler's ignored-files map with the paths that
# #add_file_to_ignored_file_paths stored locally. Without the local paths, a
# file object that cannot mark itself as ignored would never be reported
# here, and #process_stable_files would keep re-evaluating it on every poll.
#
# @return [Array] ignored paths, handler entries first, without duplicates
def ignored_file_paths
  handler_paths = handler.ignored_files_map.keys
  # Array() tolerates the list not having been initialised yet.
  local_paths = Array(@ignored_file_paths)
  return handler_paths if local_paths.empty?

  (handler_paths + local_paths).uniq
end
|
#initialize_agent(args = {}) ⇒ Object
75
76
77
78
79
80
81
82
|
# File 'lib/envoi/watch_folder_utility/watch_folder.rb', line 75
# Creates the agent for this watch folder on first use and memoizes it in
# @agent; later calls return the existing agent and ignore their arguments.
#
# @param args [Hash] agent configuration, passed to the agent class as :config
# @return [Object] the memoized agent instance
def initialize_agent(args = {})
  return @agent if @agent

  logger.debug { "Initializing Agent. #{@default_agent_class} #{args}" }
  new_agent = @default_agent_class.new(config: args, logger: logger, default_preserve_file_path: false)
  logger.debug { "Agent Instance created." }
  @agent = new_agent
end
|
#initialize_handler(watch_folder_def = @definition) ⇒ Object
84
85
86
87
88
89
90
91
92
93
|
# File 'lib/envoi/watch_folder_utility/watch_folder.rb', line 84
# Instantiates the handler class for this watch folder and stores it in
# @handler. The handler receives its own copy of the logger together with the
# watch folder definition.
#
# @param watch_folder_def [Hash] definition handed to the handler
#   (defaults to @definition)
def initialize_handler(watch_folder_def = @definition)
  handler_class = @default_handler_class
  handler_args = { logger: logger.dup, definition: watch_folder_def }

  logger.debug { "Creating Watch Folder Handler Instance. #{handler_class.name}" }
  @handler = handler_class.new(handler_args)
  logger.debug { "Watch Folder Handler Instance Created. #{handler_class.name}" }
end
|
#initialize_logger(args = {}) ⇒ Object
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
|
# File 'lib/envoi/watch_folder_utility/watch_folder.rb', line 53
# Stores the logger supplied in the arguments.
#
# @param args [Hash] arguments; only :logger is read
# @return [Object, nil] the stored logger (nil when none was supplied)
def initialize_logger(args = {})
  # The assignment is the method's value, so no separate return line is needed.
  @logger = args[:logger]
end
|
#name ⇒ Object
138
139
140
|
# File 'lib/envoi/watch_folder_utility/watch_folder.rb', line 138
# Display name of this watch folder, memoized in @name.
#
# Resolution order: the definition's 'name', then 'paths', then 'path', and
# finally this instance's object id. The last fallback used to be
# `Object.__id__`, which is the id of the Object class and therefore the same
# for every watch folder.
#
# @return [Object] the first truthy candidate; an Integer id when none is set
def name
  @name ||= definition['name'] || definition['paths'] || definition['path'] || __id__
end
|
#poll ⇒ Object
117
118
119
120
121
122
|
# File 'lib/envoi/watch_folder_utility/watch_folder.rb', line 117
# Records the poll timestamps and forwards the poll to the handler.
#
# The previous value of @last_poll_time moves to @previous_poll_time before
# @last_poll_time is set to the current time.
#
# @return [Object, nil] the handler's poll result; nil when the handler does
#   not respond to `poll`
def poll
  @previous_poll_time, @last_poll_time = @last_poll_time, Time.now
  return unless handler.respond_to?(:poll)

  handler.poll
end
|
#poll_interval_elapsed? ⇒ Boolean
124
125
126
|
# File 'lib/envoi/watch_folder_utility/watch_folder.rb', line 124
# Tells whether the watch folder is due for another poll.
#
# @return [Boolean] true when no poll has happened yet, or when at least
#   `poll_interval` seconds have passed since `last_poll_time`
def poll_interval_elapsed?
  previous_poll = last_poll_time
  return true unless previous_poll

  Time.now - previous_poll >= poll_interval
end
|
#process_agent_def(agent_def) ⇒ Object
128
129
130
131
132
133
134
135
136
|
# File 'lib/envoi/watch_folder_utility/watch_folder.rb', line 128
# Prepares a single agent definition and initializes the agent from it.
#
# When a default agent is configured, its storages are used as the base and
# the definition's own 'storages' entries override them; the merged hash is
# written back to agent_def['storages'].
#
# The default agent is read from @default_agent, the same ivar the guard
# checks; the visible class documentation lists no `default_agent` reader, so
# calling one would raise.
#
# @param agent_def [Hash] agent definition; mutated when a default agent exists
# @return [Object] the result of #initialize_agent
def process_agent_def(agent_def)
  if @default_agent
    default_storages = @default_agent.agent_config_storages || {}
    agent_def['storages'] = default_storages.merge(agent_def['storages'] || {})
  end

  initialize_agent(agent_def)
end
|
#process_agent_defs ⇒ Object
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
|
# File 'lib/envoi/watch_folder_utility/watch_folder.rb', line 142
# Walks the definition's 'agents' entry and processes each agent definition,
# then falls back to the default agent when none was created.
#
# A Hash is read as name => definition (the key becomes the definition's
# 'name' unless one is already set); an Array is read as a list of
# definitions. Any other value is skipped.
#
# @return [Object, nil] the agent now stored in @agent
def process_agent_defs
  case (agent_defs = definition['agents'])
  when Hash
    agent_defs.each do |agent_name, agent_def|
      agent_def['name'] ||= agent_name
      process_agent_def(agent_def)
    end
  when Array
    agent_defs.each { |agent_def| process_agent_def(agent_def) }
  end

  @agent ||= @default_agent
end
|
#process_stable_file(file) ⇒ Object
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
|
# File 'lib/envoi/watch_folder_utility/watch_folder.rb', line 357
# Uploads one stable file through the agent and then disposes of it.
#
# On success the file is moved to the completed directory, or deleted when
# no completed directory is configured. On failure it is moved to the
# quarantine directory. Whenever the relevant directory is configured but
# missing (or, for failures, not configured), the file is added to the
# ignore list instead.
#
# `file.processing` is true while this method runs and is always reset
# afterwards; `file.processed` is set once an upload attempt has been handled.
#
# @param file [#path, #name, #processing=, #processed=, #exception=] file to process
# @return [Hash] the upload response; boolean and nil responses are wrapped
#   as `{ success: ... }`
# @raise [StandardError] re-raises any error after storing it in file.exception
def process_stable_file(file)
  file.processing = true
  file_name = file.name || file.path
  logger.debug { "Processing File '#{file_name}'" }

  storage_id = definition['upload_to_storage_id']
  unless storage_id
    logger.warn { "Skipping processing of file because of missing storage ID." }
    return { success: false, message: 'Missing storage ID.' }
  end

  quarantine_directory_path = definition['quarantine_directory_path']
  completed_directory_path = definition['completed_directory_path']
  watch_folder_upload_args = definition['upload_args']

  full_file_path = file.path
  upload_args = {
    file_path: full_file_path,
    storage_id: storage_id
  }
  upload_args.merge!(watch_folder_upload_args) if watch_folder_upload_args.is_a?(Hash)

  logger.debug { "Executing Upload. #{upload_args}" }
  _response = agent.upload(upload_args)
  # Agents may answer with a bare boolean or with nothing at all. A nil answer
  # is handled as a failed upload rather than crashing on `nil[:success]`.
  if _response.nil? || _response == true || _response == false
    _response = { success: _response ? true : false }
  end

  if _response[:success]
    if completed_directory_path
      if Dir.exist?(completed_directory_path)
        logger.debug { "Moving '#{full_file_path}' to completed directory path '#{completed_directory_path}'" }
        FileUtils.mv full_file_path, completed_directory_path
      else
        logger.warn { "Completed directory path not found: '#{completed_directory_path}'" }
        add_file_to_ignored_file_paths(file)
      end
    else
      FileUtils.rm full_file_path
    end
  else
    if quarantine_directory_path && Dir.exist?(quarantine_directory_path)
      logger.warn { "Moving '#{full_file_path}' to quarantine directory path '#{quarantine_directory_path}'" }
      FileUtils.mv full_file_path, quarantine_directory_path
    else
      logger.warn { "Adding '#{full_file_path}' to the temporary ignore list." }
      add_file_to_ignored_file_paths(file)
    end
  end

  file.processed = true
  _response
rescue => e
  file.exception = e
  raise
ensure
  file.processing = false
end
|
#process_stable_files ⇒ Object
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
|
# File 'lib/envoi/watch_folder_utility/watch_folder.rb', line 419
# Processes every stable file reported by the handler.
#
# Files are skipped when they are ignored, already being processed, or
# already processed. Files that fail the definition's 'includes' patterns or
# match its 'excludes' patterns are added to the ignore list. The remaining
# files go to #process_stable_file, inline or -- when @threaded is set -- in
# one thread per file, bounded by the definition's
# 'maximum_active_processors'.
#
# @return [Object, nil] the stable files collection, or nil when iteration
#   stopped because the processor limit was reached
def process_stable_files
  # Fall back to the instance default so a definition without the key does
  # not break the limit comparison below.
  maximum_active_processors = definition['maximum_active_processors'] || @default_maximum_active_processors
  includes = definition['includes']
  excludes = definition['excludes']

  stable_files.each do |file|
    file.watch_folder ||= self

    next if file.respond_to?(:ignore?) ? file.ignore? : ignored_file_paths.include?(file.path)
    next if file.processing || file.processed

    if includes && !includes.empty? && !find_in_patterns(includes, file)
      add_file_to_ignored_file_paths(file)
      next
    end

    if excludes && !excludes.empty? && find_in_patterns(excludes, file)
      add_file_to_ignored_file_paths(file)
      next
    end

    unless @threaded
      process_stable_file(file)
      next
    end

    # Drop finished processors before checking the limit.
    processors.keep_if { |tracked_file, _thread| tracked_file.processing }
    if maximum_active_processors && processors.length >= maximum_active_processors
      logger.debug { "Maximum number of active processors reached for watch folder. #{name}" }
      break
    end

    # Mark the file as in flight before the thread starts. Registering the
    # processor only if the new thread had already flipped the flag was a
    # race that let the processor limit be exceeded.
    file.processing = true
    processors[file] = Thread.new(file) do |stable_file|
      begin
        process_stable_file(stable_file)
      rescue => e
        logger.error { "Exception '#{e.message}' in thread for `#{name}` `#{stable_file.path}`. " }
        raise
      ensure
        # Best effort: never let the cleanup mask the original error.
        stable_file.processing = false rescue nil
      end
    end
  end
end
|
#process_watch_folder_def(watch_folder_def = @definition) ⇒ Object
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
|
# File 'lib/envoi/watch_folder_utility/watch_folder.rb', line 176
# Normalizes a watch folder definition in place.
#
# Singular/alias keys are folded into their canonical form and removed:
# 'path' into 'paths', 'include' into 'includes', 'exclude' into 'excludes',
# 'quarantine_path' into 'quarantine_directory_path', 'completed_path' into
# 'completed_directory_path', 'storage_id' into 'upload_to_storage_id', and
# 'agent' into 'agents'. Collection, metadata and import settings are
# gathered into 'upload_args'. Defaults are applied for
# 'maximum_active_processors' and 'poll_interval', and @poll_interval is set.
#
# @param watch_folder_def [Hash] definition to normalize (defaults to @definition)
# @return [Object, false] the poll interval on success; false when the
#   definition yields no path to watch
def process_watch_folder_def(watch_folder_def = @definition)
  logger.debug { "Initializing Watch Folder #{watch_folder_def.inspect}" }

  # --- paths -----------------------------------------------------------
  logger.debug { "Initializing parameter 'paths'." }
  name = watch_folder_def['name']

  path = watch_folder_def['path']

  paths = watch_folder_def['paths'] ||= []
  paths = [ paths ] if paths.is_a?(String)
  paths.concat [*path] if path
  paths.map! { |p| File.expand_path(p) }
  # With no explicit path, the name is used as the path when it points at an
  # existing directory. A missing name is skipped so that the "no path found"
  # branch below is reached instead of File.expand_path(nil) raising.
  if paths.empty? && name
    name_as_path = File.expand_path(name.to_s)
    # Append the single path; Array#concat only accepts arrays.
    paths << name_as_path if Dir.exist?(name_as_path)
  end
  paths.uniq!
  watch_folder_def['paths'] = paths
  watch_folder_def.delete('path')

  if paths.empty?
    logger.error { "Failed to initialize watch folder. No path found in watch folder definition." }
    return false
  end
  logger.debug { "Parameter 'paths' initialized." }

  # --- includes --------------------------------------------------------
  logger.debug { "Initializing parameter 'includes'." }
  include = watch_folder_def['include']
  includes = (watch_folder_def['includes'] ||= [])
  includes.concat [*include] if include
  includes.uniq!
  includes.map! { |e| Regexp.try_convert(e) || e }
  watch_folder_def['includes'] = includes
  watch_folder_def.delete('include')
  logger.debug { "Parameter `includes` initialized." }

  # --- excludes --------------------------------------------------------
  logger.debug { "Initializing parameter 'excludes'." }
  exclude = watch_folder_def['exclude']
  # Default exclude pattern, applied whenever no singular 'exclude' is given.
  exclude ||= '**/.*'
  excludes = (watch_folder_def['excludes'] ||= [])
  excludes.concat [*exclude] if exclude
  excludes.uniq!
  excludes.map! { |e| Regexp.try_convert(e) || e }
  watch_folder_def['excludes'] = excludes
  watch_folder_def.delete('exclude')
  logger.debug { "Parameter `excludes` initialized." }

  # --- quarantine directory --------------------------------------------
  logger.debug { "Initializing parameter `quarantine directory path`." }
  quarantine_directory_path = watch_folder_def['quarantine_directory_path'] || watch_folder_def['quarantine_path']
  if quarantine_directory_path
    quarantine_directory_path = File.expand_path(quarantine_directory_path)
    watch_folder_def['quarantine_directory_path'] = quarantine_directory_path

    unless Dir.exist?(quarantine_directory_path)
      logger.warn { "Quarantine directory path '#{quarantine_directory_path}' does not exist. Files will be ignored instead." }
    end
  end
  watch_folder_def.delete('quarantine_path')
  logger.debug { "Parameter `quarantine directory path` initialized." }

  # --- completed directory ---------------------------------------------
  logger.debug { "Initializing parameter 'completed directory path'." }
  completed_directory_path = watch_folder_def['completed_directory_path'] || watch_folder_def['completed_path']
  if completed_directory_path
    completed_directory_path = File.expand_path(completed_directory_path)
    watch_folder_def['completed_directory_path'] = completed_directory_path

    unless Dir.exist?(completed_directory_path)
      logger.warn { "Completed directory path '#{completed_directory_path}' does not exist. File will be ignored instead." }
    end
  end
  watch_folder_def.delete('completed_path')
  logger.debug { "Parameter 'completed directory path' initialized." }

  # --- storage id ------------------------------------------------------
  logger.debug { "Initializing parameter `upload to storage id`." }
  storage_id = watch_folder_def['upload_to_storage_id'] || watch_folder_def['storage_id']
  watch_folder_def['upload_to_storage_id'] ||= storage_id
  watch_folder_def.delete('storage_id')
  unless storage_id
    logger.warn { "No `upload to storage id` specified. Uploading will be skipped for this watch folder." }
  end
  logger.debug { "Parameter 'upload to storage id' initialized." }

  # --- upload / import arguments ---------------------------------------
  logger.debug { "Initializing parameter 'upload/import arguments'." }
  upload_args = watch_folder_def['upload_args']
  item_add_args = watch_folder_def['item_add_args'] || { }
  item_add_options = watch_folder_def['item_add_options'] || { }

  import_args = watch_folder_def['import_args'] || { }
  import_options = watch_folder_def['import_options'] || { }

  # Import argument/option keys are normalized to lower-case symbols.
  import_args = Hash[import_args.map { |k,v| [ k.respond_to?(:to_sym) ? k.to_sym.downcase : k, v ] }]
  import_options = Hash[import_options.map { |k,v| [ k.respond_to?(:to_sym) ? k.to_sym.downcase : k, v ] }]

  add_item_to_collection = item_add_args.fetch(:add_item_to_collection,
                                               item_add_options.fetch(:add_item_to_collection,
                                                                      watch_folder_def['add_item_to_collection']))
  if add_item_to_collection.nil? || add_item_to_collection
    _add_item_to_collection = false
    # The collection target is taken from the first of: id, name, path position.
    collection_id = watch_folder_def['collection_id']
    if collection_id
      item_add_args[:collection_id] ||= collection_id
      _add_item_to_collection = true
      watch_folder_def.delete('collection_id')
    else
      collection_name = watch_folder_def['collection_name']
      if collection_name
        item_add_args[:collection_name] ||= collection_name
        _add_item_to_collection = true
        watch_folder_def.delete('collection_name')
      else
        file_path_collection_name_position = watch_folder_def['file_path_collection_name_position']
        if file_path_collection_name_position
          item_add_args[:file_path_collection_name_position] = file_path_collection_name_position
          _add_item_to_collection = true
          watch_folder_def.delete('file_path_collection_name_position')
        end
      end
    end
    import_options[:add_item_to_collection] ||= _add_item_to_collection
  end

  metadata = watch_folder_def['metadata']
  if metadata
    item_add_args[:metadata] = metadata
    watch_folder_def.delete('metadata')
  end

  field_group = watch_folder_def['field_group']
  if field_group
    item_add_args[:field_group] = field_group
    watch_folder_def.delete('field_group')
  end

  ingest_group = watch_folder_def['ingest_group']
  if ingest_group
    # Appended to any existing job metadata as a comma separated entry.
    job_metadata = (import_args[:jobmetadata] ||= '')
    job_metadata += ',' unless job_metadata.empty?
    job_metadata += "portal_groups:StringArray=#{ingest_group}"
    import_args[:jobmetadata] = job_metadata
    watch_folder_def.delete('ingest_group')
  end

  item_add_args = symbolize_keys(item_add_args)
  (item_add_args[:import_args] ||= {}).merge! import_args if import_args.is_a?(Hash)
  (item_add_args[:import_options] ||= {}).merge! import_options if import_options.is_a?(Hash)

  upload_args = symbolize_keys(upload_args)
  ((upload_args ||= {})[:item_add_args] ||= {}).merge! item_add_args if item_add_args.is_a?(Hash)
  ((upload_args ||= {})[:item_add_options] ||= {}).merge! symbolize_keys(item_add_options) if item_add_options.is_a?(Hash)

  watch_folder_def.delete('import_args')
  watch_folder_def.delete('import_options')
  watch_folder_def['upload_args'] = upload_args
  logger.debug { "Parameter 'upload/import arguments' initialized. #{upload_args}" }

  # --- processor limit -------------------------------------------------
  maximum_active_processors = watch_folder_def['maximum_active_processors']
  if maximum_active_processors.nil?
    maximum_active_processors = @default_maximum_active_processors
    watch_folder_def['maximum_active_processors'] = maximum_active_processors
  end

  # --- agents ----------------------------------------------------------
  logger.debug { "Initializing parameter 'agents'." }
  agent = watch_folder_def['agent']
  agents = watch_folder_def['agents'] ||= [ ]
  if agents.is_a?(Hash)
    agents = agents.map { |k,v| v['name'] ||= k; v }
  end

  if agent
    # A single-entry hash whose value is itself a hash is read as
    # name => definition; any other hash is the definition itself.
    if agent.is_a?(Hash) && agent.keys.length == 1 && agent.values.first.is_a?(Hash)
      agent = agent.map { |k,v| v['name'] ||= k; v }
    end
    # Wrap explicitly: splatting a Hash would turn it into key/value pairs.
    agents.concat(agent.is_a?(Array) ? agent : [ agent ])
    # Store the combined list so the single agent survives even when
    # 'agents' was given as a Hash and mapped into a new array above.
    watch_folder_def['agents'] = agents
    watch_folder_def.delete('agent')
  end
  logger.debug { "Parameter 'agent' initialized." }

  @poll_interval = watch_folder_def['poll_interval'] ||= DEFAULT_POLL_INTERVAL
end
|
#run ⇒ Object
469
470
471
|
# File 'lib/envoi/watch_folder_utility/watch_folder.rb', line 469
# Starts the handler when it supports running.
#
# @return [Object, nil] the handler's `run` result; nil when the handler does
#   not respond to `run`
def run
  return unless handler.respond_to?(:run)

  handler.run
end
|
#stable_files ⇒ Object
473
474
475
|
# File 'lib/envoi/watch_folder_utility/watch_folder.rb', line 473
# Delegates to the handler and returns its stable files collection, which
# #process_stable_files iterates.
def stable_files
handler.stable_files
end
|
#symbolize_keys(value, recursive = true) ⇒ Object
Converts hash keys to symbols
481
482
483
484
485
486
487
488
489
490
|
# File 'lib/envoi/watch_folder_utility/watch_folder.rb', line 481
# Returns a copy of the value with hash keys converted to symbols.
#
# Keys that do not respond to `to_sym` are kept as they are. Arrays are
# mapped element by element; any other value is returned untouched.
#
# @param value [Hash, Array, Object] structure to convert
# @param recursive [Boolean] when true, hash values are converted as well;
#   array elements are always visited, with this flag passed along
# @return [Hash, Array, Object] the converted structure
def symbolize_keys(value, recursive = true)
  if value.is_a?(Hash)
    value.each_with_object({}) do |(key, val), symbolized|
      new_key = key.respond_to?(:to_sym) ? key.to_sym : key
      symbolized[new_key] = recursive ? symbolize_keys(val, true) : val
    end
  elsif value.is_a?(Array)
    value.map { |element| symbolize_keys(element, recursive) }
  else
    value
  end
end
|