70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
|
# File 'lib/openc3/microservices/interface_microservice.rb', line 70
def run
InterfaceTopic.receive_commands(@interface, scope: @scope) do |topic, msg_id, msg_hash, _redis|
OpenC3.with_context(msg_hash) do
msgid_seconds_from_epoch = msg_id.split('-')[0].to_i / 1000.0
delta = Time.now.to_f - msgid_seconds_from_epoch
@metric.set(name: 'interface_topic_delta_seconds', value: delta, type: 'gauge', unit: 'seconds', help: 'Delta time between data written to stream and interface cmd start') if @metric
if topic =~ /CMD}INTERFACE/
@directive_count += 1
@metric.set(name: 'interface_directive_total', value: @directive_count, type: 'counter') if @metric
if msg_hash['shutdown']
@logger.info "#{@interface.name}: Shutdown requested"
return
end
if msg_hash['connect']
@logger.info "#{@interface.name}: Connect requested"
params = []
if msg_hash['params']
params = JSON.parse(msg_hash['params'], :allow_nan => true, :create_additions => true)
end
@interface = @tlm.attempting(*params)
next 'SUCCESS'
end
if msg_hash['disconnect']
@logger.info "#{@interface.name}: Disconnect requested"
@tlm.disconnect(false)
next 'SUCCESS'
end
if msg_hash['raw']
if @interface.connected?
@logger.info "#{@interface.name}: Write raw"
command = System.commands.packet('UNKNOWN', 'UNKNOWN')
command.received_count += 1
command = command.clone
command.buffer = msg_hash['raw']
command.received_time = Time.now
CommandTopic.write_packet(command, scope: @scope)
@interface.write_raw(msg_hash['raw'])
next 'SUCCESS'
else
next "Interface not connected: #{@interface.name}"
end
end
if msg_hash.key?('log_stream')
if msg_hash['log_stream'] == 'true'
@logger.info "#{@interface.name}: Enable stream logging"
@interface.start_raw_logging
else
@logger.info "#{@interface.name}: Disable stream logging"
@interface.stop_raw_logging
end
next 'SUCCESS'
end
if msg_hash.key?('interface_cmd')
params = JSON.parse(msg_hash['interface_cmd'], allow_nan: true, create_additions: true)
begin
@logger.info "#{@interface.name}: interface_cmd: #{params['cmd_name']} #{params['cmd_params'].join(' ')}"
@interface.interface_cmd(params['cmd_name'], *params['cmd_params'])
InterfaceStatusModel.set(@interface.as_json(:allow_nan => true), queued: true, scope: @scope)
rescue => e
@logger.error "#{@interface.name}: interface_cmd: #{e.formatted}"
next e.message
end
next 'SUCCESS'
end
if msg_hash.key?('protocol_cmd')
params = JSON.parse(msg_hash['protocol_cmd'], allow_nan: true, create_additions: true)
begin
@logger.info "#{@interface.name}: protocol_cmd: #{params['cmd_name']} #{params['cmd_params'].join(' ')} read_write: #{params['read_write']} index: #{params['index']}"
@interface.protocol_cmd(params['cmd_name'], *params['cmd_params'], read_write: params['read_write'], index: params['index'])
InterfaceStatusModel.set(@interface.as_json(:allow_nan => true), queued: true, scope: @scope)
rescue => e
@logger.error "#{@interface.name}: protocol_cmd: #{e.formatted}"
next e.message
end
next 'SUCCESS'
end
if msg_hash.key?('inject_tlm')
handle_inject_tlm(msg_hash['inject_tlm'])
next 'SUCCESS'
end
end
target_name = msg_hash['target_name']
cmd_name = msg_hash['cmd_name']
cmd_params = nil
cmd_buffer = nil
hazardous_check = nil
if msg_hash['cmd_params']
cmd_params = JSON.parse(msg_hash['cmd_params'], :allow_nan => true, :create_additions => true)
range_check = ConfigParser.handle_true_false(msg_hash['range_check'])
raw = ConfigParser.handle_true_false(msg_hash['raw'])
hazardous_check = ConfigParser.handle_true_false(msg_hash['hazardous_check'])
elsif msg_hash['cmd_buffer']
cmd_buffer = msg_hash['cmd_buffer']
end
begin
begin
if cmd_params
command = System.commands.build_cmd(target_name, cmd_name, cmd_params, range_check, raw)
elsif cmd_buffer
if target_name
command = System.commands.identify(cmd_buffer, [target_name])
else
command = System.commands.identify(cmd_buffer, @interface.cmd_target_names)
end
unless command
command = System.commands.packet('UNKNOWN', 'UNKNOWN')
command.received_count += 1
command = command.clone
command.buffer = cmd_buffer
end
else
raise "Invalid command received:\n #{msg_hash}"
end
command.received_time = Time.now
rescue => e
@logger.error "#{@interface.name}: #{msg_hash}"
@logger.error "#{@interface.name}: #{e.formatted}"
next e.message
end
command. ||= {}
command.['cmd_string'] = msg_hash['cmd_string']
command.['username'] = msg_hash['username']
if hazardous_check
hazardous, hazardous_description = System.commands.cmd_pkt_hazardous?(command)
next "HazardousError\n#{hazardous_description}\n#{msg_hash['cmd_string']}" if hazardous
end
validate = ConfigParser.handle_true_false(msg_hash['validate'])
begin
if @interface.connected?
result = true
reason = nil
if command.validator and validate
begin
result, reason = command.validator.pre_check(command)
rescue => e
result = false
reason = e.message
end
if result == false
message = "pre_check returned false for #{msg_hash['cmd_string']} due to #{reason}"
raise WriteRejectError.new(message)
end
end
@count += 1
@metric.set(name: 'interface_cmd_total', value: @count, type: 'counter') if @metric
@interface.write(command)
if command.validator and validate
begin
result, reason = command.validator.post_check(command)
rescue => e
result = false
reason = e.message
end
command.['cmd_success'] = result
command.['cmd_reason'] = reason if reason
end
CommandDecomTopic.write_packet(command, scope: @scope)
CommandTopic.write_packet(command, scope: @scope)
InterfaceStatusModel.set(@interface.as_json(:allow_nan => true), queued: true, scope: @scope)
if result == false
message = "post_check returned false for #{msg_hash['cmd_string']} due to #{reason}"
raise WriteRejectError.new(message)
end
next 'SUCCESS'
else
next "Interface not connected: #{@interface.name}"
end
rescue WriteRejectError => e
next e.message
rescue => e
@logger.error "#{@interface.name}: #{e.formatted}"
next e.message
end
rescue => e
@logger.error "#{@interface.name}: #{e.formatted}"
next e.message
end
end
end
end
|