Class: Nanite::Cluster

Inherits:
Object show all
Defined in:
lib/nanite/cluster.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(amq, agent_timeout, identity, serializer, mapper, state_configuration = nil, callbacks = {}) ⇒ Cluster

Returns a new instance of Cluster.



5
6
7
8
9
10
11
12
13
14
15
16
17
# File 'lib/nanite/cluster.rb', line 5

def initialize(amq, agent_timeout, identity, serializer, mapper, state_configuration=nil, callbacks = {})
  @amq = amq
  @agent_timeout = agent_timeout
  @identity = identity
  @serializer = serializer
  @mapper = mapper
  @state = state_configuration
  @security = SecurityProvider.get
  @callbacks = callbacks
  setup_state
  @reaper = Reaper.new(agent_timeout)
  setup_queues
end

Instance Attribute Details

#agent_timeoutObject (readonly)

Returns the value of attribute agent_timeout.



3
4
5
# File 'lib/nanite/cluster.rb', line 3

def agent_timeout
  @agent_timeout
end

#amqObject (readonly)

Returns the value of attribute amq.



3
4
5
# File 'lib/nanite/cluster.rb', line 3

def amq
  @amq
end

#callbacksObject (readonly)

Returns the value of attribute callbacks.



3
4
5
# File 'lib/nanite/cluster.rb', line 3

def callbacks
  @callbacks
end

#identityObject (readonly)

Returns the value of attribute identity.



3
4
5
# File 'lib/nanite/cluster.rb', line 3

def identity
  @identity
end

#mapperObject (readonly)

Returns the value of attribute mapper.



3
4
5
# File 'lib/nanite/cluster.rb', line 3

def mapper
  @mapper
end

#nanitesObject (readonly)

Returns the value of attribute nanites.



3
4
5
# File 'lib/nanite/cluster.rb', line 3

def nanites
  @nanites
end

#reaperObject (readonly)

Returns the value of attribute reaper.



3
4
5
# File 'lib/nanite/cluster.rb', line 3

def reaper
  @reaper
end

#redisObject (readonly)

Returns the value of attribute redis.



3
4
5
# File 'lib/nanite/cluster.rb', line 3

def redis
  @redis
end

#serializerObject (readonly)

Returns the value of attribute serializer.



3
4
5
# File 'lib/nanite/cluster.rb', line 3

def serializer
  @serializer
end

Instance Method Details

#nanite_timed_out(token) ⇒ Object



49
50
51
52
53
54
55
56
57
# File 'lib/nanite/cluster.rb', line 49

def nanite_timed_out(token)
  nanite = nanites[token]
  if nanite && timed_out?(nanite)
    Nanite::Log.info("Nanite #{token} timed out")
    nanite = nanites.delete(token)
    callbacks[:timeout].call(token, mapper) if callbacks[:timeout]
    true
  end
end

#publish(request, target) ⇒ Object



63
64
65
66
67
68
69
70
71
72
73
74
# File 'lib/nanite/cluster.rb', line 63

def publish(request, target)
  # We need to initialize the 'target' field of the request object so that the serializer has
  # access to it.
  begin
    old_target = request.target
    request.target = target unless target == 'mapper-offline'
    Nanite::Log.debug("SEND #{request.to_s([:from, :tags, :target])}")
    amq.queue(target).publish(serializer.dump(request), :persistent => request.persistent)
  ensure
    request.target = old_target
  end
end

#register(reg) ⇒ Object

adds nanite to nanites map: key is nanite’s identity and value is a services/status pair implemented as a hash



28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
# File 'lib/nanite/cluster.rb', line 28

def register(reg)
  case reg
  when Register
    if @security.authorize_registration(reg)
      Nanite::Log.debug("RECV #{reg.to_s}")
      nanites[reg.identity] = { :services => reg.services, :status => reg.status, :tags => reg.tags, :timestamp => Time.now.utc.to_i }
      reaper.register(reg.identity, agent_timeout + 1) { nanite_timed_out(reg.identity) }
      callbacks[:register].call(reg.identity, mapper) if callbacks[:register]
    else
      Nanite::Log.warn("RECV NOT AUTHORIZED #{reg.to_s}")
    end
  when UnRegister
    Nanite::Log.info("RECV #{reg.to_s}")
    reaper.unregister(reg.identity)
    nanites.delete(reg.identity)
    callbacks[:unregister].call(reg.identity, mapper) if callbacks[:unregister]
  else
    Nanite::Log.warn("RECV [register] Invalid packet type: #{reg.class}")
  end
end

#route(request, targets) ⇒ Object



59
60
61
# File 'lib/nanite/cluster.rb', line 59

def route(request, targets)
  EM.next_tick { targets.map { |target| publish(request, target) } }
end

#targets_for(request) ⇒ Object

determine which nanites should receive the given request



20
21
22
23
# File 'lib/nanite/cluster.rb', line 20

def targets_for(request)
  return [request.target] if request.target
  __send__(request.selector, request.type, request.tags).collect {|name, state| name }
end