Class: Cuboid::RPC::Server::Scheduler
- Includes:
- UI::Output, Utilities
- Defined in:
- lib/cuboid/rpc/server/scheduler.rb
Overview
RPC scheduler service which:
- Maintains a priority queue of Instance jobs.
- Runs them once a slot is available, as determined by system utilization.
- Monitors #running Instances, retrieves and stores their reports, and shuts down each Instance to free its slot.
- Makes available information on #completed and #failed Instances.
In addition to the pure queue functionality, it also allows running Instances to be:
- Detached from the queue monitor, transferring management responsibility to the client.
- Attached to the queue monitor, transferring management responsibility to the queue.
If an Agent has been provided, Instances will be provided by it. If no Agent has been given, Instances will be spawned on the Scheduler machine.
Constant Summary
- TICK_CONSUME = 0.1
Instance Method Summary
- #alive? ⇒ TrueClass
- #any? ⇒ Bool
- #attach(url, token, &block) ⇒ String, ...
  Attaches a running Instance to the queue monitor.
- #clear ⇒ Object
  Empties the queue.
- #completed ⇒ Hash
  Completed Instances and their report location.
- #detach(id, &block) ⇒ Hash?
  RPC connection information for the Instance.
- #empty? ⇒ Bool
- #errors(starting_line = 0) ⇒ Array<String>
- #failed ⇒ Hash
  Failed Instances and the associated error.
- #get(id) ⇒ Hash?
  Instance options and priority.
- #initialize ⇒ Scheduler (constructor)
  A new instance of Scheduler.
- #list ⇒ Hash<Integer,Array>
  Queued Instances grouped and sorted by priority.
- #push(options, queue_options = {}) ⇒ String
  Instance ID used to reference the Instance from then on.
- #remove(id) ⇒ Object
- #running ⇒ Hash
  RPC connection information on running Instances.
- #shutdown ⇒ Object
  Shuts down the service.
- #size ⇒ Integer
Methods included from Utilities
#available_port, available_port_mutex, #bytes_to_kilobytes, #bytes_to_megabytes, #caller_name, #caller_path, #exception_jail, #generate_token, #hms_to_seconds, #port_available?, #rand_port, #random_seed, #regexp_array_match, #remove_constants, #seconds_to_hms
Methods included from UI::Output
#error_buffer, initialize, #log_error, #output_provider_file, #print_bad, #print_debug, #print_error, #print_info, #print_line, #print_ok, #print_status, #print_verbose, #reroute_to_file, #reroute_to_file?
Methods included from UI::OutputInterface
Methods included from UI::OutputInterface::Personalization
Methods included from UI::OutputInterface::Controls
#debug?, #debug_level, #debug_level_1?, #debug_level_2?, #debug_level_3?, #debug_level_4?, #debug_off, #debug_on, initialize, #verbose?, #verbose_off, #verbose_on
Methods included from UI::OutputInterface::ErrorLogging
#error_logfile, #has_error_log?, initialize, #set_error_logfile
Methods included from UI::OutputInterface::Implemented
#print_debug_backtrace, #print_debug_exception, #print_debug_level_1, #print_debug_level_2, #print_debug_level_3, #print_debug_level_4, #print_error_backtrace, #print_exception
Methods included from UI::OutputInterface::Abstract
#output_provider_file, #print_bad, #print_debug, #print_error, #print_info, #print_line, #print_ok, #print_status, #print_verbose
Constructor Details
#initialize ⇒ Scheduler
Returns a new instance of Scheduler.
# File 'lib/cuboid/rpc/server/scheduler.rb', line 48

def initialize
    @options = Options.instance

    @options.snapshot.path ||= @options.paths.snapshots
    @options.report.path   ||= @options.paths.reports

    @server = Base.new( @options.rpc. )
    @server.logger.level = @options.datastore.log_level if @options.datastore.log_level

    Options.scheduler.url = @url = @server.url

    prep_logging

    @queue          = {}
    @id_to_priority = {}
    @by_priority    = {}

    @running   = {}
    @completed = {}
    @failed    = {}

    set_handlers( @server )
    trap_interrupts { Thread.new { shutdown } }

    monitor_instances
    consume_queue

    run
end
Instance Method Details
#alive? ⇒ TrueClass
# File 'lib/cuboid/rpc/server/scheduler.rb', line 264

def alive?
    @server.alive?
end
#any? ⇒ Bool
# File 'lib/cuboid/rpc/server/scheduler.rb', line 84

def any?
    !empty?
end
#attach(url, token, &block) ⇒ String, ...
Attaches a running Instance to the queue monitor.
# File 'lib/cuboid/rpc/server/scheduler.rb', line 205

def attach( url, token, &block )
    client = connect_to_instance( url, token )
    client.alive? do |bool|
        if bool.rpc_exception?
            block.call
            next
        end

        client.scheduler_url do |scheduler_url|
            if scheduler_url
                block.call false
                next
            end

            client..set( scheduler: { url: @options.scheduler.url } ) do
                @running[token] = client
                block.call token
            end
        end
    end
end
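Judging from the #attach listing, the block appears to receive one of three values: nothing (so `nil`) when the Instance cannot be reached, `false` when the Instance already reports a Scheduler URL, and the token once the Instance has been attached. The sketch below shows one way a caller might tell these apart. The helper name `attach_outcome` and the returned symbols are hypothetical and not part of Cuboid.

```ruby
# Hypothetical helper (not part of Cuboid): maps the value that #attach
# passes to its block onto a descriptive symbol.
def attach_outcome(result)
  case result
  when nil   then :unreachable      # block.call with no arguments (RPC exception)
  when false then :already_managed  # Instance already has a Scheduler URL
  else            :attached         # the token, which doubles as the Instance ID
  end
end

puts attach_outcome(nil)        # unreachable
puts attach_outcome(false)      # already_managed
puts attach_outcome('abc123')   # attached
```

Checking for `nil` before `false` matters here because both are falsy; a plain `if result` would merge the two failure cases.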
#clear ⇒ Object
Only affects queued Instances; once an Instance has passed through the #running stage it is no longer part of the queue.
Empties the queue.
# File 'lib/cuboid/rpc/server/scheduler.rb', line 231

def clear
    @queue.clear
    @by_priority.clear
    @id_to_priority.clear

    nil
end
#completed ⇒ Hash
Returns Completed Instances and their report location.
# File 'lib/cuboid/rpc/server/scheduler.rb', line 110

def completed
    @completed
end
#detach(id, &block) ⇒ Hash?
Returns:
- RPC connection information for the Instance.
- `nil` if no running Instance with that ID is found.
# File 'lib/cuboid/rpc/server/scheduler.rb', line 185

def detach( id, &block )
    client = @running.delete( id )
    return block.call if !client

    client..set( scheduler: { url: nil } ) do
        block.call( url: client.url, token: client.token, pid: client.pid )
    end
end
#empty? ⇒ Bool
# File 'lib/cuboid/rpc/server/scheduler.rb', line 79

def empty?
    self.size == 0
end
#errors(starting_line = 0) ⇒ Array<String>
# File 'lib/cuboid/rpc/server/scheduler.rb', line 251

def errors( starting_line = 0 )
    return [] if self.error_buffer.empty?

    error_strings = self.error_buffer

    if starting_line != 0
        error_strings = error_strings[starting_line..-1]
    end

    error_strings
end
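The `starting_line` parameter looks intended for incremental polling: a client remembers how many errors it has already seen and asks only for the rest. The following standalone sketch mirrors the slicing logic of #errors on a plain Array. The function name `errors_from` and the sample buffer are hypothetical. One detail worth noting is that Ruby's `Array#[]` with a range returns `nil`, not `[]`, when the start index is past the end of the array.

```ruby
# Standalone copy of the slicing logic in #errors, operating on a plain Array
# instead of the Scheduler's error buffer (hypothetical helper).
def errors_from(buffer, starting_line = 0)
  return [] if buffer.empty?

  error_strings = buffer
  error_strings = error_strings[starting_line..-1] if starting_line != 0
  error_strings
end

buffer = ['first error', 'second error', 'third error']

# Incremental polling: keep a count of the lines already seen.
seen  = 0
fresh = errors_from(buffer, seen)   # all three entries
seen += fresh.size

buffer << 'fourth error'
fresh = errors_from(buffer, seen)   # only the new entry
p fresh

# Edge cases of Array#[] with a range:
p errors_from(buffer, buffer.size)      # start == size
p errors_from(buffer, buffer.size + 1)  # start > size
```

A caller that might overshoot the buffer could guard with `errors_from(buffer, n) || []`.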
#failed ⇒ Hash
Returns Failed Instances and the associated error.
# File 'lib/cuboid/rpc/server/scheduler.rb', line 116

def failed
    @failed
end
#get(id) ⇒ Hash?
Only returns info for queued Instances; once an Instance has passed through the #running stage it is no longer part of the queue.
Returns:
- Instance options and priority.
- `nil` if an Instance with the given ID could not be found.
# File 'lib/cuboid/rpc/server/scheduler.rb', line 129

def get( id )
    return if !@queue.include? id

    {
        options:  @queue[id],
        priority: @id_to_priority[id]
    }
end
#list ⇒ Hash<Integer,Array>
Returns Queued Instances grouped and sorted by priority.
# File 'lib/cuboid/rpc/server/scheduler.rb', line 95

def list
    @by_priority
end
#push(options, queue_options = {}) ⇒ String
Returns Instance ID used to reference the Instance from then on.
# File 'lib/cuboid/rpc/server/scheduler.rb', line 144

def push( options, queue_options = {} )
    priority = .delete('priority') || 0

    if !Cuboid::Application.application.( )
        fail ArgumentError, 'Invalid options!'
    end

    id = Utilities.generate_token

    @queue[id]          = options
    @id_to_priority[id] = priority

    (@by_priority[priority] ||= []) << id
    @by_priority = Hash[@by_priority.sort_by { |k, _| -k }]

    id
end
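To make the queue bookkeeping easier to follow, here is a minimal standalone model of the three hashes that #push, #list and #get work with. It is a sketch, not the Scheduler itself: the class name `QueueModel` is hypothetical, option validation is omitted, `SecureRandom.hex` stands in for `Utilities.generate_token`, and it assumes the priority is read from `queue_options` without mutating the caller's hash.

```ruby
require 'securerandom'

# Hypothetical stand-in for the Scheduler's queue state (not the real class).
class QueueModel
  def initialize
    @queue          = {} # id => options
    @id_to_priority = {} # id => priority
    @by_priority    = {} # priority => [ids], highest priority first
  end

  def push(options, queue_options = {})
    # Assumption: the priority comes from queue_options and defaults to 0.
    priority = queue_options['priority'] || 0

    id = SecureRandom.hex(16) # stand-in for Utilities.generate_token

    @queue[id]          = options
    @id_to_priority[id] = priority

    (@by_priority[priority] ||= []) << id
    # Rebuild the hash so keys iterate in descending priority order.
    @by_priority = Hash[@by_priority.sort_by { |k, _| -k }]

    id
  end

  def list
    @by_priority
  end

  def get(id)
    return unless @queue.include?(id)

    { options: @queue[id], priority: @id_to_priority[id] }
  end
end

model = QueueModel.new
model.push({ 'name' => 'low' })
model.push({ 'name' => 'high' }, { 'priority' => 10 })
model.push({ 'name' => 'mid' },  { 'priority' => 5 })

p model.list.keys # priorities in descending order: [10, 5, 0]
```

Because Ruby hashes preserve insertion order, re-creating `@by_priority` from the sorted pairs is enough to keep #list ordered from highest to lowest priority.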
#remove(id) ⇒ Object
Only affects queued Instances; once an Instance has passed through the #running stage it is no longer part of the queue.
# File 'lib/cuboid/rpc/server/scheduler.rb', line 167

def remove( id )
    return false if !@queue.include? id

    @queue.delete( id )
    @by_priority[@id_to_priority.delete( id )].delete( id )

    true
end
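Reading the #remove listing, the ID is deleted from its priority bucket but the bucket itself is kept, so #list may afterwards contain a priority that maps to an empty Array. The sketch below reproduces that logic on plain hashes; the function name `remove_from` and the sample data are hypothetical.

```ruby
# Same steps as #remove, applied to plain hashes passed in as arguments
# (hypothetical helper, not part of Cuboid).
def remove_from(queue, id_to_priority, by_priority, id)
  return false unless queue.include?(id)

  queue.delete(id)
  by_priority[id_to_priority.delete(id)].delete(id)

  true
end

queue          = { 'a' => {}, 'b' => {} }
id_to_priority = { 'a' => 5, 'b' => 0 }
by_priority    = { 5 => ['a'], 0 => ['b'] }

p remove_from(queue, id_to_priority, by_priority, 'a')  # true
p remove_from(queue, id_to_priority, by_priority, 'zz') # false
p by_priority # priority 5 remains, with an empty Array
```

Clients iterating over #list may therefore want to skip empty buckets.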
#running ⇒ Hash
Returns RPC connection information on running Instances.
# File 'lib/cuboid/rpc/server/scheduler.rb', line 101

def running
    @running.inject( {} ) do |h, (id, client)|
        h.merge! id => { url: client.url, token: client.token, pid: client.pid }
        h
    end
end
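As an illustration of the Hash shape that #running returns, the following sketch builds the same structure from stand-in client objects. The `Client` Struct, the `connection_info` function and the sample values are hypothetical; `each_with_object` is used here as an equivalent alternative to the `inject`/`merge!` combination in the listing, not as a description of the actual implementation.

```ruby
# Hypothetical stand-in for an RPC client; only the three readers used
# by #running are modelled.
Client = Struct.new(:url, :token, :pid)

# Builds { id => { url:, token:, pid: } } from { id => client }.
def connection_info(running)
  running.each_with_object({}) do |(id, client), h|
    h[id] = { url: client.url, token: client.token, pid: client.pid }
  end
end

running = {
  'token-1' => Client.new('127.0.0.1:1111', 'token-1', 4242)
}

p connection_info(running)
```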
#shutdown ⇒ Object
Shuts down the service.
# File 'lib/cuboid/rpc/server/scheduler.rb', line 240

def shutdown
    print_status 'Shutting down...'
    reactor.delay 2 do
        reactor.stop
    end
end
#size ⇒ Integer
# File 'lib/cuboid/rpc/server/scheduler.rb', line 89

def size
    @queue.size
end