Class: OpenC3::Store

Inherits:
Object show all
Defined in:
lib/openc3/utilities/store_autoload.rb

Direct Known Subclasses

EphemeralStore

Constant Summary collapse

@@instance_mutex =

Mutex used to ensure that only one instance is created

Mutex.new

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(pool_size = 10) ⇒ Store

Returns a new instance of Store.



97
98
99
100
101
102
# File 'lib/openc3/utilities/store_autoload.rb', line 97

def initialize(pool_size = 10)
  @redis_username = ENV['OPENC3_REDIS_USERNAME']
  @redis_key = ENV['OPENC3_REDIS_PASSWORD']
  @redis_url = "redis://#{ENV['OPENC3_REDIS_HOSTNAME']}:#{ENV['OPENC3_REDIS_PORT']}"
  @redis_pool = StoreConnectionPool.new(size: pool_size) { build_redis() }
end

Dynamic Method Handling

This class handles dynamic methods through the method_missing method

#method_missing(message, *args, **kwargs, &block) ⇒ Object

Delegate all unknown methods to redis through the @redis_pool



93
94
95
# File 'lib/openc3/utilities/store_autoload.rb', line 93

def method_missing(message, *args, **kwargs, &block)
  @redis_pool.with { |redis| redis.public_send(message, *args, **kwargs, &block) }
end

Instance Attribute Details

#redis_poolObject (readonly)

Returns the value of attribute redis_pool.



74
75
76
# File 'lib/openc3/utilities/store_autoload.rb', line 74

def redis_pool
  @redis_pool
end

#redis_urlObject (readonly)

Returns the value of attribute redis_url.



73
74
75
# File 'lib/openc3/utilities/store_autoload.rb', line 73

def redis_url
  @redis_url
end

Class Method Details

.instance(pool_size = 100) ⇒ Object

Get the singleton instance



77
78
79
80
81
82
83
84
85
# File 'lib/openc3/utilities/store_autoload.rb', line 77

def self.instance(pool_size = 100)
  # Logger.level = Logger::DEBUG
  return @instance if @instance

  @@instance_mutex.synchronize do
    @instance ||= self.new(pool_size)
    return @instance
  end
end

.method_missing(message, *args, **kwargs, &block) ⇒ Object

Delegate all unknown class methods to delegate to the instance



88
89
90
# File 'lib/openc3/utilities/store_autoload.rb', line 88

def self.method_missing(message, *args, **kwargs, &block)
  self.instance.public_send(message, *args, **kwargs, &block)
end

Instance Method Details

#build_redisObject



105
106
107
# File 'lib/openc3/utilities/store_autoload.rb', line 105

def build_redis
  return Redis.new(url: @redis_url, username: @redis_username, password: @redis_key)
end

#get_last_offset(topic) ⇒ Object



139
140
141
142
143
144
145
146
147
148
# File 'lib/openc3/utilities/store_autoload.rb', line 139

def get_last_offset(topic)
  @redis_pool.with do |redis|
    result = redis.xrevrange(topic, count: 1)
    if result and result[0] and result[0][0]
      result[0][0]
    else
      "0-0"
    end
  end
end

#get_newest_message(topic) ⇒ Object



125
126
127
128
129
130
131
132
133
134
135
136
137
# File 'lib/openc3/utilities/store_autoload.rb', line 125

def get_newest_message(topic)
  @redis_pool.with do |redis|
    # Default in xrevrange is range end '+', start '-' which means get all
    # elements from higher ID to lower ID and since we're limiting to 1
    # we get the last element. See https://redis.io/commands/xrevrange.
    result = redis.xrevrange(topic, count: 1)
    if result and result.length > 0
      return result[0]
    else
      return nil
    end
  end
end

#get_oldest_message(topic) ⇒ Object

Stream APIs



114
115
116
117
118
119
120
121
122
123
# File 'lib/openc3/utilities/store_autoload.rb', line 114

def get_oldest_message(topic)
  @redis_pool.with do |redis|
    result = redis.xrange(topic, count: 1)
    if result and result.length > 0
      return result[0]
    else
      return nil
    end
  end
end

#read_topics(topics, offsets = nil, timeout_ms = 1000, count = nil) ⇒ Object



171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
# File 'lib/openc3/utilities/store_autoload.rb', line 171

def read_topics(topics, offsets = nil, timeout_ms = 1000, count = nil)
  return {} if topics.empty?
  Thread.current[:topic_offsets] ||= {}
  topic_offsets = Thread.current[:topic_offsets]
  begin
    # Logger.debug "read_topics: #{topics}, #{offsets} pool:#{@redis_pool}"
    @redis_pool.with do |redis|
      offsets = update_topic_offsets(topics) unless offsets
      result = redis.xread(topics, offsets, block: timeout_ms, count: count)
      if result and result.length > 0
        result.each do |topic, messages|
          messages.each do |msg_id, msg_hash|
            topic_offsets[topic] = msg_id
            yield topic, msg_id, msg_hash, redis if block_given?
          end
        end
      end
      # Logger.debug "result:#{result}" if result and result.length > 0
      return result
    end
  rescue Redis::TimeoutError
    return {} # Should return an empty hash not array - xread returns a hash
  end
end

#trim_topic(topic, minid, approximate = true, limit: 0) ⇒ Integer

Trims older entries of the redis stream if needed. > www.rubydoc.info/github/redis/redis-rb/Redis:xtrim

Examples:

Without options

store.trim_topic('MANGO__TOPIC', 1000)

With options

store.trim_topic('MANGO__TOPIC', 1000, approximate: 'true', limit: 0)

Parameters:

  • topic (String)

    the stream key

  • minid (Integer)

    Id to throw away data up to

  • approximate (Boolean) (defaults to: true)

    whether to add '~` modifier of maxlen or not

  • limit (Boolean) (defaults to: 0)

    number of items to return from the call

Returns:

  • (Integer)

    the number of entries actually deleted



235
236
237
238
239
# File 'lib/openc3/utilities/store_autoload.rb', line 235

def trim_topic(topic, minid, approximate = true, limit: 0)
  @redis_pool.with do |redis|
    return redis.xtrim_minid(topic, minid, approximate: approximate, limit: limit)
  end
end

#update_topic_offsets(topics) ⇒ Object



150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
# File 'lib/openc3/utilities/store_autoload.rb', line 150

def update_topic_offsets(topics)
  offsets = []
  topics.each do |topic|
    # Normally we will just be grabbing the topic offset
    # this allows xread to get everything past this point
    Thread.current[:topic_offsets] ||= {}
    topic_offsets = Thread.current[:topic_offsets]
    last_id = topic_offsets[topic]
    if last_id
      offsets << last_id
    else
      # If there is no topic offset this is the first call.
      # Get the last offset ID so we'll start getting everything from now on
      offsets << get_last_offset(topic)
      topic_offsets[topic] = offsets[-1]
    end
  end
  return offsets
end

#write_topic(topic, msg_hash, id = '*', maxlen = nil, approximate = 'true') ⇒ String

Add new entry to the redis stream. > www.rubydoc.info/github/redis/redis-rb/Redis:xadd

Examples:

Without options

store.write_topic('MANGO__TOPIC', {'message' => 'something'})

With options

store.write_topic('MANGO__TOPIC', {'message' => 'something'}, id: '0-0', maxlen: 1000, approximate: 'true')

Parameters:

  • topic (String)

    the stream / topic

  • msg_hash (Hash)

    one or multiple field-value pairs

  • opts (Hash)

    a customizable set of options

Returns:



214
215
216
217
218
219
# File 'lib/openc3/utilities/store_autoload.rb', line 214

def write_topic(topic, msg_hash, id = '*', maxlen = nil, approximate = 'true')
  id = '*' if id.nil?
  @redis_pool.with do |redis|
    return redis.xadd(topic, msg_hash, id: id, maxlen: maxlen, approximate: approximate)
  end
end