Module: Emque::Producing::Message

Defined in:
lib/emque/producing/message/message.rb

Defined Under Namespace

Modules: ClassMethods

Constant Summary collapse

# Raised by #publish when the message does not pass #valid?.
InvalidMessageError = Class.new(StandardError)
# Raised by #publish when the publisher returns a falsy "sent" result.
MessagesNotSentError = Class.new(StandardError)

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.included(base) ⇒ Object



75
76
77
78
79
# File 'lib/emque/producing/message/message.rb', line 75

# Hook run with the including class or module as +base+.
#
# Extends +base+ with ClassMethods, mixes Virtus.value_object into it, and
# declares an optional String +partition_key+ attribute whose default is nil.
#
# @param base [Module] the class or module that included this module
def self.included(base)
  base.extend(ClassMethods)
  # class_eval makes +base+ the implicit receiver, so +include+ and
  # +attribute+ are invoked on it regardless of their visibility.
  base.class_eval do
    include Virtus.value_object
    attribute :partition_key, String, :default => nil, :required => false
  end
end

Instance Method Details

#add_metadata ⇒ Object



81
82
83
84
85
86
87
88
89
90
91
92
93
94
# File 'lib/emque/producing/message/message.rb', line 81

# Builds the message payload: a +:metadata+ hash describing this message,
# merged with the message's public attributes.
#
# +public_attributes+ is merged last, so any key it shares with the
# envelope (including +:metadata+) takes its value from +public_attributes+.
#
# @return [Hash] the metadata envelope combined with the public attributes
def add_metadata
  {
    :metadata =>
    {
      :host => hostname,
      :app => app_name,
      :topic => topic,
      :created_at => formatted_time,
      :uuid => uuid,
      :type => message_type,
      :partition_key => partition_key
    }
  }.merge(public_attributes)
end

#ignored_exceptions ⇒ Object



108
109
110
# File 'lib/emque/producing/message/message.rb', line 108

# Exception list that #publish splats into its +rescue+ clause, as reported
# by the receiver's class via +read_ignored_exceptions+.
def ignored_exceptions
  owner = self.class
  owner.read_ignored_exceptions
end

#invalid_attributes ⇒ Object



120
121
122
123
124
125
126
# File 'lib/emque/producing/message/message.rb', line 120

# Names of required attributes whose current value is nil, minus any names
# the class lists in +private_attrs+.
#
# @return [Array] names of the attributes that make the message invalid
def invalid_attributes
  missing = self.class.attribute_set.each_with_object([]) do |attribute, names|
    next unless attribute.required?
    # fetch raises if the attribute is absent from the attributes hash.
    names << attribute.name if attributes.fetch(attribute.name).nil?
  end
  missing - self.class.private_attrs
end

#message_type ⇒ Object



100
101
102
# File 'lib/emque/producing/message/message.rb', line 100

# Message type reported by the receiver's class via +read_message_type+.
def message_type
  owner = self.class
  owner.read_message_type
end

#publish(publisher = nil) ⇒ Object



133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
# File 'lib/emque/producing/message/message.rb', line 133

# Validates the message and, when publishing is enabled in the
# Emque::Producing configuration, sends it through +publisher+.
#
# Exceptions matching +ignored_exceptions+ are re-raised when
# +raise_on_failure?+ is true and logged otherwise.
#
# @param publisher [Object, nil] object responding to +publish+; falls back
#   to Emque::Producing.publisher when nil
# @raise [InvalidMessageError] when +valid?+ is false
# @raise [MessagesNotSentError] when the publisher returns a falsy result
def publish(publisher=nil)
  publisher ||= Emque::Producing.publisher
  log "publishing...", true

  unless valid?
    log "failed...", true
    raise InvalidMessageError.new(invalid_message)
  end

  log "valid...", true
  # Publishing can be switched off in configuration; validation still runs.
  return unless Emque::Producing.configuration.publish_messages

  payload = process_middleware(to_json)
  delivered = publisher.publish(topic, message_type, payload, partition_key, raise_on_failure?)
  log "sent #{delivered}"
  raise MessagesNotSentError.new unless delivered
rescue *ignored_exceptions => error
  raise if raise_on_failure?

  log "failed ignoring exception... #{error}", true
end

#raise_on_failure? ⇒ Boolean

Returns:

  • (Boolean)


104
105
106
# File 'lib/emque/producing/message/message.rb', line 104

# Whether #publish re-raises exceptions it would otherwise ignore; the value
# comes from the receiver's class via +read_raise_on_failure+.
#
# @return [Boolean]
def raise_on_failure?
  owner = self.class
  owner.read_raise_on_failure
end

#to_json ⇒ Object



128
129
130
131
# File 'lib/emque/producing/message/message.rb', line 128

# Serializes the payload built by #add_metadata with Oj in +:compat+ mode.
#
# @return [Object] whatever Oj.dump returns (presumably a JSON String —
#   confirm against the Oj documentation)
def to_json
  data = add_metadata
  Oj.dump(data, :mode => :compat)
end

#topic ⇒ Object



96
97
98
# File 'lib/emque/producing/message/message.rb', line 96

# Topic reported by the receiver's class via +read_topic+.
def topic
  owner = self.class
  owner.read_topic
end

#valid? ⇒ Boolean

Returns:

  • (Boolean)


112
113
114
115
116
117
118
# File 'lib/emque/producing/message/message.rb', line 112

# A message is valid when it has no invalid attributes and both +topic+ and
# +message_type+ are truthy.
#
# @return [Boolean] strictly +true+ or +false+
def valid?
  return false unless invalid_attributes.empty?
  return false unless topic

  message_type ? true : false
end