Class: CloudWatchLogger::Client::AWS_SDK::DeliveryThread

Inherits:
Thread
  • Object
show all
Defined in:
lib/cloudwatchlogger/client/aws_sdk/threaded.rb

Instance Method Summary collapse

Constructor Details

#initialize(credentials, log_group_name, log_stream_name, opts = {}) ⇒ DeliveryThread

Returns a new instance of DeliveryThread.



38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
# File 'lib/cloudwatchlogger/client/aws_sdk/threaded.rb', line 38

def initialize(credentials, log_group_name, log_stream_name, opts = {})
  opts[:open_timeout] = opts[:open_timeout] || 120
  opts[:read_timeout] = opts[:read_timeout] || 120
  @credentials = credentials
  @log_group_name = log_group_name
  @log_stream_name = log_stream_name
  @opts = opts

  @queue = Queue.new
  @exiting = false

  super do
    loop do
      connect!(opts) if @client.nil?

      message_object = @queue.pop
      break if message_object == :__delivery_thread_exit_signal__

      begin
        event = {
          log_group_name: @log_group_name,
          log_stream_name: @log_stream_name,
          log_events: [{
            timestamp: message_object[:epoch_time],
            message:   message_object[:message]
          }]
        }
        event[:sequence_token] = @sequence_token if @sequence_token
        response = @client.put_log_events(event)
        unless response.rejected_log_events_info.nil?
          raise CloudWatchLogger::LogEventRejected
        end
        @sequence_token = response.next_sequence_token
      rescue Aws::CloudWatchLogs::Errors::InvalidSequenceTokenException => err
        @sequence_token = err.message.split(' ').last
        retry
      end
    end
  end

  at_exit do
    exit!
    join
  end
end

Instance Method Details

#connect!(opts = {}) ⇒ Object



95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
# File 'lib/cloudwatchlogger/client/aws_sdk/threaded.rb', line 95

def connect!(opts = {})
  args = { http_open_timeout: opts[:open_timeout], http_read_timeout: opts[:read_timeout] }
  args[:region] = @opts[:region] if @opts[:region]
  args.merge!( @credentials.key?(:access_key_id) ? { access_key_id: @credentials[:access_key_id], secret_access_key: @credentials[:secret_access_key] } : {} )

  @client = Aws::CloudWatchLogs::Client.new(args)
  begin
    @client.create_log_stream(
      log_group_name: @log_group_name,
      log_stream_name: @log_stream_name
    )
  rescue Aws::CloudWatchLogs::Errors::ResourceNotFoundException
    @client.create_log_group(
      log_group_name: @log_group_name
    )
    retry
  rescue Aws::CloudWatchLogs::Errors::ResourceAlreadyExistsException, 
    Aws::CloudWatchLogs::Errors::AccessDeniedException
  end
end

#deliver(message) ⇒ Object

Pushes a message onto the internal queue



91
92
93
# File 'lib/cloudwatchlogger/client/aws_sdk/threaded.rb', line 91

def deliver(message)
  @queue.push(message)
end

#exit!Object

Signals the queue that we’re exiting



85
86
87
88
# File 'lib/cloudwatchlogger/client/aws_sdk/threaded.rb', line 85

def exit!
  @exiting = true
  @queue.push :__delivery_thread_exit_signal__
end