Class: EQ::Queueing::Backends::LevelDB::JobsCollection

Inherits:
Struct
  • Object
show all
Includes:
Logging
Defined in:
lib/eq-queueing/backends/leveldb.rb

Constant Summary collapse

QUEUE =
'queue'.freeze
PAYLOAD =
'payload'.freeze
CREATED_AT =
'created_at'.freeze
STARTED_WORKING_AT =
'started_working_at'.freeze
NOT_WORKING =
''.freeze

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Logging

#debug, #info, #log_error

Instance Attribute Details

#dbObject

Returns the value of attribute db

Returns:

  • (Object)

    the current value of db



8
9
10
# File 'lib/eq-queueing/backends/leveldb.rb', line 8

def db
  @db
end

#nameObject

Returns the value of attribute name

Returns:

  • (Object)

    the current value of name



8
9
10
# File 'lib/eq-queueing/backends/leveldb.rb', line 8

def name
  @name
end

Instance Method Details

#countObject



128
129
130
131
132
133
134
# File 'lib/eq-queueing/backends/leveldb.rb', line 128

def count
  result = 0
  db.each do |k,v|
    result += 1 if k.include?(QUEUE)
  end
  result
end

#count_waitingObject



136
137
138
139
140
141
142
143
144
# File 'lib/eq-queueing/backends/leveldb.rb', line 136

def count_waiting
  result = 0
  db.each do |k,v|
    if k.include?(STARTED_WORKING_AT) && v == NOT_WORKING
      result += 1
    end
  end
  result
end

#count_workingObject



146
147
148
149
150
151
152
153
154
# File 'lib/eq-queueing/backends/leveldb.rb', line 146

def count_working
  result = 0
  db.each do |k,v|
    if k.include?(STARTED_WORKING_AT) && v != NOT_WORKING
      result += 1
    end
  end
  result
end

#delete(job_id) ⇒ Object



56
57
58
59
60
61
62
63
64
65
66
# File 'lib/eq-queueing/backends/leveldb.rb', line 56

def delete job_id
  did_exist = !db["#{QUEUE}:#{job_id}"].nil?
  db.batch do |batch|
    batch.delete "#{QUEUE}:#{job_id}"
    batch.delete "#{PAYLOAD}:#{job_id}"
    batch.delete "#{CREATED_AT}:#{job_id}"
    batch.delete "#{STARTED_WORKING_AT}:#{job_id}"
  end
  does_not_exist = db["#{QUEUE}:#{job_id}"].nil?
  did_exist && does_not_exist
end

#deserialize(data) ⇒ Object



124
125
126
# File 'lib/eq-queueing/backends/leveldb.rb', line 124

def deserialize data
  Marshal.load(data)
end

#eachObject



156
157
158
159
160
161
162
163
164
165
166
167
168
# File 'lib/eq-queueing/backends/leveldb.rb', line 156

def each
  # TODO optimize this (and others) using range queries
  db.each do |k,v|
    if k.include?(QUEUE)
      job_id = job_id_from_key(k)
      yield(id: job_id,
            queue: v,
            payload: find_payload(job_id),
            created_at: find_created_at(job_id),
            started_working_at: find_started_working_at(job_id))
    end
  end
end

#exists?(job) ⇒ Boolean

Parameters:

Returns:

  • (Boolean)


45
46
47
48
49
50
51
52
53
54
# File 'lib/eq-queueing/backends/leveldb.rb', line 45

def exists? job
  db.each do |k,v|
    if k.include?(QUEUE) && v == job.queue
      if find_payload(job_id_from_key(k)) == job.payload
        return true
      end
    end
  end
  false
end

#find_created_at(job_id) ⇒ Object



88
89
90
91
92
# File 'lib/eq-queueing/backends/leveldb.rb', line 88

def find_created_at job_id
  if serialized_time = db["#{CREATED_AT}:#{job_id}"]
    deserialize(serialized_time)
  end
end

#find_free_job_idObject

try as hard as you can to find a free slot



106
107
108
109
110
111
112
# File 'lib/eq-queueing/backends/leveldb.rb', line 106

def find_free_job_id
  loop do
    job_id = generate_id
    return job_id unless db.contains? "#{QUEUE}:#{job_id}"
    debug "#{job_id} is not free"
  end
end

#find_payload(job_id) ⇒ Object



80
81
82
83
84
85
86
# File 'lib/eq-queueing/backends/leveldb.rb', line 80

def find_payload job_id
  if raw = db["#{PAYLOAD}:#{job_id}"]
    deserialize db["#{PAYLOAD}:#{job_id}"]
  else
    nil
  end
end

#find_queue(job_id) ⇒ Object



76
77
78
# File 'lib/eq-queueing/backends/leveldb.rb', line 76

def find_queue job_id
  db["#{QUEUE}:#{job_id}"]
end

#find_started_working_at(job_id) ⇒ Object



94
95
96
97
98
# File 'lib/eq-queueing/backends/leveldb.rb', line 94

def find_started_working_at job_id
  if serialized_time = db["#{STARTED_WORKING_AT}:#{job_id}"]
    deserialize(serialized_time)
  end
end

#first_waitingObject



27
28
29
30
31
32
33
34
# File 'lib/eq-queueing/backends/leveldb.rb', line 27

def first_waiting
  db.each do |k,v|
    if k.include?(STARTED_WORKING_AT) && v == NOT_WORKING
      return job_id_from_key(k)
    end
  end
  nil
end

#generate_idObject

Note:

Maybe this is a stupid idea, but for now it kinda works :)

Time in milliseconds and 4 digit random



116
117
118
# File 'lib/eq-queueing/backends/leveldb.rb', line 116

def generate_id
  '%d%04d' % [(Time.now.to_f * 1000.0).to_i, Kernel.rand(1000)]
end

#job_id_from_key(key) ⇒ Object



100
101
102
103
# File 'lib/eq-queueing/backends/leveldb.rb', line 100

def job_id_from_key key
  prefix, job_id = *key.split(':')
  job_id
end

#push(job) ⇒ Object

Parameters:



18
19
20
21
22
23
24
25
# File 'lib/eq-queueing/backends/leveldb.rb', line 18

def push job
  job_id = find_free_job_id
  db["#{QUEUE}:#{job_id}"] = job.queue
  db["#{PAYLOAD}:#{job_id}"] = serialize(job.payload) unless job.payload.nil?
  db["#{CREATED_AT}:#{job_id}"] = serialize(Time.now)
  db["#{STARTED_WORKING_AT}:#{job_id}"] = NOT_WORKING
  job_id
end

#serialize(data) ⇒ Object



120
121
122
# File 'lib/eq-queueing/backends/leveldb.rb', line 120

def serialize data
  Marshal.dump(data)
end

#start_working(job_id) ⇒ Object



68
69
70
# File 'lib/eq-queueing/backends/leveldb.rb', line 68

def start_working job_id
  db["#{STARTED_WORKING_AT}:#{job_id}"] = serialize(Time.now)
end

#stop_working(job_id) ⇒ Object



72
73
74
# File 'lib/eq-queueing/backends/leveldb.rb', line 72

def stop_working job_id
  db["#{STARTED_WORKING_AT}:#{job_id}"] = NOT_WORKING
end

#working_iteratorObject



36
37
38
39
40
41
42
# File 'lib/eq-queueing/backends/leveldb.rb', line 36

def working_iterator
  db.each do |k,v|
    if k.include?(STARTED_WORKING_AT) && v != NOT_WORKING
      yield job_id_from_key(k), deserialize(v)
    end
  end
end