Module: Puppet::Util::Queue

Extended by:
InstanceLoader
Included in:
Indirector::Queue
Defined in:
lib/vendor/puppet/util/queue.rb

Overview

Implements a message queue client type plugin registry for use by the indirector facility. Client modules for speaking a particular protocol (e.g. Stomp::Client for Stomp message brokers, Memcached for Starling and Sparrow, etc.) register themselves with this module.

Client classes are expected to live under the Puppet::Util::Queue namespace and the corresponding directory. Attempting to use a client via its type name (see below) causes Puppet::Util::Queue to load the corresponding plugin if it is not yet loaded. The client class registers itself with Puppet::Util::Queue and should use the same type name as the autoloader expects for the plugin file.

class Puppet::Util::Queue::SpecialMagicalClient < Messaging::SpecialMagic
    ...
    Puppet::Util::Queue.register_queue_type(self)
end

This module reduces the rightmost segment of the class name into a symbol that serves as the queuing client’s name, so the “SpecialMagicalClient” above is named :special_magical_client within the registry.

Another class or module may mix in this module and then make use of the registered clients.

class Queue::Fue
    # mix it in at the class object level rather than instance level
    extend ::Puppet::Util::Queue
end

Queue::Fue can then get a message queue client from the registry through the mixed-in method client, which returns a class-wide singleton client instance determined by client_class.

The client plugins are expected to implement an interface similar to that of Stomp::Client:

  • new should return a connected, ready-to-go client instance. Note that no arguments are passed in.

  • publish_message(queue, message) should publish the message to the specified queue.

  • subscribe(queue), called with a block, subscribes to queue and executes the block upon receiving a message.

  • queue names are simple names independent of the message broker or client library. No “/queue/” prefixes like in Stomp::Client.
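
As a rough illustration of that interface, here is a minimal in-memory sketch. The class name InMemoryQueueClient and its delivery logic are hypothetical and not part of Puppet; a real plugin would wrap a broker client library and would also register itself with Puppet::Util::Queue.

```ruby
# Hypothetical in-memory client, for illustration only (not part of Puppet).
class InMemoryQueueClient
  # Takes no arguments, as the interface requires. A real client would
  # read its connection details from configuration at this point.
  def initialize
    @subscribers = Hash.new { |hash, queue| hash[queue] = [] }
  end

  # Delivers the message to every block subscribed to the named queue.
  def publish_message(queue, message)
    @subscribers[queue].each { |handler| handler.call(message) }
  end

  # Remembers the block so it runs for each message published to the queue.
  def subscribe(queue, &block)
    @subscribers[queue] << block
  end
end

client = InMemoryQueueClient.new
received = []
client.subscribe(:catalog) { |message| received << message }
client.publish_message(:catalog, 'hello')
# received now holds the one published message
```

Note that the queue name is a plain name (:catalog) with no broker-specific prefix, in line with the last point above.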

Defined Under Namespace

Classes: Stomp

Constant Summary

Constants included from Puppet::Util

AbsolutePathPosix, AbsolutePathWindows

Class Method Summary

Instance Method Summary

Methods included from InstanceLoader

instance_docs, instance_hash, instance_load, instance_loader, instance_loading?, loaded_instance, loaded_instances

Methods included from Puppet::Util

absolute_path?, activerecord_version, benchmark, binread, chuser, classproxy, #execfail, #execpipe, execute, execute_posix, execute_windows, logmethods, memory, path_to_uri, proxy, replace_file, safe_posix_fork, symbolize, symbolizehash, symbolizehash!, synchronize_on, thinmark, #threadlock, uri_to_path, wait_for_output, which, withumask

Methods included from POSIX

#get_posix_field, #gid, #idfield, #methodbyid, #methodbyname, #search_posix_field, #uid

Class Method Details

.queue_type_from_class(klass) ⇒ Object

Given a class object klass, returns the programmatic default queue type name symbol for klass. The algorithm is as shown in earlier examples; the last namespace segment of klass.name is taken and converted from mixed case to underscore-separated lowercase, and interned.

queue_type_from_class(Foo) -> :foo
queue_type_from_class(Foo::Too) -> :too
queue_type_from_class(Foo::ForYouTwo) -> :for_you_two

The implicit assumption here, consistent with Puppet’s approach to plugins in general, is that all your client modules live in the same namespace, such that reduction to a flat namespace of symbols is reasonably safe.



# File 'lib/vendor/puppet/util/queue.rb', line 77

def self.queue_type_from_class(klass)
  # convert last segment of classname from studly caps to lower case with underscores, and symbolize
  klass.name.split('::').pop.sub(/^[A-Z]/) {|c| c.downcase}.gsub(/[A-Z]/) {|c| '_' + c.downcase }.intern
end
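
The same conversion can be tried outside Puppet. This sketch applies the expression above to plain class-name strings; the helper name queue_type_for is hypothetical. Because every capital letter after the first gets its own underscore, runs of capitals (as in acronyms) are split letter by letter.

```ruby
# Hypothetical stand-alone helper mirroring the conversion in
# queue_type_from_class, but taking a class-name string.
def queue_type_for(class_name)
  last_segment = class_name.split('::').pop
  last_segment.sub(/^[A-Z]/) { |c| c.downcase }
              .gsub(/[A-Z]/) { |c| '_' + c.downcase }
              .intern
end

queue_type_for('Foo')              # => :foo
queue_type_for('Foo::ForYouTwo')   # => :for_you_two
queue_type_for('Net::HTTPClient')  # => :h_t_t_p_client
```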

.queue_type_to_class(type) ⇒ Object

Given a queue type symbol, returns the associated Class object. If the queue type is unknown (meaning it hasn’t been registered with this module), an exception is thrown.

Raises:

  • (Puppet::Error)

# File 'lib/vendor/puppet/util/queue.rb', line 61

def self.queue_type_to_class(type)
  c = loaded_instance :queue_clients, type
  raise Puppet::Error, "Queue type #{type} is unknown." unless c
  c
end

.register_queue_type(klass, type = nil) ⇒ Object

Adds a new class/queue-type pair to the registry. The type argument is optional; if not provided, type defaults to a lowercased, underscored symbol programmatically derived from the rightmost namespace of klass.name.

# register with default name +:you+
register_queue_type(Foo::You)

# register with explicit queue type name +:myself+
register_queue_type(Foo::Me, :myself)

If the type is already registered, an exception is thrown. No checking is performed of klass, however; a given class could be registered any number of times, as long as the type differs with each registration.

Raises:

  • (Puppet::Error)

# File 'lib/vendor/puppet/util/queue.rb', line 53

def self.register_queue_type(klass, type = nil)
  type ||= queue_type_from_class(klass)
  raise Puppet::Error, "Queue type #{type} is already registered" if instance_hash(:queue_clients).include?(type)
  instance_hash(:queue_clients)[type] = klass
end
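
The registration rules can be sketched without Puppet's InstanceLoader. In the following stand-alone sketch, the module SketchRegistry, its TYPES hash, and the DuplicateType error class are hypothetical stand-ins for instance_hash(:queue_clients) and Puppet::Error; the example classes under Foo are placeholders as well.

```ruby
# Hypothetical stand-alone registry, for illustration only.
module SketchRegistry
  class DuplicateType < StandardError; end

  TYPES = {}

  def self.register(klass, type = nil)
    # Default type: last namespace segment, lowercased and underscored.
    type ||= klass.name.split('::').pop
                  .sub(/^[A-Z]/) { |c| c.downcase }
                  .gsub(/[A-Z]/) { |c| '_' + c.downcase }
                  .intern
    raise DuplicateType, "Queue type #{type} is already registered" if TYPES.include?(type)
    TYPES[type] = klass
  end
end

module Foo
  class You; end
  class Me; end
end

SketchRegistry.register(Foo::You)            # registered as :you
SketchRegistry.register(Foo::Me, :myself)    # registered as :myself
SketchRegistry.register(Foo::Me, :me_again)  # same class, new type: allowed

begin
  SketchRegistry.register(Foo::You, :myself) # type already taken
rescue SketchRegistry::DuplicateType => e
  duplicate_error = e.message
end
```

Only the type is checked for uniqueness, which is why Foo::Me can appear under two names while the second use of :myself is rejected.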

Instance Method Details

#client ⇒ Object

Returns (instantiating as necessary) the singleton queue client instance, according to client_class. No arguments are passed to the client class constructor, so it’s up to the client class to determine its queue message source (presumably through Puppet configuration data).



# File 'lib/vendor/puppet/util/queue.rb', line 93

def client
  @client ||= client_class.new
end

#client_class ⇒ Object

The class object for the client to be used, determined by queue configuration settings. Looks to the :queue_type configuration entry in the running application for the default queue type to use.



# File 'lib/vendor/puppet/util/queue.rb', line 86

def client_class
  Puppet::Util::Queue.queue_type_to_class(Puppet[:queue_type])
end
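
Putting client_class and client together, the class-wide singleton behavior can be sketched as follows. Everything here is a hypothetical stand-in: SketchQueue plays the role of Puppet::Util::Queue, the SETTINGS hash plays the role of Puppet[:queue_type], and FakeClient is a placeholder client class.

```ruby
# Placeholder client class (hypothetical).
class FakeClient; end

# Hypothetical stand-in for Puppet::Util::Queue.
module SketchQueue
  REGISTRY = { fake: FakeClient }   # stands in for the registered queue types
  SETTINGS = { queue_type: :fake }  # stands in for Puppet[:queue_type]

  # Looks up the configured type; fetch raises KeyError for an unknown type,
  # where the real module raises Puppet::Error.
  def client_class
    REGISTRY.fetch(SETTINGS[:queue_type])
  end

  # Memoized, so the extending object holds a single client instance.
  def client
    @client ||= client_class.new
  end
end

class Fue
  extend SketchQueue  # mixed in at the class level
end

first  = Fue.client
second = Fue.client
first.equal?(second)  # => true
```

Because the module is mixed in with extend, @client lives on the Fue class object itself, which is what makes the client a class-wide singleton.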