Class: Rworkflow::Flow
- Inherits:
-
Object
- Object
- Rworkflow::Flow
- Defined in:
- lib/rworkflow/flow.rb
Direct Known Subclasses
Constant Summary collapse
- STATE_SUCCESSFUL =
:successful
- STATE_FAILED =
:failed
- STATES_TERMINAL =
[STATE_FAILED, STATE_SUCCESSFUL].freeze
- STATES_FAILED =
[STATE_FAILED].freeze
- REDIS_NS =
'flow'.freeze
- WORKFLOW_REGISTRY =
"#{REDIS_NS}:__registry".freeze
Instance Attribute Summary collapse
-
#id ⇒ Object
Returns the value of attribute id.
-
#lifecycle ⇒ Object
Returns the value of attribute lifecycle.
Class Method Summary collapse
- .all(options = {}) ⇒ Object
- .cleanup(id) ⇒ Object
- .create(lifecycle, name = '', options = {}) ⇒ Object
- .failure?(state) ⇒ Boolean
- .generate_id(workflow_name) ⇒ Object
- .get_private_workflows(options = {}) ⇒ Object
- .get_public_workflows(options = {}) ⇒ Object
- .load(id, klass = nil) ⇒ Object
- .read_flow_class(id) ⇒ Object
- .register(workflow) ⇒ Object
- .registered?(workflow) ⇒ Boolean
- .registry ⇒ Object
- .serializer ⇒ Object
- .terminal?(state) ⇒ Boolean
- .unregister(workflow) ⇒ Object
Instance Method Summary collapse
- #cleaned_up? ⇒ Boolean
- #cleanup ⇒ Object
- #count(state) ⇒ Object
- #counters ⇒ Object
- #created_at ⇒ Object
- #expected_duration ⇒ Object
- #failed? ⇒ Boolean
- #fetch(fetcher_id, state_name) ⇒ Object
- #finish_time ⇒ Object
- #finished? ⇒ Boolean
- #get(key, default = nil) ⇒ Object
- #get_state_cardinality(state_name) ⇒ Object
- #incr(key, value = 1) ⇒ Object
-
#initialize(id) ⇒ Flow
constructor
A new instance of Flow.
- #list_objects(state_name, limit = -1)) ⇒ Object
- #log(from_state, transition, num_objects) ⇒ Object
- #logger ⇒ Object
- #logging? ⇒ Boolean
- #logs ⇒ Object
- #metadata_string ⇒ Object
- #name ⇒ Object
- #name=(name) ⇒ Object
- #public? ⇒ Boolean
- #set(key, value) ⇒ Object
- #start(objects) ⇒ Object
- #start_time ⇒ Object
- #started? ⇒ Boolean
- #states_list ⇒ Object
- #status ⇒ Object
- #successful? ⇒ Boolean
- #terminate ⇒ Object
- #total_objects(counters = nil) ⇒ Object
- #total_objects_failed(counters = nil) ⇒ Object
- #total_objects_processed(counters = nil) ⇒ Object
- #transition(from_state, name, objects) ⇒ Object
- #valid? ⇒ Boolean
Constructor Details
#initialize(id) ⇒ Flow
Returns a new instance of Flow.
14 15 16 17 18 19 20 21 22 23 |
# File 'lib/rworkflow/flow.rb', line 14 def initialize(id) @id = id @redis_key = "#{REDIS_NS}:#{id}" @storage = RedisRds::Hash.new(@redis_key) @flow_data = RedisRds::Hash.new("#{@redis_key}__data") @processing = RedisRds::Hash.new("#{@redis_key}__processing") load_lifecycle end |
Instance Attribute Details
#id ⇒ Object
Returns the value of attribute id.
11 12 13 |
# File 'lib/rworkflow/flow.rb', line 11 def id @id end |
#lifecycle ⇒ Object
Returns the value of attribute lifecycle.
12 13 14 |
# File 'lib/rworkflow/flow.rb', line 12 def lifecycle @lifecycle end |
Class Method Details
.all(options = {}) ⇒ Object
393 394 395 |
# File 'lib/rworkflow/flow.rb', line 393 def all( = {}) return registry.all(.reverse_merge(parent_class: self)).map { |id| load(id) } end |
.cleanup(id) ⇒ Object
380 381 382 383 |
# File 'lib/rworkflow/flow.rb', line 380 def cleanup(id) workflow = new(id) workflow.cleanup end |
.create(lifecycle, name = '', options = {}) ⇒ Object
359 360 361 362 363 364 365 366 367 368 369 370 371 |
# File 'lib/rworkflow/flow.rb', line 359 def create(lifecycle, name = '', = {}) id = generate_id(name) workflow = new(id) workflow.name = name workflow.lifecycle = lifecycle workflow.set(:created_at, Time.now.to_i) workflow.set(:public, .fetch(:public, false)) workflow.set(:logging, .fetch(:logging, true)) register(workflow) return workflow end |
.failure?(state) ⇒ Boolean
436 437 438 |
# File 'lib/rworkflow/flow.rb', line 436 def failure?(state) return self::STATES_FAILED.include?(state) end |
.generate_id(workflow_name) ⇒ Object
373 374 375 376 377 |
# File 'lib/rworkflow/flow.rb', line 373 def generate_id(workflow_name) now = Time.now.to_f random = Random.new(now) return "#{name}__#{workflow_name}__#{(Time.now.to_f * 1000).to_i}__#{random.rand(now).to_i}" end |
.get_private_workflows(options = {}) ⇒ Object
389 390 391 |
# File 'lib/rworkflow/flow.rb', line 389 def get_private_workflows( = {}) return registry.private_flows(.reverse_merge(parent_class: self)).map { |id| load(id) } end |
.get_public_workflows(options = {}) ⇒ Object
385 386 387 |
# File 'lib/rworkflow/flow.rb', line 385 def get_public_workflows( = {}) return registry.public_flows(.reverse_merge(parent_class: self)).map { |id| load(id) } end |
.load(id, klass = nil) ⇒ Object
397 398 399 400 401 402 403 |
# File 'lib/rworkflow/flow.rb', line 397 def load(id, klass = nil) workflow = nil klass = read_flow_class(id) if klass.nil? workflow = klass.new(id) if klass.respond_to?(:new) return workflow end |
.read_flow_class(id) ⇒ Object
405 406 407 408 409 410 411 412 413 414 415 416 417 418 |
# File 'lib/rworkflow/flow.rb', line 405 def read_flow_class(id) klass = nil raw_class = id.split('__').first if !raw_class.nil? klass = begin raw_class.constantize rescue NameError => _ Rails.logger.warn("Unknown flow class for workflow id #{id}") nil end end return klass end |
.register(workflow) ⇒ Object
424 425 426 |
# File 'lib/rworkflow/flow.rb', line 424 def register(workflow) registry.add(workflow) end |
.registered?(workflow) ⇒ Boolean
420 421 422 |
# File 'lib/rworkflow/flow.rb', line 420 def registered?(workflow) return registry.include?(workflow) end |
.registry ⇒ Object
440 441 442 443 444 |
# File 'lib/rworkflow/flow.rb', line 440 def registry return @registry ||= begin FlowRegistry.new(Rworkflow::VERSION.to_s) end end |
.serializer ⇒ Object
446 447 448 |
# File 'lib/rworkflow/flow.rb', line 446 def serializer YAML end |
.terminal?(state) ⇒ Boolean
432 433 434 |
# File 'lib/rworkflow/flow.rb', line 432 def terminal?(state) return self::STATES_TERMINAL.include?(state) end |
.unregister(workflow) ⇒ Object
428 429 430 |
# File 'lib/rworkflow/flow.rb', line 428 def unregister(workflow) registry.remove(workflow) end |
Instance Method Details
#cleaned_up? ⇒ Boolean
204 205 206 |
# File 'lib/rworkflow/flow.rb', line 204 def cleaned_up? return states_list.all? { |name| !get_list(name).exists? } end |
#cleanup ⇒ Object
296 297 298 299 300 301 302 303 304 |
# File 'lib/rworkflow/flow.rb', line 296 def cleanup return if Rails.env.test? states_cleanup logger.delete @processing.delete self.class.unregister(self) @flow_data.delete @storage.delete end |
#count(state) ⇒ Object
89 90 91 |
# File 'lib/rworkflow/flow.rb', line 89 def count(state) return get_list(state).size end |
#counters ⇒ Object
93 94 95 96 97 98 99 100 101 102 103 104 |
# File 'lib/rworkflow/flow.rb', line 93 def counters the_counters = @storage.get(:counters) if !the_counters.nil? the_counters = begin self.class.serializer.load(the_counters) rescue => e Rails.logger.error("Error loading stored flow counters: #{e.}") nil end end return the_counters || counters! end |
#created_at ⇒ Object
57 58 59 |
# File 'lib/rworkflow/flow.rb', line 57 def created_at return @created_at ||= begin Time.zone.at(get(:created_at, 0)) end end |
#expected_duration ⇒ Object
81 82 83 |
# File 'lib/rworkflow/flow.rb', line 81 def expected_duration return Float::INFINITY end |
#failed? ⇒ Boolean
349 350 351 352 |
# File 'lib/rworkflow/flow.rb', line 349 def failed? return false if !finished? return total_objects_failed > 0 end |
#fetch(fetcher_id, state_name) ⇒ Object
127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 |
# File 'lib/rworkflow/flow.rb', line 127 def fetch(fetcher_id, state_name) @processing.set(fetcher_id, 1) list = get_state_list(state_name) unless list.nil? failed = [] cardinality = @lifecycle.states[state_name].cardinality cardinality = get(:start_count).to_i if cardinality == Lifecycle::CARDINALITY_ALL_STARTED force_list_complete = @lifecycle.states[state_name].policy == State::STATE_POLICY_WAIT raw_objects = list.lpop(cardinality, force_list_complete) unless raw_objects.empty? objects = raw_objects.map do |raw_object| begin self.class.serializer.load(raw_object) rescue StandardError => _ failed << raw_object nil end end.compact @processing.set(fetcher_id, objects.size) unless failed.empty? push(failed, STATE_FAILED) Rails.logger.error("Failed to parse #{failed.size} in workflow #{@id} for fetcher id #{fetcher_id} at state #{state_name}") end yield(objects) if block_given? end end ensure @processing.remove(fetcher_id) terminate if finished? end |
#finish_time ⇒ Object
77 78 79 |
# File 'lib/rworkflow/flow.rb', line 77 def finish_time return Time.zone.at(get(:finish_time, 0)) end |
#finished? ⇒ Boolean
41 42 43 44 45 46 47 48 |
# File 'lib/rworkflow/flow.rb', line 41 def finished? return false unless started? total = self.counters.reduce(0) do |sum, pair| self.class.terminal?(pair[0]) ? sum : (sum + pair[1].to_i) end return total == 0 end |
#get(key, default = nil) ⇒ Object
268 269 270 271 272 273 |
# File 'lib/rworkflow/flow.rb', line 268 def get(key, default = nil) value = @flow_data.get(key) value = value.nil? ? default : self.class.serializer.load(value) return value end |
#get_state_cardinality(state_name) ⇒ Object
258 259 260 261 262 |
# File 'lib/rworkflow/flow.rb', line 258 def get_state_cardinality(state_name) cardinality = @lifecycle.states[state_name].cardinality cardinality = self.get(:start_count).to_i if cardinality == Rworkflow::Lifecycle::CARDINALITY_ALL_STARTED return cardinality end |
#incr(key, value = 1) ⇒ Object
275 276 277 |
# File 'lib/rworkflow/flow.rb', line 275 def incr(key, value = 1) return @flow_data.incrby(key, value) end |
#list_objects(state_name, limit = -1)) ⇒ Object
160 161 162 163 |
# File 'lib/rworkflow/flow.rb', line 160 def list_objects(state_name, limit = -1) list = get_list(state_name) return list.get(0, limit).map { |object| self.class.serializer.load(object) } end |
#log(from_state, transition, num_objects) ⇒ Object
234 235 236 |
# File 'lib/rworkflow/flow.rb', line 234 def log(from_state, transition, num_objects) logger.incrby("#{from_state}__#{transition}", num_objects.to_i) if logging? end |
#logger ⇒ Object
238 239 240 241 242 |
# File 'lib/rworkflow/flow.rb', line 238 def logger return @logger ||= begin RedisRds::Hash.new("#{@redis_key}__logger") end end |
#logging? ⇒ Boolean
230 231 232 |
# File 'lib/rworkflow/flow.rb', line 230 def logging? return get(:logging, false) end |
#logs ⇒ Object
244 245 246 247 248 249 250 251 252 253 254 255 256 |
# File 'lib/rworkflow/flow.rb', line 244 def logs logs = {} if valid? && logging? state_transition_counters = logger.getall state_transition_counters.each do |state_transition, counter| state, transition = state_transition.split('__') logs[state] = {} unless logs.key?(state) logs[state][transition] = counter.to_i end end return logs end |
#metadata_string ⇒ Object
200 201 202 |
# File 'lib/rworkflow/flow.rb', line 200 def return "Rworkflow: #{self.name}" end |
#name ⇒ Object
65 66 67 |
# File 'lib/rworkflow/flow.rb', line 65 def name return get(:name, @id) end |
#name=(name) ⇒ Object
69 70 71 |
# File 'lib/rworkflow/flow.rb', line 69 def name=(name) return set(:name, name) end |
#public? ⇒ Boolean
354 355 356 |
# File 'lib/rworkflow/flow.rb', line 354 def public? return @public ||= begin get(:public, false) end end |
#set(key, value) ⇒ Object
264 265 266 |
# File 'lib/rworkflow/flow.rb', line 264 def set(key, value) @flow_data.set(key, self.class.serializer.dump(value)) end |
#start(objects) ⇒ Object
312 313 314 315 316 317 318 |
# File 'lib/rworkflow/flow.rb', line 312 def start(objects) objects = Array.wrap(objects) set(:start_time, Time.now.to_i) set(:start_count, objects.size) push(objects, lifecycle.initial) log(lifecycle.initial, 'initial', objects.size) end |
#start_time ⇒ Object
73 74 75 |
# File 'lib/rworkflow/flow.rb', line 73 def start_time return Time.zone.at(get(:start_time, 0)) end |
#started? ⇒ Boolean
61 62 63 |
# File 'lib/rworkflow/flow.rb', line 61 def started? return !get(:start_time).nil? end |
#states_list ⇒ Object
208 209 210 211 212 213 |
# File 'lib/rworkflow/flow.rb', line 208 def states_list states = self.class::STATES_TERMINAL states += @lifecycle.states.keys if valid? return states end |
#status ⇒ Object
50 51 52 53 54 55 |
# File 'lib/rworkflow/flow.rb', line 50 def status status = 'Running' status = successful? ? 'Finished' : 'Failed' if finished? return status end |
#successful? ⇒ Boolean
344 345 346 347 |
# File 'lib/rworkflow/flow.rb', line 344 def successful? return false if !finished? return !failed? end |
#terminate ⇒ Object
178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 |
# File 'lib/rworkflow/flow.rb', line 178 def terminate mutex = RedisRds::Mutex.new(self.id) mutex.synchronize do if !self.cleaned_up? set(:finish_time, Time.now.to_i) post_process if self.public? the_counters = counters! the_counters[:processing] = 0 # Some worker might have increased the processing flag at that time even if there is no more jobs to be done @storage.setnx(:counters, self.class.serializer.dump(the_counters)) states_cleanup else self.cleanup end end end end |
#total_objects(counters = nil) ⇒ Object
330 331 332 |
# File 'lib/rworkflow/flow.rb', line 330 def total_objects(counters = nil) return (counters || self.counters).reduce(0) { |sum, pair| sum + pair[1] } end |
#total_objects_failed(counters = nil) ⇒ Object
334 335 336 337 338 339 340 341 342 |
# File 'lib/rworkflow/flow.rb', line 334 def total_objects_failed(counters = nil) return (counters || self.counters).reduce(0) do |sum, pair| if self.class.failure?(pair[0]) sum + pair[1] else sum end end end |
#total_objects_processed(counters = nil) ⇒ Object
320 321 322 323 324 325 326 327 328 |
# File 'lib/rworkflow/flow.rb', line 320 def total_objects_processed(counters = nil) return (counters || self.counters).reduce(0) do |sum, pair| if self.class.terminal?(pair[0]) sum + pair[1] else sum end end end |
#transition(from_state, name, objects) ⇒ Object
215 216 217 218 219 220 221 222 223 224 225 226 227 228 |
# File 'lib/rworkflow/flow.rb', line 215 def transition(from_state, name, objects) objects = Array.wrap(objects) to_state = begin lifecycle.transition(from_state, name) rescue Rworkflow::StateError => e Rails.logger.error("Error transitioning: #{e}") nil end if !to_state.nil? push(objects, to_state) log(from_state, name, objects.size) end end |
#valid? ⇒ Boolean
85 86 87 |
# File 'lib/rworkflow/flow.rb', line 85 def valid? return !@lifecycle.nil? end |