Class: Messagebus::Message

Inherits:
Object
  • Object
show all
Defined in:
lib/messagebus/message.rb

Overview

Container class for frames, misnamed technically

Constant Summary collapse

@@serializer =
::Thrift::Serializer.new
@@deserializer =
::Thrift::Deserializer.new
@@serializer_lock =
Mutex.new
@@deserializer_lock =
Mutex.new

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#raw_messageObject (readonly)

Returns the value of attribute raw_message.



38
39
40
# File 'lib/messagebus/message.rb', line 38

def raw_message
  @raw_message
end

Class Method Details

.create(payload, properties = nil, binary = false) ⇒ Object



46
47
48
# File 'lib/messagebus/message.rb', line 46

def create(payload, properties = nil, binary = false)
  Messagebus::Message.create_message(define_thrift(payload, binary), properties)
end

.create_message(payload, properties = nil) ⇒ Object



157
158
159
160
161
162
163
164
# File 'lib/messagebus/message.rb', line 157

def create_message(payload, properties = nil)
  raw_message = Messagebus::Thrift::MessageInternal.new
  raw_message.messageId = Digest::MD5.hexdigest(get_salted_payload(payload))
  raw_message.payload = payload
  raw_message.properties = properties if properties

  Message.new(raw_message)
end

.create_message_from_message_internal(raw_message) ⇒ Object

TODO: Why use this in utils when trying to follow a factory pattern?



99
100
101
# File 'lib/messagebus/message.rb', line 99

def create_message_from_message_internal(raw_message)
  Message.new(raw_message)
end

.define_thrift(payload, binary = false) ⇒ Object



63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
# File 'lib/messagebus/message.rb', line 63

def define_thrift(payload, binary = false)
  options = {}
  
  if binary        
	  if RUBY_VERSION.to_f >= 1.9 
	      payload.force_encoding('UTF-8')
	  end
    options.merge!({
      :messageFormat => Messagebus::Thrift::MessagePayloadType::BINARY,
      :binaryPayload => payload
    })
  elsif payload.is_a?(Hash) || (payload.respond_to?(:to_json) && !payload.is_a?(String))
    options.merge!({
      :messageFormat => Messagebus::Thrift::MessagePayloadType::JSON,
      :stringPayload => payload.to_json
    })
  elsif payload.is_a?(String)
    #Only UTF-8 is supported by thrift. Should warn to use binary if not ascii or utf-8 but there's no logger
    #I believe all users use utf8 or ascii.
    if RUBY_VERSION.to_f >= 1.9 
       payload.force_encoding('UTF-8')
    end
    options.merge!({
      :messageFormat => Messagebus::Thrift::MessagePayloadType::STRING,
      :stringPayload => payload
    })
  else
    # TODO: Create specific error class
    raise "Type not supported"
  end

  Messagebus::Thrift::MessagePayload.new(options)
end

.get_message_from_thrift_binary(body) ⇒ Object

Creates message from base64 encoded thrift bytes. We use Stomp protocol for server communication which internally uses string (utf-8). Base64 encoding is needed to avoid corner cases with weird bytes etc.



54
55
56
57
58
59
60
61
# File 'lib/messagebus/message.rb', line 54

def get_message_from_thrift_binary(body)
  binary_string = Base64.decode64(body)
  rmessage = nil
  @@deserializer_lock.synchronize do
    rmessage = @@deserializer.deserialize(Messagebus::Thrift::MessageInternal.new, binary_string)
  end
  Messagebus::Message.create_message_from_message_internal(rmessage)
end

.get_salted_payload(payload) ⇒ Object

Returns the payload as a string salted with current milliseconds since epoch.



149
150
151
152
153
154
155
# File 'lib/messagebus/message.rb', line 149

def get_salted_payload(payload)
  t = Time.now
  data = payload.binary? ? payload.binaryPayload : payload.stringPayload
  data += t.to_i.to_s
  data += t.tv_usec.to_s 
  data
end

Instance Method Details

#message_idObject



104
105
106
# File 'lib/messagebus/message.rb', line 104

def message_id
  @raw_message.messageId
end

#message_propertiesObject



108
109
110
# File 'lib/messagebus/message.rb', line 108

def message_properties
  @raw_message.properties
end

#message_properties=(hash) ⇒ Object



112
113
114
# File 'lib/messagebus/message.rb', line 112

def message_properties=(hash)
  @raw_message.properties = hash
end

#payloadObject



120
121
122
123
124
125
126
127
128
129
130
# File 'lib/messagebus/message.rb', line 120

def payload
  payload = @raw_message.payload

  if payload.binary?
    @raw_message.payload.binaryPayload
  elsif payload.json? || payload.string?
    @raw_message.payload.stringPayload
  else
    raise "Payload is not an understandable type: #{payload.messageFormat}"
  end
end

#payload_typeObject



116
117
118
# File 'lib/messagebus/message.rb', line 116

def payload_type
  @raw_message.payload.messageFormat
end

#to_thrift_binaryObject



132
133
134
135
136
137
138
# File 'lib/messagebus/message.rb', line 132

def to_thrift_binary
  binary_string = nil
  @@serializer_lock.synchronize do
    binary_string = @@serializer.serialize(@raw_message)
  end
  Base64.encode64(binary_string);
end