Class: LogStash::Outputs::Application_insights::Channels
- Inherits:
-
Object
- Object
- LogStash::Outputs::Application_insights::Channels
- Defined in:
- lib/logstash/outputs/application_insights/channels.rb
Constant Summary
- @@instance =
Channels.new
Class Method Summary
- .instance ⇒ Object
Instance Method Summary
- #channel(instrumentation_key, table_id) ⇒ Object
- #close ⇒ Object
-
#initialize ⇒ Channels
constructor
A new instance of Channels.
- #mark_invalid_instrumentation_key(instrumentation_key) ⇒ Object
- #mark_invalid_table_id(table_id) ⇒ Object
- #periodic_forward_events ⇒ Object
- #receive(event, encoded_event) ⇒ Object
- #start ⇒ Object
Constructor Details
#initialize ⇒ Channels
Returns a new instance of Channels.
27 28 29 30 31 32 33 34 35 36 37 38 39 |
# File 'lib/logstash/outputs/application_insights/channels.rb', line 27
#
# Builds an empty channel registry from the current plugin configuration.
#
# Config.current is read once; the logger, the default instrumentation key,
# the default table id and the tables setting are copied into instance state.
# No channels exist after construction.
#
# @return [Channels] a new instance of Channels
def initialize
  configuration = Config.current

  @logger = configuration[:logger]

  # Starts empty. Presumably indexed by instrumentation key and table id --
  # confirm against dispatch_channel / create_channel.
  @instrumentation_key_table_id_db = {}
  # Flat list of channels; iterated by #close and #periodic_forward_events.
  @channels = []
  # Lock held while #periodic_forward_events copies @channels.
  @create_semaphore = Mutex.new

  # Fallbacks used by #receive when an event carries no routing information.
  @default_instrumentation_key = configuration[:instrumentation_key]
  @default_table_id = configuration[:table_id]
  @tables = configuration[:tables]
end
Class Method Details
.instance ⇒ Object
139 140 141 |
# File 'lib/logstash/outputs/application_insights/channels.rb', line 139
#
# Accessor for the shared Channels object stored in the @@instance class
# variable.
#
# @return [Object] the value of @@instance
def self.instance
  @@instance
end
Instance Method Details
#channel(instrumentation_key, table_id) ⇒ Object
65 66 67 68 69 70 71 72 73 74 75 76 |
# File 'lib/logstash/outputs/application_insights/channels.rb', line 65
#
# Looks up the channel for an instrumentation key / table id pair, creating
# it when the lookup reports that none exists.
#
# @param instrumentation_key [Object] key passed through to dispatch_channel / create_channel
# @param table_id [Object] table id passed through to dispatch_channel / create_channel
# @return [Object] whatever dispatch_channel or create_channel returns
def channel(instrumentation_key, table_id)
  dispatch_channel(instrumentation_key, table_id)
rescue NoChannelError
  begin
    create_channel(instrumentation_key, table_id)
  rescue ChannelExistError
    # can happen due to race conditions: the channel appeared between the
    # failed lookup and the create attempt, so look it up again
    dispatch_channel(instrumentation_key, table_id)
  end
end
#close ⇒ Object
121 122 123 124 125 |
# File 'lib/logstash/outputs/application_insights/channels.rb', line 121
#
# Closes every channel currently held in @channels.
#
# The list is copied under @create_semaphore, the same way
# #periodic_forward_events does, so the iteration runs over a stable snapshot.
# The lock is released before any channel is closed.
#
# @return [Array] the snapshot of channels that were closed
def close
  channels = @create_semaphore.synchronize { @channels.dup }
  channels.each(&:close)
end
#mark_invalid_instrumentation_key(instrumentation_key) ⇒ Object
127 128 129 |
# File 'lib/logstash/outputs/application_insights/channels.rb', line 127
#
# Placeholder: currently does nothing and returns nil.
#
# @param instrumentation_key [Object] presumably a key found to be invalid -- confirm with callers
# @return [nil]
def mark_invalid_instrumentation_key(instrumentation_key)
  # TODO: should go to lost and found container
end
#mark_invalid_table_id(table_id) ⇒ Object
131 132 133 |
# File 'lib/logstash/outputs/application_insights/channels.rb', line 131
#
# Placeholder: currently does nothing and returns nil.
#
# @param table_id [Object] presumably a table id found to be invalid -- confirm with callers
# @return [nil]
def mark_invalid_table_id(table_id)
  # TODO: should go to lost and found container
end
#periodic_forward_events ⇒ Object
79 80 81 82 83 84 85 86 87 88 89 |
# File 'lib/logstash/outputs/application_insights/channels.rb', line 79
#
# Starts a background thread that flushes every channel roughly twice a
# second, forever.
#
# @return [Thread] the thread running the flush loop
def periodic_forward_events
  Thread.new do
    loop do
      sleep(0.5)
      # Copy the list under the lock, then flush outside it so the lock is
      # not held while channels do their work.
      snapshot = @create_semaphore.synchronize { @channels.dup }
      snapshot.each(&:flush)
    end
  end
end
#receive(event, encoded_event) ⇒ Object
48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 |
# File 'lib/logstash/outputs/application_insights/channels.rb', line 48
#
# Routes one event to the channel selected by its table id and
# instrumentation key.
#
# LogStash::SHUTDOWN and LogStash::FLUSH markers are only logged. For any
# other event, each routing value is resolved in this order: the metadata
# field (when the event includes it), the same-purpose field in the event's
# hash, then the configured default. The event hash is appended to the
# resulting channel after @flow_control.pass_or_wait returns.
#
# @param event [Object] the event; must respond to to_hash, include? and sprintf
# @param encoded_event [Object] not used by this method
# @return [Object] the result of the logger call or of the channel's <<
def receive(event, encoded_event)
  return @logger.info { "received a LogStash::SHUTDOWN event" } if LogStash::SHUTDOWN == event
  return @logger.info { "received a LogStash::FLUSH event" } if LogStash::FLUSH == event

  data = event.to_hash

  table_id =
    if event.include?(METADATA_FIELD_TABLE_ID)
      event.sprintf("%{#{METADATA_FIELD_TABLE_ID}}")
    else
      data[FIELD_TABLE_ID]
    end
  table_id ||= @default_table_id

  instrumentation_key =
    if event.include?(METADATA_FIELD_INSTRUMENTATION_KEY)
      event.sprintf("%{#{METADATA_FIELD_INSTRUMENTATION_KEY}}")
    else
      data[FIELD_INSTRUMENTATION_KEY]
    end
  instrumentation_key ||= @default_instrumentation_key

  @flow_control.pass_or_wait
  channel(instrumentation_key, table_id) << data
end
#start ⇒ Object
41 42 43 44 45 |
# File 'lib/logstash/outputs/application_insights/channels.rb', line 41
#
# Picks up the shared Flow_control instance and starts the background
# forwarding loop.
#
# @return [Object] whatever #periodic_forward_events returns
def start
  @flow_control = Flow_control.instance
  # launch the thread that forwards events from channels to azure storage
  periodic_forward_events
end