Class: MessageBus::DistributedCache::Manager
- Inherits:
-
Object
- Object
- MessageBus::DistributedCache::Manager
- Defined in:
- lib/message_bus/distributed_cache.rb
Instance Attribute Summary collapse
-
#app_version ⇒ Object
Returns the value of attribute app_version.
Instance Method Summary collapse
- #clear(hash) ⇒ Object
- #delete(hash, key) ⇒ Object
- #ensure_subscribe! ⇒ Object
-
#initialize(message_bus = nil, publish_queue_in_memory: true) ⇒ Manager
constructor
A new instance of Manager.
- #process_message(message) ⇒ Object
- #publish(hash, message) ⇒ Object
- #register(hash) ⇒ Object
- #set(hash, key, value) ⇒ Object
- #subscribers ⇒ Object
Constructor Details
#initialize(message_bus = nil, publish_queue_in_memory: true) ⇒ Manager
Returns a new instance of Manager.
19 20 21 22 23 24 25 26 |
# File 'lib/message_bus/distributed_cache.rb', line 19 def initialize( = nil, publish_queue_in_memory: true) @subscribers = [] @subscribed = false @lock = Mutex.new @message_bus = || MessageBus @publish_queue_in_memory = publish_queue_in_memory @app_version = nil end |
Instance Attribute Details
#app_version ⇒ Object
Returns the value of attribute app_version.
17 18 19 |
# File 'lib/message_bus/distributed_cache.rb', line 17 def app_version @app_version end |
Instance Method Details
#clear(hash) ⇒ Object
99 100 101 |
# File 'lib/message_bus/distributed_cache.rb', line 99 def clear(hash) publish(hash, op: :clear) end |
#delete(hash, key) ⇒ Object
95 96 97 |
# File 'lib/message_bus/distributed_cache.rb', line 95 def delete(hash, key) publish(hash, op: :delete, key: key) end |
#ensure_subscribe! ⇒ Object
62 63 64 65 66 67 68 69 70 71 72 73 74 75 |
# File 'lib/message_bus/distributed_cache.rb', line 62 def ensure_subscribe! return if @subscribed @lock.synchronize do return if @subscribed @message_bus.subscribe(CHANNEL_NAME) do || @lock.synchronize do () end end @subscribed = true end end |
#process_message(message) ⇒ Object
32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 |
# File 'lib/message_bus/distributed_cache.rb', line 32 def () i = @subscribers.length - 1 payload = .data while i >= 0 begin current = @subscribers[i] next if payload["origin"] == current.identity next if current.key != payload["hash_key"] next if @app_version && payload["app_version"] != @app_version hash = current.hash(.site_id || DEFAULT_SITE_ID) case payload["op"] # TODO: consider custom marshal support with a restricted set when "set" then hash[payload["key"]] = payload["marshalled"] ? Marshal.load(Base64.decode64(payload["value"])) : payload["value"] # rubocop:disable Security/MarshalLoad when "delete" then hash.delete(payload["key"]) when "clear" then hash.clear end rescue WeakRef::RefError @subscribers.delete_at(i) ensure i -= 1 end end end |
#publish(hash, message) ⇒ Object
77 78 79 80 81 82 83 84 85 86 |
# File 'lib/message_bus/distributed_cache.rb', line 77 def publish(hash, ) [:origin] = hash.identity [:hash_key] = hash.key [:app_version] = @app_version if @app_version @message_bus.publish(CHANNEL_NAME, , user_ids: [-1], queue_in_memory: @publish_queue_in_memory ) end |
#register(hash) ⇒ Object
103 104 105 106 107 |
# File 'lib/message_bus/distributed_cache.rb', line 103 def register(hash) @lock.synchronize do @subscribers << WeakRef.new(hash) end end |
#set(hash, key, value) ⇒ Object
88 89 90 91 92 93 |
# File 'lib/message_bus/distributed_cache.rb', line 88 def set(hash, key, value) # special support for set marshal = (Set === value || Hash === value || Array === value) value = Base64.encode64(Marshal.dump(value)) if marshal publish(hash, op: :set, key: key, value: value, marshalled: marshal) end |
#subscribers ⇒ Object
28 29 30 |
# File 'lib/message_bus/distributed_cache.rb', line 28 def subscribers @subscribers end |