Class: Envoi::Mam::Cantemo::Agent::WatchFolderManager

Inherits:
Object
  • Object
show all
Defined in:
lib/envoi/mam/cantemo/agent/watch_folder_manager.rb

Defined Under Namespace

Classes: MultiLogger

Constant Summary collapse

LWF = Envoi::WatchFolderUtility::WatchFolder::Handler::Listen # Listen Watch Folder

AWF = Envoi::Aspera::WatchService::WatchFolder # Aspera Watch Folder

DEFAULT_WATCH_FOLDER_PROCESSOR_LIMIT = 10

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(args = {}) ⇒ WatchFolderManager

Returns a new instance of WatchFolderManager.



39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
# File 'lib/envoi/mam/cantemo/agent/watch_folder_manager.rb', line 39

# Builds the manager: sets up logging, loads the agent configuration, creates
# the default Cantemo agent when the config has a `cantemo` section, and then
# instantiates the watch folders from their definitions.
#
# @param args [Hash] options. The hash is modified in place
#   (:default_preserve_file_path and :logger are written back, and
#   #initialize_logger may set :log_level) before being forwarded to the
#   config loader and to the default agent.
# @option args [Boolean] :threaded overrides the `threaded` config entry,
#   which itself defaults to true
# @option args [Boolean] :default_preserve_file_path (false)
def initialize(args = {})
  initialize_logger(args)
  logger.info { 'Initializing Agent Watch Folder Manager.' }

  args[:default_preserve_file_path] = args.fetch(:default_preserve_file_path, false)

  @config = Envoi::Mam::Cantemo::Agent.load_config_from_file(args)
  # Logging settings present in the loaded config rebuild the logger.
  initialize_logger_from_config
  args[:logger] = @logger

  @ignored_file_paths_by_watch_folder = Hash.new { |paths, folder| paths[folder] = [] }
  @ignored_file_paths_lock            = Mutex.new

  # The config lookups are evaluated up front, string key first, exactly as a
  # nested positional-default fetch would evaluate them.
  threaded_string_key = config.fetch('threaded', true)
  threaded_config     = config.fetch(:threaded, threaded_string_key)
  @threaded           = args.fetch(:threaded, threaded_config)

  @default_maximum_active_processors = DEFAULT_WATCH_FOLDER_PROCESSOR_LIMIT
  @processors_by_watch_folder        = Hash.new { |processors, folder| processors[folder] = {} }

  @watch_folder_defs   = config[:watch_folders] || config['watch_folders']
  @default_agent_class = Envoi::Mam::Cantemo::Agent

  cantemo_config = config[:cantemo] || config['cantemo']
  if cantemo_config
    logger.debug { 'Initializing Default Cantemo Portal Agent.' }
    @default_agent = @default_agent_class.new(args.merge({ config: cantemo_config }))
    logger.debug { 'Default Cantemo Portal Agent Initialized.' }
    @default_storages = @default_agent.agent_config_storages

    # Fall back to definitions nested in the cantemo section when the top
    # level of the config did not provide any.
    @watch_folder_defs ||= cantemo_config[:watch_folders] || cantemo_config['watch_folders']
  else
    @default_agent = nil
  end

  process_watch_folder_defs
end

Instance Attribute Details

#config ⇒ Object

Returns the value of attribute config.



37
38
39
# File 'lib/envoi/mam/cantemo/agent/watch_folder_manager.rb', line 37

# Reader for @config, which the constructor assigns from
# Envoi::Mam::Cantemo::Agent.load_config_from_file(args).
def config
  @config
end

#default_agent ⇒ Object

Returns the value of attribute default_agent.



37
38
39
# File 'lib/envoi/mam/cantemo/agent/watch_folder_manager.rb', line 37

# Reader for @default_agent: the agent the constructor creates from the
# `cantemo` section of the config, or nil when that section is absent.
def default_agent
  @default_agent
end

#default_agent_class ⇒ Object

Returns the value of attribute default_agent_class.



37
38
39
# File 'lib/envoi/mam/cantemo/agent/watch_folder_manager.rb', line 37

# Reader for @default_agent_class, which the constructor sets to
# Envoi::Mam::Cantemo::Agent.
def default_agent_class
  @default_agent_class
end

#logger ⇒ Object

Returns the value of attribute logger.



37
38
39
# File 'lib/envoi/mam/cantemo/agent/watch_folder_manager.rb', line 37

# Reader for @logger, the MultiLogger assembled by #initialize_logger.
def logger
  @logger
end

#watch_folder_defs ⇒ Object

Returns the value of attribute watch_folder_defs.



37
38
39
# File 'lib/envoi/mam/cantemo/agent/watch_folder_manager.rb', line 37

# Reader for @watch_folder_defs: the raw watch folder definitions the
# constructor takes from the `watch_folders` config entry, falling back to the
# `watch_folders` entry inside the `cantemo` section.
def watch_folder_defs
  @watch_folder_defs
end

#watch_folders ⇒ Object

Returns the value of attribute watch_folders.



37
38
39
# File 'lib/envoi/mam/cantemo/agent/watch_folder_manager.rb', line 37

# Reader for @watch_folders, the list populated by #process_watch_folder_defs
# (one entry per definition, with nil/false results removed).
def watch_folders
  @watch_folders
end

Class Method Details

.run(args) ⇒ Object



246
247
248
249
250
# File 'lib/envoi/mam/cantemo/agent/watch_folder_manager.rb', line 246

# Convenience entry point: builds a manager from +args+, runs it, and hands
# the instance back once #run has returned.
#
# @param args [Hash] options forwarded to the constructor
# @return [WatchFolderManager] the manager that was run
def self.run(args)
  new(args).tap { |manager| manager.run }
end

Instance Method Details

#initialize_logger(args = {}) ⇒ Object



78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
# File 'lib/envoi/mam/cantemo/agent/watch_folder_manager.rb', line 78

# (Re)creates @logger as a MultiLogger that always writes to STDOUT and,
# optionally, to one additional target.
#
# The additional target is args[:logger] when given; otherwise a Logger is
# opened on args[:log_to] (rotated per args[:log_age]) when :log_to is set.
# The resolved :logger and :log_level values are written back into +args+.
#
# @param args [Hash] logging options (modified in place)
# @option args [Object] :logger an existing logger to add as a target
# @option args [Object] :log_to destination handed to Logger.new
# @option args [Object] :log_age ('daily') rotation argument for Logger.new
# @option args [Object] :log_level (Logger::INFO)
# @return [MultiLogger] the new logger
def initialize_logger(args = {})
  @logger = MultiLogger.new(Logger.new(STDOUT))

  args[:logger] ||= (Logger.new(args[:log_to], args.fetch(:log_age, 'daily')) if args[:log_to])
  additional_target = args[:logger]
  @logger.targets << additional_target if additional_target

  args[:log_level] ||= Logger::INFO
  level = args[:log_level]
  @logger.level = level if level

  @logger
end

#initialize_logger_from_config(_config = @config) ⇒ Object



94
95
96
97
98
# File 'lib/envoi/mam/cantemo/agent/watch_folder_manager.rb', line 94

# Rebuilds the logger from the logging settings found in a config hash.
#
# Looks up :log_to, :log_level and :log_age (symbol key first, then the
# string key) and, when at least one is set, passes the collected values to
# #initialize_logger. Does nothing when none of them is set.
#
# @param _config [Hash] configuration to read from (defaults to @config)
# @return [Object, nil] whatever #initialize_logger returns, or nil when no
#   logging setting was found
def initialize_logger_from_config(_config = @config)
  logger_args = %i[log_to log_level log_age].each_with_object({}) do |key, collected|
    setting = _config[key] || _config[key.to_s]
    collected[key] = setting if setting
  end
  return if logger_args.empty?

  initialize_logger(logger_args)
end

#pause ⇒ Object



216
217
218
219
220
221
# File 'lib/envoi/mam/cantemo/agent/watch_folder_manager.rb', line 216

# Pauses every watch folder that responds to #pause. Does nothing unless the
# manager is currently flagged as running (@should_run).
#
# @return [Array, nil] the watch folder list when running, otherwise nil
def pause
  return unless @should_run

  logger.info { 'Pausing...' }
  watch_folders.each do |watch_folder|
    watch_folder.pause if watch_folder.respond_to?(:pause)
  end
end

#poll ⇒ Object



149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
# File 'lib/envoi/mam/cantemo/agent/watch_folder_manager.rb', line 149

# Performs one polling pass over all watch folders.
#
# First pass: each watch folder whose poll interval has elapsed (folders that
# do not expose #poll_interval_elapsed? are always considered due) is polled
# if it responds to #poll, and is kept when it reports stable files.
# Second pass: every kept watch folder is asked to process its stable files.
#
# @return [Array] the watch folders that had stable files during this pass
def poll
  ready_watch_folders = watch_folders.select do |watch_folder|
    interval_aware = watch_folder.respond_to?(:poll_interval_elapsed?)
    next false if interval_aware && !watch_folder.poll_interval_elapsed?

    if watch_folder.respond_to?(:poll)
      logger.debug { "Polling Watch Folder: #{watch_folder.name}" }
      watch_folder.poll
    end

    !watch_folder.stable_files.empty?
  end

  # @TODO create processor/worker pool to limit total file processing across all watch folders
  ready_watch_folders.each do |watch_folder|
    # The total is sampled before this folder starts processing and is only
    # reported afterwards.
    active_processor_count = watch_folders.inject(0) { |count, wf| count + wf.processors.length }
    watch_folder.process_stable_files
    logger.debug { "Total active processors #{active_processor_count}" }
  end
end

#process_watch_folder_def(watch_folder_def) ⇒ Object

Parameters:

  • watch_folder_def (Hash)

Options Hash (watch_folder_def):

  • path (String)
  • upload_to_storage_id (String)
  • name (String) — default: path
  • paths (Array<String>) — default: [path]
  • exclude (String) — default: '**/.*'
  • excludes (Array<String>) — default: [exclude]
  • include (String)
  • includes (Array<String>) — default: [include]
  • quarantine_directory_path (String)
  • completed_directory_path (String)
  • maximum_active_processors (Integer|False) — default: @default_maximum_active_processors
  • logging (Hash)


113
114
115
116
117
118
119
120
121
122
123
124
125
126
# File 'lib/envoi/mam/cantemo/agent/watch_folder_manager.rb', line 113

# Builds a single watch folder from its definition.
#
# The watch folder receives the manager's shared logger, default agent and
# default agent class; the definition itself is passed through unchanged under
# :definition. Per-definition `log_to` / `log_level` values are not applied
# here, so they are no longer read at all: the previous dead lookups raised
# NoMethodError when `logging` held a value that does not support #[]
# (for example `true`).
#
# @param watch_folder_def [Hash] the watch folder definition
# @return [Envoi::WatchFolderUtility::WatchFolder] the new watch folder
def process_watch_folder_def(watch_folder_def)
  args_out = {
    logger:              logger,
    default_agent:       default_agent,
    default_agent_class: default_agent_class,
    definition:          watch_folder_def
  }

  Envoi::WatchFolderUtility::WatchFolder.new(args_out)
end

#process_watch_folder_defs ⇒ Object

Iterates through watch_folder_defs and populates @watch_folders with watch folders initialized from the watch folder definitions

Supports both array and hash formats. The hash format uses the key as the name if the definition doesn’t already have a name set.



133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
# File 'lib/envoi/mam/cantemo/agent/watch_folder_manager.rb', line 133

# Populates @watch_folders from watch_folder_defs.
#
# An Array is treated as a list of definitions. A Hash is treated as
# name => definition pairs, where the key becomes the definition's 'name'
# unless one is already set (the definition hash is modified in place).
# Definitions for which no watch folder is produced (nil/false) are dropped.
#
# @raise [RuntimeError] when watch_folder_defs is neither an Array nor a Hash
def process_watch_folder_defs
  logger.debug { 'Processing watch folder definitions.' }

  definitions = watch_folder_defs
  @watch_folders =
    case definitions
    when Array
      definitions.map { |definition| process_watch_folder_def(definition) }
    when Hash
      definitions.map do |key, definition|
        definition['name'] ||= key
        process_watch_folder_def(definition)
      end
    else
      raise "Unhandled format: #{definitions.class.name}"
    end

  @watch_folders.reject! { |watch_folder| !watch_folder }
  logger.debug { 'Processing of watch folder definitions completed.' }
end

#resume ⇒ Object



223
224
225
226
227
228
# File 'lib/envoi/mam/cantemo/agent/watch_folder_manager.rb', line 223

# Resumes every watch folder that responds to #resume. Does nothing unless the
# manager is currently flagged as running (@should_run).
#
# @return [Array, nil] the watch folder list when running, otherwise nil
def resume
  return unless @should_run

  logger.info { 'Resuming...' }
  watch_folders.each do |watch_folder|
    watch_folder.resume if watch_folder.respond_to?(:resume)
  end
end

#run ⇒ Object

The main execution method



182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
# File 'lib/envoi/mam/cantemo/agent/watch_folder_manager.rb', line 182

# The main execution method.
#
# Marks the manager as running, starts every watch folder that responds to
# #run, then polls once per second until @should_run is cleared or an
# Interrupt/SystemExit is received inside the loop. Any StandardError is
# logged and re-raised. #stop is always invoked on the way out.
#
# @return [nil]
def run
  @should_run = true

  logger.info { 'Running...' }
  watch_folders.each do |watch_folder|
    if watch_folder.respond_to?(:run)
      watch_folder.run
    else
      logger.debug { "Skip run for #{watch_folder}" }
    end
  end
  logger.debug { 'Initial Run Complete.' }

  while @should_run
    begin
      poll
      sleep 1
    rescue Interrupt, SystemExit => signal
      logger.debug { "Received Signal: #{signal.class.name}" }
      break
    end
  end
rescue => error
  logger.debug { "EXCEPTION #{error.message} \n#{error.backtrace.join("\n")}\n" }
  logger.error { "An error occurred. #{error.message}" }
  raise
ensure
  stop
  logger.info { 'Exiting...' }
end

#run_once ⇒ Object



177
178
179
# File 'lib/envoi/mam/cantemo/agent/watch_folder_manager.rb', line 177

# Currently a no-op: the body contains only a commented-out call to
# AWF.run_once, so invoking this method does nothing and returns nil.
def run_once
  # AWF.run_once(watch_folders) { |wf| process_watch_folder(wf) }
end

#stop ⇒ Object



208
209
210
211
212
213
214
# File 'lib/envoi/mam/cantemo/agent/watch_folder_manager.rb', line 208

# Clears the running flag and stops every watch folder that responds to
# #stop. Does nothing when the manager is not flagged as running, which makes
# repeated calls harmless.
#
# @return [Array, nil] the watch folder list when a stop was performed,
#   otherwise nil
def stop
  return unless @should_run

  @should_run = false
  logger.info { 'Stopping...' }
  watch_folders.each do |watch_folder|
    watch_folder.stop if watch_folder.respond_to?(:stop)
  end
end

#symbolize_keys(value, recursive = true) ⇒ Object

Converts hash keys to symbols

Parameters:

  • value (Hash)

    hash

  • recursive (Boolean) (defaults to: true)

    Will recurse into any values that are hashes or arrays



234
235
236
237
238
239
240
241
242
243
244
# File 'lib/envoi/mam/cantemo/agent/watch_folder_manager.rb', line 234

# Converts hash keys to symbols.
#
# Hashes come back as new hashes whose keys have been converted with #to_sym
# (keys that do not respond to #to_sym are kept as they are). Arrays are
# mapped element by element, passing +recursive+ along. Any other value is
# returned unchanged.
#
# @param value [Hash, Array, Object] the structure to convert
# @param recursive [Boolean] when true, hash values are converted as well
# @return [Hash, Array, Object] the converted structure
def symbolize_keys(value, recursive = true)
  return value.map { |element| symbolize_keys(element, recursive) } if value.is_a?(Array)
  return value unless value.is_a?(Hash)

  value.each_with_object({}) do |(key, entry), symbolized|
    new_key = key.respond_to?(:to_sym) ? key.to_sym : key
    symbolized[new_key] = recursive ? symbolize_keys(entry, true) : entry
  end
end