Class: Proxy::RemoteExecution::Ssh::MQTT::DispatcherActor::Tracker

Inherits:
Object
  • Object
show all
Defined in:
lib/smart_proxy_remote_execution_ssh/mqtt/dispatcher.rb

Instance Method Summary collapse

Constructor Details

#initialize(limit, clock) ⇒ Tracker

Returns a new instance of Tracker.



61
62
63
64
65
66
67
68
69
# File 'lib/smart_proxy_remote_execution_ssh/mqtt/dispatcher.rb', line 61

# Sets up an empty tracker.
#
# @param limit [Integer, nil] stored as @limit; +nil+ is treated as
#   "no limit" by #pending_count
# @param clock [Object] stored as @clock; used by #schedule_resend, which
#   calls +ping+ on it
def initialize(limit, clock)
  @clock, @limit = clock, limit
  # Job definitions keyed by uuid, plus the ordered queue of waiting uuids.
  @jobs, @pending = {}, []
  # Three independent sets; the block form guarantees distinct objects.
  @running, @hot, @cold = Array.new(3) { Set.new }
end

Instance Method Details

#dispatch_pending ⇒ Object



110
111
112
113
114
115
# File 'lib/smart_proxy_remote_execution_ssh/mqtt/dispatcher.rb', line 110

# Notifies and promotes as many queued jobs as #pending_count allows.
#
# Each promoted uuid is published via #mqtt_notify, removed from the front
# of @pending and added to @hot.
#
# @return [Integer] the number of slots that was computed by #pending_count
def dispatch_pending
  slots = pending_count
  slots.times do
    uuid = @pending.first
    # Publish before dequeuing so the uuid stays queued if publishing raises.
    mqtt_notify(uuid)
    @pending.shift
    @hot << uuid
  end
end

#done(uuid) ⇒ Object



89
90
91
92
93
94
95
# File 'lib/smart_proxy_remote_execution_ssh/mqtt/dispatcher.rb', line 89

# Forgets a job entirely and then tries to dispatch queued jobs.
#
# The uuid is removed from @jobs and from every tracking collection
# (@pending, @running, @hot, @cold).
#
# @param uuid [Object] key identifying the job
# @return [Object] whatever #dispatch_pending returns
def done(uuid)
  @jobs.delete(uuid)
  @pending.delete(uuid)
  @running.delete(uuid)
  @hot.delete(uuid)
  @cold.delete(uuid)
  dispatch_pending
end

#mqtt_notify(uuid) ⇒ Object



125
126
127
128
129
130
# File 'lib/smart_proxy_remote_execution_ssh/mqtt/dispatcher.rb', line 125

# Publishes the stored job's payload, JSON-encoded, to the job's topic.
#
# Does nothing when no job is stored under +uuid+.
#
# @param uuid [Object] key into @jobs
# @return [Object, nil] the result of +MQTT.publish+, or +nil+ when the job
#   is unknown
def mqtt_notify(uuid)
  job = @jobs[uuid]
  unless job.nil?
    broker = Proxy::RemoteExecution::Ssh::MQTT
    broker.publish(job.topic, JSON.dump(job.payload))
  end
end

#needs_processing? ⇒ Boolean

Returns:

  • (Boolean)


97
98
99
# File 'lib/smart_proxy_remote_execution_ssh/mqtt/dispatcher.rb', line 97

# Tells whether there is anything left to act on: jobs that can be
# dispatched right now, or jobs sitting in @hot or @cold.
#
# @return [Boolean]
def needs_processing?
  return true if pending_count.positive?

  [@hot, @cold].any?(&:any?)
end

#new(uuid, topic, payload) ⇒ Object



71
72
73
74
75
# File 'lib/smart_proxy_remote_execution_ssh/mqtt/dispatcher.rb', line 71

# Registers a job and queues it for dispatch.
#
# A JobDefinition built from the arguments is stored in @jobs under +uuid+,
# the uuid is appended to @pending, and dispatching is attempted.
#
# @param uuid [Object] key identifying the job
# @param topic [Object] passed through to JobDefinition
# @param payload [Object] passed through to JobDefinition
# @return [Object] whatever #dispatch_pending returns
def new(uuid, topic, payload)
  definition = JobDefinition.new(uuid, topic, payload)
  @jobs[uuid] = definition
  @pending.push(uuid)
  dispatch_pending
end

#pending_count ⇒ Object



101
102
103
104
105
106
107
108
# File 'lib/smart_proxy_remote_execution_ssh/mqtt/dispatcher.rb', line 101

# Number of queued jobs that may be dispatched right now.
#
# When @limit is +nil+ every queued job qualifies. Otherwise the result is
# bounded by the free capacity, i.e. @limit minus the jobs currently held in
# @running, @hot and @cold.
#
# @return [Integer] a value between 0 and the size of @pending; never
#   negative, even when the tracked jobs already exceed @limit
def pending_count
  pending = @pending.size
  return pending if @limit.nil?

  occupied = [@running, @hot, @cold].sum(&:size)
  capacity = @limit - occupied
  # capacity drops below zero once more jobs are tracked than the limit
  # allows; clamp so callers always get a real count.
  capacity.clamp(0, pending)
end

#process ⇒ Object



117
118
119
120
121
122
123
# File 'lib/smart_proxy_remote_execution_ssh/mqtt/dispatcher.rb', line 117

# Runs one processing step.
#
# Every uuid currently in @cold gets a resend scheduled, then the previous
# @hot set becomes the new @cold and @hot starts over empty. Finally
# dispatching of queued jobs is attempted.
#
# @return [Object] whatever #dispatch_pending returns
def process
  @cold.each do |stale_uuid|
    schedule_resend(stale_uuid)
  end
  # Rotate: previously hot jobs become cold, hot is reset to a fresh set.
  @cold, @hot = @hot, Set.new

  dispatch_pending
end

#resend(uuid) ⇒ Object



82
83
84
85
86
87
# File 'lib/smart_proxy_remote_execution_ssh/mqtt/dispatcher.rb', line 82

# Puts a known job back at the end of the queue and attempts dispatching.
#
# Unknown uuids (nothing stored in @jobs) are ignored.
#
# NOTE(review): there is no check against @running or against uuids already
# present in @pending, so such a job is queued again — confirm this is
# intended.
#
# @param uuid [Object] key identifying the job
# @return [Object, nil] whatever #dispatch_pending returns, or +nil+ when
#   the job is unknown
def resend(uuid)
  if @jobs[uuid]
    @pending.push(uuid)
    dispatch_pending
  end
end

#resend_interval ⇒ Object



140
141
142
# File 'lib/smart_proxy_remote_execution_ssh/mqtt/dispatcher.rb', line 140

# Looks up the +:mqtt_resend_interval+ entry of the plugin settings.
#
# @return [Object] the configured value (presumably a duration; its unit is
#   not visible here)
def resend_interval
  config = settings
  config[:mqtt_resend_interval]
end

#running(uuid) ⇒ Object



77
78
79
80
# File 'lib/smart_proxy_remote_execution_ssh/mqtt/dispatcher.rb', line 77

# Marks a job as running.
#
# The uuid is dropped from @pending, @hot and @cold and added to @running.
#
# @param uuid [Object] key identifying the job
# @return [Set] the @running set
def running(uuid)
  @pending.delete(uuid)
  @hot.delete(uuid)
  @cold.delete(uuid)
  @running.add(uuid)
end

#schedule_resend(uuid) ⇒ Object



136
137
138
# File 'lib/smart_proxy_remote_execution_ssh/mqtt/dispatcher.rb', line 136

# Asks the clock to ping the MQTT dispatcher instance with a +:resend+
# message for +uuid+, passing #resend_interval as the delay argument.
#
# @param uuid [Object] key identifying the job
# @return [Object] whatever +@clock.ping+ returns
def schedule_resend(uuid)
  dispatcher = Proxy::RemoteExecution::Ssh::MQTT::Dispatcher.instance
  delay = resend_interval
  @clock.ping(dispatcher, delay, uuid, :resend)
end

#settings ⇒ Object



132
133
134
# File 'lib/smart_proxy_remote_execution_ssh/mqtt/dispatcher.rb', line 132

# Shortcut to the settings exposed by Proxy::RemoteExecution::Ssh::Plugin.
#
# @return [Object] the value of +Plugin.settings+
def settings
  plugin = Proxy::RemoteExecution::Ssh::Plugin
  plugin.settings
end