Class: EQ::Queueing::Backends::LevelDB::JobsCollection
- Inherits:
-
Struct
- Object
- Struct
- EQ::Queueing::Backends::LevelDB::JobsCollection
show all
- Includes:
- Logging
- Defined in:
- lib/eq-queueing/backends/leveldb.rb
Constant Summary
collapse
- QUEUE =
'queue'.freeze
- PAYLOAD =
'payload'.freeze
- CREATED_AT =
'created_at'.freeze
- STARTED_WORKING_AT =
'started_working_at'.freeze
- NOT_WORKING =
''.freeze
Instance Attribute Summary collapse
Instance Method Summary
collapse
Methods included from Logging
#debug, #info, #log_error
Instance Attribute Details
#db ⇒ Object
Returns the value of attribute db
8
9
10
|
# File 'lib/eq-queueing/backends/leveldb.rb', line 8
# Reader for the +db+ attribute.
#
# The other methods of this class call +each+, +[]+, +[]=+, +batch+ and
# +contains?+ on the returned object.
# NOTE(review): the class is listed as inheriting from Struct, so this reader
# is presumably generated by the Struct definition rather than hand-written —
# confirm in lib/eq-queueing/backends/leveldb.rb.
#
# @return [Object] the current value of the +db+ attribute
def db
@db
end
|
#name ⇒ Object
Returns the value of attribute name
8
9
10
|
# File 'lib/eq-queueing/backends/leveldb.rb', line 8
# Reader for the +name+ attribute.
#
# NOTE(review): the class is listed as inheriting from Struct, so this reader
# is presumably generated by the Struct definition rather than hand-written —
# confirm in lib/eq-queueing/backends/leveldb.rb.
#
# @return [Object] the current value of the +name+ attribute
def name
@name
end
|
Instance Method Details
#count ⇒ Object
128
129
130
131
132
133
134
|
# File 'lib/eq-queueing/backends/leveldb.rb', line 128
# Counts the stored jobs by walking every entry of the database and counting
# the keys that contain the QUEUE marker.
#
# @return [Integer] number of keys containing QUEUE
def count
  total = 0
  db.each do |key, _value|
    next unless key.include?(QUEUE)

    total += 1
  end
  total
end
|
#count_waiting ⇒ Object
136
137
138
139
140
141
142
143
144
|
# File 'lib/eq-queueing/backends/leveldb.rb', line 136
# Counts the jobs nobody is working on yet: entries whose key contains
# STARTED_WORKING_AT and whose value still equals the NOT_WORKING marker.
#
# @return [Integer] number of waiting jobs
def count_waiting
  waiting = 0
  db.each do |key, value|
    next unless key.include?(STARTED_WORKING_AT) && value == NOT_WORKING

    waiting += 1
  end
  waiting
end
|
#count_working ⇒ Object
146
147
148
149
150
151
152
153
154
|
# File 'lib/eq-queueing/backends/leveldb.rb', line 146
# Counts the jobs that are currently being worked on: entries whose key
# contains STARTED_WORKING_AT and whose value differs from NOT_WORKING.
#
# @return [Integer] number of jobs in progress
def count_working
  in_progress = 0
  db.each do |key, value|
    next unless key.include?(STARTED_WORKING_AT) && value != NOT_WORKING

    in_progress += 1
  end
  in_progress
end
|
#delete(job_id) ⇒ Object
56
57
58
59
60
61
62
63
64
65
66
|
# File 'lib/eq-queueing/backends/leveldb.rb', line 56
# Removes every record belonging to a job (queue, payload, created_at and
# started_working_at) in a single batch.
#
# @param job_id [Object] id of the job, interpolated into the record keys
# @return [Boolean] true when the queue record existed before the batch and
#   is gone afterwards, false otherwise
def delete(job_id)
  queue_key = "#{QUEUE}:#{job_id}"
  existed_before = !db[queue_key].nil?

  db.batch do |batch|
    [QUEUE, PAYLOAD, CREATED_AT, STARTED_WORKING_AT].each do |prefix|
      batch.delete "#{prefix}:#{job_id}"
    end
  end

  # Re-read after the batch so the result reflects what is actually stored.
  gone_after = db[queue_key].nil?
  existed_before && gone_after
end
|
#deserialize(data) ⇒ Object
124
125
126
|
# File 'lib/eq-queueing/backends/leveldb.rb', line 124
# Turns a string produced by #serialize back into a Ruby object using Marshal.
#
# NOTE(review): Marshal.load must only ever see trusted data; the callers
# visible here pass values read back from the database — confirm nothing
# external can write to it.
#
# @param data [String] marshalled representation of an object
# @return [Object] the restored object
def deserialize(data)
  Marshal.load(data)
end
|
#each ⇒ Object
156
157
158
159
160
161
162
163
164
165
166
167
168
|
# File 'lib/eq-queueing/backends/leveldb.rb', line 156
# Iterates over all stored jobs. For every entry whose key contains QUEUE the
# job's remaining attributes are looked up and yielded together.
#
# @yield [id:, queue:, payload:, created_at:, started_working_at:]
#   the attributes of one job
# @return [Object] whatever the underlying +db.each+ returns
def each
  db.each do |key, queue_name|
    next unless key.include?(QUEUE)

    id = job_id_from_key(key)
    yield(id: id,
          queue: queue_name,
          payload: find_payload(id),
          created_at: find_created_at(id),
          started_working_at: find_started_working_at(id))
  end
end
|
#exists?(job) ⇒ Boolean
45
46
47
48
49
50
51
52
53
54
|
# File 'lib/eq-queueing/backends/leveldb.rb', line 45
# Tells whether an equal job is already stored: same queue and same payload.
#
# @param job [#queue, #payload] the job to look for
# @return [Boolean] true as soon as a stored job with equal queue and payload
#   is found, false when the whole database was scanned without a match
def exists?(job)
  db.each do |key, queue_name|
    next unless key.include?(QUEUE) && queue_name == job.queue

    stored_payload = find_payload(job_id_from_key(key))
    return true if stored_payload == job.payload
  end
  false
end
|
#find_created_at(job_id) ⇒ Object
88
89
90
91
92
|
# File 'lib/eq-queueing/backends/leveldb.rb', line 88
# Looks up the creation timestamp record of a job.
#
# @param job_id [Object] id of the job, interpolated into the record key
# @return [Object, nil] the deserialized stored value, or nil when no
#   created_at record exists for the id
def find_created_at(job_id)
  stored = db["#{CREATED_AT}:#{job_id}"]
  return unless stored

  deserialize(stored)
end
|
#find_free_job_id ⇒ Object
Tries as hard as it can to find a free slot: keeps generating ids until one is not yet used as a queue key.
106
107
108
109
110
111
112
|
# File 'lib/eq-queueing/backends/leveldb.rb', line 106
# Keeps generating ids until one is found that is not yet used as a queue key.
# Every rejected candidate is reported through +debug+.
#
# @return [Object] a job id for which no queue record exists
def find_free_job_id
  candidate = generate_id
  while db.contains?("#{QUEUE}:#{candidate}")
    debug "#{candidate} is not free"
    candidate = generate_id
  end
  candidate
end
|
#find_payload(job_id) ⇒ Object
80
81
82
83
84
85
86
|
# File 'lib/eq-queueing/backends/leveldb.rb', line 80
# Looks up and deserializes the payload of a job.
#
# The record is read from the database exactly once and that value is reused,
# so the result cannot change between the existence check and the
# deserialization.
#
# @param job_id [Object] id of the job, interpolated into the record key
# @return [Object, nil] the deserialized payload, or nil when no payload
#   record exists for the id
def find_payload(job_id)
  raw = db["#{PAYLOAD}:#{job_id}"]
  return unless raw

  deserialize(raw)
end
|
#find_queue(job_id) ⇒ Object
76
77
78
|
# File 'lib/eq-queueing/backends/leveldb.rb', line 76
# Returns the raw queue record stored for a job.
#
# @param job_id [Object] id of the job, interpolated into the record key
# @return [Object, nil] whatever the database holds under the queue key
def find_queue(job_id)
  queue_key = "#{QUEUE}:#{job_id}"
  db[queue_key]
end
|
#find_started_working_at(job_id) ⇒ Object
94
95
96
97
98
|
# File 'lib/eq-queueing/backends/leveldb.rb', line 94
# Looks up when work on a job was started.
#
# A job that is still waiting stores the NOT_WORKING marker instead of a
# serialized time. The marker is an empty (but truthy) string that cannot be
# deserialized, so it is treated like a missing record and yields nil.
#
# @param job_id [Object] id of the job, interpolated into the record key
# @return [Object, nil] the deserialized stored value, or nil when the record
#   is missing or the job has not been started yet
def find_started_working_at(job_id)
  serialized_time = db["#{STARTED_WORKING_AT}:#{job_id}"]
  return if serialized_time.nil? || serialized_time == NOT_WORKING

  deserialize(serialized_time)
end
|
#first_waiting ⇒ Object
27
28
29
30
31
32
33
34
|
# File 'lib/eq-queueing/backends/leveldb.rb', line 27
# Finds the first job, in database iteration order, that nobody is working on.
#
# @return [Object, nil] the id of the first entry whose key contains
#   STARTED_WORKING_AT and whose value equals NOT_WORKING, or nil if none
def first_waiting
  db.each do |key, value|
    next unless key.include?(STARTED_WORKING_AT) && value == NOT_WORKING

    return job_id_from_key(key)
  end
  nil
end
|
#generate_id ⇒ Object
Note:
Maybe this is a stupid idea, but for now it kinda works :)
The id is the current time in milliseconds followed by a 4-digit random number.
116
117
118
|
# File 'lib/eq-queueing/backends/leveldb.rb', line 116
# Builds a candidate job id: the current time in milliseconds followed by a
# zero-padded 4-digit random number.
#
# The random part is drawn from 0..9999 so the whole 4-digit suffix space is
# used; this lowers the chance of two ids colliding within one millisecond.
# Uniqueness is still not guaranteed here; #find_free_job_id checks the
# database.
#
# @return [String] a string consisting only of digits
def generate_id
  milliseconds = (Time.now.to_f * 1000.0).to_i
  '%d%04d' % [milliseconds, Kernel.rand(10_000)]
end
|
#job_id_from_key(key) ⇒ Object
100
101
102
103
|
# File 'lib/eq-queueing/backends/leveldb.rb', line 100
# Extracts the job id from a record key of the form "<prefix>:<job_id>".
#
# @param key [String] a database key
# @return [String, nil] the second colon-separated segment of the key, or nil
#   when the key has no second segment
def job_id_from_key(key)
  key.split(':')[1]
end
|
#push(job) ⇒ Object
18
19
20
21
22
23
24
25
|
# File 'lib/eq-queueing/backends/leveldb.rb', line 18
# Stores a new job under a freshly picked id and marks it as waiting.
#
# Four records are written: queue, payload (skipped when the payload is nil),
# created_at (the serialized current time) and started_working_at (set to the
# NOT_WORKING marker).
#
# @param job [#queue, #payload] the job to store
# @return [Object] the id the job was stored under
def push(job)
  id = find_free_job_id

  db["#{QUEUE}:#{id}"] = job.queue
  unless job.payload.nil?
    db["#{PAYLOAD}:#{id}"] = serialize(job.payload)
  end
  db["#{CREATED_AT}:#{id}"] = serialize(Time.now)
  db["#{STARTED_WORKING_AT}:#{id}"] = NOT_WORKING

  id
end
|
#serialize(data) ⇒ Object
120
121
122
|
# File 'lib/eq-queueing/backends/leveldb.rb', line 120
# Converts a Ruby object into its Marshal string representation so it can be
# stored as a database value.
#
# @param data [Object] any object Marshal can dump
# @return [String] the marshalled bytes
def serialize(data)
  Marshal.dump(data)
end
|
#start_working(job_id) ⇒ Object
68
69
70
|
# File 'lib/eq-queueing/backends/leveldb.rb', line 68
# Marks a job as being worked on by storing the serialized current time in its
# started_working_at record.
#
# @param job_id [Object] id of the job, interpolated into the record key
# @return [Object] the serialized time that was written
def start_working(job_id)
  record_key = "#{STARTED_WORKING_AT}:#{job_id}"
  db[record_key] = serialize(Time.now)
end
|
#stop_working(job_id) ⇒ Object
#working_iterator ⇒ Object
36
37
38
39
40
41
42
|
# File 'lib/eq-queueing/backends/leveldb.rb', line 36
# Iterates over the jobs that are currently being worked on, i.e. entries whose
# key contains STARTED_WORKING_AT and whose value is not the NOT_WORKING marker.
#
# @yield [job_id, started_working_at] the job id taken from the key and the
#   deserialized stored value
# @return [Object] whatever the underlying +db.each+ returns
def working_iterator
  db.each do |key, value|
    next unless key.include?(STARTED_WORKING_AT)
    next if value == NOT_WORKING

    yield job_id_from_key(key), deserialize(value)
  end
end
|