Class: CloudWatchLogger::Client::AWS_SDK::DeliveryThread
- Inherits: Thread
  - Object
  - Thread
  - CloudWatchLogger::Client::AWS_SDK::DeliveryThread
- Defined in: lib/cloudwatchlogger/client/aws_sdk/threaded.rb
Instance Method Summary
- #connect!(opts = {}) ⇒ Object
- #deliver(message) ⇒ Object
  Pushes a message onto the internal queue.
- #exit! ⇒ Object
  Signals the queue that we’re exiting.
- #initialize(credentials, log_group_name, log_stream_name, opts = {}) ⇒ DeliveryThread (constructor)
  A new instance of DeliveryThread.
Constructor Details
#initialize(credentials, log_group_name, log_stream_name, opts = {}) ⇒ DeliveryThread
Returns a new instance of DeliveryThread.
# File 'lib/cloudwatchlogger/client/aws_sdk/threaded.rb', line 38

def initialize(credentials, log_group_name, log_stream_name, opts = {})
  # Default both HTTP timeouts to 120 seconds unless the caller overrides them.
  opts[:open_timeout] = opts[:open_timeout] || 120
  opts[:read_timeout] = opts[:read_timeout] || 120
  @credentials = credentials
  @log_group_name = log_group_name
  @log_stream_name = log_stream_name
  @opts = opts

  @queue = Queue.new
  @exiting = false

  super do
    loop do
      # Connect lazily, on the first pass through the loop.
      connect!(opts) if @client.nil?

      # Block until a message (or the exit signal) arrives.
      message = @queue.pop
      break if message == :__delivery_thread_exit_signal__

      begin
        event = {
          log_group_name: @log_group_name,
          log_stream_name: @log_stream_name,
          log_events: [{
            timestamp: message[:epoch_time],
            message: message[:message]
          }]
        }
        event[:sequence_token] = @sequence_token if @sequence_token
        response = @client.put_log_events(event)
        unless response.rejected_log_events_info.nil?
          raise CloudWatchLogger::LogEventRejected
        end
        @sequence_token = response.next_sequence_token
      rescue Aws::CloudWatchLogs::Errors::InvalidSequenceTokenException => err
        # The expected token is the last word of the error message.
        @sequence_token = err.message.split(' ').last
        retry
      end
    end
  end

  # Flush the queue and wait for the thread when the process exits.
  at_exit do
    exit!
    join
  end
end
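The rescue branch in the constructor recovers from a stale sequence token by taking the last word of the error message and retrying. The following is a minimal sketch of that pattern in isolation, not the gem's code: `InvalidSequenceToken`, `TokenCheckingClient`, and `put_with_token_recovery` are hypothetical stand-ins, and the error text is only assumed to end with the expected token.

```ruby
# Hypothetical stand-in for Aws::CloudWatchLogs::Errors::InvalidSequenceTokenException.
class InvalidSequenceToken < StandardError; end

# Hypothetical client that accepts an event only when it carries the expected token.
class TokenCheckingClient
  Response = Struct.new(:next_sequence_token)

  attr_reader :accepted

  def initialize(expected_token)
    @expected = expected_token
    @accepted = []
  end

  def put_log_events(event)
    unless event[:sequence_token] == @expected
      # Assumption: the expected token is the last word of the message.
      raise InvalidSequenceToken, "The next expected sequenceToken is: #{@expected}"
    end
    @accepted << event
    @expected = @expected.succ
    Response.new(@expected)
  end
end

# Sends the event; on a token mismatch, adopts the token named in the
# error message and retries. Like the loop above, it retries without a limit.
def put_with_token_recovery(client, event, sequence_token = nil)
  event = event.merge(sequence_token: sequence_token) if sequence_token
  client.put_log_events(event).next_sequence_token
rescue InvalidSequenceToken => err
  sequence_token = err.message.split(' ').last
  retry
end
```

Because the retry is unbounded, a client that keeps rejecting the token would loop forever; a production variant might cap the number of attempts.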
Instance Method Details
#connect!(opts = {}) ⇒ Object
# File 'lib/cloudwatchlogger/client/aws_sdk/threaded.rb', line 95

def connect!(opts = {})
  args = { http_open_timeout: opts[:open_timeout], http_read_timeout: opts[:read_timeout] }
  args[:region] = @opts[:region] if @opts[:region]
  # Pass explicit credentials only when an access key was supplied.
  args.merge!(
    @credentials.key?(:access_key_id) ? {
      access_key_id: @credentials[:access_key_id],
      secret_access_key: @credentials[:secret_access_key]
    } : {}
  )

  @client = Aws::CloudWatchLogs::Client.new(args)
  begin
    @client.create_log_stream(
      log_group_name: @log_group_name,
      log_stream_name: @log_stream_name
    )
  rescue Aws::CloudWatchLogs::Errors::ResourceNotFoundException
    # The log group does not exist yet: create it, then retry the stream.
    @client.create_log_group(
      log_group_name: @log_group_name
    )
    retry
  rescue Aws::CloudWatchLogs::Errors::ResourceAlreadyExistsException,
         Aws::CloudWatchLogs::Errors::AccessDeniedException
    # Deliberately ignored: the stream already exists or cannot be created here.
  end
end
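The create-stream, create-group, retry sequence in connect! can be exercised without AWS. The sketch below uses a hypothetical in-memory `FakeLogsClient` and hypothetical error classes in place of the `Aws::CloudWatchLogs` ones; it only illustrates the control flow and makes no claim about how the real service behaves beyond what the method above handles.

```ruby
# Hypothetical error classes standing in for the Aws::CloudWatchLogs::Errors ones.
class ResourceNotFound < StandardError; end
class ResourceAlreadyExists < StandardError; end

# Hypothetical in-memory client: a stream can only be created in an existing group.
class FakeLogsClient
  attr_reader :groups

  def initialize
    @groups = {} # group name => array of stream names
  end

  def create_log_group(log_group_name:)
    raise ResourceAlreadyExists if @groups.key?(log_group_name)
    @groups[log_group_name] = []
  end

  def create_log_stream(log_group_name:, log_stream_name:)
    streams = @groups[log_group_name]
    raise ResourceNotFound if streams.nil?
    raise ResourceAlreadyExists if streams.include?(log_stream_name)
    streams << log_stream_name
  end
end

# Creates the stream, creating the group first if it is missing.
def ensure_stream(client, group, stream)
  client.create_log_stream(log_group_name: group, log_stream_name: stream)
rescue ResourceNotFound
  client.create_log_group(log_group_name: group)
  retry
rescue ResourceAlreadyExists
  # Nothing to do: the stream is already there.
end
```

Calling `ensure_stream` twice with the same names leaves a single group holding a single stream, which is the idempotent behavior the empty rescue branch is there to provide.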
#deliver(message) ⇒ Object
Pushes a message onto the internal queue.

# File 'lib/cloudwatchlogger/client/aws_sdk/threaded.rb', line 91

def deliver(message)
  @queue.push(message)
end
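The delivery loop in the constructor reads each queued item with the keys :epoch_time and :message, so whatever is passed to deliver needs that shape. A small sketch of building such a hash follows; `build_message` is a hypothetical helper, not part of the gem, and it assumes the timestamp is expressed in milliseconds since the Unix epoch, which is the unit CloudWatch Logs uses for event timestamps.

```ruby
# Hypothetical helper: builds the hash shape the delivery loop reads.
# Uses Time#to_r so the millisecond conversion avoids float rounding.
def build_message(text, time = Time.now)
  { epoch_time: (time.to_r * 1000).to_i, message: text }
end
```

A caller would then hand the result to the thread, for example `thread.deliver(build_message('started'))`, where `thread` is an existing DeliveryThread.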
#exit! ⇒ Object
Signals the queue that we’re exiting.

# File 'lib/cloudwatchlogger/client/aws_sdk/threaded.rb', line 85

def exit!
  @exiting = true
  @queue.push :__delivery_thread_exit_signal__
end
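Because the exit signal travels through the same FIFO queue as ordinary messages, anything queued before exit! is processed before the worker stops. The sketch below shows that sentinel pattern with a plain Thread and Queue; `start_worker` and `drain` are hypothetical names used only for illustration.

```ruby
EXIT_SIGNAL = :__delivery_thread_exit_signal__

# Starts a worker that hands each queued item to the handler
# until the sentinel is popped.
def start_worker(queue, &handler)
  Thread.new do
    loop do
      item = queue.pop # blocks while the queue is empty
      break if item == EXIT_SIGNAL
      handler.call(item)
    end
  end
end

# Queues the items, then the sentinel, and waits for the worker to finish.
def drain(items)
  queue = Queue.new
  seen = []
  worker = start_worker(queue) { |item| seen << item }
  items.each { |item| queue.push(item) }
  queue.push(EXIT_SIGNAL)
  worker.join
  seen
end

p drain(%w[first second]) # prints ["first", "second"]
```

This is why the at_exit hook in the constructor calls exit! and then join: the join returns only after every message ahead of the sentinel has been handled.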