Class: AzureEventHubsHttpSender
- Inherits: Object
- Defined in: lib/fluent/plugin/azureeventhubs/http.rb
Instance Method Summary
- #initialize(connection_string, hub_name, expiry = 3600, proxy_addr = '', proxy_port = 3128, open_timeout = 60, read_timeout = 60) ⇒ AzureEventHubsHttpSender (constructor): A new instance of AzureEventHubsHttpSender.
- #send(payload) ⇒ Object
- #send_w_properties(payload, properties) ⇒ Object
Constructor Details
#initialize(connection_string, hub_name, expiry = 3600, proxy_addr = '', proxy_port = 3128, open_timeout = 60, read_timeout = 60) ⇒ AzureEventHubsHttpSender
Returns a new instance of AzureEventHubsHttpSender.
# File 'lib/fluent/plugin/azureeventhubs/http.rb', line 2
def initialize(connection_string, hub_name, expiry = 3600, proxy_addr = '', proxy_port = 3128, open_timeout = 60, read_timeout = 60)
  require 'openssl'
  require 'base64'
  require 'net/http'
  require 'json'
  require 'cgi'
  require 'time'
  require 'httpclient'

  @connection_string = connection_string
  @hub_name = hub_name
  @expiry_interval = expiry
  @proxy_addr = proxy_addr
  @proxy_port = proxy_port
  @open_timeout = open_timeout
  @read_timeout = read_timeout

  # Expect exactly three ';'-separated parts:
  # Endpoint, SharedAccessKeyName, and SharedAccessKey.
  if @connection_string.count(';') != 2
    raise "Connection String format is not correct"
  end

  @connection_string.split(';').each do |part|
    if part.index('Endpoint') == 0
      # Drop "Endpoint=sb" (11 characters) and switch the scheme to https.
      @endpoint = 'https' + part[11..-1]
    elsif part.index('SharedAccessKeyName') == 0
      # Drop "SharedAccessKeyName=" (20 characters).
      @sas_key_name = part[20..-1]
    elsif part.index('SharedAccessKey') == 0
      # Drop "SharedAccessKey=" (16 characters).
      @sas_key_value = part[16..-1]
    end
  end

  @uri = URI.parse("#{@endpoint}#{@hub_name}/messages")

  if proxy_addr.to_s.empty?
    @client = HTTPClient.new
  else
    proxy_url = "#{proxy_addr}:#{proxy_port}"
    # Pass proxy_url here; the listing referenced an undefined `proxy` variable.
    @client = HTTPClient.new(proxy_url)
  end
end
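The parsing step in the constructor can be tried on its own. The following is a minimal sketch, not part of the plugin: `parse_connection_string` is a hypothetical helper that repeats the same prefix checks and fixed offsets, and the namespace, policy name, key, and hub name are made-up values.

```ruby
# Hypothetical helper (not part of the plugin): parses a connection string
# the same way the constructor does, by prefix and fixed character offsets.
def parse_connection_string(connection_string)
  # The constructor requires exactly two ';' separators (three parts).
  if connection_string.count(';') != 2
    raise ArgumentError, 'Connection String format is not correct'
  end

  parsed = {}
  connection_string.split(';').each do |part|
    if part.start_with?('Endpoint')
      # Drop "Endpoint=sb" (11 characters), then put "https" in front of "://...".
      parsed[:endpoint] = 'https' + part[11..-1]
    elsif part.start_with?('SharedAccessKeyName')
      # Drop "SharedAccessKeyName=" (20 characters).
      parsed[:sas_key_name] = part[20..-1]
    elsif part.start_with?('SharedAccessKey')
      # Drop "SharedAccessKey=" (16 characters).
      parsed[:sas_key_value] = part[16..-1]
    end
  end
  parsed
end

# Made-up namespace, policy name, and key, for illustration only.
sample = 'Endpoint=sb://example-ns.servicebus.windows.net/;' \
         'SharedAccessKeyName=send-policy;SharedAccessKey=abc123='
parsed = parse_connection_string(sample)

# The constructor builds its target URL from the endpoint and the hub name.
message_url = "#{parsed[:endpoint]}my-hub/messages"
puts message_url
```

Because the offsets are fixed, this approach assumes the endpoint is written as `Endpoint=sb://...` and ends with a trailing slash; otherwise the hub name is appended directly to the host name.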
Instance Method Details
#send(payload) ⇒ Object
# File 'lib/fluent/plugin/azureeventhubs/http.rb', line 53
def send(payload)
  send_w_properties(payload, nil)
end
#send_w_properties(payload, properties) ⇒ Object
# File 'lib/fluent/plugin/azureeventhubs/http.rb', line 57
def send_w_properties(payload, properties)
  token = generate_sas_token(@uri.to_s)
  headers = {
    'Content-Type' => 'application/atom+xml;type=entry;charset=utf-8',
    'Authorization' => token
  }
  # Caller-supplied properties are merged in as extra HTTP headers.
  if not properties.nil?
    headers = headers.merge(properties)
  end
  body = payload.to_json
  res = @client.post(@uri.to_s, body, headers)
rescue HTTPClient::TimeoutError, Timeout::Error, Errno::EINVAL, Errno::ECONNRESET, EOFError, Errno::ETIMEDOUT, Net::HTTPBadResponse, Net::HTTPHeaderSyntaxError, Net::ProtocolError => e
  # The rescue body is empty: these errors are swallowed and the method returns nil.
end
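`send_w_properties` relies on `generate_sas_token`, whose body is not shown on this page. The sketch below is an assumption about what such a method might do, based on the publicly documented Azure shared access signature layout (`sr`, `sig`, `se`, `skn`); it is not the plugin's code. `build_sas_token` and `build_headers` are hypothetical names, the sketch uses `URI.encode_www_form_component` for URL encoding, and all sample values are made up.

```ruby
require 'openssl'
require 'uri'

# Hypothetical stand-in for generate_sas_token (assumption, not the plugin's code).
# Signs "<encoded uri>\n<expiry>" with HMAC-SHA256 using the shared access key.
def build_sas_token(uri, key_name, key_value, expiry_interval = 3600, now = Time.now)
  target = URI.encode_www_form_component(uri)   # URL-encoded resource URI
  expires_at = now.to_i + expiry_interval       # expiry as Unix seconds
  string_to_sign = "#{target}\n#{expires_at}"
  digest = OpenSSL::HMAC.digest(OpenSSL::Digest.new('sha256'), key_value, string_to_sign)
  # Strict Base64 (no line breaks), then URL-encode '+', '/', and '='.
  signature = URI.encode_www_form_component([digest].pack('m0'))
  "SharedAccessSignature sr=#{target}&sig=#{signature}&se=#{expires_at}&skn=#{key_name}"
end

# Hypothetical helper mirroring how send_w_properties assembles its headers.
def build_headers(token, properties = nil)
  headers = {
    'Content-Type' => 'application/atom+xml;type=entry;charset=utf-8',
    'Authorization' => token
  }
  properties.nil? ? headers : headers.merge(properties)
end

# Fixed clock and made-up credentials so the result is reproducible.
token = build_sas_token('https://example-ns.servicebus.windows.net/my-hub/messages',
                        'send-policy', 'abc123=', 3600, Time.at(0))
headers = build_headers(token, { 'X-Example-Source' => 'sensor-1' })
puts headers['Authorization']
```

Since `properties` is merged last, a caller-supplied key such as `Content-Type` or `Authorization` would override the default value, which matches the `headers.merge(properties)` call in the listing.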