Class: MessageStore::Postgres::Get

Inherits:
Object
  • Object
show all
Includes:
Get
Defined in:
lib/message_store/postgres/get.rb,
lib/message_store/postgres/get/last.rb

Defined Under Namespace

Modules: Defaults, Deserialize, Time Classes: Last

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.build(batch_size: nil, session: nil, condition: nil) ⇒ Object



14
15
16
17
18
# File 'lib/message_store/postgres/get.rb', line 14

def self.build(batch_size: nil, session: nil, condition: nil)
  new(batch_size, condition).tap do |instance|
    instance.configure(session: session)
  end
end

.call(stream_name, position: nil, batch_size: nil, condition: nil, session: nil) ⇒ Object



30
31
32
33
# File 'lib/message_store/postgres/get.rb', line 30

def self.call(stream_name, position: nil, batch_size: nil, condition: nil,  session: nil)
  instance = build(batch_size: batch_size, condition: condition, session: session)
  instance.(stream_name, position: position)
end

.category_stream?(stream_name) ⇒ Boolean

Returns:

  • (Boolean)


87
88
89
# File 'lib/message_store/postgres/get.rb', line 87

def self.category_stream?(stream_name)
  StreamName.category?(stream_name)
end

.configure(receiver, attr_name: nil, batch_size: nil, condition: nil, session: nil) ⇒ Object



20
21
22
23
24
# File 'lib/message_store/postgres/get.rb', line 20

def self.configure(receiver, attr_name: nil, batch_size: nil, condition: nil, session: nil)
  attr_name ||= :get
  instance = build(batch_size: batch_size, condition: condition, session: session)
  receiver.public_send "#{attr_name}=", instance
end

.constrain_condition(condition) ⇒ Object



71
72
73
74
75
# File 'lib/message_store/postgres/get.rb', line 71

def self.constrain_condition(condition)
  return nil if condition.nil?

  "(#{condition})"
end

.sql_command(stream_name, position, batch_size, condition) ⇒ Object



77
78
79
80
81
82
83
84
85
# File 'lib/message_store/postgres/get.rb', line 77

def self.sql_command(stream_name, position, batch_size, condition)
  parameters = '$1::varchar, $2::bigint, $3::bigint, $4::varchar'

  if category_stream?(stream_name)
    return "SELECT * FROM get_category_messages(#{parameters});"
  else
    return "SELECT * FROM get_stream_messages(#{parameters});"
  end
end

Instance Method Details

#batch_sizeObject



10
11
12
# File 'lib/message_store/postgres/get.rb', line 10

def batch_size
  @batch_size ||= Defaults.batch_size
end

#call(stream_name, position: nil) ⇒ Object



35
36
37
38
39
40
41
42
43
44
45
46
47
48
# File 'lib/message_store/postgres/get.rb', line 35

def call(stream_name, position: nil)
  logger.trace(tag: :get) { "Getting message data (Position: #{position.inspect}, Stream Name: #{stream_name}, Batch Size: #{batch_size.inspect})" }

  position ||= Defaults.position

  result = get_result(stream_name, position)

  message_data = convert(result)

  logger.info(tag: :get) { "Finished getting message data (Count: #{message_data.length}, Position: #{position.inspect}, Stream Name: #{stream_name}, Batch Size: #{batch_size.inspect})" }
  logger.info(tags: [:data, :message_data]) { message_data.pretty_inspect }

  message_data
end

#configure(session: nil) ⇒ Object



26
27
28
# File 'lib/message_store/postgres/get.rb', line 26

def configure(session: nil)
  Session.configure self, session: session
end

#convert(result) ⇒ Object



91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
# File 'lib/message_store/postgres/get.rb', line 91

def convert(result)
  logger.trace(tag: :get) { "Converting result to message data (Result Count: #{result.ntuples})" }

  message_data = result.map do |record|
    record['data'] = Deserialize.data(record['data'])
    record['metadata'] = Deserialize.(record['metadata'])
    record['time'] = Time.utc_coerced(record['time'])

    MessageData::Read.build record
  end

  logger.debug(tag: :get) { "Converted result to message data (Message Data Count: #{message_data.length})" }

  message_data
end

#get_result(stream_name, position) ⇒ Object



50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
# File 'lib/message_store/postgres/get.rb', line 50

def get_result(stream_name, position)
  logger.trace(tag: :get) { "Getting result (Stream: #{stream_name}, Position: #{position.inspect}, Batch Size: #{batch_size.inspect}, Condition: #{condition || '(none)'})" }

  sql_command = self.class.sql_command(stream_name, position, batch_size, condition)

  cond = self.class.constrain_condition(condition)

  params = [
    stream_name,
    position,
    batch_size,
    cond
  ]

  result = session.execute(sql_command, params)

  logger.debug(tag: :get) { "Finished getting result (Count: #{result.ntuples}, Stream: #{stream_name}, Position: #{position.inspect}, Batch Size: #{batch_size.inspect}, Condition: #{condition || '(none)'})" }

  result
end