Class: ZK::MessageQueue

Inherits:
Object
Defined in:
lib/z_k/message_queue.rb

Overview

Implements a simple message queue based on ZooKeeper recipes. These queues are suitable only for low-volume use: because of the way ZooKeeper works, all message titles have to be read into memory in order to determine which message to process next.

Examples:

queue = zk.queue("somequeue")
queue.publish(some_string)
queue.poll! # will return one message
#subscribe will handle messages as they come in
queue.subscribe do |title, data|
  #handle message
end
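
To illustrate why every title has to be read before a message can be chosen, here is a minimal sketch. It assumes messages are processed in order of the numeric suffix that ZooKeeper appends to sequential nodes; `next_message_title` is a hypothetical helper, not part of ZK::MessageQueue.

```ruby
# Hypothetical helper: choose the next message from a full list of child names.
# Assumes titles end in the zero-padded counter ZooKeeper appends to
# sequential nodes (e.g. "message0000000003").
def next_message_title(titles)
  titles.min_by { |title| title[/\d+\z/].to_i }
end

# ZooKeeper does not return children in sorted order, so the whole list
# has to be fetched and scanned to find the oldest message.
titles = %w[message0000000007 message0000000002 message0000000011]
puts next_message_title(titles)
```

The scan is linear in the number of queued messages, which is one reason a large backlog becomes expensive.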

Instance Method Details

#delete_message(message_title) ⇒ Object

You rarely need to call this method directly, but it lets you remove a message from the queue by specifying its title.

Parameters:

  • message_title (String)

    the title of the message to remove



# File 'lib/z_k/message_queue.rb', line 52

def delete_message(message_title)
  full_path = "#{full_queue_path}/#{message_title}"
  locker = @zk.locker(full_path)
  if locker.lock!
    begin
      @zk.delete(full_path)
      return true
    ensure
      locker.unlock!
    end
  else
    return false
  end
end
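
The lock-then-`ensure` shape of this method can be tried out without a ZooKeeper server. The sketch below uses Ruby's `Mutex#try_lock` as a stand-in for `locker.lock!`, assuming `lock!` is a non-blocking attempt (as the `else` branch suggests); `with_try_lock` is a hypothetical helper.

```ruby
# Stand-in for the per-message lock: Mutex#try_lock is non-blocking and
# returns false immediately if the lock is already held.
def with_try_lock(mutex)
  return false unless mutex.try_lock
  begin
    yield
    true
  ensure
    mutex.unlock # runs even if the block raises
  end
end

lock = Mutex.new
deleted = []

# Lock is free: the block runs and the helper reports success.
first = with_try_lock(lock) { deleted << "message0000000001" }

# Simulate another holder: the block is skipped and false comes back.
lock.lock
second = with_try_lock(lock) { deleted << "message0000000002" }
lock.unlock

puts first.inspect, second.inspect, deleted.inspect
```

Returning `false` instead of waiting lets the caller move on to another message when one is already being processed elsewhere.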

#destroy! ⇒ Object

WARNING: highly destructive. Deletes the queue and all messages in it.



# File 'lib/z_k/message_queue.rb', line 100

def destroy!
  children = @zk.children(full_queue_path)
  locks = []
  children.each do |path|
    lock = @zk.locker("#{full_queue_path}/#{path}")
    lock.lock!    # XXX(slyphon): should this be a blocking lock?
    locks << lock
  end
  children.each do |path|
    @zk.delete("#{full_queue_path}/#{path}") rescue ZK::Exceptions::NoNode
  end
  @zk.delete(full_queue_path) rescue ZK::Exceptions::NoNode
  locks.each do |lock|
    lock.unlock!
  end
end
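
A note on the `rescue` modifier used in the listing above: in Ruby, `expr rescue value` catches any `StandardError` and evaluates to `value`; the expression after `rescue` is not an exception-class filter. The sketch below shows the difference using hypothetical stand-ins (`NoNode`, `delete`); the source does not state whether the broader rescue is intended.

```ruby
class NoNode < StandardError; end # hypothetical stand-in for a "node missing" error

# Hypothetical delete that fails with an unrelated error.
def delete(path)
  raise ArgumentError, "unexpected failure for #{path}"
end

# Modifier form: rescues ANY StandardError; NoNode is just the fallback value.
result = delete("/queue/a") rescue NoNode

# Block form: only NoNode is rescued, so the ArgumentError propagates.
propagated =
  begin
    begin
      delete("/queue/a")
    rescue NoNode
      nil
    end
    false
  rescue ArgumentError
    true
  end

puts result.inspect, propagated.inspect
```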

#messages ⇒ Object

Returns a list of the message titles in the queue.



# File 'lib/z_k/message_queue.rb', line 94

def messages
  @zk.children(full_queue_path)
end

#poll! ⇒ Object

Grabs one message from the queue. Use this when you don’t want to, or can’t, subscribe.

See Also:

  • ZK::MessageQueue#subscribe


# File 'lib/z_k/message_queue.rb', line 70

def poll!
  find_and_process_next_available(messages)
end

#publish(data, message_title = nil) ⇒ Object

Publishes a message to the queue. You can optionally supply a message title to guarantee that the message is unique in the queue.

Parameters:

  • data (String)

    any arbitrary string value

  • message_title (String) (defaults to: nil)

    optionally specify a unique message title for this message



# File 'lib/z_k/message_queue.rb', line 36

def publish(data, message_title = nil)
  mode = :persistent_sequential
  if message_title
    mode = :persistent
  else
    message_title = "message"
  end
  @zk.create("#{full_queue_path}/#{message_title}", data, :mode => mode)
rescue KeeperException::NodeExists
  return false
end
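
How titles give uniqueness can be seen without a server. The sketch below mirrors the `publish` logic against `FakeStore`, a hypothetical in-memory stand-in for the ZooKeeper client; the `/queue` path and the store's API are assumptions made for illustration only.

```ruby
# Hypothetical in-memory stand-in for the ZooKeeper client.
class FakeStore
  class NodeExists < StandardError; end

  def initialize
    @nodes = {}
    @counter = 0
  end

  def create(path, data, mode:)
    if mode == :persistent_sequential
      # ZooKeeper appends a zero-padded 10-digit counter to sequential nodes.
      path = format("%s%010d", path, @counter)
      @counter += 1
    end
    raise NodeExists, path if @nodes.key?(path)
    @nodes[path] = data
    path
  end
end

# Mirrors the publish logic shown above, but against the fake store.
def publish(store, data, message_title = nil)
  mode = message_title ? :persistent : :persistent_sequential
  store.create("/queue/#{message_title || 'message'}", data, mode: mode)
rescue FakeStore::NodeExists
  false
end

store = FakeStore.new
puts publish(store, "a").inspect            # untitled: gets a sequence suffix
puts publish(store, "b", "job-42").inspect  # titled: created once
puts publish(store, "c", "job-42").inspect  # duplicate title: rejected
```

Untitled messages never collide because each gets a fresh suffix, while a titled message maps to a fixed node name, so a second publish with the same title is refused.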

#subscribe {|title, data| ... } ⇒ Object

Examples:

# subscribe like this:
subscribe {|title, data| handle_message!; true}
# returning true in the block deletes the message, false unlocks and requeues

Yields:

  • (title, data)

    yields the message title and the message data to your block



# File 'lib/z_k/message_queue.rb', line 80

def subscribe(&block)
  @subscription_block = block
  @subscription_reference = @zk.watcher.subscribe(full_queue_path) do |event, zk|
    find_and_process_next_available(@zk.children(full_queue_path, :watch => true))
  end
  find_and_process_next_available(@zk.children(full_queue_path, :watch => true))
end
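
The block's return value decides what happens to each message, so a handler usually wants to turn failures into `false`. The sketch below models that contract in memory; `process_all` is a hypothetical helper and the hash stands in for the queue, so no locking or watching is involved.

```ruby
# Hypothetical in-memory model of the contract described above:
# a truthy block result deletes the message, a falsy one leaves it queued.
def process_all(queue)
  queue.keys.sort.each do |title|
    queue.delete(title) if yield(title, queue[title])
  end
  queue
end

queue = { "message0000000000" => "ok", "message0000000001" => "bad" }

process_all(queue) do |title, data|
  begin
    raise "cannot handle #{title}" if data == "bad"
    true   # handled: the message is removed
  rescue StandardError
    false  # failed: the message stays for a later attempt
  end
end

puts queue.keys.inspect
```

A handler that raises without returning `false` would bypass this contract, which is why the rescue lives inside the block.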

#unsubscribe ⇒ Object

Stops listening to this queue.



# File 'lib/z_k/message_queue.rb', line 89

def unsubscribe
  @subscription_reference.unsubscribe
end