Class: Aspera::Agent::Trsdk
Constant Summary collapse
- LOCAL_SOCKET_ADDR =
'127.0.0.1'
- PORT_SEP =
':'
- AUTO_LOCAL_TCP_PORT =
Port zero means: let the daemon select a random available high port.
"#{PORT_SEP}0"
Constants inherited from Base
Class Method Summary collapse
-
.daemon_port_from_log(log_file) ⇒ Object
The port number is only available in the daemon log file.
Instance Method Summary collapse
-
#initialize(url: AUTO_LOCAL_TCP_PORT, external: false, keep: false, **base_options) ⇒ Trsdk
constructor
A new instance of Trsdk.
- #shutdown ⇒ Object
- #start_transfer(transfer_spec, token_regenerator: nil) ⇒ Object
- #stop_daemon ⇒ Object
- #wait_for_transfers_completion ⇒ Object
Methods inherited from Base
agent_list, factory_create, to_move_options, #wait_for_completion
Constructor Details
#initialize(url: AUTO_LOCAL_TCP_PORT, external: false, keep: false, **base_options) ⇒ Trsdk
Returns a new instance of Trsdk.
44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 |
# File 'lib/aspera/agent/trsdk.rb', line 44

# Connect to the transfer daemon over gRPC; when no daemon answers (and `external` is not set),
# start a local one and connect to it.
#
# @param url [String] daemon endpoint; a value of the form `:<port>` is prefixed with LOCAL_SOCKET_ADDR,
#   and AUTO_LOCAL_TCP_PORT (port zero) means the actual port is read back from the daemon log
# @param external [Boolean] when true, a connection failure is re-raised instead of starting a daemon
# @param keep [Boolean] when true, a daemon started here is detached and #shutdown leaves it running
# @param base_options [Hash] remaining options, forwarded to the parent class constructor
# @raise [RuntimeError] if `keep` or `external` is combined with port zero,
#   or if the spawned daemon exits within 2 seconds
def initialize(
  url:      AUTO_LOCAL_TCP_PORT,
  external: false,
  keep:     false,
  **base_options
)
  super(**base_options)
  # store the options: they are read below and by #shutdown
  @url = url
  @external = external
  @keep = keep
  is_local_auto_port = @url.eql?(AUTO_LOCAL_TCP_PORT)
  raise 'Cannot use options `keep` or `external` with port zero' if is_local_auto_port && (@keep || @external)
  # load SDK stub class on demand, as it's an optional gem
  $LOAD_PATH.unshift(Ascp::Installation.instance.sdk_ruby_folder)
  require 'transfer_services_pb'
  # keep PID for optional shutdown
  @daemon_pid = nil
  daemon_endpoint = @url
  Log.log.debug{Log.dump(:daemon_endpoint, daemon_endpoint)}
  # retry loop: first pass tries to attach, second pass connects to the daemon started in `rescue`
  begin
    # no address: local bind
    daemon_endpoint = "#{LOCAL_SOCKET_ADDR}#{daemon_endpoint}" if daemon_endpoint.match?(/\A#{PORT_SEP}[0-9]+\z/o)
    # Create stub (without credentials)
    @transfer_client = Transfersdk::TransferService::Stub.new(daemon_endpoint, :this_channel_is_insecure)
    # Initiate actual connection
    get_info_response = @transfer_client.get_info(Transfersdk::InstanceInfoRequest.new)
    Log.log.debug{"Daemon info: #{get_info_response}"}
    Log.log.warn{'Attached to existing daemon'} unless @daemon_pid || @external || @keep
    at_exit{shutdown}
  rescue GRPC::Unavailable => e
    # if transferd is external: do not start it; any error other than a failed connection is re-raised too
    raise if @external || !e.message.include?('failed to connect')
    # we already tried to start a daemon, but it failed
    Aspera.assert(@daemon_pid.nil?){"Daemon started with PID #{@daemon_pid}, but connection failed to #{daemon_endpoint}"}
    # NOTE(review): @external is always false here (the guard above re-raises when it is set),
    # so this warning can never be emitted - confirm the intended condition.
    Log.log.warn('no daemon present, starting daemon...') if @external
    # location of daemon binary
    sdk_folder = File.realpath(File.join(Ascp::Installation.instance.sdk_ruby_folder, '..'))
    # transferd only supports local ip and port
    daemon_uri = URI.parse("ipv4://#{daemon_endpoint}")
    Aspera.assert(daemon_uri.scheme.eql?('ipv4')){"Invalid scheme daemon URI #{daemon_endpoint}"}
    # create a config file for daemon
    config = {
      address:      daemon_uri.host,
      port:         daemon_uri.port,
      fasp_runtime: {
        use_embedded: false,
        user_defined: {
          bin: sdk_folder,
          etc: sdk_folder
        }
      }
    }
    # config file and logs are created in same folder
    transferd_base_tmp = TempFileManager.instance.new_file_path_global('transferd')
    Log.log.debug{"transferd base tmp #{transferd_base_tmp}"}
    conf_file = "#{transferd_base_tmp}.conf"
    log_stdout = "#{transferd_base_tmp}.out"
    log_stderr = "#{transferd_base_tmp}.err"
    File.write(conf_file, config.to_json)
    @daemon_pid = Process.spawn(Ascp::Installation.instance.path(:transferd), '--config', conf_file, out: log_stdout, err: log_stderr)
    begin
      # wait for process to initialize, max 2 seconds
      Timeout.timeout(2.0) do
        # this returns if process dies (within 2 seconds)
        _, status = Process.wait2(@daemon_pid)
        raise "Transfer daemon exited with status #{status.exitstatus}. Check files: #{log_stdout} and #{log_stderr}"
      end
    rescue Timeout::Error
      # still running after the grace period: the daemon is considered started
      nil
    end
    Log.log.debug{"Daemon started with pid #{@daemon_pid}"}
    Process.detach(@daemon_pid) if @keep
    # registered here as well, so the daemon is stopped even if the next connection attempt fails
    at_exit{shutdown}
    # update port for next connection attempt (if auto high port was requested)
    daemon_endpoint = "#{LOCAL_SOCKET_ADDR}#{PORT_SEP}#{self.class.daemon_port_from_log(log_stdout)}" if is_local_auto_port
    # local daemon started, try again
    retry
  end
end
Class Method Details
.daemon_port_from_log(log_file) ⇒ Object
The port number is only available in the daemon log file.
22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 |
# File 'lib/aspera/agent/trsdk.rb', line 22

# Extract the TCP port the daemon listens on from its log file.
# When several lines announce a listening port, the last one wins.
#
# @param log_file [String] path to the daemon log (its standard output)
# @return [Integer] port number found in the log
# @raise [RuntimeError] if no line of the log announces a listening port
def daemon_port_from_log(log_file)
  port = nil
  # Binary mode: a log line holding bytes that are not valid UTF-8 must not make the regex match raise.
  File.foreach(log_file, mode: 'rb') do |line|
    # Well, it's tricky to depend on log
    found = line[/Info: API Server: Listening on [^:]+:(\d+) /, 1]
    # no "break": need to read last matching log line
    port = found.to_i if found
  end
  raise 'Port not found in daemon logs' if port.nil?
  Log.log.debug{"Got port #{port} from log"}
  port
end
Instance Method Details
#shutdown ⇒ Object
172 173 174 |
# File 'lib/aspera/agent/trsdk.rb', line 172

# Stop the daemon through #stop_daemon, except when the `keep` option was set.
def shutdown
  return if @keep

  stop_daemon
end
#start_transfer(transfer_spec, token_regenerator: nil) ⇒ Object
122 123 124 125 126 127 128 129 130 131 132 133 |
# File 'lib/aspera/agent/trsdk.rb', line 122

# Send a start request for one transfer to the daemon and remember the returned transfer id
# in @transfer_id.
#
# @param transfer_spec [#to_json] transfer definition, sent to the daemon as JSON
# @param token_regenerator [Object, nil] accepted but not read by this method
def start_transfer(transfer_spec, token_regenerator: nil)
  # fields of the request: regular file transfer, default configuration, JSON transfer definition
  request_fields = {
    transferType: Transfersdk::TransferType::FILE_REGULAR,
    config:       Transfersdk::TransferConfig.new,
    transferSpec: transfer_spec.to_json
  }
  # submit the request to the daemon
  response = @transfer_client.start_transfer(Transfersdk::TransferRequest.new(**request_fields))
  Log.log.debug{"start transfer response #{response}"}
  @transfer_id = response.transferId
  Log.log.debug{"transfer started with id #{@transfer_id}"}
end
#stop_daemon ⇒ Object
176 177 178 179 180 181 182 183 184 |
# File 'lib/aspera/agent/trsdk.rb', line 176

# Stop the daemon whose PID is stored in @daemon_pid: send it INT and wait for it to exit.
# Does nothing when no PID is stored. A daemon that is already gone is treated as stopped
# instead of raising.
#
# @return [nil]
def stop_daemon
  return if @daemon_pid.nil?

  Log.log.debug("Stopping daemon #{@daemon_pid}")
  begin
    Process.kill('INT', @daemon_pid)
    _, status = Process.wait2(@daemon_pid)
    Log.log.debug("daemon stopped #{status}")
  rescue Errno::ESRCH, Errno::ECHILD
    # ESRCH: no such process to signal; ECHILD: no such child to wait for (exited or already reaped)
    Log.log.debug("daemon #{@daemon_pid} already stopped")
  end
  @daemon_pid = nil
end
#wait_for_transfers_completion ⇒ Object
135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 |
# File 'lib/aspera/agent/trsdk.rb', line 135

# Monitor the transfer identified by @transfer_id and report its progress through notify_progress
# until the daemon reports it as completed, failed or canceled.
#
# @return [Array] always an empty array (see TODO at the end)
# @raise [Transfer::Error] when the daemon reports status FAILED or CANCELED
def wait_for_transfers_completion
  # set to true once the session start was notified
  session_started = false
  # total size of the transfer, once the daemon reports a non-zero value
  bytes_expected = nil
  # monitor transfer status
  @transfer_client.monitor_transfers(Transfersdk::RegistrationRequest.new(transferId: [@transfer_id])) do |response|
    Log.log.debug{Log.dump(:response, response.to_h)}
    # Log.log.debug{"#{response.sessionInfo.preTransferBytes} #{response.transferInfo.bytesTransferred}"}
    case response.status
    when :RUNNING
      unless session_started
        notify_progress(session_id: @transfer_id, type: :session_start)
        session_started = true
      end
      if bytes_expected.nil? && !response.sessionInfo.preTransferBytes.eql?(0)
        bytes_expected = response.sessionInfo.preTransferBytes
        notify_progress(type: :session_size, session_id: @transfer_id, info: bytes_expected)
      end
      notify_progress(type: :transfer, session_id: @transfer_id, info: response.transferInfo.bytesTransferred)
    when :COMPLETED
      # report 100% when the total size is known, then close the session
      notify_progress(type: :transfer, session_id: @transfer_id, info: bytes_expected) if bytes_expected
      notify_progress(type: :end, session_id: @transfer_id)
      break
    when :FAILED, :CANCELED
      notify_progress(type: :end, session_id: @transfer_id)
      # the message is read as JSON carrying a 'Description' key;
      # fall back to the raw message so that a parsing problem does not hide the transfer failure
      description =
        begin
          JSON.parse(response.message)['Description']
        rescue JSON::ParserError, TypeError
          nil
        end
      raise Transfer::Error, description || response.message
    when :QUEUED, :UNKNOWN_STATUS, :PAUSED, :ORPHANED
      notify_progress(session_id: nil, type: :pre_start, info: response.status.to_s.downcase)
    else
      Log.log.error{"unknown status #{response.status}"}
    end
  end
  # TODO: return status
  []
end