Class: Nanite::MapperProxy

Inherits:
Object show all
Includes:
AMQPHelper
Defined in:
lib/nanite/mapper_proxy.rb

Overview

This class allows sending requests to nanite agents without having to run a local mapper. It is used by Actor.request which can be used by actors than need to send requests to remote agents. All requests go through the mapper for security purposes.

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods included from AMQPHelper

#start_amqp

Constructor Details

#initialize(id, opts) ⇒ MapperProxy

Returns a new instance of MapperProxy.



22
23
24
25
26
27
28
29
# File 'lib/nanite/mapper_proxy.rb', line 22

def initialize(id, opts)
  @options = opts || {}
  @identity = id
  @pending_requests = {}
  @amqp = start_amqp(options)
  @serializer = Serializer.new(options[:format])
  @@instance = self
end

Instance Attribute Details

#amqpObject

Returns the value of attribute amqp.



15
16
17
# File 'lib/nanite/mapper_proxy.rb', line 15

def amqp
  @amqp
end

#identityObject

Returns the value of attribute identity.



15
16
17
# File 'lib/nanite/mapper_proxy.rb', line 15

def identity
  @identity
end

#optionsObject

Returns the value of attribute options.



15
16
17
# File 'lib/nanite/mapper_proxy.rb', line 15

def options
  @options
end

#pending_requestsObject

Returns the value of attribute pending_requests.



15
16
17
# File 'lib/nanite/mapper_proxy.rb', line 15

def pending_requests
  @pending_requests
end

#serializerObject

Returns the value of attribute serializer.



15
16
17
# File 'lib/nanite/mapper_proxy.rb', line 15

def serializer
  @serializer
end

Class Method Details

.instanceObject

Accessor for actor



18
19
20
# File 'lib/nanite/mapper_proxy.rb', line 18

def self.instance
  @@instance if defined?(@@instance)
end

Instance Method Details

#handle_intermediate_result(res) ⇒ Object

Handle intermediary result



75
76
77
78
# File 'lib/nanite/mapper_proxy.rb', line 75

def handle_intermediate_result(res)
  handlers = pending_requests[res.token]
  handlers[:intermediate_handler].call(res) if handlers && handlers[:intermediate_handler]
end

#handle_result(res) ⇒ Object

Handle final result



81
82
83
84
# File 'lib/nanite/mapper_proxy.rb', line 81

def handle_result(res)
  handlers = pending_requests.delete(res.token)
  handlers[:result_handler].call(res) if handlers && handlers[:result_handler]
end

#push(type, payload = '', opts = {}) ⇒ Object

Send push to given agent through the mapper



45
46
47
48
49
50
51
52
53
# File 'lib/nanite/mapper_proxy.rb', line 45

def push(type, payload = '', opts = {})
  raise "Mapper proxy not initialized" unless identity && options
  push = Push.new(type, payload, opts)
  push.from = identity
  push.token = Identity.generate
  push.persistent = opts.key?(:persistent) ? opts[:persistent] : options[:persistent]
  Nanite::Log.info("SEND #{push.to_s([:tags, :target])}")
  amqp.fanout('request', :no_declare => options[:secure]).publish(serializer.dump(push))
end

#query_tags(opts, &blk) ⇒ Object

Send tag query to mapper



56
57
58
59
60
61
62
63
64
# File 'lib/nanite/mapper_proxy.rb', line 56

def query_tags(opts, &blk)
  raise "Mapper proxy not initialized" unless identity && options
  tag_query = TagQuery.new(identity, opts)
  tag_query.token = Identity.generate
  tag_query.persistent = opts.key?(:persistent) ? opts[:persistent] : options[:persistent]      
  pending_requests[tag_query.token] = { :result_handler => blk }
  Nanite::Log.info("SEND #{tag_query.to_s}")
  amqp.fanout('request', :no_declare => options[:secure]).publish(serializer.dump(tag_query))
end

#request(type, payload = '', opts = {}, &blk) ⇒ Object

Send request to given agent through the mapper



32
33
34
35
36
37
38
39
40
41
42
# File 'lib/nanite/mapper_proxy.rb', line 32

def request(type, payload = '', opts = {}, &blk)
  raise "Mapper proxy not initialized" unless identity && options
  request = Request.new(type, payload, opts)
  request.from = identity
  request.token = Identity.generate
  request.persistent = opts.key?(:persistent) ? opts[:persistent] : options[:persistent]
  pending_requests[request.token] = 
    { :intermediate_handler => opts[:intermediate_handler], :result_handler => blk }
  Nanite::Log.info("SEND #{request.to_s([:tags, :target])}")
  amqp.fanout('request', :no_declare => options[:secure]).publish(serializer.dump(request))
end

#update_tags(new_tags, obsolete_tags) ⇒ Object

Update tags registered by mapper for agent



67
68
69
70
71
72
# File 'lib/nanite/mapper_proxy.rb', line 67

def update_tags(new_tags, obsolete_tags)
  raise "Mapper proxy not initialized" unless identity && options
  update = TagUpdate.new(identity, new_tags, obsolete_tags)
  Nanite::Log.info("SEND #{update.to_s}")
  amqp.fanout('registration', :no_declare => options[:secure]).publish(serializer.dump(update))
end