Class: RSwim::MemberPool

Inherits:
Object
  • Object
show all
Defined in:
lib/rswim/member_pool.rb

Instance Method Summary collapse

Constructor Details

#initialize(node_member_id, seed_member_ids) ⇒ MemberPool

Returns a new instance of MemberPool.



5
6
7
8
9
10
11
12
# File 'lib/rswim/member_pool.rb', line 5

# Sets up the pool for the local node.
#
# The local node is stored as a Member::Me under its own id. Every seed id
# other than the node's own is then passed through the `member` lookup
# (which, per the note in update_member, records ids not seen before).
# The subscriber list starts out empty.
#
# @param node_member_id [Object] id of the local node
# @param seed_member_ids [Array] ids of the initially known members; an
#   occurrence of node_member_id in this list is ignored
def initialize(node_member_id, seed_member_ids)
  peer_ids = seed_member_ids - [node_member_id]
  @node_member_id = node_member_id
  @me = Member::Me.new(node_member_id)
  @members = { node_member_id => @me }
  peer_ids.each do |peer_id|
    member(peer_id)
  end
  @subscribers = []
end

Instance Method Details

#append_custom_state(key, value) ⇒ Object



14
15
16
# File 'lib/rswim/member_pool.rb', line 14

# Forwards a custom state key/value pair to the local node's member (@me).
#
# @param key [Object] passed through unchanged
# @param value [Object] passed through unchanged
# @return [Object] whatever @me.append_custom_state returns
def append_custom_state(key, value)
  local_member = @me
  local_member.append_custom_state(key, value)
end

#forward_ack_to(member_id) ⇒ Object



79
80
81
# File 'lib/rswim/member_pool.rb', line 79

# Looks up the member for +member_id+ and calls forward_ack on it.
#
# @param member_id [Object] id handed to the `member` lookup
# @return [Object] whatever the member's forward_ack returns
def forward_ack_to(member_id)
  recipient = member(member_id)
  recipient.forward_ack
end

#halt_member(member_id) ⇒ Object



83
84
85
# File 'lib/rswim/member_pool.rb', line 83

# Looks up the member for +member_id+ and calls halt on it.
#
# @param member_id [Object] id handed to the `member` lookup
# @return [Object] whatever the member's halt returns
def halt_member(member_id)
  target = member(member_id)
  target.halt
end

#member_failed_to_reply(member_id) ⇒ Object



97
98
99
# File 'lib/rswim/member_pool.rb', line 97

# Looks up the member for +member_id+ and calls failed_to_reply on it.
#
# @param member_id [Object] id handed to the `member` lookup
# @return [Object] whatever the member's failed_to_reply returns
def member_failed_to_reply(member_id)
  silent_member = member(member_id)
  silent_member.failed_to_reply
end

#member_replied_in_time(member_id) ⇒ Object



93
94
95
# File 'lib/rswim/member_pool.rb', line 93

# Looks up the member for +member_id+ and calls replied_in_time on it.
#
# @param member_id [Object] id handed to the `member` lookup
# @return [Object] whatever the member's replied_in_time returns
def member_replied_in_time(member_id)
  responsive_member = member(member_id)
  responsive_member.replied_in_time
end

#prepare_output ⇒ Object



48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
# File 'lib/rswim/member_pool.rb', line 48

# Gathers the outgoing messages of every member and attaches membership
# update entries to each of them.
#
# When no member has anything to send the empty list is returned right away,
# before any entry is published or any propagation count is incremented.
#
# @param max_updates [Integer] upper bound on the number of update entries
#   attached to the messages; defaults to 15, the limit that was previously
#   hard-coded
# @return [Array] the messages produced by the members' prepare_output, each
#   with payload[:updates] set to the selected entries
# @raise [ArgumentError] if max_updates is negative (raised by Array#take)
def prepare_output(max_updates: 15)
  outgoing = @members.values.flat_map(&:prepare_output)
  return outgoing if outgoing.empty?

  # Ascending sort: entries with the lowest propagation count are kept when
  # the list is cut down to max_updates.
  update_entries = @members.values
                           .map(&:prepare_update_entry)
                           .sort_by(&:propagation_count)
                           .take(max_updates)

  update_entries.each do |entry|
    # An entry whose count is still zero is handed to publish before the
    # owning member's count is incremented.
    publish(entry) if entry.propagation_count.zero?
    member(entry.member_id).increment_propagation_count
  end

  # All messages share the same array of entries.
  outgoing.each { |message| message.payload[:updates] = update_entries }
  outgoing
end

#remove_member(member_id) ⇒ Object



87
88
89
90
91
# File 'lib/rswim/member_pool.rb', line 87

# Deletes a member from the pool.
#
# The local node can never be removed from its own pool; attempting to do so
# raises instead of deleting the entry.
#
# @param member_id [Object] id of the member to delete
# @return [Object, nil] the deleted member, or nil when the id was not present
# @raise [RuntimeError] if member_id is the local node's own id
def remove_member(member_id)
  if member_id == @node_member_id
    raise "cannot remove the local node (#{member_id.inspect}) from its own member pool"
  end

  @members.delete(member_id)
end

#send_ping_request_to_k_members(target_id) ⇒ Object



73
74
75
76
77
# File 'lib/rswim/member_pool.rb', line 73

# Asks up to K other members to ping +target_id+ on this node's behalf.
#
# Candidates are all members except the target itself whose can_be_pinged?
# is truthy; the target's can_be_pinged? is never consulted.
#
# NOTE(review): the first K candidates in hash iteration order are used, not
# a random sample — confirm this is intended.
#
# @param target_id [Object] id of the member the helpers should ping
# @return [Array] the members that received ping_request!
def send_ping_request_to_k_members(target_id)
  @members.reject { |id, _candidate| id == target_id }
          .values
          .select(&:can_be_pinged?)
          .take(K)
          .each { |helper| helper.ping_request!(target_id) }
end

#send_ping_to_random_healthy_member ⇒ Object



65
66
67
68
69
70
71
# File 'lib/rswim/member_pool.rb', line 65

# Pings one member chosen by random_member from those whose can_be_pinged?
# is truthy. Does nothing when there is no such member.
#
# @return [Object, nil] the result of ping! on the chosen member, or nil when
#   no member can be pinged
def send_ping_to_random_healthy_member
  candidates = @members.values.select { |candidate| candidate.can_be_pinged? }
  random_member(candidates).ping! unless candidates.empty?
end

#status_report ⇒ Object



40
41
42
# File 'lib/rswim/member_pool.rb', line 40

# Hands the local node id and the member table to StatusReport.print.
#
# @return [Object] whatever StatusReport.print returns
def status_report
  node_id = @node_member_id
  roster = @members
  StatusReport.print(node_id, roster)
end

#subscribe(&block) ⇒ Object



44
45
46
# File 'lib/rswim/member_pool.rb', line 44

# Registers a block in the subscriber list.
#
# A call without a block is rejected up front; otherwise nil would be stored
# in @subscribers (presumably to be invoked later by the publishing code,
# which is not shown here — verify against publish).
#
# @yield the callback to store
# @return [Array] the subscriber list, including the new block
# @raise [ArgumentError] if no block is given
def subscribe(&block)
  raise ArgumentError, 'subscribe requires a block' unless block

  @subscribers << block
end

#update_member(message) ⇒ Object



18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# File 'lib/rswim/member_pool.rb', line 18

# Applies an incoming message to the pool.
#
# Any updates carried in the payload are passed to incorporate_gossip first.
# The sender is then looked up, and the message is dispatched on its type:
#   :ping     - schedules an ack to the sender on the local member
#   :ack      - tells the sender's member it replied with an ack
#   :ping_req - tells the payload's target member to be pinged on behalf of
#               the sender
#
# @param message [#payload, #from, #type] the received message
# @return [Object] the result of the dispatched call
# @raise [RuntimeError] for any other message type (raised after the gossip
#   and the sender lookup have already been processed)
def update_member(message)
  gossip = message.payload[:updates]
  incorporate_gossip(gossip) unless gossip.nil?

  origin = member(message.from) # NB: records member if not seen before
  case message.type
  when :ping     then @me.schedule_ack(message.from)
  when :ack      then origin.replied_with_ack
  when :ping_req then member(message.payload[:target_id]).ping_from!(message.from)
  else raise 'bad message type'
  end
end

#update_members(elapsed_seconds) ⇒ Object



36
37
38
# File 'lib/rswim/member_pool.rb', line 36

# Calls update(elapsed_seconds) on every member in the pool.
#
# Iteration runs over the array returned by Hash#values, i.e. a copy taken
# before the first update call, not over the hash itself.
#
# @param elapsed_seconds [Object] passed unchanged to each member's update
# @return [Array] the members that were updated
def update_members(elapsed_seconds)
  snapshot = @members.values
  snapshot.each do |pool_member|
    pool_member.update(elapsed_seconds)
  end
end