Class: Aspera::Fasp::AgentTrsdk

Inherits:
AgentBase show all
Defined in:
lib/aspera/fasp/agent_trsdk.rb

Constant Summary

Constants inherited from AgentBase

Aspera::Fasp::AgentBase::LISTENER_SESSION_ID_B, Aspera::Fasp::AgentBase::LISTENER_SESSION_ID_S

Instance Method Summary collapse

Methods inherited from AgentBase

#add_listener, validate_status_list

Constructor Details

#initialize(user_opts) ⇒ AgentTrsdk

Options come from transfer_info; user_opts must be a Hash or nil.



19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
# File 'lib/aspera/fasp/agent_trsdk.rb', line 19

# Create the agent and make sure a transfer daemon answers on the configured address.
# If the daemon cannot be reached, it is started once and the connection is then
# retried a bounded number of times.
# options come from transfer_info
# @param user_opts [Hash, nil] overrides for DEFAULT_OPTIONS; only keys present in DEFAULT_OPTIONS are accepted
# @raise [RuntimeError] if user_opts is neither nil nor a Hash, or if it contains an unknown key
# @raise [GRPC::Unavailable] if the daemon still does not answer once all retries are used
def initialize(user_opts)
  raise "expecting Hash (or nil), but have #{user_opts.class}" unless user_opts.nil? || user_opts.is_a?(Hash)
  # set default options and override if specified
  options = DEFAULT_OPTIONS.dup
  user_opts&.each do |k, v|
    raise "Unknown local agent parameter: #{k}, expect one of #{DEFAULT_OPTIONS.keys.map(&:to_s).join(',')}" unless DEFAULT_OPTIONS.key?(k)
    options[k] = v
  end
  Log.log.debug{"options= #{options}"}
  super()
  # load and create SDK stub
  sdk_ruby_folder = Installation.instance.sdk_ruby_folder
  # do not stack the same folder in the load path each time an agent is created
  $LOAD_PATH.unshift(sdk_ruby_folder) unless $LOAD_PATH.include?(sdk_ruby_folder)
  require 'transfer_services_pb'
  @transfer_client = Transfersdk::TransferService::Stub.new("#{options[:address]}:#{options[:port]}", :this_channel_is_insecure)
  # the daemon is spawned at most once; later failures only wait and retry
  daemon_spawned = false
  # each retry waits 2 seconds, so this bounds the total wait to about 20 seconds
  remaining_retries = 10
  begin
    get_info_response = @transfer_client.get_info(Transfersdk::InstanceInfoRequest.new)
    Log.log.debug{"daemon info: #{get_info_response}"}
  rescue GRPC::Unavailable
    # give up with the original error instead of looping (and spawning) forever
    raise if remaining_retries <= 0
    remaining_retries -= 1
    if daemon_spawned
      Log.log.debug{"daemon not ready yet, retrying (#{remaining_retries} retries left)"}
    else
      Log.log.warn('no daemon present, starting daemon...')
      # location of daemon binary
      bin_folder = File.realpath(File.join(sdk_ruby_folder, '..'))
      # config file and logs are created in same folder
      conf_file = File.join(bin_folder, 'sdk.conf')
      log_base = File.join(bin_folder, 'transferd')
      # create a config file for daemon
      config = {
        address:      options[:address],
        port:         options[:port],
        fasp_runtime: {
          use_embedded: false,
          user_defined: {
            bin: bin_folder,
            etc: bin_folder
          }
        }
      }
      File.write(conf_file, config.to_json)
      trd_pid = Process.spawn(Installation.instance.path(:transferd), '--config', conf_file, out: "#{log_base}.out", err: "#{log_base}.err")
      # the daemon outlives this process: do not leave a zombie behind
      Process.detach(trd_pid)
      daemon_spawned = true
    end
    sleep(2.0)
    retry
  end
end

Instance Method Details

#start_transfer(transfer_spec, token_regenerator: nil) ⇒ Object



63
64
65
66
67
68
69
70
71
72
73
74
# File 'lib/aspera/fasp/agent_trsdk.rb', line 63

# Ask the transfer daemon to start a regular file transfer and remember its identifier
# in @transfer_id.
# @param transfer_spec [#to_json] transfer definition, sent to the daemon as JSON
# @param token_regenerator not used by this method
def start_transfer(transfer_spec, token_regenerator: nil)
  # assemble the request: regular file transfer, default configuration, spec serialized as JSON
  request_fields = {
    transferType: Transfersdk::TransferType::FILE_REGULAR,
    config:       Transfersdk::TransferConfig.new,
    transferSpec: transfer_spec.to_json
  }
  request = Transfersdk::TransferRequest.new(**request_fields)
  # hand the request over to the transfer manager daemon
  response = @transfer_client.start_transfer(request)
  Log.log.debug{"start transfer response #{response}"}
  # keep the identifier: it is needed later to monitor this transfer
  @transfer_id = response.transferId
  Log.log.debug{"transfer started with id #{@transfer_id}"}
end

#wait_for_transfers_completion ⇒ Object



76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
# File 'lib/aspera/fasp/agent_trsdk.rb', line 76

# Monitor the transfer identified by @transfer_id until it reaches a final status
# (COMPLETED, FAILED or CANCELED), notifying begin, progress and end along the way.
# @return [Array] always empty for now (see TODO below)
# @raise [Fasp::Error] if the transfer ends as FAILED or CANCELED
def wait_for_transfers_completion
  started = false
  # monitor transfer status
  @transfer_client.monitor_transfers(Transfersdk::RegistrationRequest.new(transferId: [@transfer_id])) do |response|
    Log.dump(:response, response.to_h)
    # Log.log.debug{"#{response.sessionInfo.preTransferBytes} #{response.transferInfo.bytesTransferred}"}
    case response.status
    when :RUNNING
      if !started && !response.sessionInfo.preTransferBytes.eql?(0)
        # begin is notified only once the pre-transfer byte count is non-zero
        notify_begin(@transfer_id, response.sessionInfo.preTransferBytes)
        started = true
      elsif started
        notify_progress(@transfer_id, response.transferInfo.bytesTransferred)
      end
    when :FAILED, :COMPLETED, :CANCELED
      notify_end(@transfer_id)
      break if :COMPLETED.eql?(response.status)
      # the message is expected to be a JSON object with a 'Description' field, but an
      # empty or non-JSON message must still produce a Fasp::Error, not a parsing error
      description =
        begin
          parsed = JSON.parse(response.message)
          parsed.is_a?(Hash) ? parsed['Description'] : nil
        rescue JSON::ParserError, TypeError
          nil
        end
      description = response.message.to_s if description.nil?
      description = "transfer ended with status #{response.status}" if description.to_s.empty?
      raise Fasp::Error, description.to_s
    when :QUEUED, :UNKNOWN_STATUS, :PAUSED, :ORPHANED
      # ignore
    else
      Log.log.error{"unknown status: #{response.status}"}
    end
  end
  # TODO: return status
  return []
end