Class: Envoi::Restore::Agent

Inherits:
Mam::Agent show all
Defined in:
lib/envoi/restore/agent.rb

Constant Summary collapse

DEFAULT_ASPERA_ARGS =
'-v -k3 --overwrite=diff -P 33001'
DEFAULT_DESTINATION_PATH =
'.'
DEFAULT_THREAD_LIMIT =
10

Constants inherited from Mam::Agent

Mam::Agent::VERSION

Instance Attribute Summary collapse

Attributes inherited from Mam::Agent

#api_client, #config, #initial_args, #logger

Instance Method Summary collapse

Methods inherited from Mam::Agent

#dry_run?, #initialize, #initialize_api_client, #initialize_logger, load_config_and_init, load_config_from_file, load_config_from_service, load_from_config_file, load_from_config_service, #notify, #run_operation, #shell_execute

Constructor Details

This class inherits a constructor from Envoi::Mam::Agent

Instance Attribute Details

#default_aspera_ascp_argsObject

Returns the value of attribute default_aspera_ascp_args.



64
65
66
# File 'lib/envoi/restore/agent.rb', line 64

def default_aspera_ascp_args
  @default_aspera_ascp_args
end

#default_aspera_ascp_pathObject

Returns the value of attribute default_aspera_ascp_path.



64
65
66
# File 'lib/envoi/restore/agent.rb', line 64

def default_aspera_ascp_path
  @default_aspera_ascp_path
end

Instance Method Details

#after_initializeObject



67
68
69
70
71
72
73
74
75
76
77
78
# File 'lib/envoi/restore/agent.rb', line 67

def after_initialize
  args = initial_args
  @running = false
  @should_stop = false
  @default_aspera_ascp_path   = args[:default_aspera_ascp_path]
  @default_aspera_args = args[:default_ascp_args] || DEFAULT_ASPERA_ARGS

  @thread_limit = args.fetch(:thread_limit, DEFAULT_THREAD_LIMIT)

  initialize_queue_handler
  initialize_event_handler
end

#initialize_event_handlerObject



95
96
97
# File 'lib/envoi/restore/agent.rb', line 95

def initialize_event_handler
  @event_handler = GlacierRestoreEventHandler.new(config: sqs_config, logger: logger)
end

#initialize_queue_handlerObject



88
89
90
91
92
93
# File 'lib/envoi/restore/agent.rb', line 88

def initialize_queue_handler
  @queue_handler = SQSQueueWorker.new(config: sqs_config)
  @queue_handler.poller.before_request do |stats|
    throw :stop_polling if @should_stop
  end
end

#runObject



99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
# File 'lib/envoi/restore/agent.rb', line 99

def run
  @running = true
  logger.info { "Running..." }
  @threads = []
  @queue_handler.poll do |messages, stats|
    messages = [ messages ] unless messages.is_a?(Array)
    messages.each do |msg|
      break if @should_stop
      break if @thread_limit && @threads.length >= @thread_limit
      logger.info { "Processing Message #{msg.message_id}... "}
      logger.debug { "Message: #{msg}" }

      message_handler = SQSMessageHandler.new(logger: logger,
                                              queue_handler: @queue_handler.dup,
                                              message: msg.dup,
                                              event_handler: @event_handler.dup)
      t = Thread.new(message_handler) { |mh| mh.process }
      @threads << t
    end
    @threads.keep_if(&:alive?)
    throw :stop_polling if @should_stop
    throw :skip_delete
  end

  @running = false
end

#sqs_configObject



84
85
86
# File 'lib/envoi/restore/agent.rb', line 84

def sqs_config
  @sqs_config ||= system_config['sqs'] || { skip_delete: true}
end

#stopObject



126
127
128
# File 'lib/envoi/restore/agent.rb', line 126

def stop
  @should_stop = true
end

#system_configObject



80
81
82
# File 'lib/envoi/restore/agent.rb', line 80

def system_config
  @system_config ||= config['restore'] || { }
end