Class: Qwirk::Adapter::InMemory::Queue
- Inherits:
-
Object
- Object
- Qwirk::Adapter::InMemory::Queue
- Defined in:
- lib/qwirk/adapter/in_memory/queue.rb
Instance Attribute Summary collapse
-
#max_size ⇒ Object
TODO: Look into reimplementing using a Ruby Queue which is probably better performant Size of the queue before it write-blocks.
-
#name ⇒ Object
TODO: Look into reimplementing using a Ruby Queue which is probably better performant Size of the queue before it write-blocks.
Instance Method Summary collapse
-
#initialize(name) ⇒ Queue
constructor
A new instance of Queue.
- #interrupt_read ⇒ Object
-
#read(stoppable) ⇒ Object
Block read until a message or we get stopped.
- #size ⇒ Object
- #stop ⇒ Object
- #stopped? ⇒ Boolean
- #to_s ⇒ Object
- #write(obj) ⇒ Object
Constructor Details
#initialize(name) ⇒ Queue
Returns a new instance of Queue.
11 12 13 14 15 16 17 18 19 |
# File 'lib/qwirk/adapter/in_memory/queue.rb', line 11 def initialize(name) @name = name @max_size = 0 @array_mutex = Mutex.new @read_condition = ConditionVariable.new @write_condition = ConditionVariable.new @array = [] @stopping = false end |
Instance Attribute Details
#max_size ⇒ Object
TODO: Look into reimplementing using a Ruby Queue which is probably better performant Size of the queue before it write-blocks. If 0, messages will be dropped. If -1, then it’s unlimited. TODO: Should implement a queue_full_strategy which would be publish_block, drop_oldest, drop_newest
9 10 11 |
# File 'lib/qwirk/adapter/in_memory/queue.rb', line 9 def max_size @max_size end |
#name ⇒ Object
TODO: Look into reimplementing using a Ruby Queue which is probably better performant Size of the queue before it write-blocks. If 0, messages will be dropped. If -1, then it’s unlimited. TODO: Should implement a queue_full_strategy which would be publish_block, drop_oldest, drop_newest
9 10 11 |
# File 'lib/qwirk/adapter/in_memory/queue.rb', line 9 def name @name end |
Instance Method Details
#interrupt_read ⇒ Object
40 41 42 43 44 |
# File 'lib/qwirk/adapter/in_memory/queue.rb', line 40 def interrupt_read @array_mutex.synchronize do @read_condition.broadcast end end |
#read(stoppable) ⇒ Object
Block read until a message or we get stopped.
47 48 49 50 51 52 53 54 55 56 57 58 59 |
# File 'lib/qwirk/adapter/in_memory/queue.rb', line 47 def read(stoppable) @array_mutex.synchronize do until stoppable.stopped || (@stopping && @array.empty?) if @array.empty? @read_condition.wait(@array_mutex) else @write_condition.signal return @array.shift end end end return nil end |
#size ⇒ Object
21 22 23 |
# File 'lib/qwirk/adapter/in_memory/queue.rb', line 21 def size @array.size end |
#stop ⇒ Object
25 26 27 28 29 30 31 32 |
# File 'lib/qwirk/adapter/in_memory/queue.rb', line 25 def stop return if @stopping @stopping = true @array_mutex.synchronize do @write_condition.broadcast @read_condition.broadcast end end |
#stopped? ⇒ Boolean
34 35 36 37 38 |
# File 'lib/qwirk/adapter/in_memory/queue.rb', line 34 def stopped? @array_mutex.synchronize do return @stopping && @array.empty? end end |
#to_s ⇒ Object
81 82 83 |
# File 'lib/qwirk/adapter/in_memory/queue.rb', line 81 def to_s "queue:#{@name}" end |
#write(obj) ⇒ Object
61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 |
# File 'lib/qwirk/adapter/in_memory/queue.rb', line 61 def write(obj) @array_mutex.synchronize do # We just drop the message if no workers have been configured yet while !@stopping if @max_size == 0 Qwirk.logger.warn "No worker for queue #{@name}, dropping message #{obj.inspect}" return end if @max_size < 0 || @array.size < @max_size @array << obj @read_condition.signal return end # TODO: Let's allow various write_full_modes such as :block, :remove_oldest, ? (Currently only blocks) @write_condition.wait(@array_mutex) end Qwirk.logger.warn "Queue has been stopped #{@name}, dropping message #{obj.inspect}" end end |