Class: FFWD::FlushingOutput

Inherits:
Object
  • Object
show all
Includes:
Reporter
Defined in:
lib/ffwd/flushing_output.rb

Defined Under Namespace

Classes: Setup

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Reporter

build_meta, included, #increment, map_meta, #report!, #reporter_data

Constructor Details

#initialize(core, log, hook, config) ⇒ FlushingOutput

Returns a new instance of FlushingOutput.



32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
# File 'lib/ffwd/flushing_output.rb', line 32

# Set up a buffering output that batches metrics and hands them to +hook+.
#
# Nothing is connected here; the constructor only records state and
# registers two lifecycle blocks on +core+:
#
# * starting - connects the hook and subscribes to core.output metrics.
#   Each metric is appended to the buffer and check_timer! is called; once
#   the buffer holds +config[:buffer_limit]+ entries, further metrics are
#   counted as :dropped_metrics and discarded.
# * stopping - closes the hook, unsubscribes, and cancels any armed timer.
#
# @param core   object exposing +starting+, +stopping+ and
#               +output.metric_subscribe+
# @param log    logger; must respond to +info+ (other methods of this class
#               also call +debug+ and +error+)
# @param hook   transport; must respond to +reporter_meta+, +connect+ and
#               +close+ here
# @param config Hash read for +:flush_interval+ and +:buffer_limit+
def initialize(core, log, hook, config)
  @log = log
  @flush_interval = config[:flush_interval]
  @buffer_limit = config[:buffer_limit]
  @hook = hook
  @reporter_meta = @hook.reporter_meta

  @buffer = []
  @pending = nil
  # NOTE(review): @c is not referenced by any method visible here; kept in
  # case code outside this view relies on it.
  @c = nil

  @sub = nil
  # Initialised explicitly so the timer state is defined from construction
  # on, like @pending and @sub, instead of being an implicit nil the first
  # time check_timer!, flush! or the stopping hook reads it.
  @timer = nil

  core.starting do
    @log.info "Started"
    @log.info "  config: #{config}"

    @hook.connect

    @sub = core.output.metric_subscribe do |metric|
      # Enforce the buffer cap: account for the metric, then discard it.
      if @buffer.size >= @buffer_limit
        increment :dropped_metrics, 1
        next
      end

      @buffer << metric
      check_timer!
    end
  end

  core.stopping do
    @log.info "Stopped"

    @hook.close

    if @sub
      @sub.unsubscribe
      @sub = nil
    end

    if @timer
      @timer.cancel
      @timer = nil
    end
  end
end

Instance Attribute Details

#log ⇒ Object (readonly)

Returns the value of attribute log.



30
31
32
# File 'lib/ffwd/flushing_output.rb', line 30

# Reader for @log, which the constructor assigns from its +log+ argument.
def log
  @log
end

#reporter_meta ⇒ Object (readonly)

Returns the value of attribute reporter_meta.



30
31
32
# File 'lib/ffwd/flushing_output.rb', line 30

# Reader for @reporter_meta, which the constructor captures from
# +hook.reporter_meta+.
def reporter_meta
  @reporter_meta
end

Instance Method Details

#check_timer! ⇒ Object



119
120
121
122
123
124
125
126
127
# File 'lib/ffwd/flushing_output.rb', line 119

# Arm the flush timer if none is currently set.
#
# When @timer is already present this is a no-op returning nil. Otherwise
# it logs the interval at debug level and stores a new EM::Timer, created
# with @flush_interval, whose block calls flush!.
def check_timer!
  unless @timer
    @log.debug "Setting timer to #{@flush_interval}s"
    @timer = EM::Timer.new(@flush_interval) { flush! }
  end
end

#flush! ⇒ Object



79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
# File 'lib/ffwd/flushing_output.rb', line 79

# Send everything currently buffered to the hook as a single batch.
#
# Steps:
# 1. Cancel and clear any armed flush timer.
# 2. If a previous request is still pending, or the hook reports no active
#    connection, count the buffered metrics as :dropped_metrics and discard
#    them.
# 3. Otherwise pass the batch to +@hook.send+ and track the returned request
#    in @pending until its callback (counts :sent_metrics) or errback
#    (logs and counts :failed_metrics) runs.
#
# Any StandardError raised along the way is logged rather than propagated,
# and the buffer is always left empty on exit.
def flush!
  if @timer
    @timer.cancel
    @timer = nil
  end

  if @pending
    @log.info "Request already in progress, dropping metrics"
    increment :dropped_metrics, @buffer.size
    return # the ensure clause below empties the buffer
  end

  unless @hook.active?
    @log.error "Dropping metrics, no active connection available"
    increment :dropped_metrics, @buffer.size
    return # the ensure clause below empties the buffer
  end

  # Hand the hook its own array and start a fresh buffer. Clearing the very
  # array that was passed to the hook would empty the batch underneath it
  # if the hook keeps the reference while the request is in flight.
  batch = @buffer
  @buffer = []
  batch_size = batch.size

  # Register the handlers through a local rather than @pending: if the
  # request has already completed, the callback may run immediately and
  # reset @pending to nil before the errback is attached.
  request = @hook.send batch
  @pending = request

  request.callback do
    increment :sent_metrics, batch_size
    @pending = nil
  end

  request.errback do
    @log.error "Failed to submit metrics: #{request.error}"
    increment :failed_metrics, batch_size
    @pending = nil
  end
rescue => e
  @log.error "Error during flush", e
ensure
  # Covers the drop paths and any exception raised before the hand-off;
  # after the hand-off @buffer is already the fresh, empty array.
  @buffer.clear
end