Class: ZK::MessageQueue

Inherits:
Object
Defined in:
lib/zk/message_queue.rb

Overview

Implements a simple message queue based on ZooKeeper recipes.

These queues are suitable for low-volume use only.

Because of the way ZooKeeper works, all message titles have to be read into memory in order to determine which message to process next.
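The memory cost can be illustrated with a minimal sketch. It assumes that the next message is the one with the lowest sequential name, which this page does not confirm, and it uses a hard-coded array as a hypothetical stand-in for the result of `@zk.children(full_queue_path)`. ZooKeeper returns child names in no guaranteed order, so the whole list has to be fetched before a candidate can be chosen.

```ruby
# Hypothetical stand-in for the child list returned by the ZooKeeper client.
# Sequential nodes end in a zero-padded 10-digit counter.
children = %w[message0000000012 message0000000003 message0000000007]

# Assumption: the oldest message is processed first. Because the counter is
# zero-padded, the lexically smallest name is also the oldest one.
def next_candidate(titles)
  titles.min
end

candidate = next_candidate(children)
```

Every title in the queue is held in memory just to pick one, which is why large queues get expensive.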

Examples:

queue = zk.queue("somequeue")
queue.publish(some_string)
queue.poll! # will return one message
# subscribe will handle messages as they come in
queue.subscribe do |title, data|
  # handle message
  true # returning true deletes the message
end

Instance Method Details

#delete_message(message_title) ⇒ Object

You rarely need to call this method directly, but it lets you remove a message from the queue by specifying its title.

Parameters:

  • message_title (String)

    the title of the message to remove



# File 'lib/zk/message_queue.rb', line 58

def delete_message(message_title)
  full_path = "#{full_queue_path}/#{message_title}"
  locker = @zk.locker(full_path)
  if locker.lock!
    begin
      @zk.delete(full_path)
      return true
    ensure
      locker.unlock!
    end
  else
    return false
  end
end
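The lock-then-delete pattern used above can be sketched without a ZooKeeper server. In this sketch a plain `Hash` stands in for the queue and a `Mutex` stands in for the locker; `delete_with_lock` is a hypothetical helper, not part of the library. It assumes, as the `if`/`else` in the source suggests, that the lock attempt returns immediately instead of waiting.

```ruby
# Hypothetical helper: `store` is a Hash standing in for the queue and
# `lock` is a Mutex standing in for the ZooKeeper locker.
def delete_with_lock(store, lock, title)
  return false unless lock.try_lock   # lock not acquired: report failure
  begin
    store.delete(title)
    true
  ensure
    lock.unlock                       # runs even if the delete raises
  end
end

store = { "job-1" => "payload" }
lock  = Mutex.new

deleted = delete_with_lock(store, lock, "job-1")

# Simulate someone else holding the lock.
lock.lock
blocked = delete_with_lock(store, lock, "job-1")
lock.unlock
```

A `false` result therefore means "lock not acquired", not "message missing", so a caller may want to retry later.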

#destroy! ⇒ Object

WARNING: highly destructive method! Deletes the queue and all messages in it.



# File 'lib/zk/message_queue.rb', line 113

def destroy!
  unsubscribe  # first thing, make sure we don't get any callbacks related to this
  children = @zk.children(full_queue_path)
  locks = []
  children.each do |path|
    lock = @zk.locker("#{full_queue_path}/#{path}")
    lock.lock!    # XXX(slyphon): should this be a blocking lock?
    locks << lock
  end
  children.each do |path|
    begin
      @zk.delete("#{full_queue_path}/#{path}")
    rescue ZK::Exceptions::NoNode
      # node is already gone; nothing to do
    end
  end

  begin
    @zk.delete(full_queue_path)
  rescue ZK::Exceptions::NoNode
    # queue node is already gone; nothing to do
  end

  locks.each do |lock|
    lock.unlock!
  end
end

#messages ⇒ Object

a list of the message titles in the queue



# File 'lib/zk/message_queue.rb', line 107

def messages
  @zk.children(full_queue_path)
end

#poll! ⇒ Object

grab one message from the queue

used when you don't want to or can't subscribe

See Also:

  • ZK::MessageQueue#subscribe


# File 'lib/zk/message_queue.rb', line 78

def poll!
  find_and_process_next_available(messages)
end

#publish(data, message_title = nil) ⇒ Object

Publish a message to the queue. You can optionally pass a message title to guarantee that the message is unique in the queue.

Parameters:

  • data (String)

    any arbitrary string value

  • message_title (String) (defaults to: nil)

    specify a unique message title for this message (optional)



# File 'lib/zk/message_queue.rb', line 42

def publish(data, message_title = nil)
  mode = :persistent_sequential
  if message_title
    mode = :persistent
  else
    message_title = "message"
  end
  @zk.create("#{full_queue_path}/#{message_title}", data, :mode => mode)
rescue ZK::Exceptions::NodeExists
  return false
end
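The effect of the two create modes can be shown with an in-memory sketch. `FakeStore` is a hypothetical stand-in for the ZooKeeper client, not part of the zk gem, and its sequence counter is simplified. The `publish` below mirrors the method above but takes the store and queue path as explicit arguments so it can run on its own.

```ruby
# Hypothetical in-memory stand-in for the ZooKeeper client.
class FakeStore
  class NodeExists < StandardError; end

  attr_reader :nodes

  def initialize
    @nodes = {}
    @counter = 0
  end

  # Mimics the two create modes that publish relies on.
  def create(path, data, mode:)
    if mode == :persistent_sequential
      # Sequential nodes get a zero-padded 10-digit suffix (simplified counter).
      path = format("%s%010d", path, @counter)
      @counter += 1
    elsif @nodes.key?(path)
      raise NodeExists, path
    end
    @nodes[path] = data
    path
  end
end

# Same shape as the publish method above, written against the stand-in.
def publish(store, queue_path, data, message_title = nil)
  mode = message_title ? :persistent : :persistent_sequential
  message_title ||= "message"
  store.create("#{queue_path}/#{message_title}", data, mode: mode)
rescue FakeStore::NodeExists
  false
end

store  = FakeStore.new
first  = publish(store, "/queue", "a")            # untitled: gets a sequence suffix
second = publish(store, "/queue", "b")            # untitled again: a new node
titled = publish(store, "/queue", "c", "job-42")  # titled: created once
dup    = publish(store, "/queue", "d", "job-42")  # same title: rejected
```

Untitled messages never collide because each one gets a fresh suffix, while a repeated title returns `false` and leaves the original message in place.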

#subscribe {|title, data| ... } ⇒ Object

Examples:

# subscribe like this:
subscribe {|title, data| handle_message!; true}
# returning true in the block deletes the message, false unlocks and requeues

Yields:

  • (title, data)

    yield to your block with the message title and the data of the message



# File 'lib/zk/message_queue.rb', line 89

def subscribe(&block)
  @subscription_block = block
  @sub = @zk.register(full_queue_path) do |event, zk|
    find_and_process_next_available(@zk.children(full_queue_path, :watch => true))
  end

  find_and_process_next_available(@zk.children(full_queue_path, :watch => true))
end
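The block's return value decides what happens to the message. The following sketch models one processing pass in memory. `process_next` is a hypothetical helper: the real `find_and_process_next_available` is not shown on this page, so the selection order and the return symbols here are assumptions, and locking is left out.

```ruby
# Hypothetical model of one processing pass over an in-memory queue (a Hash
# of title => data). Assumption: the lowest title is handled first.
def process_next(queue)
  title = queue.keys.min
  return nil unless title

  handled = yield(title, queue[title])
  queue.delete(title) if handled       # truthy result: message is consumed
  handled ? :deleted : :requeued       # falsy result: message stays queued
end

queue = { "message0000000000" => "first", "message0000000001" => "second" }
seen  = []

r1 = process_next(queue) { |_title, data| seen << data; false }  # stays in queue
r2 = process_next(queue) { |_title, data| seen << data; true }   # removed
```

Because the first block returns `false`, the same message is offered again on the next pass, so handlers should tolerate seeing a message more than once.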

#unsubscribe ⇒ Object

stop listening to this queue



# File 'lib/zk/message_queue.rb', line 99

def unsubscribe
  if @sub
    @sub.unsubscribe
    @sub = nil
  end
end