Class: Karafka::Pro::Processing::SubscriptionGroupsCoordinator

Inherits:
Object
  • Object
show all
Includes:
Singleton
Defined in:
lib/karafka/pro/processing/subscription_groups_coordinator.rb

Overview

Uses the jobs queue API to lock (pause) and unlock (resume) operations of a given subscription group. It is abstracted away from jobs queue on this layer because we do not want to introduce jobs queue as a concept to the consumers layer

Instance Method Summary collapse

Instance Method Details

#pause(subscription_group, lock_id = nil, **kwargs) ⇒ Object

Parameters:

  • subscription_group (Karafka::Routing::SubscriptionGroup)

    subscription group we want to pause

  • lock_id (Object) (defaults to: nil)

    key we want to use if we want to set multiple locks on the same subscription group

  • kwargs (Object)

    Any keyword arguments accepted by the jobs queue lock.



28
29
30
31
32
33
34
# File 'lib/karafka/pro/processing/subscription_groups_coordinator.rb', line 28

def pause(subscription_group, lock_id = nil, **kwargs)
  jobs_queue.lock_async(
    subscription_group.id,
    lock_id,
    **kwargs
  )
end

#resume(subscription_group, lock_id = nil) ⇒ Object

Parameters:



39
40
41
# File 'lib/karafka/pro/processing/subscription_groups_coordinator.rb', line 39

def resume(subscription_group, lock_id = nil)
  jobs_queue.unlock_async(subscription_group.id, lock_id)
end