Class: Cuboid::RPC::Server::Agent::Service

Inherits:
  • Object
Defined in:
lib/cuboid/rpc/server/agent/service.rb

Overview

Base class and namespace for all Agent services.

# RPC accessibility

Only PUBLIC methods YOU have defined will be accessible over RPC.

# Blocking operations

Please try to avoid blocking operations as they will block the main Reactor loop.

However, if you really need to perform such operations, you can update the relevant methods to expect a block and then pass the desired return value to that block instead of returning it the usual way.

This will result in the method’s payload being deferred into a Thread of its own.

In addition, you can use the #defer and #run_asap methods if you need more control over what gets deferred and over general scheduling.

# Asynchronous operations

Methods which perform async operations should expect a block and pass their results to that block instead of returning a value.
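
As a rough illustration of the two method styles described above, the following sketch uses a stand-in class so that it runs on its own. SketchService, #version and #slow_lookup are hypothetical names; a real service would inherit from Cuboid::RPC::Server::Agent::Service, and the framework rather than the caller would supply the block.

```ruby
# Hypothetical stand-in for a service class; not part of Cuboid.
class SketchService
  # Cheap, non-blocking method: returns its value the usual way.
  def version
    '0.1'
  end

  # Potentially blocking (or async) method: expects a block and passes the
  # result to it instead of returning it.
  def slow_lookup(key, &block)
    result = "value-for-#{key}" # imagine slow I/O happening here
    block.call(result)
  end

  private

  # Private helpers are not part of the public interface.
  def secret
    'hidden'
  end
end

service  = SketchService.new
received = nil
service.slow_lookup('a') { |value| received = value }
```

Only #version and #slow_lookup are public here, so under the rule stated above they are the only candidates for RPC exposure.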

Constructor Details

#initialize(options, agent) ⇒ Service

Returns a new instance of Service.



# File 'lib/cuboid/rpc/server/agent/service.rb', line 34

def initialize( options, agent )
    @options = options
    @agent   = agent
end

Instance Attribute Details

#agent ⇒ Object (readonly)

Returns the value of attribute agent.



# File 'lib/cuboid/rpc/server/agent/service.rb', line 32

def agent
  @agent
end

#options ⇒ Object (readonly)

Returns the value of attribute options.



# File 'lib/cuboid/rpc/server/agent/service.rb', line 31

def options
  @options
end

Instance Method Details

#connect_to_agent(url) ⇒ Client::Agent

Connects to an Agent by `url`.

Parameters:

  • url (String)

Returns:

  • (Client::Agent)



# File 'lib/cuboid/rpc/server/agent/service.rb', line 113

def connect_to_agent( url )
    @agent_connections ||= {}
    @agent_connections[url] ||= Client::Agent.new( url )
end
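
The listing above caches one client per URL. A minimal sketch of that memoization pattern follows, using a hypothetical SketchAgentClient struct in place of Client::Agent and made-up address strings, so that it runs without Cuboid.

```ruby
# Hypothetical stand-in for Client::Agent; only remembers its URL.
SketchAgentClient = Struct.new(:url)

class ConnectionCache
  def connect_to_agent(url)
    @agent_connections ||= {}
    # ||= builds a client only the first time a given URL is seen.
    @agent_connections[url] ||= SketchAgentClient.new(url)
  end
end

cache  = ConnectionCache.new
first  = cache.connect_to_agent('127.0.0.1:7331') # address is made up
second = cache.connect_to_agent('127.0.0.1:7331')
other  = cache.connect_to_agent('127.0.0.1:7332')
```

Here `first` and `second` are the same object, while `other` is a separate client.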

#connect_to_instance(*args) ⇒ Client::Instance

Connects to an Instance by `url`.

Examples:

connect_to_instance( url, token )
connect_to_instance( url: url, token: token )
connect_to_instance( 'url' => url, 'token' => token )

Parameters:

  • args (Vararg)

Returns:

  • (Client::Instance)



# File 'lib/cuboid/rpc/server/agent/service.rb', line 128

def connect_to_instance( *args )
    url = token = nil

    if args.size == 2
        url, token = *args
    elsif args.first.is_a? Hash
        connection_options = args.first
        url     = connection_options['url']   || connection_options[:url]
        token   = connection_options['token'] || connection_options[:token]
    end

    @instance_connections ||= {}
    @instance_connections[url] ||= Client::Instance.new( url, token )
end
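
To make the three accepted call forms concrete, the sketch below lifts the argument handling from the listing above into a standalone helper. The name extract_connection_info and the sample values are hypothetical; the branching itself mirrors the listing.

```ruby
# Hypothetical helper mirroring the argument handling of #connect_to_instance.
def extract_connection_info(*args)
  url = token = nil

  if args.size == 2
    # Positional form: ( url, token )
    url, token = *args
  elsif args.first.is_a?(Hash)
    # Hash form, with either String or Symbol keys.
    opts  = args.first
    url   = opts['url']   || opts[:url]
    token = opts['token'] || opts[:token]
  end

  [url, token]
end

positional = extract_connection_info('host:1111', 'secret')
symbols    = extract_connection_info(url: 'host:1111', token: 'secret')
strings    = extract_connection_info('url' => 'host:1111', 'token' => 'secret')
unknown    = extract_connection_info('host:1111')
```

All three supported forms yield the same pair, and any other shape leaves both values nil. Note also that the listing caches connections by `url` alone, so a later call with the same URL but a different token gets the originally cached client.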

#defer(operation = nil, callback = nil, &block) ⇒ Object

Defers a blocking operation in order to avoid blocking the main Reactor loop.

The operation will be run in its own Thread - DO NOT block forever.

Accepts either two parameters (an `operation` and a `callback`) or an operation as a block.

Parameters:

  • operation (Proc) (defaults to: nil)

    Operation to defer.

  • callback (Proc) (defaults to: nil)

    Block to call with the results of the operation.

  • block (Block)

    Operation to defer.



# File 'lib/cuboid/rpc/server/agent/service.rb', line 83

def defer( operation = nil, callback = nil, &block )
    Thread.new( *[operation, callback].compact, &block )
end
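
The documented contract (run an operation in its own Thread, then hand its result to an optional callback) can be sketched as below. defer_sketch is a hypothetical helper written for illustration only and is not the library's implementation.

```ruby
# Hypothetical helper illustrating the documented contract; not Cuboid's code.
def defer_sketch(operation = nil, callback = nil, &block)
  operation ||= block # block form: the block is the operation

  Thread.new do
    result = operation.call
    callback.call(result) if callback
    result
  end
end

results = Queue.new

# Two-parameter form: an operation and a callback receiving its result.
with_callback = defer_sketch(proc { 21 * 2 }, proc { |value| results << value })

# Block form: the operation is given as a block.
with_block = defer_sketch { :done }

with_callback.join
with_block.join
```

A thread-safe Queue is used to collect the callback's result, since the callback runs outside the calling thread.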

#each_instance(&block) ⇒ Object

Performs an asynchronous iteration over all running instances.

Parameters:

  • block (Block)

    Block to be passed Client::Instance and `Raktr::Iterator`.



# File 'lib/cuboid/rpc/server/agent/service.rb', line 62

def each_instance( &block )
    wrap = proc do |instance, iterator|
        block.call( connect_to_instance( instance ), iterator )
    end
    iterator_for( instances ).each( &wrap )
end

#instances ⇒ Array<Hash>

Returns Alive instances.

Returns:

  • (Array<Hash>)

    Alive instances.



# File 'lib/cuboid/rpc/server/agent/service.rb', line 104

def instances
    agent.running_instances
end

#iterator_for(list, max_concurrency = 10) ⇒ Raktr::Iterator

Returns Iterator for the provided array.

Parameters:

  • list (Array)

  • max_concurrency (Integer) (defaults to: 10)

Returns:

  • (Raktr::Iterator)

    Iterator for the provided array.



# File 'lib/cuboid/rpc/server/agent/service.rb', line 98

def iterator_for( list, max_concurrency = 10 )
    Raktr.global.create_iterator( list, max_concurrency )
end

#map_instances(each, after) ⇒ Object

Performs an asynchronous map operation over all running instances.

Parameters:

  • each (Proc)

    Block to be passed Client::Instance and `Raktr::Iterator`.

  • after (Proc)

    Block to be passed the Array of results.



# File 'lib/cuboid/rpc/server/agent/service.rb', line 51

def map_instances( each, after )
    wrap_each = proc do |instance, iterator|
        each.call( connect_to_instance( instance ), iterator )
    end
    iterator_for( instances ).map( wrap_each, after )
end
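
The shape of the `each` and `after` blocks can be sketched as below. SketchIterator is a hypothetical, synchronous stand-in written for this example; it assumes the iterator follows the convention where each block reports its result through `iterator.return`, which should be checked against the Raktr documentation. The real #map_instances also passes connected Client::Instance objects rather than the raw hashes used here.

```ruby
# Hypothetical, synchronous stand-in for the iterator; the real one is async.
class SketchIterator
  def initialize(list)
    @list = list
  end

  # Calls `each` once per item, then `after` with the collected results.
  def map(each, after)
    results = []

    @list.each_with_index do |item, index|
      handle = Object.new
      # Assumed convention: the block reports its result via #return.
      handle.define_singleton_method(:return) { |value| results[index] = value }
      each.call(item, handle)
    end

    after.call(results)
  end
end

instances = [{ 'url' => 'host:1111' }, { 'url' => 'host:2222' }] # made-up data

collected = nil
each  = proc { |instance, iterator| iterator.return(instance['url'].upcase) }
after = proc { |results| collected = results }

SketchIterator.new(instances).map(each, after)
```

The `after` block receives one result per instance, in the same order as the input list.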

#node ⇒ Server::Agent::Node

Returns Local node.

Returns:

  • (Server::Agent::Node)

    Local node.



# File 'lib/cuboid/rpc/server/agent/service.rb', line 41

def node
    agent.instance_eval { @node }
end

#run_asap(&block) ⇒ Object

Runs a block as soon as possible in the Reactor loop.

Parameters:

  • block (Block)


# File 'lib/cuboid/rpc/server/agent/service.rb', line 90

def run_asap( &block )
    Raktr.global.next_tick( &block )
end