Class: Etcd::Observer

Inherits:
Object
  • Object
show all
Includes:
Loggable
Defined in:
lib/etcd/observer.rb

Instance Method Summary collapse

Methods included from Loggable

#logger, #reset_logger!

Constructor Details

#initialize(client, prefix, handler) ⇒ Observer

Returns a new instance of Observer.



5
6
7
8
9
10
11
# File 'lib/etcd/observer.rb', line 5

def initialize(client, prefix, handler)
  @client  = client
  @prefix  = prefix
  @handler = handler
  @index   = nil
  reset_logger!(Logger::DEBUG)
end

Instance Method Details

#call_handler_in_needed(value, key, info) ⇒ Object

etcd has a bug: after restart watches with index fire __sometimes__ with all the previous values workaround:

- execute @handler only if info[:index] had higher value than the last index


32
33
34
35
36
37
38
39
40
41
42
# File 'lib/etcd/observer.rb', line 32

def call_handler_in_needed(value, key, info)
  if info[:index] && @index.to_i <= info[:index]
    # next time start watching from next index
    @index = info[:index] + 1
    logger.debug "index for #{@prefix} ----  #{@index} "
    @handler.call(value, key, info)
  # first-time-fire
  elsif @index == nil
    @handler.call(value, key, info)
  end
end

#cancelObject



44
45
46
47
# File 'lib/etcd/observer.rb', line 44

def cancel
  @running = false
  self
end

#joinObject



56
57
58
59
# File 'lib/etcd/observer.rb', line 56

def join
  @thread.join
  self
end

#pp_statusObject



65
66
67
# File 'lib/etcd/observer.rb', line 65

def pp_status
  "#{@prefix}: #{pp_thread_status}"
end

#pp_thread_statusObject



69
70
71
72
73
74
# File 'lib/etcd/observer.rb', line 69

def pp_thread_status
  st = @thread.status
  st = 'dead by exception'   if st == nil
  st = 'dead by termination' if st == false
  st
end

#rerunObject



49
50
51
52
53
54
# File 'lib/etcd/observer.rb', line 49

def rerun
  logger.debug "rerun for #{@prefix}"
  @thread.terminate if @thread.alive?
  logger.debug "after termination for #{@prefix}"
  run
end

#runObject



13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# File 'lib/etcd/observer.rb', line 13

def run
  @running = true
  @thread = Thread.start do
    while @running
      logger.debug "********* watching #{@prefix} with index #{@index}"
      @client.watch(@prefix, index: @index) do |value, key, info|
        if @running
          logger.debug "watch fired for #{@prefix} with #{info.inspect} "
          call_handler_in_needed(value, key, info)
        end
      end
    end
  end
  self
end

#statusObject



61
62
63
# File 'lib/etcd/observer.rb', line 61

def status
  @thread.status
end