Class: ZK::MessageQueue
- Inherits: Object
  - Object
  - ZK::MessageQueue
- Defined in: lib/z_k/message_queue.rb
Overview
Implements a simple message queue based on ZooKeeper recipes. It is suitable for low-volume queues only: because of the way ZooKeeper works, all message titles have to be read into memory in order to determine which message to process next.
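The memory cost mentioned above can be illustrated without a ZooKeeper server. The sketch below is only a model: `next_message_title` and the sample titles are hypothetical, and it assumes that untitled messages carry a numeric sequence suffix and are processed oldest first.

```ruby
# Hypothetical helper, not part of ZK::MessageQueue.
# Picks the message with the lowest trailing sequence number.
def next_message_title(children)
  # The whole list of titles must already be in memory before one
  # can be chosen, which is why large queues become expensive.
  children.min_by { |name| name[/\d+\z/].to_i }
end

titles = %w[message0000000012 message0000000003 message0000000007]
puts next_message_title(titles)
```

With three titles this is trivial; with millions of pending messages the same list would have to be fetched and scanned on every pass.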
Instance Method Summary
- #delete_message(message_title) ⇒ Object
  Removes a message from the queue by its title. You rarely need to call this method directly.
- #destroy! ⇒ Object
  WARNING: highly destructive. Deletes the queue and all messages in it.
- #messages ⇒ Object
  Returns a list of the message titles in the queue.
- #poll! ⇒ Object
  Grabs one message from the queue. Use this when you don’t want to, or can’t, subscribe.
- #publish(data, message_title = nil) ⇒ Object
  Publishes a message to the queue. You can optionally supply a message title to guarantee that messages in the queue are unique.
- #subscribe {|title, data| ... } ⇒ Object
- #unsubscribe ⇒ Object
  Stops listening to this queue.
Instance Method Details
#delete_message(message_title) ⇒ Object
Removes a message from the queue by its title. You rarely need to call this method directly. Returns true if the message's lock was acquired and the message deleted, and false if the lock could not be acquired.

# File 'lib/z_k/message_queue.rb', line 52
def delete_message(message_title)
  full_path = "#{full_queue_path}/#{message_title}"
  locker = @zk.locker("#{full_queue_path}/#{message_title}")
  if locker.lock!
    begin
      @zk.delete(full_path)
      return true
    ensure
      locker.unlock!
    end
  else
    return false
  end
end
#destroy! ⇒ Object
WARNING: highly destructive. Deletes the queue and all messages in it.

# File 'lib/z_k/message_queue.rb', line 100
def destroy!
  children = @zk.children(full_queue_path)
  locks = []
  children.each do |path|
    lock = @zk.locker("#{full_queue_path}/#{path}")
    lock.lock! # XXX(slyphon): should this be a blocking lock?
    locks << lock
  end
  children.each do |path|
    @zk.delete("#{full_queue_path}/#{path}") rescue ZK::Exceptions::NoNode
  end
  @zk.delete(full_queue_path) rescue ZK::Exceptions::NoNode
  locks.each do |lock|
    lock.unlock!
  end
end
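A note for readers of the listing above: in Ruby, the modifier form `expr rescue Something` treats `Something` as the fallback value, not as an exception filter, so it swallows any StandardError. The sketch below demonstrates the difference in plain Ruby. The `NoNode` class and the helper methods are hypothetical stand-ins and do not use the ZK API.

```ruby
# Stand-in for ZK::Exceptions::NoNode (assumption for illustration only).
class NoNode < StandardError; end

def delete_missing
  raise NoNode, "node does not exist"
end

def delete_broken
  raise ArgumentError, "unrelated failure"
end

# Modifier form: both errors are swallowed, and the expression
# evaluates to the constant written after `rescue`.
a = (delete_missing rescue NoNode)
b = (delete_broken rescue NoNode)

# Block form: only NoNode is rescued; anything else propagates.
def delete_quietly
  yield
rescue NoNode
  nil
end

c = delete_quietly { delete_missing }
```

Here `a` and `b` both end up holding the `NoNode` class itself, while `delete_quietly { delete_broken }` would raise `ArgumentError`.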
#messages ⇒ Object
Returns a list of the message titles in the queue.

# File 'lib/z_k/message_queue.rb', line 94
def messages
  @zk.children(full_queue_path)
end
#poll! ⇒ Object
Grabs one message from the queue. Use this when you don’t want to, or can’t, subscribe.

# File 'lib/z_k/message_queue.rb', line 70
def poll!
  find_and_process_next_available(messages)
end
#publish(data, message_title = nil) ⇒ Object
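Publishes a message to the queue. You can optionally supply a message title to guarantee that messages in the queue are unique: a titled message is stored under its title, so publishing a second message with the same title returns false. Untitled messages are stored as sequential nodes named "message".

# File 'lib/z_k/message_queue.rb', line 36
def publish(data, message_title = nil)
  mode = :persistent_sequential
  if message_title
    mode = :persistent
  else
    message_title = "message"
  end
  @zk.create("#{full_queue_path}/#{message_title}", data, :mode => mode)
rescue KeeperException::NodeExists
  return false
end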
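The effect of the optional title can be modelled in memory. The `FakeQueue` class below is a hypothetical stand-in, not the ZK API: it assumes untitled messages receive a ten-digit sequence suffix and that a duplicate title is rejected, mirroring the branching in the listing above.

```ruby
# In-memory model of the publish semantics; NOT ZK::MessageQueue itself.
class FakeQueue
  def initialize
    @nodes = {}     # node name => data
    @sequence = 0   # counter for untitled (sequential) messages
  end

  def publish(data, message_title = nil)
    if message_title
      # Titled message: fixed node name, so a duplicate is rejected.
      return false if @nodes.key?(message_title)
      name = message_title
    else
      # Untitled message: "message" plus a sequence suffix, always unique.
      name = format("message%010d", @sequence)
      @sequence += 1
    end
    @nodes[name] = data
    name
  end

  def messages
    @nodes.keys
  end
end

queue = FakeQueue.new
queue.publish("payload a")
queue.publish("payload b", "job-1")
duplicate = queue.publish("payload c", "job-1")
puts duplicate.inspect
```

Titles therefore work as an idempotency key: a producer that may retry can publish under a stable title without enqueueing the same work twice.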
#subscribe {|title, data| ... } ⇒ Object
# File 'lib/z_k/message_queue.rb', line 80
def subscribe(&block)
  @subscription_block = block
  @subscription_reference = @zk.watcher.subscribe(full_queue_path) do |event, zk|
    find_and_process_next_available(@zk.children(full_queue_path, :watch => true))
  end

  find_and_process_next_available(@zk.children(full_queue_path, :watch => true))
end
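The listing passes `:watch => true` on every read of the children, both in the initial call and inside the callback. ZooKeeper watches fire only once, so they have to be re-armed after each event. The toy model below illustrates that pattern in plain Ruby. `OneShotNode` and its methods are hypothetical and are not part of ZK.

```ruby
# Toy model of a node with a one-shot child watch; NOT the ZK API.
class OneShotNode
  def initialize
    @children = []
    @watch = nil
  end

  # Reading the children arms a watch that fires once on the next change.
  def children_with_watch(&callback)
    @watch = callback
    @children.dup
  end

  def add_child(name)
    @children << name
    watch, @watch = @watch, nil # a fired watch is consumed
    watch&.call
  end
end

node = OneShotNode.new
seen = []
handler = lambda do
  # Re-arm the watch on every read, as #subscribe does.
  seen.replace(node.children_with_watch(&handler))
end

handler.call
node.add_child("message0000000000")
node.add_child("message0000000001")
p seen
```

If the handler read the children without re-arming the watch, only the first `add_child` call would be noticed.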
#unsubscribe ⇒ Object
Stops listening to this queue.

# File 'lib/z_k/message_queue.rb', line 89
def unsubscribe
  @subscription_reference.unsubscribe
end