Class: Hoodoo::Communicators::Pool
- Inherits: Object
- Defined in: lib/hoodoo/communicators/pool.rb
Overview
Maintains a pool of object instances which are expected to be communicating with “the outside world” in some way. A message sent to the pool is replicated to all the communicators in that pool. Some communicators are fast, which means they are called synchronously and expected to return very quickly. Some communicators are slow, which means they are called asynchronously through a work queue.
See #add for more information.
Defined Under Namespace
Classes: QueueEntry, QueueWithTimeout
Constant Summary
- MAX_SLOW_QUEUE_SIZE = 50
  Hoodoo::Communicators::Slow subclass communicators are called in their own Threads via a processing Queue. There is the potential for a flood of communications to cause the queue to back up considerably, so a maximum number of messages is defined. If the queue size is _equal to or greater_ than this amount when a message arrives, it will be dropped and a ‘dropped message’ count incremented.
- THREAD_EXIT_TIMEOUT = 5
  When asking slow communicator threads to exit, a timeout must be used in case the thread doesn’t seem to be responsive. This is the timeout value in seconds; it can take a floating point or integer value.
- THREAD_WAIT_TIMEOUT = 5
  Analogous to THREAD_EXIT_TIMEOUT but used when waiting for a processing Thread to drain its Queue, without asking it to exit.
Instance Attribute Summary
- #group ⇒ Object
  Retrieve the ThreadGroup instance managing the collection of slow communicator threads.
Instance Method Summary
- #add(communicator) ⇒ Object
  Add a communicator instance to the pool.
- #communicate(object) ⇒ Object
  Call the #communicate method on each communicator instance added via #add.
- #initialize ⇒ Pool (constructor)
  Create a new pool of communicators. Instances of subclasses of Hoodoo::Communicators::Fast or Hoodoo::Communicators::Slow are added with #add and called with #communicate.
- #remove(communicator) ⇒ Object
  Remove a communicator previously added by #add.
- #terminate(per_instance_timeout: THREAD_EXIT_TIMEOUT) ⇒ Object
  The communication pool is “emptied” by this call, going back to a clean state as if just initialised.
- #wait(per_instance_timeout: THREAD_WAIT_TIMEOUT, communicator: nil) ⇒ Object
  This method is only useful if there are any Hoodoo::Communicators::Slow subclass instances in the communication pool.
Constructor Details
#initialize ⇒ Pool
Create a new pool of communicators. Instances of subclasses of Hoodoo::Communicators::Fast or Hoodoo::Communicators::Slow are added with #add and called with #communicate.
    # File 'lib/hoodoo/communicators/pool.rb', line 59
    def initialize
      @pool = {}
      @group = ::ThreadGroup.new
    end
Instance Attribute Details
#group ⇒ Object
Retrieve the ThreadGroup instance managing the collection of slow communicator threads. This is mostly used for testing purposes and has little general purpose utility.
    # File 'lib/hoodoo/communicators/pool.rb', line 53
    def group
      @group
    end
Instance Method Details
#add(communicator) ⇒ Object
Add a communicator instance to the pool. Future calls to #communicate will call the same-named method in that instance.
Subclasses of Hoodoo::Communicators::Slow are called within a processing Thread. Subclasses of Hoodoo::Communicators::Fast are called inline. The instances are called in the order of addition, but since each slow communicator runs in its own Thread, the execution order is indeterminate for such instances.
If a slow communicator’s inbound message queue length matches or exceeds MAX_SLOW_QUEUE_SIZE, messages for that specific communicator will start being dropped until the communicator clears the backlog and at least one space opens on the queue. Slow communicators can detect when this has happened by implementing Hoodoo::Communicators::Slow#dropped in the subclass.
If you pass the same instance more than once, the subsequent calls are ignored. You can add many instances of the same class if that’s useful for any reason.
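Returns the passed-in communicator instance parameter, for convenience. Note that in the source listing for this method, adding an instance that is already in the pool returns nil instead.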
communicator - Instance to be added to the pool. Must be either a Hoodoo::Communicators::Fast or Hoodoo::Communicators::Slow subclass instance.
    # File 'lib/hoodoo/communicators/pool.rb', line 90
    def add( communicator )
      unless ( communicator.class < Hoodoo::Communicators::Fast ||
               communicator.class < Hoodoo::Communicators::Slow )
        raise "Hoodoo::Communicators::Pool\#add must be called with an instance of a subclass of Hoodoo::Communicators::Fast or Hoodoo::Communicators::Slow only"
      end

      return if @pool.has_key?( communicator )

      if communicator.is_a?( Hoodoo::Communicators::Fast )
        add_fast_communicator( communicator )
      else
        add_slow_communicator( communicator )
      end

      return communicator
    end
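The dispatch rules described for #add can be modelled in plain Ruby. The following is a minimal sketch under stated assumptions, not Hoodoo's implementation: SketchPool, Recorder, the kind parameter and the nil "exit" marker are all hypothetical names invented for illustration (the real pool infers fast or slow from the communicator's superclass).

```ruby
# Hypothetical communicator: records each message it receives in a shared,
# thread-safe log so that call order can be inspected afterwards.
class Recorder
  def initialize(name, log)
    @name = name
    @log  = log
  end

  def communicate(message)
    @log << "#{@name}:#{message}"
  end
end

# Simplified stand-in for the pool's dispatch rules (assumption, not Hoodoo code).
class SketchPool
  def initialize
    @entries = {} # communicator instance => dispatch details
  end

  # kind is :fast or :slow; a hypothetical parameter used only in this sketch.
  def add(communicator, kind)
    return communicator if @entries.key?(communicator) # same instance twice: ignored

    if kind == :fast
      @entries[communicator] = { fast: true }
    else
      queue  = Queue.new
      thread = Thread.new do
        # nil is this sketch's "please exit" marker.
        while (message = queue.pop)
          communicator.communicate(message)
        end
      end
      @entries[communicator] = { queue: queue, thread: thread }
    end

    communicator
  end

  def communicate(message)
    @entries.each do |communicator, entry|
      if entry[:fast]
        communicator.communicate(message) # inline, in order of addition
      else
        entry[:queue] << message          # handed over to the worker thread
      end
    end
  end

  def size
    @entries.size
  end

  # Ask every worker thread to exit and wait for it to finish its queue.
  def shutdown
    @entries.each_value do |entry|
      next if entry[:fast]
      entry[:queue] << nil
      entry[:thread].join
    end
  end
end

log  = Queue.new
pool = SketchPool.new
fast = pool.add(Recorder.new("fast", log), :fast)
pool.add(fast, :fast)                      # ignored: already in the pool
pool.add(Recorder.new("slow", log), :slow)
pool.communicate("hello")
pool.shutdown
```

Because a Hash keyed by the instance itself backs the sketch, adding the same object twice is a no-op while two distinct instances of the same class are both kept, which matches the behaviour described above.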
#communicate(object) ⇒ Object
Call the #communicate method on each communicator instance added via #add. Each instance receives messages in the same order as the corresponding calls made to the pool. Across instances, fast communicators are called in the order they were added to the pool, but since each slow communicator runs in its own Thread, execution order is indeterminate.
object - Parameter passed to the communicator subclass instance #communicate methods.
    # File 'lib/hoodoo/communicators/pool.rb', line 149
    def communicate( object )
      @pool.each do | communicator, item |

        if item.has_key?( :fast )
          begin
            communicator.communicate( object )
          rescue => exception
            handle_exception( exception, communicator, object )
          end

        else
          data       = item[ :slow       ]
          thread     = data[ :thread     ]
          work_queue = data[ :work_queue ]

          # This is inaccurate if one or more "dropped messages" reports are
          # on the queue, but since some communicators might report them in
          # the same way as other messages, it's not necessarily incorrect
          # either.
          #
          if work_queue.size < MAX_SLOW_QUEUE_SIZE
            dropped = thread[ :dropped_messages ]

            if dropped > 0
              thread[ :dropped_messages ] = 0

              # Opposite of comment above on MAX_SLOW_QUEUE_SIZE check...
              # Yes, this takes up a queue entry and the payload addition
              # afterwards might take it one above max size, but that's OK
              # since this is just a "dropped messages" report and though
              # some communicators might deal with them slowly, others may
              # just ignore them.
              #
              work_queue << QueueEntry.new( dropped: dropped )
            end

            work_queue << QueueEntry.new( payload: object )

          else
            thread[ :dropped_messages ] += 1

          end
        end

      end
    end
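The drop-and-report behaviour in the slow branch of the listing can be tried in isolation. This is a minimal sketch, assuming a tiny limit so the effect is easy to see: SKETCH_MAX_QUEUE_SIZE, BoundedInbox and the Hash entries are hypothetical stand-ins for MAX_SLOW_QUEUE_SIZE, the per-thread state and QueueEntry respectively.

```ruby
# Small stand-in for MAX_SLOW_QUEUE_SIZE (assumption: 3 instead of 50).
SKETCH_MAX_QUEUE_SIZE = 3

# Hypothetical model of one slow communicator's inbound queue and drop counter.
class BoundedInbox
  attr_reader :queue, :dropped

  def initialize
    @queue   = Queue.new
    @dropped = 0
  end

  # Returns true if the message was queued, false if it was dropped.
  def offer(message)
    if @queue.size < SKETCH_MAX_QUEUE_SIZE
      if @dropped > 0
        # Report earlier drops first. Together with the payload below, this
        # can take the queue one entry above the nominal limit.
        @queue << { dropped: @dropped }
        @dropped = 0
      end

      @queue << { payload: message }
      true
    else
      @dropped += 1
      false
    end
  end
end

inbox = BoundedInbox.new
5.times { |i| inbox.offer("message #{i}") } # no consumer, so only 3 are queued
inbox.queue.pop                             # a consumer frees one slot
inbox.offer("message 5")                    # queues a drop report, then the payload
```

After the last call the queue holds four entries: two surviving payloads, a report that two messages were dropped, and the new payload. That is the "one above max size" case the comment in the listing accepts.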
#remove(communicator) ⇒ Object
Remove a communicator previously added by #add. See #add for details.
It is harmless to try and remove communicator instances more than once or to try to remove something that was never added in the first place; the call simply has no side effects.
If removing a slow communicator, its thread will be terminated with the default timeout value of THREAD_EXIT_TIMEOUT seconds. For this reason, removing a slow communicator may block for up to roughly that long if the thread is unresponsive.
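Returns the passed-in communicator instance parameter, for convenience. Note that in the source listing for this method, removing an instance that is not in the pool returns nil instead.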
communicator - Instance to be removed from the pool. Must be either a Hoodoo::Communicators::Fast or Hoodoo::Communicators::Slow subclass instance.
    # File 'lib/hoodoo/communicators/pool.rb', line 123
    def remove( communicator )
      unless ( communicator.class < Hoodoo::Communicators::Fast ||
               communicator.class < Hoodoo::Communicators::Slow )
        raise "Hoodoo::Communicators::Pool\#remove must be called with an instance of a subclass of Hoodoo::Communicators::Fast or Hoodoo::Communicators::Slow only"
      end

      return unless @pool.has_key?( communicator )

      if communicator.is_a?( Hoodoo::Communicators::Fast )
        remove_fast_communicator( communicator )
      else
        remove_slow_communicator( communicator )
      end

      return communicator
    end
#terminate(per_instance_timeout: THREAD_EXIT_TIMEOUT) ⇒ Object
The communication pool is “emptied” by this call, going back to a clean state as if just initialised. New workers can be added via #add and then called via #communicate if you so wish.
Hoodoo::Communicators::Fast subclass instances are removed immediately without complications.
Hoodoo::Communicators::Slow subclass instances in the communication pool are called via a worker Thread; this method shuts down all such worker Threads, clearing their work queues and asking each one to exit (politely). There is no mechanism (other than overall Ruby process exit) available to shut down the Threads by force, so some Threads may not respond and time out.
When this method exits, all workers will have either exited or timed out and possibly still be running, but are considered too slow or dead. No further communications are made to them.
The following named parameters are supported:
per_instance_timeout - Timeout for each slow communicator Thread in seconds. Optional. Default is the value in THREAD_EXIT_TIMEOUT. For example, if there are three slow communicators in the pool and all three reach a 5 second timeout, this method does not return for 15 seconds (3 * 5 seconds full timeout).
    # File 'lib/hoodoo/communicators/pool.rb', line 277
    def terminate( per_instance_timeout: THREAD_EXIT_TIMEOUT )
      loop do
        klass, item = @pool.shift() # Hash#shift -> remove a key/value pair.
        break if klass.nil?

        next unless item.has_key?( :slow )
        data = item[ :slow ]

        request_termination_for(
          thread:     data[ :thread     ],
          work_queue: data[ :work_queue ],
          timeout:    per_instance_timeout
        )
      end
    end
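The timeout applies to each thread in turn, so the worst-case delay grows with the number of unresponsive threads. A minimal sketch of that arithmetic follows, using Ruby's standard Thread#join with a time limit. The helper names join_each and worst_case_wait are hypothetical, and the source does not show how request_termination_for waits internally, so this illustrates only the timing, not Hoodoo's mechanism.

```ruby
# Join each thread in turn, allowing each one up to per_instance_timeout
# seconds. Thread#join returns nil on timeout, so the result is the list of
# threads that did not finish in time.
def join_each(threads, per_instance_timeout)
  threads.reject { |thread| thread.join(per_instance_timeout) }
end

# Upper bound on the total wait when every thread times out.
def worst_case_wait(thread_count, per_instance_timeout)
  thread_count * per_instance_timeout
end

# Three deliberately unresponsive threads and a short timeout for the demo.
stuck   = Array.new(3) { Thread.new { sleep(10) } }
started = Process.clock_gettime(Process::CLOCK_MONOTONIC)

unresponsive = join_each(stuck, 0.05)

elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started
puts "#{unresponsive.size} thread(s) timed out after about #{elapsed.round(2)}s"
puts "Worst case with defaults: #{worst_case_wait(3, 5)}s"

stuck.each(&:kill) # tidy up the demo threads
```

With the default THREAD_EXIT_TIMEOUT of 5 and three threads that never respond, the bound works out to the 15 seconds mentioned in the parameter description.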
#wait(per_instance_timeout: THREAD_WAIT_TIMEOUT, communicator: nil) ⇒ Object
This method is only useful if there are any Hoodoo::Communicators::Slow subclass instances in the communication pool. Each instance is called via a worker Thread; this method waits for each communicator to drain its queue before returning. This is useful if you have a requirement to wait for all communications to finish on all threads, presumably for wider synchronisation reasons.
Since fast communicators are called synchronously there is never any need to wait for them, so this call ignores such pool entries.
The following named parameters are supported:
per_instance_timeout - Timeout for each slow communicator Thread in seconds. Optional. Default is the value in THREAD_WAIT_TIMEOUT.
communicator - If you want to wait for a specific instance only (see #add), pass it here. If the instance is a fast communicator, or any object not added to the pool, then no error is raised; the method simply returns immediately.
    # File 'lib/hoodoo/communicators/pool.rb', line 218
    def wait( per_instance_timeout: THREAD_WAIT_TIMEOUT,
              communicator:         nil )

      if communicator.nil?
        @pool.each do | communicator, item |
          next unless item.has_key?( :slow )
          data = item[ :slow ]

          wait_for(
            work_queue: data[ :work_queue ],
            sync_queue: data[ :sync_queue ],
            timeout:    per_instance_timeout
          )
        end

      else
        return unless @pool.has_key?( communicator )
        item = @pool[ communicator ]

        return unless item.has_key?( :slow )
        data = item[ :slow ]

        wait_for(
          work_queue: data[ :work_queue ],
          sync_queue: data[ :sync_queue ],
          timeout:    per_instance_timeout
        )

      end
    end
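The listing passes a work queue and a sync queue to wait_for, whose body is not shown. One plausible way to wait for a queue to drain with such a pair is to push a marker onto the work queue and block on the sync queue until the worker acknowledges it. The sketch below assumes that design. DrainableWorker, the :sync and :exit markers and the use of the standard Timeout module are illustrative choices, not confirmed details of Hoodoo.

```ruby
require 'timeout'

# Hypothetical worker that processes payloads in order and acknowledges
# :sync markers on a second queue.
class DrainableWorker
  attr_reader :processed

  def initialize
    @work_queue = Queue.new
    @sync_queue = Queue.new
    @processed  = []
    @thread     = Thread.new { run }
  end

  def communicate(message)
    @work_queue << [:payload, message]
  end

  # Block until everything queued before this call has been processed.
  # Returns true once drained, or false if the timeout expires first.
  def wait(timeout: 5)
    @work_queue << [:sync, nil]
    Timeout.timeout(timeout) { @sync_queue.pop }
    true
  rescue Timeout::Error
    false
  end

  # Ask the worker thread to exit after finishing queued work.
  def stop
    @work_queue << [:exit, nil]
    @thread.join
  end

  private

  def run
    loop do
      kind, value = @work_queue.pop
      case kind
      when :payload then @processed << value
      when :sync    then @sync_queue << :drained
      when :exit    then break
      end
    end
  end
end

worker = DrainableWorker.new
10.times { |i| worker.communicate(i) }
drained = worker.wait(timeout: 2) # true once all ten messages are handled
worker.stop
```

Because the marker travels through the same queue as the payloads, it is only acknowledged after everything ahead of it has been handled. One limitation of this simple version: if a wait times out, its marker stays queued and is acknowledged later, which could satisfy a subsequent wait early.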