Class: Aspera::Agent::Trsdk
Constant Summary collapse
- LOCAL_SOCKET_ADDR = '127.0.0.1'
- PORT_SEP = ':'
- AUTO_LOCAL_TCP_PORT = "#{PORT_SEP}0"
  Port zero means: select a random available high port.
Constants inherited from Base
Class Method Summary collapse
- .daemon_port_from_log(log_file) ⇒ Object
  The port number is only available in the daemon's log file.
Instance Method Summary collapse
- #initialize(user_opts = {}) ⇒ Trsdk
  Constructor; options come from transfer_info.
- #shutdown ⇒ Object
- #start_transfer(transfer_spec, token_regenerator: nil) ⇒ Object
- #stop_daemon ⇒ Object
- #wait_for_transfers_completion ⇒ Object
Methods inherited from Base
agent_list, options, #wait_for_completion
Constructor Details
#initialize(user_opts = {}) ⇒ Trsdk
options come from transfer_info
48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 |
# File 'lib/aspera/agent/trsdk.rb', line 48
# Connect to a transfer daemon and, when none answers and the daemon is not
# declared external, spawn a local one and connect to it.
#
# NOTE(review): in the listing this was restored from, several identifiers were
# missing (`= Base.(...)`, `[:url]`, `e..include?`). They are restored here as
# `@options`, `Base.options` and `e.message` - confirm against the repository.
#
# @param user_opts [Hash] agent options (come from transfer_info); merged with DEFAULT_OPTIONS
# @raise [RuntimeError] if `keep` or `external` is combined with port zero,
#   or if the spawned daemon exits within the start-up window
# @raise [GRPC::Unavailable] if the daemon is external, or the failure is not a connection failure
def initialize(user_opts={})
  super(user_opts)
  @options = Base.options(default: DEFAULT_OPTIONS, options: user_opts)
  is_local_auto_port = @options[:url].eql?(AUTO_LOCAL_TCP_PORT)
  raise 'Cannot use options `keep` or `external` with port zero' if is_local_auto_port && (@options[:keep] || @options[:external])
  Log.log.debug{Log.dump(:agent_options, @options)}
  # load SDK stub class on demand, as it's an optional gem
  $LOAD_PATH.unshift(Ascp::Installation.instance.sdk_ruby_folder)
  require 'transfer_services_pb'
  # keep PID for optional shutdown
  @daemon_pid = nil
  daemon_endpoint = @options[:url]
  Log.log.debug{Log.dump(:daemon_endpoint, daemon_endpoint)}
  # retry loop: first pass tries to connect; on connection failure the rescue
  # clause starts a daemon and `retry` re-runs this body once more
  begin
    # no address: local bind
    daemon_endpoint = "#{LOCAL_SOCKET_ADDR}#{daemon_endpoint}" if daemon_endpoint.match?(/^#{PORT_SEP}[0-9]+$/o)
    # Create stub (without credentials)
    @transfer_client = Transfersdk::TransferService::Stub.new(daemon_endpoint, :this_channel_is_insecure)
    # Initiate actual connection
    get_info_response = @transfer_client.get_info(Transfersdk::InstanceInfoRequest.new)
    Log.log.debug{"Daemon info: #{get_info_response}"}
    Log.log.warn{'Attached to existing daemon'} unless @daemon_pid || @options[:external] || @options[:keep]
    at_exit{shutdown}
  rescue GRPC::Unavailable => e
    # if transferd is external: do not start it, or other error
    raise if @options[:external] || !e.message.include?('failed to connect')
    # we already tried to start a daemon, but it failed
    # (this assertion is also what bounds the retry to a single daemon start)
    Aspera.assert(@daemon_pid.nil?){"Daemon started with PID #{@daemon_pid}, but connection failed to #{daemon_endpoint}"}
    # NOTE(review): `external` is always falsy here (the `raise` above fired otherwise),
    # so this warning is never emitted - confirm the intended condition.
    Log.log.warn('no daemon present, starting daemon...') if @options[:external]
    # location of daemon binary
    sdk_folder = File.realpath(File.join(Ascp::Installation.instance.sdk_ruby_folder, '..'))
    # transferd only supports local ip and port
    daemon_uri = URI.parse("ipv4://#{daemon_endpoint}")
    Aspera.assert(daemon_uri.scheme.eql?('ipv4')){"Invalid scheme daemon URI #{daemon_endpoint}"}
    # create a config file for daemon
    config = {
      address:      daemon_uri.host,
      port:         daemon_uri.port,
      fasp_runtime: {
        use_embedded: false,
        user_defined: {
          bin: sdk_folder,
          etc: sdk_folder
        }
      }
    }
    # config file and logs are created in same folder
    transferd_base_tmp = TempFileManager.instance.new_file_path_global('transferd')
    Log.log.debug{"transferd base tmp #{transferd_base_tmp}"}
    conf_file = "#{transferd_base_tmp}.conf"
    log_stdout = "#{transferd_base_tmp}.out"
    log_stderr = "#{transferd_base_tmp}.err"
    File.write(conf_file, config.to_json)
    @daemon_pid = Process.spawn(Ascp::Installation.instance.path(:transferd), '--config', conf_file, out: log_stdout, err: log_stderr)
    begin
      # wait for process to initialize, max 2 seconds
      Timeout.timeout(2.0) do
        # this returns if process dies (within 2 seconds)
        _, status = Process.wait2(@daemon_pid)
        raise "Transfer daemon exited with status #{status.exitstatus}. Check files: #{log_stdout} and #{log_stderr}"
      end
    rescue Timeout::Error
      # timeout means the daemon is still running: that is the success case
      nil
    end
    Log.log.debug{"Daemon started with pid #{@daemon_pid}"}
    Process.detach(@daemon_pid) if @options[:keep]
    # registered before the retry so the daemon is also stopped if the second connection attempt raises
    at_exit{shutdown}
    # update port for next connection attempt (if auto high port was requested)
    daemon_endpoint = "#{LOCAL_SOCKET_ADDR}#{PORT_SEP}#{self.class.daemon_port_from_log(log_stdout)}" if is_local_auto_port
    # local daemon started, try again
    retry
  end
end
Class Method Details
.daemon_port_from_log(log_file) ⇒ Object
Well, the port number is only in log file
29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 |
# File 'lib/aspera/agent/trsdk.rb', line 29
# Find the TCP port announced by the daemon in its log file.
# The port number is only available in the log, so the whole file is scanned
# and the last "Listening on" announcement is used.
#
# @param log_file [String] path to the daemon log file
# @return [Integer] port of the last matching log line
# @raise [RuntimeError] if no line announces a listening port
def daemon_port_from_log(log_file)
  port = nil
  File.foreach(log_file) do |line|
    # scrub first: a regexp match on a line with invalid bytes can raise ArgumentError
    match = line.scrub.match(/Info: API Server: Listening on ([^:]+):(\d+) /)
    # no "break": need to keep the last matching log line
    port = match[2].to_i if match
  end
  raise 'Port not found in daemon logs' if port.nil?
  Log.log.debug{"Got port #{port} from log"}
  port
end
Instance Method Details
#shutdown ⇒ Object
173 174 175 |
# File 'lib/aspera/agent/trsdk.rb', line 173
# Stop the daemon, unless option `keep` asks to leave it running.
# NOTE(review): the listing this was restored from read `unless [:keep]`
# (an array literal, always truthy, so the daemon was never stopped);
# the receiver is restored as `@options` - confirm against the repository.
def shutdown
  stop_daemon unless @options[:keep]
end
#start_transfer(transfer_spec, token_regenerator: nil) ⇒ Object
123 124 125 126 127 128 129 130 131 132 133 134 |
# File 'lib/aspera/agent/trsdk.rb', line 123
# Ask the transfer daemon to start a transfer and remember its identifier
# in @transfer_id.
#
# @param transfer_spec [#to_json] transfer definition, sent as JSON
# @param token_regenerator accepted but not used by this method
def start_transfer(transfer_spec, token_regenerator: nil)
  request_fields = {
    # transfer type (file/stream)
    transferType: Transfersdk::TransferType::FILE_REGULAR,
    # transfer configuration
    config:       Transfersdk::TransferConfig.new,
    # transfer definition
    transferSpec: transfer_spec.to_json
  }
  # hand the request over to the transfer manager daemon
  daemon_reply = @transfer_client.start_transfer(Transfersdk::TransferRequest.new(**request_fields))
  Log.log.debug{"start transfer response #{daemon_reply}"}
  @transfer_id = daemon_reply.transferId
  Log.log.debug{"transfer started with id #{@transfer_id}"}
end
#stop_daemon ⇒ Object
177 178 179 180 181 182 183 184 185 |
# File 'lib/aspera/agent/trsdk.rb', line 177
# Interrupt the daemon recorded in @daemon_pid (if any) and wait for it to exit.
# Does nothing when no daemon pid is recorded. The pid is always cleared,
# so calling this method again is a no-op.
#
# @return [nil]
def stop_daemon
  return if @daemon_pid.nil?
  Log.log.debug("Stopping daemon #{@daemon_pid}")
  begin
    Process.kill('INT', @daemon_pid)
    _, status = Process.wait2(@daemon_pid)
    Log.log.debug("daemon stopped #{status}")
  rescue Errno::ESRCH, Errno::ECHILD => e
    # ESRCH: process already exited; ECHILD: already reaped (or detached): nothing left to stop
    Log.log.debug("daemon #{@daemon_pid} already gone: #{e.message}")
  ensure
    @daemon_pid = nil
  end
  nil
end
#wait_for_transfers_completion ⇒ Object
136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 |
# File 'lib/aspera/agent/trsdk.rb', line 136
# Follow the transfer identified by @transfer_id until it ends, reporting
# progress through notify_progress.
#
# NOTE(review): the listing this was restored from read `JSON.parse(response.)`;
# the missing attribute is restored as `message` - confirm against the repository.
#
# @return [Array] currently always an empty array
# @raise [Transfer::Error] when the daemon reports status FAILED or CANCELED
def wait_for_transfers_completion
  # set to true once the session start has been notified
  session_started = false
  # total size of the transfer, once the daemon reports a non-zero value
  bytes_expected = nil
  # monitor transfer status
  @transfer_client.monitor_transfers(Transfersdk::RegistrationRequest.new(transferId: [@transfer_id])) do |response|
    Log.log.debug{Log.dump(:response, response.to_h)}
    case response.status
    when :RUNNING
      unless session_started
        notify_progress(session_id: @transfer_id, type: :session_start)
        session_started = true
      end
      if bytes_expected.nil? && !response.sessionInfo.preTransferBytes.eql?(0)
        bytes_expected = response.sessionInfo.preTransferBytes
        notify_progress(type: :session_size, session_id: @transfer_id, info: bytes_expected)
      end
      notify_progress(type: :transfer, session_id: @transfer_id, info: response.transferInfo.bytesTransferred)
    when :COMPLETED
      # report 100% before the end event, when the total size is known
      notify_progress(type: :transfer, session_id: @transfer_id, info: bytes_expected) if bytes_expected
      notify_progress(type: :end, session_id: @transfer_id)
      break
    when :FAILED, :CANCELED
      notify_progress(type: :end, session_id: @transfer_id)
      description =
        begin
          JSON.parse(response.message)['Description']
        rescue JSON::ParserError, TypeError
          # message is not a JSON object: report it as-is instead of hiding the failure behind a parse error
          nil
        end
      raise Transfer::Error, description || response.message
    when :QUEUED, :UNKNOWN_STATUS, :PAUSED, :ORPHANED
      notify_progress(session_id: nil, type: :pre_start, info: response.status.to_s.downcase)
    else
      Log.log.error{"unknown status: #{response.status}"}
    end
  end
  # TODO: return status
  []
end