Class: TopicTrackingState

Inherits:
Object
  • Object
show all
Includes:
ActiveModel::SerializerSupport, TopicTrackingStatePublishable
Defined in:
app/models/topic_tracking_state.rb

Overview

This class is used to mirror unread and new status back to end users. In JavaScript there is a mirror class that is kept in sync using MessageBus; this allows end users to always know which topics have unread posts in them and which topics are new. This is used in various places in the UI, such as counters, indicators, and messages at the top of topic lists, so the user knows at a glance that there is something worth reading.

The TopicTrackingState.report data is preloaded in ApplicationController for the current user under the topicTrackingStates key, and the existing state is loaded into memory on page load. From there the MessageBus is used to keep topic state up to date, as well as syncing with topics from corresponding lists fetched from the server (e.g. the /new, /latest, /unread topic lists).

See discourse/app/models/topic-tracking-state.js

Constant Summary collapse

UNREAD_MESSAGE_TYPE =
"unread"
LATEST_MESSAGE_TYPE =
"latest"
MUTED_MESSAGE_TYPE =
"muted"
UNMUTED_MESSAGE_TYPE =
"unmuted"
NEW_TOPIC_MESSAGE_TYPE =
"new_topic"
RECOVER_MESSAGE_TYPE =
"recover"
DELETE_MESSAGE_TYPE =
"delete"
DESTROY_MESSAGE_TYPE =
"destroy"
READ_MESSAGE_TYPE =
"read"
DISMISS_NEW_MESSAGE_TYPE =
"dismiss_new"
DISMISS_NEW_POSTS_MESSAGE_TYPE =
"dismiss_new_posts"
MAX_TOPICS =
5000
NEW_MESSAGE_BUS_CHANNEL =
"/new"
LATEST_MESSAGE_BUS_CHANNEL =
"/latest"
UNREAD_MESSAGE_BUS_CHANNEL =
"/unread"
RECOVER_MESSAGE_BUS_CHANNEL =
"/recover"
DELETE_MESSAGE_BUS_CHANNEL =
"/delete"
DESTROY_MESSAGE_BUS_CHANNEL =
"/destroy"

Class Method Summary collapse

Class Method Details

.highest_post_number_column_select(whisperer) ⇒ Object



507
508
509
# File 'app/models/topic_tracking_state.rb', line 507

# Returns the SQL select fragment used for a topic's highest post number.
#
# @param whisperer [Boolean] when truthy, selects
#   `topics.highest_staff_post_number` aliased as `highest_post_number`;
#   otherwise selects `topics.highest_post_number`
# @return [String] SQL column expression
def self.highest_post_number_column_select(whisperer)
  if whisperer
    "topics.highest_staff_post_number AS highest_post_number"
  else
    "topics.highest_post_number"
  end
end

.include_tags_in_report=(v) ⇒ Object



282
283
284
# File 'app/models/topic_tracking_state.rb', line 282

# Stores a class-level override flag in @include_tags_in_report.
# The flag is read back by `include_tags_in_report?`.
#
# @param v [Object] value to store; only its truthiness is consulted by the reader
# @return [Object] the assigned value
def self.include_tags_in_report=(v)
  @include_tags_in_report = v
end

.include_tags_in_report? ⇒ Boolean

Returns:

  • (Boolean)


278
279
280
# File 'app/models/topic_tracking_state.rb', line 278

# Whether tag data should be included in tracking-state reports.
#
# Tagging must be enabled site-wide. Beyond that, tags are included when
# the @include_tags_in_report override is set or when the legacy
# navigation menu is not in use.
#
# @return [Boolean] (the falsy `tagging_enabled` value itself is returned
#   when tagging is off)
def self.include_tags_in_report?
  tagging_enabled = SiteSetting.tagging_enabled
  return tagging_enabled unless tagging_enabled

  @include_tags_in_report || !SiteSetting.legacy_navigation_menu?
end

.muted_tag_ids(user) ⇒ Object



358
359
360
# File 'app/models/topic_tracking_state.rb', line 358

# Looks up the ids of the tags the given user has muted.
#
# @param user [User] user whose muted tags are looked up
# @return [Array] the `tag_id` values plucked from `TagUser.lookup(user, :muted)`
def self.muted_tag_ids(user)
  muted_tag_users = TagUser.lookup(user, :muted)
  muted_tag_users.pluck(:tag_id)
end

.new_and_unread_sql(topic_id, user, tag_ids) ⇒ Object



314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
# File 'app/models/topic_tracking_state.rb', line 314

# Builds the combined "new topics UNION ALL unread topics" SQL for a user.
#
# The first half is generated with unread matching disabled, the second
# with new-topic matching disabled and the old-unread filter enabled.
# Neither half is ordered.
#
# @param topic_id [Integer, nil] passed through to `report_raw_sql`
# @param user [User] user the report is for; its `staff?`, `admin?` and
#   `whisperer?` flags are forwarded
# @param tag_ids [Array] muted tag ids forwarded as `muted_tag_ids`
# @return [String] the SQL of both halves joined by `UNION ALL`
def self.new_and_unread_sql(topic_id, user, tag_ids)
  shared_options = {
    topic_id: topic_id,
    skip_order: true,
    staff: user.staff?,
    admin: user.admin?,
    whisperer: user.whisperer?,
    user: user,
    muted_tag_ids: tag_ids,
  }

  new_topics_sql = report_raw_sql(**shared_options, skip_unread: true)
  unread_topics_sql = report_raw_sql(**shared_options, skip_new: true, filter_old_unread: true)

  new_topics_sql << "\nUNION ALL\n\n" << unread_topics_sql
end

.new_filter_sql ⇒ Object



240
241
242
243
244
245
246
247
# File 'app/models/topic_tracking_state.rb', line 240

# SQL predicate that matches "new" topics.
#
# Starts from the WHERE clause of `TopicQuery.new_filter` (built with this
# class's `treat_as_new_topic_clause`) and further requires the topic to be
# created after the `:min_new_topic_date` bind and to have no
# `dismissed_topic_users` row.
#
# @return [String] SQL condition fragment
def self.new_filter_sql
  new_topics_scope =
    TopicQuery.new_filter(Topic, treat_as_new_topic_clause_sql: treat_as_new_topic_clause)
  base_condition = new_topics_scope.where_clause.ast.to_sql

  [
    base_condition,
    " AND topics.created_at > :min_new_topic_date",
    " AND dismissed_topic_users.id IS NULL",
  ].join
end

.publish_delete(topic) ⇒ Object



199
200
201
202
203
204
205
206
207
# File 'app/models/topic_tracking_state.rb', line 199

# Publishes a "delete" message for a regular topic on the delete channel.
#
# Delivery is limited to the group ids returned by
# `secure_category_group_ids(topic)`. Non-regular topics are ignored.
#
# @param topic [Topic] the topic that was deleted
# @return [Object, nil] result of `MessageBus.publish`, or nil when skipped
def self.publish_delete(topic)
  return unless topic.regular?

  group_ids = secure_category_group_ids(topic)

  message = { topic_id: topic.id, message_type: DELETE_MESSAGE_TYPE }

  # Use the shared channel constant, as the recover/destroy publishers do,
  # rather than repeating the "/delete" literal.
  MessageBus.publish(DELETE_MESSAGE_BUS_CHANNEL, message.as_json, group_ids: group_ids)
end

.publish_destroy(topic) ⇒ Object



209
210
211
212
213
214
215
216
217
# File 'app/models/topic_tracking_state.rb', line 209

# Publishes a "destroy" message for a regular topic on the destroy channel.
#
# Delivery is limited to the group ids returned by
# `secure_category_group_ids(topic)`. Non-regular topics are ignored.
#
# @param topic [Topic] the topic that was destroyed
# @return [Object, nil] result of `MessageBus.publish`, or nil when skipped
def self.publish_destroy(topic)
  return unless topic.regular?

  MessageBus.publish(
    DESTROY_MESSAGE_BUS_CHANNEL,
    { topic_id: topic.id, message_type: DESTROY_MESSAGE_TYPE }.as_json,
    group_ids: secure_category_group_ids(topic),
  )
end

.publish_dismiss_new(user_id, topic_ids: []) ⇒ Object



230
231
232
233
# File 'app/models/topic_tracking_state.rb', line 230

# Tells a single user's clients that new topics were dismissed.
#
# The message is published on the user's unread channel
# (`unread_channel_key`) and addressed to that user only.
#
# @param user_id [Integer] recipient user id
# @param topic_ids [Array] ids of the dismissed topics (defaults to [])
# @return [Object] result of `MessageBus.publish`
def self.publish_dismiss_new(user_id, topic_ids: [])
  channel = unread_channel_key(user_id)
  body = { message_type: DISMISS_NEW_MESSAGE_TYPE, payload: { topic_ids: topic_ids } }.as_json

  MessageBus.publish(channel, body, user_ids: [user_id])
end

.publish_dismiss_new_posts(user_id, topic_ids: []) ⇒ Object



235
236
237
238
# File 'app/models/topic_tracking_state.rb', line 235

# Tells a single user's clients that new posts were dismissed.
#
# The message is published on the user's unread channel
# (`unread_channel_key`) and addressed to that user only.
#
# @param user_id [Integer] recipient user id
# @param topic_ids [Array] ids of the affected topics (defaults to [])
# @return [Object] result of `MessageBus.publish`
def self.publish_dismiss_new_posts(user_id, topic_ids: [])
  channel = unread_channel_key(user_id)
  body = {
    message_type: DISMISS_NEW_POSTS_MESSAGE_TYPE,
    payload: {
      topic_ids: topic_ids,
    },
  }.as_json

  MessageBus.publish(channel, body, user_ids: [user_id])
end

.publish_latest(topic, whisper = false) ⇒ Object



70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
# File 'app/models/topic_tracking_state.rb', line 70

# Publishes a "latest" message for a regular topic on the latest channel.
#
# The payload carries the topic's bump time, category and archetype, plus
# tag names and ids when tagging is enabled and the topic has tags.
#
# @param topic [Topic] the topic that was bumped
# @param whisper [Boolean] when truthy the message is delivered to the
#   staff auto group and the whisper-allowed groups; otherwise to
#   `secure_category_group_ids(topic)`
# @return [Object, nil] result of `MessageBus.publish`, or nil for
#   non-regular topics
def self.publish_latest(topic, whisper = false)
  return unless topic.regular?

  topic_tag_ids = tag_names = nil
  if SiteSetting.tagging_enabled
    # transpose turns [[id, name], ...] into [[ids...], [names...]];
    # a topic without tags leaves both variables nil.
    topic_tag_ids, tag_names = topic.tags.pluck(:id, :name).transpose
  end

  payload = {
    bumped_at: topic.bumped_at,
    category_id: topic.category_id,
    archetype: topic.archetype,
  }
  payload.merge!(tags: tag_names, topic_tag_ids: topic_tag_ids) if tag_names

  recipient_group_ids =
    if whisper
      [Group::AUTO_GROUPS[:staff], *SiteSetting.whispers_allowed_group_ids]
    else
      secure_category_group_ids(topic)
    end

  MessageBus.publish(
    LATEST_MESSAGE_BUS_CHANNEL,
    { topic_id: topic.id, message_type: LATEST_MESSAGE_TYPE, payload: payload }.as_json,
    group_ids: recipient_group_ids,
  )
end

.publish_muted(topic) ⇒ Object



104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
# File 'app/models/topic_tracking_state.rb', line 104

# Publishes a "muted" message for a regular topic to users who muted it.
#
# Recipients are the topic's users at the muted notification level who
# were seen within the last 7 days, most recently seen first, capped at
# 100. Nothing is published when there are no such users.
#
# @param topic [Topic] the topic to notify about
# @return [Object, nil] result of `MessageBus.publish`, or nil when skipped
def self.publish_muted(topic)
  return unless topic.regular?

  muting_users = topic.topic_users.where(notification_level: NotificationLevels.all[:muted])
  recently_seen = muting_users.joins(:user).where("users.last_seen_at > ?", 7.days.ago)
  user_ids = recently_seen.order("users.last_seen_at DESC").limit(100).pluck(:user_id)
  return if user_ids.blank?

  MessageBus.publish(
    LATEST_MESSAGE_BUS_CHANNEL,
    { topic_id: topic.id, message_type: MUTED_MESSAGE_TYPE }.as_json,
    user_ids: user_ids,
  )
end

.publish_new(topic) ⇒ Object



42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
# File 'app/models/topic_tracking_state.rb', line 42

# Announces a newly created regular topic on the new-topic channel and then
# marks post 1 as read for the topic's author via `publish_read`.
#
# The payload describes a topic with one post and no read position, plus
# tag names and ids when tagging is enabled and the topic has tags.
# Delivery is limited to `secure_category_group_ids(topic)`.
#
# @param topic [Topic] the newly created topic
# @return [Object, nil] result of `publish_read`, or nil for non-regular topics
def self.publish_new(topic)
  return unless topic.regular?

  topic_tag_ids = tag_names = nil
  if SiteSetting.tagging_enabled
    # transpose turns [[id, name], ...] into [[ids...], [names...]];
    # a topic without tags leaves both variables nil.
    topic_tag_ids, tag_names = topic.tags.pluck(:id, :name).transpose
  end

  payload = {
    last_read_post_number: nil,
    highest_post_number: 1,
    created_at: topic.created_at,
    category_id: topic.category_id,
    archetype: topic.archetype,
    created_in_new_period: true,
  }
  payload.merge!(tags: tag_names, topic_tag_ids: topic_tag_ids) if tag_names

  MessageBus.publish(
    NEW_MESSAGE_BUS_CHANNEL,
    { topic_id: topic.id, message_type: NEW_TOPIC_MESSAGE_TYPE, payload: payload }.as_json,
    group_ids: secure_category_group_ids(topic),
  )

  publish_read(topic.id, 1, topic.user)
end

.publish_read(topic_id, last_read_post_number, user, notification_level = nil) ⇒ Object



219
220
221
222
223
224
225
226
227
228
# File 'app/models/topic_tracking_state.rb', line 219

# Publishes a "read" message for a topic on the user's unread channel.
#
# Delegates to `publish_read_message`, using `unread_channel_key(user.id)`
# as the channel name.
#
# @param topic_id [Integer] topic that was read
# @param last_read_post_number [Integer] highest post number the user has read
# @param user [User] the reader
# @param notification_level [Object, nil] forwarded unchanged
# @return [Object] result of `publish_read_message`
def self.publish_read(topic_id, last_read_post_number, user, notification_level = nil)
  channel = unread_channel_key(user.id)

  publish_read_message(
    message_type: READ_MESSAGE_TYPE,
    channel_name: channel,
    topic_id: topic_id,
    user: user,
    last_read_post_number: last_read_post_number,
    notification_level: notification_level,
  )
end

.publish_read_indicator_on_read(topic_id, last_read_post_number, user_id) ⇒ Object



524
525
526
527
528
529
530
531
532
533
534
535
536
537
# File 'app/models/topic_tracking_state.rb', line 524

# Handles read-indicator updates when a user reads a private message.
#
# For a private-message topic this triggers a reader-count update for the
# post at `last_read_post_number` and then refreshes the topic-list read
# indicator as a read (non-write) event. Other topics, and unknown topic
# ids, are ignored.
#
# @param topic_id [Integer] id of the topic that was read
# @param last_read_post_number [Integer] post number the user has read up to
# @param user_id [Integer] id of the reader
# @return [Object, nil] result of `update_topic_list_read_indicator`, or nil
#   when skipped
def self.publish_read_indicator_on_read(topic_id, last_read_post_number, user_id)
  # Only the columns needed by the code below are selected.
  pm_candidate =
    Topic.includes(:allowed_groups).select(:highest_post_number, :archetype, :id).find_by(
      id: topic_id,
    )
  return unless pm_candidate&.private_message?

  publishing_groups = read_allowed_groups_of(pm_candidate)
  read_post = Post.find_by(topic_id: pm_candidate.id, post_number: last_read_post_number)

  trigger_post_read_count_update(read_post, publishing_groups, last_read_post_number, user_id)
  update_topic_list_read_indicator(
    pm_candidate,
    publishing_groups,
    last_read_post_number,
    user_id,
    false,
  )
end

.publish_read_indicator_on_write(topic_id, last_read_post_number, user_id) ⇒ Object



511
512
513
514
515
516
517
518
519
520
521
522
# File 'app/models/topic_tracking_state.rb', line 511

# Handles read-indicator updates when a user writes to a private message.
#
# For a private-message topic this refreshes the topic-list read indicator
# as a write event, using the topic's own `highest_post_number`. The
# `last_read_post_number` argument is accepted but not used. Other topics,
# and unknown topic ids, are ignored.
#
# @param topic_id [Integer] id of the topic that was written to
# @param last_read_post_number [Integer] unused
# @param user_id [Integer] id of the writer
# @return [Object, nil] result of `update_topic_list_read_indicator`, or nil
#   when skipped
def self.publish_read_indicator_on_write(topic_id, last_read_post_number, user_id)
  # Only the columns needed by the code below are selected.
  pm_candidate =
    Topic.includes(:allowed_groups).select(:highest_post_number, :archetype, :id).find_by(
      id: topic_id,
    )
  return unless pm_candidate&.private_message?

  publishing_groups = read_allowed_groups_of(pm_candidate)
  update_topic_list_read_indicator(
    pm_candidate,
    publishing_groups,
    pm_candidate.highest_post_number,
    user_id,
    true,
  )
end

.publish_recover(topic) ⇒ Object



189
190
191
192
193
194
195
196
197
# File 'app/models/topic_tracking_state.rb', line 189

# Publishes a "recover" message for a regular topic on the recover channel.
#
# Delivery is limited to the group ids returned by
# `secure_category_group_ids(topic)`. Non-regular topics are ignored.
#
# @param topic [Topic] the topic that was recovered
# @return [Object, nil] result of `MessageBus.publish`, or nil when skipped
def self.publish_recover(topic)
  return unless topic.regular?

  MessageBus.publish(
    RECOVER_MESSAGE_BUS_CHANNEL,
    { topic_id: topic.id, message_type: RECOVER_MESSAGE_TYPE }.as_json,
    group_ids: secure_category_group_ids(topic),
  )
end

.publish_unmuted(topic) ⇒ Object



123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
# File 'app/models/topic_tracking_state.rb', line 123

# Publishes an "unmuted" message for a regular topic to users watching it.
#
# Recipients come from `User.watching_topic(topic)`, limited to users seen
# within the last 7 days, most recently seen first, capped at 100. Nothing
# is published when there are no such users.
#
# @param topic [Topic] the topic to notify about
# @return [Object, nil] result of `MessageBus.publish`, or nil when skipped
def self.publish_unmuted(topic)
  return unless topic.regular?

  recently_seen_watchers =
    User.watching_topic(topic).where("users.last_seen_at > ?", 7.days.ago)
  user_ids = recently_seen_watchers.order("users.last_seen_at DESC").limit(100).pluck(:id)
  return if user_ids.blank?

  MessageBus.publish(
    LATEST_MESSAGE_BUS_CHANNEL,
    { topic_id: topic.id, message_type: UNMUTED_MESSAGE_TYPE }.as_json,
    user_ids: user_ids,
  )
end

.publish_unread(post) ⇒ Object



140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
# File 'app/models/topic_tracking_state.rb', line 140

# Publishes an "unread" message for a new post in a regular topic.
#
# Recipients are the users tracking the topic, excluding the post's author.
# For whispers they are narrowed to members of the staff auto group and the
# whisper-allowed groups; otherwise to members of the category's secure
# groups when the category has any. Nothing is published when no recipient
# remains.
#
# @param post [Post] the post that was just created
# @return [Object, nil] result of `MessageBus.publish`, or nil when skipped
def self.publish_unread(post)
  topic = post.topic
  return unless topic.regular?

  # TODO at high scale we are going to have to defer this,
  #   perhaps cut down to users that are around in the last 7 days as well
  tag_ids = tags = nil
  tag_ids, tags = topic.tags.pluck(:id, :name).transpose if include_tags_in_report?

  # The author does not need an unread notification for their own post,
  # so they are excluded from the initial scope.
  recipients =
    TopicUser.tracking(post.topic_id).includes(user: :user_stat).where.not(user_id: post.user_id)

  restricted_group_ids =
    if post.post_type == Post.types[:whisper]
      [Group::AUTO_GROUPS[:staff], *SiteSetting.whispers_allowed_group_ids]
    else
      topic.category&.secure_group_ids
    end

  if restricted_group_ids.present?
    recipients =
      recipients.joins("INNER JOIN group_users gu ON gu.user_id = topic_users.user_id").where(
        "gu.group_id IN (?)",
        restricted_group_ids,
      )
  end

  user_ids = recipients.pluck(:user_id)
  return if user_ids.empty?

  payload = {
    highest_post_number: post.post_number,
    updated_at: topic.updated_at,
    created_at: post.created_at,
    category_id: topic.category_id,
    archetype: topic.archetype,
  }
  payload.merge!(tags: tags, topic_tag_ids: tag_ids) if tags

  MessageBus.publish(
    UNREAD_MESSAGE_BUS_CHANNEL,
    { topic_id: post.topic_id, message_type: UNREAD_MESSAGE_TYPE, payload: payload }.as_json,
    user_ids: user_ids,
  )
end

.read_allowed_groups_of(topic) ⇒ Object



539
540
541
542
543
544
545
546
# File 'app/models/topic_tracking_state.rb', line 539

# Returns the topic's allowed groups that have `publish_read_state` enabled.
#
# Each row is grouped by group id and exposes the group's `name`, `id` and
# a `members` column aggregating the user ids from `group_users`.
#
# @param topic [Topic] topic whose allowed groups are queried
# @return [Object] the relation built from `topic.allowed_groups`
def self.read_allowed_groups_of(topic)
  publishing_groups =
    topic.allowed_groups.joins(:group_users).where(publish_read_state: true)

  publishing_groups.select("ARRAY_AGG(group_users.user_id) AS members", :name, :id).group(
    "groups.id",
  )
end

.report(user, topic_id = nil) ⇒ Object

Sam: this is a hairy report, in particular I need custom joins and fancy conditions

Dropping to sql_builder so I can make sense of it.

Keep in mind, we need to be able to filter on a GROUP of users, and zero in on topic

all our existing scope work does not do this

This code needs to be VERY efficient as it is triggered via the message bus and may steal

cycles from usual requests


294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
# File 'app/models/topic_tracking_state.rb', line 294

# Runs the tracking-state report for a user.
#
# Builds the new + unread SQL (honouring the user's muted tags), wraps it
# to include tag names when applicable, appends a LIMIT of MAX_TOPICS and
# runs it through `DB.query` with the bind parameters below merged with
# `treat_as_new_topic_params`.
#
# @param user [User] user the report is for
# @param topic_id [Integer, nil] optional topic id forwarded to the SQL builder
# @return [Object] whatever `DB.query` returns
def self.report(user, topic_id = nil)
  base_sql = new_and_unread_sql(topic_id, user, muted_tag_ids(user))
  limited_sql = tags_included_wrapped_sql(base_sql) + "\n\n LIMIT :max_topics"

  bind_params = {
    user_id: user.id,
    topic_id: topic_id,
    min_new_topic_date: Time.at(SiteSetting.min_new_topics_time).to_datetime,
    max_topics: TopicTrackingState::MAX_TOPICS,
    user_first_unread_at: user.user_stat.first_unread_at,
  }.merge(treat_as_new_topic_params)

  DB.query(limited_sql, bind_params)
end

.report_raw_sql(user:, muted_tag_ids:, topic_id: nil, filter_old_unread: false, skip_new: false, skip_unread: false, skip_order: false, staff: false, admin: false, whisperer: false, select: nil, custom_state_filter: nil, additional_join_sql: nil) ⇒ Object



362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
# File 'app/models/topic_tracking_state.rb', line 362

# Builds the raw SQL string selecting the topics that are new and/or unread
# for a single user. The returned SQL is not executed here.
#
# The SQL references named binds the caller must supply when running it:
# :user_id and, depending on the options, :user_first_unread_at, :topic_id,
# :default_duration, :always, :last_visit, :now and :min_date (default
# select list) and :min_new_topic_date (via new_filter_sql).
#
# @param user [User] passed to CategoryUser.muted_category_ids_query
# @param muted_tag_ids [Array] tag ids interpolated into the muted-tags filter
# @param topic_id [Integer, nil] when truthy, restricts to `:topic_id`
# @param filter_old_unread [Boolean] adds `topics.updated_at >= :user_first_unread_at`
# @param skip_new [Boolean] replaces the "new" predicate with `1=0` and omits
#   the dismissed_topic_users join
# @param skip_unread [Boolean] replaces the "unread" predicate with `1=0`
# @param skip_order [Boolean] omits the `ORDER BY topics.bumped_at DESC`
# @param staff [Boolean] when truthy, the visibility filter is omitted
# @param admin [Boolean] when truthy, the category permission filter is omitted
# @param whisperer [Boolean] forwarded to the unread filter and to the
#   highest-post-number column choice
# @param select [String, nil] replaces the default select list when given
# @param custom_state_filter [String, nil] replaces the
#   `((unread) OR (new)) AND` condition when given
# @param additional_join_sql [String, nil] extra JOIN SQL inserted verbatim
# @return [String] a mutable SQL string
def self.report_raw_sql(
  user:,
  muted_tag_ids:,
  topic_id: nil,
  filter_old_unread: false,
  skip_new: false,
  skip_unread: false,
  skip_order: false,
  staff: false,
  admin: false,
  whisperer: false,
  select: nil,
  custom_state_filter: nil,
  additional_join_sql: nil
)
  # "1=0" is an always-false predicate, used to switch a half of the
  # `(unread) OR (new)` condition off.
  unread =
    if skip_unread
      "1=0"
    else
      unread_filter_sql(whisperer: whisperer)
    end

  filter_old_unread_sql =
    if filter_old_unread
      " topics.updated_at >= :user_first_unread_at AND "
    else
      ""
    end

  new =
    if skip_new
      "1=0"
    else
      new_filter_sql
    end

  # The category's own topic id is only selected when category definition
  # topics are NOT shown in topic lists.
  category_topic_id_column_select =
    if SiteSetting.show_category_definitions_in_topic_lists
      ""
    else
      "c.topic_id AS category_topic_id,"
    end

  # A caller-supplied `select` replaces the default column list entirely.
  select_sql =
    select ||
      "
         DISTINCT topics.id as topic_id,
         u.id as user_id,
         topics.created_at,
         topics.updated_at,
         #{highest_post_number_column_select(whisperer)},
         last_read_post_number,
         c.id as category_id,
         #{category_topic_id_column_select}
         tu.notification_level,
         GREATEST(
            CASE
            WHEN COALESCE(uo.new_topic_duration_minutes, :default_duration) = :always THEN u.created_at
            WHEN COALESCE(uo.new_topic_duration_minutes, :default_duration) = :last_visit THEN COALESCE(
              u.previous_visit_at,u.created_at
            )
            ELSE (:now::timestamp - INTERVAL '1 MINUTE' * COALESCE(uo.new_topic_duration_minutes, :default_duration))
            END, u.created_at, :min_date
         ) AS treat_as_new_topic_start_date"

  # Category permission filter: non-restricted categories, admin users, or
  # restricted categories shared with one of the user's groups.
  # NOTE(review): `if !admin` is always true inside this else branch, so
  # `append` is always "OR u.admin" here.
  category_filter =
    if admin
      ""
    else
      append = "OR u.admin" if !admin
      <<~SQL
        (
         NOT c.read_restricted #{append} OR c.id IN (
            SELECT c2.id FROM categories c2
            JOIN category_groups cg ON cg.category_id = c2.id
            JOIN group_users gu ON gu.user_id = :user_id AND cg.group_id = gu.group_id
            WHERE c2.read_restricted )
        ) AND
      SQL
    end

  # NOTE(review): as above, `if !staff` is always true inside this else branch.
  visibility_filter =
    if staff
      ""
    else
      append = "OR u.admin OR u.moderator" if !staff
      "(topics.visible #{append}) AND"
    end

  tags_filter = ""

  # Muted tags only filter topics when the site setting is "always" or
  # "only_muted".
  # NOTE(review): muted_tag_ids are interpolated directly into the SQL
  # below; presumably they are always integers — verify against callers.
  if muted_tag_ids.present? &&
       %w[always only_muted].include?(SiteSetting.remove_muted_tags_from_latest)
    existing_tags_sql =
      "(select array_agg(tag_id) from topic_tags where topic_tags.topic_id = topics.id)"
    muted_tags_array_sql = "ARRAY[#{muted_tag_ids.join(",")}]"

    if SiteSetting.remove_muted_tags_from_latest == "always"
      # "always": exclude topics whose tags overlap (&&) the muted tags.
      tags_filter = <<~SQL
        NOT (
          COALESCE(#{existing_tags_sql}, ARRAY[]::int[]) && #{muted_tags_array_sql}
        ) AND
      SQL
    else # only muted
      # Exclude topics whose tags are all contained in (<@) the muted tags.
      # ARRAY[-999] stands in for untagged topics — presumably a sentinel
      # that never matches a real tag id; TODO confirm.
      tags_filter = <<~SQL
        NOT (
          COALESCE(#{existing_tags_sql}, ARRAY[-999]) <@ #{muted_tags_array_sql}
        ) AND
      SQL
    end
  end

  # Unary + yields an unfrozen string so the clauses below can be appended
  # with <<.
  sql = +<<~SQL
    SELECT #{select_sql}
    FROM topics
    JOIN users u on u.id = :user_id
    JOIN user_options AS uo ON uo.user_id = u.id
    JOIN categories c ON c.id = topics.category_id
    LEFT JOIN topic_users tu ON tu.topic_id = topics.id AND tu.user_id = u.id
    #{skip_new ? "" : "LEFT JOIN dismissed_topic_users ON dismissed_topic_users.topic_id = topics.id AND dismissed_topic_users.user_id = :user_id"}
    #{additional_join_sql}
    WHERE u.id = :user_id AND
          #{filter_old_unread_sql}
          topics.archetype <> 'private_message' AND
          #{custom_state_filter ? custom_state_filter : "((#{unread}) OR (#{new})) AND"}
          #{visibility_filter}
          #{tags_filter}
          topics.deleted_at IS NULL AND
          #{category_filter}
          NOT (
            #{(skip_new && skip_unread) ? "" : "last_read_post_number IS NULL AND"}
            (
              topics.category_id IN (#{CategoryUser.muted_category_ids_query(user, include_direct: true).select("categories.id").to_sql})
              AND tu.notification_level <= #{TopicUser.notification_levels[:regular]}
            )
          )
  SQL

  sql << " AND topics.id = :topic_id" if topic_id

  sql << " ORDER BY topics.bumped_at DESC" unless skip_order

  sql
end

.tags_included_wrapped_sql(sql) ⇒ Object



342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
# File 'app/models/topic_tracking_state.rb', line 342

# Wraps a report query so each row also carries the topic's tag names.
#
# When tagging is enabled and `TopicTrackingState.include_tags_in_report?`
# is true, the given SQL becomes a CTE and an extra `tags` column is
# selected, holding an array of the topic's tag names. Otherwise the SQL
# is returned untouched.
#
# @param sql [String] report SQL exposing a `topic_id` column
# @return [String] wrapped SQL, or the original `sql`
def self.tags_included_wrapped_sql(sql)
  include_tags = SiteSetting.tagging_enabled && TopicTrackingState.include_tags_in_report?
  return sql unless include_tags

  <<~SQL
    WITH tags_included_cte AS (
      #{sql}
    )
    SELECT *, (
      SELECT ARRAY_AGG(name) from topic_tags
         JOIN tags on tags.id = topic_tags.tag_id
         WHERE topic_id = tags_included_cte.topic_id
      ) tags
    FROM tags_included_cte
  SQL
end

.treat_as_new_topic_clause ⇒ Object



253
254
255
256
257
258
259
260
261
262
263
264
265
266
# File 'app/models/topic_tracking_state.rb', line 253

# SQL expression for the date from which topics count as "new" for a user.
#
# The expression takes the GREATEST of the user's new-topic-duration cut-off,
# the user's creation date and the `:min_date` bind. The named binds are
# filled in from `treat_as_new_topic_params` by building a throwaway
# `User.where` and extracting its WHERE clause as SQL.
#
# @return [String] SQL expression referring to the `u` and `uo` table aliases
def self.treat_as_new_topic_clause
  start_date_sql =
      "GREATEST(CASE
                WHEN COALESCE(uo.new_topic_duration_minutes, :default_duration) = :always THEN u.created_at
                WHEN COALESCE(uo.new_topic_duration_minutes, :default_duration) = :last_visit THEN COALESCE(u.previous_visit_at,u.created_at)
                ELSE (:now::timestamp - INTERVAL '1 MINUTE' * COALESCE(uo.new_topic_duration_minutes, :default_duration))
             END, u.created_at, :min_date)"

  bound_scope = User.where(start_date_sql, treat_as_new_topic_params)
  bound_scope.where_clause.ast.to_sql
end

.treat_as_new_topic_params ⇒ Object



268
269
270
271
272
273
274
275
276
# File 'app/models/topic_tracking_state.rb', line 268

# Bind parameters for the "treat as new topic" SQL expressions.
#
# @return [Hash] with keys :now (current DateTime), :last_visit and :always
#   (the User::NewTopicDuration constants), :default_duration (site default
#   new-topic duration in minutes) and :min_date (the min_new_topics_time
#   site setting converted to a DateTime)
def self.treat_as_new_topic_params
  params = { now: DateTime.now }
  params[:last_visit] = User::NewTopicDuration::LAST_VISIT
  params[:always] = User::NewTopicDuration::ALWAYS
  params[:default_duration] = SiteSetting.default_other_new_topic_duration_minutes
  params[:min_date] = Time.at(SiteSetting.min_new_topics_time).to_datetime
  params
end

.trigger_post_read_count_update(post, groups, last_read_post_number, user_id) ⇒ Object



577
578
579
580
581
582
# File 'app/models/topic_tracking_state.rb', line 577

# Pushes a post's current reader count to clients after a user read it.
#
# Does nothing when there is no post or when `groups` is empty.
# `last_read_post_number` is accepted but not used.
#
# @param post [Post, nil] the post that was read
# @param groups [#empty?] groups that publish read state for the topic
# @param last_read_post_number [Integer] unused
# @param user_id [Integer] id of the reader, sent as `reader_id`
# @return [Object, nil] result of `publish_change_to_clients!`, or nil when skipped
def self.trigger_post_read_count_update(post, groups, last_read_post_number, user_id)
  return if !post || groups.empty?

  post.publish_change_to_clients!(:read, { readers_count: post.readers_count, reader_id: user_id })
end

.unread_channel_key(user_id) ⇒ Object



100
101
102
# File 'app/models/topic_tracking_state.rb', line 100

# MessageBus channel name carrying a single user's unread/read updates.
#
# @param user_id [Integer] id of the user
# @return [String] "/unread/" followed by the user id
def self.unread_channel_key(user_id)
  "/unread/" + user_id.to_s
end

.unread_filter_sql(whisperer: false) ⇒ Object



249
250
251
# File 'app/models/topic_tracking_state.rb', line 249

# SQL predicate that matches unread topics, taken from the WHERE clause of
# `TopicQuery.unread_filter`.
#
# @param whisperer [Boolean] forwarded to `TopicQuery.unread_filter`
# @return [String] SQL condition fragment
def self.unread_filter_sql(whisperer: false)
  unread_scope = TopicQuery.unread_filter(Topic, whisperer: whisperer)
  unread_scope.where_clause.ast.to_sql
end

.update_topic_list_read_indicator(topic, groups, last_read_post_number, user_id, write_event) ⇒ Object



548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
# File 'app/models/topic_tracking_state.rb', line 548

# Publishes the unread-indicator state of a private-message topic to the
# members of the relevant groups.
#
# Only acts when `last_read_post_number` equals the topic's highest post
# number. A group is notified when:
#   * on a write event, the acting user is NOT a member of it;
#   * on a read event, the acting user IS a member of it.
#
# @param topic [Topic] topic providing `id` and `highest_post_number`
# @param groups [Enumerable] groups exposing `members` (a list of user ids)
# @param last_read_post_number [Integer] post number that was read/written
# @param user_id [Integer] id of the acting user
# @param write_event [Boolean] true for a write, false for a read; also sent
#   as `show_indicator`
# @return [Object, nil] result of `MessageBus.publish`, or nil when skipped
def self.update_topic_list_read_indicator(
  topic,
  groups,
  last_read_post_number,
  user_id,
  write_event
)
  return unless last_read_post_number == topic.highest_post_number

  groups_to_update =
    groups.reject do |group|
      is_member = group.members.include?(user_id)
      # Skip members on a write event and non-members on a read event.
      write_event ? is_member : !is_member
    end
  return if groups_to_update.empty?

  MessageBus.publish(
    "/private-messages/unread-indicator/#{topic.id}",
    { topic_id: topic.id, show_indicator: write_event }.as_json,
    user_ids: groups_to_update.flat_map(&:members),
  )
end