Class: RSwim::Node

Inherits:
Object
  • Object
show all
Defined in:
lib/rswim/node.rb

Direct Known Subclasses

Integration::UDP::Node

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(my_host, seed_hosts, t_ms, r_ms) ⇒ Node

Returns a new instance of Node.



9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# File 'lib/rswim/node.rb', line 9

def initialize(my_host, seed_hosts, t_ms, r_ms)
  RSwim.validate_config!
  @my_host = my_host
  @directory = Directory.new
  @my_id = @directory.id(@my_host)
  serialization = RSwim.encrypted ? Serialization::Encrypted : Serialization::Simple
  @deserializer = serialization::Deserializer.new(@directory, @my_id)
  @serializer = serialization::Serializer.new(@directory)
  @seed_ids = seed_hosts.map { |host| @directory.id(host) }
  @t_ms = t_ms
  @r_ms = r_ms
  @agent = RSwim::Agent::PushBased.new(@my_id, @seed_ids, t_ms, r_ms)
  @sleep_time_seconds = r_ms / 1_000
  @io_loop = create_io_loop
end

Class Method Details

.udp(my_host, seed_hosts, port, t_ms = T_MS, r_ms = R_MS) ⇒ Object



5
6
7
# File 'lib/rswim/node.rb', line 5

def self.udp(my_host, seed_hosts, port, t_ms = T_MS, r_ms = R_MS)
  Integration::UDP::Node.new(my_host, seed_hosts, port, t_ms, r_ms)
end

Instance Method Details

#append_custom_state(key, value) ⇒ Object



32
33
34
# File 'lib/rswim/node.rb', line 32

def append_custom_state(key, value)
  @agent.append_custom_state(key, value)
end

#startObject



36
37
38
39
40
41
42
# File 'lib/rswim/node.rb', line 36

def start
  logger.info 'starting node'
  @agent.run
  @io_loop.run
rescue StandardError => e
  logger.error("Node failed: #{e}")
end

#subscribe(&block) ⇒ Object



25
26
27
28
29
30
# File 'lib/rswim/node.rb', line 25

def subscribe(&block)
  @agent.subscribe do |update_entry|
    host = @directory.host(update_entry.member_id)
    block.call(host, update_entry.status, update_entry.custom_state)
  end
end