Class: WorkflowRb::WorkflowHost

Inherits:
Object
  • Object
show all
Defined in:
lib/workflow_rb/services/workflow_host.rb

Instance Method Summary collapse

Constructor Details

#initialize ⇒ WorkflowHost

Returns a new instance of WorkflowHost.



12
13
14
15
16
17
18
19
20
21
22
23
24
# File 'lib/workflow_rb/services/workflow_host.rb', line 12

# Builds a host with its default collaborators and settings.
#
# The host starts in the stopped state (@is_shutdown is true), logs to STDOUT
# at WARN level, and sizes its worker pool from Etc.nprocessors.
def initialize
  # Default provider objects; @persistence and @logger can be swapped later.
  @persistence = MemoryPersistenceProvider.new
  @queue_provider = SingleNodeQueueProvider.new
  @lock_provider = SingleNodeLockProvider.new
  @registry = WorkflowRegistry.new

  @logger = Logger.new(STDOUT).tap { |log| log.level = Logger::WARN }

  # Lifecycle / thread-pool state.
  @is_shutdown = true
  @thread_count = Etc.nprocessors
  @threads = []

  # Polling settings (units are not visible here -- TODO confirm against the
  # code that reads them).
  @poll_interval = 5
  @poll_tick = 0
end

Instance Method Details

#publish_event(event_name, event_key, event_data) ⇒ Object



104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
# File 'lib/workflow_rb/services/workflow_host.rb', line 104

# Publishes an event to every subscription stored for the given name and key.
#
# For each subscription returned by the persistence provider, an
# EventPublication is filled in from the event and the subscription, handed to
# the queue provider, and the subscription is then terminated.
#
# @param event_name [Object] event name used to look up subscriptions
# @param event_key [Object] event key used to look up subscriptions
# @param event_data [Object] payload copied onto each publication
# @return [Object] whatever +each+ returns on the subscription collection
# @raise [RuntimeError] with message 'Host is not running' when the host is
#   shut down
def publish_event(event_name, event_key, event_data)
  # NOTE: `raise Exception 'msg'` (no comma) is parsed as a call to a method
  # named Exception and fails with NoMethodError, so the message never reached
  # the caller. A plain `raise 'msg'` raises RuntimeError, which both
  # `rescue => e` and `rescue Exception` handlers still catch.
  raise 'Host is not running' if @is_shutdown

  @logger.info("Publishing event #{event_name} #{event_key}")
  @persistence.get_subscriptions(event_name, event_key).each do |sub|
    pub = EventPublication.new
    pub.id = SecureRandom.uuid
    pub.event_data = event_data
    pub.event_key = event_key
    pub.event_name = event_name
    pub.step_id = sub.step_id
    pub.workflow_id = sub.workflow_id
    @queue_provider.queue_for_publish(pub)
    # The subscription is terminated only after its publication is queued.
    @persistence.terminate_subscription(sub.id)
  end
end

#register_workflow(workflow_class) ⇒ Object



34
35
36
37
38
39
40
# File 'lib/workflow_rb/services/workflow_host.rb', line 34

# Builds a workflow definition from +workflow_class+ and registers it.
#
# A new instance of the class is asked to describe itself to a fresh
# WorkflowRb::WorkflowBuilder; the builder then produces the definition from
# the class's ID, VERSION and DATA_CLASS constants.
#
# @param workflow_class [Class] class that defines ID, VERSION and DATA_CLASS
#   and whose instances respond to +build(builder)+
# @return [Object] the result of the registry's +register_workflow+ call
def register_workflow(workflow_class)
  builder = WorkflowRb::WorkflowBuilder.new
  workflow_class.new.build(builder)

  @registry.register_workflow(
    builder.build(
      workflow_class::ID,
      workflow_class::VERSION,
      workflow_class::DATA_CLASS
    )
  )
end

#resume_workflow(id) ⇒ Object



144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
# File 'lib/workflow_rb/services/workflow_host.rb', line 144

# Moves a SUSPENDED workflow back to RUNNABLE.
#
# The workflow's lock is held while its status is read and updated, and is
# released in every case once acquired.
#
# @param id [Object] workflow instance id
# @return [Boolean] true when the workflow was suspended and has been
#   persisted as runnable; false when the lock could not be acquired, the
#   workflow was not suspended, or an error occurred (the error is logged)
def resume_workflow(id)
  return false unless @lock_provider.acquire_lock(id)

  begin
    workflow = @persistence.get_workflow_instance(id)
    return false unless workflow.status == WorkflowStatus::SUSPENDED

    workflow.status = WorkflowStatus::RUNNABLE
    @persistence.persist_workflow(workflow)
    true
  rescue StandardError => e
    # StandardError rather than Exception, so Interrupt/SystemExit propagate.
    @logger.error("#{e.message} #{e.backtrace}")
    # Explicit false: without it the method would return the logger call's
    # result (true for a stdlib Logger) and report a failure as success.
    false
  ensure
    @lock_provider.release_lock(id)
  end
end

#start ⇒ Object



71
72
73
74
75
76
77
78
79
80
81
82
83
84
# File 'lib/workflow_rb/services/workflow_host.rb', line 71

# Starts the host's background threads if it is currently shut down.
#
# Clears the shutdown flag, then spawns @thread_count threads running
# +run_workflows+, plus one thread each for +run_publications+ and
# +house_keeping+. All threads are appended to @threads.
#
# @return [Array<Thread>, nil] @threads after spawning, or nil when the host
#   was already running (nothing is started in that case)
def start
  return unless @is_shutdown

  @is_shutdown = false
  @logger.info('Starting worker thread pool')

  @thread_count.times { @threads << Thread.new { run_workflows } }

  @threads.push(
    Thread.new { run_publications },
    Thread.new { house_keeping }
  )
end

#start_workflow(definition_id, version, data = nil) ⇒ Object



42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
# File 'lib/workflow_rb/services/workflow_host.rb', line 42

# Creates, persists and queues a new runnable instance of a registered workflow.
#
# The instance gets one active execution pointer at the definition's initial
# step. Its data is +data+ when given; otherwise a new instance of the
# definition's data class when one is set; otherwise data is left unassigned.
#
# @param definition_id [Object] id used to look up the definition
# @param version [Object] version used to look up the definition
# @param data [Object, nil] optional initial workflow data
# @return [Object] the id returned by the persistence provider
def start_workflow(definition_id, version, data = nil)
  definition = @registry.get_definition(definition_id, version)

  instance = WorkflowInstance.new
  instance.definition_id = definition_id
  instance.version = version
  instance.next_execution = Time.now
  instance.create_time = Time.now
  instance.status = WorkflowStatus::RUNNABLE

  if data
    instance.data = data
  elsif definition.data_class
    instance.data = definition.data_class.new
  end

  pointer = ExecutionPointer.new
  pointer.active = true
  pointer.step_id = definition.initial_step
  pointer.concurrent_fork = 1
  instance.execution_pointers << pointer

  # Persist first; the id it yields is what gets queued and returned.
  @persistence.create_new_workflow(instance).tap do |workflow_id|
    @queue_provider.queue_for_processing(workflow_id)
  end
end

#stop ⇒ Object



86
87
88
89
90
91
92
# File 'lib/workflow_rb/services/workflow_host.rb', line 86

# Flags the host as shut down and waits for its threads to finish.
#
# Each thread in @threads is joined with a 10-second limit; a thread that has
# not finished by then is not waited on any longer.
#
# @return [Array<Thread>] the @threads collection
def stop
  @is_shutdown = true
  @logger.info('Stopping worker thread pool')
  @threads.each { |worker| worker.join(10) }
end

#subscribe_event(workflow_id, step_id, event_name, event_key) ⇒ Object



94
95
96
97
98
99
100
101
102
# File 'lib/workflow_rb/services/workflow_host.rb', line 94

# Records that a workflow step is waiting for an event.
#
# Builds an EventSubscription from the four arguments and passes it to the
# persistence provider.
#
# @param workflow_id [Object] id of the waiting workflow
# @param step_id [Object] id of the waiting step
# @param event_name [Object] name of the event to wait for
# @param event_key [Object] key of the event to wait for
# @return [Object] the result of the persistence provider's
#   +create_subscription+ call
def subscribe_event(workflow_id, step_id, event_name, event_key)
  @logger.info("Subscribing to event #{event_name} #{event_key} for workflow #{workflow_id} step #{step_id}")

  subscription = EventSubscription.new.tap do |s|
    s.workflow_id = workflow_id
    s.step_id = step_id
    s.event_name = event_name
    s.event_key = event_key
  end

  @persistence.create_subscription(subscription)
end

#suspend_workflow(id) ⇒ Object



123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
# File 'lib/workflow_rb/services/workflow_host.rb', line 123

# Moves a RUNNABLE workflow to SUSPENDED.
#
# The workflow's lock is held while its status is read and updated, and is
# released in every case once acquired.
#
# @param id [Object] workflow instance id
# @return [Boolean] true when the workflow was runnable and has been
#   persisted as suspended; false when the lock could not be acquired, the
#   workflow was not runnable, or an error occurred (the error is logged)
def suspend_workflow(id)
  return false unless @lock_provider.acquire_lock(id)

  begin
    workflow = @persistence.get_workflow_instance(id)
    return false unless workflow.status == WorkflowStatus::RUNNABLE

    workflow.status = WorkflowStatus::SUSPENDED
    @persistence.persist_workflow(workflow)
    true
  rescue StandardError => e
    # StandardError rather than Exception, so Interrupt/SystemExit propagate.
    @logger.error("#{e.message} #{e.backtrace}")
    # Explicit false: without it the method would return the logger call's
    # result (true for a stdlib Logger) and report a failure as success.
    false
  ensure
    @lock_provider.release_lock(id)
  end
end

#terminate_workflow(id) ⇒ Object



165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
# File 'lib/workflow_rb/services/workflow_host.rb', line 165

# Marks a workflow as TERMINATED, whatever its current status.
#
# The workflow's lock is held while the status is written, and is released in
# every case once acquired.
#
# @param id [Object] workflow instance id
# @return [Boolean] true when the terminated status was persisted; false when
#   the lock could not be acquired or an error occurred (the error is logged)
def terminate_workflow(id)
  return false unless @lock_provider.acquire_lock(id)

  begin
    workflow = @persistence.get_workflow_instance(id)
    workflow.status = WorkflowStatus::TERMINATED
    @persistence.persist_workflow(workflow)
    true
  rescue StandardError => e
    # StandardError rather than Exception, so Interrupt/SystemExit propagate.
    @logger.error("#{e.message} #{e.backtrace}")
    # Explicit false: without it the method would return the logger call's
    # result (true for a stdlib Logger) and report a failure as success.
    false
  ensure
    @lock_provider.release_lock(id)
  end
end

#use_logger(logger) ⇒ Object



26
27
28
# File 'lib/workflow_rb/services/workflow_host.rb', line 26

# Replaces the logger this host writes to.
#
# @param logger [Object] stored as-is in @logger; the methods of this class
#   call +info+ and +error+ on it
# @return [Object] the logger that was passed in
def use_logger(logger)
  @logger = logger
end

#use_persistence(persistence) ⇒ Object



30
31
32
# File 'lib/workflow_rb/services/workflow_host.rb', line 30

# Replaces the persistence provider this host reads from and writes to.
#
# @param persistence [Object] stored as-is in @persistence; the methods of
#   this class call workflow and subscription operations on it (for example
#   +get_workflow_instance+, +persist_workflow+, +create_subscription+)
# @return [Object] the provider that was passed in
def use_persistence(persistence)
  @persistence = persistence
end