Class: Nanite::Mapper
- Includes:
- AMQPHelper, ConsoleHelper, DaemonizeHelper
- Defined in:
- lib/nanite/mapper.rb
Overview
Mappers are control nodes in nanite clusters. Nanite clusters can follow a peer-to-peer model of communication as well as a client-server one, and mappers are the nodes that know which agents to send work requests to.
Mappers can reside inside a front end web application written in Merb/Rails and distribute heavy lifting to actors that register with the mapper as soon as they go online.
Each mapper tracks the nanites registered with it. It periodically checks when each nanite last sent a heartbeat notification and removes those that have timed out from the list of available workers. As soon as a worker comes back online, it re-registers itself, and the mapper adds it back to the list and makes it available to be called again.
This makes Nanite clusters self-healing and immune to individual node failures.
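The heartbeat bookkeeping described above can be sketched in a few lines. This is a simplified illustration only, not Nanite's cluster code: the HeartbeatRegistry class and its heartbeat, sweep and available methods are hypothetical names, and the 15 second timeout simply mirrors the :agent_timeout default.

```ruby
# Hypothetical, simplified registry; not Nanite's actual cluster code.
class HeartbeatRegistry
  def initialize(agent_timeout)
    @agent_timeout = agent_timeout
    @last_seen = {} # identity => Time of the last heartbeat
  end

  # Called when a nanite registers and on every heartbeat it sends.
  def heartbeat(identity, now = Time.now)
    @last_seen[identity] = now
  end

  # Remove nanites whose last heartbeat is older than the timeout and
  # return the identities that were dropped.
  def sweep(now = Time.now)
    expired = @last_seen.select { |_, seen| now - seen > @agent_timeout }.keys
    expired.each { |identity| @last_seen.delete(identity) }
    expired
  end

  # Identities currently considered available for work.
  def available
    @last_seen.keys
  end
end

registry = HeartbeatRegistry.new(15)
start = Time.at(0)
registry.heartbeat('nanite-a', start)
registry.heartbeat('nanite-b', start + 10)
removed = registry.sweep(start + 20)       # nanite-a is 20s stale, nanite-b 10s
registry.heartbeat('nanite-a', start + 21) # nanite-a comes back and re-registers
```

Passing the clock in as an argument keeps the sketch deterministic; a real mapper would run the sweep from a periodic timer.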
Constant Summary
- DEFAULT_OPTIONS =
COMMON_DEFAULT_OPTIONS.merge({ :user => 'mapper', :identity => Identity.generate, :agent_timeout => 15, :offline_redelivery_frequency => 10, :persistent => false, :offline_failsafe => false, :callbacks => {} })
Instance Attribute Summary
-
#amq ⇒ Object
readonly
Returns the value of attribute amq.
-
#cluster ⇒ Object
readonly
Returns the value of attribute cluster.
-
#identity ⇒ Object
readonly
Returns the value of attribute identity.
-
#job_warden ⇒ Object
readonly
Returns the value of attribute job_warden.
-
#options ⇒ Object
readonly
Returns the value of attribute options.
-
#serializer ⇒ Object
readonly
Returns the value of attribute serializer.
Class Method Summary
-
.start(options = {}) ⇒ Object
Initializes a new mapper and establishes AMQP connection.
Instance Method Summary
-
#initialize(options) ⇒ Mapper
constructor
A new instance of Mapper.
-
#push(type, payload = '', opts = {}) ⇒ Object
Make a nanite request which does not expect a response.
-
#request(type, payload = '', opts = {}, &blk) ⇒ Object
Make a nanite request which expects a response.
- #run ⇒ Object
- #send_push(push, opts = {}) ⇒ Object
-
#send_request(request, opts = {}, &blk) ⇒ Object
Send request with pre-built request instance.
Methods included from DaemonizeHelper
Methods included from ConsoleHelper
Methods included from AMQPHelper
Constructor Details
#initialize(options) ⇒ Mapper
Returns a new instance of Mapper.
# File 'lib/nanite/mapper.rb', line 111
def initialize(options)
  @options = DEFAULT_OPTIONS.clone.merge(options)
  root = options[:root] || @options[:root]
  custom_config = if root
    file = File.expand_path(File.join(root, 'config.yml'))
    File.exists?(file) ? (YAML.load(IO.read(file)) || {}) : {}
  else
    {}
  end
  options.delete(:identity) unless options[:identity]
  @options.update(custom_config.merge(options))
  @identity = "mapper-#{@options[:identity]}"
  @options[:file_root] ||= File.join(@options[:root], 'files')
  @options[:log_path] = false
  if @options[:daemonize]
    @options[:log_path] = (@options[:log_dir] || @options[:root] || Dir.pwd)
  end
  @options.freeze
end
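The constructor layers three sources of settings: DEFAULT_OPTIONS, the contents of config.yml under the root directory, and the options passed by the caller, with later sources winning. The sketch below reproduces only that merge order with plain hashes; the values in custom_config and passed are made up for illustration, and no file is read.

```ruby
# Subset of the defaults, enough to show precedence.
defaults = { :user => 'mapper', :agent_timeout => 15, :persistent => false }

# Stand-in for the parsed contents of <root>/config.yml (hypothetical values).
custom_config = { :agent_timeout => 30, :persistent => true }

# Options handed to the constructor by the caller (hypothetical values).
passed = { :persistent => false }

merged = defaults.clone.merge(passed)      # defaults overlaid with caller options
merged.update(custom_config.merge(passed)) # config file, with caller options on top
merged.freeze                              # later writes raise an error
```

Here :agent_timeout comes from the config stand-in, :persistent from the caller, and :user from the defaults.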
Instance Attribute Details
#amq ⇒ Object (readonly)
Returns the value of attribute amq.
# File 'lib/nanite/mapper.rb', line 24
def amq
  @amq
end
#cluster ⇒ Object (readonly)
Returns the value of attribute cluster.
# File 'lib/nanite/mapper.rb', line 24
def cluster
  @cluster
end
#identity ⇒ Object (readonly)
Returns the value of attribute identity.
# File 'lib/nanite/mapper.rb', line 24
def identity
  @identity
end
#job_warden ⇒ Object (readonly)
Returns the value of attribute job_warden.
# File 'lib/nanite/mapper.rb', line 24
def job_warden
  @job_warden
end
#options ⇒ Object (readonly)
Returns the value of attribute options.
# File 'lib/nanite/mapper.rb', line 24
def options
  @options
end
#serializer ⇒ Object (readonly)
Returns the value of attribute serializer.
# File 'lib/nanite/mapper.rb', line 24
def serializer
  @serializer
end
Class Method Details
.start(options = {}) ⇒ Object
Initializes a new mapper and establishes an AMQP connection. This must be called inside an EM.run block, or when the EventMachine reactor is already running, for instance because it was started by the Thin server that your Merb/Rails application runs on.
Mapper options:
identity : identity of this mapper, may be any string
format : format to use for packet serialization. Can be :marshal, :json, :yaml or :secure.
Defaults to Ruby's Marshal format. For interoperability with
AMQP clients implemented in other languages, use JSON.
Note that Nanite uses the JSON gem,
and ActiveSupport's JSON encoder may cause clashes
if ActiveSupport is loaded after the JSON gem.
Also, using the secure format requires prior initialization of the serializer; see
SecureSerializer.init
log_level : the verbosity of logging, can be debug, info, warn, error or fatal.
agent_timeout : how long to wait before an agent is considered to be offline
and thus removed from the list of available agents.
log_dir : log file path, defaults to the current working directory.
console : true tells mapper to start interactive console
daemonize : true tells mapper to daemonize
pid_dir : path to the directory where the mapper stores its pid file (only if daemonized)
defaults to the root or the current working directory.
offline_redelivery_frequency : The frequency in seconds that messages stored in the offline queue will be retrieved
for attempted redelivery to the nanites. Default is 10 seconds.
persistent : true instructs the AMQP broker to save messages to persistent storage so that they aren’t lost when the
broker is restarted. Default is false. Can be overridden on a per-message basis using the request and push methods.
secure : use security features of RabbitMQ to restrict nanites to themselves
prefetch : Sets prefetch (only supported in RabbitMQ >= 1.6)
callbacks : A set of callbacks to be executed on specific events. Supported events are :register,
:unregister, :timeout and :exception. This option must be a hash with event names
as keys and Procs as values. The Proc's arity (number of parameters) depends on the
type of callback:
exception -- the exception, the message being processed, a reference to the mapper
all others -- the corresponding nanite's identity, a reference to the mapper
tag_store : Name of class which implements tag store backend interface, RedisTagStore by default
Connection options:
vhost : AMQP broker vhost that should be used
user : AMQP broker user
pass : AMQP broker password
host : host AMQP broker (or node of interest) runs on,
defaults to 0.0.0.0
port : port AMQP broker (or node of interest) runs on,
this defaults to 5672, port used by some widely
used AMQP brokers (RabbitMQ and ZeroMQ)
# File 'lib/nanite/mapper.rb', line 105
def self.start(options = {})
  mapper = new(options)
  mapper.run
  mapper
end
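As an illustration of the callbacks option, the sketch below builds a hash with the arities listed above: two parameters for :register, :unregister and :timeout, three for :exception. So that it runs without Nanite or a broker, the events are fired by hand with the mapper reference stubbed as nil; the events array is a hypothetical log sink, and the start call is shown only as a comment.

```ruby
# Hypothetical log sink so the sketch runs without Nanite or a broker.
events = []

callbacks = {
  :register   => lambda { |identity, mapper| events << [:register, identity] },
  :unregister => lambda { |identity, mapper| events << [:unregister, identity] },
  :timeout    => lambda { |identity, mapper| events << [:timeout, identity] },
  :exception  => lambda { |error, message, mapper| events << [:exception, error.message] }
}

# With Nanite and EventMachine loaded, the hash would be passed along these lines:
#   EM.run { Nanite::Mapper.start(:format => :json, :agent_timeout => 15, :callbacks => callbacks) }

# Simulate two events being fired (mapper reference stubbed as nil).
callbacks[:register].call('nanite-a', nil)
callbacks[:exception].call(RuntimeError.new('boom'), 'raw message', nil)
```

Lambdas are used rather than Proc.new so that a callback invoked with the wrong number of arguments fails loudly.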
Instance Method Details
#push(type, payload = '', opts = {}) ⇒ Object
Make a nanite request which does not expect a response.
Parameters
- type<String>
-
The dispatch route for the request
- payload<Object>
-
Payload to send. This will get marshalled en route
Options
- :selector<Symbol>
-
Method for selecting an actor. Default is :least_loaded.
- :least_loaded
-
Pick the nanite which has the lowest load.
- :all
-
Send the request to all nanites which respond to the service.
- :random
-
Randomly pick a nanite.
- :rr
-
Select a nanite according to round robin ordering.
- :offline_failsafe<Boolean>
-
Store messages in an offline queue when all
the nanites are offline. Messages will be redelivered when nanites come online.
Default is false unless the mapper was started with the --offline-failsafe flag.
- :persistent<Boolean>
-
Instructs the AMQP broker to save the message to persistent
storage so that it isn't lost when the broker is restarted.
Default is false unless the mapper was started with the --persistent flag.
# File 'lib/nanite/mapper.rb', line 232
def push(type, payload = '', opts = {})
  push = build_deliverable(Push, type, payload, opts)
  send_push(push, opts)
end
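The four :selector values can be pictured as strategies over the set of nanites that respond to a service. The following is a rough sketch under assumptions, not the cluster's real selection code: the candidates hash (identity mapped to reported load) and the selectors table are hypothetical.

```ruby
# Hypothetical candidate list: identity => reported load.
candidates = { 'nanite-a' => 0.9, 'nanite-b' => 0.2, 'nanite-c' => 0.5 }

selectors = {
  :least_loaded => lambda { |nanites| [nanites.min_by { |_, load_avg| load_avg }.first] },
  :all          => lambda { |nanites| nanites.keys },
  :random       => lambda { |nanites| [nanites.keys.sample] }
}

# Round robin needs state between calls, so the index lives in a closure.
rr_index = -1
selectors[:rr] = lambda { |nanites|
  rr_index = (rr_index + 1) % nanites.size
  [nanites.keys.sort[rr_index]]
}

least    = selectors[:least_loaded].call(candidates)
everyone = selectors[:all].call(candidates)
rr_picks = 3.times.map { selectors[:rr].call(candidates).first }
```

Every strategy returns an array of identities, so the caller can route to one target or many in the same way.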
#request(type, payload = '', opts = {}, &blk) ⇒ Object
Make a nanite request which expects a response.
Parameters
- type<String>
-
The dispatch route for the request
- payload<Object>
-
Payload to send. This will get marshalled en route
Options
- :selector<Symbol>
-
Method for selecting an actor. Default is :least_loaded.
- :least_loaded
-
Pick the nanite which has the lowest load.
- :all
-
Send the request to all nanites which respond to the service.
- :random
-
Randomly pick a nanite.
- :rr
-
Select a nanite according to round robin ordering.
- :target<String>
-
Select a specific nanite via identity, rather than using
a selector.
- :offline_failsafe<Boolean>
-
Store messages in an offline queue when all
the nanites are offline. Messages will be redelivered when nanites come online.
Default is false unless the mapper was started with the --offline-failsafe flag.
- :persistent<Boolean>
-
Instructs the AMQP broker to save the message to persistent
storage so that it isn't lost when the broker is restarted.
Default is false unless the mapper was started with the --persistent flag.
- :intermediate_handler
-
Takes a lambda to call when an IntermediateMessage
event arrives from a nanite. If passed a Hash, hash keys should correspond to
the IntermediateMessage keys provided by the nanite, and each should have a value
that is a lambda/proc taking the parameters specified here. Can supply a key '*'
as a catch-all for unmatched keys.
Block Parameters for intermediate_handler
- key<String>
-
array of unique keys for which intermediate state has been received
since the last call to this block.
- nanite<String>
-
nanite which sent the message.
- state
-
most recently delivered intermediate state for the key provided.
- job
-
(optional) – if provided, this parameter gets the whole job object, if there’s
a reason to do more complex work with the job.
Block Parameters
- :results<Object>
-
The returned value from the nanite actor.
# File 'lib/nanite/mapper.rb', line 189
def request(type, payload = '', opts = {}, &blk)
  request = build_deliverable(Request, type, payload, opts)
  send_request(request, opts, &blk)
end
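When :intermediate_handler is a Hash, each intermediate message is matched by key, with '*' acting as the catch-all. The sketch below shows that lookup in isolation. The dispatch_intermediate helper is hypothetical and Nanite's own job handling may differ in detail; the handlers take the key, nanite and state parameters described above.

```ruby
# Hypothetical dispatcher; Nanite's own job handling may differ in detail.
def dispatch_intermediate(handler, key, nanite, state)
  # A Hash is looked up by key with '*' as the fallback; anything else is
  # treated as a single callable that receives every message.
  callable = handler.is_a?(Hash) ? (handler[key] || handler['*']) : handler
  callable.call(key, nanite, state) if callable
end

seen = []
handler = {
  'progress' => lambda { |key, nanite, state| seen << "#{nanite} progress #{state}" },
  '*'        => lambda { |key, nanite, state| seen << "#{nanite} other #{key}" }
}

# Such a hash would be passed as :intermediate_handler => handler in the
# opts argument of request.

dispatch_intermediate(handler, 'progress', 'nanite-a', '50%')
dispatch_intermediate(handler, 'log', 'nanite-a', 'started')
```

A key with no matching entry and no '*' handler is ignored in this sketch.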
#run ⇒ Object
# File 'lib/nanite/mapper.rb', line 131
def run
  setup_logging
  @serializer = Serializer.new(@options[:format])
  pid_file = PidFile.new(@identity, @options)
  pid_file.check
  if @options[:daemonize]
    daemonize(@identity, @options)
    pid_file.write
    at_exit { pid_file.remove }
  else
    trap("INT") {exit}
  end
  @amq = start_amqp(@options)
  @job_warden = JobWarden.new(@serializer)
  setup_cluster
  Nanite::Log.info('[setup] starting mapper')
  setup_queues
  start_console if @options[:console] && !@options[:daemonize]
end
#send_push(push, opts = {}) ⇒ Object
# File 'lib/nanite/mapper.rb', line 237
def send_push(push, opts = {})
  include_timed_out = push.selector == :all
  targets = cluster.targets_for(push, include_timed_out)
  if !targets.empty?
    cluster.route(push, targets)
    true
  elsif opts.key?(:offline_failsafe) ? opts[:offline_failsafe] : options[:offline_failsafe]
    cluster.publish(push, 'mapper-offline')
    :offline
  else
    false
  end
end
#send_request(request, opts = {}, &blk) ⇒ Object
Send request with pre-built request instance
# File 'lib/nanite/mapper.rb', line 195
def send_request(request, opts = {}, &blk)
  request.reply_to = identity
  intm_handler = opts.delete(:intermediate_handler)
  targets = cluster.targets_for(request, include_timed_out=false)
  if !targets.empty?
    job = job_warden.new_job(request, targets, intm_handler, blk)
    cluster.route(request, job.targets)
    job
  elsif opts.key?(:offline_failsafe) ? opts[:offline_failsafe] : options[:offline_failsafe]
    job_warden.new_job(request, [], intm_handler, blk)
    cluster.publish(request, 'mapper-offline')
    :offline
  else
    false
  end
end
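Both send_push and send_request end in the same three-way decision: deliver when targets exist, queue offline when the failsafe applies, otherwise give up. The per-message :offline_failsafe value, when present, overrides the mapper-wide setting. The sketch below isolates that decision with plain values; delivery_outcome is a hypothetical helper, and no routing or publishing takes place.

```ruby
# Hypothetical helper that mirrors only the branching, with no AMQP involved.
def delivery_outcome(targets, opts, mapper_options)
  # A per-message setting wins whenever the key is present, even if it is false.
  failsafe = opts.key?(:offline_failsafe) ? opts[:offline_failsafe] : mapper_options[:offline_failsafe]
  if !targets.empty?
    true      # at least one nanite can take the message
  elsif failsafe
    :offline  # stored in the offline queue for later redelivery
  else
    false     # nothing online and no failsafe: the message is dropped
  end
end

online     = delivery_outcome(['nanite-a'], {}, { :offline_failsafe => false })
queued     = delivery_outcome([], { :offline_failsafe => true }, { :offline_failsafe => false })
dropped    = delivery_outcome([], {}, { :offline_failsafe => false })
overridden = delivery_outcome([], { :offline_failsafe => false }, { :offline_failsafe => true })
```

Using opts.key? instead of opts[:offline_failsafe] || ... is what lets a caller switch the failsafe off for a single message on a mapper that has it on.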