Class: Proxy::RemoteExecution::Ssh::Actions::PullScript
- Inherits:
-
Dynflow::Action::Runner
- Object
- Dynflow::Action::Runner
- Proxy::RemoteExecution::Ssh::Actions::PullScript
- Includes:
- Dynflow::Action::Timeouts
- Defined in:
- lib/smart_proxy_remote_execution_ssh/actions/pull_script.rb
Constant Summary collapse
- JobDelivered =
Class.new
- PickupTimeout =
Class.new
- READY_FOR_PICKUP =
The proxy has the job stored in its job storage
'ready_for_pickup'
- NOTIFIED =
The host was notified over MQTT at least once
'notified'
- DELIVERED =
The host has picked up the job
'delivered'
- RUNNING =
We received at least one output from the host
'running'
Instance Method Summary collapse
- #cleanup(_plan = nil) ⇒ Object
- #host_name ⇒ Object
- #init_run ⇒ Object
- #job_storage ⇒ Object
- #kill_run(fail_msg = 'The job was cancelled by the user') ⇒ Object
- #mqtt_cancel ⇒ Object
- #mqtt_notify(payload) ⇒ Object
- #mqtt_payload_base ⇒ Object
- #mqtt_start ⇒ Object
- #mqtt_topic ⇒ Object
- #plan(action_input) ⇒ Object
- #process_external_event(event) ⇒ Object
- #process_external_unversioned(payload) ⇒ Object
- #process_external_v1(payload) ⇒ Object
- #process_pickup_timeout ⇒ Object
- #process_timeout ⇒ Object
- #run(event = nil) ⇒ Object
- #with_mqtt? ⇒ Boolean
Instance Method Details
#cleanup(_plan = nil) ⇒ Object
62 63 64 65 |
# File 'lib/smart_proxy_remote_execution_ssh/actions/pull_script.rb', line 62 def cleanup(_plan = nil) job_storage.drop_job(execution_plan_id, run_step_id) Proxy::RemoteExecution::Ssh::MQTT::Dispatcher.instance.done(input[:job_uuid]) if with_mqtt? end |
#host_name ⇒ Object
171 172 173 174 175 176 177 |
# File 'lib/smart_proxy_remote_execution_ssh/actions/pull_script.rb', line 171 def host_name alternative_names = input.fetch(:alternative_names, {}) alternative_names[:consumer_uuid] || alternative_names[:fqdn] || input[:hostname] end |
#init_run ⇒ Object
49 50 51 52 53 54 55 56 57 58 59 60 |
# File 'lib/smart_proxy_remote_execution_ssh/actions/pull_script.rb', line 49 def init_run plan_event(PickupTimeout, input[:time_to_pickup], optional: true) if input[:time_to_pickup] input[:job_uuid] = job_storage.store_job(host_name, execution_plan_id, run_step_id, input[:script].tr("\r", ''), effective_user: input[:effective_user]) output[:state] = READY_FOR_PICKUP output[:result] = [] mqtt_start if with_mqtt? suspend end |
#job_storage ⇒ Object
183 184 185 |
# File 'lib/smart_proxy_remote_execution_ssh/actions/pull_script.rb', line 183 def job_storage Proxy::RemoteExecution::Ssh.job_storage end |
#kill_run(fail_msg = 'The job was cancelled by the user') ⇒ Object
115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 |
# File 'lib/smart_proxy_remote_execution_ssh/actions/pull_script.rb', line 115 def kill_run(fail_msg = 'The job was cancelled by the user') continuous_output = Proxy::Dynflow::ContinuousOutput.new exit_code = nil case output[:state] when READY_FOR_PICKUP, NOTIFIED # If the job is not running yet on the client, wipe it from storage cleanup exit_code = 'EXCEPTION' when DELIVERED, RUNNING # Client was notified or is already running, dealing with this situation # is only supported if mqtt is available # Otherwise we have to wait it out if with_mqtt? mqtt_cancel fail_msg += ', notifying the host over MQTT' else fail_msg += ', however the job was triggered without MQTT and cannot be stopped' end end continuous_output.add_output(fail_msg + "\n") process_update(Proxy::Dynflow::Runner::Update.new(continuous_output, exit_code)) suspend unless exit_code end |
#mqtt_cancel ⇒ Object
156 157 158 159 160 161 162 163 164 165 |
# File 'lib/smart_proxy_remote_execution_ssh/actions/pull_script.rb', line 156 def mqtt_cancel payload = mqtt_payload_base.merge( content: "#{input[:proxy_url]}/ssh/jobs/#{input[:job_uuid]}/cancel", metadata: { 'event': 'cancel', 'job_uuid': input[:job_uuid] } ) mqtt_notify payload end |
#mqtt_notify(payload) ⇒ Object
167 168 169 |
# File 'lib/smart_proxy_remote_execution_ssh/actions/pull_script.rb', line 167 def mqtt_notify(payload) Proxy::RemoteExecution::Ssh::MQTT.publish(mqtt_topic, JSON.dump(payload)) end |
#mqtt_payload_base ⇒ Object
187 188 189 190 191 192 193 194 195 |
# File 'lib/smart_proxy_remote_execution_ssh/actions/pull_script.rb', line 187 def mqtt_payload_base { type: 'data', message_id: SecureRandom.uuid, version: 1, sent: DateTime.now.iso8601, directive: 'foreman' } end |
#mqtt_start ⇒ Object
141 142 143 144 145 146 147 148 149 150 151 152 153 154 |
# File 'lib/smart_proxy_remote_execution_ssh/actions/pull_script.rb', line 141 def mqtt_start payload = mqtt_payload_base.merge( content: "#{input[:proxy_url]}/ssh/jobs/#{input[:job_uuid]}", metadata: { 'event': 'start', 'job_uuid': input[:job_uuid], 'return_url': "#{input[:proxy_url]}/ssh/jobs/#{input[:job_uuid]}/update", 'version': 'v1', 'effective_user': input[:effective_user] }, ) Proxy::RemoteExecution::Ssh::MQTT::Dispatcher.instance.new(input[:job_uuid], mqtt_topic, payload) output[:state] = NOTIFIED end |
#mqtt_topic ⇒ Object
179 180 181 |
# File 'lib/smart_proxy_remote_execution_ssh/actions/pull_script.rb', line 179 def mqtt_topic "yggdrasil/#{host_name}/data/in" end |
#plan(action_input) ⇒ Object
23 24 25 |
# File 'lib/smart_proxy_remote_execution_ssh/actions/pull_script.rb', line 23 def plan(action_input) super(action_input) end |
#process_external_event(event) ⇒ Object
67 68 69 70 71 72 73 74 75 76 77 78 |
# File 'lib/smart_proxy_remote_execution_ssh/actions/pull_script.rb', line 67 def process_external_event(event) output[:state] = RUNNING data = event.data case data['version'] when nil process_external_unversioned(data) when '1' process_external_v1(data) else raise "Unexpected update message version '#{data['version']}'" end end |
#process_external_unversioned(payload) ⇒ Object
80 81 82 83 84 85 86 87 88 89 |
# File 'lib/smart_proxy_remote_execution_ssh/actions/pull_script.rb', line 80 def process_external_unversioned(payload) continuous_output = Proxy::Dynflow::ContinuousOutput.new if payload.key?('output') Array(payload['output']).each do |line| continuous_output.add_output(line, payload['type']) end end exit_code = payload['exit_code'].to_i if payload['exit_code'] process_update(Proxy::Dynflow::Runner::Update.new(continuous_output, exit_code)) end |
#process_external_v1(payload) ⇒ Object
91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 |
# File 'lib/smart_proxy_remote_execution_ssh/actions/pull_script.rb', line 91 def process_external_v1(payload) continuous_output = Proxy::Dynflow::ContinuousOutput.new exit_code = nil payload['updates'].each do |update| time = Time.parse update['timestamp'] type = update['type'] case type when 'output' continuous_output.add_output(update['content'], update['stream'], time) when 'exit' exit_code = update['exit_code'].to_i else raise "Unexpected update type '#{update['type']}'" end end process_update(Proxy::Dynflow::Runner::Update.new(continuous_output, exit_code)) end |
#process_pickup_timeout ⇒ Object
197 198 199 200 201 |
# File 'lib/smart_proxy_remote_execution_ssh/actions/pull_script.rb', line 197 def process_pickup_timeout suspend unless [READY_FOR_PICKUP, NOTIFIED].include? output[:state] kill_run 'The job was not picked up in time' end |
#process_timeout ⇒ Object
111 112 113 |
# File 'lib/smart_proxy_remote_execution_ssh/actions/pull_script.rb', line 111 def process_timeout kill_run "Execution timeout exceeded" end |
#run(event = nil) ⇒ Object
27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 |
# File 'lib/smart_proxy_remote_execution_ssh/actions/pull_script.rb', line 27 def run(event = nil) if event == JobDelivered output[:state] = DELIVERED suspend elsif event == PickupTimeout process_pickup_timeout elsif event == ::Dynflow::Action::Timeouts::Timeout process_timeout elsif event.nil? if (timeout = input['execution_timeout_interval']) schedule_timeout(timeout, optional: true) end super else super end rescue => e cleanup action_logger.error(e) process_update(Proxy::Dynflow::Runner::Update.encode_exception('Proxy error', e)) end |
#with_mqtt? ⇒ Boolean
203 204 205 |
# File 'lib/smart_proxy_remote_execution_ssh/actions/pull_script.rb', line 203 def with_mqtt? ::Proxy::RemoteExecution::Ssh.with_mqtt? end |