Class: OpenC3::StoreQueued

Inherits:
Object show all
Defined in:
lib/openc3/utilities/store_queued.rb

Direct Known Subclasses

EphemeralStoreQueued

Defined Under Namespace

Classes: MessageStruct

Constant Summary collapse

@@instance_mutex =

Mutex used to ensure that only one instance is created

Mutex.new

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(update_interval) ⇒ StoreQueued

Returns a new instance of StoreQueued.



45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
# File 'lib/openc3/utilities/store_queued.rb', line 45

# Builds the queued store wrapper and starts its background update thread.
#
# @param update_interval [Numeric] time between queue flushes (the class-level
#   accessor documents the unit as seconds)
def initialize(update_interval)
  @update_interval = update_interval
  # Backing store that the queued calls are eventually sent to
  @store = store_instance
  # Pending store requests, filled by method_missing
  @store_queue = Queue.new
  # Sleeper that paces the update thread between flushes
  @update_sleeper = Sleeper.new

  # Stop the thread and flush the queue when the process exits
  at_exit { shutdown }

  # Background thread that replays the queued requests against the store
  @update_thread = OpenC3.safe_thread(self.class.to_s) { store_thread_body }
end

Dynamic Method Handling

This class handles dynamic methods through the method_missing method

#method_missing(message, *args, **kwargs, &block) ⇒ Object

Record the message for pipelining by the thread



106
107
108
# File 'lib/openc3/utilities/store_queued.rb', line 106

# Captures any call this object does not define and queues it, so that it can
# be sent to the store later by process_queue instead of running immediately.
#
# @param message [Symbol] name of the method that was invoked
# @param args [Array] positional arguments of the call
# @param kwargs [Hash] keyword arguments of the call
# @param block [Proc, nil] block given to the call, if any
# @return [Object] whatever the queue's push returns
def method_missing(message, *args, **kwargs, &block)
  request = MessageStruct.new(message, args, kwargs, block)
  @store_queue << request
end

Class Method Details

.instance(update_interval = 1) ⇒ Object

Get the singleton instance. Sets the update interval to 1 second by default.



31
32
33
34
35
36
37
38
# File 'lib/openc3/utilities/store_queued.rb', line 31

# Returns the singleton for the receiving class, creating it on first use.
#
# @param update_interval [Numeric] seconds between queue flushes; only used by
#   the call that actually creates the instance (defaults to 1)
# @return [Object] the shared instance of the receiving class
def self.instance(update_interval = 1) # seconds
  # Fast path: no need to take the mutex once the instance exists
  existing = @instance
  return existing if existing

  # Check again while holding the mutex so only one instance is ever created
  @@instance_mutex.synchronize { @instance ||= new(update_interval) }
end

.method_missing(message, *args, **kwargs, &) ⇒ Object

Delegate all unknown class methods to the instance



41
42
43
# File 'lib/openc3/utilities/store_queued.rb', line 41

# Forwards any class-level call that the class does not define to the
# singleton instance, passing arguments and block through unchanged.
# public_send is used, so only public instance methods can be reached.
#
# @param message [Symbol] name of the method that was invoked
# @param args [Array] positional arguments to forward
# @param kwargs [Hash] keyword arguments to forward
# @return [Object] whatever the instance returns for the call
def self.method_missing(message, *args, **kwargs, &)
  singleton = instance
  singleton.public_send(message, *args, **kwargs, &)
end

Instance Method Details

#graceful_kill ⇒ Object



115
116
117
# File 'lib/openc3/utilities/store_queued.rb', line 115

# Intentional no-op. shutdown passes this object to OpenC3.kill_thread;
# presumably that helper invokes graceful_kill on it — TODO confirm.
#
# @return [nil]
def graceful_kill
  nil
end

#process_queue ⇒ Object



69
70
71
72
73
74
75
76
77
78
79
# File 'lib/openc3/utilities/store_queued.rb', line 69

# Sends every queued request to the store inside a single pipelined block.
# Returns immediately, without opening a pipeline, when the queue is empty.
#
# @return [Object, nil] result of redis_pool.pipelined, or nil when nothing
#   was queued
def process_queue
  return if @store_queue.empty?

  # Pipeline the requests to redis to improve performance
  @store.redis_pool.pipelined do
    until @store_queue.empty?
      request = @store_queue.pop
      @store.public_send(request.message, *request.args, **request.kwargs, &request.block)
    end
  end
end

#set_update_interval(interval) ⇒ Object



63
64
65
66
67
# File 'lib/openc3/utilities/store_queued.rb', line 63

# Lowers the update interval. The new value is applied only when it is
# strictly smaller than the current interval and greater than zero;
# otherwise the current interval is left untouched.
#
# @param interval [Numeric] requested interval
# @return [Numeric, nil] the interval that was applied, or nil if rejected
def set_update_interval(interval)
  return unless interval < @update_interval && interval > 0.0

  @update_interval = interval
end

#shutdown ⇒ Object



95
96
97
98
99
100
101
# File 'lib/openc3/utilities/store_queued.rb', line 95

# Stops the background update thread and then flushes whatever is still
# queued, so pending requests are not lost.
#
# @return [Object, nil] result of the final process_queue call
def shutdown
  # Cancel the sleeper (presumably interrupts the update thread's pending
  # sleep — confirm against Sleeper)
  if @update_sleeper
    @update_sleeper.cancel
  end
  if @update_thread
    OpenC3.kill_thread(self, @update_thread)
  end
  @update_thread = nil
  # Drain the queue before shutdown
  process_queue
end

#store_instance ⇒ Object

Returns the store we’re working with



111
112
113
# File 'lib/openc3/utilities/store_queued.rb', line 111

# Returns the store that queued calls are sent to.
# NOTE(review): presumably overridden by subclasses such as
# EphemeralStoreQueued to pick a different store — confirm.
#
# @return [Object] the Store singleton
def store_instance = Store.instance

#store_thread_body ⇒ Object



81
82
83
84
85
86
87
88
89
90
91
92
93
# File 'lib/openc3/utilities/store_queued.rb', line 81

# Body of the background update thread: flush the queue, sleep for whatever is
# left of the update interval, and repeat until the sleeper returns a truthy
# value.
#
# Elapsed time is measured with the monotonic clock rather than Time.now, so a
# wall-clock adjustment during the flush (NTP step, manual change) cannot make
# the thread sleep longer than the configured interval.
#
# @return [nil]
def store_thread_body
  while true
    started = Process.clock_gettime(Process::CLOCK_MONOTONIC)

    process_queue

    # Only sleep for what remains of the interval after the flush
    elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started
    sleep_time = [@update_interval - elapsed, 0].max
    # A truthy result from the sleeper ends the loop (presumably it was
    # cancelled by shutdown — confirm against Sleeper)
    break if @update_sleeper.sleep(sleep_time)
  end
end