Class: Envoi::Restore::Agent
Constant Summary
collapse
- DEFAULT_ASPERA_ARGS =
'-v -k3 --overwrite=diff -P 33001'
- DEFAULT_DESTINATION_PATH =
'.'
- DEFAULT_THREAD_LIMIT =
10
Constants inherited
from Mam::Agent
Mam::Agent::VERSION
Instance Attribute Summary collapse
Attributes inherited from Mam::Agent
#api_client, #config, #initial_args, #logger
Instance Method Summary
collapse
Methods inherited from Mam::Agent
#dry_run?, #initialize, #initialize_api_client, #initialize_logger, load_config_and_init, load_config_from_file, load_config_from_service, load_from_config_file, load_from_config_service, #notify, #run_operation, #shell_execute
Instance Attribute Details
#default_aspera_ascp_args ⇒ Object
Returns the value of attribute default_aspera_ascp_args.
64
65
66
|
# File 'lib/envoi/restore/agent.rb', line 64
def default_aspera_ascp_args
@default_aspera_ascp_args
end
|
#default_aspera_ascp_path ⇒ Object
Returns the value of attribute default_aspera_ascp_path.
64
65
66
|
# File 'lib/envoi/restore/agent.rb', line 64
def default_aspera_ascp_path
@default_aspera_ascp_path
end
|
Instance Method Details
#after_initialize ⇒ Object
67
68
69
70
71
72
73
74
75
76
77
78
|
# File 'lib/envoi/restore/agent.rb', line 67
def after_initialize
args = initial_args
@running = false
@should_stop = false
@default_aspera_ascp_path = args[:default_aspera_ascp_path]
@default_aspera_args = args[:default_ascp_args] || DEFAULT_ASPERA_ARGS
@thread_limit = args.fetch(:thread_limit, DEFAULT_THREAD_LIMIT)
initialize_queue_handler
initialize_event_handler
end
|
#initialize_event_handler ⇒ Object
95
96
97
|
# File 'lib/envoi/restore/agent.rb', line 95
def initialize_event_handler
@event_handler = GlacierRestoreEventHandler.new(config: sqs_config, logger: logger)
end
|
#initialize_queue_handler ⇒ Object
88
89
90
91
92
93
|
# File 'lib/envoi/restore/agent.rb', line 88
def initialize_queue_handler
@queue_handler = SQSQueueWorker.new(config: sqs_config)
@queue_handler.poller.before_request do |stats|
throw :stop_polling if @should_stop
end
end
|
#run ⇒ Object
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
|
# File 'lib/envoi/restore/agent.rb', line 99
def run
@running = true
logger.info { "Running..." }
@threads = []
@queue_handler.poll do |messages, stats|
messages = [ messages ] unless messages.is_a?(Array)
messages.each do |msg|
break if @should_stop
break if @thread_limit && @threads.length >= @thread_limit
logger.info { "Processing Message #{msg.message_id}... "}
logger.debug { "Message: #{msg}" }
message_handler = SQSMessageHandler.new(logger: logger,
queue_handler: @queue_handler.dup,
message: msg.dup,
event_handler: @event_handler.dup)
t = Thread.new(message_handler) { |mh| mh.process }
@threads << t
end
@threads.keep_if(&:alive?)
throw :stop_polling if @should_stop
throw :skip_delete
end
@running = false
end
|
#sqs_config ⇒ Object
84
85
86
|
# File 'lib/envoi/restore/agent.rb', line 84
def sqs_config
@sqs_config ||= system_config['sqs'] || { skip_delete: true}
end
|
#stop ⇒ Object
126
127
128
|
# File 'lib/envoi/restore/agent.rb', line 126
def stop
@should_stop = true
end
|
#system_config ⇒ Object
80
81
82
|
# File 'lib/envoi/restore/agent.rb', line 80
def system_config
@system_config ||= config['restore'] || { }
end
|