Class: Nanite::Cluster
Instance Attribute Summary collapse
-
#agent_timeout ⇒ Object
readonly
Returns the value of attribute agent_timeout.
-
#amq ⇒ Object
readonly
Returns the value of attribute amq.
-
#callbacks ⇒ Object
readonly
Returns the value of attribute callbacks.
-
#identity ⇒ Object
readonly
Returns the value of attribute identity.
-
#mapper ⇒ Object
readonly
Returns the value of attribute mapper.
-
#nanites ⇒ Object
readonly
Returns the value of attribute nanites.
-
#reaper ⇒ Object
readonly
Returns the value of attribute reaper.
-
#redis ⇒ Object
readonly
Returns the value of attribute redis.
-
#serializer ⇒ Object
readonly
Returns the value of attribute serializer.
Instance Method Summary collapse
-
#initialize(amq, agent_timeout, identity, serializer, mapper, state_configuration = nil, callbacks = {}) ⇒ Cluster
constructor
A new instance of Cluster.
- #nanite_timed_out(token) ⇒ Object
- #publish(request, target) ⇒ Object
-
#register(reg) ⇒ Object
adds nanite to nanites map: key is nanite’s identity and value is a services/status pair implemented as a hash.
- #route(request, targets) ⇒ Object
-
#targets_for(request) ⇒ Object
determine which nanites should receive the given request.
Constructor Details
#initialize(amq, agent_timeout, identity, serializer, mapper, state_configuration = nil, callbacks = {}) ⇒ Cluster
Returns a new instance of Cluster.
5 6 7 8 9 10 11 12 13 14 15 16 17 |
# File 'lib/nanite/cluster.rb', line 5 def initialize(amq, agent_timeout, identity, serializer, mapper, state_configuration=nil, callbacks = {}) @amq = amq @agent_timeout = agent_timeout @identity = identity @serializer = serializer @mapper = mapper @state = state_configuration @security = SecurityProvider.get @callbacks = callbacks setup_state @reaper = Reaper.new(agent_timeout) setup_queues end |
Instance Attribute Details
#agent_timeout ⇒ Object (readonly)
Returns the value of attribute agent_timeout.
3 4 5 |
# File 'lib/nanite/cluster.rb', line 3 def agent_timeout @agent_timeout end |
#amq ⇒ Object (readonly)
Returns the value of attribute amq.
3 4 5 |
# File 'lib/nanite/cluster.rb', line 3 def amq @amq end |
#callbacks ⇒ Object (readonly)
Returns the value of attribute callbacks.
3 4 5 |
# File 'lib/nanite/cluster.rb', line 3 def callbacks @callbacks end |
#identity ⇒ Object (readonly)
Returns the value of attribute identity.
3 4 5 |
# File 'lib/nanite/cluster.rb', line 3 def identity @identity end |
#mapper ⇒ Object (readonly)
Returns the value of attribute mapper.
3 4 5 |
# File 'lib/nanite/cluster.rb', line 3 def mapper @mapper end |
#nanites ⇒ Object (readonly)
Returns the value of attribute nanites.
3 4 5 |
# File 'lib/nanite/cluster.rb', line 3 def nanites @nanites end |
#reaper ⇒ Object (readonly)
Returns the value of attribute reaper.
3 4 5 |
# File 'lib/nanite/cluster.rb', line 3 def reaper @reaper end |
#redis ⇒ Object (readonly)
Returns the value of attribute redis.
3 4 5 |
# File 'lib/nanite/cluster.rb', line 3 def redis @redis end |
#serializer ⇒ Object (readonly)
Returns the value of attribute serializer.
3 4 5 |
# File 'lib/nanite/cluster.rb', line 3 def serializer @serializer end |
Instance Method Details
#nanite_timed_out(token) ⇒ Object
49 50 51 52 53 54 55 56 57 |
# File 'lib/nanite/cluster.rb', line 49 def nanite_timed_out(token) nanite = nanites[token] if nanite && timed_out?(nanite) Nanite::Log.info("Nanite #{token} timed out") nanite = nanites.delete(token) callbacks[:timeout].call(token, mapper) if callbacks[:timeout] true end end |
#publish(request, target) ⇒ Object
63 64 65 66 67 68 69 70 71 72 73 74 |
# File 'lib/nanite/cluster.rb', line 63 def publish(request, target) # We need to initialize the 'target' field of the request object so that the serializer has # access to it. begin old_target = request.target request.target = target unless target == 'mapper-offline' Nanite::Log.debug("SEND #{request.to_s([:from, :tags, :target])}") amq.queue(target).publish(serializer.dump(request, enforce_format?(target)), :persistent => request.persistent) ensure request.target = old_target end end |
#register(reg) ⇒ Object
adds nanite to nanites map: key is nanite’s identity and value is a services/status pair implemented as a hash
28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 |
# File 'lib/nanite/cluster.rb', line 28 def register(reg) case reg when Register if @security.(reg) Nanite::Log.debug("RECV #{reg.to_s}") nanites[reg.identity] = { :services => reg.services, :status => reg.status, :tags => reg., :timestamp => Time.now.utc.to_i } reaper.register(reg.identity, agent_timeout + 1) { nanite_timed_out(reg.identity) } callbacks[:register].call(reg.identity, mapper) if callbacks[:register] else Nanite::Log.warn("RECV NOT AUTHORIZED #{reg.to_s}") end when UnRegister Nanite::Log.info("RECV #{reg.to_s}") reaper.unregister(reg.identity) nanites.delete(reg.identity) callbacks[:unregister].call(reg.identity, mapper) if callbacks[:unregister] else Nanite::Log.warn("RECV [register] Invalid packet type: #{reg.class}") end end |
#route(request, targets) ⇒ Object
59 60 61 |
# File 'lib/nanite/cluster.rb', line 59 def route(request, targets) EM.next_tick { targets.map { |target| publish(request, target) } } end |
#targets_for(request) ⇒ Object
determine which nanites should receive the given request
20 21 22 23 |
# File 'lib/nanite/cluster.rb', line 20 def targets_for(request) return [request.target] if request.target __send__(request.selector, request.type, request.).collect {|name, state| name } end |