Class: CZTop::Message
- Inherits:
- Object
- Object
- CZTop::Message
- Extended by:
- HasFFIDelegate::ClassMethods
- Includes:
- CZMQ::FFI, HasFFIDelegate
- Defined in:
- lib/cztop/message.rb,
lib/cztop/message/frames.rb
Overview
Represents a CZMQ::FFI::Zmsg.
Defined Under Namespace
Classes: FramesAccessor
Instance Attribute Summary
Attributes included from HasFFIDelegate
Class Method Summary
- .coerce(msg) ⇒ Message
  Coerces an object into a Message.
- .receive_from(source) ⇒ Message
Instance Method Summary
- #<<(frame) ⇒ self
  Append a frame to this message.
- #[](frame_index) ⇒ String?
  Return a frame’s content.
- #content_size ⇒ Integer
  Size of this message in bytes.
- #empty? ⇒ Boolean
  Whether this message is empty (no frames, or every frame has length zero).
- #frames ⇒ FramesAccessor
- #initialize(parts = nil) ⇒ Message (constructor)
  A new instance of Message.
- #inspect ⇒ String
  Inspects this Message.
- #pop ⇒ String?
  Removes the first part from the message and returns it as a string.
- #prepend(frame) ⇒ void
  Prepend a frame to this message.
- #routing_id ⇒ Integer
  Gets the routing ID.
- #routing_id=(new_routing_id) ⇒ new_routing_id
  Sets a new routing ID.
- #send_to(destination) ⇒ void
- #size ⇒ Integer
  Number of frames.
- #to_a ⇒ Array<String>
  Returns all frames as strings in an array.
Methods included from HasFFIDelegate::ClassMethods
ffi_delegate, from_ffi_delegate
Methods included from HasFFIDelegate
#attach_ffi_delegate, #from_ffi_delegate, raise_zmq_err, #to_ptr
Constructor Details
#initialize(parts = nil) ⇒ Message
Returns a new instance of Message.
    # File 'lib/cztop/message.rb', line 29
    def initialize(parts = nil)
      attach_ffi_delegate(Zmsg.new)
      Array(parts).each { |part| self << part } if parts
    end
Class Method Details
.coerce(msg) ⇒ Message
Coerces an object into a CZTop::Message.
    # File 'lib/cztop/message.rb', line 15
    def self.coerce(msg)
      case msg
      when Message
        msg
      when String, Frame, Array
        new(msg)
      else
        raise ArgumentError, format('cannot coerce message: %p', msg)
      end
    end
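The dispatch in .coerce can be illustrated without the native library. The following is a minimal pure-Ruby sketch, not CZTop code: SketchMessage is a hypothetical stand-in invented for illustration. It keeps its frames in a plain Array and leaves out Frame support.

```ruby
# Hypothetical stand-in for CZTop::Message; frames are kept in a plain Array.
class SketchMessage
  attr_reader :parts

  def initialize(parts = nil)
    # Array() wraps a single String and leaves an Array untouched.
    @parts = Array(parts).map(&:to_s)
  end

  # Mirrors the case/when dispatch of Message.coerce (minus Frame support).
  def self.coerce(msg)
    case msg
    when SketchMessage then msg
    when String, Array then new(msg)
    else raise ArgumentError, format('cannot coerce message: %p', msg)
    end
  end
end

existing = SketchMessage.new(%w[a b])
same     = SketchMessage.coerce(existing)       # the same object, unchanged
wrapped  = SketchMessage.coerce('hello')        # a one-frame message
multi    = SketchMessage.coerce(%w[topic body]) # a two-frame message
```

An object that is already a message passes through untouched, so calling coerce on caller-supplied input costs nothing when the caller has already built a message.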
.receive_from(source) ⇒ Message
Receive a CZTop::Message from a Socket or Actor.
    # File 'lib/cztop/message.rb', line 85
    def self.receive_from(source)
      source.wait_readable

      delegate = Zmsg.recv(source)
      return from_ffi_delegate(delegate) unless delegate.null?

      HasFFIDelegate.raise_zmq_err
    rescue Errno::EAGAIN
      raise IO::EAGAINWaitReadable
    end
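The rescue clause above translates Errno::EAGAIN into IO::EAGAINWaitReadable, which lets callers rescue the generic IO::WaitReadable module. Below is a small sketch of that pattern using only core Ruby classes. The method names sketch_receive and sketch_try_receive are hypothetical, and no socket is involved.

```ruby
# Hypothetical stand-in for a receive that finds nothing to read.
def sketch_receive
  raise Errno::EAGAIN
rescue Errno::EAGAIN
  # Same translation as in receive_from: re-raise as the IO-flavoured error.
  raise IO::EAGAINWaitReadable
end

# A caller can rescue the generic IO::WaitReadable module.
def sketch_try_receive
  sketch_receive
rescue IO::WaitReadable
  :would_block
end
```

Because IO::EAGAINWaitReadable is still a subclass of Errno::EAGAIN, code that rescues Errno::EAGAIN directly keeps working as well.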
Instance Method Details
#<<(frame) ⇒ self
If you provide a Frame, do NOT use that frame afterwards anymore, as its native counterpart will have been destroyed.
Append a frame to this message.
    # File 'lib/cztop/message.rb', line 104
    def <<(frame)
      case frame
      when String
        # NOTE: can't use addstr because the data might be binary
        mem = FFI::MemoryPointer.from_string(frame)
        rc = ffi_delegate.addmem(mem, mem.size - 1) # without NULL byte
      when Frame
        rc = ffi_delegate.append(frame.ffi_delegate)
      else
        raise ArgumentError, format('invalid frame: %p', frame)
      end
      raise_zmq_err unless rc.zero?
      self
    end
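The NOTE in the source says a C-string call cannot be used because the data might be binary. The sketch below shows the underlying problem in plain Ruby, assuming a C-string API stops reading at the first NUL byte. The helper sketch_c_string_view is hypothetical and only simulates that behaviour; it does not call FFI or CZMQ.

```ruby
# Simulates what an API that treats its input as a C string would see:
# everything up to, but not including, the first NUL byte.
def sketch_c_string_view(bytes)
  raw = bytes.b # binary copy, so indexes are byte offsets
  nul = raw.index("\x00")
  nul ? raw.byteslice(0, nul) : raw
end

payload   = "ab\x00cd".b                  # 5 bytes with an embedded NUL
truncated = sketch_c_string_view(payload) # only the bytes before the NUL
full_size = payload.bytesize              # explicit length keeps every byte
```

Passing an explicit byte count, as the source does with `mem.size - 1`, avoids the truncation because the length no longer depends on where a NUL byte happens to sit.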
#[](frame_index) ⇒ String?
Return a frame’s content.
    # File 'lib/cztop/message.rb', line 191
    def [](frame_index)
      frame = frames[frame_index] or return nil
      frame.to_s
    end
#content_size ⇒ Integer
Returns size of this message in bytes.
    # File 'lib/cztop/message.rb', line 155
    def content_size
      ffi_delegate.content_size
    end
#empty? ⇒ Boolean
Returns whether this message is empty (no frames, or every frame has length zero).
    # File 'lib/cztop/message.rb', line 36
    def empty?
      content_size.zero?
    end
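Because empty? is defined through content_size, a message made up only of zero-length frames also counts as empty. A rough pure-Ruby sketch of that rule follows, assuming content_size is the sum of all frame sizes. The helpers are hypothetical and work on an Array of Strings rather than a real Message.

```ruby
# Hypothetical helpers; an Array of Strings stands in for the frames.
def sketch_content_size(frames)
  frames.sum(&:bytesize)
end

def sketch_empty?(frames)
  sketch_content_size(frames).zero?
end

no_frames    = sketch_empty?([])           # no frames at all
blank_frames = sketch_empty?(['', ''])     # two frames, both zero-length
with_content = sketch_empty?(['', 'data']) # at least one non-empty frame
```

To tell "no frames" apart from "only empty frames", check #size instead, which counts frames rather than bytes.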
#frames ⇒ FramesAccessor
Access to this CZTop::Message's Frames.
    # File 'lib/cztop/message/frames.rb', line 15
    def frames
      FramesAccessor.new(self)
    end
#inspect ⇒ String
Inspects this CZTop::Message.
    # File 'lib/cztop/message.rb', line 182
    def inspect
      format('#<%s:0x%x frames=%i content_size=%i content=%s>', self.class, to_ptr.address, size, content_size,
             content_size <= 500 ? to_a.inspect : '[...]')
    end
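The source above prints the frame contents only when the message holds at most 500 bytes and falls back to '[...]' otherwise. A minimal sketch of that cutoff, with hypothetical names and an Array of Strings in place of a real Message:

```ruby
SKETCH_INSPECT_LIMIT = 500 # byte threshold taken from the source above

# Returns the content part of the inspect string for the given frames.
def sketch_content_preview(frames)
  size = frames.sum(&:bytesize)
  size <= SKETCH_INSPECT_LIMIT ? frames.inspect : '[...]'
end

small = sketch_content_preview(%w[a b])    # frames are shown in full
large = sketch_content_preview(['x' * 501]) # replaced by the placeholder
```

The cutoff keeps log lines and console output readable when a message carries a large payload.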
#pop ⇒ String?
Removes first part from message and returns it as a string.
    # File 'lib/cztop/message.rb', line 144
    def pop
      # NOTE: can't use popstr because the data might be binary
      ptr = ffi_delegate.pop
      return nil if ptr.null?

      Frame.from_ffi_delegate(ptr).to_s
    end
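Note that #pop removes the first part, which differs from Array#pop (that removes the last element). The closest Array analogue is shift. The sketch below uses a plain Array of Strings as a hypothetical stand-in for the frame list; it does not touch CZTop.

```ruby
# Hypothetical stand-in: behaves like Message#pop on an Array of Strings.
def sketch_pop(frames)
  frames.shift # first element, or nil when the Array is empty
end

frames = %w[first second]
head   = sketch_pop(frames) # takes the frame from the front
rest   = frames             # remaining frames, in order
none   = sketch_pop([])     # nil, as with a message that has no frames
```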
#prepend(frame) ⇒ void
If you provide a Frame, do NOT use that frame afterwards anymore, as its native counterpart will have been destroyed.
This method returns an undefined value.
Prepend a frame to this message.
    # File 'lib/cztop/message.rb', line 127
    def prepend(frame)
      case frame
      when String
        # NOTE: can't use pushstr because the data might be binary
        mem = FFI::MemoryPointer.from_string(frame)
        rc = ffi_delegate.pushmem(mem, mem.size - 1) # without NULL byte
      when Frame
        rc = ffi_delegate.prepend(frame.ffi_delegate)
      else
        raise ArgumentError, format('invalid frame: %p', frame)
      end
      raise_zmq_err unless rc.zero?
    end
#routing_id ⇒ Integer
This is only set when the frame came from a Socket::SERVER socket.
Gets the routing ID.
    # File 'lib/cztop/message.rb', line 200
    ffi_delegate :routing_id
#routing_id=(new_routing_id) ⇒ new_routing_id
This is used when the message is sent to a Socket::SERVER socket.
Sets a new routing ID.
    # File 'lib/cztop/message.rb', line 209
    def routing_id=(new_routing_id)
      raise ArgumentError unless new_routing_id.is_a? Integer

      # need to raise manually, as FFI lacks this feature.
      # @see https://github.com/ffi/ffi/issues/473
      raise RangeError if new_routing_id.negative?

      ffi_delegate.set_routing_id(new_routing_id)
    end
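The two guard clauses in the setter can be tried in isolation. The sketch below copies only those checks into a hypothetical helper, sketch_validate_routing_id; it does not call into FFI or set anything on a message.

```ruby
# Hypothetical helper mirroring the checks in #routing_id=.
def sketch_validate_routing_id(value)
  raise ArgumentError unless value.is_a?(Integer)
  raise RangeError if value.negative?

  value
end

accepted = sketch_validate_routing_id(42) # passes both checks

rejected = begin
  sketch_validate_routing_id(-1)
rescue RangeError
  :negative_id
end
```

A non-Integer such as '42' or 1.5 fails the first check with ArgumentError before the sign is ever examined.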
#send_to(destination) ⇒ void
Do NOT use this CZTop::Message anymore afterwards. Its native counterpart will have been destroyed.
This method returns an undefined value.
Send CZTop::Message to a Socket or Actor.
    # File 'lib/cztop/message.rb', line 63
    def send_to(destination)
      fail ArgumentError, "message has no frames" if size.zero?

      destination.wait_writable

      rc = Zmsg.send(ffi_delegate, destination)
      return if rc.zero?

      raise_zmq_err
    rescue Errno::EAGAIN
      raise IO::EAGAINWaitWritable
    end
#size ⇒ Integer
Returns number of frames.
    # File 'lib/cztop/message/frames.rb', line 8
    def size
      ffi_delegate.size
    end
#to_a ⇒ Array<String>
This reads every frame in the message and turns it into a Ruby string, which can be a problem if the message is huge or has huge frames.
Returns all frames as strings in an array. This is useful for quick inspection of the message.
    # File 'lib/cztop/message.rb', line 165
    def to_a
      ffi_delegate = ffi_delegate()
      frame = ffi_delegate.first
      return [] if frame.null?

      arr = [frame.data.read_bytes(frame.size)]
      while (frame = ffi_delegate.next) && !frame.null?
        arr << frame.data.read_bytes(frame.size)
      end

      arr
    end