Class: Pipes::Store

Inherits:
Object
  • Object
show all
Defined in:
lib/pipes/store.rb

Overview

Stages are stored in Redis in the following manner: pipes:stages:stage_1 [‘ContentWriterStrategy’, args: [‘en-US’], …] pipes:stages:stage_2 [‘PublisherStrategy’, args: [‘en-US’]]

The jobs stored in Redis are Marshalled Ruby objects, so the structure is more-or-less arbitrary, though at a performance cost.

Jobs are queued up in the following steps

1. Strategies in stage n? No, look in stage n+1 until last stage.
                          Yes, shift off the next stage and queue up its jobs
2. Strategies run concurrently. Keep track of how many are currently running to
   know when the next stage should be started.

Class Method Summary collapse

Class Method Details

.add_pipe(stages, options = {}) ⇒ Object

Add a new set of stages to Redis.



23
24
25
26
27
28
29
30
31
# File 'lib/pipes/store.rb', line 23

def self.add_pipe(stages, options = {})
  stages.each do |stage|
    stage[:jobs].each do |job|
      pending = pending_jobs(stage[:name])
      pending << job if valid_for_queue?(stage[:name], pending, job, options)
    end
  end
  next_stage
end

.clear(stage) ⇒ Object

Clear a specific stage queue.



69
70
71
# File 'lib/pipes/store.rb', line 69

def self.clear(stage)
  pending_jobs(stage).clear
end

.clear_allObject

Find all stage queues in Redis (even ones not configured), and clear them.



75
76
77
78
79
80
# File 'lib/pipes/store.rb', line 75

def self.clear_all
  stage_keys = Redis.current.keys "#{@redis_stages_key}:*"
  Redis.current.del *stage_keys unless stage_keys.empty?

  remaining_jobs.clear
end

.doneObject

Register that a job has finished.



61
62
63
64
65
# File 'lib/pipes/store.rb', line 61

def self.done
  if remaining_jobs.decrement == 0
    next_stage
  end
end

.next_stageObject

Fire off the next available stage, if available.



35
36
37
38
39
40
41
42
43
44
45
46
# File 'lib/pipes/store.rb', line 35

def self.next_stage
  return unless remaining_jobs == 0

  # Always start at the first stage, in case new stragies have been added mid-pipe
  stages.each do |stage|
    if !(jobs = pending_jobs(stage)).empty?
      run_stage(jobs)
      clear(stage)
      return
    end
  end
end

.pending_jobs(stage) ⇒ Object

Jobs left in a given stage.



90
91
92
# File 'lib/pipes/store.rb', line 90

def self.pending_jobs(stage)
  Redis::List.new(stage_key(stage), marshal: true)
end

.remaining_jobsObject

Jobs remaining before the next stage will be evaluated



96
97
98
# File 'lib/pipes/store.rb', line 96

def self.remaining_jobs
  @remaining_jobs ||= Redis::Counter.new(@redis_remaining_key)
end

.run_stage(jobs) ⇒ Object

Actually enqueue the jobs.



50
51
52
53
54
55
56
57
# File 'lib/pipes/store.rb', line 50

def self.run_stage(jobs)
  remaining_jobs.clear
  remaining_jobs.incr(jobs.count)

  jobs.each do |job|
    Resque.enqueue(job[:class], *job[:args])
  end
end

.stagesObject

Stages specified in the configuration.



84
85
86
# File 'lib/pipes/store.rb', line 84

def self.stages
  StageParser.new.stage_names
end