Class: Redstream::Producer
- Inherits:
-
Object
- Object
- Redstream::Producer
- Includes:
- MonitorMixin
- Defined in:
- lib/redstream/producer.rb
Overview
A Redstream::Producer is responsible for writing the actual messages to redis. This includes the delay messages as well as the messages for immediate retrieval. Usually, you don’t have to use a producer directly. Instead, Redstream::Model handles all producer related interaction. However, Redstream::Model is not able to recognize model updates resulting from model updates via e.g. #update_all, #delete_all, etc, i.e. updates which by-pass model callbacks. Thus, calls to e.g. #update_all must be wrapped with ‘find_in_batches` and Redstream::Producer#bulk (see example), to write these updates to the redis streams as well.
Instance Method Summary collapse
-
#bulk(records) ⇒ Object
Use to wrap calls to #update_all, #delete_all, etc.
-
#bulk_delay(records) ⇒ Object
private
Writes delay messages to a delay stream in redis.
-
#bulk_queue(records) ⇒ Object
private
Writes messages to a stream in redis for immediate retrieval.
-
#delay(object) ⇒ Object
private
Writes a single delay message to a delay stream in redis.
-
#initialize(wait: false) ⇒ Producer
constructor
Initializes a new producer.
-
#queue(object) ⇒ Object
private
Writes a single message to a stream in redis for immediate retrieval.
Constructor Details
#initialize(wait: false) ⇒ Producer
Initializes a new producer. In case you’re using a distributed redis setup, you can use redis WAIT to improve real world data safety via the wait param.
32 33 34 35 36 37 |
# File 'lib/redstream/producer.rb', line 32 def initialize(wait: false) @wait = wait @stream_name_cache = {} super() end |
Instance Method Details
#bulk(records) ⇒ Object
Use to wrap calls to #update_all, #delete_all, etc. I.e. methods, which by-pass model lifecycle callbacks (after_save, etc.), as Redstream::Model can’t recognize these updates and write them to redis streams automatically. You need to pass the records to be updated to the bulk method. The bulk method writes delay messages for the records to kafka, then yields and the writes the message for immediate retrieval. The method must ensure that the same set of records is used for the delay messages and the instant messages. Thus, you optimally, pass an array of records to it. If you pass an ActiveRecord::Relation, the method converts it to an array, i.e. loading all matching records into memory.
52 53 54 55 56 57 58 59 60 |
# File 'lib/redstream/producer.rb', line 52 def bulk(records) records_array = Array(records) bulk_delay(records_array) yield bulk_queue(records_array) end |
#bulk_delay(records) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Writes delay messages to a delay stream in redis.
70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 |
# File 'lib/redstream/producer.rb', line 70 def bulk_delay(records) records.each_slice(250) do |slice| Redstream.connection_pool.with do |redis| redis.pipelined do |pipeline| slice.each do |object| pipeline.xadd(Redstream.stream_key_name("#{stream_name(object)}.delay"), { payload: JSON.dump(object.redstream_payload) }) end end end end Redstream.connection_pool.with do |redis| redis.wait(@wait, 0) if @wait end true end |
#bulk_queue(records) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Writes messages to a stream in redis for immediate retrieval.
94 95 96 97 98 99 100 101 102 103 104 105 106 |
# File 'lib/redstream/producer.rb', line 94 def bulk_queue(records) records.each_slice(250) do |slice| Redstream.connection_pool.with do |redis| redis.pipelined do |pipeline| slice.each do |object| pipeline.xadd(Redstream.stream_key_name(stream_name(object)), { payload: JSON.dump(object.redstream_payload) }) end end end end true end |
#delay(object) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Writes a single delay message to a delay stream in redis.
116 117 118 119 120 121 122 |
# File 'lib/redstream/producer.rb', line 116 def delay(object) Redstream.connection_pool.with do |redis| res = redis.xadd(Redstream.stream_key_name("#{stream_name(object)}.delay"), { payload: JSON.dump(object.redstream_payload) }) redis.wait(@wait, 0) if @wait res end end |
#queue(object) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Writes a single message to a stream in redis for immediate retrieval.
130 131 132 133 134 135 136 |
# File 'lib/redstream/producer.rb', line 130 def queue(object) Redstream.connection_pool.with do |redis| redis.xadd(Redstream.stream_key_name(stream_name(object)), { payload: JSON.dump(object.redstream_payload) }) end true end |