Class: Bumbleworks::Ruote
- Inherits:
-
Object
- Object
- Bumbleworks::Ruote
show all
- Defined in:
- lib/bumbleworks/ruote.rb
Defined Under Namespace
Classes: CancelTimeout, KillTimeout
Class Method Summary
collapse
Class Method Details
.cancel_all_processes!(options = {}) ⇒ Object
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
|
# File 'lib/bumbleworks/ruote.rb', line 47
# Asks the dashboard to cancel (or kill) every process it currently reports,
# then polls every 0.1s until no processes remain or the timeout elapses.
#
# @param options [Hash] settings; the hash passed in is only read, never modified
# @option options [Numeric] :timeout seconds to wait before raising (defaults to 5)
# @option options [Symbol] :method :kill to kill; any other value is treated as :cancel
# @return [nil] once the dashboard reports no remaining processes
# @raise [CancelTimeout] if cancelling and processes remain after the timeout
# @raise [KillTimeout] if killing and processes remain after the timeout
def cancel_all_processes!(options = {})
  # Resolve settings into locals instead of writing defaults back into the
  # caller's hash, so shared or frozen option hashes are safe to pass in.
  timeout = options[:timeout] || 5
  action = options[:method] == :kill ? :kill : :cancel

  dashboard.processes.each do |ps|
    dashboard.send(action, ps.wfid)
  end

  start_time = Time.now
  while dashboard.processes.count > 0
    if (Time.now - start_time) > timeout
      error_type = action == :cancel ? CancelTimeout : KillTimeout
      raise error_type, "Process #{action} taking too long - #{dashboard.processes.count} processes remain"
    end
    sleep 0.1
  end
end
|
.dashboard(options = {}) ⇒ Object
9
10
11
12
13
14
15
16
17
18
|
# File 'lib/bumbleworks/ruote.rb', line 9
# Returns the memoized dashboard, building it on first use.
#
# When options[:start_worker] is exactly true, the storage is wrapped in a
# ::Ruote::Worker before being handed to ::Ruote::Dashboard; otherwise the
# dashboard is built directly on the storage. Options have no effect once a
# dashboard has been memoized.
#
# @param options [Hash] :start_worker => true to build the dashboard on a worker
# @return [Object] the ::Ruote::Dashboard instance held in @dashboard
def dashboard(options = {})
  return @dashboard if @dashboard

  context = storage
  context = ::Ruote::Worker.new(context) if options[:start_worker] == true
  @dashboard = ::Ruote::Dashboard.new(context)
end
|
.kill_all_processes!(options = {}) ⇒ Object
67
68
69
|
# File 'lib/bumbleworks/ruote.rb', line 67
# Same as cancel_all_processes!, but with :method forced to :kill.
# The options are merged into a new hash, so the caller's hash is untouched.
#
# @param options [Hash] forwarded to cancel_all_processes! with :method => :kill
# @return [Object] whatever cancel_all_processes! returns
def kill_all_processes!(options = {})
  kill_options = options.merge(:method => :kill)
  cancel_all_processes!(kill_options)
end
|
.launch(name, *args) ⇒ Object
43
44
45
|
# File 'lib/bumbleworks/ruote.rb', line 43
# Launches the value stored under +name+ in the dashboard's variables,
# passing any extra arguments straight through to the dashboard's launch.
#
# @param name [Object] key looked up in dashboard.variables
# @param args [Array] additional arguments forwarded to dashboard.launch
# @return [Object] whatever dashboard.launch returns
def launch(name, *args)
  definition = dashboard.variables[name]
  dashboard.launch(definition, *args)
end
|
.register_participants(&block) ⇒ Object
71
72
73
74
75
|
# File 'lib/bumbleworks/ruote.rb', line 71
# Registers participants through the dashboard when a block is supplied,
# then runs set_catchall_if_needed and returns the resulting participant list.
#
# @yield passed on to dashboard.register
# @return [Object] dashboard.participant_list after registration
def register_participants(&block)
  unless block.nil?
    dashboard.register(&block)
  end
  set_catchall_if_needed
  dashboard.participant_list
end
|
.reset! ⇒ Object
97
98
99
100
101
102
103
104
105
|
# File 'lib/bumbleworks/ruote.rb', line 97
# Tears down the memoized storage and dashboard and forgets both.
#
# If a storage is memoized it is purged and then shut down. The dashboard is
# shut down afterwards, but only if it responds to shutdown. Both instance
# variables are cleared at the end.
#
# @return [nil]
def reset!
  if (store = @storage)
    store.purge!
    store.shutdown
  end

  board = @dashboard
  board.shutdown if board && board.respond_to?(:shutdown)

  @storage = nil
  @dashboard = nil
end
|
.set_catchall_if_needed ⇒ Object
77
78
79
80
81
82
83
84
|
# File 'lib/bumbleworks/ruote.rb', line 77
# Makes sure the participant list ends with a catch-all storage participant.
#
# The list is left alone when its last entry has the regex "^.+$" and a
# classname of either Ruote::StorageParticipant or
# Bumbleworks::StorageParticipant. Otherwise a Bumbleworks::StorageParticipant
# catch-all entry is appended and the list is written back to the dashboard.
#
# @return [Object, nil] the updated participant list, or nil if nothing changed
def set_catchall_if_needed
  tail = dashboard.participant_list.last
  storage_classnames = ["Ruote::StorageParticipant", "Bumbleworks::StorageParticipant"]
  has_catchall = tail && tail.regex == "^.+$" && storage_classnames.include?(tail.classname)
  return if has_catchall

  fallback = ::Ruote::ParticipantEntry.new(["^.+$", ["Bumbleworks::StorageParticipant", {}]])
  dashboard.participant_list = dashboard.participant_list.push(fallback)
end
|
.start_worker!(options = {}) ⇒ Object
Start a worker, which will begin polling for messages in the workflow storage. You can run multiple workers if you are using a storage that supports them (such as Sequel or Redis, but not Hash) - they all just have to be connected to the same storage, and be able to instantiate participants in the participant list.
35
36
37
38
39
40
41
|
# File 'lib/bumbleworks/ruote.rb', line 35
# Discards any memoized dashboard, rebuilds it with :start_worker => true,
# and returns the dashboard's worker.
#
# @param options [Hash]
# @option options [Boolean] :verbose the dashboard's noisy flag is set to true
#   only when this is exactly true, and to false otherwise
# @option options [Boolean] :join dashboard.join is called only when this is
#   exactly true
# @return [Object] dashboard.worker
def start_worker!(options = {})
  @dashboard = nil
  board = dashboard(:start_worker => true)
  board.noisy = (options[:verbose] == true)
  board.join if options[:join] == true
  board.worker
end
|
.storage ⇒ Object
86
87
88
89
90
91
92
93
94
95
|
# File 'lib/bumbleworks/ruote.rb', line 86
# Returns the memoized storage, building it on first use.
#
# Picks the first adapter from Bumbleworks.configuration.storage_adapters
# whose use? accepts Bumbleworks.storage, then instantiates that adapter's
# driver with Bumbleworks.storage.
#
# @return [Object] the driver instance held in @storage
# @raise [UndefinedSetting] when no configured adapter accepts Bumbleworks.storage
def storage
  return @storage if @storage

  candidates = Bumbleworks.configuration.storage_adapters
  chosen = candidates.detect { |candidate| candidate.use?(Bumbleworks.storage) }
  unless chosen
    supported = candidates.map(&:display_name).join(', ')
    raise UndefinedSetting, "Storage is missing or not supported. Supported: #{supported}"
  end

  @storage = chosen.driver.new(Bumbleworks.storage)
end
|