Class: StarlingServer::PersistentQueue
- Inherits: Queue
  - Object
  - Queue
  - StarlingServer::PersistentQueue
- Defined in: lib/starling/persistent_queue.rb
Overview
PersistentQueue is a subclass of Ruby’s thread-safe Queue class. It adds a transactional log to the in-memory Queue, so the Queue can be rebuilt quickly after a server outage.
Constant Summary
- SOFT_LOG_MAX_SIZE = 16 * (1024**2)
  When a log reaches the SOFT_LOG_MAX_SIZE, the Queue will wait until it is empty, and will then rotate the log file.
- TRX_CMD_PUSH = "\000".freeze
- TRX_CMD_POP = "\001".freeze
- TRX_PUSH = "\000%s%s".freeze
- TRX_POP = "\001".freeze
Instance Attribute Summary
- #current_age ⇒ Object (readonly)
  Returns the value of attribute current_age.
- #initial_bytes ⇒ Object (readonly)
  Returns the value of attribute initial_bytes.
- #logsize ⇒ Object (readonly)
  Returns the value of attribute logsize.
- #total_items ⇒ Object (readonly)
  Returns the value of attribute total_items.
Instance Method Summary
- #close ⇒ Object
  Safely closes the transactional queue.
- #initialize(persistence_path, queue_name, debug = false) ⇒ PersistentQueue (constructor)
  Create a new PersistentQueue at persistence_path/queue_name.
- #pop(log_trx = true) ⇒ Object
  Retrieves data from the queue.
- #purge ⇒ Object
- #push(value, log_trx = true) ⇒ Object
  Pushes value to the queue.
Constructor Details
#initialize(persistence_path, queue_name, debug = false) ⇒ PersistentQueue
Create a new PersistentQueue at persistence_path/queue_name. If a queue log exists at that path, the Queue will be loaded from disk before being available for use.
    # File 'lib/starling/persistent_queue.rb', line 32
    def initialize(persistence_path, queue_name, debug = false)
      @persistence_path = persistence_path
      @queue_name = queue_name
      @total_items = 0
      super()
      @initial_bytes = replay_transaction_log(debug)
      @current_age = 0
    end
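The idea of loading a Queue from its log can be illustrated with a minimal sketch. This is not Starling's replay_transaction_log, whose body is not shown on this page: the replay method, the INT_BYTES constant, and the meaning of the returned byte count are all assumptions made for illustration. The record layout (a command byte, then for pushes a native-endian length and the value) is inferred from TRX_PUSH and the #push listing.

```ruby
require "stringio"

TRX_CMD_PUSH = "\000".b
TRX_CMD_POP  = "\001".b
# Width of a native unsigned int as written by pack("I") (assumed helper).
INT_BYTES = [0].pack("I").bytesize

# Hypothetical replay routine: walks the log and applies each command
# to an in-memory array. Returns the surviving items and the number of
# value bytes they occupy (an assumed definition, for illustration).
def replay(io)
  items = []
  bytes = 0
  while (cmd = io.read(1))
    case cmd
    when TRX_CMD_PUSH
      size  = io.read(INT_BYTES).unpack("I").first
      value = io.read(size)
      items << value
      bytes += size
    when TRX_CMD_POP
      popped = items.shift
      bytes -= popped.bytesize if popped
    else
      raise "unknown command byte #{cmd.inspect}"
    end
  end
  [items, bytes]
end

# Build a small log: three pushes followed by one pop.
log = StringIO.new("".b)
%w[a bb ccc].each { |v| log.write(TRX_CMD_PUSH + [v.bytesize].pack("I") + v) }
log.write(TRX_CMD_POP)
log.rewind

items, bytes = replay(log)
p items # => ["bb", "ccc"]
p bytes # => 5
```

Replaying commands in order means the log never needs rewriting on pop; a pop is recorded as a single byte and takes effect only when the log is read back.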
Instance Attribute Details
#current_age ⇒ Object (readonly)
Returns the value of attribute current_age.
    # File 'lib/starling/persistent_queue.rb', line 25
    def current_age
      @current_age
    end
#initial_bytes ⇒ Object (readonly)
Returns the value of attribute initial_bytes.
    # File 'lib/starling/persistent_queue.rb', line 22
    def initial_bytes
      @initial_bytes
    end
#logsize ⇒ Object (readonly)
Returns the value of attribute logsize.
    # File 'lib/starling/persistent_queue.rb', line 24
    def logsize
      @logsize
    end
#total_items ⇒ Object (readonly)
Returns the value of attribute total_items.
    # File 'lib/starling/persistent_queue.rb', line 23
    def total_items
      @total_items
    end
Instance Method Details
#close ⇒ Object
Safely closes the transactional queue.
    # File 'lib/starling/persistent_queue.rb', line 76
    def close
      # Ok, yeah, this is lame, and is *technically* a race condition. HOWEVER,
      # the QueueCollection *should* have stopped processing requests, and I don't
      # want to add yet another Mutex around all the push and pop methods. So we
      # do the next simplest thing, and minimize the time we'll stick around before
      # @trx is nil.
      @not_trx = @trx
      @trx = nil
      @not_trx.close
    end
#pop(log_trx = true) ⇒ Object
Retrieves data from the queue.
    # File 'lib/starling/persistent_queue.rb', line 59
    def pop(log_trx = true)
      raise NoTransactionLog if log_trx && !@trx

      begin
        rv = super(!log_trx)
      rescue ThreadError
        puts "WARNING: The queue was empty when trying to pop(). Technically this shouldn't ever happen. Probably a bug in the transactional underpinnings. Or maybe shutdown didn't happen cleanly at some point. Ignoring."
        rv = [now_usec, '']
      end
      transaction "\001" if log_trx
      @current_age = (now_usec - rv[0]) / 1000
      rv[1]
    end
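Two details of the listing above can be tried in isolation with a plain Ruby Queue: items are stored as [timestamp, value] pairs so that an age in milliseconds can be derived on pop, and a non-blocking pop on an empty Queue raises ThreadError. The now_usec helper below is an assumed implementation (microseconds since the epoch); Starling's own definition is not shown on this page.

```ruby
require "thread"

# Assumed helper: current wall-clock time in microseconds.
def now_usec
  now = Time.now
  now.to_i * 1_000_000 + now.usec
end

queue = Queue.new
queue.push([now_usec, "payload"])

# Non-blocking pop (non_block = true), as #pop does via super(!log_trx)
# when log_trx is false.
enqueued_at, value = queue.pop(true)

# Microseconds divided by 1000 gives the item's age in whole milliseconds.
age_ms = (now_usec - enqueued_at) / 1000

# A second non-blocking pop finds the Queue empty and raises ThreadError.
empty = false
begin
  queue.pop(true)
rescue ThreadError
  empty = true
end

puts "value=#{value} empty=#{empty}"
```

Note that with the default log_trx = true, super(!log_trx) is a blocking pop, so the ThreadError branch applies only to callers that pass log_trx = false.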
#purge ⇒ Object
    # File 'lib/starling/persistent_queue.rb', line 87
    def purge
      close
      File.delete(log_path)
    end
#push(value, log_trx = true) ⇒ Object
Pushes value to the queue. By default, push will write to the transactional log. Set log_trx=false to override this behaviour.
    # File 'lib/starling/persistent_queue.rb', line 45
    def push(value, log_trx = true)
      if log_trx
        raise NoTransactionLog unless @trx
        size = [value.size].pack("I")
        transaction sprintf(TRX_PUSH, size, value)
      end

      @total_items += 1
      super([now_usec, value])
    end
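The record that #push hands to the transaction log can be reproduced outside the class. The sketch below follows the TRX_PUSH format from the listing; encode_push, decode_push, and INT_BYTES are hypothetical names, not part of Starling. One deliberate difference: the sketch uses bytesize and binary strings so the length prefix always counts bytes, whereas the listing uses value.size, which gives the same result for single-byte strings.

```ruby
TRX_PUSH = "\000%s%s".freeze
# Width of a native unsigned int as written by pack("I") (assumed helper).
INT_BYTES = [0].pack("I").bytesize

# Hypothetical encoder: command byte, packed length, then the raw value.
def encode_push(value)
  size = [value.bytesize].pack("I")
  sprintf(TRX_PUSH.b, size, value.b)
end

# Hypothetical decoder: splits a record back into command byte and value.
def decode_push(record)
  cmd  = record.byteslice(0, 1)
  size = record.byteslice(1, INT_BYTES).unpack("I").first
  [cmd, record.byteslice(1 + INT_BYTES, size)]
end

record = encode_push("hello")
cmd, value = decode_push(record)
puts record.bytesize # 1 command byte + INT_BYTES + 5 value bytes
puts value           # => hello
```

Because pack("I") uses the platform's native int size and byte order, a log written this way is only portable between machines that agree on both.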