45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
|
# File 'lib/fluent/plugin/in_github-activities.rb', line 45
# Boots the input: builds the list of users to watch, seeds the shared request
# queue with the initial requests, and spawns the crawler threads that drain it.
#
# Side effects visible here: rewrites @base_tag and @interval, and (re)creates
# @client_threads and @request_queue.
def start
  # Drop a single trailing dot so "#{@base_tag}.#{tag}" never yields "..".
  @base_tag = @base_tag.sub(/\.\z/, "")

  watched_users = prepare_users_list

  # Never start more crawler threads than there are users to watch.
  worker_count = [@clients, watched_users.size].min

  # Scale the per-crawler interval by the number of crawlers
  # (presumably to keep the combined request rate constant -- confirm).
  @interval *= worker_count

  @client_threads = []
  @request_queue = Queue.new

  manager = ::Fluent::GithubActivities::UsersManager.new({
    users: watched_users,
    pos_file: @pos_file,
  })
  manager.generate_initial_requests.each { |request| @request_queue.push(request) }

  worker_count.times do
    @client_threads << Thread.new do
      # Each thread owns its own crawler; all crawlers share @request_queue.
      crawler = ::Fluent::GithubActivities::Crawler.new({
        access_token: @access_token,
        watching_users: watched_users,
        include_commits_from_pull_request: @include_commits_from_pull_request,
        include_foreign_commits: @include_foreign_commits,
        pos_file: @pos_file,
        request_queue: @request_queue,
        default_interval: @interval,
      })

      # Forward every record the crawler emits to the router, prefixing the
      # crawler-supplied tag with the configured base tag.
      crawler.on_emit = ->(tag, record) { router.emit("#{@base_tag}.#{tag}", Engine.now, record) }

      # Runs until the thread is stopped from outside; there is no exit
      # condition inside the loop.
      loop do
        crawler.process_request
        sleep(crawler.interval_for_next_request)
      end
    end
  end
end
|