Class: NatsWork::QueueManager
- Inherits:
-
Object
- Object
- NatsWork::QueueManager
- Defined in:
- lib/natswork/queue_manager.rb
Instance Attribute Summary collapse
-
#concurrency_limits ⇒ Object
readonly
Returns the value of attribute concurrency_limits.
-
#priorities ⇒ Object
readonly
Returns the value of attribute priorities.
-
#subscriptions ⇒ Object
readonly
Returns the value of attribute subscriptions.
Instance Method Summary collapse
- #add_queue(queue, &block) ⇒ Object
- #drain(queue, timeout: 30) ⇒ Object
- #drain_all(timeout: 30) ⇒ Object
-
#initialize(connection) ⇒ QueueManager
constructor
A new instance of QueueManager.
- #pause(queue) ⇒ Object
- #pause_all ⇒ Object
- #paused?(queue) ⇒ Boolean
- #paused_queues ⇒ Object
- #remove_queue(queue) ⇒ Object
- #resume(queue) ⇒ Object
- #resume_all ⇒ Object
- #set_concurrency(queue, limit) ⇒ Object
- #set_priority(queue, priority) ⇒ Object
- #stats ⇒ Object
- #subscribe(queue, &block) ⇒ Object
- #unsubscribe(queue) ⇒ Object
- #unsubscribe_all ⇒ Object
Constructor Details
#initialize(connection) ⇒ QueueManager
Returns a new instance of QueueManager.
9 10 11 12 13 14 15 16 17 18 |
# File 'lib/natswork/queue_manager.rb', line 9 def initialize(connection) @connection = connection @subscriptions = Concurrent::Hash.new @callbacks = Concurrent::Hash.new @priorities = Concurrent::Hash.new @concurrency_limits = Concurrent::Hash.new @paused_queues = Concurrent::Array.new @active_counts = Concurrent::Hash.new { 0 } @mutex = Mutex.new end |
Instance Attribute Details
#concurrency_limits ⇒ Object (readonly)
Returns the value of attribute concurrency_limits.
7 8 9 |
# File 'lib/natswork/queue_manager.rb', line 7 def concurrency_limits @concurrency_limits end |
#priorities ⇒ Object (readonly)
Returns the value of attribute priorities.
7 8 9 |
# File 'lib/natswork/queue_manager.rb', line 7 def priorities @priorities end |
#subscriptions ⇒ Object (readonly)
Returns the value of attribute subscriptions.
7 8 9 |
# File 'lib/natswork/queue_manager.rb', line 7 def subscriptions @subscriptions end |
Instance Method Details
#add_queue(queue, &block) ⇒ Object
50 51 52 |
# File 'lib/natswork/queue_manager.rb', line 50 def add_queue(queue, &block) subscribe(queue, &block) end |
#drain(queue, timeout: 30) ⇒ Object
90 91 92 93 94 95 96 97 98 |
# File 'lib/natswork/queue_manager.rb', line 90 def drain(queue, timeout: 30) deadline = Time.now + timeout # Process any pending messages (queue) # Wait for timeout sleep 0.1 while Time.now < deadline end |
#drain_all(timeout: 30) ⇒ Object
100 101 102 103 104 105 |
# File 'lib/natswork/queue_manager.rb', line 100 def drain_all(timeout: 30) threads = @subscriptions.keys.map do |queue| Thread.new { drain(queue, timeout: timeout) } end threads.each(&:join) end |
#pause(queue) ⇒ Object
58 59 60 |
# File 'lib/natswork/queue_manager.rb', line 58 def pause(queue) @paused_queues << queue unless @paused_queues.include?(queue) end |
#pause_all ⇒ Object
62 63 64 |
# File 'lib/natswork/queue_manager.rb', line 62 def pause_all @subscriptions.each_key { |queue| pause(queue) } end |
#paused?(queue) ⇒ Boolean
74 75 76 |
# File 'lib/natswork/queue_manager.rb', line 74 def paused?(queue) @paused_queues.include?(queue) end |
#paused_queues ⇒ Object
78 79 80 |
# File 'lib/natswork/queue_manager.rb', line 78 def paused_queues @paused_queues.to_a end |
#remove_queue(queue) ⇒ Object
54 55 56 |
# File 'lib/natswork/queue_manager.rb', line 54 def remove_queue(queue) unsubscribe(queue) end |
#resume(queue) ⇒ Object
66 67 68 |
# File 'lib/natswork/queue_manager.rb', line 66 def resume(queue) @paused_queues.delete(queue) end |
#resume_all ⇒ Object
70 71 72 |
# File 'lib/natswork/queue_manager.rb', line 70 def resume_all @paused_queues.clear end |
#set_concurrency(queue, limit) ⇒ Object
86 87 88 |
# File 'lib/natswork/queue_manager.rb', line 86 def set_concurrency(queue, limit) @concurrency_limits[queue] = limit end |
#set_priority(queue, priority) ⇒ Object
82 83 84 |
# File 'lib/natswork/queue_manager.rb', line 82 def set_priority(queue, priority) @priorities[queue] = priority end |
#stats ⇒ Object
107 108 109 110 111 112 113 114 115 116 |
# File 'lib/natswork/queue_manager.rb', line 107 def stats { queues: @subscriptions.keys, priorities: @priorities.to_h, paused: paused_queues, total_subscriptions: @subscriptions.size, concurrency_limits: @concurrency_limits.to_h, active_counts: @active_counts.to_h } end |
#subscribe(queue, &block) ⇒ Object
20 21 22 23 24 25 26 27 28 29 30 |
# File 'lib/natswork/queue_manager.rb', line 20 def subscribe(queue, &block) subject = queue_subject(queue) sid = @connection.subscribe(subject) do |msg| (queue, msg, &block) unless paused?(queue) end @subscriptions[queue] = sid @callbacks[queue] = block sid end |
#unsubscribe(queue) ⇒ Object
32 33 34 35 36 37 38 |
# File 'lib/natswork/queue_manager.rb', line 32 def unsubscribe(queue) @mutex.synchronize do sid = @subscriptions.delete(queue) @connection.unsubscribe(sid) if sid @callbacks.delete(queue) end end |
#unsubscribe_all ⇒ Object
40 41 42 43 44 45 46 47 48 |
# File 'lib/natswork/queue_manager.rb', line 40 def unsubscribe_all @mutex.synchronize do @subscriptions.each_value do |sid| @connection.unsubscribe(sid) end @subscriptions.clear @callbacks.clear end end |