Class: Aspera::Agent::Node
Overview
this singleton class is used by the CLI to provide a common interface to start a transfer before using it, the use must set the node_api member.
Constant Summary collapse
- DEFAULT_OPTIONS =
{ url: :required, username: :required, password: :required, root_id: nil }.freeze
Constants inherited from Base
Instance Method Summary collapse
-
#initialize(opts) ⇒ Node
constructor
option include: root_id if the node is an access key attr_writer :options.
-
#node_api=(new_value) ⇒ Object
use this to set the node_api end point before using the class.
-
#node_api_ ⇒ Object
used internally to ensure node api is set before using.
-
#start_transfer(transfer_spec, token_regenerator: nil) ⇒ Object
generic method.
-
#wait_for_transfers_completion ⇒ Object
generic method.
Methods inherited from Base
agent_list, options, #wait_for_completion
Constructor Details
#initialize(opts) ⇒ Node
option include: root_id if the node is an access key attr_writer :options
25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 |
# File 'lib/aspera/agent/node.rb', line 25 def initialize(opts) Aspera.assert_type(opts, Hash){'node agent options'} super(opts) = Base.(default: DEFAULT_OPTIONS, options: opts) # root id is required for access key @root_id = [:root_id] rest_params = { base_url: [:url]} if OAuth::Factory.bearer?([:password]) Aspera.assert(!@root_id.nil?){'root_id not allowed for access key'} rest_params[:headers] = Api::Node.bearer_headers([:password], access_key: [:username]) else rest_params[:auth] = { type: :basic, username: [:username], password: [:password] } end @node_api = Rest.new(**rest_params) # TODO: currently only supports one transfer. This is bad shortcut. but ok for CLI. @transfer_id = nil # Log.log.debug{Log.dump(:agent_options, @options)} end |
Instance Method Details
#node_api=(new_value) ⇒ Object
use this to set the node_api end point before using the class.
55 56 57 58 59 60 |
# File 'lib/aspera/agent/node.rb', line 55 def node_api=(new_value) if !@node_api.nil? && !new_value.nil? Log.log.warn('overriding existing node api value') end @node_api = new_value end |
#node_api_ ⇒ Object
used internally to ensure node api is set before using.
49 50 51 52 |
# File 'lib/aspera/agent/node.rb', line 49 def node_api_ Aspera.assert(!@node_api.nil?){'Before using this object, set the node_api attribute to a Aspera::Rest object'} return @node_api end |
#start_transfer(transfer_spec, token_regenerator: nil) ⇒ Object
generic method
63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 |
# File 'lib/aspera/agent/node.rb', line 63 def start_transfer(transfer_spec, token_regenerator: nil) # add root id if access key if !@root_id.nil? case transfer_spec['direction'] when Transfer::Spec::DIRECTION_SEND then transfer_spec['source_root_id'] = @root_id when Transfer::Spec::DIRECTION_RECEIVE then transfer_spec['destination_root_id'] = @root_id else Aspera.error_unexpected_value(transfer_spec['direction']) end end # add mandatory retry parameter for node api = transfer_spec['tags'] if .is_a?(Hash) && [Transfer::Spec::TAG_RESERVED].is_a?(Hash) [Transfer::Spec::TAG_RESERVED]['xfer_retry'] ||= 150 end # Optimization in case of sending to the same node # TODO: probably remove this, as /etc/hosts shall be used for that if !transfer_spec['wss_enabled'] && transfer_spec['remote_host'].eql?(URI.parse(node_api_.base_url).host) transfer_spec['remote_host'] = '127.0.0.1' end resp = node_api_.create('ops/transfers', transfer_spec)[:data] @transfer_id = resp['id'] Log.log.debug{"tr_id=#{@transfer_id}"} return @transfer_id end |
#wait_for_transfers_completion ⇒ Object
generic method
89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 |
# File 'lib/aspera/agent/node.rb', line 89 def wait_for_transfers_completion # set to true when we know the total size of the transfer session_started = false bytes_expected = nil # lets emulate management events to display progress bar loop do # status is empty sometimes with status 200... transfer_data = node_api_.read("ops/transfers/#{@transfer_id}")[:data] || {'status' => 'unknown'} rescue {'status' => 'waiting(api error)'} case transfer_data['status'] when 'waiting', 'partially_completed', 'unknown', 'waiting(read error)' notify_progress(session_id: nil, type: :pre_start, info: transfer_data['status']) when 'running' if !session_started notify_progress(session_id: @transfer_id, type: :session_start) session_started = true end = transfer_data['status'] = "#{message} (#{transfer_data['error_desc']})" if !transfer_data['error_desc']&.empty? notify_progress(session_id: nil, type: :pre_start, info: ) if bytes_expected.nil? && transfer_data['precalc'].is_a?(Hash) && transfer_data['precalc']['status'].eql?('ready') bytes_expected = transfer_data['precalc']['bytes_expected'] notify_progress(type: :session_size, session_id: @transfer_id, info: bytes_expected) end notify_progress(type: :transfer, session_id: @transfer_id, info: transfer_data['bytes_transferred']) when 'completed' notify_progress(type: :transfer, session_id: @transfer_id, info: bytes_expected) if bytes_expected notify_progress(type: :end, session_id: @transfer_id) break when 'failed' notify_progress(type: :end, session_id: @transfer_id) # Bug in HSTS ? transfer is marked failed, but there is no reason break if transfer_data['error_code'].eql?(0) && transfer_data['error_desc'].empty? raise Transfer::Error, "status: #{transfer_data['status']}. code: #{transfer_data['error_code']}. description: #{transfer_data['error_desc']}" else Log.log.warn{"transfer_data -> #{transfer_data}"} raise Transfer::Error, "status: #{transfer_data['status']}. code: #{transfer_data['error_code']}. description: #{transfer_data['error_desc']}" end sleep(1.0) end # TODO: get status of sessions return [] end |