Class: Cuboid::RPC::Server::Scheduler

Inherits:
Object
Includes:
UI::Output, Utilities
Defined in:
lib/cuboid/rpc/server/scheduler.rb

Overview

RPC scheduler service which queues Instances by priority and monitors them while they run.

In addition to the pure queue functionality, it also allows running Instances to be:

  • Detached from the queue monitor, transferring management responsibility to the client.

  • Attached to the queue monitor, transferring management responsibility to the queue.

If an Agent has been provided, Instances will be provided by it. If no Agent has been given, Instances will be spawned on the Scheduler machine.

Constant Summary

TICK_CONSUME = 0.1

Instance Method Summary

Methods included from Utilities

#available_port, available_port_mutex, #bytes_to_kilobytes, #bytes_to_megabytes, #caller_name, #caller_path, #exception_jail, #generate_token, #hms_to_seconds, #port_available?, #rand_port, #random_seed, #regexp_array_match, #remove_constants, #seconds_to_hms

Methods included from UI::Output

#error_buffer, initialize, #log_error, #output_provider_file, #print_bad, #print_debug, #print_error, #print_info, #print_line, #print_ok, #print_status, #print_verbose, #reroute_to_file, #reroute_to_file?

Methods included from UI::OutputInterface

initialize

Methods included from UI::OutputInterface::Personalization

#included

Methods included from UI::OutputInterface::Controls

#debug?, #debug_level, #debug_level_1?, #debug_level_2?, #debug_level_3?, #debug_level_4?, #debug_off, #debug_on, initialize, #verbose?, #verbose_off, #verbose_on

Methods included from UI::OutputInterface::ErrorLogging

#error_logfile, #has_error_log?, initialize, #set_error_logfile

Methods included from UI::OutputInterface::Implemented

#print_debug_backtrace, #print_debug_exception, #print_debug_level_1, #print_debug_level_2, #print_debug_level_3, #print_debug_level_4, #print_error_backtrace, #print_exception

Methods included from UI::OutputInterface::Abstract

#output_provider_file, #print_bad, #print_debug, #print_error, #print_info, #print_line, #print_ok, #print_status, #print_verbose

Constructor Details

#initialize ⇒ Scheduler

Returns a new instance of Scheduler.



# File 'lib/cuboid/rpc/server/scheduler.rb', line 48

def initialize
    @options = Options.instance

    @options.snapshot.path ||= @options.paths.snapshots
    @options.report.path   ||= @options.paths.reports

    @server = Base.new( @options.rpc.to_server_options )
    @server.logger.level = @options.datastore.log_level if @options.datastore.log_level

    Options.scheduler.url = @url = @server.url

    prep_logging

    @queue          = {}
    @id_to_priority = {}
    @by_priority    = {}

    @running   = {}
    @completed = {}
    @failed    = {}

    set_handlers( @server )
    trap_interrupts { Thread.new { shutdown } }

    monitor_instances
    consume_queue

    run
end

Instance Method Details

#alive? ⇒ TrueClass

Returns:

  • (TrueClass)


# File 'lib/cuboid/rpc/server/scheduler.rb', line 264

def alive?
    @server.alive?
end

#any? ⇒ Bool

Returns:

  • (Bool)


# File 'lib/cuboid/rpc/server/scheduler.rb', line 84

def any?
    !empty?
end

#attach(url, token, &block) ⇒ String, ...

Attaches a running Instance to the queue monitor.

Parameters:

  • url (String)

    Instance URL for a running Instance.

  • token (String)

    Authentication token for the Instance.

Returns:

  • (String, false, nil)
    • Instance ID for further queue reference.

    • `false` if the Instance is already attached to a Scheduler.

    • `nil` if the Instance could not be reached.



# File 'lib/cuboid/rpc/server/scheduler.rb', line 205

def attach( url, token, &block )
    client = connect_to_instance( url, token )
    client.alive? do |bool|
        if bool.rpc_exception?
            block.call
            next
        end

        client.scheduler_url do |scheduler_url|
            if scheduler_url
                block.call false
                next
            end

            client.options.set( scheduler: { url: @options.scheduler.url } ) do
                @running[token] = client
                block.call token
            end
        end
    end
end
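
The three documented outcomes of #attach can be traced with a minimal, synchronous sketch. `FakeClient` and `attach_outcome` are hypothetical names used for illustration only and are not part of Cuboid; the real method is asynchronous and reports its result through the block rather than a return value. As the source above reads, the Instance ID handed back on success is the token itself.

```ruby
# Hypothetical, synchronous stand-in for an Instance RPC client (not Cuboid's API).
FakeClient = Struct.new(:reachable, :scheduler_url) do
  # Yields the reachability flag, imitating the callback style of the real client.
  def alive?
    yield reachable
  end
end

# Mirrors the decision flow of #attach, returning what the real method
# would pass to its block.
def attach_outcome(client, token, running)
  client.alive? do |alive|
    return nil unless alive               # Instance could not be reached
    return false if client.scheduler_url  # already attached to a Scheduler

    running[token] = client               # tracked under its token
    return token
  end
end

running = {}
attach_outcome(FakeClient.new(false, nil), 'tok-1', running)            # => nil
attach_outcome(FakeClient.new(true, 'http://other'), 'tok-2', running)  # => false
attach_outcome(FakeClient.new(true, nil), 'tok-3', running)             # => "tok-3"
```

Only the last call adds an entry to `running`, which is why callers need to distinguish `nil` from `false` when handling the result.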

#clear ⇒ Object

Note:

Only affects queued Instances; once an Instance has passed through the #running stage, it’s no longer part of the queue.

Empties the queue.



# File 'lib/cuboid/rpc/server/scheduler.rb', line 231

def clear
    @queue.clear
    @by_priority.clear
    @id_to_priority.clear

    nil
end

#completed ⇒ Hash

Returns Completed Instances and their report location.

Returns:

  • (Hash)

    Completed Instances and their report location.



# File 'lib/cuboid/rpc/server/scheduler.rb', line 110

def completed
    @completed
end

#detach(id, &block) ⇒ Hash?

Detaches a running Instance from the queue monitor.

Parameters:

  • id (String)

    Running Instance to detach from the queue monitor.

    Once an Instance is detached, it becomes someone else’s responsibility to monitor, manage and shut down in order to free its slot.

Returns:

  • (Hash, nil)
    • RPC connection information for the Instance.

    • `nil` if no running Instance with that ID is found.



# File 'lib/cuboid/rpc/server/scheduler.rb', line 185

def detach( id, &block )
    client = @running.delete( id )
    return block.call if !client

    client.options.set( scheduler: { url: nil } ) do
        block.call( url: client.url, token: client.token, pid: client.pid )
    end
end
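
As a rough illustration of what a caller receives from #detach, the sketch below reproduces the bookkeeping on a plain Hash. `DetachableClient` and `detach_info` are hypothetical names, and the RPC call that clears the Instance's scheduler URL is left out, so this is a simplified model rather than the actual implementation.

```ruby
# Hypothetical stand-in for a tracked Instance client (not Cuboid's API).
DetachableClient = Struct.new(:url, :token, :pid)

# Removes the client from the running set and returns its connection info,
# or nil when no running Instance has that ID.
def detach_info(running, id)
  client = running.delete(id)
  return nil unless client

  { url: client.url, token: client.token, pid: client.pid }
end

running = { 'abc' => DetachableClient.new('localhost:7331', 'abc', 4242) }

detach_info(running, 'abc') # connection info Hash with :url, :token and :pid
detach_info(running, 'abc') # => nil, the Instance is no longer tracked
```

Because the entry is deleted on the first call, a second #detach with the same ID yields `nil`; the caller has to keep the returned connection information in order to manage the Instance afterwards.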

#empty? ⇒ Bool

Returns:

  • (Bool)


# File 'lib/cuboid/rpc/server/scheduler.rb', line 79

def empty?
    self.size == 0
end

#errors(starting_line = 0) ⇒ Array<String>

Parameters:

  • starting_line (Integer) (defaults to: 0)

    Sets the starting line for the range of errors to return.

Returns:

  • (Array<String>)



# File 'lib/cuboid/rpc/server/scheduler.rb', line 251

def errors( starting_line = 0 )
    return [] if self.error_buffer.empty?

    error_strings = self.error_buffer

    if starting_line != 0
        error_strings = error_strings[starting_line..-1]
    end

    error_strings
end
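
The effect of `starting_line` follows from Ruby's Array range indexing. The sketch below applies the same slicing to a plain Array; `errors_from` is a hypothetical helper, not part of Cuboid. It also shows an edge case: an offset beyond the end of the buffer yields `nil` rather than an empty Array.

```ruby
# Hypothetical helper applying the same slicing as #errors to a plain Array.
def errors_from(buffer, starting_line = 0)
  return [] if buffer.empty?
  return buffer if starting_line == 0

  buffer[starting_line..-1]
end

buffer = ['first error', 'second error', 'third error']

errors_from(buffer)    # => all three messages
errors_from(buffer, 1) # => ["second error", "third error"]
errors_from(buffer, 3) # => [] (offset equal to the buffer size)
errors_from(buffer, 4) # => nil (offset past the end of the buffer)
```

A client polling for new errors can pass the number of lines it has already seen as `starting_line`, but should be prepared for `nil` if its count ever exceeds the buffer size.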

#failed ⇒ Hash

Returns Failed Instances and the associated error.

Returns:

  • (Hash)

    Failed Instances and the associated error.



# File 'lib/cuboid/rpc/server/scheduler.rb', line 116

def failed
    @failed
end

#get(id) ⇒ Hash?

Note:

Only returns info for queued Instances; once an Instance has passed through the #running stage, it’s no longer part of the queue.

Returns the Instance options and priority, or `nil` if an Instance with the given ID could not be found.

Parameters:

  • id (String)

    ID for a queued Instance.

Returns:

  • (Hash, nil)
    • Instance options and priority.

    • `nil` if an Instance with the given ID could not be found.



# File 'lib/cuboid/rpc/server/scheduler.rb', line 129

def get( id )
    return if !@queue.include? id

    {
        options:  @queue[id],
        priority: @id_to_priority[id]
    }
end

#list ⇒ Hash<Integer,Array>

Returns Queued Instances grouped and sorted by priority.

Returns:

  • (Hash<Integer,Array>)

    Queued Instances grouped and sorted by priority.



# File 'lib/cuboid/rpc/server/scheduler.rb', line 95

def list
    @by_priority
end

#push(options, queue_options = {}) ⇒ String

Returns Instance ID used to reference the Instance from then on.

Parameters:

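  • options (Hash)

    Instance options.

  • queue_options (Hash) (defaults to: {})

    Queue options. A `priority` entry, which the source listing for this method reads with the String key 'priority', defaults to `0` (higher is more urgent).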

Returns:

  • (String)

    Instance ID used to reference the Instance from then on.



# File 'lib/cuboid/rpc/server/scheduler.rb', line 144

def push( options, queue_options = {} )
    priority = queue_options.delete('priority') || 0

    if !Cuboid::Application.application.valid_options?( options )
        fail ArgumentError, 'Invalid options!'
    end

    id = Utilities.generate_token

    @queue[id]          = options
    @id_to_priority[id] = priority

    (@by_priority[priority] ||= []) << id
    @by_priority = Hash[@by_priority.sort_by { |k, _| -k }]

    id
end
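
To make the priority bookkeeping easier to follow, here is a minimal standalone sketch that mirrors the three Hashes used by #push. `MiniQueue` is a hypothetical class written for illustration; it skips option validation and uses `SecureRandom` in place of `Utilities.generate_token`, so it should be read as an approximation of the behaviour rather than Cuboid's implementation.

```ruby
require 'securerandom'

# Hypothetical stand-in for the Scheduler's queue bookkeeping (not part of Cuboid).
class MiniQueue
  def initialize
    @queue          = {} # id => options
    @id_to_priority = {} # id => priority
    @by_priority    = {} # priority => [ids], highest priority first
  end

  # Queues the options and returns the generated ID.
  def push(options, queue_options = {})
    priority = queue_options.delete('priority') || 0
    id       = SecureRandom.hex(16)

    @queue[id]          = options
    @id_to_priority[id] = priority

    (@by_priority[priority] ||= []) << id
    # Re-sort so that iteration visits the highest priority first.
    @by_priority = @by_priority.sort_by { |k, _| -k }.to_h

    id
  end

  def list
    @by_priority
  end

  def size
    @queue.size
  end
end

q = MiniQueue.new
q.push({ 'job' => 'a' })
q.push({ 'job' => 'b' }, { 'priority' => 10 })
q.push({ 'job' => 'c' }, { 'priority' => 5 })

q.list.keys # => [10, 5, 0]
```

Within one priority, IDs keep their insertion order, so Instances of equal priority are listed first in, first out.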

#remove(id) ⇒ Object

Note:

Only affects queued Instances; once an Instance has passed through the #running stage, it’s no longer part of the queue.

Parameters:

  • id (String)

    Instance ID to remove from the queue.



# File 'lib/cuboid/rpc/server/scheduler.rb', line 167

def remove( id )
    return false if !@queue.include? id

    @queue.delete( id )
    @by_priority[@id_to_priority.delete( id )].delete( id )

    true
end
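
One consequence of the listing above may be worth knowing: removing the last ID of a priority appears to leave an empty Array under that priority in the structure returned by #list. The sketch below reproduces the same steps on plain Hashes; `remove_from` is a hypothetical helper, not part of Cuboid.

```ruby
# Hypothetical helper reproducing the bookkeeping of #remove on plain Hashes.
def remove_from(queue, id_to_priority, by_priority, id)
  return false unless queue.include?(id)

  queue.delete(id)
  # Looks up the priority bucket for the ID and drops the ID from it.
  by_priority[id_to_priority.delete(id)].delete(id)

  true
end

queue          = { 'id-1' => { 'job' => 'a' } }
id_to_priority = { 'id-1' => 0 }
by_priority    = { 0 => ['id-1'] }

remove_from(queue, id_to_priority, by_priority, 'id-1') # => true
remove_from(queue, id_to_priority, by_priority, 'id-1') # => false, already gone
by_priority == { 0 => [] }                              # => true
```

Code that iterates over the #list result should therefore tolerate priorities with no IDs left in them.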

#running ⇒ Hash

Returns RPC connection information on running Instances.

Returns:

  • (Hash)

    RPC connection information on running Instances.



# File 'lib/cuboid/rpc/server/scheduler.rb', line 101

def running
    @running.inject( {} ) do |h, (id, client)|
        h.merge! id => { url: client.url, token: client.token, pid: client.pid }
        h
    end
end

#shutdown ⇒ Object

Shuts down the service.



# File 'lib/cuboid/rpc/server/scheduler.rb', line 240

def shutdown
    print_status 'Shutting down...'
    reactor.delay 2 do
        reactor.stop
    end
end

#size ⇒ Integer

Returns:

  • (Integer)


# File 'lib/cuboid/rpc/server/scheduler.rb', line 89

def size
    @queue.size
end