Class: IPCTransit

Inherits:
Object
  • Object
show all
Defined in:
lib/ipc_transit.rb

Overview

Fast, brokerless message queueing

Author

Dana M. Diederich ([email protected])

Copyright

Copyright © 2012 Dana M. Diederich

License

Distributes under the same terms as Ruby

Constant Summary collapse

@@queues =
{}
@@wire_header_arg_translate =
{
    'destination' => 'd',
    'compression' => 'c',
    'encoding' => 'e'
}
@@wire_header_args =
{
    'e' => { #encoding
        'json' => 1,
        'yaml' => 1,
    },
    'c' => { #compression
        'zlib' => 1,
        'snappy' => 1,
        'none' => 1,
    },
    'd' => 1, #destination address
    't' => 1, #hop TTL
    'q' => 1, #destination qname
}
@@std_args =

destination qname

{
    'message' => 1,
    'qname' => 1,
    'nowait' => 1,
}

Class Method Summary collapse

Class Method Details

.all_queue_infoObject



135
136
137
138
# File 'lib/ipc_transit.rb', line 135

def self.all_queue_info()
    self.gather_queue_info()
    return @@queues
end

.all_queuesObject

Return info about all of the queues on the system

Arguments: none

Returns: hash. key is qname, value contains:

qid - integer queue ID
count - number of messages in this queue


149
150
151
152
153
154
155
156
157
158
159
160
161
162
# File 'lib/ipc_transit.rb', line 149

def self.all_queues()
    ret = {}
    self.all_queue_info().each_pair do |qname,v|
        qid = v['qid']
        x = MessageQueue.new(qid, IPC_CREAT | 0666)
        y = x.ipc_stat
        ct = y.msg_qnum
        ret[qname] = {
            'qid' => qid,
            'count' => ct,
        }
    end
    return ret
end

.receive(args) ⇒ Object

Receive a message from a queue

Arguments:

qname - name of queue to send to
nowait - do not block if the queue is full (optional)
raw - return the full meta-data (optional)

Returns:
 Normally: message
 Raw: the message and its meta data


109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
# File 'lib/ipc_transit.rb', line 109

def self.receive(args)
    ret = nil
    flags = 0
    if args['nowait']
        flags = IPC_NOWAIT
    end
    begin
        key = self.get_queue_id(args)
        mq = MessageQueue.new(key, IPC_CREAT | 0666)
        args['serialized_wire_data'] = mq.receive(0, 10000, flags)
        self.unpack_data(args)
        #at this point I need to see if this is a remote transit
        #if it is, then do not thaw the message proper
        args['message'] = transit_thaw(args)
    rescue Exception => msg
#            puts "Exception: #{msg}"
#            need to do something smarter with this
    end
    if args['raw']
        ret = args;
    else
        ret = args['message']
    end
    return ret
end

.remove(args) ⇒ Object

Remove a queue

Arguments:

qname - name of queue to remove


87
88
89
90
91
92
93
94
# File 'lib/ipc_transit.rb', line 87

def self.remove(args)
    qname = args['qname']
    key = self.get_queue_id(args)
    mq = MessageQueue.new(key, 0)
    mq.ipc_rmid
    File.delete("#{$ipc_transit_config_path}/#{qname}")
    @@queues.delete(qname)
end

.send(args) ⇒ Object

Send message to a queue

Arguments:

message - hash reference
qname - name of queue to send to
nowait - do not block if the queue is full (optional)
encoding - currently JSON and YAML are implemented (optional, defaults to JSON)
compression - currently none and zlib are implemented (optional, defaults to none)
destination - IP or name of remote destination (optional)


58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
# File 'lib/ipc_transit.rb', line 58

def self.send(args)
    ret = nil
    flags = IPC_NOWAIT
    begin
        if args['destination']
            args['q'] = args['qname']
            args['qname'] = 'transitd'
            if not args['t']
                args['t'] = 9
            end
        end
        key = self.get_queue_id(args)
        mq = MessageQueue.new(key, IPC_CREAT | 0666)
        if args['serialized_wire_data'].nil?
            pack_message(args)
        end
        ret = mq.snd(1, args['serialized_wire_data'], flags)
    rescue Exception => msg
        puts "Exception: #{msg}"
    end
    return ret
end

.unpack_data(args) ⇒ Object

Unpack the wire meta data from a message

Arguments:

serialized_wire_data - the serialized message

Returns: (in the passed args)

wire_headers - all of the wire headers
serialized_message - the message itself, still serialized


174
175
176
177
178
179
180
181
182
183
184
185
186
# File 'lib/ipc_transit.rb', line 174

def self.unpack_data(args)
    stuff = args['serialized_wire_data'].split(':')
    offset = Integer(stuff.shift)
    header_and_message = stuff.join(':')
    if offset == 0
        args['serialized_header'] = ''
    else
        args['serialized_header'] = header_and_message[0..offset-1]
        self.thaw_wire_headers(args)
    end
    args['serialized_message'] = header_and_message[offset..header_and_message.length]
    return true
end