Class: DripDrop::Node
- Inherits:
-
Object
- Object
- DripDrop::Node
- Defined in:
- lib/dripdrop/node.rb,
lib/dripdrop/node/nodelet.rb
Defined Under Namespace
Classes: Nodelet
Constant Summary collapse
- ZCTX =
ZMQ::Context.new 1
Instance Attribute Summary collapse
-
#debug ⇒ Object
Returns the value of attribute debug.
-
#nodelets ⇒ Object
readonly
Returns the value of attribute nodelets.
-
#routing ⇒ Object
readonly
Returns the value of attribute routing.
-
#run_list ⇒ Object
readonly
Returns the value of attribute run_list.
-
#zm_reactor ⇒ Object
readonly
Returns the value of attribute zm_reactor.
Class Method Summary collapse
-
.error_handler(e) ⇒ Object
Catch all error handler Global to all DripDrop Nodes.
Instance Method Summary collapse
-
#action ⇒ Object
When subclassing
DripDrop::Node
you probably want to define this method Otherwise it will attempt to run the @block passed intoDripDrop::Node.new
. -
#http_client(address, opts = {}) ⇒ Object
An EM HTTP client.
-
#http_server(address, opts = {}, &block) ⇒ Object
Starts a new Thin HTTP server listening on address.
-
#initialize(opts = {}, &block) ⇒ Node
constructor
A new instance of Node.
-
#join ⇒ Object
If the reactor has started, this blocks until the thread running the reactor joins.
-
#nodelet(name, klass = Nodelet, *configure_args, &block) ⇒ Object
Nodelets are a way of segmenting a DripDrop::Node.
-
#recv_internal(dest, identifier, &block) ⇒ Object
Defines a subscriber to the channel
dest
, to receive messages fromsend_internal
. -
#remove_recv_internal(dest, identifier) ⇒ Object
Deletes a subscriber to the channel
dest
previously identified by a reciever created withrecv_internal
. -
#route(name, handler_type, *handler_args) ⇒ Object
Defines a new route.
-
#route_full(nodelet, name, handler_type, *handler_args) ⇒ Object
Probably not useful for most, apps.
-
#routes_for(nodelet_name, &block) ⇒ Object
DEPRECATED, will be deleted in 0.8.
-
#send_internal(dest, data) ⇒ Object
An inprocess pub/sub queue that works similarly to EM::Channel, but has manually specified identifiers for subscribers letting you more easily delete subscribers without crazy id tracking.
-
#start ⇒ Object
Starts the reactors and runs the block passed to initialize.
-
#start! ⇒ Object
Blocking version of start, equivalent to
start
thenjoin
. -
#stop ⇒ Object
Stops the reactors.
-
#websocket(*args) ⇒ Object
DEPRECATED: Use websocket_server.
-
#websocket_server(address, opts = {}) ⇒ Object
Binds an EM websocket server connection to
address
. - #zmq_m2(addresses, opts = {}, &block) ⇒ Object
-
#zmq_publish(address, socket_ctype, opts = {}) ⇒ Object
Creates a ZMQ::PUB type socket, can only send messages via
send_message
. -
#zmq_pull(address, socket_ctype, opts = {}, &block) ⇒ Object
Creates a ZMQ::PULL type socket.
-
#zmq_push(address, socket_ctype, opts = {}) ⇒ Object
Creates a ZMQ::PUSH type socket, can only send messages via
send_message
. -
#zmq_subscribe(address, socket_ctype, opts = {}, &block) ⇒ Object
Creates a ZMQ::SUB type socket.
-
#zmq_xrep(address, socket_ctype, opts = {}) ⇒ Object
Creates a ZMQ::XREP type socket, both sends and receivesc XREP sockets are extremely powerful, so their functionality is currently limited.
-
#zmq_xreq(address, socket_ctype, opts = {}) ⇒ Object
See the documentation for
zmq_xrep
for more info.
Constructor Details
#initialize(opts = {}, &block) ⇒ Node
Returns a new instance of Node.
29 30 31 32 33 34 35 36 37 38 39 40 |
# File 'lib/dripdrop/node.rb', line 29 def initialize(opts={},&block) @block = block @thread = nil # Thread containing the reactors @routing = {} # Routing table @run_list = opts['run_list'] || opts[:run_list] || nil #List of nodelets to run @run_list = @run_list.map(&:to_sym) if @run_list @debug = opts[:debug] @recipients_for = {} @handler_default_opts = {:debug => @debug} @nodelets = {} # Cache of registered nodelets @zctx = ZCTX end |
Instance Attribute Details
#debug ⇒ Object
Returns the value of attribute debug.
27 28 29 |
# File 'lib/dripdrop/node.rb', line 27 def debug @debug end |
#nodelets ⇒ Object (readonly)
Returns the value of attribute nodelets.
26 27 28 |
# File 'lib/dripdrop/node.rb', line 26 def nodelets @nodelets end |
#routing ⇒ Object (readonly)
Returns the value of attribute routing.
26 27 28 |
# File 'lib/dripdrop/node.rb', line 26 def routing @routing end |
#run_list ⇒ Object (readonly)
Returns the value of attribute run_list.
26 27 28 |
# File 'lib/dripdrop/node.rb', line 26 def run_list @run_list end |
#zm_reactor ⇒ Object (readonly)
Returns the value of attribute zm_reactor.
26 27 28 |
# File 'lib/dripdrop/node.rb', line 26 def zm_reactor @zm_reactor end |
Class Method Details
.error_handler(e) ⇒ Object
Catch all error handler Global to all DripDrop Nodes
310 311 312 |
# File 'lib/dripdrop/node.rb', line 310 def self.error_handler(e) $stderr.write "#{e.class}: #{e.}\n\t#{e.backtrace.join("\n\t")}" end |
Instance Method Details
#action ⇒ Object
When subclassing DripDrop::Node
you probably want to define this method Otherwise it will attempt to run the @block passed into DripDrop::Node.new
64 65 66 67 68 69 70 |
# File 'lib/dripdrop/node.rb', line 64 def action if @block self.instance_eval(&@block) else raise "Could not start, no block or action specified" end end |
#http_client(address, opts = {}) ⇒ Object
An EM HTTP client. Example:
client = http_client(addr)
client.(:name => 'name', :body => 'hi') do |resp_msg|
puts resp_msg.inspect
end
266 267 268 269 270 |
# File 'lib/dripdrop/node.rb', line 266 def http_client(address,opts={}) uri = URI.parse(address) h_opts = handler_opts_given(opts) DripDrop::HTTPClientHandler.new(uri, h_opts) end |
#http_server(address, opts = {}, &block) ⇒ Object
Starts a new Thin HTTP server listening on address. Can have an on_receive
handler that gets passed msg
and response
args.
http_server(addr) {|msg,response| response.(msg)}
254 255 256 257 258 |
# File 'lib/dripdrop/node.rb', line 254 def http_server(address,opts={},&block) uri = URI.parse(address) h_opts = handler_opts_given(opts) DripDrop::HTTPServerHandler.new(uri, h_opts,&block) end |
#join ⇒ Object
If the reactor has started, this blocks until the thread running the reactor joins. This should block forever unless stop
is called.
75 76 77 78 79 80 81 |
# File 'lib/dripdrop/node.rb', line 75 def join if @thread @thread.join else raise "Can't join on a node that isn't yet started" end end |
#nodelet(name, klass = Nodelet, *configure_args, &block) ⇒ Object
Nodelets are a way of segmenting a DripDrop::Node. This can be used for both organization and deployment. One might want the production deployment of an app to be broken across multiple servers or processes for instance:
nodelet :heartbeat do |nlet|
nlet.route :ticker, :zmq_publish, 'tcp://127.0.0.1', :bind
EM::PeriodicalTimer.new(1) do
nlet.ticker.(:name => 'tick')
end
end
Nodelets can also be subclassed, for instance:
class SpecialNodelet < DripDrop::Node::Nodelet
def action
nlet.route :ticker, :zmq_publish, 'tcp://127.0.0.1', :bind
EM::PeriodicalTimer.new(1) do
nlet.ticker.(:name => 'tick')
end
end
end
nodelet :heartbeat, SpecialNodelet
If you specify a block, Nodelet#action will be ignored and the block will be run
Nodelets are made available as instance methods on the current DripDrop::Nodelet Object, so the following works as well:
nodelet :mynodelet
mynodelet.route :route_name, :zmq_xreq, 'tcp://127.0.0.1:2000', ;bind
157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 |
# File 'lib/dripdrop/node.rb', line 157 def nodelet(name,klass=Nodelet,*configure_args,&block) # If there's a run list, only run nodes in that list return nil if @run_list && !@run_list.include?(name.to_sym) nlet = @nodelets[name] ||= klass.new(self,name,*configure_args) # Define a method returning the nodelet in the current node unless respond_to?(name) (class << self; self; end).class_eval do define_method(name) { nlet } end end if block block.call(nlet) else nlet.action end nlet end |
#recv_internal(dest, identifier, &block) ⇒ Object
Defines a subscriber to the channel dest
, to receive messages from send_internal
. identifier
is a unique identifier for this receiver. The identifier can be used by remove_recv_internal
293 294 295 296 297 298 299 |
# File 'lib/dripdrop/node.rb', line 293 def recv_internal(dest,identifier,&block) if @recipients_for[dest] @recipients_for[dest][identifier] = block else @recipients_for[dest] = {identifier => block} end end |
#remove_recv_internal(dest, identifier) ⇒ Object
Deletes a subscriber to the channel dest
previously identified by a reciever created with recv_internal
303 304 305 306 |
# File 'lib/dripdrop/node.rb', line 303 def remove_recv_internal(dest,identifier) return false unless @recipients_for[dest] @recipients_for[dest].delete(identifier) end |
#route(name, handler_type, *handler_args) ⇒ Object
Defines a new route. Routes are the recommended way to instantiate handlers. For example:
route :stats_pub, :zmq_publish, 'tcp://127.0.0.1:2200', :bind
route :stats_sub, :zmq_subscribe, stats_pub.address, :connect
Will make the following methods available within the reactor block:
stats_pub # A regular zmq_publish handler
:stats_sub # A regular zmq_subscribe handler
See the docs for routes_for
for more info in grouping routes for nodelets and maintaining sanity in larger apps
95 96 97 |
# File 'lib/dripdrop/node.rb', line 95 def route(name,handler_type,*handler_args) route_full(nil, name, handler_type, *handler_args) end |
#route_full(nodelet, name, handler_type, *handler_args) ⇒ Object
Probably not useful for most, apps. This is used internally to create a route for a given nodelet.
101 102 103 104 105 106 107 108 109 110 111 112 113 114 |
# File 'lib/dripdrop/node.rb', line 101 def route_full(nodelet, name, handler_type, *handler_args) # If we're in a route_for block, prepend appropriately full_name = (nodelet && nodelet.name) ? "#{nodelet.name}_#{name}".to_sym : name handler = self.send(handler_type, *handler_args) @routing[full_name] = handler # Define the route name as a singleton method (class << self; self; end).class_eval do define_method(full_name) { handler } end handler end |
#routes_for(nodelet_name, &block) ⇒ Object
DEPRECATED, will be deleted in 0.8
117 118 119 120 121 |
# File 'lib/dripdrop/node.rb', line 117 def routes_for(nodelet_name,&block) $stderr.write "routes_for is now deprecated, use nodelet instead" nlet = nodelet(nodelet_name,&block) block.call(nlet) end |
#send_internal(dest, data) ⇒ Object
An inprocess pub/sub queue that works similarly to EM::Channel, but has manually specified identifiers for subscribers letting you more easily delete subscribers without crazy id tracking.
This is useful for situations where you want to broadcast messages across your app, but need a way to properly delete listeners.
dest
is the name of the pub/sub channel. data
is any type of ruby var you’d like to send.
281 282 283 284 285 286 287 288 |
# File 'lib/dripdrop/node.rb', line 281 def send_internal(dest,data) return false unless @recipients_for[dest] blocks = @recipients_for[dest].values return false unless blocks blocks.each do |block| block.call(data) end end |
#start ⇒ Object
Starts the reactors and runs the block passed to initialize. This is non-blocking.
44 45 46 47 48 49 |
# File 'lib/dripdrop/node.rb', line 44 def start @thread = Thread.new do EM.error_handler {|e| self.class.error_handler e} EM.run { action } end end |
#start! ⇒ Object
Blocking version of start, equivalent to start
then join
52 53 54 55 |
# File 'lib/dripdrop/node.rb', line 52 def start! self.start self.join end |
#stop ⇒ Object
Stops the reactors. If you were blocked on #join, that will unblock.
58 59 60 |
# File 'lib/dripdrop/node.rb', line 58 def stop EM.stop end |
#websocket(*args) ⇒ Object
DEPRECATED: Use websocket_server
246 247 248 249 |
# File 'lib/dripdrop/node.rb', line 246 def websocket(*args) $stderr.write "DripDrop#websocket handler is deprecated, use DripDrop#websocket_server" websocket_server(*args) end |
#websocket_server(address, opts = {}) ⇒ Object
Binds an EM websocket server connection to address
. takes blocks for on_open
, on_receive
, on_close
and on_error
.
For example on_receive
could be used to echo incoming messages thusly:
websocket_server(addr).on_open {|conn|
ws.(:name => 'ws_open_ack')
}.on_receive {|msg,conn|
conn.send(msg)
}.on_close {|conn|
}.on_error {|reason,conn|
}
The ws
object that’s passed into the handlers is not the DripDrop::WebSocketHandler
object, but an em-websocket object.
239 240 241 242 243 |
# File 'lib/dripdrop/node.rb', line 239 def websocket_server(address,opts={}) uri = URI.parse(address) h_opts = handler_opts_given(opts) DripDrop::WebSocketHandler.new(uri,h_opts) end |
#zmq_m2(addresses, opts = {}, &block) ⇒ Object
178 179 180 |
# File 'lib/dripdrop/node.rb', line 178 def zmq_m2(addresses, opts={}, &block) zmq_handler(DripDrop::Mongrel2Handler, [ZMQ::PULL, ZMQ::PUB], addresses, [:connect, :connect], opts) end |
#zmq_publish(address, socket_ctype, opts = {}) ⇒ Object
Creates a ZMQ::PUB type socket, can only send messages via send_message
190 191 192 |
# File 'lib/dripdrop/node.rb', line 190 def zmq_publish(address,socket_ctype,opts={}) zmq_handler(DripDrop::ZMQPubHandler,ZMQ::PUB,address,socket_ctype,opts) end |
#zmq_pull(address, socket_ctype, opts = {}, &block) ⇒ Object
Creates a ZMQ::PULL type socket. Can only receive messages via on_receive
195 196 197 |
# File 'lib/dripdrop/node.rb', line 195 def zmq_pull(address,socket_ctype,opts={},&block) zmq_handler(DripDrop::ZMQPullHandler,ZMQ::PULL,address,socket_ctype,opts) end |
#zmq_push(address, socket_ctype, opts = {}) ⇒ Object
Creates a ZMQ::PUSH type socket, can only send messages via send_message
200 201 202 |
# File 'lib/dripdrop/node.rb', line 200 def zmq_push(address,socket_ctype,opts={}) zmq_handler(DripDrop::ZMQPushHandler,ZMQ::PUSH,address,socket_ctype,opts) end |
#zmq_subscribe(address, socket_ctype, opts = {}, &block) ⇒ Object
Creates a ZMQ::SUB type socket. Can only receive messages via on_receive
. zmq_subscribe sockets have a topic_filter
option, which restricts which messages they can receive. It takes a regexp as an option.
185 186 187 |
# File 'lib/dripdrop/node.rb', line 185 def zmq_subscribe(address,socket_ctype,opts={},&block) zmq_handler(DripDrop::ZMQSubHandler,ZMQ::SUB,address,socket_ctype,opts) end |
#zmq_xrep(address, socket_ctype, opts = {}) ⇒ Object
Creates a ZMQ::XREP type socket, both sends and receivesc XREP sockets are extremely powerful, so their functionality is currently limited. XREP sockets in DripDrop can reply to the original source of the message.
Receiving with XREP sockets in DripDrop is different than other types of sockets, on_receive passes 2 arguments to its callback, message
, and response
. A minimal example is shown below:
zmq_xrep(z_addr, :bind).on_receive do |,response|
response.()
end
216 217 218 |
# File 'lib/dripdrop/node.rb', line 216 def zmq_xrep(address,socket_ctype,opts={}) zmq_handler(DripDrop::ZMQXRepHandler,ZMQ::XREP,address,socket_ctype,opts) end |
#zmq_xreq(address, socket_ctype, opts = {}) ⇒ Object
See the documentation for zmq_xrep
for more info
221 222 223 |
# File 'lib/dripdrop/node.rb', line 221 def zmq_xreq(address,socket_ctype,opts={}) zmq_handler(DripDrop::ZMQXReqHandler,ZMQ::XREQ,address,socket_ctype,opts) end |