Class: Massive::Step

Inherits:
Object
  • Object
show all
Includes:
Locking, MemoryConsumption, Notifications, Status, TimingSupport, Mongoid::Document, Mongoid::Timestamps
Defined in:
lib/massive/step.rb

Direct Known Subclasses

FileStep

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Notifications

#notifier, #notifier_id, #notify

Methods included from Locking

#locked?

Methods included from TimingSupport

#elapsed_time

Methods included from MemoryConsumption

#current_memory_consumption

Methods included from Status

#completed?, #enqueued?, #failed?, #started?

Class Method Details

.calculates_total_count_with(&block) ⇒ Object



31
32
33
# File 'lib/massive/step.rb', line 31

# Class-level macro: installs the given block as the body of the
# instance method +calculate_total_count+ on the receiving class.
#
# @yield the new body for +calculate_total_count+
# @return [Object] whatever +define_method+ returns
def self.calculates_total_count_with(&block)
  define_method :calculate_total_count, &block
end

.inherited(child) ⇒ Object



45
46
47
48
49
50
# File 'lib/massive/step.rb', line 45

# Inheritance hook: after running the parent hook, copies this class's
# current +job_class+ and +limit_ratio+ settings onto the new subclass.
# The values are passed through as-is (no duplication), and a nil/false
# setting leaves the child's own value untouched because the setters
# ignore falsy arguments.
#
# @param child [Class] the class that has just inherited from this one
def self.inherited(child)
  super

  child.job_class(job_class)
  child.limit_ratio(limit_ratio)
end

.job_class(value = nil) ⇒ Object



40
41
42
43
# File 'lib/massive/step.rb', line 40

# Combined reader/writer for the class-level +@job_class+ setting.
#
# Called with a truthy argument it stores that value; called with no
# argument (or nil/false) it leaves the setting alone. Either way the
# current setting is returned, so a falsy argument cannot clear it.
#
# @param value [Object, nil] new setting, or nil to just read
# @return [Object, nil] the current setting
def self.job_class(value = nil)
  return @job_class unless value

  @job_class = value
end

.limit_ratio(value = nil) ⇒ Object



35
36
37
38
# File 'lib/massive/step.rb', line 35

# Combined reader/writer for the class-level +@limit_ratio+ setting.
#
# Called with a truthy argument it stores that value; called with no
# argument (or nil/false) it leaves the setting alone. Either way the
# current setting is returned, so a falsy argument cannot clear it.
#
# NOTE(review): #limit iterates this value as (count, limit) pairs, so it
# is presumably a Hash of threshold => limit -- confirm against callers.
#
# @param value [Object, nil] new setting, or nil to just read
# @return [Object, nil] the current setting
def self.limit_ratio(value = nil)
  return @limit_ratio unless value

  @limit_ratio = value
end

.perform(process_id, step_id) ⇒ Object



23
24
25
# File 'lib/massive/step.rb', line 23

# Looks up a step through Massive::Process.find_step and runs its #work.
# Presumably the background-job entry point: #enqueue hands this class and
# the same two ids to Resque.
#
# @param process_id [Object] id forwarded to Massive::Process.find_step
# @param step_id [Object] id forwarded to Massive::Process.find_step
# @return [Object] the result of the step's #work
def self.perform(process_id, step_id)
  step = Massive::Process.find_step(process_id, step_id)
  step.work
end

.queue ⇒ Object



27
28
29
# File 'lib/massive/step.rb', line 27

# Queue name for this class.
# NOTE(review): presumably read by Resque to route jobs enqueued with this
# class (see #enqueue) -- confirm against the Resque version in use.
#
# @return [Symbol] always +:massive_step+
def self.queue
  :massive_step
end

Instance Method Details

#active_model_serializer ⇒ Object



118
119
120
# File 'lib/massive/step.rb', line 118

# Serializer for this step: whatever the inherited implementation
# returns, or Massive::StepSerializer when that is nil/false.
#
# @return [Object] the serializer to use
def active_model_serializer
  inherited_serializer = super
  inherited_serializer || Massive::StepSerializer
end

#calculate_total_count ⇒ Object



114
115
116
# File 'lib/massive/step.rb', line 114

# Default total count for a step. Classes can replace this method with
# their own body through the +calculates_total_count_with+ class macro.
#
# @return [Integer] always 0 in this default implementation
def calculate_total_count
  0
end

#complete ⇒ Object



81
82
83
84
85
86
87
88
89
90
# File 'lib/massive/step.rb', line 81

# Finishes the step once every job is done.
#
# Does nothing (returns nil) unless all jobs are completed and
# +locked?(:complete)+ is false. The lock check is only evaluated after
# the job check passes.
# NOTE(review): +locked?+ presumably guards against several jobs
# completing the step at once -- confirm in Massive::Locking.
#
# When it proceeds, inside the :complete callbacks it records
# +finished_at+, clears +failed_at+, stores the current memory consumption
# and sends the :complete notification; afterwards it enqueues the next
# step of the process if +execute_next?+ is true.
#
# @return [Object, nil] result of +process.enqueue_next+, or nil
def complete
  return unless completed_all_jobs? && !locked?(:complete)

  run_callbacks(:complete) do
    update_attributes(
      finished_at: Time.now,
      failed_at: nil,
      memory_consumption: current_memory_consumption
    )
    notify(:complete)
  end

  process.enqueue_next if execute_next?
end

#completed_all_jobs? ⇒ Boolean

Returns:

  • (Boolean)


92
93
94
95
96
# File 'lib/massive/step.rb', line 92

# Tells whether every job of this step reports +completed?+.
# A persisted step is reloaded first so the check runs against freshly
# loaded state; an unsaved step is checked as it is in memory.
#
# @return [Boolean] true when all jobs are completed (also when there are none)
def completed_all_jobs?
  reload if persisted?

  jobs.all? { |job| job.completed? }
end

#enqueue ⇒ Object



55
56
57
58
# File 'lib/massive/step.rb', line 55

# Queues this step through Resque and then sends the :enqueued
# notification. The job is identified by the step's class plus the
# process id and step id, both converted to strings.
#
# @return [Object] whatever +notify+ returns
def enqueue
  process_id = process.id.to_s
  step_id = id.to_s

  Resque.enqueue(self.class, process_id, step_id)
  notify(:enqueued)
end

#limit ⇒ Object



110
111
112
# File 'lib/massive/step.rb', line 110

# Memoized limit for this step, picked from the class-level +limit_ratio+.
#
# +limit_ratio+ is walked in its own order as (threshold, limit) pairs and
# the first pair whose threshold is <= +total_count+ wins; its second
# element becomes the limit.
# NOTE(review): raises NoMethodError when +limit_ratio+ is nil or when no
# pair matches -- presumably configurations always include a catch-all
# threshold; confirm.
#
# @return [Object] the limit of the first matching pair
def limit
  return @limit if @limit

  matching_pair = self.class.limit_ratio.find { |threshold, _| total_count >= threshold }
  @limit = matching_pair.last
end

#process_step ⇒ Object



75
76
77
78
79
# File 'lib/massive/step.rb', line 75

# Builds the jobs for this step and assigns them to +jobs+.
#
# One job is instantiated per index in 0...number_of_jobs; each is an
# instance of the class named by +job_class+ (resolved with
# +constantize+ on every iteration) built from +job_params(index)+.
#
# @return [Array] the freshly built jobs
def process_step
  built_jobs = number_of_jobs.times.map { |position| job_class.constantize.new(job_params(position)) }
  self.jobs = built_jobs
end

#processed ⇒ Object



98
99
100
# File 'lib/massive/step.rb', line 98

# Total of the +processed+ values reported by this step's jobs.
#
# @return [Numeric] sum over all jobs (0 when there are no jobs)
def processed
  per_job_counts = jobs.map { |job| job.processed }
  per_job_counts.sum
end

#processed_percentage ⇒ Object



102
103
104
# File 'lib/massive/step.rb', line 102

# Share of the work already processed, as +processed / total_count+.
# Despite the name the value is a plain ratio, not multiplied by 100.
#
# @return [Float, Integer] the ratio as a Float, or the Integer 0 when
#   +total_count+ is nil, zero or negative
def processed_percentage
  return 0 unless total_count && total_count > 0

  processed.to_f / total_count
end

#processing_time ⇒ Object



106
107
108
# File 'lib/massive/step.rb', line 106

# Total of the +elapsed_time+ values reported by this step's jobs.
#
# @return [Numeric] sum over all jobs (0 when there are no jobs)
def processing_time
  per_job_times = jobs.map { |job| job.elapsed_time }
  per_job_times.sum
end

#start! ⇒ Object



60
61
62
63
# File 'lib/massive/step.rb', line 60

# Runs the inherited +start!+ and then sends the :start notification.
#
# @return [Object] whatever +notify+ returns
def start!
  super
  notify :start
end

#work ⇒ Object



65
66
67
68
69
70
71
72
73
# File 'lib/massive/step.rb', line 65

# Executes the step from start to finish: marks it as started, runs
# +process_step+ wrapped in the :work callbacks, then calls +complete+.
#
# @return [Object] whatever +complete+ returns
def work
  start!
  run_callbacks(:work) { process_step }
  complete
end