Class: Aspera::Cli::TransferAgent

Inherits:
Object
  • Object
show all
Defined in:
lib/aspera/cli/transfer_agent.rb

Overview

The Transfer agent is a common interface to start a transfer using one of the supported transfer agents provides CLI options to select one of the transfer agents (FASP/ascp client)

Constant Summary collapse

TRANSFER_AGENTS =
i[direct node connect httpgw trsdk].freeze

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(opt_mgr, config) ⇒ TransferAgent

Returns a new instance of TransferAgent.

Parameters:

  • env

    external objects: option manager, config file manager



46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
# File 'lib/aspera/cli/transfer_agent.rb', line 46

def initialize(opt_mgr, config)
  @opt_mgr = opt_mgr
  @config = config
  # command line can override transfer spec
  @transfer_spec_cmdline = {'create_dir' => true}
  @transfer_info = {}
  # the currently selected transfer agent
  @agent = nil
  @progress_listener = Listener::ProgressMulti.new
  # source/destination pair, like "paths" of transfer spec
  @transfer_paths = nil
  @opt_mgr.declare(:ts, 'Override transfer spec values', types: Hash, handler: {o: self, m: :option_transfer_spec})
  @opt_mgr.declare(:to_folder, 'Destination folder for transferred files')
  @opt_mgr.declare(:sources, "How list of transferred files is provided (#{FILE_LIST_OPTIONS.join(',')})")
  @opt_mgr.declare(:src_type, 'Type of file list', values: i[list pair], default: :list)
  @opt_mgr.declare(:transfer, 'Type of transfer agent', values: TRANSFER_AGENTS, default: :direct)
  @opt_mgr.declare(:transfer_info, 'Parameters for transfer agent', types: Hash, handler: {o: self, m: :option_transfer_info})
  @opt_mgr.declare(:progress, 'Type of progress bar', values: i[none native multi], default: :native)
  @opt_mgr.parse_options!
end

Class Method Details

.session_status(statuses) ⇒ Object

else return the first error exception object

Returns:

  • :success if all sessions statuses returned by “start” are success



38
39
40
41
42
# File 'lib/aspera/cli/transfer_agent.rb', line 38

def session_status(statuses)
  error_statuses = statuses.reject{|i|i.eql?(:success)}
  return :success if error_statuses.empty?
  return error_statuses.first
end

Instance Method Details

#agent_instance=(instance) ⇒ Object



94
95
96
97
98
99
100
101
102
# File 'lib/aspera/cli/transfer_agent.rb', line 94

def agent_instance=(instance)
  @agent = instance
  @agent.add_listener(Listener::Logger.new)
  # use local progress bar if asked so, or if native and non local ascp (because only local ascp has native progress bar)
  if @opt_mgr.get_option(:progress, mandatory: true).eql?(:multi) ||
      (@opt_mgr.get_option(:progress, mandatory: true).eql?(:native) && !instance.class.to_s.eql?('Aspera::Fasp::AgentDirect'))
    @agent.add_listener(@progress_listener)
  end
end

#destination_folder(direction) ⇒ Object

return destination folder for transfers sets default if needed param: ‘send’ or ‘receive’



134
135
136
137
138
139
140
141
142
143
144
145
146
147
# File 'lib/aspera/cli/transfer_agent.rb', line 134

def destination_folder(direction)
  dest_folder = @opt_mgr.get_option(:to_folder)
  # do not expand path, if user wants to expand path: user @path:
  return dest_folder unless dest_folder.nil?
  dest_folder = @transfer_spec_cmdline['destination_root']
  return dest_folder unless dest_folder.nil?
  # default: / on remote, . on local
  case direction.to_s
  when Fasp::TransferSpec::DIRECTION_SEND then dest_folder = '/'
  when Fasp::TransferSpec::DIRECTION_RECEIVE then dest_folder = '.'
  else raise "wrong direction: #{direction}"
  end
  return dest_folder
end

#option_transfer_infoObject



86
# File 'lib/aspera/cli/transfer_agent.rb', line 86

def option_transfer_info; @transfer_info; end

#option_transfer_info=(value) ⇒ Object

multiple option are merged



89
90
91
92
# File 'lib/aspera/cli/transfer_agent.rb', line 89

def option_transfer_info=(value)
  raise 'option transfer_info shall be a Hash' unless value.is_a?(Hash)
  @transfer_info.deep_merge!(value)
end

#option_transfer_specObject



67
# File 'lib/aspera/cli/transfer_agent.rb', line 67

def option_transfer_spec; @transfer_spec_cmdline; end

#option_transfer_spec=(value) ⇒ Object

multiple option are merged



70
71
72
73
# File 'lib/aspera/cli/transfer_agent.rb', line 70

def option_transfer_spec=(value)
  raise 'option ts shall be a Hash' unless value.is_a?(Hash)
  @transfer_spec_cmdline.deep_merge!(value)
end

#option_transfer_spec_deep_merge(ts) ⇒ Object

add other transfer spec parameters



76
# File 'lib/aspera/cli/transfer_agent.rb', line 76

def option_transfer_spec_deep_merge(ts); @transfer_spec_cmdline.deep_merge!(ts); end

#send_email_transfer_notification(transfer_spec, statuses) ⇒ Object



246
247
248
249
250
251
252
253
254
255
256
# File 'lib/aspera/cli/transfer_agent.rb', line 246

def send_email_transfer_notification(transfer_spec, statuses)
  return if @opt_mgr.get_option(:notif_to).nil?
  global_status = self.class.session_status(statuses)
  email_vars = {
    global_transfer_status: global_status,
    subject:                "#{PROGRAM_NAME} transfer: #{global_status}",
    body:                   "Transfer is: #{global_status}",
    ts:                     transfer_spec
  }
  @config.send_email_template(email_template_default: DEFAULT_TRANSFER_NOTIF_TMPL, values: email_vars)
end

#set_agent_by_optionsObject

analyze options and create new agent if not already created or set

Raises:



105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
# File 'lib/aspera/cli/transfer_agent.rb', line 105

def set_agent_by_options
  return nil unless @agent.nil?
  agent_type = @opt_mgr.get_option(:transfer, mandatory: true)
  # agent plugin is loaded on demand to avoid loading unnecessary dependencies
  require "aspera/fasp/agent_#{agent_type}"
  agent_options = @opt_mgr.get_option(:transfer_info)
  raise CliBadArgument, "the transfer agent configuration shall be Hash, not #{agent_options.class} (#{agent_options}), "\
    'e.g. use @json:<json>' unless agent_options.is_a?(Hash)
  # special case: use default node
  if agent_type.eql?(:node) && agent_options.empty?
    param_set_name = @config.get_plugin_default_config_name(:node)
    raise CliBadArgument, "No default node configured. Please specify #{Manager.option_name_to_line(:transfer_info)}" if param_set_name.nil?
    agent_options = @config.preset_by_name(param_set_name)
  end
  # special case: native progress bar
  if agent_type.eql?(:direct) && @opt_mgr.get_option(:progress, mandatory: true).eql?(:native)
    agent_options[:quiet] = false
  end
  # normalize after getting from user or default node
  agent_options = agent_options.symbolize_keys
  # get agent instance
  new_agent = Kernel.const_get("Aspera::Fasp::Agent#{agent_type.capitalize}").new(agent_options)
  self.agent_instance = new_agent
  return nil
end

#shutdownObject

shut down if agent requires it



259
260
261
# File 'lib/aspera/cli/transfer_agent.rb', line 259

def shutdown
  @agent.shutdown if @agent.respond_to?(:shutdown)
end

#source_listArray

Returns list of source files.

Returns:

  • (Array)

    list of source files



150
151
152
153
154
# File 'lib/aspera/cli/transfer_agent.rb', line 150

def source_list
  return ts_source_paths.map do |i|
    i['source']
  end
end

#start(transfer_spec, rest_token: nil) ⇒ Object

start a transfer and wait for completion, plugins shall use this method

Parameters:

  • transfer_spec (Hash)
  • rest_token (Rest) (defaults to: nil)

    if oauth token regeneration supported



208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
# File 'lib/aspera/cli/transfer_agent.rb', line 208

def start(transfer_spec, rest_token: nil)
  # check parameters
  raise 'transfer_spec must be hash' unless transfer_spec.is_a?(Hash)
  # process :src option
  case transfer_spec['direction']
  when Fasp::TransferSpec::DIRECTION_RECEIVE
    # init default if required in any case
    @transfer_spec_cmdline['destination_root'] ||= destination_folder(transfer_spec['direction'])
  when Fasp::TransferSpec::DIRECTION_SEND
    if transfer_spec.dig('tags', Fasp::TransferSpec::TAG_RESERVED, 'node', 'access_key')
      # gen4
      @transfer_spec_cmdline.delete('destination_root') if @transfer_spec_cmdline.key?('destination_root_id')
    elsif transfer_spec.key?('token')
      # gen3
      # in that case, destination is set in return by application (API/upload_setup)
      # but to_folder was used in initial API call
      @transfer_spec_cmdline.delete('destination_root')
    else
      # init default if required
      @transfer_spec_cmdline['destination_root'] ||= destination_folder(transfer_spec['direction'])
    end
  end
  # update command line paths, unless destination already has one
  @transfer_spec_cmdline['paths'] = transfer_spec['paths'] || ts_source_paths
  # updated transfer spec with command line
  updated_ts(transfer_spec)
  # create transfer agent
  set_agent_by_options
  Log.log.debug{"transfer agent is a #{@agent.class}"}
  @agent.start_transfer(transfer_spec, token_regenerator: rest_token)
  # list of: :success or "error message string"
  result = @agent.wait_for_transfers_completion
  @progress_listener.reset
  Fasp::AgentBase.validate_status_list(result)
  send_email_transfer_notification(transfer_spec, result)
  return result
end

#ts_source_pathsHash

This is how the list of files to be transferred is specified get paths suitable for transfer spec from command line computation is done only once, cache is kept in @transfer_paths

Returns:

  • (Hash)

    (mandatory), destination: (optional)



160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
# File 'lib/aspera/cli/transfer_agent.rb', line 160

def ts_source_paths
  # return cache if set
  return @transfer_paths unless @transfer_paths.nil?
  # start with lower priority : get paths from transfer spec on command line
  @transfer_paths = @transfer_spec_cmdline['paths'] if @transfer_spec_cmdline.key?('paths')
  # is there a source list option ?
  file_list = @opt_mgr.get_option(:sources)
  case file_list
  when nil, FILE_LIST_FROM_ARGS
    Log.log.debug('getting file list as parameters')
    # get remaining arguments
    file_list = @opt_mgr.get_next_argument('source file list', expected: :multiple)
    raise CliBadArgument, 'specify at least one file on command line or use '\
      "--sources=#{FILE_LIST_FROM_TRANSFER_SPEC} to use transfer spec" if !file_list.is_a?(Array) || file_list.empty?
  when FILE_LIST_FROM_TRANSFER_SPEC
    Log.log.debug('assume list provided in transfer spec')
    special_case_direct_with_list =
      @opt_mgr.get_option(:transfer, mandatory: true).eql?(:direct) &&
      Fasp::Parameters.ts_has_ascp_file_list(@transfer_spec_cmdline, @opt_mgr.get_option(:transfer_info))
    raise CliBadArgument, 'transfer spec on command line must have sources' if @transfer_paths.nil? && !special_case_direct_with_list
    # here we assume check of sources is made in transfer agent
    return @transfer_paths
  when Array
    Log.log.debug('getting file list as extended value')
    raise CliBadArgument, 'sources must be a Array of String' if !file_list.reject{|f|f.is_a?(String)}.empty?
  else
    raise CliBadArgument, "sources must be a Array, not #{file_list.class}"
  end
  # here, file_list is an Array or String
  if !@transfer_paths.nil?
    Log.log.warn('--sources overrides paths from --ts')
  end
  case @opt_mgr.get_option(:src_type, mandatory: true)
  when :list
    # when providing a list, just specify source
    @transfer_paths = file_list.map{|i|{'source' => i}}
  when :pair
    raise CliBadArgument, "When using pair, provide an even number of paths: #{file_list.length}" unless file_list.length.even?
    @transfer_paths = file_list.each_slice(2).to_a.map{|s, d|{'source' => s, 'destination' => d}}
  else raise 'Unsupported src_type'
  end
  Log.log.debug{"paths=#{@transfer_paths}"}
  return @transfer_paths
end

#updated_ts(transfer_spec = {}) ⇒ Hash

Returns transfer spec with updated values from command line, including removed values.

Returns:

  • (Hash)

    transfer spec with updated values from command line, including removed values



79
80
81
82
83
84
# File 'lib/aspera/cli/transfer_agent.rb', line 79

def updated_ts(transfer_spec={})
  transfer_spec.deep_merge!(@transfer_spec_cmdline)
  # recursively remove values that are nil (user wants to delete)
  transfer_spec.deep_do { |hash, key, value, _unused| hash.delete(key) if value.nil?}
  return transfer_spec
end