Class: LogStash::Outputs::Application_insights::Channel

Inherits:
Object
  • Object
show all
Defined in:
lib/logstash/outputs/application_insights/channel.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(instrumentation_key, table_id) ⇒ Channel

Returns a new instance of Channel.



33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
# File 'lib/logstash/outputs/application_insights/channel.rb', line 33

def initialize ( instrumentation_key, table_id )
  @closing = false
  configuration = Config.current
  
  @disable_truncation = configuration[:disable_truncation]
  @file_pipe = !configuration[:disable_compression]
  @gzip_file = !configuration[:disable_compression]
  @blob_max_bytesize = configuration[:blob_max_bytesize]
  @blob_max_events = configuration[:blob_max_events]

  @logger = configuration[:logger]

  @logger.debug { "Create a new channel, instrumentation_key / table_id : #{instrumentation_key} / #{table_id}" }
  @instrumentation_key = instrumentation_key
  @table_id = table_id
  set_table_properties( configuration )
  @semaphore = Mutex.new
  @workers_channel = {  }

  @failed_on_notify_retry_Q = Queue.new
  launch_notify_recovery_thread

  @blob_extension = ".#{@event_format}"
  if file_pipe?
    @blob_extension = "_#{@event_format}.gz" if gzip_file?
    @add_pipe_threshold = 0
    @file_prefix = configuration[:local_file_prefix]
    @file = nil
    @failed_on_file_upload_retry_Q = Queue.new
    launch_file_upload_recovery_thread
  else
    @add_pipe_threshold = CHANNEL_THRESHOLD_TO_ADD_UPLOAD_PIPE
    @failed_on_block_upload_retry_Q = Queue.new
    launch_block_upload_recovery_thread
  end

  @active_upload_pipes = [ Upload_pipe.new( self, 1 ) ]
end

Instance Attribute Details

#blob_extensionObject (readonly)

Returns the value of attribute blob_extension.



28
29
30
# File 'lib/logstash/outputs/application_insights/channel.rb', line 28

def blob_extension
  @blob_extension
end

#blob_max_delayObject (readonly)

Returns the value of attribute blob_max_delay.



27
28
29
# File 'lib/logstash/outputs/application_insights/channel.rb', line 27

def blob_max_delay
  @blob_max_delay
end

#event_formatObject (readonly)

Returns the value of attribute event_format.



29
30
31
# File 'lib/logstash/outputs/application_insights/channel.rb', line 29

def event_format
  @event_format
end

#instrumentation_keyObject (readonly)

Returns the value of attribute instrumentation_key.



25
26
27
# File 'lib/logstash/outputs/application_insights/channel.rb', line 25

def instrumentation_key
  @instrumentation_key
end

#table_idObject (readonly)

Returns the value of attribute table_id.



26
27
28
# File 'lib/logstash/outputs/application_insights/channel.rb', line 26

def table_id
  @table_id
end

Instance Method Details

#<<(data) ⇒ Object

received data is an hash of the event (does not include metadata)



93
94
95
96
97
98
99
100
101
102
103
104
105
106
# File 'lib/logstash/outputs/application_insights/channel.rb', line 93

def << ( data )
  if @serialized_event_field && data[@serialized_event_field]
    serialized_event = serialize_serialized_event_field( data[@serialized_event_field] )
  else
    serialized_event = ( EXT_EVENT_FORMAT_CSV == @event_format ? serialize_to_csv( data ) : serialize_to_json( data ) )
  end

  if serialized_event
    sub_channel = @workers_channel[Thread.current] || @semaphore.synchronize { @workers_channel[Thread.current] = Sub_channel.new( @event_separator ) }
    sub_channel << serialized_event
  else
    @logger.warn { "event not uploaded, no relevant data in event. table_id: #{@table_id}, event: #{data}" }
  end
end

#closeObject



81
82
83
84
85
86
# File 'lib/logstash/outputs/application_insights/channel.rb', line 81

def close
  @closing = true
  @active_upload_pipes.each do |upload_pipe|
    upload_pipe.close
  end
end

#file_pipe?Boolean

Returns:

  • (Boolean)


77
78
79
# File 'lib/logstash/outputs/application_insights/channel.rb', line 77

def file_pipe?
  @file_pipe
end

#flushObject



109
110
111
112
113
114
115
116
117
118
119
120
# File 'lib/logstash/outputs/application_insights/channel.rb', line 109

def flush
  if file_pipe?
    gz_collect_and_compress_blocks_to_file
    if file_expired_or_full?
      enqueue_to_pipe( [ @file ] )
      @file = nil
    end
  else
    list = collect_blocks
    enqueue_to_pipe( list )
  end
end

#gzip_file?Boolean

Returns:

  • (Boolean)


73
74
75
# File 'lib/logstash/outputs/application_insights/channel.rb', line 73

def gzip_file?
  @gzip_file
end

#recover_later_block_upload(block_to_upload) ⇒ Object



128
129
130
# File 'lib/logstash/outputs/application_insights/channel.rb', line 128

def recover_later_block_upload( block_to_upload )
  @failed_on_block_upload_retry_Q << block_to_upload
end

#recover_later_file_upload(file_to_upload) ⇒ Object



132
133
134
# File 'lib/logstash/outputs/application_insights/channel.rb', line 132

def recover_later_file_upload( file_to_upload )
  @failed_on_file_upload_retry_Q << file_to_upload
end

#recover_later_notification(tuple) ⇒ Object



123
124
125
# File 'lib/logstash/outputs/application_insights/channel.rb', line 123

def recover_later_notification( tuple )
  @failed_on_notify_retry_Q << tuple
end

#stopped?Boolean

Returns:

  • (Boolean)


88
89
90
# File 'lib/logstash/outputs/application_insights/channel.rb', line 88

def stopped?
  @closing
end