Class: Redstream::Producer

Inherits:
Object
Includes:
MonitorMixin
Defined in:
lib/redstream/producer.rb

Overview

A Redstream::Producer is responsible for writing the actual messages to redis. This includes the delay messages as well as the messages for immediate retrieval. Usually, you don’t have to use a producer directly. Instead, Redstream::Model handles all producer-related interaction. However, Redstream::Model cannot recognize updates made via e.g. #update_all, #delete_all, etc., i.e. updates which bypass model callbacks. Thus, calls to e.g. #update_all must be wrapped with #find_in_batches and Redstream::Producer#bulk (see example) to write these updates to the redis streams as well.

Examples:

producer = Redstream::Producer.new

User.where(confirmed: true).find_in_batches do |users|
  producer.bulk users do
    User.where(id: users.map(&:id)).update_all(send_mailing: true)
  end
end

Constructor Details

#initialize(wait: false) ⇒ Producer

Initializes a new producer. In case you’re using a distributed redis setup, you can use redis WAIT to improve real world data safety via the wait param.

Parameters:

  • wait (Boolean, Integer) (defaults to: false)

    Defaults to false. Specify an integer (the number of replicas to wait for) to enable using redis WAIT when writing delay messages. Check out the redis docs for more info regarding WAIT.



# File 'lib/redstream/producer.rb', line 32

def initialize(wait: false)
  @wait = wait
  @stream_name_cache = {}

  super()
end
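
The effect of the wait param can be sketched without a redis server. The snippet below is only an illustration: FakeRedis and maybe_wait are hypothetical stand-ins and not part of Redstream. It mirrors the `redis.wait(@wait, 0) if @wait` guard found in #delay and #bulk_delay, where the first argument is the number of replicas and a timeout of 0 makes redis block until they have acknowledged the write.

```ruby
# FakeRedis is a hypothetical stand-in that only records WAIT calls.
class FakeRedis
  attr_reader :calls

  def initialize
    @calls = []
  end

  # Same argument order as the redis WAIT command: numreplicas, timeout.
  def wait(numreplicas, timeout)
    @calls << [:wait, numreplicas, timeout]
  end
end

# Hypothetical helper mirroring the producer's guard:
# WAIT is only issued when the wait value is truthy.
def maybe_wait(redis, wait)
  redis.wait(wait, 0) if wait
end

redis = FakeRedis.new

maybe_wait(redis, false) # default: no WAIT is issued
maybe_wait(redis, 1)     # WAIT 1 0: block until one replica has acknowledged

puts redis.calls.inspect # prints [[:wait, 1, 0]]
```

With the default of false, the producer skips WAIT entirely, so a single-node setup pays no extra round trip.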

Instance Method Details

#bulk(records) ⇒ Object

Use to wrap calls to #update_all, #delete_all, etc., i.e. methods which bypass model lifecycle callbacks (after_save, etc.), as Redstream::Model can’t recognize these updates and write them to redis streams automatically. You need to pass the records to be updated to the bulk method. The bulk method writes delay messages for the records to redis, then yields, and then writes the messages for immediate retrieval. The method must ensure that the same set of records is used for the delay messages and the instant messages. Thus, you should ideally pass an array of records to it. If you pass an ActiveRecord::Relation, the method converts it to an array, i.e. it loads all matching records into memory.

Parameters:

  • records (#to_a)

    The object/objects that will be updated or deleted



# File 'lib/redstream/producer.rb', line 52

def bulk(records)
  records_array = Array(records)

  bulk_delay(records_array)

  yield

  bulk_queue(records_array)
end
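
The ordering that #bulk guarantees can be illustrated with a small stand-in that writes to an in-memory log instead of redis. This is a sketch under assumptions: FakeProducer and its log are hypothetical and only mimic the delay, yield, queue sequence described above.

```ruby
# FakeProducer is a hypothetical stand-in for Redstream::Producer.
# It appends to an in-memory log instead of writing to redis streams.
class FakeProducer
  attr_reader :log

  def initialize
    @log = []
  end

  def bulk(records)
    # Convert once, so both phases iterate the very same list of records.
    records_array = Array(records)

    records_array.each { |record| @log << [:delay, record] }
    yield
    records_array.each { |record| @log << [:queue, record] }
  end
end

producer = FakeProducer.new

producer.bulk([1, 2]) do
  # Stands in for the wrapped call, e.g. update_all or delete_all.
  producer.log << [:update_all]
end

puts producer.log.inspect
# prints [[:delay, 1], [:delay, 2], [:update_all], [:queue, 1], [:queue, 2]]
```

Because the record list is fixed before the block runs, records that no longer match the original query after the update still receive their message for immediate retrieval.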

#bulk_delay(records) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Writes delay messages to a delay stream in redis.

Parameters:

  • records (#to_a)

    The object/objects that will be updated or deleted

Returns:

  • true, once all delay messages have been written



# File 'lib/redstream/producer.rb', line 70

def bulk_delay(records)
  records.each_slice(250) do |slice|
    Redstream.connection_pool.with do |redis|
      redis.pipelined do |pipeline|
        slice.each do |object|
          pipeline.xadd(Redstream.stream_key_name("#{stream_name(object)}.delay"), { payload: JSON.dump(object.redstream_payload) })
        end
      end
    end
  end

  Redstream.connection_pool.with do |redis|
    redis.wait(@wait, 0) if @wait
  end

  true
end
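
To illustrate the batching used here, the following sketch shows how each_slice(250) splits a record list before each slice is sent within one pipeline. The list of 600 ids is made up for the example and no redis connection is involved.

```ruby
# Hypothetical list of 600 record ids standing in for real records.
records = (1..600).to_a

# Each slice would be written within a single redis pipeline.
batch_sizes = records.each_slice(250).map(&:size)

puts batch_sizes.inspect # prints [250, 250, 100]
```

Slicing keeps each pipeline bounded in size, so a very large record set results in several moderately sized round trips instead of one huge request.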

#bulk_queue(records) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Writes messages to a stream in redis for immediate retrieval.

Parameters:

  • records (#to_a)

    The object/objects that will be updated or deleted



# File 'lib/redstream/producer.rb', line 94

def bulk_queue(records)
  records.each_slice(250) do |slice|
    Redstream.connection_pool.with do |redis|
      redis.pipelined do |pipeline|
        slice.each do |object|
          pipeline.xadd(Redstream.stream_key_name(stream_name(object)), { payload: JSON.dump(object.redstream_payload) })
        end
      end
    end
  end

  true
end

#delay(object) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Writes a single delay message to a delay stream in redis.

Parameters:

  • object

    The object that will be updated, deleted, etc.

Returns:

  • The redis message id



# File 'lib/redstream/producer.rb', line 116

def delay(object)
  Redstream.connection_pool.with do |redis|
    res = redis.xadd(Redstream.stream_key_name("#{stream_name(object)}.delay"), { payload: JSON.dump(object.redstream_payload) })
    redis.wait(@wait, 0) if @wait
    res
  end
end
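
What ends up in the XADD call can be sketched in plain Ruby. The snippet below is an illustration under assumptions: the User struct, its payload, and the stream_name helper returning "users" are hypothetical, and the additional key prefixing done by Redstream.stream_key_name is left out because its output is not shown here.

```ruby
require "json"

# Hypothetical record; real models get redstream_payload via Redstream::Model.
User = Struct.new(:id) do
  def redstream_payload
    { id: id }
  end
end

# Hypothetical stand-in for the producer's private stream_name helper.
def stream_name(_object)
  "users"
end

user = User.new(1)

# The delay stream shares the base name and adds a ".delay" suffix.
delay_stream = "#{stream_name(user)}.delay"
queue_stream = stream_name(user)

# The message carries a single field holding the JSON-encoded payload.
fields = { payload: JSON.dump(user.redstream_payload) }

puts delay_stream     # prints users.delay
puts queue_stream     # prints users
puts fields[:payload] # prints {"id":1}
```

Both #delay and #queue build the same fields hash, so only the target stream differs between the delay message and the message for immediate retrieval.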

#queue(object) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Writes a single message to a stream in redis for immediate retrieval.

Parameters:

  • object

    The object that will be updated, deleted, etc.



# File 'lib/redstream/producer.rb', line 130

def queue(object)
  Redstream.connection_pool.with do |redis|
    redis.xadd(Redstream.stream_key_name(stream_name(object)), { payload: JSON.dump(object.redstream_payload) })
  end

  true
end