Module: MessageStore::Postgres::Get
- Included in:
- Category, Stream
- Defined in:
- lib/message_store/postgres/get.rb,
lib/message_store/postgres/get/stream.rb,
lib/message_store/postgres/get/category.rb,
lib/message_store/postgres/get/condition.rb,
lib/message_store/postgres/get/stream/last.rb,
lib/message_store/postgres/get/category/correlation.rb,
lib/message_store/postgres/get/category/consumer_group.rb
Defined Under Namespace
Modules: BatchSize, Call, Condition, Defaults, Deserialize, Time
Classes: Category, Stream
Class Method Summary
collapse
Instance Method Summary
collapse
Class Method Details
.build(stream_name, **args) ⇒ Object
31
32
33
34
|
# File 'lib/message_store/postgres/get.rb', line 31
# Builds a reader for +stream_name+ by delegating to the class chosen by
# +specialization+. All keyword arguments are forwarded untouched to that
# class's own +build+.
def self.build(stream_name, **args)
  specialization(stream_name).build(stream_name, **args)
end
|
.call(stream_name, **args) ⇒ Object
48
49
50
51
52
|
# File 'lib/message_store/postgres/get.rb', line 48
# Builds a reader for +stream_name+ and immediately invokes it.
#
# The +:position+ keyword (nil when absent) is not passed to +build+; it is
# handed to the built instance's +call+ instead. Every other keyword argument
# is forwarded to +build+.
def self.call(stream_name, **args)
  position = args[:position]
  build_args = args.reject { |key, _value| key == :position }

  build(stream_name, **build_args).call(position)
end
|
.configure(receiver, stream_name, **args) ⇒ Object
36
37
38
39
40
41
42
|
# File 'lib/message_store/postgres/get.rb', line 36
# Builds a reader for +stream_name+ and assigns it to an attribute of
# +receiver+ through that attribute's public setter.
#
# The +:attr_name+ keyword selects the attribute and falls back to +:get+
# when it is missing or falsy. It is removed from +args+ before the remaining
# keywords are forwarded to +build+.
def self.configure(receiver, stream_name, **args)
  attr_name = args.delete(:attr_name) || :get
  instance = build(stream_name, **args)

  setter = "#{attr_name}="
  receiver.public_send(setter, instance)
end
|
.error_message(pg_error) ⇒ Object
128
129
130
|
# File 'lib/message_store/postgres/get.rb', line 128
# Returns the text of +pg_error+ with every literal 'ERROR:' occurrence
# removed and surrounding whitespace stripped.
def self.error_message(pg_error)
  raw_text = pg_error.message
  unlabeled = raw_text.gsub('ERROR:', '')
  unlabeled.strip
end
|
.included(cls) ⇒ Object
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
# File 'lib/message_store/postgres/get.rb', line 4
# Hook run when this module is included into +cls+. Everything below is
# evaluated in the context of the including class.
def self.included(cls)
  cls.class_exec do
    include MessageStore::Get

    # Order matters: BatchSize is prepended last, so it sits in front of Call
    # in the ancestor chain.
    prepend Call
    prepend BatchSize

    dependency :session, Session

    # Hooks declared with the bang macro.
    # NOTE(review): presumably these must be implemented by the including
    # class — confirm against the template_method library.
    %i[
      stream_name
      sql_command
      parameters
      parameter_values
      last_position
      log_text
    ].each { |hook| template_method! hook }

    # Hooks declared with the non-bang macro.
    %i[specialize_error assure].each { |hook| template_method hook }
  end
end
|
.message_data(record) ⇒ Object
103
104
105
106
107
108
109
|
# File 'lib/message_store/postgres/get.rb', line 103
# Turns one result +record+ into message data.
#
# The 'data', 'metadata' and 'time' entries of +record+ are replaced IN PLACE
# (the caller's hash is mutated) with their coerced values, one after the
# other, and the updated record is then passed to MessageData::Read.build.
def self.message_data(record)
  data = Get::Deserialize.data(record['data'])
  record['data'] = data

  metadata = Get::Deserialize.metadata(record['metadata'])
  record['metadata'] = metadata

  time = Get::Time.utc_coerced(record['time'])
  record['time'] = time

  MessageData::Read.build(record)
end
|
.specialization(stream_name) ⇒ Object
132
133
134
135
136
137
138
|
# File 'lib/message_store/postgres/get.rb', line 132
# Picks the reader class for +stream_name+: Category when
# StreamName.category? says the name is a category, Stream otherwise.
def self.specialization(stream_name)
  return Category if StreamName.category?(stream_name)

  Stream
end
|
Instance Method Details
#configure(session: nil) ⇒ Object
44
45
46
|
# File 'lib/message_store/postgres/get.rb', line 44
# Configures this instance's session by delegating to Session.configure,
# passing along the optional +session+ (nil by default).
def configure(session: nil)
  options = { session: session }
  Session.configure(self, **options)
end
|
#convert(result) ⇒ Object
91
92
93
94
95
96
97
98
99
100
101
|
# File 'lib/message_store/postgres/get.rb', line 91
# Maps every record of +result+ through Get.message_data and returns the
# resulting collection, logging the counts before and after.
def convert(result)
  logger.trace(tag: :get) { "Converting result to message data (Result Count: #{result.ntuples})" }

  converted = result.map { |record| Get.message_data(record) }

  logger.debug(tag: :get) { "Converted result to message data (Message Data Count: #{converted.length})" }

  converted
end
|
#get_result(stream_name, position) ⇒ Object
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
|
# File 'lib/message_store/postgres/get.rb', line 75
# Executes +sql_command+ on the session with the parameter values derived
# from +stream_name+ and +position+, and returns the raw result.
#
# Only the execute call is guarded: a PG::RaiseException raised there is
# handed to +raise_error+.
def get_result(stream_name, position)
  logger.trace(tag: :get) { "Getting result (#{log_text(stream_name, position)})" }

  # Named differently from the method so the local does not shadow it.
  values = parameter_values(stream_name, position)

  begin
    result = session.execute(sql_command, values)
  rescue PG::RaiseException => pg_error
    raise_error(pg_error)
  end

  logger.debug(tag: :get) { "Finished getting result (Count: #{result.ntuples}, #{log_text(stream_name, position)})" }

  result
end
|
#raise_error(pg_error) ⇒ Object
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
|
# File 'lib/message_store/postgres/get.rb', line 111
# Translates +pg_error+ into a more specific error and raises it.
#
# The cleaned message is first offered to Condition.error and, if that yields
# nil, to +specialize_error+. When neither produces an error the original
# +pg_error+ is re-raised without logging; otherwise the message is logged
# at error level and the translated error is raised.
def raise_error(pg_error)
  text = Get.error_message(pg_error)

  translated = Condition.error(text)
  translated = specialize_error(text) if translated.nil?

  raise pg_error if translated.nil?

  logger.error { text }
  raise translated
end
|