Class: Proxy::Dynflow::Runner::Dispatcher

Inherits:
Object
  • Object
show all
Defined in:
lib/smart_proxy_dynflow/runner/dispatcher.rb

Defined Under Namespace

Classes: RunnerActor

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(clock, logger) ⇒ Dispatcher

Returns a new instance of Dispatcher.



117
118
119
120
121
122
123
124
# File 'lib/smart_proxy_dynflow/runner/dispatcher.rb', line 117

def initialize(clock, logger)
  @mutex  = Mutex.new
  @clock  = clock
  @logger = logger
  @ticker = ::Proxy::Dynflow::Ticker.spawn('dispatcher-ticker', @clock, @logger, refresh_interval)
  @runner_actors = {}
  @runner_suspended_actions = {}
end

Instance Attribute Details

#tickerObject (readonly)

Returns the value of attribute ticker.



115
116
117
# File 'lib/smart_proxy_dynflow/runner/dispatcher.rb', line 115

def ticker
  @ticker
end

Class Method Details

.instanceObject



8
9
10
11
12
13
# File 'lib/smart_proxy_dynflow/runner/dispatcher.rb', line 8

def self.instance
  return @instance if @instance

  @instance = new(Proxy::Dynflow::Core.world.clock,
                  Proxy::Dynflow::Core.world.logger)
end

Instance Method Details

#external_event(runner_id, external_event) ⇒ Object



163
164
165
166
167
168
# File 'lib/smart_proxy_dynflow/runner/dispatcher.rb', line 163

def external_event(runner_id, external_event)
  synchronize do
    runner_actor = @runner_actors[runner_id]
    runner_actor&.tell([:external_event, external_event])
  end
end

#finish(runner_id) ⇒ Object



155
156
157
158
159
160
161
# File 'lib/smart_proxy_dynflow/runner/dispatcher.rb', line 155

def finish(runner_id)
  synchronize do
    _finish(runner_id)
  rescue => e
    _handle_command_exception(runner_id, e, false)
  end
end

#handle_command_exception(*args) ⇒ Object



176
177
178
# File 'lib/smart_proxy_dynflow/runner/dispatcher.rb', line 176

def handle_command_exception(*args)
  synchronize { _handle_command_exception(*args) }
end

#kill(runner_id) ⇒ Object



146
147
148
149
150
151
152
153
# File 'lib/smart_proxy_dynflow/runner/dispatcher.rb', line 146

def kill(runner_id)
  synchronize do
    runner_actor = @runner_actors[runner_id]
    runner_actor&.tell(:kill)
  rescue => e
    _handle_command_exception(runner_id, e, false)
  end
end

#refresh_intervalObject



180
181
182
# File 'lib/smart_proxy_dynflow/runner/dispatcher.rb', line 180

def refresh_interval
  1
end

#refresh_output(runner_id) ⇒ Object



170
171
172
173
174
# File 'lib/smart_proxy_dynflow/runner/dispatcher.rb', line 170

def refresh_output(runner_id)
  synchronize do
    @runner_actors[runner_id]&.tell([:refresh_output])
  end
end

#start(suspended_action, runner) ⇒ Object



130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
# File 'lib/smart_proxy_dynflow/runner/dispatcher.rb', line 130

def start(suspended_action, runner)
  synchronize do
    raise "Actor with runner id #{runner.id} already exists" if @runner_actors[runner.id]

    runner.logger = @logger
    runner_actor = RunnerActor.spawn("runner-actor-#{runner.id}", self, suspended_action, runner, @clock, @logger)
    @runner_actors[runner.id] = runner_actor
    @runner_suspended_actions[runner.id] = suspended_action
    runner_actor.tell(:start_runner)
    return runner.id
  rescue => e
    _handle_command_exception(runner.id, e)
    return nil
  end
end

#synchronize(&block) ⇒ Object



126
127
128
# File 'lib/smart_proxy_dynflow/runner/dispatcher.rb', line 126

def synchronize(&block)
  @mutex.synchronize(&block)
end