Class: OpenC3::ScopeCleanupMicroservice

Inherits:
CleanupMicroservice
Defined in:
lib/openc3/microservices/scope_cleanup_microservice.rb

Constant Summary

# QuestDB health query over the tables() meta function, restricted to
# WAL-enabled tables. Each row reports the table name, row counts, pending WAL
# rows, a derived status ('SUSPENDED', 'BACKOFF', 'PRESSURE' or 'OK'), the
# transaction lag (wal_txn - table_txn) and write-amplification / merge-rate
# statistics. Rows are ordered so suspended tables come first, then tables
# under the highest memory pressure, then those with the most pending WAL rows.
TSDB_HEALTH_QUERY =
"SELECT
table_name,
table_row_count,
wal_pending_row_count,
CASE
    WHEN table_suspended THEN 'SUSPENDED'
    WHEN table_memory_pressure_level = 2 THEN 'BACKOFF'
    WHEN table_memory_pressure_level = 1 THEN 'PRESSURE'
    ELSE 'OK'
END AS status,
wal_txn - table_txn AS lag_txns,
table_write_amp_p50 AS write_amp,
table_merge_rate_p99 AS slowest_merge
FROM tables()
WHERE walEnabled
ORDER BY
table_suspended DESC,
table_memory_pressure_level DESC,
wal_pending_row_count DESC;"
# Number of consecutive per-period increases in a table's wal_pending_row_count
# or lag_txns that cleanup reports as growth. Per-table sample histories are
# trimmed back to this many samples after each growth check.
GROWTH_NUM_SAMPLE_PERIODS =
4

Instance Attribute Summary

Attributes inherited from Microservice

#count, #custom, #error, #logger, #microservice_status_thread, #name, #scope, #secrets, #state

Instance Method Summary

Methods inherited from CleanupMicroservice

#run, #shutdown

Methods inherited from Microservice

#as_json, #microservice_cmd, run, #run, #setup_microservice_topic, #shutdown

Constructor Details

#initialize(*args) ⇒ ScopeCleanupMicroservice

Returns a new instance of ScopeCleanupMicroservice.



42
43
44
45
46
47
48
49
# File 'lib/openc3/microservices/scope_cleanup_microservice.rb', line 42

# Builds the microservice and resets the bookkeeping used by #cleanup.
#
# @param args [Array] forwarded unchanged to the parent constructor
def initialize(*args)
  super
  # Previous #cleanup wall-clock timestamp, and the cleanup period in seconds
  # (filled in later by #get_areas_and_poll_time)
  @run_time, @cleanup_poll_time = nil, nil
  # Seconds accumulated since the last full log cleanup
  @delta_time = 0.0
  # Per-table sample histories used for growth detection
  @wal_pending_row_count, @lag_txns = {}, {}
end

Instance Method Details

#cleanup(areas, bucket) ⇒ Object



51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
# File 'lib/openc3/microservices/scope_cleanup_microservice.rb', line 51

# Runs the parent log cleanup once more than @cleanup_poll_time seconds of
# wall-clock time have accumulated across calls. For the DEFAULT scope, it also
# checks QuestDB table health on every call.
#
# @param areas [Array] passed through unchanged to the parent #cleanup
# @param bucket [Object] passed through unchanged to the parent #cleanup
def cleanup(areas, bucket)
  current_time = Time.now
  if @run_time
    delta = current_time - @run_time
    # Ignore backwards wall-clock jumps instead of subtracting elapsed time
    @delta_time += delta if delta > 0.0
  end
  @run_time = current_time
  if @delta_time > @cleanup_poll_time
    @delta_time = 0.0
    super(areas, bucket)
  end

  # Always check TSDB health (only the DEFAULT scope performs the check)
  check_tsdb_health if @scope == 'DEFAULT'
end

# Runs TSDB_HEALTH_QUERY and logs every table whose status is not 'OK'. It
# tries to resume suspended tables and records per-table samples of
# wal_pending_row_count and lag_txns for growth detection. Any other error
# disconnects the QuestDB client (presumably forcing a reconnect next call) and
# is logged rather than raised.
def check_tsdb_health
  conn = OpenC3::QuestDBClient.connection
  result = conn.exec(TSDB_HEALTH_QUERY)
  columns = result.fields
  table_name_column = columns.index("table_name")
  wal_pending_row_count_column = columns.index("wal_pending_row_count")
  status_column = columns.index("status")
  lag_txns_column = columns.index("lag_txns")

  result.values.each do |values|
    table_name = values[table_name_column]
    wal_pending_row_count = values[wal_pending_row_count_column].to_i
    status = values[status_column]
    lag_txns = values[lag_txns_column].to_i

    if status != 'OK'
      @logger.error("QuestDB: #{table_name} in bad state: #{status}")
      # Try to automatically unsuspend
      resume_wal(conn, table_name) if status == 'SUSPENDED'
    end

    track_growth(@wal_pending_row_count, table_name, 'wal_pending_row_count', wal_pending_row_count)
    track_growth(@lag_txns, table_name, 'lag_txns', lag_txns)
  end
rescue => e
  OpenC3::QuestDBClient.disconnect
  @logger.error("QuestDB Error: #{e.formatted}")
end

# Attempts to resume WAL application for a suspended table. A failure is
# logged and swallowed, so one table that cannot be resumed does not stop the
# remaining tables from being checked and sampled.
def resume_wal(conn, table_name)
  @logger.info("QuestDB: Attempting to unsuspend: #{table_name}")
  # Quote the identifier so names that are not valid bare identifiers still parse
  conn.exec("ALTER TABLE \"#{table_name}\" RESUME WAL;")
rescue => e
  @logger.error("QuestDB: Failed to unsuspend #{table_name}: #{e.formatted}")
end

# Appends value to history[table_name]. Once the history holds more than
# GROWTH_NUM_SAMPLE_PERIODS samples, it logs an error if the samples grew in
# every period. It then trims the history back to the most recent
# GROWTH_NUM_SAMPLE_PERIODS samples.
#
# @param history [Hash{String => Array<Integer>}] per-table sample histories
# @param table_name [String] table the sample belongs to
# @param metric_name [String] metric name used in the log message
# @param value [Integer] newest sample
def track_growth(history, table_name, metric_name, value)
  samples = (history[table_name] ||= [])
  samples << value
  return if samples.length <= GROWTH_NUM_SAMPLE_PERIODS

  if detect_growth(samples, GROWTH_NUM_SAMPLE_PERIODS)
    # Crossed threshold of sample periods of growth
    @logger.error("QuestDB: #{table_name} has growing #{metric_name}: #{value}")
  end
  # Leave the last GROWTH_NUM_SAMPLE_PERIODS samples
  history[table_name] = samples.last(GROWTH_NUM_SAMPLE_PERIODS)
end

#detect_growth(array, num_samples) ⇒ Object



126
127
128
129
130
131
# File 'lib/openc3/microservices/scope_cleanup_microservice.rb', line 126

# Reports whether the most recent num_samples + 1 entries of array are strictly
# increasing, i.e. the value grew in each of the last num_samples periods.
#
# @param array [Array<Numeric>] sample history, oldest first
# @param num_samples [Integer] number of consecutive increases required
# @return [Boolean] false when there are too few samples to observe
#   num_samples increases
def detect_growth(array, num_samples)
  window = array.last(num_samples + 1)
  # Not enough history yet; comparing against a missing sample would raise
  return false if window.length <= num_samples

  window.each_cons(2).all? { |previous, current| current > previous }
end

#get_areas_and_poll_time ⇒ Object



133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
# File 'lib/openc3/microservices/scope_cleanup_microservice.rb', line 133

# Loads this scope's ScopeModel and returns the log areas to clean with their
# retention times. The DEFAULT scope also covers the NOSCOPE areas. As a side
# effect it stores the scope's cleanup period in @cleanup_poll_time.
#
# @return [Array(Array<Array(String, Object)>, Integer)] the [path, retain_time]
#   pairs and a fixed 60-second poll interval
def get_areas_and_poll_time
  model = ScopeModel.get_model(name: @scope)

  prefixes = [@scope]
  prefixes << 'NOSCOPE' if @scope == 'DEFAULT'

  areas = prefixes.flat_map do |prefix|
    [
      ["#{prefix}/text_logs/openc3_log_messages", model.text_log_retain_time],
      ["#{prefix}/tool_logs/sr", model.tool_log_retain_time],
    ]
  end

  @cleanup_poll_time = model.cleanup_poll_time
  # Poll every minute so the TSDB health check runs frequently; the full
  # cleanup is throttled separately by @cleanup_poll_time
  [areas, 60]
end