Class: Hotseat::Queue

Inherits:
Object
  • Object
show all
Defined in:
lib/hotseat/queue.rb

Constant Summary collapse

# Default names for the design document, its four views and the per-document
# queue object. The constructor merges caller options over these values, so
# the hash itself is frozen to keep the shared defaults from being mutated.
DEFAULT_CONFIG = {
  :design_doc_name => 'hotseat_queue',
  :pending_view_name => 'pending',
  :locked_view_name => 'locked',
  :done_view_name => 'done',
  :all_view_name => 'all',
  :object_name => 'hotseat',
}.freeze

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(db, options = {}) ⇒ Queue

Returns a new instance of Queue.



20
21
22
23
24
25
26
# File 'lib/hotseat/queue.rb', line 20

# Wraps +db+ as a queue and stores the effective configuration.
#
# @param db the database handle; it must respond to +save_doc+
# @param options [Hash] overrides merged on top of DEFAULT_CONFIG
#
# When Hotseat.queue? reports that the configured design document is not
# present in +db+, the document built by #design_doc is saved.
def initialize(db, options={})
  @db = db
  @config = DEFAULT_CONFIG.merge(options)
  @db.save_doc(design_doc) unless Hotseat.queue?(@db, @config[:design_doc_name])
end

Instance Attribute Details

#config ⇒ Object

Returns the value of attribute config.



9
10
11
# File 'lib/hotseat/queue.rb', line 9

# Reader for the configuration stored in @config.
#
# @return the value of @config (the constructor assigns DEFAULT_CONFIG merged
#   with the caller's options)
def config
  @config
end

#db ⇒ Object

Returns the value of attribute db.



9
10
11
# File 'lib/hotseat/queue.rb', line 9

# Reader for the database handle stored in @db.
#
# @return the value of @db (the object passed to the constructor)
def db
  @db
end

Instance Method Details

#add(doc_id) ⇒ Object



115
116
117
118
119
120
# File 'lib/hotseat/queue.rb', line 115

# Puts an existing document on the queue.
#
# The document is handed to us by @db.update_doc; it is stamped with #patch
# and then, if the caller supplied a block, yielded so the caller can make
# further changes in the same update.
#
# @param doc_id the id passed through to @db.update_doc
# @yield [doc] the freshly patched document (optional)
# @return whatever @db.update_doc returns
def add(doc_id)
  @db.update_doc(doc_id) do |fresh|
    patch(fresh)
    yield(fresh) if block_given?
  end
end

#add_bulk(doc_ids) ⇒ Object



122
123
124
125
126
127
# File 'lib/hotseat/queue.rb', line 122

# Puts several existing documents on the queue with one bulk save.
#
# Note: ids that come back from bulk_load without a 'doc' entry are silently
# ignored.
#
# @param doc_ids [Array] ids passed to @db.bulk_load
# @return whatever @db.bulk_save returns
def add_bulk(doc_ids)
  rows = @db.bulk_load(doc_ids)['rows']
  docs = rows.map { |row| row['doc'] }.compact
  docs.each { |doc| patch doc }
  # The original wrote `use_uuids=false`, which only assigns a throwaway local
  # and passes false positionally; pass the value directly instead.
  # NOTE(review): presumably the second positional parameter of bulk_save is
  # the use_uuids flag - confirm against the CouchRest version in use.
  @db.bulk_save(docs, false)
end

#add_lock(doc) ⇒ Object



85
86
87
88
89
# File 'lib/hotseat/queue.rb', line 85

# Records a lock on the document's queue object.
#
# The lock is a hash holding the current UTC time in ISO 8601 form ('at')
# and the id of the current process ('by').
#
# @param doc the document; it must already carry the queue object under
#   config[:object_name]
# @return the same document
def add_lock(doc)
  stamp = {'at' => Time.now.utc.iso8601, 'by' => Process.pid}
  doc[config[:object_name]]['lock'] = stamp
  doc
end

#all_view_name ⇒ Object



44
45
46
# File 'lib/hotseat/queue.rb', line 44

# Name of the "all" view in "<design doc name>/<view name>" form.
#
# @return [String]
def all_view_name
  design, view = config.values_at(:design_doc_name, :all_view_name)
  "#{design}/#{view}"
end

#create_and_add_bulk(docs) ⇒ Object



129
130
131
132
# File 'lib/hotseat/queue.rb', line 129

# Stamps every given document with #patch and bulk-saves them in one call.
#
# @param docs [Array] documents to patch (in place) and save
# @return whatever @db.bulk_save returns
def create_and_add_bulk(docs)
  docs.each(&method(:patch))
  @db.bulk_save(docs)
end

#design_doc ⇒ Object



48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
# File 'lib/hotseat/queue.rb', line 48

# Builds the design document that defines the queue's four views.
#
# Every map function looks at the queue object stored on a document under
# config[:object_name]:
# * pending - has the queue object but neither a lock nor a done marker;
#             emits the queue object's 'at' value
# * locked  - has a lock; emits the lock's 'at' value
# * done    - has a done marker; emits the done marker's 'at' value
# * all     - has the queue object; emits its 'at' value
#
# @return [Hash] a document with '_id' and :views keys
def design_doc
  q = "doc.#{config[:object_name]}"
  lock = "#{q}.lock"
  done = "#{q}.done"
  # <<- heredocs keep their leading indentation, hence the #strip calls below.
  pending_func = <<-JAVASCRIPT
    function(doc) { if (#{q} && !(#{lock} || #{done})) emit(#{q}.at, null); }
  JAVASCRIPT
  locked_func = <<-JAVASCRIPT
    function(doc) { if (#{q} && #{lock}) emit(#{lock}.at, null); }
  JAVASCRIPT
  done_func = <<-JAVASCRIPT
    function(doc) { if (#{q} && #{done}) emit(#{done}.at, null); }
  JAVASCRIPT
  all_func = <<-JAVASCRIPT
    function(doc) { if (#{q}) emit(#{q}.at, null); }
  JAVASCRIPT
  {
    '_id' => "_design/#{config[:design_doc_name]}",
    :views => {
      config[:pending_view_name] => { :map => pending_func.strip },
      config[:locked_view_name] => { :map => locked_func.strip },
      config[:done_view_name] => { :map => done_func.strip },
      config[:all_view_name] => { :map => all_func.strip },
    }
  }
end

#design_doc_id ⇒ Object



28
29
30
# File 'lib/hotseat/queue.rb', line 28

# Id of the queue's design document: "_design/" followed by the configured
# design document name.
#
# @return [String]
def design_doc_id
  format('_design/%s', config[:design_doc_name])
end

#done?(doc) ⇒ Boolean

Returns:

  • (Boolean)


109
110
111
112
113
# File 'lib/hotseat/queue.rb', line 109

# Tells whether the document's queue object carries a 'done' entry.
#
# @param doc the document to inspect
# @return [Boolean, nil] true/false when the queue object is present, nil
#   when the document has no queue object
def done?(doc)
  queue_state = doc[config[:object_name]]
  queue_state.has_key?('done') if queue_state
end

#done_view_name ⇒ Object



40
41
42
# File 'lib/hotseat/queue.rb', line 40

# Name of the "done" view in "<design doc name>/<view name>" form.
#
# @return [String]
def done_view_name
  format('%s/%s', config[:design_doc_name], config[:done_view_name])
end

#forget(doc_id) ⇒ Object



253
254
255
256
257
# File 'lib/hotseat/queue.rb', line 253

# Takes a document off the queue entirely by stripping its queue object
# (see #unpatch) inside a @db.update_doc call.
#
# @param doc_id the id passed through to @db.update_doc
# @return whatever @db.update_doc returns
def forget(doc_id)
  @db.update_doc(doc_id) { |stored| unpatch(stored) }
end

#forget_bulk(doc_ids) ⇒ Object



259
260
261
262
263
264
# File 'lib/hotseat/queue.rb', line 259

# Takes several documents off the queue with one bulk save.
#
# Note: ids that come back from bulk_load without a 'doc' entry are silently
# ignored.
#
# @param doc_ids [Array] ids passed to @db.bulk_load
# @return whatever @db.bulk_save returns
def forget_bulk(doc_ids)
  rows = @db.bulk_load(doc_ids)['rows']
  docs = rows.map { |row| row['doc'] }.compact
  docs.each { |doc| unpatch doc }
  # Pass false directly; the original `use_uuids=false` merely assigned an
  # unused local before passing the same value positionally.
  # NOTE(review): presumably this is bulk_save's use_uuids flag - confirm.
  @db.bulk_save(docs, false)
end

#get(n = 1, options = {}) ⇒ Object



139
140
141
142
143
# File 'lib/hotseat/queue.rb', line 139

# Reads up to +n+ documents from the pending view without locking them.
#
# @param n [Integer] value used for the view's :limit parameter
# @param options [Hash] extra view parameters; these override the defaults
#   (:limit and :include_docs)
# @return [Array, nil] the 'doc' of every returned row, or nil when the view
#   returned no rows
def get(n=1, options={})
  query = {:limit => n, :include_docs => true}.merge(options)
  rows = @db.view(pending_view_name, query)['rows']
  return nil if rows.empty?

  rows.map { |row| row['doc'] }
end

#lease(n = 1, query_options = {}) ⇒ Object



145
146
147
148
149
150
151
152
153
154
155
156
157
158
# File 'lib/hotseat/queue.rb', line 145

# Fetches up to +n+ pending documents, locks them and returns the ones whose
# lock was saved successfully.
#
# @param n [Integer] maximum number of documents to lease
# @param query_options [Hash] passed through to #get
# @return [Array, nil] the locked documents, or nil when #get found nothing
def lease(n=1, query_options={})
  docs = get(n, query_options)
  return nil unless docs

  docs.each { |doc| add_lock doc }
  # Pass false directly; the original `use_uuids=false` merely assigned an
  # unused local before passing the same value positionally.
  # NOTE(review): presumably this is bulk_save's use_uuids flag - confirm.
  response = @db.bulk_save(docs, false)

  # Some docs may have failed to lock - probably updated by another process.
  # Index the successful ids in a hash so the filter below is a constant-time
  # lookup per document instead of an Array#include? scan.
  locked_ids = {}
  response.each { |res| locked_ids[res['id']] = true unless res['error'] }
  if locked_ids.size < docs.length
    docs.keep_if { |doc| locked_ids.key?(doc['_id']) }
  end
  docs
end

#locked?(doc) ⇒ Boolean

Returns:

  • (Boolean)


91
92
93
94
95
# File 'lib/hotseat/queue.rb', line 91

# Tells whether the document's queue object carries a 'lock' entry.
#
# @param doc the document to inspect
# @return [Boolean, nil] true/false when the queue object is present, nil
#   when the document has no queue object
def locked?(doc)
  queue_state = doc[config[:object_name]]
  queue_state.has_key?('lock') if queue_state
end

#locked_view_name ⇒ Object



36
37
38
# File 'lib/hotseat/queue.rb', line 36

# Name of the "locked" view in "<design doc name>/<view name>" form.
#
# @return [String]
def locked_view_name
  design, view = config.values_at(:design_doc_name, :locked_view_name)
  "#{design}/#{view}"
end

#mark_done(doc) ⇒ Object



103
104
105
106
107
# File 'lib/hotseat/queue.rb', line 103

# Records a 'done' marker on the document's queue object.
#
# The marker is a hash holding the current UTC time in ISO 8601 form ('at')
# and the id of the current process ('by').
#
# @param doc the document; it must already carry the queue object under
#   config[:object_name]
# @return the same document
def mark_done(doc)
  stamp = {'at' => Time.now.utc.iso8601, 'by' => Process.pid}
  doc[config[:object_name]]['done'] = stamp
  doc
end

#num_all ⇒ Object Also known as: num_total



248
249
250
# File 'lib/hotseat/queue.rb', line 248

# Number of rows in the "all" view.
#
# The view is queried with :limit => 0, so only the 'total_rows' figure of
# the response is used.
#
# @return the 'total_rows' entry of the view response
def num_all
  summary = @db.view(all_view_name, :limit => 0)
  summary['total_rows']
end

#num_done ⇒ Object



244
245
246
# File 'lib/hotseat/queue.rb', line 244

# Number of rows in the "done" view.
#
# The view is queried with :limit => 0, so only the 'total_rows' figure of
# the response is used.
#
# @return the 'total_rows' entry of the view response
def num_done
  summary = @db.view(done_view_name, :limit => 0)
  summary['total_rows']
end

#num_locked ⇒ Object



160
161
162
# File 'lib/hotseat/queue.rb', line 160

# Number of rows in the "locked" view.
#
# The view is queried with :limit => 0, so only the 'total_rows' figure of
# the response is used.
#
# @return the 'total_rows' entry of the view response
def num_locked
  summary = @db.view(locked_view_name, :limit => 0)
  summary['total_rows']
end

#num_pending ⇒ Object Also known as: size



134
135
136
# File 'lib/hotseat/queue.rb', line 134

# Number of rows in the "pending" view.
#
# The view is queried with :limit => 0, so only the 'total_rows' figure of
# the response is used.
#
# @return the 'total_rows' entry of the view response
def num_pending
  summary = @db.view(pending_view_name, :limit => 0)
  summary['total_rows']
end

#patch(doc) ⇒ Object



75
76
77
78
# File 'lib/hotseat/queue.rb', line 75

# Stamps a document as queued.
#
# Stores a fresh queue object under config[:object_name], replacing any
# value already there. The object holds the current UTC time in ISO 8601
# form ('at') and the id of the current process ('by').
#
# @param doc the document to stamp (modified in place)
# @return the same document
def patch(doc)
  doc.tap do |target|
    target[config[:object_name]] = {'at' => Time.now.utc.iso8601, 'by' => Process.pid}
  end
end

#pending_view_name ⇒ Object



32
33
34
# File 'lib/hotseat/queue.rb', line 32

# Name of the "pending" view in "<design doc name>/<view name>" form.
#
# @return [String]
def pending_view_name
  format('%s/%s', config[:design_doc_name], config[:pending_view_name])
end

#purge ⇒ Object



266
267
268
269
270
271
# File 'lib/hotseat/queue.rb', line 266

# Empties the queue: loads every document listed by the "all" view, strips
# the queue object from each (see #unpatch) and bulk-saves the result.
#
# @return whatever @db.bulk_save returns
def purge
  rows = @db.view(all_view_name, :include_docs => true)['rows']
  docs = rows.map { |row| row['doc'] }
  docs.each { |doc| unpatch doc }
  # Pass false directly; the original `use_uuids=false` merely assigned an
  # unused local before passing the same value positionally.
  # NOTE(review): presumably this is bulk_save's use_uuids flag - confirm.
  @db.bulk_save(docs, false)
end

#remove(doc_id, opts = {}) ⇒ Object



192
193
194
195
196
197
198
199
200
201
202
# File 'lib/hotseat/queue.rb', line 192

# Removes a leased (locked) document from the queue.
#
# By default the lock is dropped and the document is marked done. With
# :forget => true the queue object is stripped from the document instead.
#
# @param doc_id the id passed through to @db.update_doc
# @param opts [Hash] :forget - truthy to strip the queue object rather than
#   mark the document done
# @yield [doc] the updated document (optional)
# @return whatever @db.update_doc returns
# @raise [QueueError] when the document is not locked
def remove(doc_id, opts={})
  # Read the flag once, outside the block and without deleting it. The
  # original called opts.delete(:forget) inside the block, which mutated the
  # caller's hash and lost the flag if the block ran a second time.
  # NOTE(review): CouchRest's update_doc presumably re-runs the block when the
  # save hits an update conflict - confirm against the version in use.
  forget = opts[:forget]
  @db.update_doc(doc_id) do |doc|
    raise(QueueError, "Document was already removed") unless locked?(doc)
    if forget
      unpatch doc
    else
      mark_done( remove_lock( doc ) )
    end
    yield doc if block_given?
  end
end

#remove_bulk(doc_ids, opts = {}) ⇒ Object



204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
# File 'lib/hotseat/queue.rb', line 204

# Removes several leased documents from the queue with one bulk save.
#
# Locked documents are either marked done (default) or, with
# :forget => true, stripped of their queue object. Documents that are not
# locked and ids that bulk_load returned without a 'doc' are reported in the
# result rather than raised.
#
# @param doc_ids [Array] ids passed to @db.bulk_load
# @param opts [Hash] :forget - truthy to strip the queue object rather than
#   mark the documents done
# @return [Hash] {'errors' => [...]} with one {'id', 'error'} entry per
#   unlocked document ('unlocked') and per missing row (the row's own error)
def remove_bulk(doc_ids, opts={})
  rows = @db.bulk_load(doc_ids)['rows']
  docs, missing = rows.partition {|row| row['doc'] }
  docs.map! {|row| row['doc'] }
  locked, unlocked = docs.partition {|doc| locked? doc }
  # Read the flag instead of deleting it so the caller's hash is left intact
  # and can be reused for another call.
  forget = opts[:forget]
  locked.each do |doc|
    if forget
      unpatch doc
    else
      mark_done( remove_lock( doc ) )
    end
  end
  # Pass false directly; the original `use_uuids=false` merely assigned an
  # unused local before passing the same value positionally.
  # NOTE(review): presumably this is bulk_save's use_uuids flag - confirm.
  @db.bulk_save(locked, false)
  {'errors' =>
    unlocked.map {|doc| {'id' => doc['_id'], 'error' => 'unlocked' } } +
    missing.map {|row| {'id' => row['key'], 'error' => row['error']} }
  }
end

#remove_lock(doc) ⇒ Object



97
98
99
100
101
# File 'lib/hotseat/queue.rb', line 97

# Deletes the 'lock' entry from the document's queue object.
#
# @param doc the document; it must carry the queue object under
#   config[:object_name]
# @return the same document
def remove_lock(doc)
  doc.tap { |target| target[config[:object_name]].delete('lock') }
end

#undo(doc_id) ⇒ Object



224
225
226
227
228
229
230
# File 'lib/hotseat/queue.rb', line 224

# Reverts a finished document by deleting the 'done' entry from its queue
# object inside a @db.update_doc call.
#
# @param doc_id the id passed through to @db.update_doc
# @return whatever @db.update_doc returns
# @raise [QueueError] when the document is not marked done
def undo(doc_id)
  @db.update_doc(doc_id) do |stored|
    raise(QueueError, "Document is not done") unless done?(stored)
    stored[config[:object_name]].delete('done')
  end
end

#undo_bulk(doc_ids) ⇒ Object



232
233
234
235
236
237
238
239
240
241
242
# File 'lib/hotseat/queue.rb', line 232

# Reverts several finished documents with one bulk save.
#
# Only documents for which #done? is truthy are changed and saved; ids that
# bulk_load returned without a 'doc', and documents that are not done, are
# skipped without being reported.
#
# @param doc_ids [Array] ids passed to @db.bulk_load
# @return whatever @db.bulk_save returns
def undo_bulk(doc_ids)
  rows = @db.bulk_load(doc_ids)['rows']
  # The original partitioned into missing / not-done groups it never used;
  # select just the documents that will actually be saved.
  done = rows.map { |row| row['doc'] }.select { |doc| doc && done?(doc) }
  done.each do |doc|
    obj = doc[config[:object_name]]
    obj.delete 'done'
  end
  # Pass false directly; the original `use_uuids=false` merely assigned an
  # unused local before passing the same value positionally.
  # NOTE(review): presumably this is bulk_save's use_uuids flag - confirm.
  @db.bulk_save(done, false)
end

#unlease(doc_id) ⇒ Object



164
165
166
167
168
169
170
# File 'lib/hotseat/queue.rb', line 164

# Gives a leased document back to the queue by removing its lock inside a
# @db.update_doc call.
#
# @param doc_id the id passed through to @db.update_doc
# @yield [doc] the unlocked document (optional)
# @return whatever @db.update_doc returns
# @raise [QueueError] when the document is not locked
def unlease(doc_id)
  @db.update_doc(doc_id) do |stored|
    raise(QueueError, "Document is already unlocked") unless locked?(stored)
    remove_lock(stored)
    yield(stored) if block_given?
  end
end

#unlease_all ⇒ Object



187
188
189
190
# File 'lib/hotseat/queue.rb', line 187

# Releases every lock: collects the 'id' of each row in the "locked" view and
# hands the list to #unlease_bulk.
#
# @return whatever #unlease_bulk returns
def unlease_all
  locked_rows = db.view(locked_view_name)['rows']
  unlease_bulk(locked_rows.map { |row| row['id'] })
end

#unlease_bulk(doc_ids) ⇒ Object



172
173
174
175
176
177
178
179
180
181
182
183
184
185
# File 'lib/hotseat/queue.rb', line 172

# Gives several leased documents back to the queue with one bulk save.
#
# Documents that are not locked and ids that bulk_load returned without a
# 'doc' are reported in the result rather than raised.
#
# @param doc_ids [Array] ids passed to @db.bulk_load
# @return [Hash] {'errors' => [...]} with one {'id', 'error'} entry per
#   unlocked document ('unlocked') and per missing row (the row's own error)
def unlease_bulk(doc_ids)
  rows = @db.bulk_load(doc_ids)['rows']
  docs, missing = rows.partition {|row| row['doc'] }
  docs.map! {|row| row['doc'] }
  locked, unlocked = docs.partition {|doc| locked? doc }
  locked.each {|doc| remove_lock( doc ) }
  # Pass false directly; the original `use_uuids=false` merely assigned an
  # unused local before passing the same value positionally.
  # NOTE(review): presumably this is bulk_save's use_uuids flag - confirm.
  @db.bulk_save(locked, false)
  {'errors' =>
    unlocked.map {|doc| {'id' => doc['_id'], 'error' => 'unlocked' } } +
    missing.map {|row| {'id' => row['key'], 'error' => row['error']} }
  }
end

#unpatch(doc) ⇒ Object



80
81
82
83
# File 'lib/hotseat/queue.rb', line 80

# Strips the queue object (the entry under config[:object_name]) from a
# document.
#
# @param doc the document to strip (modified in place)
# @return the same document
def unpatch(doc)
  doc.tap { |target| target.delete(config[:object_name]) }
end