Class: MessageStore::Postgres::Get
- Inherits:
-
Object
- Object
- MessageStore::Postgres::Get
- Includes:
- Get
- Defined in:
- lib/message_store/postgres/get.rb,
lib/message_store/postgres/get/last.rb
Defined Under Namespace
Modules: Defaults, Deserialize, Time Classes: Last
Class Method Summary collapse
- .build(batch_size: nil, session: nil, condition: nil) ⇒ Object
- .call(stream_name, position: nil, batch_size: nil, condition: nil, session: nil) ⇒ Object
- .category_stream?(stream_name) ⇒ Boolean
- .configure(receiver, attr_name: nil, batch_size: nil, condition: nil, session: nil) ⇒ Object
- .constrain_condition(condition) ⇒ Object
- .sql_command(stream_name, position, batch_size, condition) ⇒ Object
Instance Method Summary collapse
- #batch_size ⇒ Object
- #call(stream_name, position: nil) ⇒ Object
- #configure(session: nil) ⇒ Object
- #convert(result) ⇒ Object
- #get_result(stream_name, position) ⇒ Object
Class Method Details
.build(batch_size: nil, session: nil, condition: nil) ⇒ Object
14 15 16 17 18 |
# File 'lib/message_store/postgres/get.rb', line 14 def self.build(batch_size: nil, session: nil, condition: nil) new(batch_size, condition).tap do |instance| instance.configure(session: session) end end |
.call(stream_name, position: nil, batch_size: nil, condition: nil, session: nil) ⇒ Object
30 31 32 33 |
# File 'lib/message_store/postgres/get.rb', line 30 def self.call(stream_name, position: nil, batch_size: nil, condition: nil, session: nil) instance = build(batch_size: batch_size, condition: condition, session: session) instance.(stream_name, position: position) end |
.category_stream?(stream_name) ⇒ Boolean
87 88 89 |
# File 'lib/message_store/postgres/get.rb', line 87 def self.category_stream?(stream_name) StreamName.category?(stream_name) end |
.configure(receiver, attr_name: nil, batch_size: nil, condition: nil, session: nil) ⇒ Object
20 21 22 23 24 |
# File 'lib/message_store/postgres/get.rb', line 20 def self.configure(receiver, attr_name: nil, batch_size: nil, condition: nil, session: nil) attr_name ||= :get instance = build(batch_size: batch_size, condition: condition, session: session) receiver.public_send "#{attr_name}=", instance end |
.constrain_condition(condition) ⇒ Object
71 72 73 74 75 |
# File 'lib/message_store/postgres/get.rb', line 71 def self.constrain_condition(condition) return nil if condition.nil? "(#{condition})" end |
.sql_command(stream_name, position, batch_size, condition) ⇒ Object
77 78 79 80 81 82 83 84 85 |
# File 'lib/message_store/postgres/get.rb', line 77 def self.sql_command(stream_name, position, batch_size, condition) parameters = '$1::varchar, $2::bigint, $3::bigint, $4::varchar' if category_stream?(stream_name) return "SELECT * FROM get_category_messages(#{parameters});" else return "SELECT * FROM get_stream_messages(#{parameters});" end end |
Instance Method Details
#batch_size ⇒ Object
10 11 12 |
# File 'lib/message_store/postgres/get.rb', line 10 def batch_size @batch_size ||= Defaults.batch_size end |
#call(stream_name, position: nil) ⇒ Object
35 36 37 38 39 40 41 42 43 44 45 46 47 48 |
# File 'lib/message_store/postgres/get.rb', line 35 def call(stream_name, position: nil) logger.trace(tag: :get) { "Getting message data (Position: #{position.inspect}, Stream Name: #{stream_name}, Batch Size: #{batch_size.inspect})" } position ||= Defaults.position result = get_result(stream_name, position) = convert(result) logger.info(tag: :get) { "Finished getting message data (Count: #{.length}, Position: #{position.inspect}, Stream Name: #{stream_name}, Batch Size: #{batch_size.inspect})" } logger.info(tags: [:data, :message_data]) { .pretty_inspect } end |
#configure(session: nil) ⇒ Object
26 27 28 |
# File 'lib/message_store/postgres/get.rb', line 26 def configure(session: nil) Session.configure self, session: session end |
#convert(result) ⇒ Object
91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 |
# File 'lib/message_store/postgres/get.rb', line 91 def convert(result) logger.trace(tag: :get) { "Converting result to message data (Result Count: #{result.ntuples})" } = result.map do |record| record['data'] = Deserialize.data(record['data']) record['metadata'] = Deserialize.(record['metadata']) record['time'] = Time.utc_coerced(record['time']) MessageData::Read.build record end logger.debug(tag: :get) { "Converted result to message data (Message Data Count: #{.length})" } end |
#get_result(stream_name, position) ⇒ Object
50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 |
# File 'lib/message_store/postgres/get.rb', line 50 def get_result(stream_name, position) logger.trace(tag: :get) { "Getting result (Stream: #{stream_name}, Position: #{position.inspect}, Batch Size: #{batch_size.inspect}, Condition: #{condition || '(none)'})" } sql_command = self.class.sql_command(stream_name, position, batch_size, condition) cond = self.class.constrain_condition(condition) params = [ stream_name, position, batch_size, cond ] result = session.execute(sql_command, params) logger.debug(tag: :get) { "Finished getting result (Count: #{result.ntuples}, Stream: #{stream_name}, Position: #{position.inspect}, Batch Size: #{batch_size.inspect}, Condition: #{condition || '(none)'})" } result end |