Class: PipeFitter::DataPipelineClient

Inherits:
Object
  • Object
Defined in:
lib/pipe_fitter/data_pipeline_client.rb

Instance Method Summary

Constructor Details

#initialize(options) ⇒ DataPipelineClient

Returns a new instance of DataPipelineClient.



10
11
12
# File 'lib/pipe_fitter/data_pipeline_client.rb', line 10

def initialize(options)
  @options = options.map { |k, v| [k.to_sym, v] }.to_h
end

Instance Method Details

#activate(pipeline_id, parameter_file, start_timestamp) ⇒ Object



49
50
51
52
# File 'lib/pipe_fitter/data_pipeline_client.rb', line 49

def activate(pipeline_id, parameter_file, start_timestamp)
  p = parameter_file ? load_pipeline(parameter_file) : Pipeline.new
  exec(:activate_pipeline, p.activate_opts(pipeline_id, start_timestamp)).to_h
end

#create(pipeline) ⇒ Object



44
45
46
47
# File 'lib/pipe_fitter/data_pipeline_client.rb', line 44

def create(pipeline)
  res = exec(:create_pipeline, pipeline.create_opts)
  [res.pipeline_id, put_definition(res.pipeline_id, pipeline)]
end

#definition(pipeline_id) ⇒ Object



33
34
35
36
37
# File 'lib/pipe_fitter/data_pipeline_client.rb', line 33

def definition(pipeline_id)
  res = exec(:get_pipeline_definition, pipeline_id: pipeline_id)
  desc = description(pipeline_id)
  Pipeline.create(res.to_h, desc.to_h)
end

#diff(pipeline_id, definition_file, format = :color) ⇒ Object



19
20
21
22
23
24
25
# File 'lib/pipe_fitter/data_pipeline_client.rb', line 19

def diff(pipeline_id, definition_file, format = :color)
  p = load_pipeline(definition_file)
  [
    definition(pipeline_id).diff(p, format.to_sym),
    diff_deploy_files(definition_file, format.to_sym),
  ].compact.reject(&:empty?).join("\n")
end

#diff_deploy_files(definition_file, format = :color) ⇒ Object



64
65
66
67
68
69
70
# File 'lib/pipe_fitter/data_pipeline_client.rb', line 64

def diff_deploy_files(definition_file, format = :color)
  p = load_pipeline(definition_file)
  p.deploy_files.map do |df|
    c = S3diff::Comparator.new(df[:dst], df[:src], sdk_opts)
    c.diff.to_s(format.to_sym) unless c.same?
  end.compact
end

#find_registered(definition_file) ⇒ Object



54
55
56
57
58
59
60
61
62
# File 'lib/pipe_fitter/data_pipeline_client.rb', line 54

def find_registered(definition_file)
  p = load_pipeline(definition_file)
  pls = list_pipelines.select { |l| l.name == p.pipeline_description.name }
  res = pls.find do |pl|
    d = Pipeline::PipelineDescription.create(description(pl.id))
    d.unique_id == p.pipeline_description.unique_id
  end
  res
end

#put_definition(pipeline_id, pipeline) ⇒ Object



39
40
41
42
# File 'lib/pipe_fitter/data_pipeline_client.rb', line 39

def put_definition(pipeline_id, pipeline)
  sync_tags(pipeline_id, pipeline)
  exec(:put_pipeline_definition, pipeline.put_definition_opts(pipeline_id)).to_h
end

#register(definition_file) ⇒ Object



14
15
16
17
# File 'lib/pipe_fitter/data_pipeline_client.rb', line 14

def register(definition_file)
  p = load_pipeline(definition_file)
  create(p)
end

#update(pipeline_id, definition_file) ⇒ Object



27
28
29
30
31
# File 'lib/pipe_fitter/data_pipeline_client.rb', line 27

def update(pipeline_id, definition_file)
  upload_deploy_files(definition_file)
  p = load_pipeline(definition_file)
  put_definition(pipeline_id, p)
end

#upload_deploy_files(definition_file) ⇒ Object



72
73
74
75
76
77
# File 'lib/pipe_fitter/data_pipeline_client.rb', line 72

def upload_deploy_files(definition_file)
  p = load_pipeline(definition_file)
  p.deploy_files.each do |df|
    put_object(df[:src], df[:dst])
  end
end