Class: Fluent::GithubActivitiesInput

Inherits:
  Input < Object
Defined in:
lib/fluent/plugin/in_github-activities.rb

Constant Summary

DEFAULT_BASE_TAG = "github-activity"
DEFAULT_CLIENTS = 4
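
These defaults presumably back the plugin's configuration. As a minimal sketch (the parameter names base_tag and clients are assumptions inferred from the instance variables used in #start, not confirmed by this page), a v0.12-era Fluentd input plugin would typically declare them like so:

# Hypothetical sketch; runs inside Fluentd, where config_param is available.
config_param :base_tag, :string, :default => DEFAULT_BASE_TAG
config_param :clients, :integer, :default => DEFAULT_CLIENTS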

Instance Method Summary

  • #initialize ⇒ GithubActivitiesInput
  • #shutdown ⇒ Object
  • #start ⇒ Object

Constructor Details

#initialize ⇒ GithubActivitiesInput

Returns a new instance of GithubActivitiesInput.

# File 'lib/fluent/plugin/in_github-activities.rb', line 37

def initialize
  super

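  # Pull in stdlib dependencies (Queue comes from "thread") and the gem's core.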
  require "thread"
  require "pathname"
  require "fluent/plugin/github-activities"
end

Instance Method Details

#shutdown ⇒ Object

# File 'lib/fluent/plugin/in_github-activities.rb', line 88

def shutdown
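  # Hard stop: Thread#exit kills each crawler thread without waiting for in-flight work.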
  @client_threads.each(&:exit)
end
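
Thread#exit tears each crawler thread down immediately, abandoning whatever request it was processing. A gentler variant is sketched below, under the assumption (not supported by this page) that the crawler loop polls a stop flag; @stop is hypothetical:

def shutdown
  @stop = true                  # hypothetical flag the crawler loop would check
  @client_threads.each(&:join)  # wait for in-flight requests to finish
end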

#start ⇒ Object

# File 'lib/fluent/plugin/in_github-activities.rb', line 45

def start
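  # Drop a trailing "." from the configured base tag so tag joining below stays clean.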
  @base_tag = @base_tag.sub(/\.\z/, "")

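  # Cap the number of client threads at the number of watched users, and
  # stretch the per-client polling interval so the aggregate request rate
  # stays roughly what a single client would produce.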
  users = prepare_users_list
  n_clients = [@clients, users.size].min
  @interval = @interval * n_clients

  @client_threads = []
  @request_queue = Queue.new

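  # Build a UsersManager and seed the shared queue with its initial crawl requests.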
  users_manager_params = {
    :users    => users,
    :pos_file => @pos_file,
  }
  users_manager = ::Fluent::GithubActivities::UsersManager.new(users_manager_params)
  users_manager.generate_initial_requests.each do |request|
    @request_queue.push(request)
  end

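  # Spawn one crawler per client thread; all of them consume the same request queue.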
  n_clients.times do
    @client_threads << Thread.new do
      crawler_options = {
        :access_token => @access_token,
        :watching_users => users,
        :include_commits_from_pull_request => @include_commits_from_pull_request,
        :include_foreign_commits => @include_foreign_commits,
        :pos_file => @pos_file,
        :request_queue => @request_queue,
        :default_interval => @interval,
      }
      crawler = ::Fluent::GithubActivities::Crawler.new(crawler_options)
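      # Emit every crawled record into Fluentd's router, namespaced under the base tag.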
      crawler.on_emit = lambda do |tag, record|
        router.emit("#{@base_tag}.#{tag}", Engine.now, record)
      end

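      # Poll forever: handle one queued request, then sleep for the crawler-advised delay.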
      loop do
        crawler.process_request
        sleep(crawler.interval_for_next_request)
      end
    end
  end
end
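
In effect, #start wires up a small producer-consumer system: the UsersManager produces the initial requests into one shared Queue, and each client thread consumes from it, emitting records tagged "#{base_tag}.#{tag}" as it goes. A self-contained sketch of that shape (illustrative only; the request strings and timings are made up):

require "thread"

queue = Queue.new
6.times { |i| queue.push("request-#{i}") }

workers = 2.times.map do
  Thread.new do
    loop do
      request = queue.pop(true) rescue break  # non-blocking pop; stop when drained
      puts "handled #{request}"
      sleep 0.05                              # stands in for interval_for_next_request
    end
  end
end
workers.each(&:join)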