Class: Aspera::Agent::Node
Overview
This singleton class is used by the CLI to provide a common interface to start a transfer. Before using it, the user must set the `node_api` member.
Constant Summary
Constants inherited from Base
Instance Method Summary collapse
-
#initialize(url:, username:, password:, root_id: nil, **base_options) ⇒ Node
constructor
A new instance of Node.
-
#node_api=(new_value) ⇒ Object
Use this to set the node_api endpoint before using the class.
-
#node_api_ ⇒ Object
used internally to ensure node api is set before using.
-
#start_transfer(transfer_spec, token_regenerator: nil) ⇒ Object
generic method.
-
#wait_for_transfers_completion ⇒ Object
generic method.
Methods inherited from Base
agent_list, factory_create, to_move_options, #wait_for_completion
Constructor Details
#initialize(url:, username:, password:, root_id: nil, **base_options) ⇒ Node
Returns a new instance of Node.
21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 |
# File 'lib/aspera/agent/node.rb', line 21
# Build a transfer agent that talks to a node through a Rest client.
#
# Authentication depends on `password`: when OAuth::Factory.bearer? recognizes it
# as a bearer token, bearer headers are built with `username` as the access key;
# otherwise `username`/`password` are used for basic authentication.
#
# @param url base URL given to the Rest client as `base_url`
# @param username basic auth user, or access key when `password` is a bearer token
# @param password basic auth password, or bearer token
# @param root_id root id added to transfer specs; must not be nil with a bearer token
# @param base_options remaining keyword options, forwarded unchanged to the parent constructor
def initialize(
  url:,
  username:,
  password:,
  root_id: nil,
  **base_options
)
  super(**base_options)
  # root id is required for access key
  @root_id = root_id
  rest_params = {base_url: url}
  if OAuth::Factory.bearer?(password)
    # the assertion fails when root_id is missing, so the message must say "required"
    Aspera.assert(!@root_id.nil?){'root_id is required when using a bearer token'}
    rest_params[:headers] = Api::Node.bearer_headers(password, access_key: username)
  else
    rest_params[:auth] = {
      type:     :basic,
      username: username,
      password: password
    }
  end
  @node_api = Rest.new(**rest_params)
  # TODO: currently only supports one transfer. This is bad shortcut. but ok for CLI.
  @transfer_id = nil
end
Instance Method Details
#node_api=(new_value) ⇒ Object
Use this to set the node_api endpoint before using the class.
54 55 56 57 58 59 |
# File 'lib/aspera/agent/node.rb', line 54
# Replace the Rest object used to reach the node.
# A warning is logged when a non-nil value replaces an already configured one.
#
# @param new_value the new node API object (nil clears it without warning)
def node_api=(new_value)
  Log.log.warn('overriding existing node api value') unless @node_api.nil? || new_value.nil?
  @node_api = new_value
end
#node_api_ ⇒ Object
used internally to ensure node api is set before using.
48 49 50 51 |
# File 'lib/aspera/agent/node.rb', line 48
# Internal accessor: asserts that the node API object has been set, then returns it.
#
# @return the object stored in @node_api
def node_api_
  api_is_set = !@node_api.nil?
  Aspera.assert(api_is_set){'Before using this object, set the node_api attribute to a Aspera::Rest object'}
  @node_api
end
#start_transfer(transfer_spec, token_regenerator: nil) ⇒ Object
generic method
62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 |
# File 'lib/aspera/agent/node.rb', line 62
# Start a transfer by creating it on the node (`ops/transfers`) and remember its id.
#
# The given transfer spec is modified in place:
# - when a root id was configured, it is stored in `source_root_id` (send) or
#   `destination_root_id` (receive); any other direction is reported through
#   Aspera.error_unexpected_value
# - `xfer_retry` defaults to 150 in the reserved tags, when those tags are present
# - `remote_host` is replaced by 127.0.0.1 when it is the node's own host and WSS is not enabled
#
# @param transfer_spec [Hash] transfer specification with string keys
# @param token_regenerator not used by this method
# @return the `id` field of the data returned by the node for the created transfer
def start_transfer(transfer_spec, token_regenerator: nil)
  # add root id if access key
  unless @root_id.nil?
    case transfer_spec['direction']
    when Transfer::Spec::DIRECTION_SEND then transfer_spec['source_root_id'] = @root_id
    when Transfer::Spec::DIRECTION_RECEIVE then transfer_spec['destination_root_id'] = @root_id
    else Aspera.error_unexpected_value(transfer_spec['direction'])
    end
  end
  # add mandatory retry parameter for node api
  tags = transfer_spec['tags']
  if tags.is_a?(Hash) && tags[Transfer::Spec::TAG_RESERVED].is_a?(Hash)
    tags[Transfer::Spec::TAG_RESERVED]['xfer_retry'] ||= 150
  end
  # Optimization in case of sending to the same node
  # TODO: probably remove this, as /etc/hosts shall be used for that
  if !transfer_spec['wss_enabled'] && transfer_spec['remote_host'].eql?(URI.parse(node_api_.base_url).host)
    transfer_spec['remote_host'] = '127.0.0.1'
  end
  resp = node_api_.create('ops/transfers', transfer_spec)[:data]
  @transfer_id = resp['id']
  Log.log.debug{"tr_id=#{@transfer_id}"}
  return @transfer_id
end
#wait_for_transfers_completion ⇒ Object
generic method
88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 |
# File 'lib/aspera/agent/node.rb', line 88
# Poll the node for the status of the transfer started by #start_transfer
# (`ops/transfers/<id>`, one read per second) and emulate management events
# through notify_progress until the transfer ends.
#
# A failed read is treated as a temporary condition: the loop keeps waiting with
# the pseudo status `waiting(api error)`. A missing `error_desc` is handled like
# an empty one.
#
# @return [Array] always an empty array
# @raise [Transfer::Error] when the transfer failed with a reason, or reports an unexpected status
def wait_for_transfers_completion
  # pseudo status used while the node cannot be read; it must also be listed in the `case` below
  api_error_status = 'waiting(api error)'
  # set to true once the session start event was sent
  session_started = false
  # total size of the transfer, once known
  bytes_expected = nil
  # lets emulate management events to display progress bar
  loop do
    transfer_data =
      begin
        # status is empty sometimes with status 200...
        node_api_.read("ops/transfers/#{@transfer_id}")[:data] || {'status' => 'unknown'}
      rescue StandardError => e
        Log.log.debug{"transfer status read failed: #{e.message}"}
        {'status' => api_error_status}
      end
    # error description may be absent: treat nil like an empty string
    error_desc = transfer_data['error_desc'].to_s
    case transfer_data['status']
    when 'waiting', 'partially_completed', 'unknown', api_error_status
      notify_progress(session_id: nil, type: :pre_start, info: transfer_data['status'])
    when 'running'
      unless session_started
        notify_progress(session_id: @transfer_id, type: :session_start)
        session_started = true
      end
      message = transfer_data['status']
      message = "#{message} (#{error_desc})" unless error_desc.empty?
      notify_progress(session_id: nil, type: :pre_start, info: message)
      if bytes_expected.nil? && transfer_data['precalc'].is_a?(Hash) && transfer_data['precalc']['status'].eql?('ready')
        bytes_expected = transfer_data['precalc']['bytes_expected']
        notify_progress(type: :session_size, session_id: @transfer_id, info: bytes_expected)
      end
      notify_progress(type: :transfer, session_id: @transfer_id, info: transfer_data['bytes_transferred'])
    when 'completed'
      notify_progress(type: :transfer, session_id: @transfer_id, info: bytes_expected) if bytes_expected
      notify_progress(type: :end, session_id: @transfer_id)
      break
    when 'failed'
      notify_progress(type: :end, session_id: @transfer_id)
      # Bug in HSTS ? transfer is marked failed, but there is no reason
      break if transfer_data['error_code'].eql?(0) && error_desc.empty?
      raise Transfer::Error, "status: #{transfer_data['status']}. code: #{transfer_data['error_code']}. description: #{error_desc}"
    else
      Log.log.warn{"transfer_data -> #{transfer_data}"}
      raise Transfer::Error, "status: #{transfer_data['status']}. code: #{transfer_data['error_code']}. description: #{error_desc}"
    end
    sleep(1.0)
  end
  # TODO: get status of sessions
  return []
end