Class: RJR::Node

Inherits:
Object
Defined in:
lib/rjr/node.rb

Overview

Base RJR Node interface. Nodes are the central transport mechanism of RJR. This class provides the core methods common to all transport types, along with the mechanisms to start and run the subsystems that drive all requests.

A subclass of RJR::Node should be defined for each transport that is supported. Each subclass should define

* RJR_NODE_TYPE - unique id of the transport
* listen method - begin listening for new requests and return
* send_message(msg, connection) - send message using the specified connection
  (transport dependent)
* invoke - establish connection, send message, and wait for / return result
* notify - establish connection, send message, and immediately return

Not all methods have to be implemented, depending on the context and use of the node. The base node class also provides many utility methods to assist in message processing (see below).

See nodes residing in lib/rjr/nodes/ for specific examples.
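
The subclass contract listed above can be sketched roughly as follows. This is an illustration only, not code from RJR: `SketchNode` is a hypothetical stand-in for RJR::Node so the example runs without the gem, and `LoopbackNode`, its `:loopback` type id, the method signatures, and the echo behavior are all assumptions made for the example. Real transports in lib/rjr/nodes/ may use different signatures.

```ruby
# Hypothetical stand-in for RJR::Node so the sketch runs without the rjr gem.
class SketchNode
  attr_reader :node_id

  def initialize(args = {})
    @node_id = args[:node_id]
  end

  # Same constant lookup as the #node_type method documented on this page.
  def node_type
    self.class.const_defined?(:RJR_NODE_TYPE) ?
      self.class.const_get(:RJR_NODE_TYPE) : nil
  end
end

# Hypothetical transport that "delivers" messages to an in-memory array.
class LoopbackNode < SketchNode
  RJR_NODE_TYPE = :loopback   # unique id of the transport (assumed value)

  attr_reader :sent

  def initialize(args = {})
    super
    @sent = []
  end

  # Begin listening and return; an in-memory transport has nothing to set up.
  def listen
    self
  end

  # Transport-dependent delivery; here the "connection" is just an Array.
  def send_message(msg, connection)
    connection << msg
  end

  # Send a message and return a result (simply echoed back in this sketch).
  def invoke(method, *args)
    msg = { :method => method, :params => args }
    send_message(msg, @sent)
    msg[:params]
  end

  # Send a message and return immediately without waiting for a result.
  def notify(method, *args)
    send_message({ :method => method, :params => args }, @sent)
    nil
  end
end

node = LoopbackNode.new(:node_id => 'demo')
result = node.listen.invoke('echo', 1, 2)
node.notify('ping')
```

The split between invoke and notify is the point of the sketch: both send a message over the transport, but only invoke hands a result back to the caller.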

Constructor Details

#initialize(args = {}) ⇒ Node

RJR::Node initializer

Parameters:

  • args (Hash) (defaults to: {})

    options to set on the node

Options Hash (args):

  • :node_id (String)

    unique id of the node

  • :headers (Hash<String,String>)

    optional headers to set on all json-rpc messages

  • :dispatcher (Dispatcher)

    dispatcher to assign to the node



# File 'lib/rjr/node.rb', line 103

def initialize(args = {})
  clear_event_handlers
  @response_lock = Mutex.new
  @response_cv   = ConditionVariable.new
  @responses     = []

  @node_id         = args[:node_id]
  @dispatcher      = args[:dispatcher] || RJR::Dispatcher.new
  @message_headers = args.has_key?(:headers) ? {}.merge(args[:headers]) : {}

  @@tp ||= ThreadPool.new
  @@em ||= EMAdapter.new

  # will do nothing if already started
  tp.start
  em.start
end
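
The `{}.merge(args[:headers])` expression in the initializer copies the caller's hash instead of storing a reference to it. A minimal sketch of that effect in plain Ruby, with no RJR classes involved (the header names and values are made up for the example):

```ruby
caller_headers = { 'source_node' => 'node-a' }   # hypothetical header

# Same expression the initializer uses to build @message_headers.
args = { :headers => caller_headers }
message_headers = args.has_key?(:headers) ? {}.merge(args[:headers]) : {}

# Changing the caller's hash afterwards does not affect the copy.
caller_headers['source_node'] = 'changed'
caller_headers['extra'] = 'x'

copied_value = message_headers['source_node']
copied_keys  = message_headers.keys
```

The copy is shallow: keys added to or reassigned in the caller's hash are not seen by the node, but the value objects themselves are shared between the two hashes.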

Instance Attribute Details

#connection_event_handlers ⇒ Object (readonly)

Handlers for various connection events



# File 'lib/rjr/node.rb', line 48

def connection_event_handlers
  @connection_event_handlers
end

#dispatcher ⇒ Object

Dispatcher to use to satisfy requests



# File 'lib/rjr/node.rb', line 45

def dispatcher
  @dispatcher
end

#message_headers ⇒ Object

Additional header fields to set on all requests and responses received and sent by the node



# File 'lib/rjr/node.rb', line 42

def message_headers
  @message_headers
end

#node_id ⇒ Object (readonly)

Unique string identifier of the node



# File 'lib/rjr/node.rb', line 38

def node_id
  @node_id
end

Class Method Details

.em ⇒ Object



# File 'lib/rjr/node.rb', line 80

def self.em
  defined?(@@em) ? @@em : nil
end

.indirect? ⇒ Boolean

Bool indicating if this node is indirect

Returns:

  • (Boolean)


# File 'lib/rjr/node.rb', line 58

def indirect?
  self.const_defined?(:INDIRECT_NODE) &&
  self.const_get(:INDIRECT_NODE)
end

.persistent? ⇒ Boolean

Bool indicating if this node is persistent

Returns:

  • (Boolean)


# File 'lib/rjr/node.rb', line 52

def persistent?
  self.const_defined?(:PERSISTENT_NODE) &&
  self.const_get(:PERSISTENT_NODE)
end
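
Both .indirect? and .persistent? rely on the same lookup: the flag is true only when the class defines the constant and the constant's value is truthy. A minimal sketch of that pattern, using hypothetical stand-in classes (`FlagBase`, `StreamNode`, `OneShotNode`, and `ExplicitlyOffNode` are not part of RJR):

```ruby
# Hypothetical stand-ins; only the constant-lookup pattern mirrors this page.
class FlagBase
  def self.persistent?
    const_defined?(:PERSISTENT_NODE) && const_get(:PERSISTENT_NODE)
  end
end

# Opts in by defining the constant with a truthy value.
class StreamNode < FlagBase
  PERSISTENT_NODE = true
end

# Never defines the constant, so the lookup short-circuits to false.
class OneShotNode < FlagBase
end

# Defines the constant but sets it to false.
class ExplicitlyOffNode < FlagBase
  PERSISTENT_NODE = false
end

flags = [StreamNode, OneShotNode, ExplicitlyOffNode].map { |k| k.persistent? }
```

Under this pattern a subclass only has to declare a constant to opt in; omitting it and setting it to false behave the same way.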

.tp ⇒ Object



# File 'lib/rjr/node.rb', line 88

def self.tp
  defined?(@@tp) ? @@tp : nil
end
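
The `defined?(@@tp)` guard in .tp (and the matching one in .em) matters because reading an uninitialized class variable raises NameError in Ruby. With the guard, the accessor returns nil until some node has been constructed. A minimal sketch of that behavior with hypothetical stand-in classes (`PoolOwner` and `ChildOwner` are not part of RJR, and a plain Object stands in for the thread pool):

```ruby
# Hypothetical stand-in showing the defined?(@@var) guard used by .em and .tp.
class PoolOwner
  def self.pool
    defined?(@@pool) ? @@pool : nil
  end

  def initialize
    @@pool ||= Object.new   # placeholder for a thread pool
  end
end

class ChildOwner < PoolOwner
end

before = PoolOwner.pool   # no instance created yet, so the guard returns nil
PoolOwner.new             # first instance initializes the class variable
after  = PoolOwner.pool
shared = ChildOwner.pool  # subclasses see the same class variable
PoolOwner.new             # later instances reuse the existing object
still  = PoolOwner.pool
```

Because class variables are shared down the inheritance chain, every subclass in this sketch ends up with the same object, and `||=` ensures it is created only once.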

Instance Method Details

#clear_event_handlers ⇒ Object

Reset connection event handlers



# File 'lib/rjr/node.rb', line 145

def clear_event_handlers
  @connection_event_handlers = {:closed => [], :error => []}
end

#em ⇒ Object



# File 'lib/rjr/node.rb', line 84

def em
  self.class.em
end

#halt ⇒ Object

Immediately terminate the node

Warning: this does exactly what it says. All running threads and reactor jobs are killed immediately.

Returns:

  • self



# File 'lib/rjr/node.rb', line 137

def halt
  em.stop_event_loop
  tp.stop
  self
end

#indirect? ⇒ Boolean

Bool indicating if this node class is indirect

Returns:

  • (Boolean)


# File 'lib/rjr/node.rb', line 70

def indirect?
  self.class.indirect?
end

#join ⇒ Object

Block until the eventmachine reactor and thread pool have both completed running.

Returns:

  • self



# File 'lib/rjr/node.rb', line 125

def join
  tp.join
  em.join
  self
end

#node_type ⇒ Object

Alias of RJR_NODE_TYPE. Returns nil if that constant is not defined for the node's class.



# File 'lib/rjr/node.rb', line 75

def node_type
  self.class.const_defined?(:RJR_NODE_TYPE) ?
  self.class.const_get(:RJR_NODE_TYPE) : nil
end

#on(event, &handler) {|Node| ... } ⇒ Object

Register connection event handler

Parameters:

  • event (:error, :closed)

    the event to register the handler for

  • handler (Callable)

    block param to be added to array of handlers that are called when event occurs

Yields:

  • (Node)

    self is passed to each registered handler when event occurs



# File 'lib/rjr/node.rb', line 154

def on(event, &handler)
  return unless @connection_event_handlers.keys.include?(event)
  @connection_event_handlers[event] << handler
end
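
As the source shows, #on only accepts events that are keys of the hash built by #clear_event_handlers (:closed and :error); any other event is silently ignored and the method returns nil. A minimal sketch of that behavior, using a hypothetical stand-in class so it runs without the gem (`EventedSketch` and its `fire` method are assumptions; how real transports trigger handlers is not shown on this page):

```ruby
# Hypothetical stand-in that reuses the handler bookkeeping shown on this page.
class EventedSketch
  attr_reader :connection_event_handlers

  def initialize
    clear_event_handlers
  end

  def clear_event_handlers
    @connection_event_handlers = { :closed => [], :error => [] }
  end

  def on(event, &handler)
    return unless @connection_event_handlers.keys.include?(event)
    @connection_event_handlers[event] << handler
  end

  # Assumed dispatch step: call each registered handler, passing self.
  def fire(event)
    @connection_event_handlers[event].each { |h| h.call(self) }
  end
end

node = EventedSketch.new
seen = []
node.on(:closed) { |n| seen << n }
ignored = node.on(:close) { |n| seen << :never }   # unknown key, returns nil
node.fire(:closed)
```

Since an unrecognized event name is dropped without an error, a misspelled symbol such as :close results in a handler that is never called.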

#persistent? ⇒ Boolean

Bool indicating if this node class is persistent

Returns:

  • (Boolean)


# File 'lib/rjr/node.rb', line 65

def persistent?
  self.class.persistent?
end

#tp ⇒ Object



# File 'lib/rjr/node.rb', line 92

def tp
  self.class.tp
end