Class: StarlingServer::PersistentQueue

Inherits:
Queue
  • Object
Defined in:
lib/starling/persistent_queue.rb

Overview

PersistentQueue is a subclass of Ruby’s thread-safe Queue class. It adds a transactional log to the in-memory Queue, so the Queue can be rebuilt quickly after a server outage.

Constant Summary

SOFT_LOG_MAX_SIZE =

When a log reaches SOFT_LOG_MAX_SIZE (16 MiB), the Queue waits until it is empty and then rotates the log file.

16 * (1024**2)
TRX_CMD_PUSH =
"\000".freeze
TRX_CMD_POP =
"\001".freeze
TRX_PUSH =
"\000%s%s".freeze
TRX_POP =
"\001".freeze

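The constants above, together with the size header built in #push, suggest a simple record layout: a push is one command byte, the value's size packed with pack("I"), then the value itself; a pop is a single command byte. The following is a minimal sketch of that layout, not Starling's own replay code (replay_transaction_log is not shown on this page). The helper names encode_push and decode_records and the constant SIZE_BYTES are hypothetical. The sketch also assumes the size is meant in bytes, so it uses bytesize; on Ruby 1.9 and later, String#size counts characters instead.

```ruby
require "stringio"

TRX_CMD_PUSH = "\000".freeze
TRX_CMD_POP  = "\001".freeze

# Width of the size header. pack("I") emits a native-endian unsigned int,
# so the width is computed rather than hard-coded.
SIZE_BYTES = [0].pack("I").bytesize

# Hypothetical helper: one push record = command byte + packed size + value.
def encode_push(value)
  TRX_CMD_PUSH.b + [value.bytesize].pack("I") + value.b
end

# Hypothetical helper: read records from io and return the values that
# would still be queued after applying every push and pop in order.
def decode_records(io)
  items = []
  while (cmd = io.read(1))
    case cmd
    when TRX_CMD_PUSH
      size = io.read(SIZE_BYTES).unpack("I").first
      items << io.read(size)
    when TRX_CMD_POP
      items.shift
    else
      raise ArgumentError, "unknown command byte #{cmd.inspect}"
    end
  end
  items
end

# Three pushes with one pop in between leave two items in the queue.
log = encode_push("alpha") + encode_push("beta") + TRX_CMD_POP + encode_push("gamma")
remaining = decode_records(StringIO.new(log))
```

Here remaining holds "beta" and "gamma": the pop record removes the oldest pushed value. Because pack("I") is native-endian, a log written this way is only portable between machines with the same byte order and int width.
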
Constructor Details

#initialize(persistence_path, queue_name, debug = false) ⇒ PersistentQueue

Create a new PersistentQueue at persistence_path/queue_name. If a queue log exists at that path, the Queue will be loaded from disk before being available for use.



# File 'lib/starling/persistent_queue.rb', line 32

def initialize(persistence_path, queue_name, debug = false)
  @persistence_path = persistence_path
  @queue_name = queue_name
  @total_items = 0
  super()
  @initial_bytes = replay_transaction_log(debug)
  @current_age = 0
end
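
To illustrate the load-before-use behaviour described for the constructor, here is a minimal, self-contained sketch of a Queue subclass that replays its log on construction. TinyPersistentQueue and its replay_log method are hypothetical stand-ins, not Starling's implementation; the record layout is an assumption inferred from the constants and from #push, and the sketch omits timestamps, log rotation and the NoTransactionLog check.

```ruby
require "tmpdir"

# Hypothetical stand-in for PersistentQueue, for illustration only.
class TinyPersistentQueue < Queue
  SIZE_BYTES = [0].pack("I").bytesize

  def initialize(persistence_path, queue_name)
    @log_path = File.join(persistence_path, queue_name)
    super()
    replay_log                          # rebuild in-memory state first
    @trx = File.open(@log_path, "ab")   # then append new transactions
  end

  def push(value, log_trx = true)
    @trx.write("\000".b + [value.bytesize].pack("I") + value.b) if log_trx
    super(value)
  end

  def pop(log_trx = true)
    value = super(true)                 # non-blocking in this sketch
    @trx.write("\001") if log_trx
    value
  end

  def close
    @trx.close
  end

  private

  # Re-apply every logged push and pop without logging them again.
  def replay_log
    return unless File.exist?(@log_path)
    File.open(@log_path, "rb") do |io|
      while (cmd = io.read(1))
        if cmd == "\000"
          size = io.read(SIZE_BYTES).unpack("I").first
          push(io.read(size), false)
        else
          pop(false)
        end
      end
    end
  end
end

restored_items = Dir.mktmpdir do |dir|
  q = TinyPersistentQueue.new(dir, "jobs")
  q.push("first")
  q.push("second")
  q.pop                                 # logs a pop, removing "first"
  q.close

  restored = TinyPersistentQueue.new(dir, "jobs")
  items = []
  items << restored.pop until restored.empty?
  restored.close
  items
end
```

After the second construction only "second" is left, because the logged pop is replayed along with the two pushes.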

Instance Attribute Details

#current_age ⇒ Object (readonly)

Returns the value of attribute current_age.



# File 'lib/starling/persistent_queue.rb', line 25

def current_age
  @current_age
end

#initial_bytes ⇒ Object (readonly)

Returns the value of attribute initial_bytes.



# File 'lib/starling/persistent_queue.rb', line 22

def initial_bytes
  @initial_bytes
end

#logsize ⇒ Object (readonly)

Returns the value of attribute logsize.



# File 'lib/starling/persistent_queue.rb', line 24

def logsize
  @logsize
end

#total_items ⇒ Object (readonly)

Returns the value of attribute total_items.



# File 'lib/starling/persistent_queue.rb', line 23

def total_items
  @total_items
end

Instance Method Details

#close ⇒ Object

Safely closes the transactional queue.



# File 'lib/starling/persistent_queue.rb', line 76

def close
  # Ok, yeah, this is lame, and is *technically* a race condition. HOWEVER,
  # the QueueCollection *should* have stopped processing requests, and I don't
  # want to add yet another Mutex around all the push and pop methods. So we
  # do the next simplest thing, and minimize the time we'll stick around before
  # @trx is nil.
  @not_trx = @trx
  @trx = nil
  @not_trx.close
end

#pop(log_trx = true) ⇒ Object

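Retrieves data from the queue. By default, pop will write to the transactional log. Set log_trx=false to override this behaviour; the pop is then non-blocking.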

Raises:

  • (NoTransactionLog)


# File 'lib/starling/persistent_queue.rb', line 59

def pop(log_trx = true)
  raise NoTransactionLog if log_trx && !@trx

  begin
    rv = super(!log_trx)
  rescue ThreadError
    puts "WARNING: The queue was empty when trying to pop(). Technically this shouldn't ever happen. Probably a bug in the transactional underpinnings. Or maybe shutdown didn't happen cleanly at some point. Ignoring."
    rv = [now_usec, '']
  end
  transaction "\001" if log_trx
  @current_age = (now_usec - rv[0]) / 1000
  rv[1]
end
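
The listing above stores each item as a [timestamp, value] pair and derives current_age from the timestamp. The sketch below walks through that arithmetic and through the non-blocking pop that super(!log_trx) performs when log_trx is false. The now_usec helper is not shown on this page, so the version here is an assumption: it is taken to return microseconds since the Unix epoch.

```ruby
# Assumption: now_usec returns microseconds since the Unix epoch.
def now_usec
  t = Time.now
  t.to_i * 1_000_000 + t.usec
end

queue = Queue.new

# Items are stored as [enqueue_time_in_usec, value]; pretend this one
# was pushed 2.5 seconds ago.
queue.push([now_usec - 2_500_000, "payload"])

enqueued_at, value = queue.pop(true)

# Integer division converts the microsecond difference to milliseconds.
age_ms = (now_usec - enqueued_at) / 1000

# Queue#pop(true) is non-blocking and raises ThreadError on an empty queue,
# which is the case the rescue clause in #pop handles.
empty_pop_raised = false
begin
  queue.pop(true)
rescue ThreadError
  empty_pop_raised = true
end
```

Here age_ms is at least 2500. In the empty-queue fallback, #pop substitutes [now_usec, ''], so it returns an empty string and the computed age is close to zero.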

#purge ⇒ Object

Closes the transactional log and deletes the log file from disk.



# File 'lib/starling/persistent_queue.rb', line 87

def purge
  close
  File.delete(log_path)
end

#push(value, log_trx = true) ⇒ Object

Pushes value to the queue. By default, push will write to the transactional log. Set log_trx=false to override this behaviour.



# File 'lib/starling/persistent_queue.rb', line 45

def push(value, log_trx = true)
  if log_trx
    raise NoTransactionLog unless @trx
    size = [value.size].pack("I")
    transaction sprintf(TRX_PUSH, size, value)
  end

  @total_items += 1
  super([now_usec, value])
end