Class: NatsWork::QueueManager

Inherits:
Object
  • Object
show all
Defined in:
lib/natswork/queue_manager.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(connection) ⇒ QueueManager

Returns a new instance of QueueManager.



9
10
11
12
13
14
15
16
17
18
# File 'lib/natswork/queue_manager.rb', line 9

# Builds a queue manager bound to a single connection.
#
# Bookkeeping is kept in concurrent-ruby collections; a separate mutex is
# created for operations that update several of them together.
#
# @param connection [Object] client later used to subscribe and unsubscribe
def initialize(connection)
  @mutex = Mutex.new
  @connection = connection

  # Per-queue registries, keyed by queue.
  @subscriptions = Concurrent::Hash.new
  @callbacks = Concurrent::Hash.new
  @priorities = Concurrent::Hash.new
  @concurrency_limits = Concurrent::Hash.new

  # Unknown queues read as 0; the default block does not store the value.
  @active_counts = Concurrent::Hash.new { 0 }

  # Queues whose incoming messages are currently skipped.
  @paused_queues = Concurrent::Array.new
end

Instance Attribute Details

#concurrency_limits ⇒ Object (readonly)

Returns the value of attribute concurrency_limits.



7
8
9
# File 'lib/natswork/queue_manager.rb', line 7

# Read-only accessor for the per-queue concurrency limit table.
#
# @return [Object] the internal collection itself (not a copy), as populated
#   by #set_concurrency
def concurrency_limits
  @concurrency_limits
end

#priorities ⇒ Object (readonly)

Returns the value of attribute priorities.



7
8
9
# File 'lib/natswork/queue_manager.rb', line 7

# Read-only accessor for the per-queue priority table.
#
# @return [Object] the internal collection itself (not a copy), as populated
#   by #set_priority
def priorities
  @priorities
end

#subscriptions ⇒ Object (readonly)

Returns the value of attribute subscriptions.



7
8
9
# File 'lib/natswork/queue_manager.rb', line 7

# Read-only accessor for the table of active subscriptions.
#
# @return [Object] the internal collection itself (not a copy), mapping each
#   queue to the subscription id returned by the connection
def subscriptions
  @subscriptions
end

Instance Method Details

#add_queue(queue, &block) ⇒ Object



50
51
52
# File 'lib/natswork/queue_manager.rb', line 50

# Registers a queue; a thin alias that forwards everything to #subscribe.
#
# @param queue [Object] queue to start consuming
# @yield handler passed through unchanged to #subscribe
# @return [Object] whatever #subscribe returns
def add_queue(queue, &handler)
  subscribe(queue, &handler)
end

#drain(queue, timeout: 30) ⇒ Object



90
91
92
93
94
95
96
97
98
# File 'lib/natswork/queue_manager.rb', line 90

# Processes the queue's pending messages, then blocks until +timeout+
# seconds have elapsed since the call started.
#
# The wait is measured on the monotonic clock so that wall-clock adjustments
# (NTP steps, manual changes) can neither cut the wait short nor extend it.
# NOTE(review): the wait always runs for the full timeout; it does not look at
# in-flight work to finish early — confirm that is the intended contract.
#
# @param queue [Object] queue whose pending messages are processed first
# @param timeout [Numeric] seconds to wait, counted from the start of the call
# @return [nil]
def drain(queue, timeout: 30)
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout

  # Process any pending messages
  process_pending_messages(queue)

  # Sleep in short slices, never past the deadline; looping also covers a
  # sleep that is woken early.
  loop do
    remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
    break unless remaining.positive?

    sleep [0.1, remaining].min
  end

  nil
end

#drain_all(timeout: 30) ⇒ Object



100
101
102
103
104
105
# File 'lib/natswork/queue_manager.rb', line 100

# Drains every currently subscribed queue in parallel, one thread per queue,
# and waits for all of them to finish.
#
# @param timeout [Numeric] timeout handed to each individual #drain call
# @return [Array<Thread>] the joined worker threads
def drain_all(timeout: 30)
  queue_names = @subscriptions.keys
  workers = queue_names.map do |name|
    Thread.new { drain(name, timeout: timeout) }
  end
  workers.each(&:join)
end

#pause(queue) ⇒ Object



58
59
60
# File 'lib/natswork/queue_manager.rb', line 58

# Marks a queue as paused. Pausing an already paused queue is a no-op.
#
# @param queue [Object] queue to pause
# @return [Object, nil] the paused-queue collection when the queue was added,
#   nil when it was already paused
def pause(queue)
  return if @paused_queues.include?(queue)

  @paused_queues << queue
end

#pause_all ⇒ Object



62
63
64
# File 'lib/natswork/queue_manager.rb', line 62

# Pauses every queue that currently has a subscription.
#
# @return [Object] the subscriptions collection that was iterated
def pause_all
  @subscriptions.each_key(&method(:pause))
end

#paused?(queue) ⇒ Boolean

Returns:

  • (Boolean)


74
75
76
# File 'lib/natswork/queue_manager.rb', line 74

# Tells whether the given queue is currently in the paused list.
#
# @param queue [Object] queue to look up
# @return [Boolean] true when the queue is present in the paused collection
def paused?(queue)
  @paused_queues.include?(queue)
end

#paused_queues ⇒ Object



78
79
80
# File 'lib/natswork/queue_manager.rb', line 78

# Lists the queues that are currently paused.
#
# @return [Array] the result of calling +to_a+ on the internal paused
#   collection
def paused_queues
  @paused_queues.to_a
end

#remove_queue(queue) ⇒ Object



54
55
56
# File 'lib/natswork/queue_manager.rb', line 54

# Removes a queue; a thin alias that forwards to #unsubscribe.
#
# @param queue [Object] queue to stop consuming
# @return [Object] whatever #unsubscribe returns
def remove_queue(queue)
  unsubscribe(queue)
end

#resume(queue) ⇒ Object



66
67
68
# File 'lib/natswork/queue_manager.rb', line 66

# Resumes a queue by removing it from the paused list.
#
# @param queue [Object] queue to resume
# @return [Object] the value returned by +delete+ on the paused collection
def resume(queue)
  @paused_queues.delete(queue)
end

#resume_all ⇒ Object



70
71
72
# File 'lib/natswork/queue_manager.rb', line 70

# Resumes every queue by emptying the paused list.
#
# @return [Object] the value returned by +clear+ on the paused collection
def resume_all
  @paused_queues.clear
end

#set_concurrency(queue, limit) ⇒ Object



86
87
88
# File 'lib/natswork/queue_manager.rb', line 86

# Records the concurrency limit for a queue, replacing any earlier value.
#
# @param queue [Object] queue the limit applies to
# @param limit [Object] limit to record (stored as given, not validated)
# @return [Object] the limit that was stored
def set_concurrency(queue, limit)
  @concurrency_limits.store(queue, limit)
end

#set_priority(queue, priority) ⇒ Object



82
83
84
# File 'lib/natswork/queue_manager.rb', line 82

# Records the priority for a queue, replacing any earlier value.
#
# @param queue [Object] queue the priority applies to
# @param priority [Object] priority to record (stored as given, not validated)
# @return [Object] the priority that was stored
def set_priority(queue, priority)
  @priorities.store(queue, priority)
end

#stats ⇒ Object



107
108
109
110
111
112
113
114
115
116
# File 'lib/natswork/queue_manager.rb', line 107

# Collects a point-in-time summary of the manager's bookkeeping.
#
# @return [Hash{Symbol => Object}] with keys :queues, :priorities, :paused,
#   :total_subscriptions, :concurrency_limits and :active_counts
def stats
  report = {}
  report[:queues] = @subscriptions.keys
  report[:priorities] = @priorities.to_h
  report[:paused] = paused_queues
  report[:total_subscriptions] = @subscriptions.size
  report[:concurrency_limits] = @concurrency_limits.to_h
  report[:active_counts] = @active_counts.to_h
  report
end

#subscribe(queue, &block) ⇒ Object



20
21
22
23
24
25
26
27
28
29
30
# File 'lib/natswork/queue_manager.rb', line 20

# Subscribes to a queue's subject and registers the handler for it.
#
# If the queue is already subscribed, the previous subscription is torn down
# first. Otherwise its id would be overwritten in the registry, leaving a live
# subscription that can no longer be unsubscribed and that keeps delivering
# messages to the old handler.
#
# Messages that arrive while the queue is paused are skipped by the delivery
# callback.
#
# @param queue [Object] queue to consume; mapped to a subject by
#   +queue_subject+ (defined elsewhere in the class)
# @yield handler forwarded to +process_message+ for every delivered message
# @return [Object] the subscription id returned by the connection
def subscribe(queue, &block)
  subject = queue_subject(queue)

  # Replace, rather than orphan, any subscription already tracked for this queue.
  previous = @subscriptions.delete(queue)
  @callbacks.delete(queue)
  @connection.unsubscribe(previous) if previous

  sid = @connection.subscribe(subject) do |msg|
    process_message(queue, msg, &block) unless paused?(queue)
  end

  @subscriptions[queue] = sid
  @callbacks[queue] = block
  sid
end

#unsubscribe(queue) ⇒ Object



32
33
34
35
36
37
38
# File 'lib/natswork/queue_manager.rb', line 32

# Stops consuming a queue and forgets its handler, under the manager's mutex.
#
# Unknown queues are tolerated: nothing is sent to the connection for them.
#
# @param queue [Object] queue to stop consuming
# @return [Object, nil] the value removed from the handler registry, if any
def unsubscribe(queue)
  @mutex.synchronize do
    if (subscription_id = @subscriptions.delete(queue))
      @connection.unsubscribe(subscription_id)
    end

    @callbacks.delete(queue)
  end
end

#unsubscribe_all ⇒ Object



40
41
42
43
44
45
46
47
48
# File 'lib/natswork/queue_manager.rb', line 40

# Unsubscribes every tracked subscription from the connection, then empties
# both the subscription and handler registries, all under the manager's mutex.
#
# @return [Object] the (now empty) handler registry
def unsubscribe_all
  @mutex.synchronize do
    @subscriptions.each_value { |subscription_id| @connection.unsubscribe(subscription_id) }

    @subscriptions.clear
    @callbacks.clear
  end
end