Class: Nanite::Cluster

Inherits:
Object show all
Defined in:
lib/nanite/cluster.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(amq, agent_timeout, identity, serializer, mapper, state_configuration = nil, tag_store = nil, callbacks = {}) ⇒ Cluster

Returns a new instance of Cluster.



5
6
7
8
9
10
11
12
13
14
15
16
17
18
# File 'lib/nanite/cluster.rb', line 5

def initialize(amq, agent_timeout, identity, serializer, mapper, state_configuration=nil, tag_store=nil, callbacks = {})
  @amq = amq
  @agent_timeout = agent_timeout
  @identity = identity
  @serializer = serializer
  @mapper = mapper
  @state = state_configuration
  @tag_store = tag_store
  @security = SecurityProvider.get
  @callbacks = callbacks
  setup_state
  @reaper = Reaper.new(agent_timeout)
  setup_queues
end

Instance Attribute Details

#agent_timeoutObject (readonly)

Returns the value of attribute agent_timeout.



3
4
5
# File 'lib/nanite/cluster.rb', line 3

def agent_timeout
  @agent_timeout
end

#amqObject (readonly)

Returns the value of attribute amq.



3
4
5
# File 'lib/nanite/cluster.rb', line 3

def amq
  @amq
end

#callbacksObject (readonly)

Returns the value of attribute callbacks.



3
4
5
# File 'lib/nanite/cluster.rb', line 3

def callbacks
  @callbacks
end

#identityObject (readonly)

Returns the value of attribute identity.



3
4
5
# File 'lib/nanite/cluster.rb', line 3

def identity
  @identity
end

#mapperObject (readonly)

Returns the value of attribute mapper.



3
4
5
# File 'lib/nanite/cluster.rb', line 3

def mapper
  @mapper
end

#nanitesObject (readonly)

Returns the value of attribute nanites.



3
4
5
# File 'lib/nanite/cluster.rb', line 3

def nanites
  @nanites
end

#reaperObject (readonly)

Returns the value of attribute reaper.



3
4
5
# File 'lib/nanite/cluster.rb', line 3

def reaper
  @reaper
end

#redisObject (readonly)

Returns the value of attribute redis.



3
4
5
# File 'lib/nanite/cluster.rb', line 3

def redis
  @redis
end

#serializerObject (readonly)

Returns the value of attribute serializer.



3
4
5
# File 'lib/nanite/cluster.rb', line 3

def serializer
  @serializer
end

Instance Method Details

#nanite_timed_out(token) ⇒ Object



53
54
55
56
57
58
59
60
61
# File 'lib/nanite/cluster.rb', line 53

def nanite_timed_out(token)
  nanite = nanites[token]
  if nanite && timed_out?(nanite)
    Nanite::Log.info("Nanite #{token} timed out")
    nanite = nanites.delete(token)
    callbacks[:timeout].call(token, mapper) if callbacks && callbacks[:timeout]
    true
  end
end

#publish(request, target) ⇒ Object



67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
# File 'lib/nanite/cluster.rb', line 67

def publish(request, target)
  # We need to initialize the 'target' field of the request object so that the serializer and
  # the security provider have access to it.
  begin
    old_target = request.target
    request.target = target unless target == 'mapper-offline'
    if @security.authorize_request(request)
      Nanite::Log.info("SEND #{request.to_s([:from, :scope, :tags, :target])}")
      amq.queue(target).publish(serializer.dump(request), :persistent => request.persistent)
    else
      Nanite::Log.error("RECV NOT AUTHORIZED #{request.to_s}")
    end
  ensure
    request.target = old_target
  end
end

#register(reg) ⇒ Object

adds nanite to nanites map: key is nanite’s identity and value is a services/status pair implemented as a hash



29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
# File 'lib/nanite/cluster.rb', line 29

def register(reg)
  case reg
  when Register
    if @security.authorize_registration(reg)
      Nanite::Log.info("RECV #{reg.to_s}")
      nanites[reg.identity] = { :services => reg.services, :status => reg.status, :tags => reg.tags, :timestamp => Time.now.utc.to_i }
      reaper.register(reg.identity, agent_timeout + 1) { nanite_timed_out(reg.identity) }
      callbacks[:register].call(reg.identity, mapper) if callbacks && callbacks[:register]
    else
      Nanite::Log.warn("RECV NOT AUTHORIZED #{reg.to_s}")
    end
  when UnRegister
    Nanite::Log.info("RECV #{reg.to_s}")
    reaper.unregister(reg.identity)
    nanites.delete(reg.identity)
    callbacks[:unregister].call(reg.identity, mapper) if callbacks && callbacks[:unregister]
  when TagUpdate
    Nanite::Log.info("RECV #{reg.to_s}")
    nanites.update_tags(reg.identity, reg.new_tags, reg.obsolete_tags)
  else
    Nanite::Log.warn("RECV [register] Invalid packet type: #{reg.class}")
  end
end

#route(request, targets) ⇒ Object



63
64
65
# File 'lib/nanite/cluster.rb', line 63

def route(request, targets)
  EM.next_tick { targets.map { |target| publish(request, target) } }
end

#targets_for(request, include_timed_out) ⇒ Object

determine which nanites should receive the given request



21
22
23
24
# File 'lib/nanite/cluster.rb', line 21

def targets_for(request, include_timed_out)
  return [request.target] if request.target
  __send__(request.selector, request, include_timed_out)
end