Class: ReliableMsg::MessageStore::Base
- Inherits:
-
Object
- Object
- ReliableMsg::MessageStore::Base
- Defined in:
- lib/reliable-msg/message-store.rb
Overview
Base class for message store.
Direct Known Subclasses
Constant Summary collapse
- ERROR_INVALID_MESSAGE_STORE =
:nodoc:
"No message store '%s' available (note: case is not important)"
- @@stores =
:nodoc:
{}
Class Method Summary collapse
-
.configure(config, logger) ⇒ Object
Returns a message store from the specified configuration (previously created with configure).
Instance Method Summary collapse
-
#activate ⇒ Object
Activates the message store.
-
#configuration ⇒ Object
Returns the message store configuration as a hash.
-
#deactivate ⇒ Object
Deactivates the message store.
- #get_headers(queue) ⇒ Object
- #get_last(topic, seen, &block) ⇒ Object
- #get_message(queue, &block) ⇒ Object
-
#initialize(logger) ⇒ Base
constructor
A new instance of Base.
-
#setup ⇒ Object
Set up the message store.
- #transaction(&block) ⇒ Object
-
#type ⇒ Object
Returns the message store type name.
Constructor Details
#initialize(logger) ⇒ Base
Returns a new instance of Base.
31 32 33 |
# File 'lib/reliable-msg/message-store.rb', line 31 def initialize logger @logger = logger end |
Class Method Details
.configure(config, logger) ⇒ Object
Returns a message store from the specified configuration (previously created with configure).
:call-seq:
Base::configure(config, logger) -> store
145 146 147 148 149 150 |
# File 'lib/reliable-msg/message-store.rb', line 145 def self.configure config, logger type = config["type"].downcase cls = @@stores[type] raise RuntimeError, format(ERROR_INVALID_MESSAGE_STORE, type) unless cls cls.new config, logger end |
Instance Method Details
#activate ⇒ Object
Activates the message store. Call this method before using the message store.
:call-seq:
store.activate
71 72 73 74 75 76 77 |
# File 'lib/reliable-msg/message-store.rb', line 71 def activate @mutex = Mutex.new @queues = {Queue::DLQ=>[]} @topics = {} @cache = {} # TODO: add recovery logic end |
#configuration ⇒ Object
Returns the message store configuration as a hash.
:call-seq:
store.configuration -> hash
60 61 62 |
# File 'lib/reliable-msg/message-store.rb', line 60 def configuration raise RuntimeException, "Not implemented" end |
#deactivate ⇒ Object
Deactivates the message store. Call this method when done using the message store.
:call-seq:
store.deactivate
86 87 88 |
# File 'lib/reliable-msg/message-store.rb', line 86 def deactivate @mutex = @queues = @topics = @cache = nil end |
#get_headers(queue) ⇒ Object
123 124 125 |
# File 'lib/reliable-msg/message-store.rb', line 123 def get_headers queue return @queues[queue] || [] end |
#get_last(topic, seen, &block) ⇒ Object
128 129 130 131 132 133 134 135 136 |
# File 'lib/reliable-msg/message-store.rb', line 128 def get_last topic, seen, &block headers = @topics[topic] return nil if headers.nil? || headers[:id] == seen if block.call(headers) id = headers[:id] = @cache[id] || load(id, :topic, topic) {:id=>id, :headers=>headers, :message=>} end end |
#get_message(queue, &block) ⇒ Object
109 110 111 112 113 114 115 116 117 118 119 120 |
# File 'lib/reliable-msg/message-store.rb', line 109 def queue, &block = @queues[queue] return nil unless .each do |headers| if block.call(headers) id = headers[:id] = @cache[id] || load(id, :queue, queue) return {:id=>id, :headers=>headers, :message=>} end end return nil end |
#setup ⇒ Object
Set up the message store. Create files, database tables, etc.
:call-seq:
store.setup
51 52 |
# File 'lib/reliable-msg/message-store.rb', line 51 def setup end |
#transaction(&block) ⇒ Object
91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 |
# File 'lib/reliable-msg/message-store.rb', line 91 def transaction &block result = block.call inserts = [], deletes = [], dlqs= [] begin update inserts, deletes, dlqs unless inserts.empty? && deletes.empty? && dlqs.empty? rescue Exception=>error @logger.error error # If an error occurs, the queue may be out of synch with the store. # Empty the cache and reload the queue, before raising the error. @cache = {} @queues = {Queue::DLQ=>[]} @topics = {} load_index raise error end result end |
#type ⇒ Object
Returns the message store type name.
:call-seq:
store.type -> string
41 42 43 |
# File 'lib/reliable-msg/message-store.rb', line 41 def type raise RuntimeException, "Not implemented" end |