Class: PushmiPullyu::PreservationQueue

Inherits:
Object
Defined in:
lib/pushmi_pullyu/preservation_queue.rb

Overview

1) Create a sorted set in Redis (redis.io/topics/data-types) and call it preservation_queue.

2) In GenericFile add an after_save that:

- determines a monotonically increasing "score". Obvious choices are the time in seconds/milliseconds,
  or something like Redis INCR to create an atomic, increasing counter. It doesn't matter if two different
  noids ever have the same score; it only matters that scores generally increase over time.
- runs zadd preservation_queue score "noid", which adds the noid and gives it the score from above.

3) Pushmi-pullyu pops elements out of the sorted set, lowest score to highest.

A sorted set will only ever contain a noid once, with whatever score it was last given. Because preservation_queue is sorted lowest score to highest, and because scores increase over time, a cascade of jobs/updates will cause a noid to keep “moving back” in the queue until it becomes the least recently updated noid in the queue, at which point it will be popped and preserved. Any further updates will trigger a new AIP build.
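
The "moving back" behaviour described above can be sketched without a Redis server. The following is a minimal in-memory model, not the library's code: FakeSortedSet and its methods are hypothetical stand-ins for the ZADD and pop-lowest operations, and the noid names are made up.

```ruby
# Hypothetical in-memory model of a sorted set: member => score.
# The real queue uses Redis (ZADD / ZRANGE / ZREM); this only mimics the semantics.
class FakeSortedSet
  def initialize
    @scores = {}
  end

  # Like ZADD: adding an existing member only overwrites its score.
  def zadd(member, score)
    @scores[member] = score
  end

  # Remove and return the member with the lowest score, or nil if empty.
  def pop_lowest
    member, _score = @scores.min_by { |_member, score| score }
    @scores.delete(member) if member
    member
  end

  def size
    @scores.size
  end
end

queue = FakeSortedSet.new
queue.zadd('noid-a', 1)
queue.zadd('noid-b', 2)
queue.zadd('noid-a', 3) # noid-a is saved again, so it "moves back"

size_before = queue.size   # noid-a is stored only once
first = queue.pop_lowest   # noid-b now has the lowest score
second = queue.pop_lowest  # noid-a follows with its latest score
third = queue.pop_lowest   # the set is empty
```

In this model the repeated save of noid-a does not create a second entry; it only delays when noid-a is popped.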

Defined Under Namespace

Classes: ConnectionError, MaxDepositAttemptsReached

Class Method Summary

Instance Method Summary

Constructor Details

#initialize(redis_url: 'redis://localhost:6379', pool_opts: { size: 1, timeout: 5 }, poll_interval: 10, age_at_least: 0, queue_name: 'dev:pmpy_queue') ⇒ PreservationQueue

Returns a new instance of PreservationQueue.

Raises:

  • (ConnectionError)

# File 'lib/pushmi_pullyu/preservation_queue.rb', line 25

def initialize(redis_url: 'redis://localhost:6379',
               pool_opts: { size: 1, timeout: 5 },
               poll_interval: 10,
               age_at_least: 0,
               queue_name: 'dev:pmpy_queue')
  # we use a connection pool even though we're not (currently) threading
  # as it transparently provides for repairing connections if they are closed after long periods of inactivity
  @redis = ConnectionPool.new(pool_opts) do
    Redis.new(url: redis_url)
  end

  raise ConnectionError unless connected?

  @poll_interval = poll_interval
  @age_at_least = age_at_least
  @queue_name = queue_name
end

Class Method Details

.extra_wait_time(deposit_attempt) ⇒ Object



# File 'lib/pushmi_pullyu/preservation_queue.rb', line 97

def self.extra_wait_time(deposit_attempt)
  (2**deposit_attempt) * PushmiPullyu.options[:first_failed_wait]
end
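
To make the growth of the delay concrete, here is a small worked sketch of the same formula. FIRST_FAILED_WAIT is an assumed value chosen for illustration; in the library the factor is read from PushmiPullyu.options[:first_failed_wait].

```ruby
# Assumed value for illustration only; the real factor comes from
# PushmiPullyu.options[:first_failed_wait].
FIRST_FAILED_WAIT = 10

# Same arithmetic as the class method, with the factor passed in explicitly.
def extra_wait_time(deposit_attempt, first_failed_wait = FIRST_FAILED_WAIT)
  (2**deposit_attempt) * first_failed_wait
end

# Delay added to the score for attempts 1 through 4.
waits = (1..4).map { |attempt| extra_wait_time(attempt) }
# waits == [20, 40, 80, 160]
```

The delay doubles with every attempt. Because the attempt counter comes from INCR, the first recorded attempt is 1, so the smallest delay is twice the configured factor.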

Instance Method Details

#add_entity_in_timeframe(entity) ⇒ Object



# File 'lib/pushmi_pullyu/preservation_queue.rb', line 80

def add_entity_in_timeframe(entity)
  entity_attempts_key = "#{PushmiPullyu.options[:ingestion_prefix]}#{entity[:uuid]}"

  @redis.with do |connection|
    # separate information for priority information and queue
    deposit_attempt = connection.incr entity_attempts_key

    if deposit_attempt <= PushmiPullyu.options[:ingestion_attempts]
      connection.zadd @queue_name, Time.now.to_f + self.class.extra_wait_time(deposit_attempt),
                      entity.slice(:uuid, :type).to_json
    else
      connection.del entity_attempts_key
      raise MaxDepositAttemptsReached
    end
  end
end
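
As a rough illustration of what this method writes to Redis, the sketch below builds the attempts key and the queue member for a sample entity. The entity contents and the ingestion_prefix value are hypothetical; only the slice/to_json and key-interpolation steps are taken from the method above.

```ruby
require 'json'

# Hypothetical sample values, not taken from the library.
ingestion_prefix = 'dev:pmpy_ingest_attempt:'
entity = { uuid: 'abc-123', type: 'items', note: 'not stored in the queue' }

# Key under which the attempt counter is incremented.
entity_attempts_key = "#{ingestion_prefix}#{entity[:uuid]}"

# Member stored in the sorted set: only :uuid and :type survive.
member = entity.slice(:uuid, :type).to_json

# next_item parses the member back with symbolized keys.
round_trip = JSON.parse(member, symbolize_names: true)
```

Because the member contains only the uuid and type, re-adding the same entity produces an identical member string, so the sorted set updates its score instead of storing a duplicate.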

#get_entity_ingestion_attempt(entity) ⇒ Object



# File 'lib/pushmi_pullyu/preservation_queue.rb', line 73

def get_entity_ingestion_attempt(entity)
  entity_attempts_key = "#{PushmiPullyu.options[:ingestion_prefix]}#{entity[:uuid]}"
  @redis.with do |connection|
    return connection.get(entity_attempts_key).to_i
  end
end

#next_item ⇒ Object

Raises:

  • (ConnectionError)

# File 'lib/pushmi_pullyu/preservation_queue.rb', line 43

def next_item
  raise ConnectionError unless connected?

  @redis.with do |conn|
    conn.watch(@queue_name) do |rd| # transactional mutation of the set is dependent on the set key
      element, score = rd.zrange(@queue_name, 0, 0, with_scores: true).first

      if element && ((Time.now.to_f - @age_at_least) >= score)
        rd.multi do |tx|
          tx.zrem(@queue_name, element) # remove the top element transactionally
        end

        return JSON.parse(element, { symbolize_names: true })
      else
        rd.unwatch # cancel the transaction since there was nothing in the queue
        return nil
      end
    end
  end
end
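
The age check in next_item can be isolated as a small predicate. This is a sketch with a hypothetical helper name (ready?) and made-up timestamps; it repeats only the comparison used above.

```ruby
# Hypothetical helper: true when the lowest-scored element is old enough to pop.
# Mirrors the comparison (Time.now.to_f - @age_at_least) >= score.
def ready?(score, age_at_least, now)
  (now - age_at_least) >= score
end

score = 1000.0 # made-up score: the time the element was queued for

too_young = ready?(score, 10, 1005.0) # only 5 seconds old, 10 required
old_enough = ready?(score, 10, 1010.0) # exactly 10 seconds old
backed_off = ready?(score + 20, 0, 1005.0) # score pushed into the future by a retry delay
```

An element whose score was pushed into the future by extra_wait_time fails the same check until that time has passed, which is how the retry delay takes effect.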

#wait_next_item ⇒ Object



# File 'lib/pushmi_pullyu/preservation_queue.rb', line 64

def wait_next_item
  while PushmiPullyu.continue_polling?
    element = next_item
    return element if element.present?

    sleep @poll_interval
  end
end
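
The polling pattern used by wait_next_item can be sketched with injected callables so it runs without Redis. The names poll, fetch and keep_going are hypothetical: fetch stands in for next_item and keep_going for PushmiPullyu.continue_polling?. The sketch tests for nil rather than using present?, which comes from ActiveSupport.

```ruby
# Hypothetical stand-alone version of the polling loop.
def poll(fetch:, keep_going:, interval:)
  while keep_going.call
    item = fetch.call
    return item unless item.nil?

    sleep interval
  end
  nil # polling was stopped before anything became available
end

# Two empty polls, then an element appears.
results = [nil, nil, { uuid: 'abc-123', type: 'items' }]
item = poll(fetch: -> { results.shift }, keep_going: -> { true }, interval: 0)

# Polling disabled from the start: nothing is fetched and nil comes back.
stopped = poll(fetch: -> { raise 'should not be called' }, keep_going: -> { false }, interval: 0)
```

As in the method above, the loop yields nil once polling stops, so callers should be prepared for a nil result.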