5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
|
# File 'lib/logstash/outputs/elasticsearch/http_client_builder.rb', line 5
def self.build(logger, hosts, params)
client_settings = {
:pool_max => params["pool_max"],
:pool_max_per_route => params["pool_max_per_route"],
:check_connection_timeout => params["validate_after_inactivity"],
:http_compression => params["http_compression"]
}
client_settings[:proxy] = params["proxy"] if params["proxy"]
common_options = {
:client_settings => client_settings,
:metric => params["metric"],
:resurrect_delay => params["resurrect_delay"]
}
if params["sniffing"]
common_options[:sniffing] = true
common_options[:sniffer_delay] = params["sniffing_delay"]
end
common_options[:timeout] = params["timeout"] if params["timeout"]
if params["path"]
client_settings[:path] = dedup_slashes("/#{params["path"]}/")
end
common_options[:bulk_path] = if params["bulk_path"]
dedup_slashes("/#{params["bulk_path"]}")
else
dedup_slashes("/#{params["path"]}/_bulk")
end
common_options[:sniffing_path] = if params["sniffing_path"]
dedup_slashes("/#{params["sniffing_path"]}")
else
dedup_slashes("/#{params["path"]}/_nodes/http")
end
common_options[:healthcheck_path] = if params["healthcheck_path"]
dedup_slashes("/#{params["healthcheck_path"]}")
else
dedup_slashes("/#{params["path"]}")
end
if params["parameters"]
client_settings[:parameters] = params["parameters"]
end
logger.debug? && logger.debug("Normalizing http path", :path => params["path"], :normalized => client_settings[:path])
client_settings.merge! setup_ssl(logger, params)
common_options.merge! setup_basic_auth(logger, params)
external_version_types = ["external", "external_gt", "external_gte"]
raise(
LogStash::ConfigurationError,
"External versioning requires the presence of a version number."
) if external_version_types.include?(params.fetch('version_type', '')) and params.fetch("version", nil) == nil
raise(
LogStash::ConfigurationError,
"External versioning is not supported by the create action."
) if params['action'] == 'create' and external_version_types.include?(params.fetch('version_type', ''))
raise( LogStash::ConfigurationError,
"doc_as_upsert and scripted_upsert are mutually exclusive."
) if params["doc_as_upsert"] and params["scripted_upsert"]
raise(
LogStash::ConfigurationError,
"Specifying action => 'update' needs a document_id."
) if params['action'] == 'update' and params.fetch('document_id', '') == ''
raise(
LogStash::ConfigurationError,
"External versioning is not supported by the update action. See https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-update.html."
) if params['action'] == 'update' and external_version_types.include?(params.fetch('version_type', ''))
update_options = {
:doc_as_upsert => params["doc_as_upsert"],
:script_var_name => params["script_var_name"],
:script_type => params["script_type"],
:script_lang => params["script_lang"],
:scripted_upsert => params["scripted_upsert"]
}
common_options.merge! update_options if params["action"] == 'update'
create_http_client(common_options.merge(:hosts => hosts, :logger => logger))
end
|