Class: Aspera::Fasp::AgentHttpgw

Inherits:
AgentBase show all
Defined in:
lib/aspera/fasp/agent_httpgw.rb

Overview

start a transfer using Aspera HTTP Gateway, using web socket session for uploads

Constant Summary collapse

MSG_SEND_SLICE_UPLOAD =
'slice_upload'
MSG_SEND_TRANSFER_SPEC =
'transfer_spec'
DEFAULT_BASE_PATH =
'/aspera/http-gwy'
LOG_WS_MAIN =
'ws: send: '.green
LOG_WS_THREAD =
'ws: ack: '.red

Constants inherited from AgentBase

Aspera::Fasp::AgentBase::LISTENER_SESSION_ID_B, Aspera::Fasp::AgentBase::LISTENER_SESSION_ID_S

Instance Method Summary collapse

Methods inherited from AgentBase

#add_listener, validate_status_list

Instance Method Details

#download(transfer_spec) ⇒ Object



266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
# File 'lib/aspera/fasp/agent_httpgw.rb', line 266

def download(transfer_spec)
  transfer_spec['zip_required'] ||= false
  transfer_spec['source_root'] ||= '/'
  # is normally provided by application, like package name
  if !transfer_spec.key?('download_name')
    # by default it is the name of first file
    download_name = File.basename(transfer_spec['paths'].first['source'])
    # we remove extension
    download_name = download_name.gsub(/\.@gw_api.*$/, '')
    # ands add indication of number of files if there is more than one
    if transfer_spec['paths'].length > 1
      download_name += " #{transfer_spec['paths'].length} Files"
    end
    transfer_spec['download_name'] = download_name
  end
  creation = @gw_api.create('v1/download', {'transfer_spec' => transfer_spec})[:data]
  transfer_uuid = creation['url'].split('/').last
  file_dest =
    if transfer_spec['zip_required'] || transfer_spec['paths'].length > 1
      # it is a zip file if zip is required or there is more than 1 file
      transfer_spec['download_name'] + '.zip'
    else
      # it is a plain file if we don't require zip and there is only one file
      File.basename(transfer_spec['paths'].first['source'])
    end
  file_dest = File.join(transfer_spec['destination_root'], file_dest)
  @gw_api.call({operation: 'GET', subpath: "v1/download/#{transfer_uuid}", save_to_file: file_dest})
end

#shutdownObject

terminates monitor thread



321
# File 'lib/aspera/fasp/agent_httpgw.rb', line 321

def shutdown; end

#start_transfer(transfer_spec, token_regenerator: nil) ⇒ Object

start FASP transfer based on transfer spec (hash table) note that it is asynchronous HTTP download only supports file list



298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
# File 'lib/aspera/fasp/agent_httpgw.rb', line 298

def start_transfer(transfer_spec, token_regenerator: nil)
  raise 'GW URL must be set' if @gw_api.nil?
  raise 'paths: must be Array' unless transfer_spec['paths'].is_a?(Array)
  raise 'only token based transfer is supported in GW' unless transfer_spec['token'].is_a?(String)
  Log.dump(:user_spec, transfer_spec)
  transfer_spec['authentication'] ||= 'token'
  case transfer_spec['direction']
  when Fasp::TransferSpec::DIRECTION_SEND
    upload(transfer_spec)
  when Fasp::TransferSpec::DIRECTION_RECEIVE
    download(transfer_spec)
  else
    raise "unexpected direction: [#{transfer_spec['direction']}]"
  end
end

#upload(transfer_spec) ⇒ Object



87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
# File 'lib/aspera/fasp/agent_httpgw.rb', line 87

def upload(transfer_spec)
  # total size of all files
  total_bytes_to_transfer = 0
  # we need to keep track of actual file path because transfer spec is modified to be sent in web socket
  source_paths = []
  # get source root or nil
  source_root = transfer_spec.key?('source_root') && !transfer_spec['source_root'].empty? ? transfer_spec['source_root'] : nil
  # source root is ignored by GW, used only here
  transfer_spec.delete('source_root')
  # compute total size of files to upload (for progress)
  # modify transfer spec to be suitable for GW
  transfer_spec['paths'].each do |item|
    # save actual file location to be able read contents later
    full_src_filepath = item['source']
    # add source root if needed
    full_src_filepath = File.join(source_root, full_src_filepath) unless source_root.nil?
    # GW expects a simple file name in 'source' but if user wants to change the name, we take it
    item['source'] = File.basename(item['destination'].nil? ? item['source'] : item['destination'])
    item['file_size'] = File.size(full_src_filepath)
    total_bytes_to_transfer += item['file_size']
    # save so that we can actually read the file later
    source_paths.push(full_src_filepath)
  end
  # identify this session uniquely
  session_id = SecureRandom.uuid
  upload_url = File.join(@gw_api.params[:base_url], @options[:api_version], 'upload')
  # uri = URI.parse(upload_url)
  # open web socket to end point (equivalent to Net::HTTP.start)
  http_socket = Rest.start_http_session(upload_url)
  # little hack to get the socket opened for HTTP, handy because HTTP debug will be available
  @ws_io = http_socket.instance_variable_get(:@socket)
  # @ws_io.debug_output = Log.log
  @ws_handshake = ::WebSocket::Handshake::Client.new(url: upload_url, headers: {})
  @ws_io.write(@ws_handshake.to_s)
  sleep(0.1)
  @ws_handshake << @ws_io.readuntil("\r\n\r\n")
  raise 'Error in websocket handshake' unless @ws_handshake.finished?
  Log.log.debug{"#{LOG_WS_MAIN}handshake success"}
  # data shared between main thread and read thread
  @shared_info = {
    read_exception: nil, # error message if any in callback
    count:          {
      received_data:     0, # number of files received on other side
      received_v2_slice: 0, # number of slices received on other side
      sent_other:        0,
      sent_v2_slice:     0
    },
    mutex:          Mutex.new,
    cond_var:       ConditionVariable.new
  }
  # start read thread
  ws_read_thread = Thread.new do
    Log.log.debug{"#{LOG_WS_THREAD}read started"}
    frame = ::WebSocket::Frame::Incoming::Client.new(version: @ws_handshake.version)
    loop do
      begin # rubocop:disable Style/RedundantBegin
        # unless (recv_data = @ws_io.getc)
        #  sleep(0.1)
        #  next
        # end
        # frame << recv_data
        # frame << @ws_io.readuntil("\n")
        # frame << @ws_io.read_all
        frame << @ws_io.read(1)
        while (msg = frame.next)
          Log.log.debug{"#{LOG_WS_THREAD}type: #{msg.class}"}
          message = msg.data
          Log.log.debug{"#{LOG_WS_THREAD}message: [#{message}]"}
          if message.eql?(MSG_RECV_DATA_RECEIVED_SIGNAL)
            @shared_info[:mutex].synchronize do
              @shared_info[:count][:received_data] += 1
              @shared_info[:cond_var].signal
            end
          elsif message.eql?(MSG_RECV_SLICE_UPLOAD_SIGNAL)
            @shared_info[:mutex].synchronize do
              @shared_info[:count][:received_v2_slice] += 1
              @shared_info[:cond_var].signal
            end
          else
            message.chomp!
            error_message =
              if message.start_with?('"') && message.end_with?('"')
                JSON.parse(Base64.strict_decode64(message.chomp[1..-2]))['message']
              elsif message.start_with?('{') && message.end_with?('}')
                JSON.parse(message)['message']
              else
                "unknown message from gateway: [#{message}]"
              end
            raise error_message
          end
          Log.log.debug{"#{LOG_WS_THREAD}counts: #{@shared_info[:count]}"}
        end # while
      rescue => e
        Log.log.debug{"#{LOG_WS_THREAD}Exception: #{e}"}
        @shared_info[:mutex].synchronize do
          @shared_info[:read_exception] = e unless e.is_a?(EOFError)
          @shared_info[:cond_var].signal
        end
        break
      end # begin
    end # loop
    Log.log.debug{"#{LOG_WS_THREAD}stopping (exc=#{@shared_info[:read_exception]},cls=#{@shared_info[:read_exception].class})"}
  end
  # notify progress bar
  notify_begin(session_id, total_bytes_to_transfer)
  # first step send transfer spec
  Log.dump(:ws_spec, transfer_spec)
  ws_snd_json(MSG_SEND_TRANSFER_SPEC, transfer_spec)
  wait_for_sent_msg_ack_or_exception
  # current file index
  file_index = 0
  # aggregate size sent
  sent_bytes = 0
  # last progress event
  last_progress_time = nil

  transfer_spec['paths'].each do |item|
    # TODO: get mime type?
    file_mime_type = ''
    file_size = item['file_size']
    file_name = File.basename(item[item['destination'].nil? ? 'source' : 'destination'])
    # compute total number of slices
    slice_total = ((file_size - 1) / @options[:upload_chunk_size]) + 1
    File.open(source_paths[file_index]) do |file|
      # current slice index
      slice_index = 0
      until file.eof?
        file_bin_data = file.read(@options[:upload_chunk_size])
        slice_data = {
          name:         file_name,
          type:         file_mime_type,
          size:         file_size,
          slice:        slice_index,
          total_slices: slice_total,
          fileIndex:    file_index
        }
        # Log.dump(:slice_data,slice_data) #if slice_index.eql?(0)
        # interrupt main thread if read thread failed
        raise @shared_info[:read_exception] unless @shared_info[:read_exception].nil?
        begin
          if @options[:api_version].eql?(API_V1)
            slice_data[:data] = Base64.strict_encode64(file_bin_data)
            ws_snd_json(MSG_SEND_SLICE_UPLOAD, slice_data)
          else
            ws_snd_json(MSG_SEND_SLICE_UPLOAD, slice_data) if slice_index.eql?(0)
            ws_send(file_bin_data, type: :binary)
            Log.log.debug{"#{LOG_WS_MAIN}sent bin buffer: #{file_index} / #{slice_index}"}
            ws_snd_json(MSG_SEND_SLICE_UPLOAD, slice_data) if slice_index.eql?(slice_total - 1)
          end
          wait_for_sent_msg_ack_or_exception
        rescue Errno::EPIPE => e
          raise @shared_info[:read_exception] unless @shared_info[:read_exception].nil?
          raise e
        rescue Net::ReadTimeout => e
          Log.log.warn{'A timeout condition using HTTPGW may signal a permission problem on destination. Check ascp logs on httpgw.'}
          raise e
        end
        sent_bytes += file_bin_data.length
        current_time = Time.now
        if last_progress_time.nil? || ((current_time - last_progress_time) > @options[:upload_bar_refresh_sec])
          notify_progress(session_id, sent_bytes)
          last_progress_time = current_time
        end
        slice_index += 1
      end
    end
    file_index += 1
  end

  Log.log.debug('Finished upload, waiting for end of read thread.')
  ws_read_thread.join
  Log.log.debug{"Read thread joined, result: #{@shared_info[:count][:received_data]} / #{@shared_info[:count][:sent_other]}"}
  ws_send(nil, type: :close) unless @ws_io.nil?
  @ws_io = nil
  http_socket&.finish
  notify_progress(session_id, sent_bytes)
  notify_end(session_id)
end

#url=(api_url) ⇒ Object



323
# File 'lib/aspera/fasp/agent_httpgw.rb', line 323

def url=(api_url); end

#wait_for_sent_msg_ack_or_exceptionObject

wait for all message sent to be acknowledged by HTTPGW server



74
75
76
77
78
79
80
81
82
83
84
85
# File 'lib/aspera/fasp/agent_httpgw.rb', line 74

def wait_for_sent_msg_ack_or_exception
  return unless @options[:synchronous]
  @shared_info[:mutex].synchronize do
    while (@shared_info[:count][:received_data] != @shared_info[:count][:sent_other]) ||
        (@shared_info[:count][:received_v2_slice] != @shared_info[:count][:sent_v2_slice])
      Log.log.debug{"#{LOG_WS_MAIN}wait: counts: #{@shared_info[:count]}"}
      @shared_info[:cond_var].wait(@shared_info[:mutex], 1.0)
      raise @shared_info[:read_exception] unless @shared_info[:read_exception].nil?
    end
  end
  Log.log.debug{"#{LOG_WS_MAIN}sync ok: counts: #{@shared_info[:count]}"}
end

#wait_for_transfers_completionObject

wait for completion of all jobs started

Returns:

  • list of :success or error message



316
317
318
# File 'lib/aspera/fasp/agent_httpgw.rb', line 316

def wait_for_transfers_completion
  return [:success]
end

#ws_send(data_to_send, type: :text) ⇒ Object



65
66
67
68
69
70
71
# File 'lib/aspera/fasp/agent_httpgw.rb', line 65

def ws_send(data_to_send, type: :text)
  Log.log.debug{"#{LOG_WS_MAIN}send low: type: #{type}"}
  @shared_info[:count][:sent_other] += 1 if type.eql?(:binary)
  Log.log.debug{"#{LOG_WS_MAIN}counts: #{@shared_info[:count]}"}
  frame = ::WebSocket::Frame::Outgoing::Client.new(data: data_to_send, type: type, version: @ws_handshake.version)
  @ws_io.write(frame.to_s)
end

#ws_snd_json(msg_type, payload) ⇒ Object

send message on http gw web socket



51
52
53
54
55
56
57
58
59
60
61
62
63
# File 'lib/aspera/fasp/agent_httpgw.rb', line 51

def ws_snd_json(msg_type, payload)
  if msg_type.eql?(MSG_SEND_SLICE_UPLOAD) && @options[:api_version].eql?(API_V2)
    @shared_info[:count][:sent_v2_slice] += 1
  else
    @shared_info[:count][:sent_other] += 1
  end
  Log.log.debug do
    log_data = payload.dup
    log_data[:data] = "[data #{log_data[:data].length} bytes]" if log_data.key?(:data)
    "send_txt: #{msg_type}: #{JSON.generate(log_data)}"
  end
  ws_send(JSON.generate({msg_type => payload}))
end