Class: Fluent::SwiftSweepInput

Inherits:
Input
  • Object
show all
Defined in:
lib/fluent/plugin/in_swift_sweep.rb

Instance Method Summary collapse

Constructor Details

#initialize ⇒ SwiftSweepInput

Returns a new instance of SwiftSweepInput.



5
6
7
8
9
10
# File 'lib/fluent/plugin/in_swift_sweep.rb', line 5

# Builds the plugin instance, delegating to the parent constructor first and
# then loading the libraries this plugin references.
def initialize
  super
  # Loaded at construction time rather than at file load time.
  %w[fog time open3].each { |library| require library }
end

Instance Method Details

#check_container ⇒ Object



91
92
93
94
95
96
97
98
99
100
101
102
# File 'lib/fluent/plugin/in_swift_sweep.rb', line 91

# Ensures that the configured Swift container is available.
#
# Asks the storage service for @swift_container. When the service reports it
# as missing, the container is created if @auto_create_container is truthy;
# otherwise a RuntimeError is raised.
#
# Raises RuntimeError when the container is missing and auto-creation is off.
def check_container
  @storage.get_container(@swift_container)
rescue Fog::Storage::OpenStack::NotFound
  unless @auto_create_container
    raise "The specified container does not exist: container = #{@swift_container}"
  end

  # Use the plugin logger, matching run_periodic, instead of the global $log.
  log.info "Creating container #{@swift_container} on #{@auth_url}, #{@swift_account}"
  @storage.put_container(@swift_container)
end

#configure(conf) ⇒ Object



22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
# File 'lib/fluent/plugin/in_swift_sweep.rb', line 22

# Validates the plugin settings after delegating to the parent implementation.
#
# conf - the configuration object supplied by the caller (forwarded to super).
#
# Raises Fluent::ConfigError for the first required setting found to be empty.
def configure(conf)
  super

  # Kept in the order the settings are validated, so the first empty one wins.
  required_settings = [
    [@file_path_with_glob, "in_swift_sweep: `@file_path_with_glob` must has a valid path."],
    [@auth_url, "in_swift_sweep: `@auth_url` is empty."],
    [@auth_user, "in_swift_sweep: `@auth_user` is empty."],
    [@auth_tenant, "in_swift_sweep: `@auth_tenant` is empty."],
    [@auth_api_key, "in_swift_sweep: `@auth_api_key` is empty."],
    [@swift_container, "in_swift_sweep: `@swift_container` is empty."]
  ]

  offender = required_settings.find { |value, _message| value.empty? }
  raise Fluent::ConfigError, offender.last if offender
end

#run_periodic ⇒ Object



77
78
79
80
81
82
83
84
85
86
87
88
# File 'lib/fluent/plugin/in_swift_sweep.rb', line 77

# Sweeper loop: while @processing is truthy, uploads every file matching
# @file_path_with_glob to @swift_container and deletes it locally afterwards.
#
# A file is only deleted after its upload call returned without raising. A
# failure on one file is logged and the file is left in place, so it is picked
# up again by the next sweep instead of terminating the loop.
def run_periodic
  while @processing
    Dir.glob(@file_path_with_glob).each do |filename|
      begin
        # Close the handle before deleting; the block form closes it on exit.
        File.open(filename) do |file|
          @storage.put_object(@swift_container, filename, file)
        end
        log.info "File #{filename} sent to swift"
        File.delete(filename)
        log.info "File #{filename} deleted"
      rescue StandardError => e
        log.error "Failed to sweep #{filename}: #{e.class}: #{e.message}"
      end
    end

    # Pause between sweeps so the loop does not spin; skipped once a stop was
    # requested so that shutdown is not delayed by this iteration.
    sleep 1 if @processing
  end
end

#shutdown ⇒ Object



72
73
74
75
# File 'lib/fluent/plugin/in_swift_sweep.rb', line 72

# Requests the sweeper loop to stop and waits for its thread to finish.
def shutdown
  @processing = false
  # @thread is only assigned at the end of start; guard against it being unset
  # (for example when start raised before the thread was created).
  @thread.join if @thread
end

#start ⇒ Object



51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
# File 'lib/fluent/plugin/in_swift_sweep.rb', line 51

# Starts the plugin: builds the OpenStack storage client from the configured
# credentials, makes sure the target container is usable (see check_container)
# and launches the background thread that runs run_periodic.
def start
  super

  # NOTE(review): this switches off TLS peer verification in Excon's shared
  # defaults, which presumably affects every Excon connection in the process.
  # Kept for backward compatibility -- confirm this is intentional.
  Excon.defaults[:ssl_verify_peer] = false

  # Use the plugin logger, matching run_periodic, instead of the global $log.
  log.debug "openstack_auth_url: #{@auth_url}"
  log.debug "openstack_username: #{@auth_user}"
  log.debug "openstack_tenant: #{@auth_tenant}"
  # The API key is a secret and must never be written to the log.
  log.debug "openstack_auth_key: [FILTERED]"

  @storage = Fog::Storage.new(
    :provider => 'OpenStack',
    :openstack_auth_url => @auth_url,
    :openstack_username => @auth_user,
    :openstack_tenant => @auth_tenant,
    :openstack_api_key => @auth_api_key
  )

  check_container

  # The flag must be set before the thread starts, since run_periodic loops on it.
  @processing = true
  @thread = Thread.new(&method(:run_periodic))
end