Class: MessageStore::Postgres::Put

Inherits:
Object
  • Object
show all
Includes:
Dependency, Log::Dependency
Defined in:
lib/message_store/postgres/put.rb

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.build(session: nil) ⇒ Object



10
11
12
13
14
# File 'lib/message_store/postgres/put.rb', line 10

def self.build(session: nil)
  new.tap do |instance|
    instance.configure(session: session)
  end
end

.call(write_message, stream_name, expected_version: nil, session: nil) ⇒ Object



27
28
29
30
# File 'lib/message_store/postgres/put.rb', line 27

def self.call(write_message, stream_name, expected_version: nil, session: nil)
  instance = build(session: session)
  instance.(write_message, stream_name, expected_version: expected_version)
end

.configure(receiver, session: nil, attr_name: nil) ⇒ Object



21
22
23
24
25
# File 'lib/message_store/postgres/put.rb', line 21

def self.configure(receiver, session: nil, attr_name: nil)
  attr_name ||= :put
  instance = build(session: session)
  receiver.public_send "#{attr_name}=", instance
end

.statementObject



91
92
93
# File 'lib/message_store/postgres/put.rb', line 91

def self.statement
  @statement ||= "SELECT write_message($1::varchar, $2::varchar, $3::varchar, $4::jsonb, $5::jsonb, $6::bigint);"
end

Instance Method Details

#call(write_message, stream_name, expected_version: nil) ⇒ Object



32
33
34
35
36
37
38
39
40
41
42
43
44
45
# File 'lib/message_store/postgres/put.rb', line 32

def call(write_message, stream_name, expected_version: nil)
  logger.trace(tag: :put) { "Putting message data (Type: #{write_message.type}, Stream Name: #{stream_name}, Expected Version: #{expected_version.inspect})" }
  logger.trace(tags: [:data, :message_data]) { write_message.pretty_inspect }

  write_message.id ||= identifier.get

  id, type, data,  = destructure_message(write_message)
  expected_version = ExpectedVersion.canonize(expected_version)

  insert_message(id, stream_name, type, data, , expected_version).tap do |position|
    logger.info(tag: :put) { "Put message data (Type: #{write_message.type}, Stream Name: #{stream_name}, Expected Version: #{expected_version.inspect}, ID: #{id.inspect}, Position: #{position})" }
    logger.info(tags: [:data, :message_data]) { write_message.pretty_inspect }
  end
end

#configure(session: nil) ⇒ Object



16
17
18
19
# File 'lib/message_store/postgres/put.rb', line 16

def configure(session: nil)
  Session.configure(self, session: session)
  Identifier::UUID::Random.configure(self)
end

#destructure_message(write_message) ⇒ Object



47
48
49
50
51
52
53
54
55
56
57
58
59
# File 'lib/message_store/postgres/put.rb', line 47

def destructure_message(write_message)
  id = write_message.id
  type = write_message.type
  data = write_message.data
   = write_message.

  logger.debug(tags: [:data, :message_data]) { "ID: #{id.pretty_inspect}" }
  logger.debug(tags: [:data, :message_data]) { "Type: #{type.pretty_inspect}" }
  logger.debug(tags: [:data, :message_data]) { "Data: #{data.pretty_inspect}" }
  logger.debug(tags: [:data, :message_data]) { "Metadata: #{.pretty_inspect}" }

  return id, type, data, 
end

#execute_query(id, stream_name, type, transformed_data, transformed_metadata, expected_version) ⇒ Object



68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
# File 'lib/message_store/postgres/put.rb', line 68

def execute_query(id, stream_name, type, transformed_data, , expected_version)
  logger.trace(tag: :put) { "Executing insert (Stream Name: #{stream_name}, Type: #{type}, Expected Version: #{expected_version.inspect}, ID: #{id.inspect})" }

  params = [
    id,
    stream_name,
    type,
    transformed_data,
    ,
    expected_version
  ]

  begin
    records = session.execute(self.class.statement, params)
  rescue PG::RaiseException => e
    raise_error e
  end

  logger.debug(tag: :put) { "Executed insert (Type: #{type}, Stream Name: #{stream_name}, Expected Version: #{expected_version.inspect}, ID: #{id.inspect})" }

  records
end

#insert_message(id, stream_name, type, data, metadata, expected_version) ⇒ Object



61
62
63
64
65
66
# File 'lib/message_store/postgres/put.rb', line 61

def insert_message(id, stream_name, type, data, , expected_version)
  transformed_data = transformed_data(data)
   = ()
  records = execute_query(id, stream_name, type, transformed_data, , expected_version)
  position(records)
end

#position(records) ⇒ Object



127
128
129
130
131
132
133
# File 'lib/message_store/postgres/put.rb', line 127

def position(records)
  position = nil
  unless records[0].nil?
    position = records[0].values[0]
  end
  position
end

#raise_error(pg_error) ⇒ Object



135
136
137
138
139
140
141
142
143
# File 'lib/message_store/postgres/put.rb', line 135

def raise_error(pg_error)
  error_message = pg_error.message
  if error_message.include? 'Wrong expected version'
    error_message.gsub!('ERROR:', '').strip!
    logger.error { error_message }
    raise ExpectedVersion::Error, error_message
  end
  raise pg_error
end

#transformed_data(data) ⇒ Object



95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
# File 'lib/message_store/postgres/put.rb', line 95

def transformed_data(data)
  transformed_data = nil

  if data.is_a?(Hash) && data.empty?
    data = nil
  end

  unless data.nil?
    transformable_data = MessageData::Hash[data]
    transformed_data = Transform::Write.(transformable_data, :json)
  end

  logger.debug(tags: [:data, :serialize]) { "Transformed Data: #{transformed_data.inspect}" }
  transformed_data
end

#transformed_metadata(metadata) ⇒ Object



111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
# File 'lib/message_store/postgres/put.rb', line 111

def ()
   = nil

  if .is_a?(Hash) && .empty?
     = nil
  end

  unless .nil?
     = MessageData::Hash[]
     = Transform::Write.(, :json)
  end

  logger.debug(tags: [:data, :serialize]) { "Transformed Metadata: #{.inspect}" }
  
end