Class: CZTop::Poller::Aggregated
- Inherits:
-
Object
- Object
- CZTop::Poller::Aggregated
- Extended by:
- Forwardable
- Defined in:
- lib/cztop/poller/aggregated.rb
Overview
This is a poller which is able to provide a list of readable and a list of writable sockets. This is useful for when you need to process socket events in batch, rather than one per event loop iteration.
In particular, this is needed in Celluloid::ZMQ, where in a call to Celluloid::ZMQ::Reactor#run_once all readable/writable sockets need to be processed.
Implementation
It wraps a CZTop::Poller and just does the following to support getting an array of readable/writable sockets:
-
in #wait, poll with given timeout
-
in case there was an event:
-
deregister the corresponding event(s) on the registered socket
-
poll again with zero timeout until no more sockets
-
repeat and accumulate results into two lists
-
Forwarded Methods
The following methods are defined on this class too, and calls are forwarded directly to the actual CZTop::Poller instance:
Instance Attribute Summary collapse
-
#poller ⇒ CZTop::Poller.new
readonly
The associated (regular) poller.
-
#readables ⇒ Array<CZTop::Socket>
readonly
Readable sockets.
-
#writables ⇒ Array<CZTop::Socket>
readonly
Writable sockets.
Instance Method Summary collapse
- #extract(event) ⇒ void private
-
#initialize(poller = CZTop::Poller.new) ⇒ Aggregated
constructor
Initializes the aggregated poller.
-
#restore_event_masks ⇒ void
private
Restores the event mask for all registered sockets to the state they were before the call to #wait.
-
#wait(timeout = -1)) ⇒ Boolean
Forgets all previous event information (which sockets are readable/writable) and waits for events anew.
Constructor Details
#initialize(poller = CZTop::Poller.new) ⇒ Aggregated
Initializes the aggregated poller.
66 67 68 69 70 |
# File 'lib/cztop/poller/aggregated.rb', line 66 def initialize(poller = CZTop::Poller.new) @readables = [] @writables = [] @poller = poller end |
Instance Attribute Details
#poller ⇒ CZTop::Poller.new (readonly)
Returns the associated (regular) poller.
40 41 42 |
# File 'lib/cztop/poller/aggregated.rb', line 40 def poller @poller end |
#readables ⇒ Array<CZTop::Socket> (readonly)
Returns readable sockets.
44 45 46 |
# File 'lib/cztop/poller/aggregated.rb', line 44 def readables @readables end |
#writables ⇒ Array<CZTop::Socket> (readonly)
Returns writable sockets.
48 49 50 |
# File 'lib/cztop/poller/aggregated.rb', line 48 def writables @writables end |
Instance Method Details
#extract(event) ⇒ void (private)
This method returns an undefined value.
Extracts the event information, adds the socket to the correct list(s) and modifies the socket’s event mask for the socket to not turn up again during the next call(s) to CZTop::Poller#wait within #wait.
116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 |
# File 'lib/cztop/poller/aggregated.rb', line 116 def extract(event) event_mask = poller.event_mask_for_socket(event.socket) @event_masks[event.socket] = event_mask if event.readable? @readables << event.socket event_mask &= 0xFFFF ^ CZTop::Poller::ZMQ::POLLIN end if event.writable? @writables << event.socket event_mask &= 0xFFFF ^ CZTop::Poller::ZMQ::POLLOUT end poller.modify(event.socket, event_mask) end |
#restore_event_masks ⇒ void (private)
This method returns an undefined value.
Restores the event mask for all registered sockets to the state they were before the call to #wait.
137 138 139 |
# File 'lib/cztop/poller/aggregated.rb', line 137 def restore_event_masks @event_masks.each { |socket, mask| poller.modify(socket, mask) } end |
#wait(timeout = -1)) ⇒ Boolean
Forgets all previous event information (which sockets are readable/writable) and waits for events anew. After getting the first event, CZTop::Poller#wait is called again with a zero-timeout to get all pending events to extract them into the aggregated lists of readable and writable sockets.
For every event, the corresponding event mask flag is disabled for the associated socket, so it won’t turn up again. Finally, all event masks are restored to what they were before the call to this method.
86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 |
# File 'lib/cztop/poller/aggregated.rb', line 86 def wait(timeout = -1) @readables = [] @writables = [] @event_masks = {} if (event = @poller.wait(timeout)) extract(event) # get all other pending events, if any, but no more blocking while (event = @poller.wait 0) extract(event) end restore_event_masks return true end false end |