Class: LogStash::Outputs::Application_insights::Channel
- Inherits:
-
Object
- Object
- LogStash::Outputs::Application_insights::Channel
- Defined in:
- lib/logstash/outputs/application_insights/channel.rb
Instance Attribute Summary collapse
-
#blob_extension ⇒ Object
readonly
Returns the value of attribute blob_extension.
-
#blob_max_delay ⇒ Object
readonly
Returns the value of attribute blob_max_delay.
-
#event_format ⇒ Object
readonly
Returns the value of attribute event_format.
-
#instrumentation_key ⇒ Object
readonly
Returns the value of attribute instrumentation_key.
-
#table_id ⇒ Object
readonly
Returns the value of attribute table_id.
Instance Method Summary collapse
-
#<<(data) ⇒ Object
The received data is a hash of the event (it does not include metadata).
- #close ⇒ Object
- #file_pipe? ⇒ Boolean
- #flush ⇒ Object
- #gzip_file? ⇒ Boolean
-
#initialize(instrumentation_key, table_id) ⇒ Channel
constructor
A new instance of Channel.
- #recover_later_block_upload(block_to_upload) ⇒ Object
- #recover_later_file_upload(file_to_upload) ⇒ Object
- #recover_later_notification(tuple) ⇒ Object
- #stopped? ⇒ Boolean
Constructor Details
#initialize(instrumentation_key, table_id) ⇒ Channel
Returns a new instance of Channel.
33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 |
# File 'lib/logstash/outputs/application_insights/channel.rb', line 33

# Builds a channel for one instrumentation_key / table_id pair.
#
# Settings are read from Config.current. Compression is treated as a single
# switch: both the file pipe and gzip flags are on unless
# :disable_compression is set. Depending on that switch the channel prepares
# either the file-upload retry queue or the block-upload retry queue, and it
# always starts with exactly one Upload_pipe.
#
# @param instrumentation_key [Object] kept as-is and exposed by #instrumentation_key
# @param table_id [Object] kept as-is and exposed by #table_id
def initialize(instrumentation_key, table_id)
  @closing = false
  configuration = Config.current

  compression_enabled = !configuration[:disable_compression]
  @disable_truncation = configuration[:disable_truncation]
  @file_pipe = compression_enabled
  @gzip_file = compression_enabled
  @blob_max_bytesize = configuration[:blob_max_bytesize]
  @blob_max_events = configuration[:blob_max_events]

  @logger = configuration[:logger]
  @logger.debug { "Create a new channel, instrumentation_key / table_id : #{instrumentation_key} / #{table_id}" }

  @instrumentation_key = instrumentation_key
  @table_id = table_id
  # NOTE(review): @event_format and @event_separator are not assigned in this
  # method; presumably set_table_properties provides them - confirm.
  set_table_properties(configuration)
  @semaphore = Mutex.new
  @workers_channel = {}

  # The retry queue must exist before its recovery helper is launched.
  @failed_on_notify_retry_Q = Queue.new
  launch_notify_recovery_thread

  if file_pipe?
    @blob_extension = gzip_file? ? "_#{@event_format}.gz" : ".#{@event_format}"
    @add_pipe_threshold = 0
    @file_prefix = configuration[:local_file_prefix]
    @file = nil
    @failed_on_file_upload_retry_Q = Queue.new
    launch_file_upload_recovery_thread
  else
    @blob_extension = ".#{@event_format}"
    @add_pipe_threshold = CHANNEL_THRESHOLD_TO_ADD_UPLOAD_PIPE
    @failed_on_block_upload_retry_Q = Queue.new
    launch_block_upload_recovery_thread
  end

  @active_upload_pipes = [Upload_pipe.new(self, 1)]
end
Instance Attribute Details
#blob_extension ⇒ Object (readonly)
Returns the value of attribute blob_extension.
28 29 30 |
# File 'lib/logstash/outputs/application_insights/channel.rb', line 28

# Reader for the blob file-name extension.
#
# @return [Object] the current value of @blob_extension; #initialize sets it
#   to ".<event_format>" or, for the gzip file pipe, "_<event_format>.gz"
def blob_extension
  @blob_extension
end
#blob_max_delay ⇒ Object (readonly)
Returns the value of attribute blob_max_delay.
27 28 29 |
# File 'lib/logstash/outputs/application_insights/channel.rb', line 27

# Reader for the blob maximum delay.
#
# @return [Object] the current value of @blob_max_delay
#   (NOTE(review): not assigned in #initialize directly; presumably set by
#   set_table_properties - confirm)
def blob_max_delay
  @blob_max_delay
end
#event_format ⇒ Object (readonly)
Returns the value of attribute event_format.
29 30 31 |
# File 'lib/logstash/outputs/application_insights/channel.rb', line 29

# Reader for the event format.
#
# @return [Object] the current value of @event_format; #<< compares it with
#   EXT_EVENT_FORMAT_CSV to choose between CSV and JSON serialization
def event_format
  @event_format
end
#instrumentation_key ⇒ Object (readonly)
Returns the value of attribute instrumentation_key.
25 26 27 |
# File 'lib/logstash/outputs/application_insights/channel.rb', line 25

# Reader for the instrumentation key.
#
# @return [Object] the instrumentation_key passed to #initialize
def instrumentation_key
  @instrumentation_key
end
#table_id ⇒ Object (readonly)
Returns the value of attribute table_id.
26 27 28 |
# File 'lib/logstash/outputs/application_insights/channel.rb', line 26

# Reader for the table id.
#
# @return [Object] the table_id passed to #initialize
def table_id
  @table_id
end
Instance Method Details
#<<(data) ⇒ Object
The received data is a hash of the event (it does not include metadata).
93 94 95 96 97 98 99 100 101 102 103 104 105 106 |
# File 'lib/logstash/outputs/application_insights/channel.rb', line 93

# Serializes one event and appends it to the calling thread's sub-channel.
#
# If @serialized_event_field is set and the event carries a truthy value
# under that key, the value goes through serialize_serialized_event_field.
# Otherwise the whole event is serialized as CSV when @event_format equals
# EXT_EVENT_FORMAT_CSV, and as JSON in every other case. When serialization
# yields nothing, the event is not queued and a warning is logged instead.
#
# @param data [Hash] the event (metadata is not part of it)
# @return [Object] the result of the sub-channel append, or of the logger
#   call when the event is dropped
def <<(data)
  preserialized = @serialized_event_field && data[@serialized_event_field]
  serialized_event =
    if preserialized
      serialize_serialized_event_field(preserialized)
    elsif EXT_EVENT_FORMAT_CSV == @event_format
      serialize_to_csv(data)
    else
      serialize_to_json(data)
    end

  unless serialized_event
    return @logger.warn { "event not uploaded, no relevant data in event. table_id: #{@table_id}, event: #{data}" }
  end

  # One sub-channel per worker thread; creation is done under the semaphore.
  worker = Thread.current
  sub_channel = @workers_channel[worker]
  sub_channel ||= @semaphore.synchronize { @workers_channel[worker] = Sub_channel.new(@event_separator) }
  sub_channel << serialized_event
end
#close ⇒ Object
81 82 83 84 85 86 |
# File 'lib/logstash/outputs/application_insights/channel.rb', line 81

# Marks the channel as closing (see #stopped?) and closes every active
# upload pipe.
#
# @return [Array] the @active_upload_pipes collection that was iterated
def close
  @closing = true
  @active_upload_pipes.each(&:close)
end
#file_pipe? ⇒ Boolean
77 78 79 |
# File 'lib/logstash/outputs/application_insights/channel.rb', line 77

# Tells whether the channel uploads through a local file pipe.
#
# @return [Boolean] the @file_pipe flag; #initialize sets it to the negation
#   of the :disable_compression setting
def file_pipe?
  @file_pipe
end
#flush ⇒ Object
109 110 111 112 113 114 115 116 117 118 119 120 |
# File 'lib/logstash/outputs/application_insights/channel.rb', line 109

# Hands pending data to the upload pipe.
#
# Without a file pipe, the collected blocks are enqueued directly. With a
# file pipe, blocks are first collected and compressed into the current
# file; the file is enqueued, and @file reset to nil, only once
# file_expired_or_full? reports true.
#
# @return [Object, nil] the enqueue_to_pipe result on the block path, nil on
#   the file path
def flush
  return enqueue_to_pipe(collect_blocks) unless file_pipe?

  gz_collect_and_compress_blocks_to_file
  return unless file_expired_or_full?

  enqueue_to_pipe([@file])
  @file = nil
end
#gzip_file? ⇒ Boolean
73 74 75 |
# File 'lib/logstash/outputs/application_insights/channel.rb', line 73

# Tells whether files produced by the channel are gzip-compressed.
#
# @return [Boolean] the @gzip_file flag; #initialize sets it to the negation
#   of the :disable_compression setting
def gzip_file?
  @gzip_file
end
#recover_later_block_upload(block_to_upload) ⇒ Object
128 129 130 |
# File 'lib/logstash/outputs/application_insights/channel.rb', line 128

# Queues a block whose upload failed so that it can be retried later.
#
# NOTE(review): #initialize creates @failed_on_block_upload_retry_Q only
# when the channel is not a file pipe.
#
# @param block_to_upload [Object] the block to retry
# @return [Object] the retry queue
def recover_later_block_upload(block_to_upload)
  @failed_on_block_upload_retry_Q.push(block_to_upload)
end
#recover_later_file_upload(file_to_upload) ⇒ Object
132 133 134 |
# File 'lib/logstash/outputs/application_insights/channel.rb', line 132

# Queues a file whose upload failed so that it can be retried later.
#
# NOTE(review): #initialize creates @failed_on_file_upload_retry_Q only
# when the channel is a file pipe.
#
# @param file_to_upload [Object] the file to retry
# @return [Object] the retry queue
def recover_later_file_upload(file_to_upload)
  @failed_on_file_upload_retry_Q.push(file_to_upload)
end
#recover_later_notification(tuple) ⇒ Object
123 124 125 |
# File 'lib/logstash/outputs/application_insights/channel.rb', line 123

# Queues a notification that failed so that it can be retried later.
#
# @param tuple [Object] the notification data to retry
# @return [Object] the retry queue
def recover_later_notification(tuple)
  @failed_on_notify_retry_Q.push(tuple)
end
#stopped? ⇒ Boolean
88 89 90 |
# File 'lib/logstash/outputs/application_insights/channel.rb', line 88

# Tells whether the channel has begun closing.
#
# @return [Boolean] the @closing flag: false after #initialize, true once
#   #close has been called
def stopped?
  @closing
end