Class: Teakflake::StaticWorkerId
- Inherits:
-
Object
- Object
- Teakflake::StaticWorkerId
- Includes:
- LogsForMyFamily::LocalLogger
- Defined in:
- lib/teakflake/worker_id_sources/static_worker_id.rb
Instance Attribute Summary collapse
-
#clock ⇒ Object
readonly
Returns the value of attribute clock.
-
#datacenter_id ⇒ Object
readonly
Returns the value of attribute datacenter_id.
Instance Method Summary collapse
- #assert(_time) ⇒ Object
-
#initialize(zookeeper, datacenter_id, worker_id, addr, clock, worker_id_zk_path: '/teakflake-servers') ⇒ StaticWorkerId
constructor
A new instance of StaticWorkerId.
- #register_worker_id ⇒ Object
- #sanity_check_peers ⇒ Object
Constructor Details
#initialize(zookeeper, datacenter_id, worker_id, addr, clock, worker_id_zk_path: '/teakflake-servers') ⇒ StaticWorkerId
Returns a new instance of StaticWorkerId.
26 27 28 29 30 31 32 33 34 |
# File 'lib/teakflake/worker_id_sources/static_worker_id.rb', line 26 def initialize(zookeeper, datacenter_id, worker_id, addr, clock, worker_id_zk_path: '/teakflake-servers') @zookeeper = zookeeper @datacenter_id = datacenter_id @worker_id = worker_id @worker_id_zk_path = worker_id_zk_path @addr = addr @clock = clock @id_registered = false end |
Instance Attribute Details
#clock ⇒ Object (readonly)
Returns the value of attribute clock.
24 25 26 |
# File 'lib/teakflake/worker_id_sources/static_worker_id.rb', line 24 def clock @clock end |
#datacenter_id ⇒ Object (readonly)
Returns the value of attribute datacenter_id.
24 25 26 |
# File 'lib/teakflake/worker_id_sources/static_worker_id.rb', line 24 def datacenter_id @datacenter_id end |
Instance Method Details
#assert(_time) ⇒ Object
60 61 62 63 |
# File 'lib/teakflake/worker_id_sources/static_worker_id.rb', line 60 def assert(_time) raise 'worker_id not registered' unless @id_registered @worker_id end |
#register_worker_id ⇒ Object
36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 |
# File 'lib/teakflake/worker_id_sources/static_worker_id.rb', line 36 def register_worker_id logger.info(:claiming_worker_id, id: @worker_id) @zookeeper.mkdir_p(@worker_id_zk_path) tries = 0 begin @zookeeper.create( "#{@worker_id_zk_path}/#{@worker_id}", @addr, mode: :ephemeral ) rescue ZK::Exceptions::NodeExists if tries < 2 logger.notice(:fail_attempt_claim_worker_id, id: @worker_id, tries: tries) tries += 1 ::Kernel.sleep 1 retry else logger.error(:fail_claim_worker_id, id: @worker_id) raise end end @id_registered = true logger.info(:claimed_worker_id, id: @worker_id) end |
#sanity_check_peers ⇒ Object
65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 |
# File 'lib/teakflake/worker_id_sources/static_worker_id.rb', line 65 def sanity_check_peers = peers.each_with_object([]) do |(worker_id, uri), | next if uri == @addr uri = URI(uri) uri.path = '/id' id = Teakflake::Id.new(get_id(uri)) if id.worker_id != worker_id logger.error(:worker_id_insanity, expected: worker_id, got: id.worker_id, peer: uri) raise 'worker id insanity' end if id.datacenter_id != @datacenter_id logger.error(:datacenter_id_insanity, expected: @datacenter_id, got: id.datacenter_id, peer: uri) raise 'datacenter id insanity' end << id. end if !.empty? avg = .inject(:+) / .length.to_f our_time = @clock.millis if (our_time - avg).abs > 10_000 logger.error(:timestamp_insanity, avg: avg, our_time: our_time) raise 'timestamp insanity' end end end |