Class: Cuboid::Processes::Agents

Inherits:
Object
  • Object
show all
Includes:
Utilities, Singleton
Defined in:
lib/cuboid/processes/agents.rb

Overview

Helper for managing RPC::Server::Agent processes.

Author:

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Utilities

#available_port, available_port_mutex, #bytes_to_kilobytes, #bytes_to_megabytes, #caller_name, #caller_path, #exception_jail, #generate_token, #hms_to_seconds, #port_available?, #rand_port, #random_seed, #regexp_array_match, #remove_constants, #seconds_to_hms

Constructor Details

#initialize ⇒ Agents

Returns a new instance of Agents.



16
17
18
19
# File 'lib/cuboid/processes/agents.rb', line 16

# Sets up the empty bookkeeping state: no known Agent URLs and no cached
# RPC client connections.
def initialize
    # RPC clients keyed by Agent URL.
    @agent_connections = {}
    # URLs of the Agents spawned so far.
    @list              = []
end

Instance Attribute Details

#list ⇒ Array<String> (readonly)

Returns URLs of all running Agents.

Returns:

  • (Array<String>)

    URLs of all running Agents.



14
15
16
# File 'lib/cuboid/processes/agents.rb', line 14

# @return [Array<String>]
#   URLs of all running Agents.
def list
    @list
end

Class Method Details

.method_missing(sym, *args, &block) ⇒ Object



140
141
142
143
144
145
146
# File 'lib/cuboid/processes/agents.rb', line 140

# Forwards unknown class-level calls to the singleton instance, so that
# instance methods can be invoked directly on the class.
#
# @param [Symbol] sym
#   Name of the missing method.
# @param [Array] args
#   Arguments to forward.
# @param [Block] block
#   Block to forward.
#
# @raise [NoMethodError]
#   When the singleton instance does not respond to `sym` either.
def self.method_missing( sym, *args, &block )
    if instance.respond_to?( sym )
        instance.send( sym, *args, &block )
    else
        super( sym, *args, &block )
    end
end

# Companion hook to {.method_missing}: makes `respond_to?` and `method`
# aware of the methods that are delegated to the singleton instance.
#
# @param [Symbol] sym
#   Method name being queried.
# @param [Boolean] include_private
#   Passed through to the default implementation; delegation itself only
#   covers public instance methods, matching {.method_missing}.
#
# @return [Boolean]
def self.respond_to_missing?( sym, include_private = false )
    instance.respond_to?( sym ) || super
end

.respond_to?(m) ⇒ Boolean

Returns:

  • (Boolean)


148
149
150
# File 'lib/cuboid/processes/agents.rb', line 148

# Reports whether the class itself, or the singleton instance it delegates
# to, responds to the given method.
#
# @param [Symbol, String] m
#   Method name.
# @param [Boolean] include_all
#   Same meaning as in `Object#respond_to?`; applied to the class-level
#   lookup only. Accepting it keeps the standard two-argument call form
#   from raising `ArgumentError`.
#
# @return [Boolean]
def self.respond_to?( m, include_all = false )
    super( m, include_all ) || instance.respond_to?( m )
end

Instance Method Details

#connect(url, options = nil) ⇒ RPC::Client::Agent

Connects to an Agent by URL.

Parameters:

  • url (String)

    URL of the Agent.

  • options (Hash) (defaults to: nil)

    Options for the RPC client.

Returns:



27
28
29
30
31
32
33
34
35
36
37
38
39
40
# File 'lib/cuboid/processes/agents.rb', line 27

# Connects to an Agent by URL, re-using a cached client unless a fresh one
# is requested.
#
# @param [String] url
#   URL of the Agent.
# @param [Hash, nil] options
#   Options for the RPC client. The `:fresh` key is consumed here (not
#   passed on to the client): when truthy, a new client replaces any cached
#   one for `url`. The caller's Hash is never modified.
#
# @return [RPC::Client::Agent]
def connect( url, options = nil )
    # The RPC client needs the global reactor running.
    Raktr.global.run_in_thread unless Raktr.global.running?

    # Work on a copy so that extracting :fresh does not mutate (or choke on
    # a frozen) Hash owned by the caller.
    options = options.dup if options
    fresh   = options ? options.delete( :fresh ) : false

    if fresh
        @agent_connections[url] = RPC::Client::Agent.new( url, options )
    else
        @agent_connections[url] ||= RPC::Client::Agent.new( url, options )
    end
end

#each(&block) ⇒ Object

Parameters:

  • block (Block)

    Block to pass an RPC client for each Agent.



43
44
45
46
47
# File 'lib/cuboid/processes/agents.rb', line 43

# Yields an RPC client for each known Agent.
#
# @param [Block] block
#   Block to pass an RPC client for each Agent.
#
# @return [Array<String>, Enumerator]
#   The URL list when a block is given; an Enumerator over the clients when
#   called without a block (previously this raised NoMethodError on nil).
def each( &block )
    return enum_for( :each ) unless block

    @list.each do |url|
        block.call connect( url )
    end
end

#grid_spawn(options = {}) ⇒ Object



112
113
114
115
# File 'lib/cuboid/processes/agents.rb', line 112

# Spawns two Agents with the same options, the second one being given the
# URL of the first as its `:peer`.
#
# @param [Hash] options
#   Options for {#spawn}; not modified.
#
# @return [Object]
#   Whatever {#spawn} returns for the second Agent.
def grid_spawn( options = {} )
    first_agent  = spawn( options )
    peer_options = options.merge( peer: first_agent.url )

    spawn( peer_options )
end

#kill(url) ⇒ Object

Note:

Will also kill all Instances started by the Agent.

Parameters:

  • url (String)

    URL of the Agent to kill.



120
121
122
123
124
125
126
127
128
129
130
131
# File 'lib/cuboid/processes/agents.rb', line 120

# Kills the Agent at the given URL together with the processes it reports
# under `consumed_pids`, then forgets about it.
#
# Best-effort: any StandardError raised while connecting or killing is
# swallowed and `nil` is returned; the URL is dropped from the bookkeeping
# either way.
#
# @param [String] url
#   URL of the Agent to kill.
#
# @return [Object, nil]
#   Result of `Manager.kill` on success, `nil` on failure.
def kill( url )
    client = connect( url )

    Manager.kill_many( client.statistics['consumed_pids'] )
    Manager.kill( client.pid )
rescue StandardError
    nil
ensure
    # Always clean up, even if the Agent could not be reached.
    @agent_connections.delete( url )
    @list.delete( url )
end

#killall ⇒ Object

Kills all Agents in #list.



134
135
136
137
138
# File 'lib/cuboid/processes/agents.rb', line 134

# Kills every Agent currently in the URL list.
#
# @return [Array<String>]
#   Snapshot of the URLs that were passed to {#kill}.
def killall
    # Iterate over a copy: #kill removes entries from @list as it goes.
    urls = @list.dup
    urls.each { |url| kill( url ) }
end

#spawn(options = {}) ⇒ RPC::Client::Agent

Spawns an RPC::Server::Agent process.

Parameters:

  • options (Hash) (defaults to: {})

    To be passed to Options#set. Allows `address` instead of `rpc_server_address` and `port` instead of `rpc_port`.

Returns:



56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
# File 'lib/cuboid/processes/agents.rb', line 56

# Spawns an RPC::Server::Agent process and blocks until it responds to an
# `alive?` RPC call.
#
# @param [Hash] options
#   Not modified. Recognised keys:
#
#   * `:name`, `:peer`, `:strategy` -- Agent options (`:peer` is omitted
#     when `nil`).
#   * `:port` -- RPC port, defaults to `Utilities.available_port`.
#   * `:address` -- RPC bind address, defaults to `127.0.0.1`.
#   * `:external_address` -- omitted when `nil`.
#   * `:ssl` -- Hash with optional `:ca`, `:server` and `:client` entries,
#     the latter two being Hashes with `:private_key` and `:certificate`.
#     Any of them may be missing.
#   * `:application` -- defaults to `Options.paths.application`.
#   * `:fork`, `:daemonize` -- forwarded as-is to `Manager.spawn`.
#
# @return [RPC::Client::Agent]
#   Fresh client for the new Agent, with its `pid` set.
def spawn( options = {} )
    options   = options.dup
    do_fork   = options.delete(:fork)
    daemonize = options.delete(:daemonize)

    # Tolerate a missing or partial :ssl Hash (e.g. only :ca given) instead
    # of failing with NoMethodError on nil.
    ssl        = options[:ssl] || {}
    ssl_server = ssl[:server]  || {}
    ssl_client = ssl[:client]  || {}

    options = {
        agent: {
            name:     options[:name],
            peer:     options[:peer],
            strategy: options[:strategy],
        },
        rpc:   {
            server_port:             options[:port]    || Utilities.available_port,
            server_address:          options[:address] || '127.0.0.1',
            server_external_address: options[:external_address],

            ssl_ca:                 ssl[:ca],
            server_ssl_private_key: ssl_server[:private_key],
            server_ssl_certificate: ssl_server[:certificate],
            client_ssl_private_key: ssl_client[:private_key],
            client_ssl_certificate: ssl_client[:certificate],
        },
        paths: {
            application: options[:application] || Options.paths.application
        }
    }

    # Only these two keys are dropped when unset; other nil values are
    # passed along as they are.
    if options[:rpc][:server_external_address].nil?
        options[:rpc].delete :server_external_address
    end

    if options[:agent][:peer].nil?
        options[:agent].delete :peer
    end

    pid = Manager.spawn( :agent, options: options, fork: do_fork, daemonize: daemonize )

    url = "#{options[:rpc][:server_address]}:#{options[:rpc][:server_port]}"

    # Poll every 0.1s until the Agent answers.
    # NOTE(review): there is no upper bound here, so this never returns if
    # the Agent fails to come up.
    while sleep( 0.1 )
        begin
            connect( url, connection_pool_size: 1, max_retries: 1 ).alive?
            break
        rescue StandardError
            # Not up yet, keep polling.
        end
    end

    @list << url
    # Replace the single-connection probe client with a regular one.
    connect( url, fresh: true ).tap { |c| c.pid = pid }
end