39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
|
# File 'lib/anschel/main.rb', line 39
def agent
log.info \
event: 'hello',
version: VERSION,
options: options.to_hash,
num_cpus: num_cpus
input, filter, output, store, stats, ts = \
nil, nil, nil, nil, nil, nil, nil
begin
raw_config = File.read(options.config).gsub(/^\s*\/\/ .*/, '')
config = JrJackson::Json.load \
raw_config, symbolize_keys: true
setup_log4j config[:log4j]
store = Store.new config[:store], log
stats = Stats.new config[:stats_interval], log
filter = Filter.new config[:filter], stats, log
output = Output.new config[:output], stats, log
input = Input.new \
config[:input], config[:queue_size], stats, log, store[:input]
stats.create 'event'
stats.get 'event'
ts = []
stats_port = config[:stats_port] || 3345
ts << stats_endpoint(stats, stats_port, log)
ts += num_cpus.times.map do
Thread.new do
loop do
raw = input.shift.encode 'UTF-8', \
invalid: :replace, undef: :replace, replace: '?'
begin
event = JrJackson::Json.load raw, symbolize_keys: true
rescue JrJackson::ParseError
log.error \
event: 'main-input-error',
reason: 'could not parse event',
remediation: 'skipping',
raw_event: raw
next
end
output.push filter.apply(event)
stats.inc 'event'
end
end
end
rescue Exception => e
log.fatal \
event: 'exception',
exception: e.inspect,
class: e.class,
message: e.message,
backtrace: e.backtrace
bye output, input, store, log, :error
exit 2
end
log.info event: 'all-systems-clear'
trap('SIGINT') do
bye output, input, store, log
exit
end
ts.map &:join
end
|