Class: RSwim::IOLoop

Inherits:
Object
  • Object
show all
Defined in:
lib/rswim/io_loop.rb

Direct Known Subclasses

RSwim::Integration::UDP::IOLoop

Instance Method Summary collapse

Constructor Details

#initialize(agent, serializer, deserializer, directory, sleep_time_seconds) ⇒ IOLoop

Returns a new instance of IOLoop.



5
6
7
8
9
10
11
12
# File 'lib/rswim/io_loop.rb', line 5

# Stores the collaborators of the I/O loop and starts with an empty read buffer.
#
# @param agent [#advance] protocol agent; #run hands it the incoming messages
#   and sends whatever messages it returns
# @param serializer [#serialize] converts an outgoing message to its wire form
# @param deserializer [Object] stored as given; not used by the code visible
#   here - presumably decodes incoming wire data (TODO confirm)
# @param directory [#host] resolves a message's +to+ value to a host
# @param sleep_time_seconds [Numeric] stored as given; presumably a pause
#   between polls (TODO confirm against the code that reads it)
def initialize(agent, serializer, deserializer, directory, sleep_time_seconds)
  # Grouped assignments; instance variables are still created in the same order.
  @agent, @serializer, @deserializer = agent, serializer, deserializer
  @directory, @sleep_time_seconds = directory, sleep_time_seconds
  @read_buffer = []
end

Instance Method Details

#run ⇒ Object



14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# File 'lib/rswim/io_loop.rb', line 14

# Runs the agent's I/O loop inside an Async block.
#
# Calls +before_run+ and +start_producer+ once, then repeats forever:
# drain the read buffer, advance the agent with the drained messages,
# serialize every message the agent returns, pair it with the host the
# directory resolves for its +to+ value, and pass the batch to +send+.
#
# A StandardError raised during an iteration is logged at error level and the
# loop continues with the next iteration, so a single failure does not stop
# the node. Anything else (e.g. the surrounding task being stopped) ends the
# loop; in that case 'node no longer receiving' is logged on the way out.
#
# @return [Object] whatever +Async+ returns for the given block
def run
  Async do
    before_run
    start_producer
    begin
      loop do
        in_messages = consume_read_buffer
        logger.debug "advancing agent with #{in_messages.size} messages"
        out_messages = @agent.advance(in_messages)
        wire_messages = out_messages.map do |message|
          wire_message = @serializer.serialize(message)
          host = @directory.host(message.to)
          [host, wire_message]
        end
        logger.debug "sending #{wire_messages.size} messages from agent to other hosts"
        # NOTE(review): expected to be the transport hook provided by this class
        # or a subclass (it receives [host, wire_message] pairs), not Object#send.
        send(wire_messages)
      rescue StandardError => e
        # Logged at error level: at debug level a failing loop is invisible
        # under normal log settings.
        logger.error("Error in I/O loop: #{e}")
      end
    ensure
      # The loop has no normal exit (even StopIteration is rescued above), so
      # this message can only ever be emitted from an ensure clause.
      logger.info 'node no longer receiving'
    end
  end
end