Class: Gush::Workflow

Inherits:
Object
  • Object
show all
Defined in:
lib/gush/workflow.rb

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(*args, globals: nil, internal_state: {}, **kwargs) ⇒ Workflow

Returns a new instance of Workflow.



8
9
10
11
12
13
14
15
16
17
18
19
20
# File 'lib/gush/workflow.rb', line 8

# Builds a workflow from caller-supplied arguments, optionally seeded with
# previously captured state.
#
# @param args [Array] positional arguments, stored unchanged in @arguments
# @param globals [Hash, nil] stored in @globals; nil becomes an empty hash
# @param internal_state [Hash] optional overrides, read through the keys
#   :id, :jobs, :dependencies, :persisted, :stopped and :skip_setup
# @param kwargs [Hash] remaining keyword arguments, stored unchanged in @kwargs
def initialize(*args, globals: nil, internal_state: {}, **kwargs)
  @arguments, @kwargs = args, kwargs
  @globals = globals || {}

  state = internal_state
  # Falls back to #id when no identifier was supplied.
  @id           = state[:id] || id
  @jobs         = state[:jobs] || []
  @dependencies = state[:dependencies] || []
  @persisted    = state[:persisted] || false
  @stopped      = state[:stopped] || false

  return if state[:skip_setup]

  setup
end

Instance Attribute Details

#arguments ⇒ Object

Returns the value of attribute arguments.



5
6
7
# File 'lib/gush/workflow.rb', line 5

# Reader for the positional arguments captured by the constructor.
#
# @return [Object] the current value of @arguments
def arguments = @arguments

#dependencies ⇒ Object

Returns the value of attribute dependencies.



5
6
7
# File 'lib/gush/workflow.rb', line 5

# Reader for the recorded dependency entries.
#
# @return [Object] the current value of @dependencies
def dependencies = @dependencies

#globals ⇒ Object

Returns the value of attribute globals.



5
6
7
# File 'lib/gush/workflow.rb', line 5

# Reader for the workflow-wide globals.
#
# @return [Object] the current value of @globals
def globals = @globals

#id ⇒ Object



208
209
210
# File 'lib/gush/workflow.rb', line 208

# Returns this workflow's identifier, asking the client for a new one the
# first time it is needed and remembering the answer in @id.
#
# @return [Object] the stored @id, or the value of
#   client.next_free_workflow_id when @id was not yet set
def id
  return @id if @id

  @id = client.next_free_workflow_id
end

#jobs ⇒ Object

Returns the value of attribute jobs.



5
6
7
# File 'lib/gush/workflow.rb', line 5

# Reader for the workflow's job collection.
#
# @return [Object] the current value of @jobs
def jobs = @jobs

#kwargs ⇒ Object

Returns the value of attribute kwargs.



5
6
7
# File 'lib/gush/workflow.rb', line 5

# Reader for the keyword arguments captured by the constructor.
#
# @return [Object] the current value of @kwargs
def kwargs = @kwargs

#persisted ⇒ Object

Returns the value of attribute persisted.



5
6
7
# File 'lib/gush/workflow.rb', line 5

# Reader for the persisted flag.
#
# @return [Object] the current value of @persisted
def persisted = @persisted

#stopped ⇒ Object

Returns the value of attribute stopped.



5
6
7
# File 'lib/gush/workflow.rb', line 5

# Reader for the stopped flag.
#
# @return [Object] the current value of @stopped
def stopped = @stopped

Class Method Details

.create(*args, **kwargs) ⇒ Object



30
31
32
33
34
# File 'lib/gush/workflow.rb', line 30

# Instantiates a workflow with the given arguments, saves it, and hands the
# instance back.
#
# @param args [Array] positional arguments forwarded to .new
# @param kwargs [Hash] keyword arguments forwarded to .new
# @return [Object] the newly built instance, after #save has been called on it
def self.create(*args, **kwargs)
  new(*args, **kwargs).tap(&:save)
end

.descendants ⇒ Object



204
205
206
# File 'lib/gush/workflow.rb', line 204

# Collects every Class object known to ObjectSpace for which `klass < self`
# holds, i.e. the strict subclasses of the receiver.
#
# @return [Array<Class>] matching classes, in ObjectSpace enumeration order
def self.descendants
  every_class = ObjectSpace.each_object(Class)
  every_class.select { |candidate| candidate < self }
end

.find(id) ⇒ Object



22
23
24
# File 'lib/gush/workflow.rb', line 22

# Looks a workflow up through a freshly created Gush::Client.
#
# @param id [Object] identifier passed to Gush::Client#find_workflow
# @return [Object] whatever Gush::Client#find_workflow returns
def self.find(id)
  gush = Gush::Client.new
  gush.find_workflow(id)
end

.page(start = 0, stop = 99, order: :asc) ⇒ Object



26
27
28
# File 'lib/gush/workflow.rb', line 26

# Fetches a range of workflows through a freshly created Gush::Client.
#
# @param start [Object] first positional argument for Gush::Client#workflows (default 0)
# @param stop [Object] second positional argument for Gush::Client#workflows (default 99)
# @param order [Symbol] forwarded as the order: keyword (default :asc)
# @return [Object] whatever Gush::Client#workflows returns
def self.page(start = 0, stop = 99, order: :asc)
  gush = Gush::Client.new
  gush.workflows(start, stop, order: order)
end

Instance Method Details

#configure(*args, **kwargs) ⇒ Object



49
50
# File 'lib/gush/workflow.rb', line 49

# Accepts any arguments, does nothing with them, and returns nil.
# NOTE(review): presumably a hook that concrete workflows override to declare
# their jobs — confirm against the caller.
#
# @param args [Array] ignored here
# @param kwargs [Hash] ignored here
# @return [nil]
def configure(*args, **kwargs)
  nil
end

#continue ⇒ Object



36
37
38
39
40
41
42
43
# File 'lib/gush/workflow.rb', line 36

# Re-enqueues every job that reports failed? through a new Gush::Client.
#
# @return [Array] the failed jobs that were handed to the client
def continue
  gush = Gush::Client.new

  jobs.select(&:failed?).each { |failed_job| gush.enqueue_job(id, failed_job) }
end

#expire!(ttl = nil) ⇒ Object



64
65
66
# File 'lib/gush/workflow.rb', line 64

# Asks the client to expire this workflow.
#
# @param ttl [Object, nil] forwarded unchanged to client.expire_workflow
# @return [Object] whatever client.expire_workflow returns
def expire!(ttl = nil) = client.expire_workflow(self, ttl)

#failed? ⇒ Boolean

Returns:

  • (Boolean)


110
111
112
# File 'lib/gush/workflow.rb', line 110

# Tells whether at least one job reports failed?.
#
# @return [Boolean] true when any job failed, false otherwise (including no jobs)
def failed?
  jobs.any? { |job| job.failed? }
end

#find_job(name) ⇒ Object



86
87
88
89
90
91
92
93
94
95
96
# File 'lib/gush/workflow.rb', line 86

# Looks up a job in this workflow by job name or by job class name.
#
# When +name+ has the "<word>-<identifier>" shape it is compared against each
# job's #name; otherwise it is compared against each job's #klass. Both sides
# are compared as strings.
#
# @param name [#to_s] job name or job class name
# @return [Object, nil] the first matching job, or nil when nothing matches
def find_job(name)
  wanted = name.to_s

  # Only a yes/no answer is needed, so match? avoids building MatchData.
  if /(?<klass>\w*[^-])-(?<identifier>.*)/.match?(wanted)
    jobs.find { |node| node.name.to_s == wanted }
  else
    jobs.find { |node| node.klass.to_s == wanted }
  end
end

#finished? ⇒ Boolean

Returns:

  • (Boolean)


98
99
100
# File 'lib/gush/workflow.rb', line 98

# Tells whether every job reports finished?.
#
# @return [Boolean] true when all jobs finished (also true when there are no jobs)
def finished?
  jobs.all? { |job| job.finished? }
end

#finished_at ⇒ Object



176
177
178
# File 'lib/gush/workflow.rb', line 176

# Reports when the workflow finished, taken from last_job.
#
# @return [Object, nil] last_job.finished_at, or nil when there is no last_job
def finished_at
  return nil unless last_job

  last_job.finished_at
end

#initial_jobs ⇒ Object



153
154
155
# File 'lib/gush/workflow.rb', line 153

# Picks the jobs that report has_no_dependencies?.
#
# @return [Array] the matching jobs, in their original order
def initial_jobs
  jobs.select { |job| job.has_no_dependencies? }
end

#mark_as_persisted ⇒ Object



68
69
70
# File 'lib/gush/workflow.rb', line 68

# Sets the persisted flag.
#
# @return [true]
def mark_as_persisted = (@persisted = true)

#mark_as_started ⇒ Object



72
73
74
# File 'lib/gush/workflow.rb', line 72

# Clears the stopped flag.
#
# @return [false]
def mark_as_started = (@stopped = false)

#mark_as_stopped ⇒ Object



52
53
54
# File 'lib/gush/workflow.rb', line 52

# Sets the stopped flag.
#
# @return [true]
def mark_as_stopped = (@stopped = true)

#persist! ⇒ Object



60
61
62
# File 'lib/gush/workflow.rb', line 60

# Hands this workflow to the client for persistence.
#
# @return [Object] whatever client.persist_workflow returns
def persist! = client.persist_workflow(self)

#reload ⇒ Object



144
145
146
147
148
149
150
151
# File 'lib/gush/workflow.rb', line 144

# Refreshes this instance from a freshly looked-up copy of the same workflow:
# jobs and the stopped flag are overwritten, nothing else is touched.
#
# @return [self]
def reload
  latest = self.class.find(id)

  tap do
    self.jobs = latest.jobs
    self.stopped = latest.stopped
  end
end

#resolve_dependencies ⇒ Object



76
77
78
79
80
81
82
83
84
# File 'lib/gush/workflow.rb', line 76

# Wires up the jobs named in each recorded dependency: the :from name is
# appended to the target job's incoming list and the :to name to the source
# job's outgoing list. Jobs are located with #find_job.
#
# @return [Object] @dependencies (the receiver of #each)
def resolve_dependencies
  @dependencies.each do |edge|
    source_name = edge[:from]
    target_name = edge[:to]

    source = find_job(source_name)
    target = find_job(target_name)

    target.incoming << source_name
    source.outgoing << target_name
  end
end

#run(klass, opts = {}) ⇒ Object



118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
# File 'lib/gush/workflow.rb', line 118

# Adds a job of the given class to the workflow and records its ordering
# constraints.
#
# @param klass [Class] job class; instantiated with a single attributes hash
# @param opts [Hash] optional settings, read through these keys:
#   :params (merged over @globals), :queue, :wait,
#   :after (job or jobs this one depends on), :before (job or jobs that depend on this one)
# @return [Object] the new job's #name
def run(klass, opts = {})
  attributes = {
    workflow_id: id,
    id: client.next_free_job_id(id, klass.to_s),
    params: (@globals || {}).merge(opts.fetch(:params, {})),
    queue: opts[:queue],
    wait: opts[:wait]
  }
  node = klass.new(attributes)
  jobs << node

  # The splat turns nil into [], and a single value into a one-element list.
  [*opts[:after]].each do |upstream|
    @dependencies << { from: upstream.to_s, to: node.name.to_s }
  end

  [*opts[:before]].each do |downstream|
    @dependencies << { from: node.name.to_s, to: downstream.to_s }
  end

  node.name
end

#running? ⇒ Boolean

Returns:

  • (Boolean)


106
107
108
# File 'lib/gush/workflow.rb', line 106

# Tells whether the workflow has started but not yet finished.
# finished? is only consulted when started? is truthy.
#
# @return [Boolean] truthy only when started? holds and finished? does not
def running?
  has_started = started?
  has_started && !finished?
end

#save ⇒ Object



45
46
47
# File 'lib/gush/workflow.rb', line 45

# Saves the workflow by delegating to #persist!.
#
# @return [Object] whatever #persist! returns
def save = persist!

#start! ⇒ Object



56
57
58
# File 'lib/gush/workflow.rb', line 56

# Asks the client to start this workflow.
#
# @return [Object] whatever client.start_workflow returns
def start! = client.start_workflow(self)

#started? ⇒ Boolean

Returns:

  • (Boolean)


102
103
104
# File 'lib/gush/workflow.rb', line 102

# Tells whether the workflow has a start time.
#
# @return [Boolean] true when started_at is truthy, false otherwise
def started?
  started_at ? true : false
end

#started_at ⇒ Object



172
173
174
# File 'lib/gush/workflow.rb', line 172

# Reports when the workflow started, taken from first_job.
#
# @return [Object, nil] first_job.started_at, or nil when there is no first_job
def started_at
  return nil unless first_job

  first_job.started_at
end

#status ⇒ Object



157
158
159
160
161
162
163
164
165
166
167
168
169
170
# File 'lib/gush/workflow.rb', line 157

# Summarises the workflow state as a symbol. The checks run in a fixed order
# and the first one that holds wins: failed, running, finished, stopped.
#
# @return [Symbol] :failed, :running, :finished, :stopped or :pending
def status
  return :failed if failed?
  return :running if running?
  return :finished if finished?
  return :stopped if stopped?

  :pending
end

#stopped? ⇒ Boolean

Returns:

  • (Boolean)


114
115
116
# File 'lib/gush/workflow.rb', line 114

# Predicate form of #stopped.
#
# @return [Object] the value returned by #stopped
def stopped? = stopped

#to_hash ⇒ Object



180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
# File 'lib/gush/workflow.rb', line 180

# Builds a plain-hash snapshot of the workflow: identity, constructor inputs,
# dependencies, job counts and class names, status and timestamps.
# The class name appears under both :name and :klass.
#
# @return [Hash{Symbol => Object}]
def to_hash
  workflow_class = self.class.to_s

  {
    name: workflow_class,
    id: id,
    arguments: @arguments,
    kwargs: @kwargs,
    globals: @globals,
    dependencies: @dependencies,
    total: jobs.count,
    finished: jobs.count { |job| job.finished? },
    klass: workflow_class,
    job_klasses: jobs.map { |job| job.class.to_s }.uniq,
    status: status,
    stopped: stopped,
    started_at: started_at,
    finished_at: finished_at
  }
end

#to_json(options = {}) ⇒ Object



200
201
202
# File 'lib/gush/workflow.rb', line 200

# Serialises the #to_hash snapshot with Gush::JSON.
#
# @param options [Hash] accepted but not used by this method
# @return [Object] whatever Gush::JSON.encode returns
def to_json(options = {})
  encoder = Gush::JSON
  encoder.encode(to_hash)
end