Class: AzureEventHubsHttpSender

Inherits:
Object
  • Object
show all
Defined in:
lib/fluent/plugin/azureeventhubs/http.rb

Instance Method Summary collapse

Constructor Details

#initialize(connection_string, hub_name, expiry = 3600, proxy_addr = '', proxy_port = 3128, open_timeout = 60, read_timeout = 60) ⇒ AzureEventHubsHttpSender

Returns a new instance of AzureEventHubsHttpSender.



2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
# File 'lib/fluent/plugin/azureeventhubs/http.rb', line 2

# Builds a sender that posts events to an Azure Event Hub over HTTPS.
#
# The connection string must contain exactly three ';'-separated parts, e.g.
#   Endpoint=sb://<namespace>/;SharedAccessKeyName=<name>;SharedAccessKey=<key>
# The endpoint's "sb" scheme is replaced with "https" and the target URI
# becomes "<endpoint><hub_name>/messages".
#
# @param connection_string [String] Event Hubs connection string (see above)
# @param hub_name [String] hub name appended to the endpoint
# @param expiry [Integer] stored as @expiry_interval; not used in this method
#   (presumably the SAS token lifetime in seconds -- confirm against
#   generate_sas_token)
# @param proxy_addr [String, nil] proxy host or URL; nil/empty means no proxy
# @param proxy_port [Integer] proxy port, appended to proxy_addr
# @param open_timeout [Integer] connect timeout applied to the HTTP client
# @param read_timeout [Integer] receive timeout applied to the HTTP client
# @raise [RuntimeError] when the connection string is malformed
def initialize(connection_string, hub_name, expiry = 3600, proxy_addr = '', proxy_port = 3128, open_timeout = 60, read_timeout = 60)
  require 'openssl'
  require 'base64'
  require 'net/http'
  require 'json'
  require 'cgi'
  require 'time'
  require 'httpclient'
  @connection_string = connection_string
  @hub_name = hub_name
  @expiry_interval = expiry
  @proxy_addr = proxy_addr
  @proxy_port = proxy_port
  @open_timeout = open_timeout
  @read_timeout = read_timeout

  if @connection_string.count(';') != 2
    raise "Connection String format is not correct"
  end

  @connection_string.split(';').each do |part|
    # 'SharedAccessKeyName' must be tested before 'SharedAccessKey',
    # because the latter is a prefix of the former.
    if part.start_with?('Endpoint')
      # Drop the leading "Endpoint=sb" (11 chars) and swap in the https scheme.
      @endpoint = 'https' + part[11..-1]
    elsif part.start_with?('SharedAccessKeyName')
      @sas_key_name = part[20..-1]
    elsif part.start_with?('SharedAccessKey')
      @sas_key_value = part[16..-1]
    end
  end

  # Three parts were present, but not necessarily the three required ones.
  if @endpoint.nil? || @sas_key_name.nil? || @sas_key_value.nil?
    raise "Connection String format is not correct"
  end

  @uri = URI.parse("#{@endpoint}#{@hub_name}/messages")

  if proxy_addr.to_s.empty?
    @client = HTTPClient.new
  else
    proxy_host = proxy_addr.to_s
    # HTTPClient wants a proxy URL; add a scheme when only a host was given.
    proxy_host = "http://#{proxy_host}" unless proxy_host.include?('://')
    proxy_url = "#{proxy_host}:#{proxy_port}"
    @client = HTTPClient.new(proxy_url)
  end

  # Apply the configured timeouts; previously they were stored but ignored.
  @client.connect_timeout = @open_timeout unless @open_timeout.nil?
  @client.receive_timeout = @read_timeout unless @read_timeout.nil?
end

Instance Method Details

#send(payload) ⇒ Object



53
54
55
# File 'lib/fluent/plugin/azureeventhubs/http.rb', line 53

# Sends +payload+ to the event hub with no extra headers.
#
# Thin convenience wrapper: delegates to #send_w_properties, passing nil
# for the properties argument, and returns whatever that call returns.
#
# @param payload [Object] forwarded unchanged to #send_w_properties
def send(payload)
  no_properties = nil
  send_w_properties(payload, no_properties)
end

#send_w_properties(payload, properties) ⇒ Object



57
58
59
60
61
62
63
64
65
66
67
68
69
# File 'lib/fluent/plugin/azureeventhubs/http.rb', line 57

# POSTs +payload+ (serialized with #to_json) to the hub's messages URI.
#
# A SAS token from generate_sas_token is sent in the Authorization header.
# When +properties+ is not nil it is merged into the request headers, so its
# entries override the defaults on key collision.
#
# Transport-level failures (timeouts, connection resets, malformed
# responses) are not raised to the caller: they are reported on stderr via
# +warn+ and the method returns nil. Any other exception propagates.
#
# @param payload [Object] value serialized to JSON for the request body
# @param properties [Hash, nil] extra HTTP headers, or nil for none
# @return [Object, nil] the result of @client.post, or nil when a rescued
#   transport error occurred
def send_w_properties(payload, properties)
  headers = {
    'Content-Type' => 'application/atom+xml;type=entry;charset=utf-8',
    'Authorization' => generate_sas_token(@uri.to_s)
  }
  headers = headers.merge(properties) unless properties.nil?
  @client.post(@uri.to_s, payload.to_json, headers)
rescue HTTPClient::TimeoutError, Timeout::Error, Errno::EINVAL, Errno::ECONNRESET, EOFError, Errno::ETIMEDOUT, Net::HTTPBadResponse, Net::HTTPHeaderSyntaxError, Net::ProtocolError => e
  # Keep the best-effort contract (no raise), but never fail silently.
  warn "AzureEventHubsHttpSender: POST to #{@uri} failed: #{e.class}: #{e.message}"
  nil
end