Class: Pheme::TopicPublisher

Inherits:
Object
Includes:
Compression
Defined in:
lib/pheme/topic_publisher.rb

Constant Summary

SNS_SIZE_LIMIT =
256.kilobytes

Maximum message size accepted by SNS. The size also counts some metadata (‘name’ and ‘type’), so a buffer is reserved for that metadata.

Source: docs.aws.amazon.com/sns/latest/dg/SNSMessageAttributes.html#SNSMessageAttributesNTV

EXPECTED_METADATA_SIZE =
1.kilobyte
MESSAGE_SIZE_LIMIT =
SNS_SIZE_LIMIT - EXPECTED_METADATA_SIZE
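
The constants above rely on ActiveSupport's numeric helpers, where one kilobyte is 1024 bytes. The following is a minimal sketch of the same arithmetic in plain Ruby; the `over_limit?` helper is hypothetical and only illustrates how the limit is compared against a message's byte size.

```ruby
# Plain-Ruby equivalents of the ActiveSupport expressions
# (256.kilobytes == 256 * 1024 bytes).
SNS_SIZE_LIMIT = 256 * 1024                                   # 262_144 bytes
EXPECTED_METADATA_SIZE = 1 * 1024                             # 1_024 bytes
MESSAGE_SIZE_LIMIT = SNS_SIZE_LIMIT - EXPECTED_METADATA_SIZE  # 261_120 bytes

# Hypothetical helper: true when a message is larger than the limit.
# bytesize is used rather than length because multi-byte characters
# count for more than one byte.
def over_limit?(message)
  message.bytesize > MESSAGE_SIZE_LIMIT
end

puts MESSAGE_SIZE_LIMIT
puts over_limit?("a" * 1_000)
puts over_limit?("a" * 262_000)
```

The comparison is strict, so a message of exactly MESSAGE_SIZE_LIMIT bytes is not considered over the limit.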

Methods included from Compression

#compress, #decompress

Constructor Details

#initialize(topic_arn: self.class._topic_arn) ⇒ TopicPublisher

Returns a new instance of TopicPublisher.

Raises:

  • (ArgumentError)


# File 'lib/pheme/topic_publisher.rb', line 26

def initialize(topic_arn: self.class._topic_arn)
  raise ArgumentError, "must specify non-nil topic_arn" if topic_arn.blank?

  @topic_arn = topic_arn
end
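
The constructor falls back to a class-level default when no topic_arn is passed. The sketch below reproduces that pattern with a hypothetical stand-in class (`SketchPublisher`) rather than the gem itself, and approximates ActiveSupport's `blank?` with a plain-Ruby check; the ARN values are made up.

```ruby
# Hypothetical stand-in that mirrors the class-level default pattern.
class SketchPublisher
  class << self
    attr_reader :_topic_arn

    # Class-level setter, used as a declaration in subclasses.
    def topic_arn(topic_arn)
      @_topic_arn = topic_arn
    end
  end

  attr_accessor :topic_arn

  def initialize(topic_arn: self.class._topic_arn)
    # The documented class calls blank?; this check approximates it.
    if topic_arn.nil? || topic_arn.to_s.strip.empty?
      raise ArgumentError, "must specify non-nil topic_arn"
    end

    @topic_arn = topic_arn
  end
end

class OrdersPublisher < SketchPublisher
  topic_arn "arn:aws:sns:us-east-1:123456789012:orders"
end

puts OrdersPublisher.new.topic_arn
puts OrdersPublisher.new(topic_arn: "arn:aws:sns:us-east-1:123456789012:other").topic_arn
```

In this sketch the default lives in a class-level instance variable, so a class that never declares topic_arn (including a subclass of `OrdersPublisher`) has no default, and calling `new` without an argument raises ArgumentError.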

Class Attribute Details

._topic_arn ⇒ Object (readonly)

Returns the value of attribute _topic_arn.



# File 'lib/pheme/topic_publisher.rb', line 19

def _topic_arn
  @_topic_arn
end

Instance Attribute Details

#topic_arn ⇒ Object

Returns the value of attribute topic_arn.



# File 'lib/pheme/topic_publisher.rb', line 31

def topic_arn
  @topic_arn
end

Class Method Details

.topic_arn(topic_arn) ⇒ Object



Sets the class-level default topic ARN, which #initialize uses when no topic_arn is given.

# File 'lib/pheme/topic_publisher.rb', line 21

def topic_arn(topic_arn)
  @_topic_arn = topic_arn
end

Instance Method Details

#publish(message, sns_client: Pheme.configuration.sns_client, message_attributes: nil, message_deduplication_id: nil, message_group_id: nil) ⇒ Object



Logs the publish metadata (without the message body) as JSON, then sends the serialized message to the topic through sns_client.

# File 'lib/pheme/topic_publisher.rb', line 37

def publish(message,
  sns_client: Pheme.configuration.sns_client,
  message_attributes: nil,
  message_deduplication_id: nil,
  message_group_id: nil)
  payload = {
    message: "#{self.class} publishing message to #{topic_arn}",
    body: message,
    publisher: self.class.to_s,
    topic_arn: topic_arn,
  }

  Pheme.logger.info(payload.except(:body).to_json)

  sns_client.publish(
    topic_arn: topic_arn,
    message: serialize(message),
    message_attributes: message_attributes,
    message_deduplication_id: message_deduplication_id,
    message_group_id: message_group_id,
  )
end
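
Because sns_client is a keyword argument, the call can be exercised without AWS by passing a test double. The sketch below is a simplified stand-in for #publish (no logging, no compression); `RecordingSnsClient` and `publish_sketch` are hypothetical names, and the ARN is made up.

```ruby
require "json"

# Hypothetical test double: records the keyword arguments it receives
# instead of calling AWS.
class RecordingSnsClient
  attr_reader :calls

  def initialize
    @calls = []
  end

  def publish(**kwargs)
    @calls << kwargs
    { message_id: "fake-id" }
  end
end

# Simplified stand-in for #publish: converts non-String messages to JSON
# and forwards the same keyword arguments as the documented method.
def publish_sketch(message, topic_arn:, sns_client:, message_attributes: nil,
                   message_deduplication_id: nil, message_group_id: nil)
  body = message.is_a?(String) ? message : message.to_json
  sns_client.publish(
    topic_arn: topic_arn,
    message: body,
    message_attributes: message_attributes,
    message_deduplication_id: message_deduplication_id,
    message_group_id: message_group_id,
  )
end

client = RecordingSnsClient.new
publish_sketch(
  { id: 1 },
  topic_arn: "arn:aws:sns:us-east-1:123456789012:orders.fifo",
  sns_client: client,
  message_group_id: "order-1",
)
puts client.calls.first[:message]
```

Arguments left at their defaults are forwarded as nil, which matches the documented method body.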

#publish_events ⇒ Object

Raises:

  • (NotImplementedError)


# File 'lib/pheme/topic_publisher.rb', line 33

def publish_events
  raise NotImplementedError
end
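
A method that only raises NotImplementedError usually signals that subclasses are meant to override it; the source does not state this, so treat it as an assumption. The sketch below uses hypothetical classes to show the override and one Ruby detail worth knowing: NotImplementedError descends from ScriptError, not StandardError, so a bare `rescue` does not catch it.

```ruby
# Hypothetical base class with the same placeholder method.
class BasePublisher
  def publish_events
    raise NotImplementedError
  end
end

# Hypothetical subclass supplying its own implementation.
class InvoicePublisher < BasePublisher
  def publish_events
    :published
  end
end

# The error class must be named explicitly; `rescue => e` alone would
# only catch StandardError and its descendants.
def attempt(publisher)
  publisher.publish_events
rescue NotImplementedError => e
  "not implemented: #{e.class}"
end

puts attempt(BasePublisher.new)
puts attempt(InvoicePublisher.new)
```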

#serialize(message) ⇒ Object



Converts non-String messages to JSON and compresses the result when it is larger than MESSAGE_SIZE_LIMIT bytes.

# File 'lib/pheme/topic_publisher.rb', line 60

def serialize(message)
  message = message.to_json unless message.is_a? String

  return compress(message) if message.bytesize > MESSAGE_SIZE_LIMIT

  message
end