Class: Aerospike::Recordset
- Inherits:
- Object
  - Object
    - Aerospike::Recordset
- Defined in:
- lib/aerospike/query/recordset.rb
Overview
Recordset implements a queue for a producer-consumer pattern. A producer is a thread that fetches records from one node and puts them on this queue; a consumer fetches records from this queue. Production and consumption are therefore decoupled, and there can be any number of producer threads and consumer threads.
Instance Attribute Summary
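The pattern described above can be sketched with Ruby's standard library alone. This is a stand-in illustration, not the Aerospike client: `run_pipeline` and its parameters are hypothetical names, the producers push integers instead of fetching records from nodes, and a Mutex-guarded counter replaces the Atomic counter used by Recordset.

```ruby
# Stand-in sketch using only the standard library (Thread, SizedQueue, Mutex).
# run_pipeline is a hypothetical helper, not part of Aerospike::Recordset.
def run_pipeline(producer_count, per_producer, queue_size)
  queue = SizedQueue.new(queue_size) # bounded: producers block while it is full
  remaining = producer_count         # producers that have not finished yet
  lock = Mutex.new

  producers = Array.new(producer_count) do |i|
    Thread.new do
      per_producer.times { |n| queue.push(i * per_producer + n) }
      # the last producer to finish pushes nil as the EOF marker
      last = lock.synchronize { (remaining -= 1).zero? }
      queue.push(nil) if last
    end
  end

  consumed = []
  while (item = queue.pop) # nil (EOF) ends the loop
    consumed << item
  end
  producers.each(&:join)
  consumed
end

result = run_pipeline(3, 4, 2)
puts result.sort.inspect # prints [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]
```

The queue holds only two items at a time here, so the producers are throttled by the consumer; the arrival order of items is not deterministic, which is why the result is sorted before printing.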
-
#records ⇒ Object
readonly
Returns the value of attribute records.
Instance Method Summary
-
#active? ⇒ Boolean
The recordset is active unless it has been cancelled by the user or an exception has occurred in one of the threads.
-
#cancel(expn = nil) ⇒ Object
Called by a thread that has hit an exception, to signal that the whole operation should terminate. It may also be called by the user to terminate the command while records are still being fetched from server nodes. It clears the queue so that any threads blocked on the queue are released and find out about the cancellation.
-
#each(&block) ⇒ Object
Fetches every record from the queue and passes it to the block until the whole operation is finished and the EOF mark is reached. Calling cancel inside the each block raises an exception to signal other consumer threads.
-
#initialize(queue_size = 5000, thread_count = 1, type) ⇒ Recordset
constructor
A new instance of Recordset.
-
#is_scan? ⇒ Boolean
The command is a scan if no filters are applied; otherwise it is a query.
-
#next_record ⇒ Object
Fetches and returns the first record from the queue. If the operation is not finished and the queue is empty, it blocks and waits for new records. When it reaches the EOF mark, it sets the exception and returns nil; EOF means the operation has finished and no more records are coming from server nodes. It re-raises any exception that occurred in the threads, or the one that was set after reaching EOF in a previous call.
-
#thread_finished(expn = nil) ⇒ Object
Called by worker threads to signal that their job is finished. It decrements the count of active threads and puts an EOF on the queue when all threads are finished. expn is an exception that occurred in the executor, outside of the threads themselves.
Constructor Details
#initialize(queue_size = 5000, thread_count = 1, type) ⇒ Recordset
Returns a new instance of Recordset.
# File 'lib/aerospike/query/recordset.rb', line 27

def initialize(queue_size = 5000, thread_count = 1, type)
  queue_size = thread_count if queue_size < thread_count
  @records = SizedQueue.new(queue_size)

  # holds the count of active threads.
  # when it reaches zero, the whole operation of fetching records from server nodes is finished
  @active_threads = Atomic.new(thread_count)

  # operation cancelled by user or an exception occurred in one of the threads
  @cancelled = Atomic.new(false)

  # saves the exception that occurred inside one of the threads to reraise it in the main thread
  # and also is a signal to terminate other threads as the whole operation is assumed as failed
  @thread_exception = Atomic.new(nil)

  # type of the operation. it is either :scan or :query
  @type = type
end
Instance Attribute Details
#records ⇒ Object (readonly)
Returns the value of attribute records.
# File 'lib/aerospike/query/recordset.rb', line 25

def records
  @records
end
Instance Method Details
#active? ⇒ Boolean
The recordset is active unless it has been cancelled by the user or an exception has occurred in one of the threads.
# File 'lib/aerospike/query/recordset.rb', line 62

def active?
  !@cancelled.get
end
#cancel(expn = nil) ⇒ Object
Called by a thread that has hit an exception, to signal that the whole operation should terminate. It may also be called by the user to terminate the command while records are still being fetched from server nodes. It clears the queue so that any threads blocked on the queue are released and find out about the cancellation.
# File 'lib/aerospike/query/recordset.rb', line 82

def cancel(expn = nil)
  set_exception(expn)
  @cancelled.set(true)
  @records.clear
end
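To illustrate why clearing the queue matters for cancellation, here is a minimal stand-in sketch using only the standard library. The helper `cancel_demo` and its local names are hypothetical; a Mutex-guarded flag stands in for the Atomic `@cancelled`. It relies on `SizedQueue#clear` waking threads that are blocked in `push`, which is the behaviour of current Ruby versions.

```ruby
# Stand-in sketch; cancel_demo is a hypothetical helper, not part of the client.
def cancel_demo
  queue = SizedQueue.new(1)
  cancelled = false
  lock = Mutex.new

  producer = Thread.new do
    loop do
      break if lock.synchronize { cancelled }
      queue.push(:record) # blocks once the single slot is taken
    end
  end

  # wait until the producer is parked (normally: blocked on the full queue)
  sleep 0.01 until producer.status == "sleep"

  # same order as Recordset#cancel: set the flag first, then clear the queue
  lock.synchronize { cancelled = true }
  queue.clear # frees the slot, so the blocked push completes

  # the producer loops once more, sees the flag, and exits
  !producer.join(2).nil?
end

puts cancel_demo
```

Setting the flag before clearing the queue means a producer released by the clear observes the cancellation on its next loop iteration instead of refilling the queue indefinitely.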
#each(&block) ⇒ Object
Fetches every record from the queue and passes it to the block until the whole operation is finished and the EOF mark is reached. Calling cancel inside the each block raises an exception to signal other consumer threads.
# File 'lib/aerospike/query/recordset.rb', line 90

def each(&block)
  r = true
  while r
    r = next_record
    # nil means EOF
    unless r.nil?
      block.call(r)
    else
      # reached the EOF
      break
    end
  end
end
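The drain-until-nil loop above can be written more compactly. The sketch below is an illustration only: `ArraySource` is a hypothetical stand-in that serves records from an array instead of a thread-fed queue, and its loop stops on any falsy value rather than strictly on nil, which is equivalent as long as records are never false.

```ruby
# Hypothetical stand-in class; it only mimics the next_record/each contract.
class ArraySource
  def initialize(items)
    @items = items.dup
  end

  # returns the next item, or nil once the source is exhausted (EOF)
  def next_record
    @items.shift
  end

  # yields each record until next_record returns nil
  def each
    return enum_for(:each) unless block_given?

    while (r = next_record)
      yield r
    end
  end
end

seen = []
ArraySource.new(%w[a b c]).each { |r| seen << r }
puts seen.inspect # prints ["a", "b", "c"]
```

Returning an enumerator when no block is given is an addition in this sketch; the method shown above always expects a block.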
#is_scan? ⇒ Boolean
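The command is a scan if no filters are applied; otherwise it is a query.
# File 'lib/aerospike/query/recordset.rb', line 105

def is_scan?
  # note: @filters is not assigned in the initializer shown above; an unset
  # instance variable reads as nil, so this returns true unless @filters is set elsewhere
  @filters.nil? || @filters.empty?
end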
#next_record ⇒ Object
Fetches and returns the first record from the queue. If the operation is not finished and the queue is empty, it blocks and waits for new records. When it reaches the EOF mark, it sets the exception and returns nil; EOF means the operation has finished and no more records are coming from server nodes. It re-raises any exception that occurred in the threads, or the one that was set after reaching EOF in a previous call.
# File 'lib/aerospike/query/recordset.rb', line 51

def next_record
  raise @thread_exception.get unless @thread_exception.get.nil?

  r = @records.deq

  set_exception if r.nil?

  r
end
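The call sequence described above (record, then nil at EOF, then an exception on any later call) can be seen in a small single-threaded sketch. `MiniRecordset` and `TerminatedError` are hypothetical names; the exception that the real `set_exception` stores is not shown on this page, so the one used here is an assumption.

```ruby
# Hypothetical stand-ins; neither class is part of the Aerospike client.
class TerminatedError < StandardError; end

class MiniRecordset
  def initialize
    @records = Queue.new
    @exception = nil
  end

  def push(item)
    @records.push(item)
  end

  def next_record
    # re-raise whatever was stored by an earlier call
    raise @exception unless @exception.nil?

    r = @records.pop # blocks while the queue is empty
    # assumption: reaching EOF stores a "terminated" error for later calls
    @exception = TerminatedError.new("no more records") if r.nil?
    r
  end
end

def next_record_demo
  rs = MiniRecordset.new
  rs.push(:a)
  rs.push(nil) # EOF marker
  first = rs.next_record
  second = rs.next_record
  third = begin
    rs.next_record
  rescue TerminatedError => e
    e.class
  end
  [first, second, third]
end

puts next_record_demo.inspect # prints [:a, nil, TerminatedError]
```

Raising after EOF, rather than blocking, keeps a consumer that calls once too often from waiting forever on a queue that will never be refilled.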
#thread_finished(expn = nil) ⇒ Object
Called by worker threads to signal that their job is finished. It decrements the count of active threads and puts an EOF on the queue when all threads are finished. expn is an exception that occurred in the executor, outside of the threads themselves.
# File 'lib/aerospike/query/recordset.rb', line 69

def thread_finished(expn = nil)
  @active_threads.update do |v|
    v -= 1
    @records.enq(nil) if v == 0
    v
  end

  raise expn unless expn.nil?
end
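The countdown described above can be sketched with a Mutex instead of an Atomic. This is a swapped-in technique for illustration, not the library's implementation; `FinishCounter` and `eof_count` are hypothetical names.

```ruby
# Hypothetical stand-in: a Mutex-guarded countdown that enqueues one EOF marker.
class FinishCounter
  def initialize(thread_count, queue)
    @remaining = thread_count
    @queue = queue
    @lock = Mutex.new
  end

  # called once by each worker when it is done
  def thread_finished
    @lock.synchronize do
      @remaining -= 1
      @queue.push(nil) if @remaining.zero? # only the last worker pushes EOF
    end
  end
end

# returns how many EOF markers ended up on the queue
def eof_count(thread_count)
  queue = Queue.new
  counter = FinishCounter.new(thread_count, queue)
  workers = Array.new(thread_count) { Thread.new { counter.thread_finished } }
  workers.each(&:join)
  queue.size
end

puts eof_count(8) # prints 1
```

Holding the lock across both the decrement and the push ensures the EOF marker is enqueued exactly once, however the workers interleave. The sketch uses an unbounded Queue so the push cannot block while the lock is held.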