Class: LogStash::Outputs::Application_insights::Channels

Inherits:
Object
  • Object
show all
Defined in:
lib/logstash/outputs/application_insights/channels.rb

Constant Summary collapse

@@instance =
Channels.new

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize ⇒ Channels

Returns a new instance of Channels.



27
28
29
30
31
32
33
34
35
36
37
38
39
# File 'lib/logstash/outputs/application_insights/channels.rb', line 27

# Sets up an empty channel registry from the current plugin configuration.
#
# Config.current is read once; the logger, the default instrumentation key,
# the default table id and the tables entry are cached from it. The channel
# list and the key/table lookup start out empty, and a fresh Mutex is created
# (periodic_forward_events uses it when copying the channel list).
def initialize
  settings = Config.current

  # bookkeeping that always starts out empty
  @instrumentation_key_table_id_db = {}
  @channels                        = []
  @create_semaphore                = Mutex.new

  # values copied from the configuration
  @logger                      = settings[:logger]
  @default_instrumentation_key = settings[:instrumentation_key]
  @default_table_id            = settings[:table_id]
  @tables                      = settings[:tables]
end

Class Method Details

.instance ⇒ Object



139
140
141
# File 'lib/logstash/outputs/application_insights/channels.rb', line 139

# Returns the object stored in the @@instance class variable, giving callers
# one shared entry point instead of building their own Channels.
#
# @return [Object] the value of @@instance
def self.instance
  @@instance
end

Instance Method Details

#channel(instrumentation_key, table_id) ⇒ Object



65
66
67
68
69
70
71
72
73
74
75
76
# File 'lib/logstash/outputs/application_insights/channels.rb', line 65

# Looks up the channel for the given instrumentation key / table id pair,
# creating it when the lookup reports that none exists.
#
# @param instrumentation_key [Object] passed through to dispatch_channel / create_channel
# @param table_id [Object] passed through to dispatch_channel / create_channel
# @return [Object] whatever dispatch_channel or create_channel returns
def channel(instrumentation_key, table_id)
  dispatch_channel(instrumentation_key, table_id)
rescue NoChannelError
  # nothing registered yet for this pair: try to create it ourselves
  begin
    create_channel(instrumentation_key, table_id)
  rescue ChannelExistError
    # lost a creation race to another caller; the channel exists now, so look it up again
    dispatch_channel(instrumentation_key, table_id)
  end
end

#close ⇒ Object



121
122
123
124
125
# File 'lib/logstash/outputs/application_insights/channels.rb', line 121

# Closes every channel registered so far.
#
# The channel list is copied while holding @create_semaphore, the same way
# periodic_forward_events takes its snapshot, so the iteration never walks an
# array that another thread may be modifying. The channels themselves are
# closed after the lock is released, so a slow close does not block other
# users of the lock.
#
# @return [Array] the snapshot of channels that were closed
def close
  snapshot = @create_semaphore.synchronize { @channels.dup }
  snapshot.each do |channel|
    channel.close
  end
end

#mark_invalid_instrumentation_key(instrumentation_key) ⇒ Object



127
128
129
# File 'lib/logstash/outputs/application_insights/channels.rb', line 127

# Placeholder hook for reporting an invalid instrumentation key.
# The body is empty for now: the argument is ignored and nil is returned.
#
# @param instrumentation_key [Object] the key reported as invalid (currently unused)
# @return [nil]
def mark_invalid_instrumentation_key ( instrumentation_key )
  # TODO should go to lost and found container
end

#mark_invalid_table_id(table_id) ⇒ Object



131
132
133
# File 'lib/logstash/outputs/application_insights/channels.rb', line 131

# Placeholder hook for reporting an invalid table id.
# The body is empty for now: the argument is ignored and nil is returned.
#
# @param table_id [Object] the table id reported as invalid (currently unused)
# @return [nil]
def mark_invalid_table_id ( table_id )
  # TODO should go to lost and found container
end

#periodic_forward_events ⇒ Object



79
80
81
82
83
84
85
86
87
88
89
# File 'lib/logstash/outputs/application_insights/channels.rb', line 79

# Spawns a background thread that flushes every known channel in an endless
# loop, pausing 0.5 seconds before each pass.
#
# Each pass works on a copy of @channels taken under @create_semaphore, so
# the flush calls themselves run without holding the lock.
#
# A failure while flushing one channel is logged through @logger and does not
# stop the loop: the remaining channels are still flushed on this pass and
# the failing one is attempted again on the next pass. Without this, a single
# raised error would end the thread and no channel would be flushed again.
#
# @return [Thread] the thread running the flush loop
def periodic_forward_events
  Thread.new do
    loop do
      sleep( 0.5 )
      channels = @create_semaphore.synchronize { @channels.dup }
      channels.each do |channel|
        begin
          channel.flush
        rescue StandardError => e
          # keep the forwarding thread alive; only StandardError is absorbed
          @logger.error { "failed to flush channel, will retry on next cycle: #{e.class}: #{e.message}" }
        end
      end
    end
  end
end

#receive(event, encoded_event) ⇒ Object



48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
# File 'lib/logstash/outputs/application_insights/channels.rb', line 48

# Routes one event to the channel selected by its instrumentation key and
# table id.
#
# LogStash::SHUTDOWN and LogStash::FLUSH markers are only logged. For any
# other event the table id and instrumentation key are resolved in this
# order: the metadata field (when the event includes it), the plain field of
# the event hash, then the configured default. The call then goes through
# @flow_control.pass_or_wait before the event hash is appended to the channel.
#
# @param event [Object] the event, or one of the LogStash marker constants
# @param encoded_event [Object] not used by this method
# @return [Object] the logger's result for marker events, otherwise the
#   result of appending to the channel
def receive(event, encoded_event)
  return @logger.info { "received a LogStash::SHUTDOWN event" } if LogStash::SHUTDOWN == event
  return @logger.info { "received a LogStash::FLUSH event" } if LogStash::FLUSH == event

  data = event.to_hash

  table_id =
    if event.include?(METADATA_FIELD_TABLE_ID)
      event.sprintf("%{#{METADATA_FIELD_TABLE_ID}}")
    else
      data[FIELD_TABLE_ID]
    end
  table_id ||= @default_table_id

  instrumentation_key =
    if event.include?(METADATA_FIELD_INSTRUMENTATION_KEY)
      event.sprintf("%{#{METADATA_FIELD_INSTRUMENTATION_KEY}}")
    else
      data[FIELD_INSTRUMENTATION_KEY]
    end
  instrumentation_key ||= @default_instrumentation_key

  @flow_control.pass_or_wait
  channel(instrumentation_key, table_id) << data
end

#start ⇒ Object



41
42
43
44
45
# File 'lib/logstash/outputs/application_insights/channels.rb', line 41

# Caches the Flow_control instance in @flow_control (receive calls
# pass_or_wait on it) and then launches the periodic forwarding thread.
#
# @return [Object] whatever periodic_forward_events returns (the thread it spawns)
def start
  @flow_control = Flow_control.instance
  # launch thread that forwards events from channels to azure storage
  periodic_forward_events
end