Class: Cuboid::Processes::Agents
- Includes:
- Utilities, Singleton
- Defined in:
- lib/cuboid/processes/agents.rb
Overview
Helper for managing RPC::Server::Agent processes.
Instance Attribute Summary collapse
-
#list ⇒ Array<String>
readonly
URLs of all running Agents.
Class Method Summary collapse
Instance Method Summary collapse
-
#connect(url, options = nil) ⇒ RPC::Client::Agent
Connects to an Agent by URL.
- #each(&block) ⇒ Object
- #grid_spawn(options = {}) ⇒ Object
-
#initialize ⇒ Agents
constructor
A new instance of Agents.
- #kill(url) ⇒ Object
-
#killall ⇒ Object
Kills all Agents in #list.
-
#spawn(options = {}) ⇒ RPC::Client::Agent
Spawns an RPC::Server::Agent process.
Methods included from Utilities
#available_port, available_port_mutex, #bytes_to_kilobytes, #bytes_to_megabytes, #caller_name, #caller_path, #exception_jail, #generate_token, #hms_to_seconds, #port_available?, #rand_port, #random_seed, #regexp_array_match, #remove_constants, #seconds_to_hms
Constructor Details
#initialize ⇒ Agents
Returns a new instance of Agents.
16 17 18 19 |
# File 'lib/cuboid/processes/agents.rb', line 16
#
# Starts with no known Agents and no cached connections.
def initialize
    # URLs of the Agents tracked by this helper.
    @list = Array.new

    # RPC clients keyed by Agent URL.
    @agent_connections = Hash.new
end
Instance Attribute Details
#list ⇒ Array<String> (readonly)
Returns URLs of all running Agents.
14 15 16 |
# File 'lib/cuboid/processes/agents.rb', line 14
#
# @return [Array<String>]
#   URLs of all running Agents (the live array, not a copy).
def list
    @list
end
Class Method Details
.method_missing(sym, *args, &block) ⇒ Object
140 141 142 143 144 145 146 |
# File 'lib/cuboid/processes/agents.rb', line 140
#
# Forwards unknown class-level calls to the singleton instance, so that
# instance methods can be invoked directly on the class.
#
# @param sym [Symbol] name of the method that was called
# @param args [Array] arguments to forward
# @param block [Proc] block to forward
# @return [Object] whatever the instance method returns
def self.method_missing( sym, *args, &block )
    # Anything the instance does not publicly respond to gets the default handling.
    return super( sym, *args, &block ) unless instance.respond_to?( sym )

    instance.send( sym, *args, &block )
end
.respond_to?(m) ⇒ Boolean
148 149 150 |
# File 'lib/cuboid/processes/agents.rb', line 148
#
# Reports whether the class itself, or the singleton instance it delegates
# to, responds to the given method.
#
# Accepts the standard optional `include_all` argument so that two-argument
# `respond_to?( name, true )` calls do not raise ArgumentError.
#
# @param m [Symbol, String] method name
# @param include_all [Boolean]
#   passed to the default check for the class itself; the instance is always
#   checked for public methods only.
# @return [Boolean]
def self.respond_to?( m, include_all = false )
    super( m, include_all ) || instance.respond_to?( m )
end
Instance Method Details
#connect(url, options = nil) ⇒ RPC::Client::Agent
Connects to an Agent by URL.
27 28 29 30 31 32 33 34 35 36 37 38 39 40 |
# File 'lib/cuboid/processes/agents.rb', line 27
#
# Connects to an Agent by URL, reusing a cached client when one exists.
#
# @param url [String] URL of the Agent
# @param options [Hash, nil]
#   options passed to `RPC::Client::Agent.new`; a truthy `:fresh` entry is
#   removed and forces a new client, replacing any cached one.
# @return [RPC::Client::Agent]
def connect( url, options = nil )
    # Start the global Raktr instance in a thread if it is not running yet.
    Raktr.global.run_in_thread if !Raktr.global.running?

    fresh = false
    if options
        # Work on a copy so that removing :fresh does not alter the caller's Hash.
        options = options.dup
        fresh   = options.delete( :fresh )
    end

    if fresh
        @agent_connections[url] = RPC::Client::Agent.new( url, options )
    else
        @agent_connections[url] ||= RPC::Client::Agent.new( url, options )
    end
end
#each(&block) ⇒ Object
43 44 45 46 47 |
# File 'lib/cuboid/processes/agents.rb', line 43
#
# Calls the block once per listed Agent URL, passing the result of #connect
# for that URL.
#
# @param block [Proc] receives each connected client
# @return [Array<String>] the URL list that was iterated
def each( &block )
    @list.each { |agent_url| block.call( connect( agent_url ) ) }
end
#grid_spawn(options = {}) ⇒ Object
112 113 114 115 |
# File 'lib/cuboid/processes/agents.rb', line 112
#
# Spawns two Agents with the same options, giving the second one the URL of
# the first as its `:peer`.
#
# @param options [Hash] options for #spawn; not modified
# @return [Object] the result of the second #spawn call
def grid_spawn( options = {} )
    d = spawn( options )
    spawn( options.merge peer: d.url )
end
#kill(url) ⇒ Object
Note:
Will also kill all Instances started by the Agent.
120 121 122 123 124 125 126 127 128 129 130 131 |
# File 'lib/cuboid/processes/agents.rb', line 120
#
# Kills the Agent at the given URL along with the PIDs it reports under
# 'consumed_pids', then forgets the URL and its cached connection.
#
# Errors raised while connecting or killing are swallowed; the clean-up of
# the URL list and connection cache always runs.
#
# @param url [String] URL of the Agent to kill
# @return [Object, nil] result of `Manager.kill`, or `nil` if an error occurred
def kill( url )
    begin
        client = connect( url )
        Manager.kill_many client.statistics['consumed_pids']
        Manager.kill client.pid
    rescue
        # Best effort: the Agent may already be gone.
        nil
    ensure
        @list.delete( url )
        @agent_connections.delete( url )
    end
end
#killall ⇒ Object
Kills all Agents in #list.
134 135 136 137 138 |
# File 'lib/cuboid/processes/agents.rb', line 134
#
# Kills every Agent currently in the URL list.
#
# @return [Array<String>] the URLs that were passed to #kill
def killall
    # Iterate over a copy: #kill removes entries from @list as it goes.
    @list.dup.each { |agent_url| kill( agent_url ) }
end
#spawn(options = {}) ⇒ RPC::Client::Agent
Spawns an RPC::Server::Agent process.
56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 |
# File 'lib/cuboid/processes/agents.rb', line 56
#
# Spawns an Agent process via `Manager.spawn`, waits until it answers
# `alive?`, records its URL and returns a fresh client for it.
#
# @param options [Hash] not modified; a copy is used internally
# @option options [Object] :fork passed through to `Manager.spawn`
# @option options [Object] :daemonize passed through to `Manager.spawn`
# @option options [Object] :name Agent name
# @option options [Object] :peer Agent peer; omitted from the settings when nil
# @option options [Object] :strategy Agent strategy
# @option options [Integer] :port (Utilities.available_port) RPC server port
# @option options [String] :address ('127.0.0.1') RPC server address
# @option options [String] :external_address
#   RPC server external address; omitted from the settings when nil
# @option options [Hash] :ssl
#   `:ca`, plus `:server` and `:client` Hashes each holding `:private_key`
#   and `:certificate`
# @option options [Object] :application (Options.paths.application)
#
# @return [RPC::Client::Agent] client with its `pid` set to the spawned PID
def spawn( options = {} )
    options   = options.dup
    fork      = options.delete(:fork)
    daemonize = options.delete(:daemonize)

    # Guarantee the nested SSL lookups below have Hashes to read from.
    options[:ssl] ||= {
        server: {},
        client: {}
    }

    agent_options = {
        agent: {
            name:     options[:name],
            peer:     options[:peer],
            strategy: options[:strategy],
        },
        rpc:   {
            server_port:             options[:port]    || Utilities.available_port,
            server_address:          options[:address] || '127.0.0.1',
            server_external_address: options[:external_address],

            ssl_ca:                 options[:ssl][:ca],
            server_ssl_private_key: options[:ssl][:server][:private_key],
            server_ssl_certificate: options[:ssl][:server][:certificate],
            client_ssl_private_key: options[:ssl][:client][:private_key],
            client_ssl_certificate: options[:ssl][:client][:certificate],
        },
        paths: {
            application: options[:application] || Options.paths.application
        }
    }

    # Leave unset values out entirely rather than passing explicit nils.
    if agent_options[:rpc][:server_external_address].nil?
        agent_options[:rpc].delete :server_external_address
    end

    if agent_options[:agent][:peer].nil?
        agent_options[:agent].delete :peer
    end

    pid = Manager.spawn( :agent, options: agent_options, fork: fork, daemonize: daemonize )

    url = "#{agent_options[:rpc][:server_address]}:#{agent_options[:rpc][:server_port]}"

    # Poll every 0.1s until the Agent answers.
    # NOTE(review): there is no upper bound on the number of attempts.
    while sleep( 0.1 )
        begin
            connect( url, connection_pool_size: 1, max_retries: 1 ).alive?
            break
        rescue
            # Not reachable yet; try again.
        end
    end

    @list << url
    connect( url, fresh: true ).tap { |c| c.pid = pid }
end