Class: SuperQueue
- Inherits: Object
- Inheritance chain: Object → SuperQueue
- Defined in:
- lib/super_queue.rb
Class Method Summary collapse
- .mock! ⇒ Object
- .mocking? ⇒ Boolean
Instance Method Summary collapse
- #clear ⇒ Object
- #destroy ⇒ Object
- #empty? ⇒ Boolean
- #initialize(opts) ⇒ SuperQueue (constructor): A new instance of SuperQueue.
- #length ⇒ Object (also: #size)
- #localized? ⇒ Boolean
- #name ⇒ Object
- #num_waiting ⇒ Object
- #pop(non_block = false) ⇒ Object (also: #deq, #shift)
- #push(p) ⇒ Object (also: #enq, #<<)
- #shutdown ⇒ Object
- #sqs_requests ⇒ Object
- #url ⇒ Object
Constructor Details
#initialize(opts) ⇒ SuperQueue
Returns a new instance of SuperQueue.
18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 |
# File 'lib/super_queue.rb', line 18

# Creates a queue: validates the options, derives the queue name, sets up
# SQS, prepares the in-memory buffers and starts two background threads.
#
# @param opts [Hash] configuration. Read directly here: :buffer_size
#   (default 100) and :localize_queue (default true; the historical
#   :localized_queue spelling is accepted as a fallback). The same hash is
#   also passed to check_opts, generate_queue_name and initialize_sqs.
# @return [SuperQueue] a new instance of SuperQueue
def initialize(opts)
  check_opts(opts)

  # The default used to be guarded by a check on :localized_queue while the
  # value was read from :localize_queue, so an explicit
  # :localize_queue => false was always overwritten with true. Check the key
  # that is actually read, and fall back to the legacy spelling so callers
  # that relied on it keep working.
  unless opts.has_key?(:localize_queue)
    opts[:localize_queue] = opts.fetch(:localized_queue, true)
  end

  @buffer_size = opts[:buffer_size] || 100
  @localize_queue = opts[:localize_queue]
  @queue_name = generate_queue_name(opts)
  @request_count = 0
  initialize_sqs(opts)

  @waiting = []
  # Object#taint no longer exists on Ruby 3.2+; only taint where it does.
  @waiting.taint if @waiting.respond_to?(:taint)
  self.taint if respond_to?(:taint)

  @mutex = Mutex.new
  @in_buffer = []
  @out_buffer = []
  @deletion_queue = []
  @mock_length = 0 if SuperQueue.mocking?

  @compressor = Zlib::Deflate.new
  @decompressor = Zlib::Inflate.new

  # Background workers; poll_sqs and collect_garbage are defined elsewhere.
  @sqs_tracker = Thread.new { poll_sqs }
  @gc = Thread.new { collect_garbage }
end
Class Method Details
.mock! ⇒ Object
9 10 11 12 |
# File 'lib/super_queue.rb', line 9

# Turns on mock mode: sets the class-level @@mock flag and then calls
# Fog.mock!. The flag is set first, so it stays set even if Fog.mock! raises.
#
# @return [Object] whatever Fog.mock! returns
def self.mock!
  @@mock = true
  Fog.mock!
end
.mocking? ⇒ Boolean
14 15 16 |
# File 'lib/super_queue.rb', line 14

# Reports whether mock mode has been switched on.
#
# @return [Boolean, nil] the value of @@mock, or nil when @@mock has never
#   been assigned
def self.mocking?
  return nil unless defined?(@@mock)

  @@mock
end
Instance Method Details
#clear ⇒ Object
88 89 90 91 92 93 94 |
# File 'lib/super_queue.rb', line 88

# Drains the queue by popping in non-blocking mode until #empty? reports
# true. A ThreadError from #pop is swallowed; the pop is retried
# immediately if the queue still reports items.
#
# @return [nil]
def clear
  drained = false
  until drained
    begin
      pop(true)
    rescue ThreadError
      # Retry only while there is still something left to take.
      retry unless empty?
    end
    drained = empty?
  end
end
#destroy ⇒ Object
103 104 105 106 107 |
# File 'lib/super_queue.rb', line 103

# Terminates the two background threads (@sqs_tracker first, then @gc) and
# then calls delete_queue. Unlike #shutdown, nothing is flushed beforehand.
#
# @return [Object] whatever delete_queue returns
def destroy
  [@sqs_tracker, @gc].each(&:terminate)
  delete_queue
end
#empty? ⇒ Boolean
80 81 82 |
# File 'lib/super_queue.rb', line 80

# Tells whether the queue currently holds no items, as reported by #length.
#
# @return [Boolean] true when #length is zero
def empty?
  length.zero?
end
#length ⇒ Object Also known as: size
74 75 76 77 78 |
# File 'lib/super_queue.rb', line 74

# Counts the items in the queue: the value of sqs_length plus the items
# held in the in and out buffers. Computed while holding @mutex.
#
# @return [Object] the sum of sqs_length and both buffer sizes
def length
  @mutex.synchronize do
    remote = sqs_length
    remote + @in_buffer.size + @out_buffer.size
  end
end
#localized? ⇒ Boolean
129 130 131 |
# File 'lib/super_queue.rb', line 129

# Reports the localize-queue setting as a strict boolean.
#
# @return [Boolean] true when @localize_queue is truthy, false otherwise
def localized?
  @localize_queue ? true : false
end
#name ⇒ Object
125 126 127 |
# File 'lib/super_queue.rb', line 125

# Public accessor for the queue's name; delegates to queue_name, which is
# defined elsewhere in the class.
#
# @return [Object] whatever queue_name returns
def name
  queue_name
end
#num_waiting ⇒ Object
84 85 86 |
# File 'lib/super_queue.rb', line 84

# Number of entries in @waiting, the list #pop appends the current thread
# to before it sleeps. Read without taking @mutex.
#
# @return [Integer] the current length of @waiting
def num_waiting
  @waiting.length
end
#pop(non_block = false) ⇒ Object Also known as: deq, shift
56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 |
# File 'lib/super_queue.rb', line 56

# Removes and returns the next item.
#
# While the out buffer is empty it is refilled, first via
# fill_out_buffer_from_sqs_queue and then via fill_out_buffer_from_in_buffer.
# If neither yields anything, the call either raises (non-blocking mode) or
# registers the current thread in @waiting and sleeps on @mutex until it is
# woken, then tries again.
#
# @param non_block [Boolean] when true, raise instead of waiting
# @return [Object] whatever pop_out_buffer returns
# @raise [ThreadError] when non_block is true and no item could be fetched
def pop(non_block = false)
  @mutex.synchronize do
    while @out_buffer.empty?
      # A successful refill ends the wait, even before re-checking the buffer.
      break if fill_out_buffer_from_sqs_queue || fill_out_buffer_from_in_buffer

      raise ThreadError, "queue empty" if non_block

      @waiting.push(Thread.current)
      @mutex.sleep
    end

    pop_out_buffer
  end
end
#push(p) ⇒ Object Also known as: enq, <<
43 44 45 46 47 48 49 50 51 52 53 54 |
# File 'lib/super_queue.rb', line 43

# Appends an item to the in buffer. Once the buffer holds at least
# @buffer_size items, clear_in_buffer is called. Finally the first thread
# registered in @waiting, if any, is woken up.
#
# @param p [Object] the item to enqueue
# @return [Thread, nil] the woken thread, or nil when nobody was waiting
def push(p)
  @mutex.synchronize do
    @in_buffer << p
    clear_in_buffer if @in_buffer.size >= @buffer_size

    begin
      waiter = @waiting.shift
      waiter.wakeup if waiter
    rescue ThreadError
      # Waking this waiter failed; move on to the next registered one.
      retry
    end
  end
end
#shutdown ⇒ Object
96 97 98 99 100 101 |
# File 'lib/super_queue.rb', line 96

# Stops the queue without deleting it: each background thread is
# terminated and then the buffer flushed by the following step is cleared
# while holding @mutex.
#
# @return [Object] whatever clear_deletion_queue returns
def shutdown
  @sqs_tracker.terminate
  @mutex.synchronize do
    clear_in_buffer
  end

  @gc.terminate
  @mutex.synchronize do
    clear_deletion_queue
  end
end
#sqs_requests ⇒ Object
109 110 111 |
# File 'lib/super_queue.rb', line 109

# Reader for the request counter, which the constructor initializes to 0.
#
# @return [Integer] the current value of @request_count
def sqs_requests
  @request_count
end
#url ⇒ Object
121 122 123 |
# File 'lib/super_queue.rb', line 121

# Public accessor for the queue's URL; delegates to q_url, which is defined
# elsewhere in the class.
#
# @return [Object] whatever q_url returns
def url
  q_url
end