Class: Kcl::Worker

Inherits: Object
Defined in:
lib/kcl/worker.rb

Constant Summary

PROCESS_INTERVAL = 1

Interval between processing cycles (shard sync and consumption), in seconds.

Class Method Summary

Instance Method Summary

Constructor Details

#initialize(id, record_processor_factory) ⇒ Worker

Returns a new instance of Worker.



11
12
13
14
15
16
17
18
19
# File 'lib/kcl/worker.rb', line 11

# Create a worker identified by +id+ that builds record processors from
# +record_processor_factory+. No AWS resources are touched here.
#
# @param id [Object] this worker's identity; compared with a shard's lease_owner
# @param record_processor_factory [#create_processor] source of record processors
def initialize(id, record_processor_factory)
  @id = id
  @record_processor_factory = record_processor_factory

  # Hash{String => Boolean}: shard_id => listed by Kinesis during the latest sync
  @live_shards = {}
  # Hash{String => Kcl::Workers::ShardInfo}: tracked shards keyed by shard_id
  @shards = {}

  # Kcl::Proxies::KinesisProxy, Kcl::Checkpointer and the periodic timer are
  # not created here. NOTE(review): presumably populated lazily elsewhere — confirm.
  @kinesis = @checkpointer = @timer = nil
end

Class Method Details

.run(id, record_processor_factory) ⇒ Object



6
7
8
9
# File 'lib/kcl/worker.rb', line 6

# Convenience entry point: build a worker for +id+ and start it.
#
# @return [Object] whatever #start returns
def self.run(id, record_processor_factory)
  new(id, record_processor_factory).start
end

Instance Method Details

#available_lease_shard? ⇒ Boolean

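Returns whether this worker can take another lease. It is true when the number of shards the worker currently leases, not counting completed shards, is below Kcl.config.max_lease_count.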

Returns:

  • (Boolean)


91
92
93
94
95
96
# File 'lib/kcl/worker.rb', line 91

# Whether this worker may take on another shard lease.
#
# Counts the tracked shards whose lease_owner is this worker's id and that are
# not yet completed?, then checks that count against Kcl.config.max_lease_count.
#
# @return [Boolean] true while the held-lease count is below the configured maximum
def available_lease_shard?
  held = @shards.each_value.count { |shard| shard.lease_owner == @id && !shard.completed? }
  Kcl.config.max_lease_count > held
end

#cleanup ⇒ Object

Clean up resources by resetting the shard maps and dropping the Kinesis proxy and checkpointer references.



57
58
59
60
61
62
# File 'lib/kcl/worker.rb', line 57

# Forget all per-run state: both shard maps are emptied and the Kinesis proxy
# and checkpointer references are dropped. The timer is left as-is.
#
# @return [nil]
def cleanup
  @shards = {}
  @live_shards = {}
  @checkpointer = @kinesis = nil
end

#consume_shards! ⇒ Object

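Consume records shard by shard. Each shard that this worker does not already own and that is not completed is leased and processed on its own thread.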



99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
# File 'lib/kcl/worker.rb', line 99

# Lease every shard this worker does not already own, consume each one on its
# own thread, and wait for all of those threads.
#
# A shard is skipped when:
# * its lease_owner is already this worker's id,
# * the checkpointer has no checkpoint for it (Kcl::Errors::CheckpointNotFoundError
#   is logged and swallowed), or
# * the checkpointed shard reports completed?.
# Every other shard is leased via checkpointer.lease(shard, @id) and handed to a
# Kcl::Workers::Consumer built with a fresh processor from the factory. Each
# consumer thread always calls checkpointer.remove_lease_owner when it finishes,
# whether consume! returned or raised.
#
# The join lives in +ensure+. If fetching a checkpoint or leasing a later shard
# raises, the consumers already started are still waited for instead of being
# left running unsupervised. Thread#join re-raises an exception raised inside a
# consumer thread; if that happens while another error is propagating, the
# thread's exception replaces it.
#
# @return [Array<Thread>] the consumer threads that were started (all finished)
def consume_shards!
  threads = []
  @shards.each do |shard_id, shard|
    # already owner of the shard
    next if shard.lease_owner == @id

    begin
      shard = checkpointer.fetch_checkpoint(shard)
    rescue Kcl::Errors::CheckpointNotFoundError
      Kcl.logger.info("Not found checkpoint of shard at #{shard.to_h}")
      next
    end
    # shard is closed and processed all records
    next if shard.completed?

    shard = checkpointer.lease(shard, @id)

    # Hand the id and leased shard to the thread as arguments so each thread
    # works on its own values rather than on variables captured from the loop.
    threads << Thread.new(shard_id, shard) do |consumed_shard_id, leased_shard|
      begin
        consumer = Kcl::Workers::Consumer.new(
          leased_shard,
          @record_processor_factory.create_processor,
          kinesis,
          checkpointer
        )
        consumer.consume!
      ensure
        checkpointer.remove_lease_owner(leased_shard)
        Kcl.logger.info("Finish to consume shard at shard_id: #{consumed_shard_id}")
      end
    end
  end
  threads
ensure
  threads.each(&:join)
end

#shutdown(signal = :NONE) ⇒ Object

Shutdown gracefully



43
44
45
46
47
48
49
50
51
52
53
54
# File 'lib/kcl/worker.rb', line 43

# Stop the worker gracefully.
#
# Cancels and forgets the periodic processing timer when one is set, stops the
# EM reactor with EM.stop, and logs which signal triggered the shutdown. Any
# error along the way is logged as "Class: message" and re-raised.
#
# @param signal [Object] signal name to log (defaults to :NONE); used only in the log line
def shutdown(signal = :NONE)
  @timer&.cancel
  @timer = nil
  EM.stop

  Kcl.logger.info("Shutdown worker with signal #{signal} at #{object_id}")
rescue => e
  Kcl.logger.error("#{e.class}: #{e.message}")
  raise
end

#start ⇒ Object

Start consuming data from the stream, and pass it to the application record processors.



23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
# File 'lib/kcl/worker.rb', line 23

# Run the worker until the EM reactor stops.
#
# Inside EM.run it installs signal handlers (trap_signals) and schedules a
# periodic timer every PROCESS_INTERVAL. On each tick the timer syncs the shard
# list and, if this worker still has lease capacity, consumes shards. Once the
# reactor returns, per-run state is cleaned up. Any error is logged as
# "Class: message" and re-raised; cleanup is not run on that path.
def start
  Kcl.logger.info("Start worker at #{object_id}")

  EM.run do
    trap_signals

    @timer = EM::PeriodicTimer.new(PROCESS_INTERVAL) do
      sync_shards!
      next unless available_lease_shard?

      consume_shards!
    end
  end

  cleanup
  Kcl.logger.info("Finish worker at #{object_id}")
rescue => e
  Kcl.logger.error("#{e.class}: #{e.message}")
  raise
end

#sync_shards! ⇒ Object

Add newly listed shards and remove shards that Kinesis no longer lists, releasing their leases.



65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
# File 'lib/kcl/worker.rb', line 65

# Reconcile the in-memory shard maps with the shards currently listed by Kinesis.
#
# Every shard id returned by +kinesis.shards+ is marked live. Ids seen for the
# first time get a new Kcl::Workers::ShardInfo entry in @shards. Ids that were
# tracked but are no longer listed have their lease removed through the
# checkpointer and are dropped from BOTH @shards and @live_shards.
#
# Dropping dead ids from @live_shards is what makes removal happen exactly once.
# If a dead id stayed there flagged +false+, every later call would reach it
# again: remove_lease would be called with +nil+ (its @shards entry is already
# gone) and the removal would be logged again.
#
# @return [Hash{String => Kcl::Workers::ShardInfo}] the updated @shards
def sync_shards!
  @live_shards.transform_values! { |_| false }

  kinesis.shards.each do |shard|
    @live_shards[shard.shard_id] = true
    next if @shards[shard.shard_id]

    @shards[shard.shard_id] = Kcl::Workers::ShardInfo.new(
      shard.shard_id,
      shard.parent_shard_id,
      shard.sequence_number_range
    )
    Kcl.logger.info("Found new shard at shard_id: #{shard.shard_id}")
  end

  # Snapshot the dead ids first so neither map is mutated while it is iterated.
  dead_shard_ids = @live_shards.reject { |_, alive| alive }.keys
  dead_shard_ids.each do |shard_id|
    dead_shard = @shards[shard_id]
    checkpointer.remove_lease(dead_shard) if dead_shard
    @shards.delete(shard_id)
    @live_shards.delete(shard_id)
    Kcl.logger.info("Remove shard at shard_id: #{shard_id}")
  end

  @shards
end