Class: Proxy::RemoteExecution::Ssh::MQTT::DispatcherActor

Inherits:
Concurrent::Actor::RestartingContext
  • Object
show all
Defined in:
lib/smart_proxy_remote_execution_ssh/mqtt/dispatcher.rb

Defined Under Namespace

Classes: JobDefinition, Tracker

Instance Method Summary collapse

Constructor Details

#initialize(clock, limit = nil) ⇒ DispatcherActor

Returns a new instance of DispatcherActor.



145
146
147
148
149
150
151
152
# File 'lib/smart_proxy_remote_execution_ssh/mqtt/dispatcher.rb', line 145

# Builds the job tracker and the periodic timer used by this actor.
#
# The timer is only constructed here; nothing in this method starts it.
# Each time it fires it sends this actor a +:tick+ message.
#
# @param clock passed through to Tracker.new as its second argument
# @param limit passed through to Tracker.new as its first argument
#   (defaults to nil)
def initialize(clock, limit = nil)
  @tracker = Tracker.new(limit, clock)

  # The firing period is read from the plugin's :mqtt_ttl setting.
  ttl = Proxy::RemoteExecution::Ssh::Plugin.settings[:mqtt_ttl]
  @timer = Concurrent::TimerTask.new(execution_interval: ttl) { reference.tell(:tick) }
end

Instance Method Details

#on_message(message) ⇒ Object



154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
# File 'lib/smart_proxy_remote_execution_ssh/mqtt/dispatcher.rb', line 154

# Dispatches one actor message to the tracker and then re-evaluates whether
# the timer should keep running.
#
# Handled messages (any other action is ignored by the dispatch itself):
#   [:new, uuid, topic, payload]  -> @tracker.new(uuid, topic, payload)
#   [:resend, arg]                -> @tracker.resend(arg)
#   [:running, arg]               -> @tracker.running(arg)
#   [:done, arg]                  -> @tracker.done(arg)
#   :tick                         -> @tracker.process
#
# @param message a bare action (e.g. +:tick+) or an array whose first element
#   is the action and whose remaining elements are its arguments
# @return whatever #timer_set returns
def on_message(message)
  # Multiple assignment copes with both a bare symbol and an array message.
  kind, param = message

  # Turn the timer on up front: should the tracker raise below, the timer is
  # already enabled and processing can carry on with the next tick.
  timer_on

  case kind
  when :new
    _kind, uuid, topic, payload = message
    @tracker.new(uuid, topic, payload)
  when :resend then @tracker.resend(param)
  when :running then @tracker.running(param)
  when :done then @tracker.done(param)
  when :tick then @tracker.process
  end

  # Keep the timer only while the tracker reports outstanding work.
  timer_set(@tracker.needs_processing?)
end

#timer_off ⇒ Object



183
184
185
# File 'lib/smart_proxy_remote_execution_ssh/mqtt/dispatcher.rb', line 183

# Stops the periodic timer by calling +shutdown+ on @timer.
#
# @return whatever @timer.shutdown returns
def timer_off
  @timer.shutdown
end

#timer_on ⇒ Object



179
180
181
# File 'lib/smart_proxy_remote_execution_ssh/mqtt/dispatcher.rb', line 179

# Starts the periodic timer by calling +execute+ on @timer.
#
# NOTE(review): #on_message calls this for every message, so +execute+ is
# presumably safe to call on an already running timer -- confirm against the
# concurrent-ruby version in use.
#
# @return whatever @timer.execute returns
def timer_on
  @timer.execute
end

#timer_set(on) ⇒ Object



175
176
177
# File 'lib/smart_proxy_remote_execution_ssh/mqtt/dispatcher.rb', line 175

# Switches the timer to the requested state.
#
# @param on truthy to start the timer via #timer_on, falsy (false or nil) to
#   stop it via #timer_off
# @return the result of whichever of #timer_on / #timer_off was called
def timer_set(on)
  return timer_on if on

  timer_off
end