Class: IPCTransit
- Inherits:
-
Object
- Object
- IPCTransit
- Defined in:
- lib/ipc_transit.rb
Overview
Fast, brokerless message queueing
- Author
-
Dana M. Diederich ([email protected])
- Copyright
-
Copyright © 2012 Dana M. Diederich
- License
-
Distributes under the same terms as Ruby
Constant Summary collapse
- @@queues =
{}
- @@wire_header_arg_translate =
{ 'destination' => 'd', 'compression' => 'c', 'encoding' => 'e' }
- @@wire_header_args =
{ 'e' => { #encoding 'json' => 1, 'yaml' => 1, }, 'c' => { #compression 'zlib' => 1, 'snappy' => 1, 'none' => 1, }, 'd' => 1, #destination address 't' => 1, #hop TTL 'q' => 1, #destination qname }
- @@std_args =
destination qname
{ 'message' => 1, 'qname' => 1, 'nowait' => 1, }
Class Method Summary collapse
- .all_queue_info ⇒ Object
-
.all_queues ⇒ Object
Return info about all of the queues on the system.
-
.receive(args) ⇒ Object
Receive a message from a queue.
-
.remove(args) ⇒ Object
Remove a queue.
-
.send(args) ⇒ Object
Send message to a queue.
-
.unpack_data(args) ⇒ Object
Unpack the wire meta data from a message.
Class Method Details
.all_queue_info ⇒ Object
135 136 137 138 |
# File 'lib/ipc_transit.rb', line 135 def self.all_queue_info() self.gather_queue_info() return @@queues end |
.all_queues ⇒ Object
Return info about all of the queues on the system
Arguments: none
Returns: hash. key is qname, value contains:
qid - integer queue ID
count - number of messages in this queue
149 150 151 152 153 154 155 156 157 158 159 160 161 162 |
# File 'lib/ipc_transit.rb', line 149 def self.all_queues() ret = {} self.all_queue_info().each_pair do |qname,v| qid = v['qid'] x = MessageQueue.new(qid, IPC_CREAT | 0666) y = x.ipc_stat ct = y.msg_qnum ret[qname] = { 'qid' => qid, 'count' => ct, } end return ret end |
.receive(args) ⇒ Object
Receive a message from a queue
Arguments:
qname - name of queue to send to
nowait - do not block if the queue is full (optional)
raw - return the full meta-data (optional)
Returns:
Normally: message
Raw: the message and its meta data
109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 |
# File 'lib/ipc_transit.rb', line 109 def self.receive(args) ret = nil flags = 0 if args['nowait'] flags = IPC_NOWAIT end begin key = self.get_queue_id(args) mq = MessageQueue.new(key, IPC_CREAT | 0666) args['serialized_wire_data'] = mq.receive(0, 10000, flags) self.unpack_data(args) #at this point I need to see if this is a remote transit #if it is, then do not thaw the message proper args['message'] = transit_thaw(args) rescue Exception => msg # puts "Exception: #{msg}" # need to do something smarter with this end if args['raw'] ret = args; else ret = args['message'] end return ret end |
.remove(args) ⇒ Object
Remove a queue
Arguments:
qname - name of queue to remove
87 88 89 90 91 92 93 94 |
# File 'lib/ipc_transit.rb', line 87 def self.remove(args) qname = args['qname'] key = self.get_queue_id(args) mq = MessageQueue.new(key, 0) mq.ipc_rmid File.delete("#{$ipc_transit_config_path}/#{qname}") @@queues.delete(qname) end |
.send(args) ⇒ Object
Send message to a queue
Arguments:
message - hash reference
qname - name of queue to send to
nowait - do not block if the queue is full (optional)
encoding - currently JSON and YAML are implemented (optional, defaults to JSON)
compression - currently none and zlib are implemented (optional, defaults to none)
destination - IP or name of remote destination (optional)
58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 |
# File 'lib/ipc_transit.rb', line 58 def self.send(args) ret = nil flags = IPC_NOWAIT begin if args['destination'] args['q'] = args['qname'] args['qname'] = 'transitd' if not args['t'] args['t'] = 9 end end key = self.get_queue_id(args) mq = MessageQueue.new(key, IPC_CREAT | 0666) if args['serialized_wire_data'].nil? (args) end ret = mq.snd(1, args['serialized_wire_data'], flags) rescue Exception => msg puts "Exception: #{msg}" end return ret end |
.unpack_data(args) ⇒ Object
Unpack the wire meta data from a message
Arguments:
serialized_wire_data - the serialized message
Returns: (in the passed args)
wire_headers - all of the wire headers
serialized_message - the message itself, still serialized
174 175 176 177 178 179 180 181 182 183 184 185 186 |
# File 'lib/ipc_transit.rb', line 174 def self.unpack_data(args) stuff = args['serialized_wire_data'].split(':') offset = Integer(stuff.shift) = stuff.join(':') if offset == 0 args['serialized_header'] = '' else args['serialized_header'] = [0..offset-1] self.thaw_wire_headers(args) end args['serialized_message'] = [offset...length] return true end |