Class: WorkflowRb::WorkflowHost
- Inherits:
-
Object
- Object
- WorkflowRb::WorkflowHost
- Defined in:
- lib/workflow_rb/services/workflow_host.rb
Instance Method Summary collapse
-
#initialize ⇒ WorkflowHost
constructor
A new instance of WorkflowHost.
- #publish_event(event_name, event_key, event_data) ⇒ Object
- #register_workflow(workflow_class) ⇒ Object
- #resume_workflow(id) ⇒ Object
- #start ⇒ Object
- #start_workflow(definition_id, version, data = nil) ⇒ Object
- #stop ⇒ Object
- #subscribe_event(workflow_id, step_id, event_name, event_key) ⇒ Object
- #suspend_workflow(id) ⇒ Object
- #terminate_workflow(id) ⇒ Object
- #use_logger(logger) ⇒ Object
- #use_persistence(persistence) ⇒ Object
Constructor Details
#initialize ⇒ WorkflowHost
Returns a new instance of WorkflowHost.
12 13 14 15 16 17 18 19 20 21 22 23 24 |
# File 'lib/workflow_rb/services/workflow_host.rb', line 12 def initialize @persistence = MemoryPersistenceProvider.new @queue_provider = SingleNodeQueueProvider.new @lock_provider = SingleNodeLockProvider.new @registry = WorkflowRegistry.new @is_shutdown = true; @logger = Logger.new(STDOUT) @logger.level = Logger::WARN @thread_count = Etc.nprocessors @threads = [] @poll_interval = 5 @poll_tick = 0 end |
Instance Method Details
#publish_event(event_name, event_key, event_data) ⇒ Object
104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 |
# File 'lib/workflow_rb/services/workflow_host.rb', line 104 def publish_event(event_name, event_key, event_data) if @is_shutdown raise Exception 'Host is not running' end @logger.info("Publishing event #{event_name} #{event_key}") subs = @persistence.get_subscriptions(event_name, event_key) subs.each do |sub| pub = EventPublication.new pub.id = SecureRandom.uuid pub.event_data = event_data pub.event_key = event_key pub.event_name = event_name pub.step_id = sub.step_id pub.workflow_id = sub.workflow_id @queue_provider.queue_for_publish(pub) @persistence.terminate_subscription(sub.id) end end |
#register_workflow(workflow_class) ⇒ Object
34 35 36 37 38 39 40 |
# File 'lib/workflow_rb/services/workflow_host.rb', line 34 def register_workflow(workflow_class) builder = WorkflowRb::WorkflowBuilder.new workflow_obj = workflow_class.new workflow_obj.build(builder) definition = builder.build(workflow_class::ID, workflow_class::VERSION, workflow_class::DATA_CLASS) @registry.register_workflow(definition) end |
#resume_workflow(id) ⇒ Object
144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 |
# File 'lib/workflow_rb/services/workflow_host.rb', line 144 def resume_workflow(id) if @lock_provider.acquire_lock(id) begin workflow = @persistence.get_workflow_instance(id) if workflow.status == WorkflowStatus::SUSPENDED workflow.status = WorkflowStatus::RUNNABLE @persistence.persist_workflow(workflow) return true else return false end rescue Exception => e @logger.error("#{e.} #{e.backtrace}") ensure @lock_provider.release_lock(id) end else false end end |
#start ⇒ Object
71 72 73 74 75 76 77 78 79 80 81 82 83 84 |
# File 'lib/workflow_rb/services/workflow_host.rb', line 71 def start if (@is_shutdown) @is_shutdown = false; @logger.info('Starting worker thread pool') @thread_count.times do @threads << Thread.new { run_workflows } end @threads << Thread.new { run_publications } @threads << Thread.new { house_keeping } end end |
#start_workflow(definition_id, version, data = nil) ⇒ Object
42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 |
# File 'lib/workflow_rb/services/workflow_host.rb', line 42 def start_workflow(definition_id, version, data = nil) wf_def = @registry.get_definition(definition_id, version) wf = WorkflowInstance.new wf.definition_id = definition_id wf.version = version wf.next_execution = Time.new wf.create_time = Time.new wf.status = WorkflowStatus::RUNNABLE if data wf.data = data else if wf_def.data_class wf.data = wf_def.data_class.new end end ep = ExecutionPointer.new ep.active = true ep.step_id = wf_def.initial_step ep.concurrent_fork = 1 wf.execution_pointers << ep id = @persistence.create_new_workflow(wf) @queue_provider.queue_for_processing(id) id end |
#stop ⇒ Object
86 87 88 89 90 91 92 |
# File 'lib/workflow_rb/services/workflow_host.rb', line 86 def stop @is_shutdown = true; @logger.info('Stopping worker thread pool') @threads.each do |thread| thread.join(10) end end |
#subscribe_event(workflow_id, step_id, event_name, event_key) ⇒ Object
94 95 96 97 98 99 100 101 102 |
# File 'lib/workflow_rb/services/workflow_host.rb', line 94 def subscribe_event(workflow_id, step_id, event_name, event_key) @logger.info("Subscribing to event #{event_name} #{event_key} for workflow #{workflow_id} step #{step_id}") sub = EventSubscription.new sub.workflow_id = workflow_id sub.step_id = step_id sub.event_name = event_name sub.event_key = event_key @persistence.create_subscription(sub) end |
#suspend_workflow(id) ⇒ Object
123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 |
# File 'lib/workflow_rb/services/workflow_host.rb', line 123 def suspend_workflow(id) if @lock_provider.acquire_lock(id) begin workflow = @persistence.get_workflow_instance(id) if workflow.status == WorkflowStatus::RUNNABLE workflow.status = WorkflowStatus::SUSPENDED @persistence.persist_workflow(workflow) return true else return false end rescue Exception => e @logger.error("#{e.} #{e.backtrace}") ensure @lock_provider.release_lock(id) end else false end end |
#terminate_workflow(id) ⇒ Object
165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 |
# File 'lib/workflow_rb/services/workflow_host.rb', line 165 def terminate_workflow(id) if @lock_provider.acquire_lock(id) begin workflow = @persistence.get_workflow_instance(id) workflow.status = WorkflowStatus::TERMINATED @persistence.persist_workflow(workflow) return true rescue Exception => e @logger.error("#{e.} #{e.backtrace}") ensure @lock_provider.release_lock(id) end else false end end |
#use_logger(logger) ⇒ Object
26 27 28 |
# File 'lib/workflow_rb/services/workflow_host.rb', line 26 def use_logger(logger) @logger = logger end |
#use_persistence(persistence) ⇒ Object
30 31 32 |
# File 'lib/workflow_rb/services/workflow_host.rb', line 30 def use_persistence(persistence) @persistence = persistence end |