Class: Franz::Input

Inherits:
Object
  • Object
show all
Defined in:
lib/franz/input.rb

Overview

File input for Franz. Really, the only input for Franz, so I hope you like it.

Instance Method Summary collapse

Constructor Details

#initialize(opts = {}) ⇒ Input

Start a new input in the background. We’ll generate a stream of events by watching the filesystem for changes (Franz::Discover and Franz::Watch), tailing files (Franz::Tail), and generating events (Franz::Agg).

Parameters:

  • opts (Hash) (defaults to: {})

    options for the input

Options Hash (opts):

  • :input (Hash) — default: {}

    “input” configuration

  • :output (Queue) — default: []

    “output” queue

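  • :checkpoint (String) — default: 'franz.*.checkpoint'

    glob matching checkpoint files; the first * is replaced with a timestamp when a new checkpoint is written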

  • :checkpoint_interval (Integer) — default: 30

    seconds between checkpoints

  • :logger (Logger) — default: Logger.new(STDOUT)

    logger to use



25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
# File 'lib/franz/input.rb', line 25

# Start a new input in the background. Franz::Discover, Franz::Watch,
# Franz::Tail and Franz::Agg are wired together through intermediate queues.
# The input resumes from the most recently modified non-empty checkpoint
# file, if there is one. A background thread then writes a fresh checkpoint
# every checkpoint_interval seconds until #stop is called.
#
# @param opts [Hash] options for the input, deep-merged over the defaults below
# @option opts [Hash]    :input               "input" configuration
# @option opts [Object]  :output              queue handed to Franz::Agg as agg_events
#                                             (default: [])
# @option opts [String]  :checkpoint          glob matching checkpoint files; its first
#                                             "*" becomes a timestamp when writing
#                                             (default: 'franz.*.checkpoint')
# @option opts [Integer] :checkpoint_interval seconds between checkpoints (default: 30)
# @option opts [Logger]  :logger              logger to use (default: Logger.new(STDOUT))
def initialize opts={}
  opts = {
    checkpoint: 'franz.*.checkpoint',
    checkpoint_interval: 30,
    logger: Logger.new(STDOUT),
    output: [],
    input: {
      ignore_before: 0,
      discover_bound: 10_000,
      watch_bound: 1_000,
      tail_bound: 1_000,
      discover_interval: nil,
      watch_interval: nil,
      eviction_interval: nil,
      flush_interval: nil,
      buffer_limit: nil,
      line_limit: nil,
      read_limit: nil,
      play_catchup?: nil,
      configs: []
    }
  }.deep_merge!(opts)

  @logger = opts[:logger]

  @checkpoint_interval = opts[:checkpoint_interval]
  # Only the first "*" is turned into a format slot for the timestamp
  @checkpoint_path     = opts[:checkpoint].sub('*', '%d')
  @checkpoint_glob     = opts[:checkpoint]

  # The checkpoint contains a Marshalled Hash with a compact representation of
  # stateful inputs to various Franz streaming classes (e.g. the "known" option
  # to Franz::Discover). This state file is generated automatically every time
  # the input exits (see below) and also at regular intervals.
  # NOTE(review): Marshal.load trusts its input; :checkpoint must point only
  # at files this process writes, never at a location others can write to.
  checkpoints = Dir[@checkpoint_glob].sort_by { |path| File.mtime path }
  checkpoints = checkpoints.reject { |path| File.zero? path }
  last_checkpoint_path = checkpoints.pop
  state = nil
  unless last_checkpoint_path.nil?
    # Marshal data is binary; read it without newline/encoding translation
    state = Marshal.load File.binread(last_checkpoint_path)
    log.info \
      event: 'input checkpoint loaded',
      checkpoint: last_checkpoint_path
  end

  # Every component receives the complete, unmodified checkpoint. The split
  # below therefore builds new hashes instead of deleting keys in place.
  # A shallow #dup would still share the per-path hashes, so in-place
  # deletion would strip :cursor/:seq from full_state as well.
  full_state = state

  state ||= {}
  known = state.keys
  stats, cursors, seqs = {}, {}, {}
  state.each do |path, entry|
    cursors[path] = entry[:cursor] unless entry[:cursor].nil?
    seqs[path]    = entry[:seq]    unless entry[:seq].nil?
    # Whatever is left after removing cursor and seq is handed to Watch as stats
    stats[path]   = entry.reject { |key, _| key == :cursor || key == :seq }
  end

  discoveries  = possibly_bounded_queue opts[:input][:discover_bound]
  deletions    = possibly_bounded_queue opts[:input][:discover_bound]
  watch_events = possibly_bounded_queue opts[:input][:watch_bound]
  tail_events  = possibly_bounded_queue opts[:input][:tail_bound]

  @discover = Franz::Discover.new \
    discoveries: discoveries,
    deletions: deletions,
    configs: opts[:input][:configs],
    discover_interval: opts[:input][:discover_interval],
    ignore_before: opts[:input][:ignore_before],
    logger: opts[:logger],
    known: known,
    full_state: full_state

  @watch = Franz::Watch.new \
    discoveries: discoveries,
    deletions: deletions,
    watch_events: watch_events,
    watch_interval: opts[:input][:watch_interval],
    play_catchup?: opts[:input][:play_catchup?],
    logger: opts[:logger],
    stats: stats,
    full_state: full_state

  @tail = Franz::Tail.new \
    watch_events: watch_events,
    tail_events: tail_events,
    block_size: opts[:input][:block_size],
    line_limit: opts[:input][:line_limit],
    read_limit: opts[:input][:read_limit],
    logger: opts[:logger],
    cursors: cursors,
    full_state: full_state

  @agg = Franz::Agg.new \
    configs: opts[:input][:configs],
    tail_events: tail_events,
    agg_events: opts[:output],
    flush_interval: opts[:input][:flush_interval],
    buffer_limit: opts[:input][:buffer_limit],
    logger: opts[:logger],
    seqs: seqs,
    full_state: full_state

  # Periodic checkpointer; #stop flips @stop and joins this thread
  @stop = false
  @t = Thread.new do
    until @stop
      checkpoint
      sleep @checkpoint_interval
    end
  end

  log.info event: 'input started', opts: opts
end

Instance Method Details

#checkpoint ⇒ Object

Write a checkpoint file given the current state



169
170
171
172
173
174
175
176
177
178
# File 'lib/franz/input.rb', line 169

# Write a checkpoint file containing the current #state, then prune older
# checkpoints so that only the two most recent (including this one) remain.
#
# The snapshot is serialized before any file is touched. It is written to a
# hidden temporary file in the same directory and then renamed into place.
# As a result, a failure while snapshotting or writing never leaves an empty
# or truncated file matching the checkpoint glob.
#
# @return [void]
def checkpoint
  path = @checkpoint_path % Time.now
  data = Marshal.dump(state)

  # Leading dot keeps the temp file out of ordinary "*" globs
  tmp = File.join(File.dirname(path), ".#{File.basename(path)}.tmp")
  File.binwrite(tmp, data)
  File.rename(tmp, path)

  # Keep last two checkpoints; list after writing so the new one counts
  stale = Dir[@checkpoint_glob].sort_by { |p| File.mtime p }[0...-2]
  stale.each { |c| FileUtils.rm c }

  log.info \
    event: 'input checkpoint saved',
    checkpoint: path
end

#state ⇒ Object

Return a compact representation of internal state



156
157
158
159
160
161
162
163
164
165
166
# File 'lib/franz/input.rb', line 156

# Build a compact snapshot of internal state. The result is keyed by every
# path in Watch's state. Each entry is Watch's per-path hash, or {} when that
# hash is nil, extended with Tail's :cursor and Agg's :seq for the path; either
# is nil when that component has nothing for it. Paths that only Tail or Agg
# know about are left out.
#
# NOTE(review): the Hash returned by @watch.state is filled in and returned
# as-is; if Watch hands out its internal Hash, this mutates it — confirm.
#
# @return [Hash]
def state
  snapshot = @watch.state
  cursors  = @tail.state
  seqs     = @agg.state
  snapshot.keys.each do |path|
    entry = (snapshot[path] ||= {})
    entry[:cursor] = cursors.fetch(path, nil)
    entry[:seq]    = seqs.fetch(path, nil)
  end
  snapshot
end

#stop ⇒ Hash

Stop everything. Has the effect of draining all the Queues and waiting on auxiliary threads (e.g. eviction) to complete full intervals, so it may ordinarily take tens of seconds, depending on your configuration.

Returns:

  • (Hash)

    compact internal state



144
145
146
147
148
149
150
151
152
153
# File 'lib/franz/input.rb', line 144

# Stop the input. The checkpoint loop ends first, then Watch, Tail and Agg are
# stopped in that order. Later calls skip all of that and just return the
# current state. Joining the checkpoint thread can take up to
# checkpoint_interval seconds, because that thread sleeps between checkpoints.
#
# NOTE(review): the Discover instance created in #initialize is not stopped
# here — confirm whether it needs to be.
#
# @return [Hash] compact internal state (see #state)
def stop
  unless @stop
    @stop = true
    @t.join
    [@watch, @tail, @agg].each(&:stop)
    log.info event: 'input stopped'
  end
  state
end