Class: Lowkiq::Queue::Fetch
- Inherits:
- Object
  - Object
    - Lowkiq::Queue::Fetch
- Defined in:
- lib/lowkiq/queue/fetch.rb
Instance Method Summary
- #fetch(redis, strategy, ids) ⇒ Object
- #initialize(name) ⇒ Fetch (constructor): A new instance of Fetch.
- #morgue_fetch(redis, strategy, ids) ⇒ Object
Constructor Details
Instance Method Details
#fetch(redis, strategy, ids) ⇒ Object
8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 |
# File 'lib/lowkiq/queue/fetch.rb', line 8
#
# Reads the stored state of the given ids from the queue in one batch.
#
# For every id four commands are issued inside a single
# `redis.public_send(strategy)` block: two ZSCOREs (perform_in and
# retry_count), one ZRANGE with scores (payloads) and one HGET (error).
# The flat reply list is then regrouped four replies per id.
#
# @param redis [Object] Redis client; must respond to `strategy` with a block
# @param strategy [Symbol] name of the batching method called on `redis`
#   (presumably :pipelined or :multi -- confirm against callers)
# @param ids [Array] ids to look up
# @return [Array<Hash>] one hash per id that still has a perform_in score,
#   with keys :id, :perform_in, :retry_count, :payloads and :error;
#   keys whose value is nil are omitted
def fetch(redis, strategy, ids)
  replies = redis.public_send(strategy) do |batch|
    # Newer redis-rb releases yield a dedicated pipeline/transaction object
    # and stop queueing commands sent to the client itself, so prefer the
    # yielded object. Fall back to the client when nothing is yielded.
    conn = batch || redis

    ids.each do |id|
      conn.zscore @keys.all_ids_scored_by_perform_in_zset, id
      conn.zscore @keys.all_ids_scored_by_retry_count_zset, id
      conn.zrange @keys.payloads_zset(id), 0, -1, with_scores: true
      conn.hget @keys.errors_hash, id
    end
  end

  ids.zip(replies.each_slice(4)).map do |id, reply|
    next if reply.first.nil? # skip the id if it is no longer in the queue

    perform_in, retry_count, payloads, error = reply

    {
      id: id,
      perform_in: perform_in,
      retry_count: retry_count,
      payloads: payloads.map { |(payload, score)| [Marshal.load_payload(payload), score] },
      error: error,
    }.compact
  end.compact
end
#morgue_fetch(redis, strategy, ids) ⇒ Object
30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 |
# File 'lib/lowkiq/queue/fetch.rb', line 30
#
# Reads the stored state of the given ids from the morgue in one batch.
#
# For every id three commands are issued inside a single
# `redis.public_send(strategy)` block: one ZSCORE (updated_at), one ZRANGE
# with scores (payloads) and one HGET (error). The flat reply list is then
# regrouped three replies per id.
#
# @param redis [Object] Redis client; must respond to `strategy` with a block
# @param strategy [Symbol] name of the batching method called on `redis`
#   (presumably :pipelined or :multi -- confirm against callers)
# @param ids [Array] ids to look up
# @return [Array<Hash>] one hash per id that still has an updated_at score,
#   with keys :id, :updated_at, :payloads and :error;
#   keys whose value is nil are omitted
def morgue_fetch(redis, strategy, ids)
  replies = redis.public_send(strategy) do |batch|
    # Newer redis-rb releases yield a dedicated pipeline/transaction object
    # and stop queueing commands sent to the client itself, so prefer the
    # yielded object. Fall back to the client when nothing is yielded.
    conn = batch || redis

    ids.each do |id|
      conn.zscore @keys.morgue_all_ids_scored_by_updated_at_zset, id
      conn.zrange @keys.morgue_payloads_zset(id), 0, -1, with_scores: true
      conn.hget @keys.morgue_errors_hash, id
    end
  end

  ids.zip(replies.each_slice(3)).map do |id, reply|
    next if reply.first.nil? # skip the id if it is no longer in the queue

    updated_at, payloads, error = reply

    {
      id: id,
      updated_at: updated_at,
      payloads: payloads.map { |(payload, score)| [Marshal.load_payload(payload), score] },
      error: error,
    }.compact
  end.compact
end