Class: MessageStore::Postgres::Put
- Inherits:
-
Object
- Object
- MessageStore::Postgres::Put
- Includes:
- Dependency, Log::Dependency
- Defined in:
- lib/message_store/postgres/put.rb
Class Method Summary collapse
- .build(session: nil) ⇒ Object
- .call(write_message, stream_name, expected_version: nil, session: nil) ⇒ Object
- .configure(receiver, session: nil, attr_name: nil) ⇒ Object
- .statement ⇒ Object
Instance Method Summary collapse
- #call(write_message, stream_name, expected_version: nil) ⇒ Object
- #configure(session: nil) ⇒ Object
- #destructure_message(write_message) ⇒ Object
- #execute_query(id, stream_name, type, transformed_data, transformed_metadata, expected_version) ⇒ Object
- #insert_message(id, stream_name, type, data, metadata, expected_version) ⇒ Object
- #position(records) ⇒ Object
- #raise_error(pg_error) ⇒ Object
- #transformed_data(data) ⇒ Object
- #transformed_metadata(metadata) ⇒ Object
Class Method Details
.build(session: nil) ⇒ Object
10 11 12 13 14 |
# File 'lib/message_store/postgres/put.rb', line 10

# Instantiates the class and runs the instance-level #configure on it.
#
# @param session [Object, nil] forwarded unchanged to #configure
# @return [Object] the newly created, configured instance
def self.build(session: nil)
  instance = new
  instance.configure(session: session)
  instance
end
.call(write_message, stream_name, expected_version: nil, session: nil) ⇒ Object
27 28 29 30 |
# File 'lib/message_store/postgres/put.rb', line 27

# Class-level convenience: builds a configured instance and invokes it.
#
# @param write_message [Object] the message data to write; forwarded to the instance's #call
# @param stream_name [String] name of the stream to write to
# @param expected_version [Object, nil] forwarded to the instance's #call
# @param session [Object, nil] forwarded to .build
# @return [Object] whatever the instance's #call returns
def self.call(write_message, stream_name, expected_version: nil, session: nil)
  instance = build(session: session)
  instance.(write_message, stream_name, expected_version: expected_version)
end
.configure(receiver, session: nil, attr_name: nil) ⇒ Object
21 22 23 24 25 |
# File 'lib/message_store/postgres/put.rb', line 21

# Builds an instance and assigns it to an attribute of +receiver+ through
# the receiver's public writer method.
#
# @param receiver [Object] object that exposes a public "<attr_name>=" writer
# @param session [Object, nil] forwarded to .build
# @param attr_name [Symbol, String, nil] attribute to assign; :put when not given
# @return [Object] the result of the writer call on +receiver+
def self.configure(receiver, session: nil, attr_name: nil)
  put = build(session: session)
  setter = "#{attr_name || :put}="
  receiver.public_send(setter, put)
end
.statement ⇒ Object
91 92 93 |
# File 'lib/message_store/postgres/put.rb', line 91

# SQL text used to invoke the write_message database function with six
# positional, type-cast parameters.
#
# The string is memoized and therefore shared by every caller, so it is
# frozen to keep one caller from accidentally mutating the statement that
# all later writes would use.
#
# @return [String] the frozen, memoized SQL statement
def self.statement
  @statement ||= "SELECT write_message($1::varchar, $2::varchar, $3::varchar, $4::jsonb, $5::jsonb, $6::bigint);".freeze
end
Instance Method Details
#call(write_message, stream_name, expected_version: nil) ⇒ Object
32 33 34 35 36 37 38 39 40 41 42 43 44 45 |
# File 'lib/message_store/postgres/put.rb', line 32

# Writes one message to a stream and returns the value produced by
# #insert_message (the position extracted from the query result).
#
# Side effect: when the message has no id, one is obtained from
# +identifier+ and assigned to +write_message.id+ before writing.
#
# @param write_message [Object] message data; must respond to id, id=, type, data and metadata
# @param stream_name [String] name of the stream to write to
# @param expected_version [Object, nil] canonized via ExpectedVersion.canonize before use
# @return [Object, nil] the position returned by #insert_message
def call(write_message, stream_name, expected_version: nil)
  logger.trace(tag: :put) { "Putting message data (Type: #{write_message.type}, Stream Name: #{stream_name}, Expected Version: #{expected_version.inspect})" }
  logger.trace(tags: [:data, :message_data]) { write_message.pretty_inspect }

  # Only assign a generated id when the caller did not supply one.
  write_message.id ||= identifier.get

  id, type, data, metadata = destructure_message(write_message)
  expected_version = ExpectedVersion.canonize(expected_version)

  insert_message(id, stream_name, type, data, metadata, expected_version).tap do |position|
    logger.info(tag: :put) { "Put message data (Type: #{write_message.type}, Stream Name: #{stream_name}, Expected Version: #{expected_version.inspect}, ID: #{id.inspect}, Position: #{position})" }
    logger.info(tags: [:data, :message_data]) { write_message.pretty_inspect }
  end
end
#configure(session: nil) ⇒ Object
16 17 18 19 |
# File 'lib/message_store/postgres/put.rb', line 16

# Configures this instance's collaborators, in order: first the session,
# then the identifier generator.
# NOTE(review): presumably these calls install the +session+ and
# +identifier+ dependencies used elsewhere in this class -- confirm
# against Session.configure and Identifier::UUID::Random.configure.
#
# @param session [Object, nil] forwarded to Session.configure
# @return [Object] the result of Identifier::UUID::Random.configure
def configure(session: nil)
  Session.configure(self, session: session)
  Identifier::UUID::Random.configure(self)
end
#destructure_message(write_message) ⇒ Object
47 48 49 50 51 52 53 54 55 56 57 58 59 |
# File 'lib/message_store/postgres/put.rb', line 47

# Reads the four attributes of a message that are written to the store,
# logs each at debug level, and returns them as a list.
#
# @param write_message [Object] must respond to id, type, data and metadata
# @return [Array] id, type, data and metadata, in that order
def destructure_message(write_message)
  id = write_message.id
  type = write_message.type
  data = write_message.data
  metadata = write_message.metadata

  logger.debug(tags: [:data, :message_data]) { "ID: #{id.pretty_inspect}" }
  logger.debug(tags: [:data, :message_data]) { "Type: #{type.pretty_inspect}" }
  logger.debug(tags: [:data, :message_data]) { "Data: #{data.pretty_inspect}" }
  logger.debug(tags: [:data, :message_data]) { "Metadata: #{metadata.pretty_inspect}" }

  return id, type, data, metadata
end
#execute_query(id, stream_name, type, transformed_data, transformed_metadata, expected_version) ⇒ Object
68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 |
# File 'lib/message_store/postgres/put.rb', line 68

# Executes the class-level write statement through +session+ with the six
# positional parameters, in the order the statement expects them.
#
# A PG::RaiseException raised by the session is handed to #raise_error.
#
# @param id [Object] message id
# @param stream_name [String] name of the stream to write to
# @param type [Object] message type
# @param transformed_data [String, nil] serialized data, as produced by #transformed_data
# @param transformed_metadata [String, nil] serialized metadata, as produced by #transformed_metadata
# @param expected_version [Object, nil] expected stream version
# @return [Object] the records returned by session.execute
def execute_query(id, stream_name, type, transformed_data, transformed_metadata, expected_version)
  logger.trace(tag: :put) { "Executing insert (Stream Name: #{stream_name}, Type: #{type}, Expected Version: #{expected_version.inspect}, ID: #{id.inspect})" }

  # Order must match the $1..$6 placeholders of the statement.
  params = [
    id,
    stream_name,
    type,
    transformed_data,
    transformed_metadata,
    expected_version
  ]

  begin
    records = session.execute(self.class.statement, params)
  rescue PG::RaiseException => e
    raise_error e
  end

  logger.debug(tag: :put) { "Executed insert (Type: #{type}, Stream Name: #{stream_name}, Expected Version: #{expected_version.inspect}, ID: #{id.inspect})" }

  records
end
#insert_message(id, stream_name, type, data, metadata, expected_version) ⇒ Object
61 62 63 64 65 66 |
# File 'lib/message_store/postgres/put.rb', line 61

# Serializes data and metadata, executes the write query, and returns the
# position extracted from the resulting records.
#
# @param id [Object] message id
# @param stream_name [String] name of the stream to write to
# @param type [Object] message type
# @param data [Object, nil] raw message data; serialized via #transformed_data
# @param metadata [Object, nil] raw message metadata; serialized via #transformed_metadata
# @param expected_version [Object, nil] expected stream version
# @return [Object, nil] the value returned by #position
def insert_message(id, stream_name, type, data, metadata, expected_version)
  # The locals share their names with the methods; the parenthesized
  # right-hand sides are method calls.
  transformed_data = transformed_data(data)
  transformed_metadata = transformed_metadata(metadata)
  records = execute_query(id, stream_name, type, transformed_data, transformed_metadata, expected_version)
  position(records)
end
#position(records) ⇒ Object
127 128 129 130 131 132 133 |
# File 'lib/message_store/postgres/put.rb', line 127

# Extracts the first value of the first record.
#
# @param records [#[]] indexable collection of records whose elements respond to #values
# @return [Object, nil] the first value of the first record, or nil when
#   there is no first record
def position(records)
  # Safe navigation yields nil when the first record is absent; the
  # collection is indexed only once.
  records[0]&.values&.first
end
#raise_error(pg_error) ⇒ Object
135 136 137 138 139 140 141 142 143 |
# File 'lib/message_store/postgres/put.rb', line 135

# Translates a database error into a domain error where applicable.
#
# When the error message contains 'Wrong expected version', the message is
# stripped of any 'ERROR:' marker and surrounding whitespace, logged, and
# raised as ExpectedVersion::Error. Any other error is re-raised unchanged.
#
# @param pg_error [Exception] the error raised by the database session
# @raise [ExpectedVersion::Error] for expected-version conflicts
# @raise [Exception] +pg_error+ itself in every other case
def raise_error(pg_error)
  error_message = pg_error.message

  if error_message.include? 'Wrong expected version'
    # Non-bang gsub/strip: gsub! returns nil when nothing is replaced, so
    # chaining strip! onto it raises NoMethodError for a message without
    # the 'ERROR:' marker. This also leaves pg_error's own message intact.
    error_message = error_message.gsub('ERROR:', '').strip
    logger.error { error_message }
    raise ExpectedVersion::Error, error_message
  end

  raise pg_error
end
#transformed_data(data) ⇒ Object
95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 |
# File 'lib/message_store/postgres/put.rb', line 95

# Serializes message data for storage.
#
# nil and an empty Hash both produce nil. Anything else is wrapped in
# MessageData::Hash and passed to Transform::Write with the :json format.
# The result is logged at debug level.
#
# @param data [Object, nil] raw message data
# @return [Object, nil] the output of Transform::Write, or nil when there is no data
def transformed_data(data)
  blank = data.nil? || (data.is_a?(Hash) && data.empty?)

  serialized = blank ? nil : Transform::Write.(MessageData::Hash[data], :json)

  logger.debug(tags: [:data, :serialize]) { "Transformed Data: #{serialized.inspect}" }

  serialized
end
#transformed_metadata(metadata) ⇒ Object
111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 |
# File 'lib/message_store/postgres/put.rb', line 111

# Serializes message metadata for storage.
#
# nil and an empty Hash both produce nil. Anything else is wrapped in
# MessageData::Hash and passed to Transform::Write with the :json format.
# The result is logged at debug level.
#
# @param metadata [Object, nil] raw message metadata
# @return [Object, nil] the output of Transform::Write, or nil when there is no metadata
def transformed_metadata(metadata)
  transformed_metadata = nil

  # An empty Hash is treated the same as no metadata at all.
  if metadata.is_a?(Hash) && metadata.empty?
    metadata = nil
  end

  unless metadata.nil?
    transformable_metadata = MessageData::Hash[metadata]
    transformed_metadata = Transform::Write.(transformable_metadata, :json)
  end

  logger.debug(tags: [:data, :serialize]) { "Transformed Metadata: #{transformed_metadata.inspect}" }

  transformed_metadata
end