Class: Franz::Input
- Inherits:
- Object
  - Object
  - Franz::Input
- Defined in:
- lib/franz/input.rb
Overview
File input for Franz. Really, the only input for Franz, so I hope you like it.
Instance Method Summary
- #checkpoint ⇒ Object
  Write a checkpoint file given the current state.
- #initialize(opts = {}) ⇒ Input (constructor)
  Start a new input in the background.
- #state ⇒ Object
  Return a compact representation of internal state.
- #stop ⇒ Hash
  Stop everything.
Constructor Details
#initialize(opts = {}) ⇒ Input
Start a new input in the background. We’ll generate a stream of events by watching the filesystem for changes (Franz::Discover and Franz::Watch), tailing files (Franz::Tail), and generating events (Franz::Agg).
    # File 'lib/franz/input.rb', line 25

    def initialize opts={}
      opts = {
        checkpoint: 'franz.*.checkpoint',
        checkpoint_interval: 30,
        logger: Logger.new(STDOUT),
        output: [],
        input: {
          ignore_before: 0,
          discover_bound: 10_000,
          watch_bound: 1_000,
          tail_bound: 1_000,
          discover_interval: nil,
          watch_interval: nil,
          eviction_interval: nil,
          flush_interval: nil,
          buffer_limit: nil,
          line_limit: nil,
          read_limit: nil,
          play_catchup?: nil,
          configs: []
        }
      }.deep_merge!(opts)

      @logger = opts[:logger]

      @checkpoint_interval = opts[:checkpoint_interval]
      @checkpoint_path     = opts[:checkpoint].sub('*', '%d')
      @checkpoint_glob     = opts[:checkpoint]

      # The checkpoint contains a Marshalled Hash with a compact representation of
      # stateful inputs to various Franz streaming classes (e.g. the "known" option
      # to Franz::Discover). This state file is generated automatically every time
      # the input exits (see below) and also at regular intervals.
      checkpoints = Dir[@checkpoint_glob].sort_by { |path| File.mtime path }
      checkpoints = checkpoints.reject { |path| File.zero? path }
      last_checkpoint_path = checkpoints.pop
      state = nil
      unless last_checkpoint_path.nil?
        last_checkpoint = File.read(last_checkpoint_path)
        state = Marshal.load last_checkpoint
        log.info \
          event: 'input checkpoint loaded',
          checkpoint: last_checkpoint_path
      end

      full_state = state.nil? ? nil : state.dup

      # Split the per-path state into the pieces each stage needs
      state = state || {}
      known = state.keys
      stats, cursors, seqs = {}, {}, {}
      known.each do |path|
        cursor        = state[path].delete :cursor
        seq           = state[path].delete :seq
        cursors[path] = cursor unless cursor.nil?
        seqs[path]    = seq    unless seq.nil?
        stats[path]   = state[path]
      end

      # Queues connecting the stages: Discover -> Watch -> Tail -> Agg
      discoveries  = possibly_bounded_queue opts[:input][:discover_bound]
      deletions    = possibly_bounded_queue opts[:input][:discover_bound]
      watch_events = possibly_bounded_queue opts[:input][:watch_bound]
      tail_events  = possibly_bounded_queue opts[:input][:tail_bound]

      @discover = Franz::Discover.new \
        discoveries: discoveries,
        deletions: deletions,
        configs: opts[:input][:configs],
        discover_interval: opts[:input][:discover_interval],
        ignore_before: opts[:input][:ignore_before],
        logger: opts[:logger],
        known: known,
        full_state: full_state

      @watch = Franz::Watch.new \
        discoveries: discoveries,
        deletions: deletions,
        watch_events: watch_events,
        watch_interval: opts[:input][:watch_interval],
        play_catchup?: opts[:input][:play_catchup?],
        logger: opts[:logger],
        stats: stats,
        full_state: full_state

      @tail = Franz::Tail.new \
        watch_events: watch_events,
        tail_events: tail_events,
        block_size: opts[:input][:block_size],
        line_limit: opts[:input][:line_limit],
        read_limit: opts[:input][:read_limit],
        logger: opts[:logger],
        cursors: cursors,
        full_state: full_state

      @agg = Franz::Agg.new \
        configs: opts[:input][:configs],
        tail_events: tail_events,
        agg_events: opts[:output],
        flush_interval: opts[:input][:flush_interval],
        buffer_limit: opts[:input][:buffer_limit],
        logger: opts[:logger],
        seqs: seqs,
        full_state: full_state

      # Write a checkpoint at regular intervals until #stop is called
      @stop = false
      @t = Thread.new do
        until @stop
          checkpoint
          sleep @checkpoint_interval
        end
      end

      log.info event: 'input started', opts: opts
    end
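The checkpoint-restore step in the constructor can be tried out in isolation. The following is a minimal sketch, not Franz code: `load_latest_checkpoint` and `split_state` are hypothetical helper names, and the `:size` field stands in for whatever stat data Franz::Watch actually records. It assumes only what the listing shows: the newest non-empty file matching the glob is unmarshalled, and `:cursor` and `:seq` are peeled off each entry.

```ruby
require 'tmpdir'

# Hypothetical helper (not part of Franz): load the newest non-empty
# checkpoint matching +glob+, or return nil when none exists.
def load_latest_checkpoint(glob)
  candidates = Dir[glob]
    .reject  { |path| File.zero?(path) }   # skip empty files
    .sort_by { |path| File.mtime(path) }   # oldest first, newest last
  newest = candidates.last
  newest && Marshal.load(File.binread(newest))
end

# Hypothetical helper: split per-path state into the three hashes that the
# constructor passes on as stats (Watch), cursors (Tail), and seqs (Agg).
def split_state(state)
  stats, cursors, seqs = {}, {}, {}
  state.each do |path, entry|
    entry  = entry.dup                     # leave the caller's hash intact
    cursor = entry.delete(:cursor)
    seq    = entry.delete(:seq)
    cursors[path] = cursor unless cursor.nil?
    seqs[path]    = seq    unless seq.nil?
    stats[path]   = entry
  end
  [stats, cursors, seqs]
end

Dir.mktmpdir do |dir|
  # :size is an assumed placeholder for real stat data.
  saved = { '/var/log/app.log' => { size: 42, cursor: 40, seq: 7 } }
  File.binwrite(File.join(dir, 'franz.1.checkpoint'), Marshal.dump(saved))
  File.write(File.join(dir, 'franz.2.checkpoint'), '')  # empty, so ignored

  state = load_latest_checkpoint(File.join(dir, 'franz.*.checkpoint')) || {}
  stats, cursors, seqs = split_state(state)
  p stats, cursors, seqs
end
```

Unlike the listing, the sketch copies each entry before deleting keys, so the loaded hash stays untouched. Note also that `Marshal.load` should only ever be pointed at files you trust.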
Instance Method Details
#checkpoint ⇒ Object
Write a checkpoint file given the current state.

    # File 'lib/franz/input.rb', line 169

    def checkpoint
      old_checkpoints = Dir[@checkpoint_glob].sort_by { |p| File.mtime p }
      path = @checkpoint_path % Time.now
      File.open(path, 'w') { |f| f.write Marshal.dump(state) }
      old_checkpoints.pop # Keep last two checkpoints
      old_checkpoints.map { |c| FileUtils.rm c }
      log.info \
        event: 'input checkpoint saved',
        checkpoint: path
    end
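To see the write-then-prune behaviour without running Franz, here is a rough sketch under stated assumptions: `write_checkpoint` is a hypothetical stand-in for `#checkpoint`, the `now:` keyword exists only to make the demo deterministic, and the timestamp is substituted directly into the glob rather than formatted through `'%d'` as the listing does.

```ruby
require 'fileutils'
require 'tmpdir'

# Hypothetical stand-in for Franz::Input#checkpoint: write +state+ to a new
# timestamped file, then remove all but the newest pre-existing checkpoint.
def write_checkpoint(glob, state, now: Time.now)
  old  = Dir[glob].sort_by { |path| File.mtime(path) }
  path = glob.sub('*', now.to_i.to_s)   # franz.*.checkpoint -> franz.<epoch>.checkpoint
  File.binwrite(path, Marshal.dump(state))
  old.pop                               # spare the newest old checkpoint
  old.each { |stale| FileUtils.rm(stale) }
  path
end

Dir.mktmpdir do |dir|
  glob = File.join(dir, 'franz.*.checkpoint')
  3.times do |i|
    stamp = Time.at(1_000 + i)
    path  = write_checkpoint(glob, { 'tick' => i }, now: stamp)
    File.utime(stamp, stamp, path)      # pin mtime so the ordering is deterministic
  end
  # After three writes only the two most recent files should remain.
  puts Dir[glob].map { |path| File.basename(path) }.sort
end
```

Because the list of old checkpoints is taken before the new file is written, sparing one old file leaves two on disk: the new checkpoint and its immediate predecessor.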
#state ⇒ Object
Return a compact representation of internal state.

    # File 'lib/franz/input.rb', line 156

    def state
      stats = @watch.state
      cursors = @tail.state
      seqs = @agg.state
      stats.keys.each do |path|
        stats[path] ||= {}
        stats[path][:cursor] = cursors.fetch(path, nil)
        stats[path][:seq] = seqs.fetch(path, nil)
      end
      return stats
    end
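The shape of the returned Hash is easiest to see with sample data. This sketch assumes nothing about Franz beyond the merge shown in the listing; `merge_state` is a hypothetical name, and the `:size` field is a placeholder for whatever Franz::Watch actually stores.

```ruby
# Hypothetical helper mirroring the merge in #state: fold Tail cursors and
# Agg sequence numbers into the Watch stats, keyed by file path.
def merge_state(stats, cursors, seqs)
  stats.each_with_object({}) do |(path, stat), merged|
    merged[path] = (stat || {}).merge(
      cursor: cursors.fetch(path, nil),   # nil when Tail has no cursor yet
      seq:    seqs.fetch(path, nil)       # nil when Agg has no sequence yet
    )
  end
end

stats   = { '/var/log/a.log' => { size: 120 }, '/var/log/b.log' => { size: 8 } }
cursors = { '/var/log/a.log' => 120 }
seqs    = { '/var/log/a.log' => 3 }

p merge_state(stats, cursors, seqs)
```

As in the listing, only paths present in the stats are emitted; a path known only to the cursors or seqs would not appear in the result.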
#stop ⇒ Hash
Stop everything. This drains all the Queues and waits for auxiliary threads (e.g. eviction) to complete full intervals, so it may ordinarily take tens of seconds, depending on your configuration.

    # File 'lib/franz/input.rb', line 144

    def stop
      return state if @stop
      @stop = true
      @t.join
      @watch.stop
      @tail.stop
      @agg.stop
      log.info event: 'input stopped'
      return state
    end
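The flag-and-join pattern used by `#stop` can be illustrated with a small standalone class. This is a sketch only: `PeriodicTask` is a hypothetical name, and its counter stands in for the periodic `checkpoint` call. It shows why stopping can block for up to one full interval (the join has to wait out the current `sleep`) and why a second call returns immediately.

```ruby
# Hypothetical miniature of the Input lifecycle: a background thread runs a
# periodic task until #stop flips the flag and joins it.
class PeriodicTask
  attr_reader :ticks

  def initialize(interval)
    @interval = interval
    @ticks    = 0
    @stop     = false
    @thread   = Thread.new do
      until @stop
        @ticks += 1          # stands in for `checkpoint`
        sleep @interval
      end
    end
  end

  # Safe to call repeatedly; later calls return without joining again.
  def stop
    return @ticks if @stop
    @stop = true
    @thread.join             # waits out the current sleep, at most one interval
    @ticks
  end
end

task = PeriodicTask.new(0.01)
sleep 0.05
first  = task.stop
second = task.stop
puts first == second         # the second call is a no-op
```

With the default `checkpoint_interval` of 30 shown in the constructor, the equivalent join in Franz::Input could take up to about 30 seconds on its own.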