Class: Starling

Inherits:
MemCache show all
Defined in:
lib/starling.rb

Constant Summary collapse

WAIT_TIME =
0.25

Instance Method Summary collapse

Instance Method Details

#_original_delete ⇒ Object



7
# File 'lib/starling.rb', line 7

# Keep the delete implementation that exists at this point reachable as
# _original_delete; the class redefines #delete further down and calls
# this alias from inside the new definition.
alias_method :_original_delete, :delete

#_original_get ⇒ Object



6
# File 'lib/starling.rb', line 6

# Keep the get implementation that exists at this point reachable as
# _original_get; the class redefines #get further down (as a polling loop)
# and both #get and #fetch call this alias.
alias_method :_original_get, :get

#available_queues(statistics = nil) ⇒ Object

returns a list of available (currently allocated) queues.



106
107
108
109
110
111
112
113
114
115
116
# File 'lib/starling.rb', line 106

# Lists the names of the queues that show up in the server statistics.
#
# statistics - per-server stats, iterated as (server, stats-hash) pairs;
#              fetched via #stats when omitted.
#
# Returns an Array of unique queue names, derived from every stat key of the
# form "queue_<name>_items". Names ending in "_total" or "_expired" are
# dropped, since those come from the per-queue total/expired counters rather
# than from a queue of their own.
def available_queues(statistics = nil)
  statistics ||= stats

  # Every distinct stat key reported by any server.
  stat_names = statistics.map { |_server, server_stats| server_stats.keys }.flatten.uniq

  # Only the "queue_..._items" counters identify queues.
  item_counters = stat_names.grep(/^queue_(.*)_items/)

  # Strip the fixed prefix and suffix to leave the bare name.
  queue_names = item_counters.map do |counter|
    counter.gsub(/^queue_/, '').gsub(/_items$/, '')
  end

  queue_names.reject { |name| name =~ /_total$/ || name =~ /_expired$/ }
end

#delete(key, expiry = 0) ⇒ Object

Delete the key (queue) from all Starling servers. This is necessary because the random way a server is chosen in #get_server_for_key implies that the queue could easily be spread across the entire Starling cluster.



33
34
35
36
37
# File 'lib/starling.rb', line 33

# Deletes +key+ (a queue) once per iteration of #with_servers, i.e. once for
# every server that #with_servers walks through, by calling the aliased
# original delete each time.
#
# key    - the queue to delete.
# expiry - forwarded unchanged to the original delete (defaults to 0).
#
# Returns whatever #with_servers returns.
def delete(key, expiry = 0)
  with_servers { _original_delete(key, expiry) }
end

#fetch(*args) ⇒ Object

will return the next item or nil



23
24
25
# File 'lib/starling.rb', line 23

# Single, non-polling read: forwards all arguments to the aliased original
# get exactly once and returns its result as-is (unlike #get, which keeps
# retrying while the result is nil).
def fetch(*args)
  _original_get(*args)
end

#flush(queue) ⇒ Object

iterator to flush queue. Each element will be passed to the provided block



122
123
124
125
126
127
# File 'lib/starling.rb', line 122

# Drains +queue+: reads as many items as #sizeof reports at the time of the
# call, handing each one to the block when a block is given.
#
# queue - name of the queue to drain.
#
# Returns the number of items that were read (the size sampled up front).
def flush(queue)
  pending = sizeof(queue)

  pending.times do
    item = get(queue)
    # Items are still consumed when no block is supplied; they are discarded.
    next unless block_given?
    yield item
  end
end

#get(*args) ⇒ Object

fetch an item from a queue.



12
13
14
15
16
17
18
# File 'lib/starling.rb', line 12

# Blocking read: polls the aliased original get with the given arguments
# until it yields something other than nil, pausing WAIT_TIME seconds
# between attempts. There is no upper bound on the number of attempts.
#
# Returns the first non-nil response (note that +false+ counts as a response).
def get(*args)
  item = _original_get(*args)
  while item.nil?
    sleep WAIT_TIME
    item = _original_get(*args)
  end
  item
end

#set(queue, value, expiry = 0, raw = false) ⇒ Object

insert value into queue.

expiry is expressed as a UNIX timestamp

If raw is true, value will not be Marshalled. If raw = :yaml, value will be serialized with YAML, instead.



68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
# File 'lib/starling.rb', line 68

# Inserts +value+ into +queue+, retrying on MemCache errors.
#
# queue  - name of the queue to push onto.
# value  - the payload.
# expiry - forwarded to the superclass set (documented for this class as a
#          UNIX timestamp); defaults to 0.
# raw    - forwarded to the superclass set. As a special case, :yaml means
#          "serialize value with YAML.dump and then store it raw".
#
# Returns whatever the superclass set returns.
# Raises MemCache::MemCacheError if the initial attempt and three retries
# all fail.
def set(queue, value, expiry = 0, raw = false)
  # Serialize once, up front, so the retried section below only contains
  # the actual store call.
  if raw == :yaml
    value = YAML.dump(value)
    raw = true
  end

  retries = 0
  begin
    super(queue, value, expiry, raw)
  rescue MemCache::MemCacheError
    retries += 1
    # Give up immediately once the retry budget is spent; only wait when
    # another attempt is actually going to follow.
    raise if retries > 3
    sleep WAIT_TIME
    retry
  end
end

#sizeof(queue, statistics = nil) ⇒ Object

returns the number of items in queue. If queue is :all, a hash of all queue sizes will be returned.



89
90
91
92
93
94
95
96
97
98
99
100
101
# File 'lib/starling.rb', line 89

# Reports how many items are waiting in a queue.
#
# queue      - a queue name, or :all.
# statistics - per-server stats, iterated as (server, stats-hash) pairs;
#              fetched via #stats when omitted.
#
# Returns the sum, over all servers, of the "queue_<queue>_items" counter
# (missing counters count as 0). When queue is :all, returns a Hash mapping
# each name from #available_queues to its size.
def sizeof(queue, statistics = nil)
  statistics ||= stats

  unless queue == :all
    total = 0
    statistics.each do |_server, server_stats|
      total += server_stats["queue_#{queue}_items"].to_i
    end
    return total
  end

  # Reuse the same statistics snapshot for every queue so the sizes are
  # consistent with one another.
  available_queues(statistics).each_with_object({}) do |name, sizes|
    sizes[name] = sizeof(name, statistics)
  end
end

#with_servers(my_servers = @servers.dup) ⇒ Object

Provides a way to work with a specific list of servers by forcing all calls to #get_server_for_key to use a specific server, and changing that server each time that the call yields to the block provided. This helps work around the normally random nature of the #get_server_for_key method.

Acquires the mutex for the entire duration of the call since unrelated calls to #get_server_for_key might be adversely affected by the non_random result.



49
50
51
52
53
54
55
56
57
58
# File 'lib/starling.rb', line 49

# Runs the given block once per server in +my_servers+, with @force_server
# set to that server for the duration of each run. The whole iteration
# happens inside #with_lock.
#
# my_servers - the servers to walk through; defaults to a copy of @servers.
#
# Returns nil. Does nothing when no block is given.
# Any exception raised by the block propagates to the caller, but
# @force_server is always cleared first so a failure cannot leave the
# instance pinned to a single server.
def with_servers(my_servers = @servers.dup)
  return unless block_given?
  with_lock do
    begin
      my_servers.each do |server|
        @force_server = server
        yield
      end
      nil
    ensure
      # Must run even when the block raises; otherwise the forced server
      # would stay in effect for every later call on this instance.
      @force_server = nil
    end
  end
end