Class: Cuboid::RPC::Server::Agent
- Includes: UI::Output, Utilities
- Defined in: lib/cuboid/rpc/server/agent.rb
Overview
Dispatches RPC Instances on demand and allows for extensive process monitoring.
The process goes something like this:
- A client issues a #spawn call.
- The Agent spawns an Instance and returns its info to the client (URL, auth token, etc.).
- The client connects to the Instance using that info.
Once the client has finished using the RPC Instance, it must shut it down; otherwise the system will gradually fill up with zombie processes.
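The lifecycle above can be sketched with a toy, in-process model. `ToyAgent`, `with_instance`, the `'url'`/`'token'` keys and the address are hypothetical stand-ins, not the Cuboid RPC client API; the point is only the `ensure`, which makes the shutdown step run even when the client's work raises.

```ruby
require 'securerandom'

# Hypothetical in-process stand-in for an Agent; NOT the Cuboid API.
class ToyAgent
  def initialize
    @instances = {}
  end

  # Like #spawn, hands the caller the info needed to reach a new instance.
  def spawn
    info = {
      'url'   => "127.0.0.1:#{7000 + @instances.size}", # made-up address
      'token' => SecureRandom.hex(8),
      'alive' => true
    }
    @instances[info['url']] = info
    yield info
  end

  # Marks an instance as shut down.
  def shutdown(url)
    @instances.fetch(url)['alive'] = false
  end

  # Instances that have not been shut down yet.
  def running
    @instances.values.select { |i| i['alive'] }
  end
end

# Runs the caller's work against a fresh instance and always shuts it down.
def with_instance(agent)
  agent.spawn do |info|
    begin
      yield info
    ensure
      agent.shutdown(info['url'])
    end
  end
end

agent = ToyAgent.new
with_instance(agent) { |info| puts "connected to #{info['url']}" }
puts agent.running.size
```

Wrapping the spawn/shutdown pair in one helper keeps the cleanup next to the acquisition, so callers cannot forget it.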
Defined Under Namespace
Constant Summary
- SERVICE_NAMESPACE = Service
- PREFERENCE_STRATEGIES = Cuboid::OptionGroups::Agent::STRATEGIES
Instance Method Summary
- #alive? ⇒ TrueClass
  True.
- #finished_instances ⇒ Array<Hash>
  Returns info for all finished (dead) instances.
- #initialize(options = Options.instance) ⇒ Agent (constructor)
  A new instance of Agent.
- #instance(pid) ⇒ Hash
  Returns process info for a given pid.
- #instances ⇒ Array<Hash>
  Returns info for all instances.
- #log ⇒ String
  Contents of the log file.
- #pid ⇒ Object
- #preferred(strategy = Cuboid::Options.agent.strategy, &block) ⇒ String?
  Depending on strategy and availability, the URL of the preferred Agent or nil.
- #running_instances ⇒ Array<Hash>
  Returns info for all running (alive) instances.
- #services ⇒ Object
- #spawn(options = {}, &block) ⇒ Hash?
  Spawns an Instance.
- #statistics ⇒ Hash
  Returns server stats regarding the instances and pool.
- #utilization ⇒ Float
  Workload score for this Agent, calculated using System#utilization.
Methods included from UI::Output
#error_buffer, initialize, #log_error, #output_provider_file, #print_bad, #print_debug, #print_error, #print_info, #print_line, #print_ok, #print_status, #print_verbose, #reroute_to_file, #reroute_to_file?
Methods included from UI::OutputInterface
Methods included from UI::OutputInterface::Personalization
Methods included from UI::OutputInterface::Controls
#debug?, #debug_level, #debug_level_1?, #debug_level_2?, #debug_level_3?, #debug_level_4?, #debug_off, #debug_on, initialize, #verbose?, #verbose_off, #verbose_on
Methods included from UI::OutputInterface::ErrorLogging
#error_logfile, #has_error_log?, initialize, #set_error_logfile
Methods included from UI::OutputInterface::Implemented
#print_debug_backtrace, #print_debug_exception, #print_debug_level_1, #print_debug_level_2, #print_debug_level_3, #print_debug_level_4, #print_error_backtrace, #print_exception
Methods included from UI::OutputInterface::Abstract
#output_provider_file, #print_bad, #print_debug, #print_error, #print_info, #print_line, #print_ok, #print_status, #print_verbose
Methods included from Utilities
#available_port, available_port_mutex, #bytes_to_kilobytes, #bytes_to_megabytes, #caller_name, #caller_path, #exception_jail, #generate_token, #hms_to_seconds, #port_available?, #rand_port, #random_seed, #regexp_array_match, #remove_constants, #seconds_to_hms
Constructor Details
#initialize(options = Options.instance) ⇒ Agent
Returns a new instance of Agent.
# File 'lib/cuboid/rpc/server/agent.rb', line 36
def initialize( options = Options.instance )
    @options = options

    @options.snapshot.path ||= @options.paths.snapshots

    @server = Base.new( @options.rpc. )
    @server.logger.level = @options.datastore.log_level if @options.datastore.log_level

    @server.add_async_check do |method|
        # methods that expect a block are async
        method.parameters.flatten.include? :block
    end

    Options.agent.url = @url = @server.url

    prep_logging

    print_status 'Starting the Agent...'
    @server.logger.info( 'System' ) { "Logfile at: #{@logfile}" }

    @server.add_handler( 'agent', self )

    # trap interrupts and exit cleanly when required
    trap_interrupts { shutdown }

    @instances = []

    Cuboid::Application.application.agent_services.each do |name, service|
        @server.add_handler( name.to_s, service.new( @options, self ) )
    end

    @node = Node.new( @options, @server, @logfile )
    @server.add_handler( 'node', @node )

    run
end
Instance Method Details
#alive? ⇒ TrueClass
Returns true.
# File 'lib/cuboid/rpc/server/agent.rb', line 79
def alive?
    @server.alive?
end
#finished_instances ⇒ Array<Hash>
Returns info for all finished (dead) instances.
# File 'lib/cuboid/rpc/server/agent.rb', line 249
def finished_instances
    instances.reject { |i| i['alive'] }
end
#instance(pid) ⇒ Hash
Returns process info for the given pid, or nil if no Instance with that pid is being tracked.
# File 'lib/cuboid/rpc/server/agent.rb', line 214
def instance( pid )
    @instances.each do |i|
        next if i['pid'] != pid
        i = i.dup

        now = Time.now

        i['now'] = now.to_s
        i['age'] = now - Time.parse( i['birthdate'] )
        i['alive'] = Cuboid::Processes::Manager.alive?( pid )

        return i
    end

    nil
end
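The age calculation in #instance can be tried in isolation. This is a minimal sketch, assuming the stored record keeps `'birthdate'` as a string (as the `Time.parse` call suggests); `with_age` and the sample record are hypothetical and the liveness check is left out.

```ruby
require 'time'

# Returns a copy of the record with 'now' and 'age' (in seconds) filled in.
# The stored record itself is left untouched, as in #instance.
def with_age(record, now = Time.now)
  info = record.dup
  info['now'] = now.to_s
  info['age'] = now - Time.parse(info['birthdate'])
  info
end

# Hypothetical record for a process started about 90 seconds ago.
record = { 'pid' => 1234, 'birthdate' => (Time.now - 90).to_s }
info   = with_age(record)

puts info['age'].round
puts record.key?('age')
```

Because the copy is made with `dup`, repeated lookups never accumulate stale `'now'` or `'age'` values on the stored record.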
#instances ⇒ Array<Hash>
Returns info for all instances.
# File 'lib/cuboid/rpc/server/agent.rb', line 233
def instances
    @instances.map { |i| instance( i['pid'] ) }.compact
end
#log ⇒ String
Returns the contents of the log file.
# File 'lib/cuboid/rpc/server/agent.rb', line 279
def log
    IO.read prep_logging
end
#pid ⇒ Object
# File 'lib/cuboid/rpc/server/agent.rb', line 284
def pid
    Process.pid
end
#preferred(strategy = Cuboid::Options.agent.strategy, &block) ⇒ String?
Passes one of the following to the block, depending on strategy and availability:
- The URL of the preferred Agent. If this Agent is not a grid member, its own URL.
- `nil` if all nodes are at max utilization or on error.
Raises `ArgumentError` on an invalid `strategy`.
# File 'lib/cuboid/rpc/server/agent.rb', line 95
def preferred( strategy = Cuboid::Options.agent.strategy, &block )
    strategy = strategy.to_sym
    if !PREFERENCE_STRATEGIES.include? strategy
        block.call :error_unknown_strategy
        raise ArgumentError, "Unknown strategy: #{strategy}"
    end

    if strategy == :direct || !@node.grid_member?
        block.call( self.utilization == 1 ? nil : @url )
        return
    end

    pick_utilization = proc do |url, utilization|
        (utilization == 1 || utilization.rpc_exception?) ?
            nil : [url, utilization]
    end

    adjust_score_by_strategy = proc do |score|
        case strategy
            when :horizontal
                score

            when :vertical
                -score
        end
    end

    each = proc do |peer, iter|
        connect_to_peer( peer ).utilization do |utilization|
            iter.return pick_utilization.call( peer, utilization )
        end
    end

    after = proc do |nodes|
        nodes << pick_utilization.call( @url, self.utilization )
        nodes.compact!

        # All nodes are at max utilization, pass.
        if nodes.empty?
            block.call
            next
        end

        block.call nodes.sort_by { |_, score| adjust_score_by_strategy.call score }[0][0]
    end

    Raktr.global.create_iterator( @node.peers ).map( each, after )
end
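The ranking step at the end of #preferred can be illustrated without any RPC. The sketch below is a simplified, synchronous version: `pick_preferred` and the sample URLs are hypothetical, a `nil` score stands in for a peer whose utilization call failed, and only the `:horizontal` and `:vertical` strategies are modelled.

```ruby
# candidates: array of [url, utilization] pairs, utilization in 0.0..1.0
# (nil is used here as a stand-in for a peer that could not be queried).
def pick_preferred(candidates, strategy)
  unless %i[horizontal vertical].include?(strategy)
    raise ArgumentError, "Unknown strategy: #{strategy}"
  end

  # Drop unreachable peers and peers at max utilization.
  available = candidates.reject { |_, score| score.nil? || score == 1 }
  return nil if available.empty?

  # :horizontal prefers the least loaded node, :vertical the most loaded
  # node that still has capacity.
  ranked = available.sort_by { |_, score| strategy == :vertical ? -score : score }
  ranked.first.first
end

nodes = [['agent-a:7331', 0.2], ['agent-b:7331', 0.7], ['agent-c:7331', 1.0]]

puts pick_preferred(nodes, :horizontal)
puts pick_preferred(nodes, :vertical)
```

Negating the score for `:vertical` lets both strategies share the same ascending sort, which is the same trick the original `adjust_score_by_strategy` proc uses.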
#running_instances ⇒ Array<Hash>
Returns info for all running (alive) instances.
# File 'lib/cuboid/rpc/server/agent.rb', line 241
def running_instances
    instances.select { |i| i['alive'] }
end
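#running_instances and #finished_instances are complementary filters over the same list, keyed on the `'alive'` flag. As a small illustration with made-up records (`split_by_liveness` is a hypothetical helper, not part of the class), both halves can be obtained in one pass:

```ruby
# Splits instance records into [running, finished] by their 'alive' flag.
def split_by_liveness(instances)
  instances.partition { |i| i['alive'] }
end

# Hypothetical records shaped like the hashes #instances returns.
records = [
  { 'pid' => 101, 'alive' => true },
  { 'pid' => 102, 'alive' => false },
  { 'pid' => 103, 'alive' => true }
]

running, finished = split_by_liveness(records)
puts running.map { |i| i['pid'] }.inspect
puts finished.map { |i| i['pid'] }.inspect
```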
#services ⇒ Object
# File 'lib/cuboid/rpc/server/agent.rb', line 73
def services
    Cuboid::Application.application.agent_services.keys
end
#spawn(options = {}, &block) ⇒ Hash?
Spawns an Instance.
# File 'lib/cuboid/rpc/server/agent.rb', line 158
def spawn( options = {}, &block )
    if @spawning
        block.call nil
        return
    end

    options = options.my_symbolize_keys
    strategy = options.delete(:strategy)
    owner = options[:owner]
    helpers = options[:helpers] || {}

    if strategy != 'direct' && @node.grid_member?
        preferred *[strategy].compact do |url|
            if !url
                block.call
                next
            end

            if url == :error_unknown_strategy
                block.call :error_unknown_strategy
                next
            end

            connect_to_peer( url ).spawn( options.merge(
                helpers: helpers.merge( via: @url ),
                strategy: :direct
            ), &block )
        end
        return
    end

    if System.max_utilization?
        block.call
        return
    end

    @spawning = true
    spawn_instance do |info|
        info['owner'] = owner
        info['helpers'] = helpers

        @instances << info

        block.call info

        @spawning = false
    end
end
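Judging from the listing above, the block given to #spawn can receive three kinds of value: an info Hash, `nil` (a spawn is already in progress or no capacity is available), or `:error_unknown_strategy`. A caller might branch on them as sketched below; `handle_spawn_result` is a hypothetical helper and the `'url'` key is an assumption based on the info described in the overview.

```ruby
# Classifies the value a #spawn block was called with.
def handle_spawn_result(result)
  case result
  when Hash
    [:ok, result['url']]          # assumed key; connect using this info
  when :error_unknown_strategy
    [:error, 'unknown strategy']  # the requested strategy was rejected
  when nil
    [:unavailable, nil]           # busy or at max utilization; retry later
  else
    [:error, "unexpected result: #{result.inspect}"]
  end
end

p handle_spawn_result({ 'url' => '127.0.0.1:7331', 'owner' => 'me' })
p handle_spawn_result(nil)
p handle_spawn_result(:error_unknown_strategy)
```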
#statistics ⇒ Hash
Returns server stats regarding the instances and pool.
# File 'lib/cuboid/rpc/server/agent.rb', line 266
def statistics
    {
        'utilization' => utilization,
        'running_instances' => running_instances,
        'finished_instances' => finished_instances,
        'consumed_pids' => @instances.map { |i| i['pid'] }.compact,
        'snapshots' => Dir.glob( "#{@options.snapshot.path}*.#{Snapshot::EXTENSION}" ),
        'node' => @node.info
    }
end
#utilization ⇒ Float
Returns the workload score for this Agent, calculated using System#utilization.
- `0.0` => No utilization.
- `1.0` => Max utilization.
Lower is better.
# File 'lib/cuboid/rpc/server/agent.rb', line 260
def utilization
    System.utilization
end