Class: Hotseat::Queue
- Inherits:
-
Object
- Object
- Hotseat::Queue
- Defined in:
- lib/hotseat/queue.rb
Constant Summary collapse
# Default queue settings. Options passed to the constructor are merged over
# these values. Frozen so the shared defaults cannot be modified in place.
DEFAULT_CONFIG = {
  :design_doc_name   => 'hotseat_queue',
  :pending_view_name => 'pending',
  :locked_view_name  => 'locked',
  :done_view_name    => 'done',
  :all_view_name     => 'all',
  :object_name       => 'hotseat',
}.freeze
Instance Attribute Summary collapse
-
#config ⇒ Object
Returns the value of attribute config.
-
#db ⇒ Object
Returns the value of attribute db.
Instance Method Summary collapse
- #add(doc_id) ⇒ Object
- #add_bulk(doc_ids) ⇒ Object
- #add_lock(doc) ⇒ Object
- #all_view_name ⇒ Object
- #create_and_add_bulk(docs) ⇒ Object
- #design_doc ⇒ Object
- #design_doc_id ⇒ Object
- #done?(doc) ⇒ Boolean
- #done_view_name ⇒ Object
- #forget(doc_id) ⇒ Object
- #forget_bulk(doc_ids) ⇒ Object
- #get(n = 1, options = {}) ⇒ Object
-
#initialize(db, options = {}) ⇒ Queue
constructor
A new instance of Queue.
- #lease(n = 1, query_options = {}) ⇒ Object
- #locked?(doc) ⇒ Boolean
- #locked_view_name ⇒ Object
- #mark_done(doc) ⇒ Object
- #num_all ⇒ Object (also: #num_total)
- #num_done ⇒ Object
- #num_locked ⇒ Object
- #num_pending ⇒ Object (also: #size)
- #patch(doc) ⇒ Object
- #pending_view_name ⇒ Object
- #purge ⇒ Object
- #remove(doc_id, opts = {}) ⇒ Object
- #remove_bulk(doc_ids, opts = {}) ⇒ Object
- #remove_lock(doc) ⇒ Object
- #undo(doc_id) ⇒ Object
- #undo_bulk(doc_ids) ⇒ Object
- #unlease(doc_id) ⇒ Object
- #unlease_all ⇒ Object
- #unlease_bulk(doc_ids) ⇒ Object
- #unpatch(doc) ⇒ Object
Constructor Details
#initialize(db, options = {}) ⇒ Queue
Returns a new instance of Queue.
20 21 22 23 24 25 26 |
# File 'lib/hotseat/queue.rb', line 20
#
# Builds a queue on top of +db+ and stores the effective configuration.
#
# The design document returned by #design_doc is saved to the database
# unless Hotseat.queue? returns a truthy value for the database and the
# configured design document name.
#
# @param db [Object] database handle; must respond to +save_doc+
# @param options [Hash] settings merged over DEFAULT_CONFIG
# @return [Queue] a new instance of Queue
def initialize(db, options = {})
  @db = db
  @config = DEFAULT_CONFIG.merge(options)
  unless Hotseat.queue?(@db, @config[:design_doc_name])
    @db.save_doc design_doc
  end
end
Instance Attribute Details
#config ⇒ Object
Returns the value of attribute config.
9 10 11 |
# File 'lib/hotseat/queue.rb', line 9
#
# Reader for the queue configuration.
#
# @return [Object] the value of the @config instance variable
def config
  @config
end
#db ⇒ Object
Returns the value of attribute db.
9 10 11 |
# File 'lib/hotseat/queue.rb', line 9
#
# Reader for the database handle.
#
# @return [Object] the value of the @db instance variable
def db
  @db
end
Instance Method Details
#add(doc_id) ⇒ Object
115 116 117 118 119 120 |
# File 'lib/hotseat/queue.rb', line 115
#
# Adds an existing document to the queue.
#
# The document is handed to #patch inside the database's +update_doc+ block
# and then, if a block was given to this method, yielded to the caller.
#
# @param doc_id [Object] id passed through to +update_doc+
# @yield [doc] the document after it has been patched
# @return [Object] whatever +update_doc+ returns
def add(doc_id)
  @db.update_doc(doc_id) do |record|
    patch(record)
    yield record if block_given?
  end
end
#add_bulk(doc_ids) ⇒ Object
122 123 124 125 126 127 |
# File 'lib/hotseat/queue.rb', line 122
#
# Adds several existing documents to the queue in one bulk save.
#
# Note: ids for which +bulk_load+ returns no 'doc' are silently ignored.
#
# @param doc_ids [Array] ids passed through to +bulk_load+
# @return [Object] whatever +bulk_save+ returns
def add_bulk(doc_ids)
  docs = @db.bulk_load(doc_ids)['rows'].map { |row| row['doc'] }.compact
  docs.each { |doc| patch doc }
  # The original call wrote `use_uuids=false` here, which created a throwaway
  # local just to label the argument; the value passed is the same.
  @db.bulk_save(docs, false)
end
#add_lock(doc) ⇒ Object
85 86 87 88 89 |
# File 'lib/hotseat/queue.rb', line 85
#
# Records a lock on a queued document.
#
# Stores a 'lock' entry (current UTC time in ISO 8601 form plus this
# process's id) in the document's queue object, i.e. the value found under
# config[:object_name].
#
# @param doc [Hash] document that already carries the queue object
# @return [Hash] the same document
def add_lock(doc)
  queue_entry = doc[config[:object_name]]
  queue_entry['lock'] = { 'at' => Time.now.utc.iso8601, 'by' => Process.pid }
  doc
end
#all_view_name ⇒ Object
44 45 46 |
# File 'lib/hotseat/queue.rb', line 44
#
# @return [String] "<design doc name>/<all view name>" built from the config
def all_view_name
  format('%s/%s', config[:design_doc_name], config[:all_view_name])
end
#create_and_add_bulk(docs) ⇒ Object
129 130 131 132 |
# File 'lib/hotseat/queue.rb', line 129
#
# Patches every given document with the queue object and bulk-saves them.
#
# @param docs [Array<Hash>] documents to patch and save
# @return [Object] whatever +bulk_save+ returns
def create_and_add_bulk(docs)
  # Array#each returns its receiver, so the patched array is what gets saved.
  @db.bulk_save(docs.each { |entry| patch(entry) })
end
#design_doc ⇒ Object
48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 |
# File 'lib/hotseat/queue.rb', line 48
#
# Builds the design document that defines the queue's four views.
#
# Each view's map function is JavaScript source that emits the relevant
# timestamp ('at') with a null value:
# * pending - queue object present, neither locked nor done
# * locked  - queue object present and locked
# * done    - queue object present and done
# * all     - queue object present
#
# @return [Hash] design document with an '_id' and a :views table keyed by
#   the configured view names
def design_doc
  q = "doc.#{config[:object_name]}"
  lock = "#{q}.lock"
  done = "#{q}.done"
  pending_func = <<-JAVASCRIPT
    function(doc) { if (#{q} && !(#{lock} || #{done})) emit(#{q}.at, null); }
  JAVASCRIPT
  locked_func = <<-JAVASCRIPT
    function(doc) { if (#{q} && #{lock}) emit(#{lock}.at, null); }
  JAVASCRIPT
  done_func = <<-JAVASCRIPT
    function(doc) { if (#{q} && #{done}) emit(#{done}.at, null); }
  JAVASCRIPT
  all_func = <<-JAVASCRIPT
    function(doc) { if (#{q}) emit(#{q}.at, null); }
  JAVASCRIPT
  {
    '_id' => "_design/#{config[:design_doc_name]}",
    :views => {
      # strip removes the heredoc's indentation and trailing newline
      config[:pending_view_name] => { :map => pending_func.strip },
      config[:locked_view_name] => { :map => locked_func.strip },
      config[:done_view_name] => { :map => done_func.strip },
      config[:all_view_name] => { :map => all_func.strip },
    }
  }
end
#design_doc_id ⇒ Object
28 29 30 |
# File 'lib/hotseat/queue.rb', line 28
#
# @return [String] "_design/" followed by the configured design document name
def design_doc_id
  format('_design/%s', config[:design_doc_name])
end
#done?(doc) ⇒ Boolean
109 110 111 112 113 |
# File 'lib/hotseat/queue.rb', line 109
#
# Tells whether a document's queue object carries a 'done' entry.
#
# @param doc [Hash] document to inspect
# @return [Boolean, nil] true/false when the document has a queue object,
#   nil when it has none
def done?(doc)
  queue_entry = doc[config[:object_name]]
  queue_entry.has_key?('done') if queue_entry
end
#done_view_name ⇒ Object
40 41 42 |
# File 'lib/hotseat/queue.rb', line 40
#
# @return [String] "<design doc name>/<done view name>" built from the config
def done_view_name
  format('%s/%s', config[:design_doc_name], config[:done_view_name])
end
#forget(doc_id) ⇒ Object
253 254 255 256 257 |
# File 'lib/hotseat/queue.rb', line 253
#
# Strips the queue object from one document via the database's +update_doc+.
#
# @param doc_id [Object] id passed through to +update_doc+
# @return [Object] whatever +update_doc+ returns
def forget(doc_id)
  @db.update_doc(doc_id) { |record| unpatch(record) }
end
#forget_bulk(doc_ids) ⇒ Object
259 260 261 262 263 264 |
# File 'lib/hotseat/queue.rb', line 259
#
# Strips the queue object from several documents in one bulk save.
#
# Note: ids for which +bulk_load+ returns no 'doc' are silently ignored.
#
# @param doc_ids [Array] ids passed through to +bulk_load+
# @return [Object] whatever +bulk_save+ returns
def forget_bulk(doc_ids)
  docs = @db.bulk_load(doc_ids)['rows'].map { |row| row['doc'] }.compact
  docs.each { |doc| unpatch doc }
  # The original call wrote `use_uuids=false` here, which created a throwaway
  # local just to label the argument; the value passed is the same.
  @db.bulk_save(docs, false)
end
#get(n = 1, options = {}) ⇒ Object
139 140 141 142 143 |
# File 'lib/hotseat/queue.rb', line 139
#
# Fetches documents from the pending view without locking them.
#
# @param n [Integer] value sent as the view's :limit
# @param options [Hash] extra view parameters; these override the defaults
#   (:limit and :include_docs => true) on key collision
# @return [Array, nil] the 'doc' of each returned row, or nil when the view
#   returned no rows
def get(n = 1, options = {})
  params = { :limit => n, :include_docs => true }.merge(options)
  rows = @db.view(pending_view_name, params)['rows']
  rows.map { |row| row['doc'] } unless rows.empty?
end
#lease(n = 1, query_options = {}) ⇒ Object
145 146 147 148 149 150 151 152 153 154 155 156 157 158 |
# File 'lib/hotseat/queue.rb', line 145
#
# Fetches up to +n+ pending documents and tries to lock them.
#
# Every fetched document gets a lock via #add_lock and the batch is
# bulk-saved. Documents whose save response carries an 'error' (probably
# updated by another process) are dropped from the returned array.
#
# @param n [Integer] maximum number of documents to fetch, passed to #get
# @param query_options [Hash] extra view parameters, passed to #get
# @return [Array, nil] the documents that were saved with a lock, or nil
#   when #get returned nothing
def lease(n = 1, query_options = {})
  docs = get(n, query_options)
  return unless docs

  docs.each { |doc| add_lock doc }
  response = @db.bulk_save(docs, false)
  # Index the successfully saved ids so the filter below is a hash lookup
  # per document instead of an array scan.
  locked_ids = {}
  response.each { |res| locked_ids[res['id']] = true unless res['error'] }
  if locked_ids.length < docs.length
    docs.keep_if { |doc| locked_ids.key?(doc['_id']) }
  end
  docs
end
#locked?(doc) ⇒ Boolean
91 92 93 94 95 |
# File 'lib/hotseat/queue.rb', line 91
#
# Tells whether a document's queue object carries a 'lock' entry.
#
# @param doc [Hash] document to inspect
# @return [Boolean, nil] true/false when the document has a queue object,
#   nil when it has none
def locked?(doc)
  queue_entry = doc[config[:object_name]]
  queue_entry.has_key?('lock') if queue_entry
end
#locked_view_name ⇒ Object
36 37 38 |
# File 'lib/hotseat/queue.rb', line 36
#
# @return [String] "<design doc name>/<locked view name>" built from the config
def locked_view_name
  format('%s/%s', config[:design_doc_name], config[:locked_view_name])
end
#mark_done(doc) ⇒ Object
103 104 105 106 107 |
# File 'lib/hotseat/queue.rb', line 103
#
# Records completion on a queued document.
#
# Stores a 'done' entry (current UTC time in ISO 8601 form plus this
# process's id) in the document's queue object, i.e. the value found under
# config[:object_name].
#
# @param doc [Hash] document that already carries the queue object
# @return [Hash] the same document
def mark_done(doc)
  queue_entry = doc[config[:object_name]]
  queue_entry['done'] = { 'at' => Time.now.utc.iso8601, 'by' => Process.pid }
  doc
end
#num_all ⇒ Object Also known as: num_total
248 249 250 |
# File 'lib/hotseat/queue.rb', line 248
#
# Queries the "all" view with a limit of 0 and reads its row total.
#
# @return [Object] the 'total_rows' value of the view response
def num_all
  response = @db.view(all_view_name, :limit => 0)
  response['total_rows']
end
#num_done ⇒ Object
244 245 246 |
# File 'lib/hotseat/queue.rb', line 244
#
# Queries the "done" view with a limit of 0 and reads its row total.
#
# @return [Object] the 'total_rows' value of the view response
def num_done
  response = @db.view(done_view_name, :limit => 0)
  response['total_rows']
end
#num_locked ⇒ Object
160 161 162 |
# File 'lib/hotseat/queue.rb', line 160
#
# Queries the "locked" view with a limit of 0 and reads its row total.
#
# @return [Object] the 'total_rows' value of the view response
def num_locked
  response = @db.view(locked_view_name, :limit => 0)
  response['total_rows']
end
#num_pending ⇒ Object Also known as: size
134 135 136 |
# File 'lib/hotseat/queue.rb', line 134
#
# Queries the "pending" view with a limit of 0 and reads its row total.
#
# @return [Object] the 'total_rows' value of the view response
def num_pending
  response = @db.view(pending_view_name, :limit => 0)
  response['total_rows']
end
#patch(doc) ⇒ Object
75 76 77 78 |
# File 'lib/hotseat/queue.rb', line 75
#
# Attaches a fresh queue object to a document.
#
# The object is stored under config[:object_name] and holds the current UTC
# time in ISO 8601 form ('at') and this process's id ('by'). Any value
# already stored under that key is replaced.
#
# @param doc [Hash] document to mark as queued
# @return [Hash] the same document
def patch(doc)
  stamp = { 'at' => Time.now.utc.iso8601, 'by' => Process.pid }
  doc[config[:object_name]] = stamp
  doc
end
#pending_view_name ⇒ Object
32 33 34 |
# File 'lib/hotseat/queue.rb', line 32
#
# @return [String] "<design doc name>/<pending view name>" built from the config
def pending_view_name
  format('%s/%s', config[:design_doc_name], config[:pending_view_name])
end
#purge ⇒ Object
266 267 268 269 270 271 |
# File 'lib/hotseat/queue.rb', line 266
#
# Strips the queue object from every document returned by the "all" view
# and bulk-saves the result.
#
# @return [Object] whatever +bulk_save+ returns
def purge
  rows = @db.view(all_view_name, :include_docs => true)['rows']
  docs = rows.map { |row| row['doc'] }
  docs.each { |doc| unpatch doc }
  # The original call wrote `use_uuids=false` here, which created a throwaway
  # local just to label the argument; the value passed is the same.
  @db.bulk_save(docs, false)
end
#remove(doc_id, opts = {}) ⇒ Object
192 193 194 195 196 197 198 199 200 201 202 |
# File 'lib/hotseat/queue.rb', line 192
#
# Takes a locked document off the queue.
#
# With opts[:forget] truthy the queue object is stripped from the document;
# otherwise the lock is removed and the document is marked done. If a block
# was given, the document is yielded to it afterwards.
#
# @param doc_id [Object] id passed through to +update_doc+
# @param opts [Hash] :forget - strip the queue object instead of marking done
# @yield [doc] the document after it has been updated
# @return [Object] whatever +update_doc+ returns
# @raise [QueueError] when the document is not locked
def remove(doc_id, opts = {})
  # Read the flag once, outside the block. The original called
  # opts.delete(:forget) inside the block, so a second invocation of the
  # block would no longer see the flag and would mark the document done
  # instead of forgetting it. It also removed the key from the caller's hash.
  # NOTE(review): presumably update_doc re-runs the block on a save
  # conflict - confirm against the database library in use.
  forget = opts[:forget]
  @db.update_doc(doc_id) do |doc|
    raise(QueueError, "Document was already removed") unless locked?(doc)
    if forget
      unpatch doc
    else
      mark_done(remove_lock(doc))
    end
    yield doc if block_given?
  end
end
#remove_bulk(doc_ids, opts = {}) ⇒ Object
204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 |
# File 'lib/hotseat/queue.rb', line 204
#
# Takes several locked documents off the queue in one bulk save.
#
# Locked documents are either stripped of the queue object (opts[:forget]
# truthy) or unlocked and marked done. Documents that are not locked and
# ids that could not be loaded are left untouched and reported back.
#
# @param doc_ids [Array] ids passed through to +bulk_load+
# @param opts [Hash] :forget - strip the queue object instead of marking done
# @return [Hash] {'errors' => [...]} with one {'id', 'error'} entry per
#   unlocked document (error 'unlocked') and per row without a 'doc' (the
#   row's own 'key' and 'error')
def remove_bulk(doc_ids, opts = {})
  # Read, rather than delete, the flag so the caller's hash is left intact.
  forget = opts[:forget]
  rows = @db.bulk_load(doc_ids)['rows']
  found, missing = rows.partition { |row| row['doc'] }
  docs = found.map { |row| row['doc'] }
  locked, unlocked = docs.partition { |doc| locked? doc }
  locked.each do |doc|
    if forget
      unpatch doc
    else
      mark_done(remove_lock(doc))
    end
  end
  @db.bulk_save(locked, false)
  { 'errors' =>
    unlocked.map { |doc| { 'id' => doc['_id'], 'error' => 'unlocked' } } +
    missing.map { |row| { 'id' => row['key'], 'error' => row['error'] } } }
end
#remove_lock(doc) ⇒ Object
97 98 99 100 101 |
# File 'lib/hotseat/queue.rb', line 97
#
# Deletes the 'lock' entry from a document's queue object.
#
# @param doc [Hash] document that already carries the queue object
# @return [Hash] the same document
def remove_lock(doc)
  doc[config[:object_name]].delete('lock')
  doc
end
#undo(doc_id) ⇒ Object
224 225 226 227 228 229 230 |
# File 'lib/hotseat/queue.rb', line 224
#
# Reverts a finished document by deleting the 'done' entry from its queue
# object, inside the database's +update_doc+ block.
#
# @param doc_id [Object] id passed through to +update_doc+
# @return [Object] whatever +update_doc+ returns
# @raise [QueueError] when the document is not done
def undo(doc_id)
  @db.update_doc(doc_id) do |record|
    raise(QueueError, "Document is not done") unless done?(record)
    record[config[:object_name]].delete('done')
  end
end
#undo_bulk(doc_ids) ⇒ Object
232 233 234 235 236 237 238 239 240 241 242 |
# File 'lib/hotseat/queue.rb', line 232
#
# Reverts several finished documents in one bulk save.
#
# Only loaded documents for which #done? is truthy are changed and saved;
# ids that could not be loaded and documents that are not done are skipped
# without being reported.
#
# @param doc_ids [Array] ids passed through to +bulk_load+
# @return [Object] whatever +bulk_save+ returns
def undo_bulk(doc_ids)
  rows = @db.bulk_load(doc_ids)['rows']
  docs = rows.select { |row| row['doc'] }.map { |row| row['doc'] }
  done = docs.select { |doc| done? doc }
  done.each do |doc|
    obj = doc[config[:object_name]]
    obj.delete 'done'
  end
  @db.bulk_save(done, false)
end
#unlease(doc_id) ⇒ Object
164 165 166 167 168 169 170 |
# File 'lib/hotseat/queue.rb', line 164
#
# Releases the lock on one document, inside the database's +update_doc+
# block. If a block was given, the document is yielded to it afterwards.
#
# @param doc_id [Object] id passed through to +update_doc+
# @yield [doc] the document after its lock has been removed
# @return [Object] whatever +update_doc+ returns
# @raise [QueueError] when the document is not locked
def unlease(doc_id)
  @db.update_doc(doc_id) do |record|
    raise(QueueError, "Document is already unlocked") unless locked?(record)
    remove_lock(record)
    yield record if block_given?
  end
end
#unlease_all ⇒ Object
187 188 189 190 |
# File 'lib/hotseat/queue.rb', line 187
#
# Releases every lock: collects the 'id' of each row in the locked view and
# hands the list to #unlease_bulk.
#
# @return [Hash] the result of #unlease_bulk
def unlease_all
  locked_rows = db.view(locked_view_name)['rows']
  unlease_bulk(locked_rows.map { |row| row['id'] })
end
#unlease_bulk(doc_ids) ⇒ Object
172 173 174 175 176 177 178 179 180 181 182 183 184 185 |
# File 'lib/hotseat/queue.rb', line 172
#
# Releases the locks on several documents in one bulk save.
#
# Documents that are not locked and ids that could not be loaded are left
# untouched and reported back.
#
# @param doc_ids [Array] ids passed through to +bulk_load+
# @return [Hash] {'errors' => [...]} with one {'id', 'error'} entry per
#   unlocked document (error 'unlocked') and per row without a 'doc' (the
#   row's own 'key' and 'error')
def unlease_bulk(doc_ids)
  rows = @db.bulk_load(doc_ids)['rows']
  found, missing = rows.partition { |row| row['doc'] }
  docs = found.map { |row| row['doc'] }
  locked, unlocked = docs.partition { |doc| locked? doc }
  locked.each { |doc| remove_lock(doc) }
  # The original call wrote `use_uuids=false` here, which created a throwaway
  # local just to label the argument; the value passed is the same.
  @db.bulk_save(locked, false)
  { 'errors' =>
    unlocked.map { |doc| { 'id' => doc['_id'], 'error' => 'unlocked' } } +
    missing.map { |row| { 'id' => row['key'], 'error' => row['error'] } } }
end
#unpatch(doc) ⇒ Object
80 81 82 83 |
# File 'lib/hotseat/queue.rb', line 80
#
# Strips the queue object (the value under config[:object_name]) from a
# document.
#
# @param doc [Hash] document to strip
# @return [Hash] the same document
def unpatch(doc)
  queue_key = config[:object_name]
  doc.delete(queue_key)
  doc
end