Class: Envoi::Mam::Cantemo::Agent::WatchFolderManager
- Inherits:
-
Object
- Object
- Envoi::Mam::Cantemo::Agent::WatchFolderManager
- Defined in:
- lib/envoi/mam/cantemo/agent/watch_folder_manager.rb
Defined Under Namespace
Classes: MultiLogger
Constant Summary collapse
- LWF =
AWF = Envoi::Aspera::WatchService::WatchFolder # Aspera Watch Folder
Envoi::WatchFolderUtility::WatchFolder::Handler::Listen
- DEFAULT_WATCH_FOLDER_PROCESSOR_LIMIT =
Listen Watch Folder
10
Instance Attribute Summary collapse
-
#config ⇒ Object
Returns the value of attribute config.
-
#default_agent ⇒ Object
Returns the value of attribute default_agent.
-
#default_agent_class ⇒ Object
Returns the value of attribute default_agent_class.
-
#logger ⇒ Object
Returns the value of attribute logger.
-
#watch_folder_defs ⇒ Object
Returns the value of attribute watch_folder_defs.
-
#watch_folders ⇒ Object
Returns the value of attribute watch_folders.
Class Method Summary collapse
Instance Method Summary collapse
-
#initialize(args = {}) ⇒ WatchFolderManager
constructor
A new instance of WatchFolderManager.
- #initialize_logger(args = {}) ⇒ Object
- #initialize_logger_from_config(_config = @config) ⇒ Object
- #pause ⇒ Object
- #poll ⇒ Object
- #process_watch_folder_def(watch_folder_def) ⇒ Object
-
#process_watch_folder_defs ⇒ Object
Iterates through watch_folder_defs and populates @watch_folders with watch folders initialized from the watch folder definitions.
- #resume ⇒ Object
-
#run ⇒ Object
The main execution method.
- #run_once ⇒ Object
- #stop ⇒ Object
-
#symbolize_keys(value, recursive = true) ⇒ Object
Converts hash keys to symbols.
Constructor Details
#initialize(args = {}) ⇒ WatchFolderManager
Returns a new instance of WatchFolderManager.
39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 |
# File 'lib/envoi/mam/cantemo/agent/watch_folder_manager.rb', line 39 def initialize(args = {}) initialize_logger(args) logger.info { 'Initializing Agent Watch Folder Manager.' } args[:default_preserve_file_path] = args.fetch(:default_preserve_file_path, false) @config = Envoi::Mam::Cantemo::Agent.load_config_from_file(args) initialize_logger_from_config args[:logger] = @logger @ignored_file_paths_by_watch_folder = Hash.new { |h, k| h[k] = [] } @ignored_file_paths_lock = Mutex.new @threaded = args.fetch(:threaded, config.fetch(:threaded, config.fetch('threaded', true))) @default_maximum_active_processors = DEFAULT_WATCH_FOLDER_PROCESSOR_LIMIT @processors_by_watch_folder = Hash.new { |h, k| h[k] = {} } @watch_folder_defs = config[:watch_folders] || config['watch_folders'] @default_agent_class = Envoi::Mam::Cantemo::Agent cantemo_config = config[:cantemo] || config['cantemo'] if cantemo_config logger.debug { 'Initializing Default Cantemo Portal Agent.' } @default_agent = @default_agent_class.new(args.merge({ config: cantemo_config })) logger.debug { 'Default Cantemo Portal Agent Initialized.' } @default_storages = @default_agent.agent_config_storages if !@watch_folder_defs @watch_folder_defs = cantemo_config[:watch_folders] || cantemo_config['watch_folders'] end else @default_agent = nil end process_watch_folder_defs end |
Instance Attribute Details
#config ⇒ Object
Returns the value of attribute config.
37 38 39 |
# File 'lib/envoi/mam/cantemo/agent/watch_folder_manager.rb', line 37 def config @config end |
#default_agent ⇒ Object
Returns the value of attribute default_agent.
37 38 39 |
# File 'lib/envoi/mam/cantemo/agent/watch_folder_manager.rb', line 37 def default_agent @default_agent end |
#default_agent_class ⇒ Object
Returns the value of attribute default_agent_class.
37 38 39 |
# File 'lib/envoi/mam/cantemo/agent/watch_folder_manager.rb', line 37 def default_agent_class @default_agent_class end |
#logger ⇒ Object
Returns the value of attribute logger.
37 38 39 |
# File 'lib/envoi/mam/cantemo/agent/watch_folder_manager.rb', line 37 def logger @logger end |
#watch_folder_defs ⇒ Object
Returns the value of attribute watch_folder_defs.
37 38 39 |
# File 'lib/envoi/mam/cantemo/agent/watch_folder_manager.rb', line 37 def watch_folder_defs @watch_folder_defs end |
#watch_folders ⇒ Object
Returns the value of attribute watch_folders.
37 38 39 |
# File 'lib/envoi/mam/cantemo/agent/watch_folder_manager.rb', line 37 def watch_folders @watch_folders end |
Class Method Details
Instance Method Details
#initialize_logger(args = {}) ⇒ Object
78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 |
# File 'lib/envoi/mam/cantemo/agent/watch_folder_manager.rb', line 78 def initialize_logger(args = {}) @logger = MultiLogger.new(Logger.new(STDOUT)) _logger = args[:logger] ||= begin log_to = args[:log_to] log_age = args.fetch(:log_age, 'daily') log_to ? Logger.new(log_to, log_age) : nil end @logger.targets << _logger if _logger log_level = args[:log_level] ||= Logger::INFO @logger.level = log_level if log_level @logger end |
#initialize_logger_from_config(_config = @config) ⇒ Object
94 95 96 97 98 |
# File 'lib/envoi/mam/cantemo/agent/watch_folder_manager.rb', line 94 def initialize_logger_from_config(_config = @config) logger_args = { } [ :log_to, :log_level, :log_age ].each { |k| v = _config[k] || _config[k.to_s]; logger_args[k] = v if v } initialize_logger(logger_args) unless logger_args.empty? end |
#pause ⇒ Object
216 217 218 219 220 221 |
# File 'lib/envoi/mam/cantemo/agent/watch_folder_manager.rb', line 216 def pause if @should_run logger.info { 'Pausing...' } watch_folders.each { |wf| wf.pause if wf.respond_to?(:pause) } end end |
#poll ⇒ Object
149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 |
# File 'lib/envoi/mam/cantemo/agent/watch_folder_manager.rb', line 149 def poll watch_folders_with_stable_files = [ ] watch_folders.each do |watch_folder| if watch_folder.respond_to?(:poll_interval_elapsed?) next unless watch_folder.poll_interval_elapsed? end if watch_folder.respond_to?(:poll) logger.debug { "Polling Watch Folder: #{watch_folder.name}" } watch_folder.poll end watch_folders_with_stable_files << watch_folder unless watch_folder.stable_files.empty? end # @TODO create processor/worker pool to limit total file processing across all watch folders watch_folders_with_stable_files.each do |watch_folder| #process_watch_folder_stable_files(watch_folder) total_active_processors = watch_folders.inject(0) { |sum, wf| sum + wf.processors.length } watch_folder.process_stable_files logger.debug { "Total active processors #{total_active_processors}" } end # Thread.list.each {|t| t.join unless t == Thread.current } # @processors_by_watch_folder.each { |wf, ap| ap.each { |f, t| t.join } } if @threaded end |
#process_watch_folder_def(watch_folder_def) ⇒ Object
113 114 115 116 117 118 119 120 121 122 123 124 125 126 |
# File 'lib/envoi/mam/cantemo/agent/watch_folder_manager.rb', line 113 def process_watch_folder_def(watch_folder_def) args_out = {} logging = watch_folder_def['logging'] || watch_folder_def log_to = logging['log_to'] log_level = logging['log_level'] # args_out[:log_to] = log_to if log_to && !log_to.empty? # args_out[:log_level] = log_level if log_level && !log_level.empty? args_out[:logger] = logger # unless log_to args_out[:default_agent] = default_agent args_out[:default_agent_class] = default_agent_class args_out[:definition] = watch_folder_def Envoi::WatchFolderUtility::WatchFolder.new(args_out) end |
#process_watch_folder_defs ⇒ Object
Iterates through watch_folder_defs and populates @watch_folders with watch folders initialized from the watch folder definitions
Supports both array and hash formats, the hash format will use the key as the mame if the definition doesn’t already have a name set
133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 |
# File 'lib/envoi/mam/cantemo/agent/watch_folder_manager.rb', line 133 def process_watch_folder_defs logger.debug { 'Processing watch folder definitions.' } if watch_folder_defs.is_a?(Array) @watch_folders = watch_folder_defs.map { |watch_folder_def| process_watch_folder_def(watch_folder_def) } elsif watch_folder_defs.is_a?(Hash) @watch_folders = watch_folder_defs.map do |name, watch_folder_def| watch_folder_def['name'] ||= name process_watch_folder_def(watch_folder_def) end else raise "Unhandled format: #{watch_folder_defs.class.name}" end @watch_folders.keep_if { |wf| wf } logger.debug { 'Processing of watch folder definitions completed.' } end |
#resume ⇒ Object
223 224 225 226 227 228 |
# File 'lib/envoi/mam/cantemo/agent/watch_folder_manager.rb', line 223 def resume if @should_run logger.info { 'Resuming...' } watch_folders.each { |wf| wf.resume if wf.respond_to?(:resume) } end end |
#run ⇒ Object
The main execution method
182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 |
# File 'lib/envoi/mam/cantemo/agent/watch_folder_manager.rb', line 182 def run @should_run = true # AWF.run_once(watch_folders) { |wf| pp wf } # AWF.run(watch_folders) { |wf| process_watch_folder(wf) } logger.info { 'Running...' } watch_folders.map { |wf| wf.respond_to?(:run) ? wf.run : logger.debug { "Skip run for #{wf}" } } logger.debug { 'Initial Run Complete.' } while(@should_run) do begin poll sleep 1 rescue Interrupt, SystemExit => e logger.debug { "Received Signal: #{e.class.name}" } break end end rescue => e logger.debug { "EXCEPTION #{e.} \n#{e.backtrace.join("\n")}\n" } logger.error { "An error occurred. #{e.}" } raise e ensure stop logger.info { 'Exiting...' } end |
#run_once ⇒ Object
177 178 179 |
# File 'lib/envoi/mam/cantemo/agent/watch_folder_manager.rb', line 177 def run_once # AWF.run_once(watch_folders) { |wf| process_watch_folder(wf) } end |
#stop ⇒ Object
208 209 210 211 212 213 214 |
# File 'lib/envoi/mam/cantemo/agent/watch_folder_manager.rb', line 208 def stop if @should_run @should_run = false logger.info { 'Stopping...' } watch_folders.each { |wf| wf.stop if wf.respond_to?(:stop) } end end |
#symbolize_keys(value, recursive = true) ⇒ Object
Converts hash keys to symbols
234 235 236 237 238 239 240 241 242 243 244 |
# File 'lib/envoi/mam/cantemo/agent/watch_folder_manager.rb', line 234 def symbolize_keys (value, recursive = true) case value when Hash Hash[value.map { |k,v| [ k.respond_to?(:to_sym) ? k.to_sym : k, recursive ? symbolize_keys(v, true) : v ] }] when Array value.map { |v| symbolize_keys(v, recursive) } else value end # symbolize_keys end |