Class: Orchestrator::Control
- Inherits:
-
Object
- Object
- Orchestrator::Control
- Includes:
- Singleton
- Defined in:
- lib/orchestrator/control.rb
Instance Attribute Summary collapse
-
#logger ⇒ Object
readonly
Returns the value of attribute logger.
-
#loop ⇒ Object
readonly
Returns the value of attribute loop.
-
#ready ⇒ Object
readonly
Returns the value of attribute ready.
-
#ready_promise ⇒ Object
readonly
Returns the value of attribute ready_promise.
-
#threads ⇒ Object
readonly
Returns the value of attribute threads.
-
#zones ⇒ Object
readonly
Returns the value of attribute zones.
Instance Method Summary collapse
-
#boot(*args) ⇒ Object
Boot the control system, running all defined modules.
-
#initialize ⇒ Control
constructor
1.
-
#load(mod_settings) ⇒ Object
Load the modules on the loop references in round robin This method is thread safe.
-
#loaded?(mod_id) ⇒ Boolean
Checks if a module with the ID specified is loaded.
- #log_unhandled_exception(*args) ⇒ Object
-
#mount ⇒ Object
Start the control reactor.
- #notify_ready ⇒ Object
- #reload(dep_id) ⇒ Object
-
#start(mod_id) ⇒ Object
Starts a module running.
-
#stop(mod_id) ⇒ Object
Stops a module running.
-
#unload(mod_id) ⇒ Object
Stop the module gracefully Then remove it from @loaded.
-
#update(mod_id) ⇒ Object
Unload then Get a fresh version of the settings from the database load the module.
Constructor Details
#initialize ⇒ Control
-
Load the modules allocated to this node
-
Allocate modules to CPUs
-
Modules load dependencies as required
-
Logics are streamed in after devices and services
-
Logic modules will fetch their system when they interact with other modules.
Devices and services do not have a system associated with them
This makes systems very loosely coupled to the modules
which should make distributing the system slightly simpler
23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 |
# File 'lib/orchestrator/control.rb', line 23 def initialize # critical sections @critical = ::Mutex.new @loaded = ::ThreadSafe::Cache.new @zones = ::ThreadSafe::Cache.new @loader = DependencyManager.instance @loop = ::Libuv::Loop.default @exceptions = method(:log_unhandled_exception) @ready = false @ready_defer = @loop.defer @ready_promise = @ready_defer.promise # We keep track of unloaded modules so we can optimise loading them again @unloaded = Set.new if Rails.env.production? logger = ::Logger.new(::Rails.root.join('log/control.log').to_s, 10, 4194304) else logger = ::Logger.new(STDOUT) end logger.formatter = proc { |severity, datetime, progname, msg| "#{datetime.strftime("%d/%m/%Y @ %I:%M%p")} #{severity}: #{progname} - #{msg}\n" } @logger = ::ActiveSupport::TaggedLogging.new(logger) end |
Instance Attribute Details
#logger ⇒ Object (readonly)
Returns the value of attribute logger.
51 52 53 |
# File 'lib/orchestrator/control.rb', line 51 def logger @logger end |
#loop ⇒ Object (readonly)
Returns the value of attribute loop.
51 52 53 |
# File 'lib/orchestrator/control.rb', line 51 def loop @loop end |
#ready ⇒ Object (readonly)
Returns the value of attribute ready.
51 52 53 |
# File 'lib/orchestrator/control.rb', line 51 def ready @ready end |
#ready_promise ⇒ Object (readonly)
Returns the value of attribute ready_promise.
51 52 53 |
# File 'lib/orchestrator/control.rb', line 51 def ready_promise @ready_promise end |
#threads ⇒ Object (readonly)
Returns the value of attribute threads.
51 52 53 |
# File 'lib/orchestrator/control.rb', line 51 def threads @threads end |
#zones ⇒ Object (readonly)
Returns the value of attribute zones.
51 52 53 |
# File 'lib/orchestrator/control.rb', line 51 def zones @zones end |
Instance Method Details
#boot(*args) ⇒ Object
Boot the control system, running all defined modules
88 89 90 91 |
# File 'lib/orchestrator/control.rb', line 88 def boot(*args) # Only boot if running as a server Thread.new &method(:load_all) end |
#load(mod_settings) ⇒ Object
Load the modules on the loop references in round robin This method is thread safe.
95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 |
# File 'lib/orchestrator/control.rb', line 95 def load(mod_settings) mod_id = mod_settings.id.to_sym defer = @loop.defer mod = @loaded[mod_id] if mod defer.resolve(mod) else defer.resolve( @loader.load(mod_settings.dependency).then(proc { |klass| # We will always be on the default loop here thread = @selector.next # We'll resolve the promise if the module loads on the deferred thread defer = @loop.defer thread.schedule do defer.resolve(start_module(thread, klass, mod_settings)) end # update the module cache defer.promise.then do |mod_manager| @loaded[mod_id] = mod_manager # Transfer any existing observers over to the new thread if @ready && @unloaded.include?(mod_id) @unloaded.delete(mod_id) new_thread = thread.observer @threads.each do |thr| thr.observer.move(mod_id, new_thread) end end # Return the manager mod_manager end defer.promise }, @exceptions) ) end defer.promise end |
#loaded?(mod_id) ⇒ Boolean
Checks if a module with the ID specified is loaded
139 140 141 |
# File 'lib/orchestrator/control.rb', line 139 def loaded?(mod_id) @loaded[mod_id.to_sym] end |
#log_unhandled_exception(*args) ⇒ Object
222 223 224 225 226 227 228 229 230 231 232 233 |
# File 'lib/orchestrator/control.rb', line 222 def log_unhandled_exception(*args) msg = '' err = args[-1] if err && err.respond_to?(:backtrace) msg << "exception: #{err.} (#{args[0..-2]})" msg << "\n#{err.backtrace.join("\n")}" if err.respond_to?(:backtrace) && err.backtrace else msg << "unhandled exception: #{args}" end @logger.error msg ::Libuv::Q.reject(@loop, msg) end |
#mount ⇒ Object
Start the control reactor
55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 |
# File 'lib/orchestrator/control.rb', line 55 def mount return @server.loaded if @server @critical.synchronize { return if @server # Protect against multiple mounts # Cache all the zones in the system ::Orchestrator::Zone.all.each do |zone| @zones[zone.id] = zone end @server = ::SpiderGazelle::Spider.instance @server.loaded.then do # Share threads with SpiderGazelle (one per core) if @server.mode == :thread @threads = @server.threads else # We are either running no_ipc or process (unsupported for control) @threads = Set.new cpus = ::Libuv.cpu_count || 1 cpus.times &method(:start_thread) @loop.signal :INT, method(:kill_workers) end @selector = @threads.cycle end } return @server.loaded end |
#notify_ready ⇒ Object
215 216 217 218 219 220 |
# File 'lib/orchestrator/control.rb', line 215 def notify_ready # Clear the system cache (in case it has been populated at all) System.clear_cache @ready = true @ready_defer.resolve(true) end |
#reload(dep_id) ⇒ Object
209 210 211 212 213 |
# File 'lib/orchestrator/control.rb', line 209 def reload(dep_id) @loop.work do reload_dep(dep_id) end end |
#start(mod_id) ⇒ Object
Starts a module running
144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 |
# File 'lib/orchestrator/control.rb', line 144 def start(mod_id) defer = @loop.defer mod = loaded? mod_id if mod mod.thread.schedule do mod.start defer.resolve(true) end else err = Error::ModuleNotFound.new "unable to start module '#{mod_id}', not found" defer.reject(err) @logger.warn err. end defer.promise end |
#stop(mod_id) ⇒ Object
Stops a module running
163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 |
# File 'lib/orchestrator/control.rb', line 163 def stop(mod_id) defer = @loop.defer mod = loaded? mod_id if mod mod.thread.schedule do mod.stop defer.resolve(true) end else err = Error::ModuleNotFound.new "unable to stop module '#{mod_id}', not found" defer.reject(err) @logger.warn err. end defer.promise end |
#unload(mod_id) ⇒ Object
Stop the module gracefully Then remove it from @loaded
183 184 185 186 187 188 189 190 |
# File 'lib/orchestrator/control.rb', line 183 def unload(mod_id) mod = mod_id.to_sym stop(mod).then(proc { @unloaded << mod @loaded.delete(mod) true # promise response }) end |
#update(mod_id) ⇒ Object
Unload then Get a fresh version of the settings from the database load the module
195 196 197 198 199 200 201 202 203 204 205 206 207 |
# File 'lib/orchestrator/control.rb', line 195 def update(mod_id) unload(mod_id).then(proc { # Grab database model in the thread pool res = @loop.work do ::Orchestrator::Module.find(mod_id) end # Load the module if model found res.then(proc { |config| load(config) # Promise chaining to here }) }) end |