Class: Karafka::Routing::Builder

Inherits:
Array
  • Object
Defined in:
lib/karafka/routing/builder.rb

Overview

Note:

We lock access in case the builder is used in patterns. The locks have no impact on routing usage unless the routing is being expanded, so there is no risk of race conditions.

Builder used as a DSL layer for building consumers and telling them which topics to consume.

Examples:

Build a simple (most common) route

consumers do
  topic :new_videos do
    consumer NewVideosConsumer
  end
end


Constructor Details

#initialize ⇒ Builder

Returns a new instance of Builder.



# File 'lib/karafka/routing/builder.rb', line 26

def initialize
  @mutex = Mutex.new
  @draws = []
  @defaults = EMPTY_DEFAULTS
  super
end

Instance Method Details

#active ⇒ Array<Karafka::Routing::ConsumerGroup>

Returns only active consumer groups that we want to use. Since Karafka supports multi-process setup, we need to be able to pick only those consumer groups that should be active in our given process context.

Returns:

  • (Array<Karafka::Routing::ConsumerGroup>)

    the consumer groups that should be active in the current process context



# File 'lib/karafka/routing/builder.rb', line 87

def active
  select(&:active?)
end
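
The filtering can be tried in isolation. This is only a sketch: `FakeGroup` and `ActiveSketch` are hypothetical stand-ins (not Karafka classes), and the only assumption about a consumer group is that it responds to `active?`.

```ruby
# Hypothetical stand-in for Karafka::Routing::ConsumerGroup; only #active? matters here.
FakeGroup = Struct.new(:name, :active) do
  def active?
    active
  end
end

# Array subclass that mimics the builder's #active filter.
class ActiveSketch < Array
  def active
    select(&:active?)
  end
end

builder = ActiveSketch.new
builder << FakeGroup.new('videos', true)
builder << FakeGroup.new('audits', false)

# Only the groups flagged as active are returned.
active_names = builder.active.map(&:name)
```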

#clear ⇒ Object

Clears the builder and the memory of stored draws.



# File 'lib/karafka/routing/builder.rb', line 92

def clear
  @mutex.synchronize do
    @defaults = EMPTY_DEFAULTS
    @draws.clear
    array_clear
  end
end
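
The excerpt does not show where `array_clear` comes from. One plausible reading, sketched below as an assumption, is that it is an alias of `Array#clear` captured before `clear` is overridden. `ClearSketch` and its `draws` reader are hypothetical names used for illustration only.

```ruby
class ClearSketch < Array
  # Assumption: keep a handle on the inherited Array#clear before overriding it.
  alias array_clear clear

  attr_reader :draws

  def initialize
    @mutex = Mutex.new
    @draws = []
    super
  end

  # Overridden #clear wipes both the stored blocks and the array contents.
  def clear
    @mutex.synchronize do
      @draws.clear
      array_clear
    end
  end
end

builder = ClearSketch.new
builder << :some_group
builder.draws << proc {}
builder.clear
```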

#defaults(&block) ⇒ Proc

Returns defaults that should be evaluated per topic.

Parameters:

  • block (Proc)

    block with per-topic evaluated defaults

Returns:

  • (Proc)

    defaults that should be evaluated per topic



# File 'lib/karafka/routing/builder.rb', line 102

def defaults(&block)
  return @defaults unless block

  if @mutex.owned?
    @defaults = block
  else
    @mutex.synchronize do
      @defaults = block
    end
  end
end
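
The `@mutex.owned?` branch matters because Ruby's `Mutex` is not re-entrant: locking it again from the thread that already holds it raises `ThreadError`. The sketch below uses a hypothetical `DefaultsSketch` class (with a simplified `draw`) to show `defaults` being called both inside and outside a drawing block.

```ruby
class DefaultsSketch
  def initialize
    @mutex = Mutex.new
    @defaults = nil
  end

  def defaults(&block)
    return @defaults unless block

    if @mutex.owned?
      # Already inside #draw on this thread, so the lock is held.
      @defaults = block
    else
      @mutex.synchronize { @defaults = block }
    end
  end

  # Simplified draw: evaluates the block while holding the lock.
  def draw(&block)
    @mutex.synchronize { instance_eval(&block) }
  end
end

builder = DefaultsSketch.new

# Called from within draw: the mutex is owned, so no second lock is taken.
builder.draw do
  defaults { :inside_draw }
end
inside = builder.defaults.call

# Called directly: the mutex is free, so the assignment is synchronized.
builder.defaults { :outside_draw }
outside = builder.defaults.call

# What the owned? check avoids: recursive locking raises ThreadError.
recursive_error =
  begin
    mutex = Mutex.new
    mutex.synchronize { mutex.synchronize {} }
    nil
  rescue ThreadError => e
    e
  end
```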

#draw(&block) { ... } ⇒ Object

Note:

Once drawing is done, all routes are stored and validated to make sure that they are correct and that there are no topic or group duplications (these are forbidden).

Draws routes for Karafka.

Examples:

draw do
  topic :xyz do
  end
end

Parameters:

  • block (Proc)

    block we will evaluate within the builder context

Yields:

  • Evaluates provided block in a builder context so we can describe routes

Raises:



# File 'lib/karafka/routing/builder.rb', line 45

def draw(&block)
  @mutex.synchronize do
    @draws << block

    instance_eval(&block)

    # Ensures high-level routing details consistency
    # Contains checks that require knowledge about all the consumer groups to operate
    Contracts::Routing.new.validate!(map(&:to_h))

    each do |consumer_group|
      # Validate consumer group settings
      Contracts::ConsumerGroup.new.validate!(consumer_group.to_h)

      # and then its topics settings
      consumer_group.topics.each do |topic|
        Contracts::Topic.new.validate!(topic.to_h)
      end

      # Initialize subscription groups after all the routing is done
      consumer_group.subscription_groups
    end
  end
end
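
The general shape of `draw` (remember the block, evaluate it in the builder's context, then validate the result) can be sketched without Karafka. Everything below is hypothetical: `DrawSketch`, its `topic` method and the duplicate check stand in for the real DSL and the `Contracts` classes, whose behavior is not reproduced here.

```ruby
class DrawSketch < Array
  def initialize
    @mutex = Mutex.new
    @draws = []
    super
  end

  def draw(&block)
    @mutex.synchronize do
      @draws << block        # remember the block
      instance_eval(&block)  # run the DSL with self set to the builder
      validate!              # check the accumulated routes
    end
  end

  private

  # Hypothetical DSL method: registers a topic name.
  def topic(name)
    self << name.to_s
  end

  # Hypothetical validation: rejects duplicated topic names.
  def validate!
    duplicates = select { |name| count(name) > 1 }.uniq
    return if duplicates.empty?

    raise ArgumentError, "duplicated topics: #{duplicates.join(', ')}"
  end
end

router = DrawSketch.new
router.draw do
  topic :new_videos
  topic :audits
end

# Drawing a topic that already exists fails validation.
error =
  begin
    router.draw { topic :audits }
    nil
  rescue ArgumentError => e
    e
  end
```

Note that in this sketch the offending route is already appended when validation fails; the sketch makes no claim about how Karafka handles that case.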

#redraw(&block) ⇒ Object

Clear routes and draw them again with the given block. Helpful for testing purposes.

Parameters:

  • block (Proc)

    block we will evaluate within the builder context



# File 'lib/karafka/routing/builder.rb', line 76

def redraw(&block)
  @mutex.synchronize do
    @draws.clear
    array_clear
  end
  draw(&block)
end
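
A typical use in a test is to replace whatever routes exist with a fresh set. The sketch below assumes a hypothetical `RedrawSketch` class with a toy `topic` method and no validation; it only illustrates the clear-then-draw sequence, including that the lock is released before `draw` acquires it again.

```ruby
class RedrawSketch < Array
  # Assumption: array_clear is an alias of the inherited Array#clear.
  alias array_clear clear

  def initialize
    @mutex = Mutex.new
    super
  end

  def draw(&block)
    @mutex.synchronize { instance_eval(&block) }
  end

  def redraw(&block)
    # The lock is released here before #draw takes it again.
    @mutex.synchronize { array_clear }
    draw(&block)
  end

  # Hypothetical DSL method: registers a topic name.
  def topic(name)
    self << name
  end
end

router = RedrawSketch.new
router.draw { topic :old_topic }
router.redraw { topic :new_topic }

# Only the routes from the latest block remain.
routes = router.to_a
```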