Class: Starling

Inherits:
MemCache
  • Object
show all
Defined in:
lib/starling.rb

Constant Summary collapse

WAIT_TIME =
0.25

Instance Method Summary collapse

Instance Method Details

#_original_deleteObject



7
# File 'lib/starling.rb', line 7

# Keep the #delete implementation that exists at this point reachable as
# #_original_delete, so the #delete defined later in this class can still call it.
alias_method :_original_delete, :delete

#_original_getObject



6
# File 'lib/starling.rb', line 6

# Keep the #get implementation that exists at this point reachable as
# #_original_get, so the #get and #fetch defined later in this class can call it.
alias_method :_original_get, :get

#available_queues(statistics = nil) ⇒ Object

returns a list of available (currently allocated) queues.



99
100
101
102
103
104
105
106
107
108
# File 'lib/starling.rb', line 99

# Lists the names of the queues that appear in the servers' statistics.
#
# A queue is recognised by the presence of a "queue_<name>_total_items"
# statistic; the queue name is extracted from that statistic's name.
#
# statistics - optional hash of per-server statistics hashes (the shape
#              iterated below); obtained from #stats when omitted.
#
# Returns an Array of unique queue names. The result is always an Array
# (empty when no queue statistic is found), never nil.
def available_queues(statistics = nil)
  statistics ||= stats

  # Each server returns its own statistics, so gather every statistic name.
  stat_names = []
  statistics.each_pair { |_server, server_stats| stat_names.concat(server_stats.keys) }

  # Here we rely on the name of one of the statistics to find the queue:
  # the name of the queue is embedded in the statistic name.
  queue_names = stat_names.map do |stat_name|
    match = /^queue_(.*)_total_items/.match(stat_name.to_s)
    match && match[1]
  end

  # Use the non-destructive forms: compact! / uniq! return nil when they
  # change nothing, which would leak nil to callers. uniq also collapses a
  # queue that is reported by more than one server.
  queue_names.compact.uniq
end

#delete(key, expiry = 0) ⇒ Object

Delete the key (queue) from all Starling servers. This is necessary because the random way a server is chosen in #get_server_for_key implies that the queue could easily be spread across the entire Starling cluster.



33
34
35
36
37
# File 'lib/starling.rb', line 33

# Runs the aliased original delete for +key+ once per iteration of
# #with_servers, so the removal is attempted for every configured server.
#
# key    - the key (queue) to delete.
# expiry - passed straight through to the original delete (default 0).
#
# Returns whatever #with_servers returns.
def delete(key, expiry = 0)
  with_servers { _original_delete(key, expiry) }
end

#fetch(*args) ⇒ Object

Returns the next item, or nil, after a single attempt; unlike #get, it does not wait and retry.



23
24
25
# File 'lib/starling.rb', line 23

# Makes a single call to #_original_get with the given arguments and returns
# its result unchanged; there is no waiting or retrying here (contrast #get).
def fetch(*args)
  _original_get(*args)
end

#flush(queue) ⇒ Object

iterator to flush queue. Each element will be passed to the provided block



114
115
116
117
118
119
# File 'lib/starling.rb', line 114

# Drains +queue+: reads as many items as #sizeof reported when the call
# started, handing each one to the block when a block is given.
#
# The size is sampled once up front; each item is then read with #get.
#
# Returns the number of items that were read (the sampled size).
def flush(queue)
  pending = sizeof(queue)
  pending.times do
    item = get(queue)
    yield(item) if block_given?
  end
end

#get(*args) ⇒ Object

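Fetch an item from a queue, blocking until one is available: the underlying get is retried every WAIT_TIME seconds for as long as it returns nil.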



12
13
14
15
16
17
18
# File 'lib/starling.rb', line 12

# Blocking read: keeps calling the aliased original get with the same
# arguments, sleeping WAIT_TIME seconds between attempts, until it returns
# something other than nil.
#
# Returns the first non-nil response (a literal false is returned as-is).
def get(*args)
  while (item = _original_get(*args)).nil?
    sleep WAIT_TIME
  end
  item
end

#set(queue, value, expiry = 0, raw = false) ⇒ Object

insert value into queue.

expiry is expressed as a UNIX timestamp

If raw is true, value will not be Marshalled. If raw = :yaml, value will be serialized with YAML, instead.



61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
# File 'lib/starling.rb', line 61

# Inserts +value+ into +queue+ through the superclass #set, retrying when
# the superclass raises MemCache::MemCacheError.
#
# queue  - name of the queue to write to.
# value  - the object to enqueue.
# expiry - passed through to the superclass (default 0).
# raw    - passed through to the superclass; the special value :yaml makes
#          this method serialize +value+ with YAML.dump and pass raw = true.
#
# After each failure the method sleeps WAIT_TIME seconds; the error is
# re-raised once more than three failures have occurred.
#
# Returns the superclass #set result.
# Raises MemCache::MemCacheError when every attempt fails.
def set(queue, value, expiry = 0, raw = false)
  # Serialize once, up front; retries then reuse the already-dumped value.
  if raw == :yaml
    value = YAML.dump(value)
    raw = true
  end

  failures = 0
  begin
    super(queue, value, expiry, raw)
  rescue MemCache::MemCacheError => error
    failures += 1
    sleep WAIT_TIME
    raise error if failures > 3
    retry
  end
end

#sizeof(queue, statistics = nil) ⇒ Object

returns the number of items in queue. If queue is :all, a hash of all queue sizes will be returned.



82
83
84
85
86
87
88
89
90
91
92
93
94
# File 'lib/starling.rb', line 82

# Reports how many items are waiting in +queue+, summed over every server's
# "queue_<name>_items" statistic (a missing statistic counts as 0).
#
# queue      - a queue name, or :all to get the size of every queue listed
#              by #available_queues.
# statistics - optional per-server statistics hash; obtained from #stats
#              when omitted and reused for every nested lookup.
#
# Returns an Integer for a single queue, or a Hash of queue name => size
# when +queue+ is :all.
def sizeof(queue, statistics = nil)
  statistics ||= stats

  unless queue == :all
    items_stat = "queue_#{queue}_items"
    return statistics.inject(0) do |total, (_server, server_stats)|
      total + server_stats[items_stat].to_i
    end
  end

  available_queues(statistics).inject({}) do |sizes, name|
    sizes[name] = sizeof(name, statistics)
    sizes
  end
end

#with_servers(my_servers = self.servers.dup) ⇒ Object

Provides a way to work with a specific list of servers by forcing all calls to #get_server_for_key to use a specific server, and changing that server each time that the call yields to the block provided. This helps work around the normally random nature of the #get_server_for_key method.



46
47
48
49
50
51
# File 'lib/starling.rb', line 46

# Calls the given block once for every entry in +my_servers+.
#
# my_servers - the list to iterate; defaults to a copy of #servers.
#
# NOTE(review): the current server is neither passed to the block nor stored
# anywhere in this method, so any pinning of #get_server_for_key to that
# server would have to happen elsewhere -- confirm.
#
# Returns +my_servers+, or nil when no block is given.
def with_servers(my_servers = self.servers.dup)
  my_servers.each { |_server| yield } if block_given?
end