Class: Aspera::Agent::Trsdk

Inherits:
Base
  • Object
show all
Defined in:
lib/aspera/agent/trsdk.rb

Constant Summary collapse

LOCAL_SOCKET_ADDR =
'127.0.0.1'
PORT_SEP =
':'
AUTO_LOCAL_TCP_PORT =

port zero means select a random available high port

"#{PORT_SEP}0"

Constants inherited from Base

Base::RUBY_EXT

Class Method Summary collapse

Instance Method Summary collapse

Methods inherited from Base

agent_list, factory_create, to_move_options, #wait_for_completion

Constructor Details

#initialize(url: AUTO_LOCAL_TCP_PORT, external: false, keep: false, **base_options) ⇒ Trsdk

Returns a new instance of Trsdk.

Parameters:

  • url (String) (defaults to: AUTO_LOCAL_TCP_PORT)

    URL of the transfer manager daemon

  • external (Boolean) (defaults to: false)

    if true, expect that an external daemon is already running

  • keep (Boolean) (defaults to: false)

    if true, do not shutdown daemon on exit

  • base_options (Hash)

    base options



44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
# File 'lib/aspera/agent/trsdk.rb', line 44

def initialize(
  url: AUTO_LOCAL_TCP_PORT,
  external: false,
  keep:     false,
  **base_options
)
  super(**base_options)
  is_local_auto_port = @url.eql?(AUTO_LOCAL_TCP_PORT)
  raise 'Cannot use options `keep` or `external` with port zero' if is_local_auto_port && (@keep || @external)
  # load SDK stub class on demand, as it's an optional gem
  $LOAD_PATH.unshift(Ascp::Installation.instance.sdk_ruby_folder)
  require 'transfer_services_pb'
  # keep PID for optional shutdown
  @daemon_pid = nil
  daemon_endpoint = @url
  Log.log.debug{Log.dump(:daemon_endpoint, daemon_endpoint)}
  # retry loop
  begin
    # no address: local bind
    daemon_endpoint = "#{LOCAL_SOCKET_ADDR}#{daemon_endpoint}" if daemon_endpoint.match?(/^#{PORT_SEP}[0-9]+$/o)
    # Create stub (without credentials)
    @transfer_client = Transfersdk::TransferService::Stub.new(daemon_endpoint, :this_channel_is_insecure)
    # Initiate actual connection
    get_info_response = @transfer_client.get_info(Transfersdk::InstanceInfoRequest.new)
    Log.log.debug{"Daemon info: #{get_info_response}"}
    Log.log.warn{'Attached to existing daemon'} unless @daemon_pid || @external || @keep
    at_exit{shutdown}
  rescue GRPC::Unavailable => e
    # if transferd is external: do not start it, or other error
    raise if @external || !e.message.include?('failed to connect')
    # we already tried to start a daemon, but it failed
    Aspera.assert(@daemon_pid.nil?){"Daemon started with PID #{@daemon_pid}, but connection failed to #{daemon_endpoint}}"}
    Log.log.warn('no daemon present, starting daemon...') if @external
    # location of daemon binary
    sdk_folder = File.realpath(File.join(Ascp::Installation.instance.sdk_ruby_folder, '..'))
    # transferd only supports local ip and port
    daemon_uri = URI.parse("ipv4://#{daemon_endpoint}")
    Aspera.assert(daemon_uri.scheme.eql?('ipv4')){"Invalid scheme daemon URI #{daemon_endpoint}"}
    # create a config file for daemon
    config = {
      address:      daemon_uri.host,
      port:         daemon_uri.port,
      fasp_runtime: {
        use_embedded: false,
        user_defined: {
          bin: sdk_folder,
          etc: sdk_folder
        }
      }
    }
    # config file and logs are created in same folder
    transferd_base_tmp = TempFileManager.instance.new_file_path_global('transferd')
    Log.log.debug{"transferd base tmp #{transferd_base_tmp}"}
    conf_file = "#{transferd_base_tmp}.conf"
    log_stdout = "#{transferd_base_tmp}.out"
    log_stderr = "#{transferd_base_tmp}.err"
    File.write(conf_file, config.to_json)
    @daemon_pid = Process.spawn(Ascp::Installation.instance.path(:transferd), '--config', conf_file, out: log_stdout, err: log_stderr)
    begin
      # wait for process to initialize, max 2 seconds
      Timeout.timeout(2.0) do
        # this returns if process dies (within 2 seconds)
        _, status = Process.wait2(@daemon_pid)
        raise "Transfer daemon exited with status #{status.exitstatus}. Check files: #{log_stdout} and #{log_stderr}"
      end
    rescue Timeout::Error
      nil
    end
    Log.log.debug{"Daemon started with pid #{@daemon_pid}"}
    Process.detach(@daemon_pid) if @keep
    at_exit {shutdown}
    # update port for next connection attempt (if auto high port was requested)
    daemon_endpoint = "#{LOCAL_SOCKET_ADDR}#{PORT_SEP}#{self.class.daemon_port_from_log(log_stdout)}" if is_local_auto_port
    # local daemon started, try again
    retry
  end
end

Class Method Details

.daemon_port_from_log(log_file) ⇒ Object

Well, the port number is only in log file



22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
# File 'lib/aspera/agent/trsdk.rb', line 22

def daemon_port_from_log(log_file)
  result = nil
  # if port is zero, a dynamic port was created, get it
  File.open(log_file, 'r') do |file|
    file.each_line do |line|
      # Well, it's tricky to depend on log
      if (m = line.match(/Info: API Server: Listening on ([^:]+):(\d+) /))
        result = m[2].to_i
        # no "break" , need to read last matching log line
      end
    end
  end
  raise 'Port not found in daemon logs' if result.nil?
  Log.log.debug{"Got port #{result} from log"}
  return result
end

Instance Method Details

#shutdownObject



172
173
174
# File 'lib/aspera/agent/trsdk.rb', line 172

def shutdown
  stop_daemon unless @keep
end

#start_transfer(transfer_spec, token_regenerator: nil) ⇒ Object



122
123
124
125
126
127
128
129
130
131
132
133
# File 'lib/aspera/agent/trsdk.rb', line 122

def start_transfer(transfer_spec, token_regenerator: nil)
  # create a transfer request
  transfer_request = Transfersdk::TransferRequest.new(
    transferType: Transfersdk::TransferType::FILE_REGULAR, # transfer type (file/stream)
    config: Transfersdk::TransferConfig.new, # transfer configuration
    transferSpec: transfer_spec.to_json) # transfer definition
  # send start transfer request to the transfer manager daemon
  start_transfer_response = @transfer_client.start_transfer(transfer_request)
  Log.log.debug{"start transfer response #{start_transfer_response}"}
  @transfer_id = start_transfer_response.transferId
  Log.log.debug{"transfer started with id #{@transfer_id}"}
end

#stop_daemonObject



176
177
178
179
180
181
182
183
184
# File 'lib/aspera/agent/trsdk.rb', line 176

def stop_daemon
  if !@daemon_pid.nil?
    Log.log.debug("Stopping daemon #{@daemon_pid}")
    Process.kill('INT', @daemon_pid)
    _, status = Process.wait2(@daemon_pid)
    Log.log.debug("daemon stopped #{status}")
    @daemon_pid = nil
  end
end

#wait_for_transfers_completionObject



135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
# File 'lib/aspera/agent/trsdk.rb', line 135

def wait_for_transfers_completion
  # set to true when we know the total size of the transfer
  session_started = false
  bytes_expected = nil
  # monitor transfer status
  @transfer_client.monitor_transfers(Transfersdk::RegistrationRequest.new(transferId: [@transfer_id])) do |response|
    Log.log.debug{Log.dump(:response, response.to_h)}
    # Log.log.debug{"#{response.sessionInfo.preTransferBytes} #{response.transferInfo.bytesTransferred}"}
    case response.status
    when :RUNNING
      if !session_started
        notify_progress(session_id: @transfer_id, type: :session_start)
        session_started = true
      end
      if bytes_expected.nil? &&
          !response.sessionInfo.preTransferBytes.eql?(0)
        bytes_expected = response.sessionInfo.preTransferBytes
        notify_progress(type: :session_size, session_id: @transfer_id, info: bytes_expected)
      end
      notify_progress(type: :transfer, session_id: @transfer_id, info: response.transferInfo.bytesTransferred)
    when :COMPLETED
      notify_progress(type: :transfer, session_id: @transfer_id, info: bytes_expected) if bytes_expected
      notify_progress(type: :end, session_id: @transfer_id)
      break
    when :FAILED, :CANCELED
      notify_progress(type: :end, session_id: @transfer_id)
      raise Transfer::Error, JSON.parse(response.message)['Description']
    when :QUEUED, :UNKNOWN_STATUS, :PAUSED, :ORPHANED
      notify_progress(session_id: nil, type: :pre_start, info: response.status.to_s.downcase)
    else
      Log.log.error{"unknown status#{response.status}"}
    end
  end
  # TODO: return status
  return []
end