Class: Cuboid::Processes::Schedulers
- Includes:
- Utilities, Singleton
- Defined in:
- lib/cuboid/processes/schedulers.rb
Overview
Helper for managing RPC::Server::Scheduler processes.
Instance Attribute Summary collapse
-
#list ⇒ Array<String>
readonly
URLs of all running Schedulers.
Class Method Summary collapse
Instance Method Summary collapse
-
#connect(url, options = nil) ⇒ RPC::Client::Scheduler
Connects to a Scheduler by URL.
- #each(&block) ⇒ Object
-
#initialize ⇒ Schedulers
constructor
A new instance of Schedulers.
- #kill(url) ⇒ Object
-
#killall ⇒ Object
Kills every Scheduler whose URL is in #list.
-
#spawn(options = {}) ⇒ RPC::Client::Scheduler
Spawns an RPC::Server::Scheduler process.
Methods included from Utilities
#available_port, available_port_mutex, #bytes_to_kilobytes, #bytes_to_megabytes, #caller_name, #caller_path, #exception_jail, #generate_token, #hms_to_seconds, #port_available?, #rand_port, #random_seed, #regexp_array_match, #remove_constants, #seconds_to_hms
Constructor Details
#initialize ⇒ Schedulers
Returns a new instance of Schedulers.
14 15 16 17 |
# File 'lib/cuboid/processes/schedulers.rb', line 14
#
# Starts with empty bookkeeping: no known Scheduler URLs and no cached
# RPC clients.
def initialize
    # Clients are stored and looked up by URL (see #connect).
    @clients = {}

    # URLs are appended by #spawn and removed by #kill.
    @list = []
end
Instance Attribute Details
#list ⇒ Array<String> (readonly)
Returns URLs of all running Schedulers.
12 13 14 |
# File 'lib/cuboid/processes/schedulers.rb', line 12
#
# @return [Array<String>]
#   The tracked Scheduler URLs. This is the internal array itself, not a
#   copy; #spawn appends to it and #kill deletes from it.
def list
    @list
end
Class Method Details
.method_missing(sym, *args, &block) ⇒ Object
127 128 129 130 131 132 133 |
# File 'lib/cuboid/processes/schedulers.rb', line 127
#
# Lets callers use the class itself as a facade for its `instance`: any
# message the instance publicly responds to is forwarded to it, together
# with its arguments and block. Everything else falls through to the
# default `method_missing` handling.
#
# @param sym [Symbol] name of the missing method
# @param args [Array] arguments of the original call
# @param block [Proc, nil] block of the original call
# @return [Object] whatever the forwarded instance method returns
def self.method_missing( sym, *args, &block )
    return super( sym, *args, &block ) unless instance.respond_to?( sym )

    instance.send( sym, *args, &block )
end
.respond_to?(m) ⇒ Boolean
135 136 137 |
# File 'lib/cuboid/processes/schedulers.rb', line 135
#
# Reports whether the class can handle `m`, either itself or by forwarding
# to its `instance` (see .method_missing).
#
# Accepts the optional second argument of the standard `respond_to?`
# contract, so two-argument calls no longer raise ArgumentError.
#
# @param m [Symbol, String] method name
# @param include_all [Boolean]
#   when true, the class' own private and protected methods count too
# @return [Boolean]
def self.respond_to?( m, include_all = false )
    # The instance is asked about public methods only, matching the check
    # .method_missing performs before forwarding.
    super( m, include_all ) || instance.respond_to?( m )
end
Instance Method Details
#connect(url, options = nil) ⇒ RPC::Client::Scheduler
Connects to a Scheduler by URL.
25 26 27 28 29 30 31 32 33 34 35 36 37 38 |
# File 'lib/cuboid/processes/schedulers.rb', line 25
#
# Connects to a Scheduler by URL.
#
# Clients are cached per URL; later calls for the same URL return the
# cached client unless `:fresh` is requested.
#
# @param url [String] URL of the Scheduler
# @param options [Hash, nil]
#   Options handed to `RPC::Client::Scheduler.new`. The `:fresh` key is
#   consumed here and not passed on; when truthy, a new client replaces any
#   cached one. The caller's hash is not modified.
# @return [RPC::Client::Scheduler]
def connect( url, options = nil )
    # Start the global reactor in a thread if it is not running yet.
    Raktr.global.run_in_thread if !Raktr.global.running?

    fresh = false
    if options
        # Work on a copy so that removing :fresh does not alter the
        # caller's hash.
        options = options.dup
        fresh   = options.delete( :fresh )
    end

    if fresh
        @clients[url] = RPC::Client::Scheduler.new( url, options )
    else
        @clients[url] ||= RPC::Client::Scheduler.new( url, options )
    end
end
#each(&block) ⇒ Object
41 42 43 44 45 |
# File 'lib/cuboid/processes/schedulers.rb', line 41
#
# Calls the block once per tracked URL, passing the client obtained from
# #connect for that URL.
#
# @param block [Proc] receives one client per URL in #list
# @return [Array<String>] the URL list that was iterated
def each( &block )
    @list.each { |scheduler_url| block.call( connect( scheduler_url ) ) }
end
#kill(url) ⇒ Object
Note:
Will also kill all Instances started by the Scheduler.
104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 |
# File 'lib/cuboid/processes/schedulers.rb', line 104
#
# Kills the Scheduler at `url`, best-effort: clears it, kills the process
# of every entry it reports as running, then kills the Scheduler process
# itself. Errors along the way are swallowed.
#
# The URL is always removed from #list and any cached client is closed,
# whether or not the kill sequence succeeded.
#
# @note Will also kill all Instances started by the Scheduler.
#
# @param url [String] URL of the Scheduler
# @return [Object, nil]
#   result of the final `Manager.kill`, or nil when anything failed
def kill( url )
    scheduler = connect( url )
    scheduler.clear

    scheduler.running.each do |_id, info|
        Manager.kill info['pid']
    end

    Manager.kill scheduler.pid
rescue
    # Deliberately ignored; the cleanup below must still run.
    nil
ensure
    @list.delete( url )

    # No client is cached if #connect itself failed, so guard against nil
    # instead of raising NoMethodError out of the ensure clause.
    client = @clients.delete( url )
    client.close if client
end
#killall ⇒ Object
Kills all #list.
121 122 123 124 125 |
# File 'lib/cuboid/processes/schedulers.rb', line 121
#
# Calls #kill for every URL currently in #list.
#
# @return [Array<String>] the URLs that were passed to #kill
def killall
    # Iterate over a copy: #kill deletes from @list as it goes.
    urls = @list.dup
    urls.each { |url| kill( url ) }
end
#spawn(options = {}) ⇒ RPC::Client::Scheduler
Spawns an RPC::Server::Scheduler process.
54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 |
# File 'lib/cuboid/processes/schedulers.rb', line 54
#
# Spawns a Scheduler process through `Manager.spawn`, polls until it
# answers `alive?`, records its URL in #list and returns a fresh client.
#
# @param options [Hash] not modified; the keys read are:
#   * `:fork`, `:daemonize` -- forwarded to `Manager.spawn`
#   * `:agent`, `:strategy` -- agent URL and strategy
#   * `:port` -- defaults to `Utilities.available_port`
#   * `:address` -- defaults to `'127.0.0.1'`
#   * `:external_address`
#   * `:ssl` -- may hold `:ca`, plus `:server` and `:client` hashes each
#     with `:private_key` and `:certificate`; any part may be omitted
#   * `:application` -- defaults to `Options.paths.application`
# @return [RPC::Client::Scheduler]
#   fresh client for the new Scheduler, with its `pid` assigned
def spawn( options = {} )
    options      = options.dup
    fork_process = options.delete( :fork )
    daemonize    = options.delete( :daemonize )

    # Tolerate a missing or partial :ssl hash (e.g. only :ca given).
    ssl        = options[:ssl] || {}
    server_ssl = ssl[:server]  || {}
    client_ssl = ssl[:client]  || {}

    options = {
        agent: {
            url:      options[:agent],
            strategy: options[:strategy]
        },
        rpc:   {
            server_port:             options[:port]    || Utilities.available_port,
            server_address:          options[:address] || '127.0.0.1',
            server_external_address: options[:external_address],

            ssl_ca:                  ssl[:ca],
            server_ssl_private_key:  server_ssl[:private_key],
            server_ssl_certificate:  server_ssl[:certificate],
            client_ssl_private_key:  client_ssl[:private_key],
            client_ssl_certificate:  client_ssl[:certificate]
        },
        paths: {
            application: options[:application] || Options.paths.application
        }
    }

    pid = Manager.spawn(
        :scheduler,
        options:   options,
        fork:      fork_process,
        daemonize: daemonize
    )

    url = "#{options[:rpc][:server_address]}:#{options[:rpc][:server_port]}"

    # Poll every 0.1s until the Scheduler answers; connection errors just
    # mean it is not up yet. NOTE(review): there is no upper bound on this
    # wait -- confirm that is intended.
    while sleep( 0.1 )
        begin
            connect( url, connection_pool_size: 1, max_retries: 1 ).alive?
            break
        rescue
            # Not reachable yet, try again.
        end
    end

    @list << url

    # Replace the polling client with a fresh one and tag it with the PID.
    connect( url, fresh: true ).tap { |c| c.pid = pid }
end