Class: Cuboid::Processes::Schedulers

Inherits:
Object
  • Object
show all
Includes:
Utilities, Singleton
Defined in:
lib/cuboid/processes/schedulers.rb

Overview

Helper for managing RPC::Server::Scheduler processes.

Author:

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Utilities

#available_port, available_port_mutex, #bytes_to_kilobytes, #bytes_to_megabytes, #caller_name, #caller_path, #exception_jail, #generate_token, #hms_to_seconds, #port_available?, #rand_port, #random_seed, #regexp_array_match, #remove_constants, #seconds_to_hms

Constructor Details

#initialize ⇒ Schedulers

Returns a new instance of Schedulers.



14
15
16
17
# File 'lib/cuboid/processes/schedulers.rb', line 14

# Sets up the two pieces of state this helper keeps: an empty list of
# Scheduler URLs and an empty URL-to-client cache.
def initialize
    # URLs, kept in insertion order.
    @list = []

    # RPC clients, keyed by URL.
    @clients = {}
end

Instance Attribute Details

#list ⇒ Array<String> (readonly)

Returns URLs of all running Schedulers.

Returns:

  • (Array<String>)

    URLs of all running Schedulers.



12
13
14
# File 'lib/cuboid/processes/schedulers.rb', line 12

# Reader for the URL list.
#
# @return [Array<String>]
#   The object stored in `@list` (not a copy).
def list
    @list
end

Class Method Details

.method_missing(sym, *args, &block) ⇒ Object



127
128
129
130
131
132
133
# File 'lib/cuboid/processes/schedulers.rb', line 127

# Forwards unknown class-level calls to `instance` when it publicly
# responds to them; anything else goes to the default `method_missing`.
#
# @param sym [Symbol] Name of the missing method.
# @param args [Array] Arguments of the original call.
# @param block [Proc] Block of the original call, if any.
def self.method_missing( sym, *args, &block )
    # Guard first: nothing to delegate to, so let the superclass handle it.
    return super( sym, *args, &block ) unless instance.respond_to?( sym )

    instance.send( sym, *args, &block )
end

.respond_to?(m) ⇒ Boolean

Returns:

  • (Boolean)


135
136
137
# File 'lib/cuboid/processes/schedulers.rb', line 135

# Reports whether the class itself, or `instance`, responds to `m`.
#
# The optional second parameter mirrors the standard `respond_to?`
# signature so that two-argument calls no longer raise ArgumentError.
#
# @param m [Symbol, String] Method name to check.
# @param include_all [Boolean]
#   Forwarded to the default `respond_to?` lookup on the class. The
#   check on `instance` stays public-only, matching the check performed
#   by `method_missing`.
#
# @return [Boolean]
def self.respond_to?( m, include_all = false )
    super( m, include_all ) || instance.respond_to?( m )
end

Instance Method Details

#connect(url, options = nil) ⇒ RPC::Client::Scheduler

Connects to a Scheduler by URL.

Parameters:

  • url (String)

    URL of the Scheduler.

  • options (Hash) (defaults to: nil)

    Options for the RPC client.

Returns:

  • (RPC::Client::Scheduler)



25
26
27
28
29
30
31
32
33
34
35
36
37
38
# File 'lib/cuboid/processes/schedulers.rb', line 25

# Connects to a Scheduler by URL, reusing a cached client when possible.
#
# Calls `Raktr.global.run_in_thread` first if `Raktr.global.running?`
# is false.
#
# @param url [String] URL of the Scheduler.
# @param options [Hash, nil]
#   Options for the RPC client. A truthy `:fresh` entry forces a new
#   client that replaces any cached one; `:fresh` itself is stripped
#   before the options reach the client. The Hash passed in is left
#   untouched.
#
# @return [RPC::Client::Scheduler]
def connect( url, options = nil )
    Raktr.global.run_in_thread if !Raktr.global.running?

    # Work on a copy so that removing :fresh never alters (or fails on a
    # frozen) Hash owned by the caller.
    options = options.dup if options
    fresh   = options ? options.delete( :fresh ) : false

    if fresh
        @clients[url] = RPC::Client::Scheduler.new( url, options )
    else
        # Options are only used when no client is cached for this URL yet.
        @clients[url] ||= RPC::Client::Scheduler.new( url, options )
    end
end

#each(&block) ⇒ Object

Parameters:

  • block (Block)

    Block to pass an RPC client for each Scheduler.



41
42
43
44
45
# File 'lib/cuboid/processes/schedulers.rb', line 41

# Walks the URLs in `@list`, connects to each one and hands the
# resulting client to the block.
#
# @param block [Block]
#   Called once per URL with the value returned by `connect( url )`.
def each( &block )
    @list.each { |scheduler_url| block.call( connect( scheduler_url ) ) }
end

#kill(url) ⇒ Object

Note:

Will also kill all Instances started by the Scheduler.

Parameters:

  • url (String)

    URL of the Scheduler to kill.



104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
# File 'lib/cuboid/processes/schedulers.rb', line 104

# Kills a Scheduler and every instance it reports as running.
#
# Connects to the Scheduler, calls `clear` on it, passes the 'pid' of each
# entry in `running` to `Manager.kill`, then kills the Scheduler's own pid.
# Any StandardError raised along the way is swallowed (best-effort), and
# the URL is always dropped from `@list` and `@clients`.
#
# @param url [String] URL of the Scheduler to kill.
#
# @return [Object, nil]
#   Result of the final `Manager.kill`, or `nil` if an error was rescued.
def kill( url )
    scheduler = connect( url )
    scheduler.clear
    scheduler.running.each do |_id, instance|
        Manager.kill instance['pid']
    end
    Manager.kill scheduler.pid
rescue
    # Deliberately best-effort: an error while connecting or killing must
    # not prevent the cleanup below.
    nil
ensure
    @list.delete( url )

    # There may be no cached client (e.g. when `connect` itself raised),
    # so only close one if it exists instead of calling `close` on nil.
    client = @clients.delete( url )
    client.close if client
end

#killall ⇒ Object

Kills every Scheduler in #list.



121
122
123
124
125
# File 'lib/cuboid/processes/schedulers.rb', line 121

# Calls `kill` for every URL currently in `@list`.
#
# @return [Array<String>] The copy of `@list` that was iterated.
def killall
    # Iterate over a copy, leaving the original free to change meanwhile.
    snapshot = @list.dup
    snapshot.each { |url| kill( url ) }
end

#spawn(options = {}) ⇒ RPC::Client::Scheduler

Spawns a RPC::Server::Scheduler process.

Parameters:

  • options (Hash) (defaults to: {})

    To be passed to Options#set. Allows `address` instead of `rpc_server_address` and `port` instead of `rpc_port`.

Returns:

  • (RPC::Client::Scheduler)


54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
# File 'lib/cuboid/processes/schedulers.rb', line 54

# Spawns a Scheduler process through `Manager.spawn` and waits until it
# answers over RPC.
#
# @param options [Hash]
#   Recognised keys: `:fork` and `:daemonize` (handed to `Manager.spawn`),
#   `:agent`, `:strategy`, `:port` (defaults to `Utilities.available_port`),
#   `:address` (defaults to '127.0.0.1'), `:external_address`,
#   `:application` (defaults to `Options.paths.application`) and `:ssl`,
#   a Hash with optional `:ca`, `:server` and `:client` entries, where
#   `:server`/`:client` are Hashes with `:private_key` and `:certificate`.
#   Any missing SSL section is treated as empty. The Hash passed in is not
#   modified.
#
# @return [Object]
#   The client returned by `connect( url, fresh: true )`, with its `pid`
#   set to the value returned by `Manager.spawn`.
def spawn( options = {} )
    options   = options.dup
    do_fork   = options.delete(:fork)
    daemonize = options.delete(:daemonize)

    # Default every SSL section on its own, so that a partial :ssl Hash
    # (e.g. only :ca) does not end in a NoMethodError on nil.
    ssl        = options[:ssl] || {}
    ssl_server = ssl[:server]  || {}
    ssl_client = ssl[:client]  || {}

    options = {
        agent: {
            url:      options[:agent],
            strategy: options[:strategy]
        },
        rpc:        {
            server_port:             options[:port]    || Utilities.available_port,
            server_address:          options[:address] || '127.0.0.1',
            server_external_address: options[:external_address],

            ssl_ca:                 ssl[:ca],
            server_ssl_private_key: ssl_server[:private_key],
            server_ssl_certificate: ssl_server[:certificate],
            client_ssl_private_key: ssl_client[:private_key],
            client_ssl_certificate: ssl_client[:certificate],
        },
        paths: {
          application: options[:application]  || Options.paths.application
        }
    }

    pid = Manager.spawn( :scheduler, options: options, fork: do_fork, daemonize: daemonize )

    url = "#{options[:rpc][:server_address]}:#{options[:rpc][:server_port]}"

    # Poll every 0.1s until an `alive?` call goes through without raising.
    # NOTE(review): there is no upper bound on the wait; if the process
    # never becomes reachable this loops forever -- confirm that is intended.
    while sleep( 0.1 )
        begin
            connect( url, connection_pool_size: 1, max_retries: 1 ).alive?
            break
        rescue
            # Not reachable yet; try again after the next sleep.
        end
    end

    @list << url
    connect( url, fresh: true ).tap { |c| c.pid = pid }
end