Class: OpenC3::TriggerGroupWorker

Inherits:
Object
Defined in:
lib/openc3/microservices/trigger_group_microservice.rb

Overview

The TriggerGroupWorker is a simple thread pool worker. Once the trigger manager has pushed a packet topic onto the queue, one of these workers pops it and evaluates the triggers for that packet.

Constant Summary

TYPE = 'type'.freeze
ITEM_TARGET = 'target'.freeze
ITEM_PACKET = 'packet'.freeze
ITEM_TYPE = 'item'.freeze
ITEM_VALUE_TYPE = 'valueType'.freeze
FLOAT_TYPE = 'float'.freeze
STRING_TYPE = 'string'.freeze
REGEX_TYPE = 'regex'.freeze
LIMIT_TYPE = 'limit'.freeze
TRIGGER_TYPE = 'trigger'.freeze

Constructor Details

#initialize(name:, logger:, scope:, group:, queue:, share:, ident:) ⇒ TriggerGroupWorker

Returns a new instance of TriggerGroupWorker.



# File 'lib/openc3/microservices/trigger_group_microservice.rb', line 266

def initialize(name:, logger:, scope:, group:, queue:, share:, ident:)
  @name = name
  @logger = logger
  @scope = scope
  @group = group
  @queue = queue
  @share = share
  @ident = ident
end

Instance Attribute Details

#group ⇒ Object (readonly)

Returns the value of attribute group.



# File 'lib/openc3/microservices/trigger_group_microservice.rb', line 264

def group
  @group
end

#name ⇒ Object (readonly)

Returns the value of attribute name.



# File 'lib/openc3/microservices/trigger_group_microservice.rb', line 264

def name
  @name
end

#packet ⇒ Object (readonly)

Returns the value of attribute packet.



# File 'lib/openc3/microservices/trigger_group_microservice.rb', line 264

def packet
  @packet
end

#scope ⇒ Object (readonly)

Returns the value of attribute scope.



# File 'lib/openc3/microservices/trigger_group_microservice.rb', line 264

def scope
  @scope
end

#target ⇒ Object (readonly)

Returns the value of attribute target.



# File 'lib/openc3/microservices/trigger_group_microservice.rb', line 264

def target
  @target
end

Instance Method Details

#evaluate(name:, left:, operator:, right:) ⇒ Object

The base evaluate method used by evaluate_trigger. It applies the operator to the left and right values and returns one of:

-1 (error; used to disable the trigger)
 0 (the comparison is false)
 1 (the comparison is true)


# File 'lib/openc3/microservices/trigger_group_microservice.rb', line 396

def evaluate(name:, left:, operator:, right:)
  @logger.debug "TriggerGroupWorker-#{@ident} evaluate: (#{left}(#{left.class}) #{operator} #{right}(#{right.class}))"
  begin
    case operator
    when '>'
      return left > right ? 1 : 0
    when '<'
      return left < right ? 1 : 0
    when '>='
      return left >= right ? 1 : 0
    when '<='
      return left <= right ? 1 : 0
    when '!=', 'CHANGES'
      return left != right ? 1 : 0
    when '==', 'DOES NOT CHANGE'
      return left == right ? 1 : 0
    when '!~'
      return left !~ right ? 1 : 0
    when '=~'
      return left =~ right ? 1 : 0
    when 'AND'
      return left && right ? 1 : 0
    when 'OR'
      return left || right ? 1 : 0
    end
  rescue ArgumentError
    message = "invalid evaluate: (#{left} #{operator} #{right})"
    notify(name: name, severity: 'error', message: message)
    return -1
  end
end
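
To illustrate the -1 / 0 / 1 return convention outside of the microservice, here is a minimal standalone sketch. The names OPERATORS and tri_state_compare are hypothetical and not part of OpenC3. Logging and notification are left out, and returning -1 for an unknown operator is an assumption of this sketch rather than the behavior of the method above.

```ruby
# Hypothetical sketch of the tri-state comparison convention.
# Not the OpenC3 implementation: no logger, no notify.
OPERATORS = {
  '>'  => ->(l, r) { l > r },
  '<'  => ->(l, r) { l < r },
  '>=' => ->(l, r) { l >= r },
  '<=' => ->(l, r) { l <= r },
  '!=' => ->(l, r) { l != r },
  '==' => ->(l, r) { l == r },
  'CHANGES' => ->(l, r) { l != r },
  'DOES NOT CHANGE' => ->(l, r) { l == r },
  '=~' => ->(l, r) { l =~ r },
  '!~' => ->(l, r) { l !~ r },
  'AND' => ->(l, r) { l && r },
  'OR'  => ->(l, r) { l || r },
}.freeze

def tri_state_compare(left, operator, right)
  op = OPERATORS[operator]
  return -1 if op.nil? # assumption: an unknown operator counts as an error
  op.call(left, right) ? 1 : 0
rescue ArgumentError
  # Raised, for example, when a Float is compared with a String
  -1
end

puts tri_state_compare(5.0, '>', 3.0)
puts tri_state_compare('abc', '=~', /^x/)
puts tri_state_compare(5.0, '>', 'abc')
```

A lookup table keeps each operator on one line; a case statement, as in the method above, works equally well.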

#evaluate_data_packet(topic:) ⇒ Object

Evaluates every trigger associated with the packet's topic. Each result (-1, 0, or 1) is written back through update_state so that it can be used by the reaction microservice.



# File 'lib/openc3/microservices/trigger_group_microservice.rb', line 308

def evaluate_data_packet(topic:)
  visited = Hash.new
  @logger.debug "TriggerGroupWorker-#{@ident} topic: #{topic}"
  @share.trigger_base.get_triggers(topic: topic).each do |trigger|
    @logger.debug "TriggerGroupWorker-#{@ident} eval head: #{trigger}"
    value = evaluate_trigger(
      head: trigger,
      trigger: trigger,
      visited: visited,
      triggers: @share.trigger_base.enabled_triggers
    )
    @logger.debug "TriggerGroupWorker-#{@ident} trigger: #{trigger} value: #{value}"
    # value MUST be -1, 0, or 1
    @share.trigger_base.update_state(name: trigger.name, value: value)
  end
end

#evaluate_trigger(head:, trigger:, visited:, triggers:) ⇒ Object

This is a recursive method for the TriggerGroupWorkers to call. It keys the visited Hash by trigger name with a suffix appended: __P for path or __R for result. The path entry is a Hash that contains a key for each node traveled to reach a result. Once a trigger's result has been found, it is stored under the result key __R in the visited Hash, and evaluate_trigger returns a number:

-1 (error; used to disable the trigger)
 0 (the trigger is false)
 1 (the trigger is true)

If a loop is detected, it logs an error and returns -1. If resolving an operand raises an error (for example, a badly formatted Regexp), it logs the error and returns -1. In the code shown, an operand that resolves to nil yields 0 without logging an error.



# File 'lib/openc3/microservices/trigger_group_microservice.rb', line 440

def evaluate_trigger(head:, trigger:, visited:, triggers:)
  if visited["#{trigger.name}__R"]
    return visited["#{trigger.name}__R"]
  end
  if visited["#{trigger.name}__P"].nil?
    visited["#{trigger.name}__P"] = Hash.new
  end
  if visited["#{head.name}__P"][trigger.name]
    # Not sure if this is possible as on create it validates that the dependents are already created
    message = "loop detected from #{head.name} -> #{trigger.name} path: #{visited["#{head.name}__P"]}"
    notify(name: trigger.name, severity: 'error', message: message)
    return visited["#{trigger.name}__R"] = -1
  end
  trigger.roots.each do | root_trigger_name |
    next if visited["#{root_trigger_name}__R"]
    root_trigger = triggers[root_trigger_name]
    if head.name == root_trigger.name
      message = "loop detected from #{head.name} -> #{root_trigger_name} path: #{visited["#{head.name}__P"]}"
      notify(name: trigger.name, severity: 'error', message: message)
      return visited["#{trigger.name}__R"] = -1
    end
    result = evaluate_trigger(
      head: head,
      trigger: root_trigger,
      visited: visited,
      triggers: triggers
    )
    @logger.debug "TriggerGroupWorker-#{@ident} #{root_trigger.name} result: #{result}"
    visited["#{root_trigger.name}__R"] = visited["#{head.name}__P"][root_trigger.name] = result
  end
  begin
    left = operand_value(operand: trigger.left, other: trigger.right, visited: visited)
    if trigger.operator.include?('CHANGE')
      right = operand_value(operand: trigger.left, other: trigger.right, visited: visited, previous: true)
    else
      right = operand_value(operand: trigger.right, other: trigger.left, visited: visited)
    end
  rescue => e
    # This will primarily happen when the user inputs a bad Regexp
    notify(name: trigger.name, severity: 'error', message: e.message)
    return visited["#{trigger.name}__R"] = -1
  end
  # Convert the standard '==' and '!=' into Ruby Regexp operators
  operator = trigger.operator
  if right and right.is_a? Regexp
    operator = '=~' if operator == '=='
    operator = '!~' if operator == '!='
  end
  if left.nil? || right.nil?
    return visited["#{trigger.name}__R"] = 0
  end
  result = evaluate(name: trigger.name,left: left, operator: operator, right: right)
  return visited["#{trigger.name}__R"] = result
end
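
The interplay of the __P and __R keys is easier to see in a stripped-down form. The sketch below is a simplified illustration, not the OpenC3 code: Node and resolve are hypothetical names, the operands are replaced by a single boolean value per node, and a node is assumed to be true only when its own value is true and every root resolved to 1.

```ruby
# Hypothetical sketch of memoized dependency evaluation with a visited Hash.
# Node stands in for a trigger; it is not an OpenC3 class.
Node = Struct.new(:name, :roots, :value)

def resolve(head:, node:, visited:, nodes:)
  # Reuse a result that was already computed during this pass
  cached = visited["#{node.name}__R"]
  return cached unless cached.nil?

  visited["#{node.name}__P"] ||= {}
  node.roots.each do |root_name|
    next unless visited["#{root_name}__R"].nil?
    # A root that points back at the starting node is a loop: flag an error
    return visited["#{node.name}__R"] = -1 if root_name == head.name

    result = resolve(head: head, node: nodes[root_name], visited: visited, nodes: nodes)
    # Record the result and the path taken from the head to reach it
    visited["#{root_name}__R"] = visited["#{head.name}__P"][root_name] = result
  end
  # Assumption of this sketch: true only if own value and all roots are true
  ok = node.value && node.roots.all? { |root_name| visited["#{root_name}__R"] == 1 }
  visited["#{node.name}__R"] = ok ? 1 : 0
end

nodes = {
  'A' => Node.new('A', ['B'], true),
  'B' => Node.new('B', [], true),
  'SELF' => Node.new('SELF', ['SELF'], true),
}
puts resolve(head: nodes['A'], node: nodes['A'], visited: {}, nodes: nodes)
puts resolve(head: nodes['SELF'], node: nodes['SELF'], visited: {}, nodes: nodes)
```

Passing one visited Hash through a whole evaluation pass means a trigger shared by several dependents is computed only once.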

#get_packet_limit(operand:, other:) ⇒ Object

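Extracts the limits state of the packet item described by the operand. If the operand limit does not include _LOW or _HIGH, this will match the COLOR and return COLOR_LOW || COLOR_HIGH. For example: operand item: GREEN_LOW == other operand limit: GREEN. Note that the code shown returns the item's limits state converted to a String and does not use other.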



# File 'lib/openc3/microservices/trigger_group_microservice.rb', line 329

def get_packet_limit(operand:, other:)
  packet = @share.packet_base.packet(
    target: operand[ITEM_TARGET],
    packet: operand[ITEM_PACKET]
  )
  return nil if packet.nil?
  _, limit = packet.read_with_limits_state(operand[ITEM_TYPE], operand[ITEM_VALUE_TYPE].intern)
  # Convert limit symbol to string since we'll be comparing with strings
  return limit.to_s
end

#get_packet_value(operand:, previous:) ⇒ Object

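Extracts the packet item value described by the operand. The operand's valueType selects which value is read, so a raw value type pulls the raw value instead of the converted one. When previous is true, the value is read from the previous packet, and nil is returned if no previous packet exists.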



# File 'lib/openc3/microservices/trigger_group_microservice.rb', line 342

def get_packet_value(operand:, previous:)
  if previous
    packet = @share.packet_base.previous_packet(
      target: operand[ITEM_TARGET],
      packet: operand[ITEM_PACKET]
    )
    # Previous might not be populated ... that's ok just return nil
    return nil unless packet
  else
    packet = @share.packet_base.packet(
      target: operand[ITEM_TARGET],
      packet: operand[ITEM_PACKET]
    )
  end
  # This shouldn't happen because the frontend provides valid items but good to check
  # The raise is ultimately rescued inside evaluate_trigger when operand_value is called
  if packet.nil?
    raise "Packet #{operand[ITEM_TARGET]} #{operand[ITEM_PACKET]} not found"
  end
  value = packet.read(operand[ITEM_TYPE], operand[ITEM_VALUE_TYPE].intern)
  if value.nil?
    raise "Item #{operand[ITEM_TARGET]} #{operand[ITEM_PACKET]} #{operand[ITEM_TYPE]} not found"
  end
  value
end

#notify(name:, severity:, message:) ⇒ Object



# File 'lib/openc3/microservices/trigger_group_microservice.rb', line 276

def notify(name:, severity:, message:)
  data = {}
  # All AutonomicTopic notifications must have 'name' and 'updated_at' in the data
  data['name'] = name
  data['updated_at'] = Time.now.to_nsec_from_epoch
  data['severity'] = severity
  data['message'] = message
  notification = {
    'kind' => 'error',
    'type' => 'trigger',
    'data' => JSON.generate(data),
  }
  AutonomicTopic.write_notification(notification, scope: @scope)
  @logger.public_send(severity.intern, message)
end
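
The shape of the notification can be reproduced without the rest of OpenC3, which may help when writing a consumer for it. In the sketch below, build_trigger_notification is a hypothetical helper, the write to AutonomicTopic is omitted, and the nanosecond timestamp is computed with core Ruby as a stand-in for to_nsec_from_epoch.

```ruby
require 'json'

# Hypothetical helper that mirrors the payload shape built by #notify.
# It only builds the Hash; nothing is written to a topic.
def build_trigger_notification(name:, severity:, message:, now: Time.now)
  data = {
    'name' => name,
    # Assumption: nanoseconds since the epoch, computed with core Ruby
    'updated_at' => now.to_i * 1_000_000_000 + now.nsec,
    'severity' => severity,
    'message' => message,
  }
  # The data Hash is serialized to a JSON string inside the notification
  { 'kind' => 'error', 'type' => 'trigger', 'data' => JSON.generate(data) }
end

notification = build_trigger_notification(
  name: 'TRIG1', severity: 'error', message: 'invalid evaluate'
)
puts JSON.parse(notification['data'])['severity']
```

A consumer has to call JSON.parse on the 'data' field, since it is a string rather than a nested Hash.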

#operand_value(operand:, other:, visited:, previous: false) ⇒ Object

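Extracts the value of the operand by dispatching on its type. An item operand is read from the packet (or from its limits state when the other operand is a limit), a trigger operand is read from the result already stored in visited, and float, string, regex, and limit operands are taken from the literal stored in the operand itself.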



# File 'lib/openc3/microservices/trigger_group_microservice.rb', line 369

def operand_value(operand:, other:, visited:, previous: false)
  if operand[TYPE] == ITEM_TYPE && other && other[TYPE] == LIMIT_TYPE
    return get_packet_limit(operand: operand, other: other)
  elsif operand[TYPE] == ITEM_TYPE
    return get_packet_value(operand: operand, previous: previous)
  elsif operand[TYPE] == TRIGGER_TYPE
    return visited["#{operand[TRIGGER_TYPE]}__R"] == 1
  elsif operand[TYPE] == FLOAT_TYPE
    return operand[operand[TYPE]].to_f
  elsif operand[TYPE] == STRING_TYPE
    return operand[operand[TYPE]].to_s
  elsif operand[TYPE] == REGEX_TYPE
    # This can potentially throw an exception on badly formatted Regexp
    return Regexp.new(operand[operand[TYPE]])
  elsif operand[TYPE] == LIMIT_TYPE
    return operand[operand[TYPE]]
  else
    # This is a logic error ... should never get here
    raise "Unknown operand type: #{operand}"
  end
end
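
The literal operand types follow one pattern: the 'type' key names the key under which the literal is stored. The sketch below shows that lookup in isolation. literal_value is a hypothetical name, and the example Hashes are inferred from the constants and the method above rather than taken from real trigger data.

```ruby
# Hypothetical sketch of resolving literal operands.
# Item and trigger operands are left out because they need packet data.
def literal_value(operand)
  type = operand['type']
  case type
  when 'float'  then operand[type].to_f
  when 'string' then operand[type].to_s
  when 'regex'  then Regexp.new(operand[type]) # raises RegexpError on a bad pattern
  when 'limit'  then operand[type]
  else raise "Unknown operand type: #{operand}"
  end
end

puts literal_value({ 'type' => 'float', 'float' => '10.5' })
puts literal_value({ 'type' => 'string', 'string' => 42 }).inspect
puts literal_value({ 'type' => 'regex', 'regex' => '^OK' }).inspect
```

Because Regexp.new can raise, the caller wraps operand resolution in a rescue, as evaluate_trigger does.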

#run ⇒ Object



# File 'lib/openc3/microservices/trigger_group_microservice.rb', line 292

def run
  @logger.info "TriggerGroupWorker-#{@ident} running"
  loop do
    topic = @queue.pop
    break if topic.nil?
    begin
      evaluate_data_packet(topic: topic)
    rescue StandardError => e
      @logger.error "TriggerGroupWorker-#{@ident} failed to evaluate data packet from topic: #{topic}\n#{e.formatted}"
    end
  end
  @logger.info "TriggerGroupWorker-#{@ident} exiting"
end
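
The loop above follows a common worker pattern: block on the queue, stop on a nil sentinel, and keep running if a single item fails. A minimal sketch of that pattern using only core Ruby follows. run_pool is a hypothetical name, and the block stands in for evaluate_data_packet.

```ruby
# Hypothetical sketch of pop-until-nil workers sharing one Queue.
def run_pool(topics, worker_count: 2, &handler)
  queue = Queue.new
  handled = Queue.new
  workers = Array.new(worker_count) do |ident|
    Thread.new do
      loop do
        topic = queue.pop        # blocks until an item is available
        break if topic.nil?      # nil is the shutdown sentinel
        begin
          handled << handler.call(topic)
        rescue StandardError => e
          # One bad item must not take the worker down
          warn "worker-#{ident} failed on #{topic}: #{e.message}"
        end
      end
    end
  end
  topics.each { |topic| queue << topic }
  worker_count.times { queue << nil } # one sentinel per worker
  workers.each(&:join)
  Array.new(handled.size) { handled.pop }
end

puts run_pool(%w[topic_a topic_b topic_c]) { |topic| topic.upcase }.sort.inspect
```

Each worker needs its own sentinel, because a nil popped by one thread is not seen by the others.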