Module: Executor::PLGDataStorage

Defined in:
lib/hyperflow-amqp-executor/plgdata_storage.rb

Constant Summary collapse

PLGDATA_ENDPOINT =
(ENV['PLGDATA_ENDPOINT'] or 'https://data.plgrid.pl')

Instance Method Summary collapse

Instance Method Details

#stage_inObject



13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# File 'lib/hyperflow-amqp-executor/plgdata_storage.rb', line 13

def stage_in
  @job.inputs.each do |file|
      url = PLGDATA_ENDPOINT+'/download/' + @job.options.prefix + "/" + file.name
      local_file_name = @workdir + "/" + file.name

      Executor::logger.debug "[#{@id}] Downloading #{url} to #{local_file_name}"
      File.open(local_file_name, File::RDWR|File::CREAT) do |local_file|
        payload = {proxy: @proxy_string}
        response = @http_client.get(url, payload) do |chunk|
          local_file.write(chunk)
        end
        raise Exception, "Failed downloading input file" unless response.ok?
      end
  end
end

#stage_outObject



29
30
31
32
33
34
35
36
37
38
39
40
41
# File 'lib/hyperflow-amqp-executor/plgdata_storage.rb', line 29

def stage_out
  @job.outputs.each do |file|
    url = PLGDATA_ENDPOINT+'/upload/' + @job.options.prefix + "/" + File.dirname(file.name)
    local_file_name = @workdir+"/"+file.name

    Executor::logger.debug "[#{@id}] Uploading #{file.name} to #{url}"
    File.open(local_file_name) do |local_file|
      payload = {proxy: @proxy_string, file: local_file}
      response = @http_client.post(url, payload)
      raise Exception, "Failed uploading output file: #{response.content}" unless response.ok?
    end
  end
end

#storage_initObject

Raises:

  • (Exception)


6
7
8
9
10
11
# File 'lib/hyperflow-amqp-executor/plgdata_storage.rb', line 6

def storage_init
  @http_client = HTTPClient.new()
  
  raise Exception, "Unable to load proxy certificate" unless File.exists?(Executor::settings.plgdata.proxy)
  @proxy_string = File.read(Executor::settings.plgdata.proxy)
end

#workdir(&block) ⇒ Object



43
44
45
# File 'lib/hyperflow-amqp-executor/plgdata_storage.rb', line 43

def workdir(&block)
  Dir::mktmpdir(&block)
end