Class: Sqew::Backend::LevelDB

Inherits:
Qu::Backend::Base
  • Object
show all
Defined in:
lib/sqew/backend/leveldb.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#db ⇒ Object

Returns the value of attribute db.



4
5
6
# File 'lib/sqew/backend/leveldb.rb', line 4

# Reader for the +db+ attribute.
#
# @return [Object, nil] the current value of @db (nil when it was never set)
def db
  instance_variable_get(:@db)
end

Instance Method Details

#clear(*queues) ⇒ Object



15
16
17
18
# File 'lib/sqew/backend/leveldb.rb', line 15

# Empties the selected stores via drop_all.
#
# Called without arguments, both the pending queue and the error store are
# emptied. Otherwise only the stores named by the strings "queue" and/or
# "failed" are emptied; any other name is ignored.
#
# @param queues [Array<String>] names of the stores to empty
# @return [Object, nil] result of the last statement (nil when the error
#   store was not selected)
def clear(*queues)
  wipe_everything = queues.empty?
  drop_all(queue) if wipe_everything || queues.include?("queue")
  drop_all(errors) if wipe_everything || queues.include?("failed")
end

#clear_running ⇒ Object



20
21
22
# File 'lib/sqew/backend/leveldb.rb', line 20

# Empties the store of in-flight jobs by handing it to drop_all.
#
# @return [Object] whatever drop_all returns
def clear_running
  in_flight = running
  drop_all(in_flight)
end

#close ⇒ Object



79
80
81
82
83
84
# File 'lib/sqew/backend/leveldb.rb', line 79

# Closes the queue, running and error stores (in that order) and then
# forgets the cached handles by resetting their instance variables.
#
# @return [nil]
def close
  [:queue, :running, :errors].each { |store_name| send(store_name).close }
  @queue = @running = @errors = nil
end

#completed(payload) ⇒ Object



49
50
51
# File 'lib/sqew/backend/leveldb.rb', line 49

# Marks a job as finished by removing its entry from the running store,
# requesting a synchronous write.
#
# @param payload [#id] the job that finished
# @return [Object] whatever the store's delete returns
def completed(payload)
  in_flight = running
  in_flight.delete(payload.id, sync: true)
end

#delete(id) ⇒ Object



24
25
26
27
28
29
30
# File 'lib/sqew/backend/leveldb.rb', line 24

# Removes the job with the given id from the pending queue or, failing
# that, from the error store. The error store is only consulted when the
# queue does not contain the id.
#
# @param id [Object] key of the job to remove
# @return [Object, nil] result of the store's delete, or nil when neither
#   store contains the id
def delete(id)
  return queue.delete(id) if queue.exists?(id)

  errors.delete(id) if errors.exists?(id)
end

#enqueue(payload) ⇒ Object



6
7
8
9
# File 'lib/sqew/backend/leveldb.rb', line 6

# Appends a job to the pending queue.
#
# The key is the current wall-clock time rendered as a float string and the
# value is the JSON encoding of the job's class name and arguments. The
# write is requested as synchronous.
#
# NOTE(review): two jobs enqueued at the same Time.now.to_f value would
# share a key — confirm whether callers can hit that.
#
# @param payload [#klass, #args] the job to enqueue
# @return [Object] whatever the store's put returns
def enqueue(payload)
  job_id = Time.now.to_f.to_s
  pending = queue
  encoded = MultiJson.encode(klass:payload.klass.to_s, args:payload.args)
  pending.put(job_id, encoded, :sync => true)
end

#failed(payload, error) ⇒ Object



53
54
55
56
# File 'lib/sqew/backend/leveldb.rb', line 53

# Moves a job from the running store to the error store, recording the
# job's class name, arguments, error message and backtrace as JSON.
#
# The record is encoded before either store is touched, so a failure while
# building it cannot remove the job from the running store without also
# writing it to the error store.
#
# @param payload [#id, #klass, #args] the job that failed
# @param error [Exception] the error raised by the job
# @return [Object] whatever the error store's put returns
def failed(payload, error)
  record = MultiJson.encode(
    "klass" => payload.klass.to_s,
    "args" => payload.args,
    "error" => error.message,
    # Exception#backtrace is nil for an exception that was never raised;
    # Array() turns that into an empty list instead of a NoMethodError.
    "backtrace" => Array(error.backtrace).join("\n")
  )
  running.delete(payload.id, :sync => true)
  errors.put(payload.id, record, :sync => true)
end

#failed_jobs ⇒ Object



58
59
60
# File 'lib/sqew/backend/leveldb.rb', line 58

# Lists every entry of the error store.
#
# Each stored value is JSON-decoded and the entry's key is added to the
# decoded record under "id".
#
# @return [Array] one decoded record per entry in the error store
def failed_jobs
  errors.to_a.map do |job_id, encoded|
    record = MultiJson.decode(encoded)
    record.update("id" => job_id)
  end
end

#length(*) ⇒ Object



11
12
13
# File 'lib/sqew/backend/leveldb.rb', line 11

# Number of keys in the pending queue. Any arguments are accepted and
# ignored.
#
# @return [Integer] count of the queue's keys
def length(*)
  pending_ids = queue.keys
  pending_ids.length
end

#queued_jobs ⇒ Object



66
67
68
# File 'lib/sqew/backend/leveldb.rb', line 66

# Lists every entry of the pending queue.
#
# Each stored value is JSON-decoded and the entry's key is added to the
# decoded record under "id".
#
# @return [Array] one decoded record per entry in the queue
def queued_jobs
  queue.to_a.map do |job_id, encoded|
    record = MultiJson.decode(encoded)
    record.update("id" => job_id)
  end
end

#register_worker(*) ⇒ Object



73
74
# File 'lib/sqew/backend/leveldb.rb', line 73

# Intentionally does nothing; accepts and ignores any arguments.
#
# @return [nil]
def register_worker(*)
  nil
end

#release(*) ⇒ Object



70
71
# File 'lib/sqew/backend/leveldb.rb', line 70

# Intentionally does nothing; accepts and ignores any arguments.
#
# @return [nil]
def release(*)
  nil
end

#reserve(_, options = {block:false}) ⇒ Object



32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
# File 'lib/sqew/backend/leveldb.rb', line 32

# Takes the first entry of the pending queue and hands it out as a payload.
#
# The entry is deleted from the queue, written to the running store (both
# requested as synchronous writes), JSON-decoded, tagged with its key under
# :id and wrapped in a Sqew::Payload.
#
# When the queue has no first entry the method returns nil, unless
# options[:block] is truthy, in which case it sleeps 3 seconds and retries
# indefinitely.
#
# @param _ [Object] ignored
# @param options [Hash] :block — whether to keep polling an empty queue
# @return [Sqew::Payload, nil] the reserved job, or nil when nothing is
#   queued and blocking was not requested
def reserve(_, options = {block:false})
  loop do
    entry = queue.first
    if entry
      job_id, encoded = entry
      queue.delete(job_id, :sync => true)
      running.put(job_id, encoded, :sync => true)
      attributes = MultiJson.decode(encoded).update(id:job_id)
      return Sqew::Payload.new(attributes)
    end

    break unless options[:block]

    sleep 3
  end
end

#running_jobs ⇒ Object



62
63
64
# File 'lib/sqew/backend/leveldb.rb', line 62

# Lists every entry of the running store.
#
# Each stored value is JSON-decoded and the entry's key is added to the
# decoded record under "id".
#
# @return [Array] one decoded record per entry in the running store
def running_jobs
  running.to_a.map do |job_id, encoded|
    record = MultiJson.decode(encoded)
    record.update("id" => job_id)
  end
end

#unregister_worker(*) ⇒ Object



76
77
# File 'lib/sqew/backend/leveldb.rb', line 76

# Intentionally does nothing; accepts and ignores any arguments.
#
# @return [nil]
def unregister_worker(*)
  nil
end