Class: Bumbleworks::Ruote

Inherits:
Object
  • Object
show all
Defined in:
lib/bumbleworks/ruote.rb

Defined Under Namespace

Classes: CancelTimeout, KillTimeout

Class Method Summary collapse

Class Method Details

.cancel_all_processes!(options = {}) ⇒ Object



47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
# File 'lib/bumbleworks/ruote.rb', line 47

# Cancels (or kills) every process currently reported by the dashboard, then
# polls every 0.1s until the dashboard reports no remaining processes.
#
# The +options+ hash is only read, never written to, so callers may pass a
# frozen or shared hash without it being altered.
#
# @param options [Hash]
# @option options [Numeric] :timeout (5) seconds to wait for the process
#   list to empty before giving up
# @option options [Symbol] :method (:cancel) pass +:kill+ to kill instead
#   of cancel; any other value is treated as +:cancel+
# @raise [CancelTimeout] if cancelling and processes remain after the timeout
# @raise [KillTimeout] if killing and processes remain after the timeout
# @return [nil]
def cancel_all_processes!(options = {})
  wait_limit = options[:timeout] || 5
  # Only :kill is honoured; everything else falls back to :cancel, which
  # also restricts the dynamic dispatch below to these two method names.
  action = options[:method] == :kill ? :kill : :cancel

  dashboard.processes.each do |ps|
    dashboard.send(action, ps.wfid)
  end

  start_time = Time.now
  while dashboard.processes.count > 0
    if (Time.now - start_time) > wait_limit
      error_type = action == :cancel ? CancelTimeout : KillTimeout
      raise error_type, "Process #{action} taking too long - #{dashboard.processes.count} processes remain"
    end
    sleep 0.1
  end
end

.dashboard(options = {}) ⇒ Object



9
10
11
12
13
14
15
16
17
18
# File 'lib/bumbleworks/ruote.rb', line 9

# Returns the memoized Ruote dashboard, building it on first use.
#
# Once a dashboard has been built, later calls return it unchanged and
# +options+ is ignored.
#
# @param options [Hash]
# @option options [Boolean] :start_worker when exactly +true+, the dashboard
#   is built around a new Ruote::Worker wrapping the storage; otherwise it
#   is built around the storage itself
# @return [Object] the Ruote::Dashboard instance
def dashboard(options = {})
  return @dashboard if @dashboard

  with_worker = (options[:start_worker] == true)
  engine_context = with_worker ? ::Ruote::Worker.new(storage) : storage
  @dashboard = ::Ruote::Dashboard.new(engine_context)
end

.kill_all_processes!(options = {}) ⇒ Object



67
68
69
# File 'lib/bumbleworks/ruote.rb', line 67

# Same as cancel_all_processes!, but forces the :method option to :kill.
# The options are merged into a new hash, so the caller's hash is untouched.
#
# @param options [Hash] forwarded to cancel_all_processes!
# @return [Object] whatever cancel_all_processes! returns
def kill_all_processes!(options = {})
  kill_options = options.merge(:method => :kill)
  cancel_all_processes!(kill_options)
end

.launch(name, *args) ⇒ Object



43
44
45
# File 'lib/bumbleworks/ruote.rb', line 43

# Launches the value stored under +name+ in the dashboard's variables.
#
# @param name [Object] key looked up in dashboard.variables
# @param args [Array] extra arguments passed straight to dashboard.launch
# @return [Object] whatever dashboard.launch returns
def launch(name, *args)
  definition = dashboard.variables[name]
  dashboard.launch(definition, *args)
end

.register_participants(&block) ⇒ Object



71
72
73
74
75
# File 'lib/bumbleworks/ruote.rb', line 71

# Registers participants with the dashboard (when a block is given), makes
# sure the catch-all entry is present, and returns the participant list.
#
# @yield optional registration block, handed to dashboard.register
# @return [Object] dashboard.participant_list
def register_participants(&block)
  unless block.nil?
    dashboard.register(&block)
  end
  set_catchall_if_needed
  dashboard.participant_list
end

.reset! ⇒ Object



97
98
99
100
101
102
103
104
105
# File 'lib/bumbleworks/ruote.rb', line 97

# Tears down the memoized storage and dashboard and forgets both.
#
# The storage (if any) is purged and shut down first; the dashboard is then
# shut down only if it responds to +shutdown+.
#
# @return [nil]
def reset!
  store = @storage
  if store
    store.purge!
    store.shutdown
  end

  board = @dashboard
  board.shutdown if board && board.respond_to?(:shutdown)

  @storage = @dashboard = nil
end

.set_catchall_if_needed ⇒ Object



77
78
79
80
81
82
83
84
# File 'lib/bumbleworks/ruote.rb', line 77

# Ensures the participant list ends with a catch-all storage participant.
#
# Nothing happens when the last entry already has the regex "^.+$" and a
# classname of Ruote::StorageParticipant or Bumbleworks::StorageParticipant.
# Otherwise a "^.+$" entry for Bumbleworks::StorageParticipant is pushed onto
# the list and the list is assigned back to the dashboard.
#
# @return [Object, nil] the updated list, or nil when nothing was added
def set_catchall_if_needed
  tail = dashboard.participant_list.last
  storage_classes = ["Ruote::StorageParticipant", "Bumbleworks::StorageParticipant"]
  already_caught = tail && tail.regex == "^.+$" && storage_classes.include?(tail.classname)
  return if already_caught

  fallback = ::Ruote::ParticipantEntry.new(["^.+$", ["Bumbleworks::StorageParticipant", {}]])
  dashboard.participant_list = dashboard.participant_list.push(fallback)
end

.start_worker!(options = {}) ⇒ Object

Start a worker, which will begin polling for messages in the workflow storage. You can run multiple workers if you are using a storage that supports them (such as Sequel or Redis, but not Hash) - they all just have to be connected to the same storage, and be able to instantiate participants in the participant list.

Parameters:

  • options (Hash) (defaults to: {})

    startup options for the worker

Options Hash (options):

  • :verbose (Boolean)

    whether or not to spin up a “noisy” worker, which will output all messages picked up

  • :join (Boolean)

    whether or not to join the worker thread; if false, this method will return, and the worker thread will be disconnected, and killed if the calling process exits.



35
36
37
38
39
40
41
# File 'lib/bumbleworks/ruote.rb', line 35

# Discards any memoized dashboard, rebuilds it with a worker, and returns
# that worker.
#
# @param options [Hash]
# @option options [Boolean] :verbose the dashboard's +noisy+ flag is set to
#   true only when this is exactly +true+
# @option options [Boolean] :join the dashboard is joined only when this is
#   exactly +true+
# @return [Object] dashboard.worker
def start_worker!(options = {})
  # Drop the memoized dashboard so the next call builds a worker-backed one.
  @dashboard = nil
  engine = dashboard(:start_worker => true)

  engine.noisy = (options[:verbose] == true)
  engine.join if options[:join] == true
  engine.worker
end

.storage ⇒ Object



86
87
88
89
90
91
92
93
94
95
# File 'lib/bumbleworks/ruote.rb', line 86

# Returns the memoized storage driver instance, building it on first use.
#
# The first configured adapter whose +use?+ accepts Bumbleworks.storage is
# chosen, and its driver class is instantiated with Bumbleworks.storage.
#
# @raise [UndefinedSetting] when no configured adapter accepts the storage
# @return [Object] the driver instance
def storage
  return @storage if @storage

  candidates = Bumbleworks.configuration.storage_adapters
  chosen = candidates.detect { |candidate| candidate.use?(Bumbleworks.storage) }
  unless chosen
    supported = candidates.map(&:display_name).join(', ')
    raise UndefinedSetting, "Storage is missing or not supported.  Supported: #{supported}"
  end

  @storage = chosen.driver.new(Bumbleworks.storage)
end