Class: OpenC3::StoreQueued
- Defined in: lib/openc3/utilities/store_queued.rb
Direct Known Subclasses
Defined Under Namespace
Classes: MessageStruct
Constant Summary
- @@instance_mutex = Mutex.new
Mutex used to ensure that only one instance is created
Class Method Summary
- .instance(update_interval = 1) ⇒ Object
Get the singleton instance. Sets the update interval to 1 second by default.
- .method_missing(message, *args, **kwargs) ⇒ Object
Delegate all unknown class methods to the instance.
Instance Method Summary
- #graceful_kill ⇒ Object
- #initialize(update_interval) ⇒ StoreQueued
constructor
A new instance of StoreQueued.
- #method_missing(message, *args, **kwargs, &block) ⇒ Object
Record the message for pipelining by the thread.
- #process_queue ⇒ Object
- #set_update_interval(interval) ⇒ Object
- #shutdown ⇒ Object
- #store_instance ⇒ Object
Returns the store we’re working with.
- #store_thread_body ⇒ Object
Constructor Details
#initialize(update_interval) ⇒ StoreQueued
Returns a new instance of StoreQueued.
# File 'lib/openc3/utilities/store_queued.rb', line 45
def initialize(update_interval)
  @update_interval = update_interval
  @store = store_instance()
  # Queue to hold the store requests
  @store_queue = Queue.new
  # Sleeper used to delay update thread
  @update_sleeper = Sleeper.new

  at_exit() do
    shutdown()
  end

  # Thread used to call methods on the store
  @update_thread = OpenC3.safe_thread(self.class.to_s) do
    store_thread_body()
  end
end
Dynamic Method Handling
This class handles dynamic methods through the method_missing method
#method_missing(message, *args, **kwargs, &block) ⇒ Object
Record the message for pipelining by the thread.
# File 'lib/openc3/utilities/store_queued.rb', line 106
def method_missing(message, *args, **kwargs, &block)
  @store_queue.push(MessageStruct.new(message, args, kwargs, block))
end
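To illustrate what "recording the message" means, here is a minimal sketch that does not depend on OpenC3. `RecordingQueue` and `QueuedMessage` are hypothetical names; the four struct fields are an assumption inferred from the `MessageStruct.new(message, args, kwargs, block)` call and the `action.args`, `action.kwargs`, and `action.block` reads in `#process_queue`.

```ruby
# Hypothetical stand-in for MessageStruct; field names are inferred, not confirmed.
QueuedMessage = Struct.new(:message, :args, :kwargs, :block)

# Hypothetical class that mimics only the queuing half of StoreQueued.
class RecordingQueue
  attr_reader :store_queue

  def initialize
    @store_queue = Queue.new
  end

  # Any unknown method call is captured as data instead of being executed.
  def method_missing(message, *args, **kwargs, &block)
    @store_queue.push(QueuedMessage.new(message, args, kwargs, block))
  end
end

recorder = RecordingQueue.new
recorder.hset('tlm', 'TEMP1', 42) # not executed, only queued
queued = recorder.store_queue.pop
```

Because the call is only captured, the caller gets no store result back; anything that needs a return value from the store would have to go through a non-queued path.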
Class Method Details
.instance(update_interval = 1) ⇒ Object
Get the singleton instance. Sets the update interval to 1 second by default.
# File 'lib/openc3/utilities/store_queued.rb', line 31
def self.instance(update_interval = 1) # seconds
  return @instance if @instance

  @@instance_mutex.synchronize do
    @instance ||= self.new(update_interval)
    return @instance
  end
end
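One consequence of the early `return @instance if @instance` is that `update_interval` only takes effect on the call that creates the instance. The sketch below reproduces the same mutex-guarded pattern in a hypothetical `IntervalHolder` class (not part of OpenC3) to show that behavior.

```ruby
# Hypothetical class reproducing the mutex-guarded singleton pattern.
class IntervalHolder
  @@instance_mutex = Mutex.new

  attr_reader :update_interval

  def self.instance(update_interval = 1) # seconds
    return @instance if @instance

    # Only one thread at a time may create the instance.
    @@instance_mutex.synchronize do
      @instance ||= new(update_interval)
    end
  end

  def initialize(update_interval)
    @update_interval = update_interval
  end
end

first = IntervalHolder.instance(5)
second = IntervalHolder.instance(0.1) # argument ignored, instance already exists
from_threads = Array.new(8) { Thread.new { IntervalHolder.instance } }.map(&:value)
```

Changing the interval after creation is what `#set_update_interval` is for.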
.method_missing(message, *args, **kwargs) ⇒ Object
Delegate all unknown class methods to the instance.
# File 'lib/openc3/utilities/store_queued.rb', line 41
def self.method_missing(message, *args, **kwargs, &)
  self.instance.public_send(message, *args, **kwargs, &)
end
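The class-level delegation can be sketched in isolation as follows. `Greeter` is a hypothetical class used only for illustration; it uses a named `&block` parameter rather than the anonymous `&` so that the sketch also runs on Ruby versions older than 3.1, and it adds a `respond_to_missing?` override, which the listing above does not show.

```ruby
# Hypothetical class: class-level calls are forwarded to a lazily created instance.
class Greeter
  def self.instance
    @instance ||= new
  end

  # Forward any unknown class method to the singleton instance.
  def self.method_missing(message, *args, **kwargs, &block)
    instance.public_send(message, *args, **kwargs, &block)
  end

  # Keep respond_to? consistent with the forwarding above.
  def self.respond_to_missing?(message, include_private = false)
    instance.respond_to?(message, include_private) || super
  end

  def greet(name, punctuation: '!')
    "Hello, #{name}#{punctuation}"
  end
end

greeting = Greeter.greet('ops', punctuation: '.')
```

In `StoreQueued` the forwarded call would then reach the instance-level `method_missing`, so a class-level call ends up queued rather than executed immediately.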
Instance Method Details
#graceful_kill ⇒ Object
# File 'lib/openc3/utilities/store_queued.rb', line 115
def graceful_kill
  # Do nothing
end
#process_queue ⇒ Object
# File 'lib/openc3/utilities/store_queued.rb', line 69
def process_queue
  unless @store_queue.empty?
    # Pipeline the requests to redis to improve performance
    @store.redis_pool.pipelined do
      while !@store_queue.empty?
        action = @store_queue.pop()
        @store.public_send(action.message, *action.args, **action.kwargs, &action.block)
      end
    end
  end
end
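The drain loop can be exercised without Redis by swapping in a fake store. In this sketch `FakeStore`, `QueuedCall`, and `drain` are hypothetical names; `FakeStore#redis_pool` and `#pipelined` only imitate the call shape used above and do no real batching.

```ruby
# Hypothetical stand-in for MessageStruct.
QueuedCall = Struct.new(:message, :args, :kwargs, :block)

# Hypothetical store that records calls instead of talking to Redis.
class FakeStore
  attr_reader :calls, :batches

  def initialize
    @calls = []
    @batches = 0
  end

  # Imitates the `redis_pool.pipelined { ... }` call shape from the source.
  def redis_pool
    self
  end

  def pipelined
    @batches += 1
    yield
  end

  def set(key, value)
    @calls << [:set, key, value]
  end
end

# Replays every queued call against the store inside a single pipelined block.
def drain(store, queue)
  return if queue.empty?

  store.redis_pool.pipelined do
    until queue.empty?
      action = queue.pop
      store.public_send(action.message, *action.args, **action.kwargs, &action.block)
    end
  end
end

store = FakeStore.new
queue = Queue.new
queue.push(QueuedCall.new(:set, ['a', 1], {}, nil))
queue.push(QueuedCall.new(:set, ['b', 2], {}, nil))
drain(store, queue)
```

Everything that was queued since the last pass is replayed in order inside one `pipelined` block, and an empty queue skips the block entirely.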
#set_update_interval(interval) ⇒ Object
# File 'lib/openc3/utilities/store_queued.rb', line 63
def set_update_interval(interval)
  if interval < @update_interval and interval > 0.0
    @update_interval = interval
  end
end
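The guard in this method means the interval can only shrink, and only to a positive value. A small sketch with a hypothetical `IntervalGuard` class that copies the same condition:

```ruby
# Hypothetical class that copies only the interval guard from StoreQueued.
class IntervalGuard
  attr_reader :update_interval

  def initialize(update_interval)
    @update_interval = update_interval
  end

  # Accept the new interval only if it is positive and smaller than the current one.
  def set_update_interval(interval)
    if interval < @update_interval && interval > 0.0
      @update_interval = interval
    end
  end
end

guard = IntervalGuard.new(1)
guard.set_update_interval(5)    # ignored: larger than the current interval
after_larger = guard.update_interval
guard.set_update_interval(0)    # ignored: not greater than zero
after_zero = guard.update_interval
guard.set_update_interval(0.25) # accepted: positive and smaller
after_smaller = guard.update_interval
```

With several callers sharing the singleton, the effective interval therefore settles on the smallest positive value any of them requested.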
#shutdown ⇒ Object
# File 'lib/openc3/utilities/store_queued.rb', line 95
def shutdown
  @update_sleeper.cancel if @update_sleeper
  OpenC3.kill_thread(self, @update_thread) if @update_thread
  @update_thread = nil
  # Drain the queue before shutdown
  process_queue()
end
#store_instance ⇒ Object
Returns the store we’re working with
# File 'lib/openc3/utilities/store_queued.rb', line 111
def store_instance
  Store.instance
end
#store_thread_body ⇒ Object
# File 'lib/openc3/utilities/store_queued.rb', line 81
def store_thread_body
  while true
    start_time = Time.now

    process_queue()

    # Only check whether to update at a set interval
    run_time = Time.now - start_time
    sleep_time = @update_interval - run_time
    sleep_time = 0 if sleep_time < 0
    break if @update_sleeper.sleep(sleep_time)
  end
end
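The sleep-time arithmetic in the loop keeps the period between passes close to `@update_interval` regardless of how long `process_queue` took. The helper below is a hypothetical extraction of just that calculation; `remaining_sleep` does not exist in OpenC3.

```ruby
# Hypothetical helper: time left to sleep after a pass that took run_time seconds.
def remaining_sleep(update_interval, run_time)
  sleep_time = update_interval - run_time
  # A pass that overran the interval should not sleep at all.
  sleep_time.negative? ? 0 : sleep_time
end

short_pass = remaining_sleep(1, 0.25) # pass finished early, sleep the remainder
long_pass = remaining_sleep(1, 1.5)   # pass overran, start the next one immediately
```

The `break if @update_sleeper.sleep(sleep_time)` line suggests that the sleeper returns a truthy value when `#shutdown` cancels it, which is what ends the loop; that reading is inferred from the listing rather than from the `Sleeper` documentation.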