Class: Proxy::RemoteExecution::Ssh::Actions::PullScript

Inherits:
Dynflow::Action::Runner
  • Object
show all
Includes:
Dynflow::Action::Timeouts
Defined in:
lib/smart_proxy_remote_execution_ssh/actions/pull_script.rb

Constant Summary collapse

JobDelivered =
Class.new
PickupTimeout =
Class.new
READY_FOR_PICKUP =

The proxy has the job stored in its job storage

'ready_for_pickup'
NOTIFIED =

The host was notified over MQTT at least once

'notified'
DELIVERED =

The host has picked up the job

'delivered'
RUNNING =

We received at least one output from the host

'running'

Instance Method Summary collapse

Instance Method Details

#cleanup(_plan = nil) ⇒ Object



62
63
64
65
# File 'lib/smart_proxy_remote_execution_ssh/actions/pull_script.rb', line 62

def cleanup(_plan = nil)
  job_storage.drop_job(execution_plan_id, run_step_id)
  Proxy::RemoteExecution::Ssh::MQTT::Dispatcher.instance.done(input[:job_uuid]) if with_mqtt?
end

#host_nameObject



171
172
173
174
175
176
177
# File 'lib/smart_proxy_remote_execution_ssh/actions/pull_script.rb', line 171

def host_name
  alternative_names = input.fetch(:alternative_names, {})

  alternative_names[:consumer_uuid] ||
    alternative_names[:fqdn] ||
    input[:hostname]
end

#init_runObject



49
50
51
52
53
54
55
56
57
58
59
60
# File 'lib/smart_proxy_remote_execution_ssh/actions/pull_script.rb', line 49

def init_run
  plan_event(PickupTimeout, input[:time_to_pickup], optional: true) if input[:time_to_pickup]

  input[:job_uuid] = 
    job_storage.store_job(host_name, execution_plan_id, run_step_id, input[:script].tr("\r", ''), 
                          effective_user: input[:effective_user])
  output[:state] = READY_FOR_PICKUP
  output[:result] = []

  mqtt_start if with_mqtt?
  suspend
end

#job_storageObject



183
184
185
# File 'lib/smart_proxy_remote_execution_ssh/actions/pull_script.rb', line 183

def job_storage
  Proxy::RemoteExecution::Ssh.job_storage
end

#kill_run(fail_msg = 'The job was cancelled by the user') ⇒ Object



115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
# File 'lib/smart_proxy_remote_execution_ssh/actions/pull_script.rb', line 115

def kill_run(fail_msg = 'The job was cancelled by the user')
  continuous_output = Proxy::Dynflow::ContinuousOutput.new
  exit_code = nil

  case output[:state]
  when READY_FOR_PICKUP, NOTIFIED
    # If the job is not running yet on the client, wipe it from storage
    cleanup
    exit_code = 'EXCEPTION'
  when DELIVERED, RUNNING
    # Client was notified or is already running, dealing with this situation
    # is only supported if mqtt is available
    # Otherwise we have to wait it out
    if with_mqtt?
      mqtt_cancel 
      fail_msg += ', notifying the host over MQTT'
    else
      fail_msg += ', however the job was triggered without MQTT and cannot be stopped'
    end
  end
  continuous_output.add_output(fail_msg + "\n")
  process_update(Proxy::Dynflow::Runner::Update.new(continuous_output, exit_code))

  suspend unless exit_code
end

#mqtt_cancelObject



156
157
158
159
160
161
162
163
164
165
# File 'lib/smart_proxy_remote_execution_ssh/actions/pull_script.rb', line 156

def mqtt_cancel
  payload = mqtt_payload_base.merge(
    content: "#{input[:proxy_url]}/ssh/jobs/#{input[:job_uuid]}/cancel",
    metadata: {
      'event': 'cancel',
      'job_uuid': input[:job_uuid]
    }
  )
  mqtt_notify payload
end

#mqtt_notify(payload) ⇒ Object



167
168
169
# File 'lib/smart_proxy_remote_execution_ssh/actions/pull_script.rb', line 167

def mqtt_notify(payload)
  Proxy::RemoteExecution::Ssh::MQTT.publish(mqtt_topic, JSON.dump(payload))
end

#mqtt_payload_baseObject



187
188
189
190
191
192
193
194
195
# File 'lib/smart_proxy_remote_execution_ssh/actions/pull_script.rb', line 187

def mqtt_payload_base
  {
    type: 'data',
    message_id: SecureRandom.uuid,
    version: 1,
    sent: DateTime.now.iso8601,
    directive: 'foreman'
  }
end

#mqtt_startObject



141
142
143
144
145
146
147
148
149
150
151
152
153
154
# File 'lib/smart_proxy_remote_execution_ssh/actions/pull_script.rb', line 141

def mqtt_start
  payload = mqtt_payload_base.merge(
    content: "#{input[:proxy_url]}/ssh/jobs/#{input[:job_uuid]}",
    metadata: {
      'event': 'start',
      'job_uuid': input[:job_uuid],
      'return_url': "#{input[:proxy_url]}/ssh/jobs/#{input[:job_uuid]}/update",
      'version': 'v1',
      'effective_user': input[:effective_user]
    },
  )
  Proxy::RemoteExecution::Ssh::MQTT::Dispatcher.instance.new(input[:job_uuid], mqtt_topic, payload)
  output[:state] = NOTIFIED
end

#mqtt_topicObject



179
180
181
# File 'lib/smart_proxy_remote_execution_ssh/actions/pull_script.rb', line 179

def mqtt_topic
  "yggdrasil/#{host_name}/data/in"
end

#plan(action_input) ⇒ Object



23
24
25
# File 'lib/smart_proxy_remote_execution_ssh/actions/pull_script.rb', line 23

def plan(action_input)
  super(action_input)
end

#process_external_event(event) ⇒ Object



67
68
69
70
71
72
73
74
75
76
77
78
# File 'lib/smart_proxy_remote_execution_ssh/actions/pull_script.rb', line 67

def process_external_event(event)
  output[:state] = RUNNING
  data = event.data
  case data['version']
  when nil
    process_external_unversioned(data)
  when '1'
    process_external_v1(data)
  else
    raise "Unexpected update message version '#{data['version']}'"
  end
end

#process_external_unversioned(payload) ⇒ Object



80
81
82
83
84
85
86
87
88
89
# File 'lib/smart_proxy_remote_execution_ssh/actions/pull_script.rb', line 80

def process_external_unversioned(payload)
  continuous_output = Proxy::Dynflow::ContinuousOutput.new
  if payload.key?('output')
    Array(payload['output']).each do |line|
      continuous_output.add_output(line, payload['type'])
    end
  end
  exit_code = payload['exit_code'].to_i if payload['exit_code']
  process_update(Proxy::Dynflow::Runner::Update.new(continuous_output, exit_code))
end

#process_external_v1(payload) ⇒ Object



91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
# File 'lib/smart_proxy_remote_execution_ssh/actions/pull_script.rb', line 91

def process_external_v1(payload)
  continuous_output = Proxy::Dynflow::ContinuousOutput.new
  exit_code = nil

  payload['updates'].each do |update|
    time = Time.parse update['timestamp']
    type = update['type']
    case type
    when 'output'
      continuous_output.add_output(update['content'], update['stream'], time)
    when 'exit'
      exit_code = update['exit_code'].to_i
    else
      raise "Unexpected update type '#{update['type']}'"
    end
  end

  process_update(Proxy::Dynflow::Runner::Update.new(continuous_output, exit_code))
end

#process_pickup_timeoutObject



197
198
199
200
201
# File 'lib/smart_proxy_remote_execution_ssh/actions/pull_script.rb', line 197

def process_pickup_timeout
  suspend unless [READY_FOR_PICKUP, NOTIFIED].include? output[:state]

  kill_run 'The job was not picked up in time'
end

#process_timeoutObject



111
112
113
# File 'lib/smart_proxy_remote_execution_ssh/actions/pull_script.rb', line 111

def process_timeout
  kill_run "Execution timeout exceeded"
end

#run(event = nil) ⇒ Object



27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
# File 'lib/smart_proxy_remote_execution_ssh/actions/pull_script.rb', line 27

def run(event = nil)
  if event == JobDelivered
    output[:state] = DELIVERED
    suspend
  elsif event == PickupTimeout
    process_pickup_timeout
  elsif event == ::Dynflow::Action::Timeouts::Timeout
    process_timeout
  elsif event.nil?
    if (timeout = input['execution_timeout_interval'])
      schedule_timeout(timeout, optional: true)
    end
    super
  else
    super
  end
rescue => e
  cleanup
  action_logger.error(e)
  process_update(Proxy::Dynflow::Runner::Update.encode_exception('Proxy error', e))
end

#with_mqtt?Boolean

Returns:

  • (Boolean)


203
204
205
# File 'lib/smart_proxy_remote_execution_ssh/actions/pull_script.rb', line 203

def with_mqtt?
  ::Proxy::RemoteExecution::Ssh.with_mqtt?
end