Class: Teakflake::StaticWorkerId

Inherits:
Object
  • Object
show all
Includes:
LogsForMyFamily::LocalLogger
Defined in:
lib/teakflake/worker_id_sources/static_worker_id.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(zookeeper, datacenter_id, worker_id, addr, clock, worker_id_zk_path: '/teakflake-servers') ⇒ StaticWorkerId

Returns a new instance of StaticWorkerId.



26
27
28
29
30
31
32
33
34
# File 'lib/teakflake/worker_id_sources/static_worker_id.rb', line 26

def initialize(zookeeper, datacenter_id, worker_id, addr, clock, worker_id_zk_path: '/teakflake-servers')
  @zookeeper = zookeeper
  @datacenter_id = datacenter_id
  @worker_id = worker_id
  @worker_id_zk_path = worker_id_zk_path
  @addr = addr
  @clock = clock
  @id_registered = false
end

Instance Attribute Details

#clockObject (readonly)

Returns the value of attribute clock.



24
25
26
# File 'lib/teakflake/worker_id_sources/static_worker_id.rb', line 24

def clock
  @clock
end

#datacenter_idObject (readonly)

Returns the value of attribute datacenter_id.



24
25
26
# File 'lib/teakflake/worker_id_sources/static_worker_id.rb', line 24

def datacenter_id
  @datacenter_id
end

Instance Method Details

#assert(_time) ⇒ Object



60
61
62
63
# File 'lib/teakflake/worker_id_sources/static_worker_id.rb', line 60

def assert(_time)
  raise 'worker_id not registered' unless @id_registered
  @worker_id
end

#register_worker_idObject



36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
# File 'lib/teakflake/worker_id_sources/static_worker_id.rb', line 36

def register_worker_id
  logger.info(:claiming_worker_id, id: @worker_id)
  @zookeeper.mkdir_p(@worker_id_zk_path)
  tries = 0
  begin
    @zookeeper.create(
      "#{@worker_id_zk_path}/#{@worker_id}", @addr,
      mode: :ephemeral
    )
  rescue ZK::Exceptions::NodeExists
    if tries < 2
      logger.notice(:fail_attempt_claim_worker_id, id: @worker_id, tries: tries)
      tries += 1
      ::Kernel.sleep 1
      retry
    else
      logger.error(:fail_claim_worker_id, id: @worker_id)
      raise
    end
  end
  @id_registered = true
  logger.info(:claimed_worker_id, id: @worker_id)
end

#sanity_check_peersObject



65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
# File 'lib/teakflake/worker_id_sources/static_worker_id.rb', line 65

def sanity_check_peers
  timestamps = peers.each_with_object([]) do |(worker_id, uri), timestamps|
    next if uri == @addr
    uri = URI(uri)
    uri.path = '/id'

    id = Teakflake::Id.new(get_id(uri))

    if id.worker_id != worker_id
      logger.error(:worker_id_insanity, expected: worker_id, got: id.worker_id, peer: uri)
      raise 'worker id insanity'
    end

    if id.datacenter_id != @datacenter_id
      logger.error(:datacenter_id_insanity, expected: @datacenter_id, got: id.datacenter_id, peer: uri)
      raise 'datacenter id insanity'
    end
    timestamps << id.timestamp
  end

  if !timestamps.empty?
    avg = timestamps.inject(:+) / timestamps.length.to_f
    our_time = @clock.millis
    if (our_time - avg).abs > 10_000
      logger.error(:timestamp_insanity, avg: avg, our_time: our_time)
      raise 'timestamp insanity'
    end
  end
end