Class: Cuboid::RPC::Server::Agent

Inherits:
Object
Includes:
UI::Output, Utilities
Defined in:
lib/cuboid/rpc/server/agent.rb

Overview

Dispatches RPC Instances on demand and allows for extensive process monitoring.

The process goes something like this:

  • A client issues a #spawn call.

  • The Agent spawns and returns Instance info to the client (url, auth token, etc.).

  • The client connects to the Instance using that info.

Once the client has finished using the RPC Instance, it must shut it down; otherwise zombie processes will accumulate and eat away at system resources.

Author:

Defined Under Namespace

Classes: Node, Service

Constant Summary

SERVICE_NAMESPACE =
Service
PREFERENCE_STRATEGIES =
Cuboid::OptionGroups::Agent::STRATEGIES

Instance Method Summary

Methods included from UI::Output

#error_buffer, initialize, #log_error, #output_provider_file, #print_bad, #print_debug, #print_error, #print_info, #print_line, #print_ok, #print_status, #print_verbose, #reroute_to_file, #reroute_to_file?

Methods included from UI::OutputInterface

initialize

Methods included from UI::OutputInterface::Personalization

#included

Methods included from UI::OutputInterface::Controls

#debug?, #debug_level, #debug_level_1?, #debug_level_2?, #debug_level_3?, #debug_level_4?, #debug_off, #debug_on, initialize, #verbose?, #verbose_off, #verbose_on

Methods included from UI::OutputInterface::ErrorLogging

#error_logfile, #has_error_log?, initialize, #set_error_logfile

Methods included from UI::OutputInterface::Implemented

#print_debug_backtrace, #print_debug_exception, #print_debug_level_1, #print_debug_level_2, #print_debug_level_3, #print_debug_level_4, #print_error_backtrace, #print_exception

Methods included from UI::OutputInterface::Abstract

#output_provider_file, #print_bad, #print_debug, #print_error, #print_info, #print_line, #print_ok, #print_status, #print_verbose

Methods included from Utilities

#available_port, available_port_mutex, #bytes_to_kilobytes, #bytes_to_megabytes, #caller_name, #caller_path, #exception_jail, #generate_token, #hms_to_seconds, #port_available?, #rand_port, #random_seed, #regexp_array_match, #remove_constants, #seconds_to_hms

Constructor Details

#initialize(options = Options.instance) ⇒ Agent

Returns a new instance of Agent.



# File 'lib/cuboid/rpc/server/agent.rb', line 36

def initialize( options = Options.instance )
    @options = options

    @options.snapshot.path ||= @options.paths.snapshots

    @server = Base.new( @options.rpc.to_server_options )
    @server.logger.level = @options.datastore.log_level if @options.datastore.log_level

    @server.add_async_check do |method|
        # methods that expect a block are async
        method.parameters.flatten.include? :block
    end

    Options.agent.url = @url = @server.url

    prep_logging

    print_status 'Starting the Agent...'
    @server.logger.info( 'System' ) { "Logfile at: #{@logfile}"  }

    @server.add_handler( 'agent', self )

    # trap interrupts and exit cleanly when required
    trap_interrupts { shutdown }

    @instances = []

    Cuboid::Application.application.agent_services.each do |name, service|
        @server.add_handler( name.to_s, service.new( @options, self ) )
    end

    @node = Node.new( @options, @server, @logfile )
    @server.add_handler( 'node', @node )

    run
end
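The async check registered in the constructor relies on Ruby's `Method#parameters`. The following is a minimal sketch of that predicate in isolation; `ExampleHandler` and `async?` are hypothetical names used only for illustration and are not part of Cuboid.

```ruby
# Hypothetical handler class, not part of Cuboid.
class ExampleHandler
  # Synchronous style: the result is the return value.
  def pid
    Process.pid
  end

  # Asynchronous style: the result is delivered through the block.
  def spawn(options = {}, &block)
    block.call(options)
  end
end

# Same predicate as the block given to add_async_check in the constructor.
def async?(method)
  method.parameters.flatten.include?(:block)
end

handler = ExampleHandler.new

p handler.method(:pid).parameters    # => []
p handler.method(:spawn).parameters  # => [[:opt, :options], [:block, :block]]
p async?(handler.method(:pid))       # => false
p async?(handler.method(:spawn))     # => true
```

Because the list is flattened, the check matches both the `:block` parameter type and any parameter that is simply named `block`. A method that only uses `yield` without declaring a block parameter would not be detected.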

Instance Method Details

#alive? ⇒ TrueClass

Returns true.

Returns:

  • (TrueClass)

    true



# File 'lib/cuboid/rpc/server/agent.rb', line 79

def alive?
    @server.alive?
end

#finished_instances ⇒ Array<Hash>

Returns info for all finished (dead) instances.

Returns:

  • (Array<Hash>)

    Returns info for all finished (dead) instances.

# File 'lib/cuboid/rpc/server/agent.rb', line 249

def finished_instances
    instances.reject { |i| i['alive'] }
end

#instance(pid) ⇒ Hash

Returns proc info for a given pid.

Parameters:

  • pid (Fixnum)

Returns:

  • (Hash)

    proc info for the given pid, or nil when no tracked instance has that pid.



# File 'lib/cuboid/rpc/server/agent.rb', line 214

def instance( pid )
    @instances.each do |i|
        next if i['pid'] != pid
        i = i.dup

        now = Time.now

        i['now']   = now.to_s
        i['age']   = now - Time.parse( i['birthdate'] )
        i['alive'] = Cuboid::Processes::Manager.alive?( pid )

        return i
    end

    nil
end
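As a rough illustration of what `#instance` adds to a stored record, the sketch below reproduces the `now`, `age` and `alive` fields on a plain Hash. The `describe` helper and the sample record are hypothetical, and the liveness flag is passed in as an argument rather than obtained from `Cuboid::Processes::Manager`.

```ruby
require 'time'

# Hypothetical helper mirroring the enrichment done in #instance.
# `alive` is supplied by the caller here; the Agent asks its process manager.
def describe(record, now: Time.now, alive: true)
  info = record.dup                                   # leave the stored record untouched
  info['now']   = now.to_s
  info['age']   = now - Time.parse(info['birthdate']) # seconds, as a Float
  info['alive'] = alive
  info
end

born   = Time.at(1_700_000_000)
stored = { 'pid' => 1234, 'birthdate' => born.to_s }  # assumed record shape

info = describe(stored, now: born + 90, alive: false)

p info['age']          # => 90.0
p info['alive']        # => false
p stored.key?('age')   # => false
```

Working on a copy means every call reports a fresh age without mutating the Agent's bookkeeping.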

#instances ⇒ Array<Hash>

Returns info for all instances.

Returns:

  • (Array<Hash>)

    Returns info for all instances.



# File 'lib/cuboid/rpc/server/agent.rb', line 233

def instances
    @instances.map { |i| instance( i['pid'] ) }.compact
end

#log ⇒ String

Returns the contents of the log file.

Returns:

  • (String)

    Contents of the log file



# File 'lib/cuboid/rpc/server/agent.rb', line 279

def log
    IO.read prep_logging
end

#pid ⇒ Object



# File 'lib/cuboid/rpc/server/agent.rb', line 284

def pid
    Process.pid
end

#preferred(strategy = Cuboid::Options.agent.strategy, &block) ⇒ String?

Returns, depending on strategy and availability:

  • URL of the preferred Agent. If not a grid member it will return this Agent's URL.

  • `nil` if all nodes are at max utilization or on error.

  • `ArgumentError` – On invalid `strategy`.

Parameters:

  • strategy (Symbol) (defaults to: Cuboid::Options.agent.strategy)

    `:horizontal` – Pick the Agent with the least amount of workload.
    `:vertical` – Pick the Agent with the most amount of workload.
    `:direct` – Bypass the grid and get an Instance directly from this Agent.

Returns:

  • (String, nil)

    Depending on strategy and availability:

    • URL of the preferred Agent. If not a grid member it will return this Agent's URL.

    • `nil` if all nodes are at max utilization or on error.

    • `ArgumentError` – On invalid `strategy`.



# File 'lib/cuboid/rpc/server/agent.rb', line 95

def preferred( strategy = Cuboid::Options.agent.strategy, &block )
    strategy = strategy.to_sym
    if !PREFERENCE_STRATEGIES.include? strategy
        block.call :error_unknown_strategy
        raise ArgumentError, "Unknown strategy: #{strategy}"
    end

    if strategy == :direct || !@node.grid_member?
        block.call( self.utilization == 1 ? nil : @url )
        return
    end

    pick_utilization = proc do |url, utilization|
        (utilization == 1 || utilization.rpc_exception?) ?
            nil : [url, utilization]
    end

    adjust_score_by_strategy = proc do |score|
        case strategy
            when :horizontal
                score

            when :vertical
                -score
        end
    end

    each = proc do |peer, iter|
        connect_to_peer( peer ).utilization do |utilization|
            iter.return pick_utilization.call( peer, utilization )
        end
    end

    after = proc do |nodes|
        nodes << pick_utilization.call( @url, self.utilization )
        nodes.compact!

        # All nodes are at max utilization, pass.
        if nodes.empty?
            block.call
            next
        end

        block.call nodes.sort_by { |_, score| adjust_score_by_strategy.call score }[0][0]
    end

    Raktr.global.create_iterator( @node.peers ).map( each, after )
end
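The selection logic in `#preferred` can be hard to follow through the iterator callbacks. The sketch below is a synchronous simplification over a plain Hash of utilization scores. `pick_preferred` and the URLs are hypothetical, RPC errors are not modelled, and `min_by` is used where the method itself sorts and takes the first element.

```ruby
# Hypothetical, synchronous simplification of the grid-wide choice in #preferred.
# candidates: { url => utilization }, with utilization between 0.0 and 1.0.
def pick_preferred(candidates, strategy)
  unless %i[horizontal vertical].include?(strategy)
    raise ArgumentError, "Unknown strategy: #{strategy}"
  end

  # Nodes at max utilization are never eligible.
  available = candidates.reject { |_, utilization| utilization == 1 }
  return nil if available.empty?

  # :horizontal favours the lowest score, :vertical the highest.
  url, = available.min_by do |_, utilization|
    strategy == :horizontal ? utilization : -utilization
  end
  url
end

nodes = {
  'agent-a:7331' => 0.2,
  'agent-b:7331' => 0.7,
  'agent-c:7331' => 1.0
}

p pick_preferred(nodes, :horizontal)                     # => "agent-a:7331"
p pick_preferred(nodes, :vertical)                       # => "agent-b:7331"
p pick_preferred({ 'agent-c:7331' => 1.0 }, :horizontal) # => nil
```

Negating the score for `:vertical` lets one ordering serve both strategies: horizontal spreads load across the grid, while vertical fills the busiest node that still has room.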

#running_instances ⇒ Array<Hash>

Returns info for all running (alive) instances.

Returns:

  • (Array<Hash>)

    Returns info for all running (alive) instances.

# File 'lib/cuboid/rpc/server/agent.rb', line 241

def running_instances
    instances.select { |i| i['alive'] }
end
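`#running_instances` and `#finished_instances` are complementary filters over the same list, keyed on the `'alive'` flag. As a small sketch with made-up records, the same split can be seen in one pass with `Enumerable#partition`:

```ruby
# Made-up instance records; only the 'alive' flag matters for the split.
instances = [
  { 'pid' => 101, 'alive' => true  },
  { 'pid' => 102, 'alive' => false },
  { 'pid' => 103, 'alive' => true  }
]

# First array: block returned truthy (running); second: the rest (finished).
running, finished = instances.partition { |i| i['alive'] }

p running.map  { |i| i['pid'] }   # => [101, 103]
p finished.map { |i| i['pid'] }   # => [102]
```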

#services ⇒ Object



# File 'lib/cuboid/rpc/server/agent.rb', line 73

def services
    Cuboid::Application.application.agent_services.keys
end

#spawn(options = {}, &block) ⇒ Hash?

Spawns an Instance.

Parameters:

  • options (Hash) (defaults to: {})

    a customizable set of options; the method body reads the `:strategy`, `:owner` and `:helpers` keys.

Returns:

  • (Hash, nil)

    Depending on availability:

    • `Hash`: Connection and proc info.

    • `nil`: Max utilization or currently spawning, wait and retry.



# File 'lib/cuboid/rpc/server/agent.rb', line 158

def spawn( options = {}, &block )
    if @spawning
        block.call nil
        return
    end

    options      = options.my_symbolize_keys
    strategy     = options.delete(:strategy)
    owner        = options[:owner]
    helpers      = options[:helpers] || {}

    if strategy != 'direct' && @node.grid_member?
        preferred *[strategy].compact do |url|
            if !url
                block.call
                next
            end

            if url == :error_unknown_strategy
                block.call :error_unknown_strategy
                next
            end

            connect_to_peer( url ).spawn( options.merge(
                  helpers:  helpers.merge( via: @url ),
                  strategy: :direct
                ),
                &block
            )
        end
        return
    end

    if System.max_utilization?
        block.call
        return
    end

    @spawning = true
    spawn_instance do |info|
        info['owner']   = owner
        info['helpers'] = helpers

        @instances << info

        block.call info

        @spawning = false
    end
end
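Since `#spawn` hands `nil` to the block when the Agent is busy or saturated, callers are expected to wait and retry. The sketch below shows one way a caller might do that. `FakeAgent` is a hypothetical stand-in that calls its block synchronously, `spawn_with_retry` is an illustrative helper, and the `'url'` and `'token'` keys are assumptions about the shape of the returned info.

```ruby
# Hypothetical stand-in for an Agent: answers nil ("busy") a fixed number
# of times, then hands back made-up Instance info.
class FakeAgent
  def initialize(busy_replies)
    @busy_replies = busy_replies
  end

  def spawn(options = {}, &block)
    if @busy_replies > 0
      @busy_replies -= 1
      block.call nil
      return
    end

    block.call({ 'url' => 'localhost:7331', 'token' => 'secret',
                 'owner' => options[:owner] })
  end
end

# Illustrative helper: retry while the Agent reports nil, give up after
# `attempts` tries. Assumes the block is called before #spawn returns.
def spawn_with_retry(agent, options = {}, attempts: 5, delay: 0.0)
  attempts.times do
    info = nil
    agent.spawn(options) { |result| info = result }

    raise ArgumentError, 'Unknown strategy' if info == :error_unknown_strategy
    return info if info

    sleep delay
  end

  nil
end

info = spawn_with_retry(FakeAgent.new(2), { owner: 'docs' })
p info['owner']                                         # => "docs"
p spawn_with_retry(FakeAgent.new(3), {}, attempts: 2)   # => nil
```

A real client would receive the callback asynchronously over RPC, so the loop would need to be driven by the callback rather than by a blocking `times` iteration.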

#statistics ⇒ Hash

Returns server stats regarding the instances and pool.

Returns:

  • (Hash)

    Returns server stats regarding the instances and pool.



# File 'lib/cuboid/rpc/server/agent.rb', line 266

def statistics
    {
        'utilization'         => utilization,
        'running_instances'   => running_instances,
        'finished_instances'  => finished_instances,
        'consumed_pids'       => @instances.map { |i| i['pid'] }.compact,
        'snapshots'           => Dir.glob( "#{@options.snapshot.path}*.#{Snapshot::EXTENSION}" ),
        'node'                => @node.info
    }
end

#utilization ⇒ Float

Returns the workload score for this Agent, calculated using System#utilization.

  • `0.0` => No utilization.

  • `1.0` => Max utilization.

Lower is better.

Returns:

  • (Float)

    Workload score for this Agent, calculated using System#utilization.

    • `0.0` => No utilization.

    • `1.0` => Max utilization.

    Lower is better.



# File 'lib/cuboid/rpc/server/agent.rb', line 260

def utilization
    System.utilization
end