Class: Pipes::Store
- Inherits: Object
  - Object
  - Pipes::Store
- Defined in: lib/pipes/store.rb
Overview
Stages are stored in Redis in the following manner:
  pipes:stages:stage_1 ['ContentWriterStrategy', args: ['en-US'], …]
  pipes:stages:stage_2 ['PublisherStrategy', args: ['en-US']]
The jobs stored in Redis are Marshalled Ruby objects, so the structure is more-or-less arbitrary, though at a performance cost.
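As a small illustration of what marshalling means here, the sketch below round-trips a job through Ruby's standard `Marshal` module. The job hash is a hypothetical entry shaped like the ones in the overview (with the class given as a string for self-containment), not data taken from the library.

```ruby
# Hypothetical job entry, shaped like the examples in the overview.
job = { class: 'ContentWriterStrategy', args: ['en-US'] }

# Marshal.dump produces a binary String, which is what a marshalled list would hold.
serialized = Marshal.dump(job)

# Marshal.load rebuilds an equal Ruby object from that String.
restored = Marshal.load(serialized)

puts restored == job
puts serialized.class
```

Because any marshallable object survives this round trip, the stored structure is not constrained to a fixed schema; the price is the serialization work on every read and write.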
Jobs are queued up in the following steps:
1. Are there strategies in stage n? If not, look in stage n+1, and so on until the last stage. If so, shift off that stage and queue up its jobs.
2. Strategies run concurrently. Keep track of how many are currently running to know when the next stage should be started.
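The two steps above can be sketched with a plain in-memory object. This is only an illustration under assumptions: `SketchStore` and its method names are hypothetical, arrays and an integer stand in for the Redis list and counter, and jobs are plain strings.

```ruby
# In-memory stand-in for the Redis-backed store; all names here are illustrative.
class SketchStore
  attr_reader :remaining, :started

  def initialize(stage_names)
    @stage_names = stage_names
    @pending = Hash.new { |hash, name| hash[name] = [] }
    @remaining = 0   # jobs of the current stage that have not finished yet
    @started = []    # record of [stage name, jobs] in the order stages were started
  end

  def add(stage, job)
    @pending[stage] << job
  end

  # Step 1: start the first stage that still has pending jobs,
  # but only when no job from a previous stage is still running.
  def next_stage
    return unless @remaining.zero?

    @stage_names.each do |name|
      jobs = @pending[name]
      next if jobs.empty?

      @remaining = jobs.size
      @started << [name, jobs.dup]
      jobs.clear
      return
    end
  end

  # Step 2: called once per finished job; the last one triggers the next stage.
  def done
    @remaining -= 1
    next_stage if @remaining.zero?
  end
end

store = SketchStore.new(%w[stage_1 stage_2])
store.add('stage_1', 'write en-US')
store.add('stage_1', 'write fr-FR')
store.add('stage_2', 'publish en-US')

store.next_stage   # starts stage_1 with two jobs
store.done         # one job still running, so nothing new starts
store.done         # stage_1 is finished, so stage_2 starts
puts store.started.map(&:first).inspect
```

The counter is what serializes the stages: jobs inside a stage may finish in any order, but the next stage is only considered once the count reaches zero.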
Class Method Summary
- .add_pipe(stages, options = {}) ⇒ Object
  Add a new set of stages to Redis.
- .clear(stage) ⇒ Object
  Clear a specific stage queue.
- .clear_all ⇒ Object
  Find all stage queues in Redis (even ones not configured), and clear them.
- .done ⇒ Object
  Register that a job has finished.
- .next_stage ⇒ Object
  Fire off the next available stage, if available.
- .pending_jobs(stage) ⇒ Object
  Jobs left in a given stage.
- .remaining_jobs ⇒ Object
  Jobs remaining before the next stage will be evaluated.
- .run_stage(jobs) ⇒ Object
  Actually enqueue the jobs.
- .stages ⇒ Object
  Stages specified in the configuration.
Class Method Details
.add_pipe(stages, options = {}) ⇒ Object
Add a new set of stages to Redis.
# File 'lib/pipes/store.rb', line 23
def self.add_pipe(stages, options = {})
  stages.each do |stage|
    stage[:jobs].each do |job|
      pending = pending_jobs(stage[:name])
      pending << job if valid_for_queue?(stage[:name], pending, job, options)
    end
  end
  next_stage
end
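Judging from the method body, `stages` is a list of hashes that each carry a `:name` and a list of `:jobs`. The sketch below walks such a structure the same way, using an in-memory hash in place of the Redis lists. The `valid` lambda is a hypothetical stand-in for `valid_for_queue?`, whose real rules are not shown on this page; here it is assumed to do nothing more than reject duplicates.

```ruby
# Assumed shape of the stages argument, inferred from stage[:name] and stage[:jobs].
stages = [
  { name: :stage_1, jobs: [{ class: 'ContentWriterStrategy', args: ['en-US'] }] },
  { name: :stage_2, jobs: [{ class: 'PublisherStrategy', args: ['en-US'] }] }
]

# In-memory replacement for the per-stage pending lists.
pending = Hash.new { |hash, name| hash[name] = [] }

# Hypothetical stand-in for valid_for_queue?; it only rejects duplicates.
valid = ->(queue, job) { !queue.include?(job) }

# Adding the same pipe twice leaves each queue with a single copy of the job.
2.times do
  stages.each do |stage|
    stage[:jobs].each do |job|
      queue = pending[stage[:name]]
      queue << job if valid.call(queue, job)
    end
  end
end

puts pending[:stage_1].size
puts pending[:stage_2].size
```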
.clear(stage) ⇒ Object
Clear a specific stage queue.
# File 'lib/pipes/store.rb', line 69
def self.clear(stage)
  pending_jobs(stage).clear
end
.clear_all ⇒ Object
Find all stage queues in Redis (even ones not configured), and clear them.
# File 'lib/pipes/store.rb', line 75
def self.clear_all
  stage_keys = Redis.current.keys("#{@redis_stages_key}:*")
  Redis.current.del(*stage_keys) unless stage_keys.empty?
  remaining_jobs.clear
end
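The key point of `clear_all` is that it matches on the key prefix rather than on the configured stage names, so leftover queues are removed too. A minimal in-memory sketch of that idea follows; the hash stands in for Redis and every key name is an assumption chosen for illustration.

```ruby
# In-memory stand-in for Redis; every key name below is illustrative.
store = {
  'pipes:stages:stage_1' => ['job a'],
  'pipes:stages:retired' => ['job b'],  # a stage no longer in the configuration
  'unrelated:key'        => 'untouched'
}

prefix = 'pipes:stages'
stage_keys = store.keys.select { |key| key.start_with?("#{prefix}:") }

# Mirror the guard in clear_all: only delete when something matched.
stage_keys.each { |key| store.delete(key) } unless stage_keys.empty?

puts store.keys.inspect
```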
.done ⇒ Object
Register that a job has finished.
# File 'lib/pipes/store.rb', line 61
def self.done
  if remaining_jobs.decrement == 0
    next_stage
  end
end
.next_stage ⇒ Object
Fire off the next available stage, if available.
# File 'lib/pipes/store.rb', line 35
def self.next_stage
  return unless remaining_jobs == 0

  # Always start at the first stage, in case new strategies have been added mid-pipe
  stages.each do |stage|
    if !(jobs = pending_jobs(stage)).empty?
      run_stage(jobs)
      clear(stage)
      return
    end
  end
end
.pending_jobs(stage) ⇒ Object
Jobs left in a given stage.
# File 'lib/pipes/store.rb', line 90
def self.pending_jobs(stage)
  Redis::List.new(stage_key(stage), marshal: true)
end
.remaining_jobs ⇒ Object
Jobs remaining before the next stage will be evaluated.
# File 'lib/pipes/store.rb', line 96
def self.remaining_jobs
  @remaining_jobs ||= Redis::Counter.new(@redis_remaining_key)
end
.run_stage(jobs) ⇒ Object
Actually enqueue the jobs.
# File 'lib/pipes/store.rb', line 50
def self.run_stage(jobs)
  remaining_jobs.clear
  remaining_jobs.incr(jobs.count)
  jobs.each do |job|
    Resque.enqueue(job[:class], *job[:args])
  end
end
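To show how each job's `:args` array is splatted into separate arguments, here is a sketch that swaps Resque for a recorder. `FakeQueue` is hypothetical and exists only for this example, as are the job entries and the `'draft'` argument; a local integer stands in for the Redis counter.

```ruby
# Hypothetical recorder standing in for Resque; it only remembers what it was given.
module FakeQueue
  def self.calls
    @calls ||= []
  end

  def self.enqueue(klass, *args)
    calls << [klass, args]
  end
end

jobs = [
  { class: 'ContentWriterStrategy', args: ['en-US'] },
  { class: 'PublisherStrategy', args: ['en-US', 'draft'] }
]

# The counter is reset to the size of the stage before anything is enqueued.
remaining = jobs.count

# *job[:args] spreads the array so each element arrives as its own argument.
jobs.each { |job| FakeQueue.enqueue(job[:class], *job[:args]) }

puts remaining
FakeQueue.calls.each { |klass, args| puts "#{klass}: #{args.inspect}" }
```

Setting the counter before enqueueing matters in this design: a worker that finishes quickly then decrements a counter that already reflects the whole stage.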
.stages ⇒ Object
Stages specified in the configuration.
# File 'lib/pipes/store.rb', line 84
def self.stages
  StageParser.new.stage_names
end