Class: RSwim::MemberPool
- Inherits: Object
- Inheritance chain: Object → RSwim::MemberPool
- Defined in: lib/rswim/member_pool.rb
Instance Method Summary
- #append_custom_state(key, value) ⇒ Object
- #forward_ack_to(member_id) ⇒ Object
- #halt_member(member_id) ⇒ Object
- #initialize(node_member_id, seed_member_ids) ⇒ MemberPool (constructor): A new instance of MemberPool.
- #member_failed_to_reply(member_id) ⇒ Object
- #member_replied_in_time(member_id) ⇒ Object
- #prepare_output ⇒ Object
- #remove_member(member_id) ⇒ Object
- #send_ping_request_to_k_members(target_id) ⇒ Object
- #send_ping_to_random_healthy_member ⇒ Object
- #status_report ⇒ Object
- #subscribe(&block) ⇒ Object
- #update_member(message) ⇒ Object
- #update_members(elapsed_seconds) ⇒ Object
Constructor Details
#initialize(node_member_id, seed_member_ids) ⇒ MemberPool
Returns a new instance of MemberPool.
5 6 7 8 9 10 11 12 |
# File 'lib/rswim/member_pool.rb', line 5

# Builds the pool for the local node and walks the seed list.
#
# The local node is wrapped in a Member::Me and stored in the member table
# under its own id. Every seed id other than the local one is then passed to
# member(id) — presumably the call that registers a peer; confirm against
# that helper. The subscriber list starts out empty.
#
# @param node_member_id [Object] id of the local node
# @param seed_member_ids [Array] ids of initially known peers; the local id
#   is skipped if it appears here
def initialize(node_member_id, seed_member_ids)
  # Non-destructive difference: the caller's array is left untouched.
  peer_ids = seed_member_ids - [node_member_id]

  @node_member_id = node_member_id
  @me = Member::Me.new(node_member_id)
  @members = { node_member_id => @me }

  peer_ids.each do |peer_id|
    member(peer_id)
  end

  @subscribers = []
end
Instance Method Details
#append_custom_state(key, value) ⇒ Object
14 15 16 |
# File 'lib/rswim/member_pool.rb', line 14

# Records a custom key/value pair on the local node's member object.
#
# Pure delegation to @me; the return value is whatever
# @me.append_custom_state returns.
#
# @param key [Object] name of the custom state entry
# @param value [Object] value to store for that entry
def append_custom_state(key, value)
  local_node = @me
  local_node.append_custom_state(key, value)
end
#forward_ack_to(member_id) ⇒ Object
79 80 81 |
# File 'lib/rswim/member_pool.rb', line 79

# Looks up the given member and calls forward_ack on it.
#
# @param member_id [Object] id of the member that should forward the ack
# @return [Object] whatever the member's forward_ack returns
def forward_ack_to(member_id)
  recipient = member(member_id)
  recipient.forward_ack
end
#halt_member(member_id) ⇒ Object
83 84 85 |
# File 'lib/rswim/member_pool.rb', line 83

# Looks up the given member and calls halt on it.
#
# @param member_id [Object] id of the member to halt
# @return [Object] whatever the member's halt returns
def halt_member(member_id)
  target = member(member_id)
  target.halt
end
#member_failed_to_reply(member_id) ⇒ Object
97 98 99 |
# File 'lib/rswim/member_pool.rb', line 97

# Tells the given member that it failed to reply.
#
# @param member_id [Object] id of the member that did not answer
# @return [Object] whatever the member's failed_to_reply returns
def member_failed_to_reply(member_id)
  silent_peer = member(member_id)
  silent_peer.failed_to_reply
end
#member_replied_in_time(member_id) ⇒ Object
93 94 95 |
# File 'lib/rswim/member_pool.rb', line 93

# Tells the given member that it replied in time.
#
# @param member_id [Object] id of the member that answered
# @return [Object] whatever the member's replied_in_time returns
def member_replied_in_time(member_id)
  responsive_peer = member(member_id)
  responsive_peer.replied_in_time
end
#prepare_output ⇒ Object
48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 |
# File 'lib/rswim/member_pool.rb', line 48

# Gathers the outgoing messages of every member and attaches update entries.
#
# Each member contributes the result of its own prepare_output. If nothing is
# pending, the empty list is returned immediately and no propagation counts
# are touched. Otherwise up to 15 update entries with the lowest propagation
# counts are selected, their members' counts are incremented, and the same
# entry list is stored in every message under payload[:updates].
#
# @return [Array] the outgoing messages (possibly empty)
def prepare_output
  messages = @members.values.flat_map(&:prepare_output)
  return messages if messages.empty?

  update_entries = @members.values
                           .map(&:prepare_update_entry)
                           .sort_by(&:propagation_count) # ascending: least-propagated first
                           .take(15) # TODO: constant

  update_entries.each do |entry|
    # A count of zero means this entry has not been sent out before.
    publish(entry) if entry.propagation_count.zero?
    member(entry.member_id).increment_propagation_count
  end

  messages.each { |message| message.payload[:updates] = update_entries }
  messages
end
#remove_member(member_id) ⇒ Object
87 88 89 90 91 |
# File 'lib/rswim/member_pool.rb', line 87

# Deletes a member from the pool.
#
# @param member_id [Object] id of the member to drop
# @return [Object, nil] the removed member object, or nil if the id was unknown
# @raise [RuntimeError] if asked to remove the local node itself
def remove_member(member_id)
  if member_id == @node_member_id
    # Still a RuntimeError, but the message now says what went wrong.
    raise "cannot remove the local node (#{member_id.inspect}) from its own member pool"
  end

  @members.delete(member_id)
end
#send_ping_request_to_k_members(target_id) ⇒ Object
73 74 75 76 77 |
# File 'lib/rswim/member_pool.rb', line 73

# Asks up to K pingable members to probe target_id on this node's behalf.
#
# The target itself is never chosen as a helper. Each chosen member receives
# ping_request!(target_id).
#
# NOTE(review): the helpers are the first K eligible members in the member
# table's insertion order, not a random sample — confirm that is intended.
#
# @param target_id [Object] id of the member to be probed indirectly
# @return [Array] the members that were asked (at most K, possibly empty)
def send_ping_request_to_k_members(target_id)
  @members.select { |id, candidate| id != target_id && candidate.can_be_pinged? }
          .values
          .take(K)
          .each { |helper| helper.ping_request!(target_id) }
end
#send_ping_to_random_healthy_member ⇒ Object
65 66 67 68 69 70 71 |
# File 'lib/rswim/member_pool.rb', line 65

# Pings one member chosen by random_member from those that can be pinged.
#
# @return [Object, nil] the result of the chosen member's ping!, or nil when
#   no member can currently be pinged
def send_ping_to_random_healthy_member
  pingable = @members.each_value.select { |candidate| candidate.can_be_pinged? }

  random_member(pingable).ping! unless pingable.empty?
end
#status_report ⇒ Object
40 41 42 |
# File 'lib/rswim/member_pool.rb', line 40

# Hands the local node id and the member table to StatusReport.print.
#
# @return [Object] whatever StatusReport.print returns
def status_report
  node_id = @node_member_id
  StatusReport.print(node_id, @members)
end
#subscribe(&block) ⇒ Object
44 45 46 |
# File 'lib/rswim/member_pool.rb', line 44

# Registers a block in the pool's subscriber list.
#
# @yield the callback to store; how and when it is invoked is decided by the
#   code that reads @subscribers
# @return [Array] the subscriber list, including the new block
# @raise [ArgumentError] if no block is given
def subscribe(&block)
  # Without this guard a nil would be stored, and nil can never be called.
  raise ArgumentError, 'subscribe requires a block' unless block

  @subscribers << block
end
#update_member(message) ⇒ Object
18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 |
# File 'lib/rswim/member_pool.rb', line 18

# Applies an incoming protocol message to the pool.
#
# Any updates carried in payload[:updates] are passed to incorporate_gossip
# first. The message is then dispatched on its type:
#   :ping     - the local node schedules an ack to the sender
#   :ack      - the sender is told it replied with an ack
#   :ping_req - the member named in payload[:target_id] is told to ping on
#               behalf of the sender
#
# @param message [#type, #from, #payload] the received message
# @raise [RuntimeError] if the message type is not one of the three above
def update_member(message)
  updates = message.payload[:updates]
  incorporate_gossip(updates) unless updates.nil?

  sender = member(message.from) # NB: records member if not seen before
  case message.type
  when :ping
    @me.schedule_ack(message.from)
  when :ack
    sender.replied_with_ack
  when :ping_req
    target_id = message.payload[:target_id]
    member(target_id).ping_from!(message.from)
  else
    raise 'bad message type'
  end
end
#update_members(elapsed_seconds) ⇒ Object
36 37 38 |
# File 'lib/rswim/member_pool.rb', line 36

# Passes the elapsed time to every member in the pool.
#
# @param elapsed_seconds [Numeric] time passed since the previous update,
#   forwarded unchanged to each member's update
# @return [Array] the member objects that were updated
def update_members(elapsed_seconds)
  everyone = @members.values
  everyone.each do |peer|
    peer.update(elapsed_seconds)
  end
end