Class: LogStash::Outputs::Application_insights::Flow_control

Inherits:
Object
  • Object
show all
Defined in:
lib/logstash/outputs/application_insights/flow_control.rb

Constant Summary collapse

@@instance =
Flow_control.new

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize ⇒ Flow_control

Returns a new instance of Flow_control.



27
28
29
30
31
32
33
34
35
36
37
38
39
40
# File 'lib/logstash/outputs/application_insights/flow_control.rb', line 27

# Builds a flow-control object from the current plugin configuration.
#
# Reads the logger, the suspend/resume byte thresholds and the polling delay
# from Config.current, creates the mutex that guards the wait loop in
# #pass_or_wait, and keeps a reference to State.instance.
def initialize
  settings = Config.current

  # Logging collaborators.
  @logger = settings[:logger]
  @logger_progname = settings[:logger_progname]

  # Thresholds and polling delay consumed by #pass_or_wait.
  @flow_control_delay = settings[:flow_control_delay]
  @flow_control_resume_bytes = settings[:flow_control_resume_bytes]
  @flow_control_suspend_bytes = settings[:flow_control_suspend_bytes]

  # Serializes callers of #pass_or_wait around the wait loop.
  @flow_control_semaphore = Mutex.new

  # Source of the bytes_in_memory reading polled by #pass_or_wait.
  @state = State.instance
end

Class Method Details

.instance ⇒ Object



74
75
76
# File 'lib/logstash/outputs/application_insights/flow_control.rb', line 74

# Class-level accessor: returns whatever is stored in the @@instance class
# variable (the summary on this page lists it as assigned Flow_control.new).
def self.instance
  @@instance
end

Instance Method Details

#pass_or_wait ⇒ Object



43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
# File 'lib/logstash/outputs/application_insights/flow_control.rb', line 43

# Lets the caller through immediately, or blocks it until memory has drained.
#
# Fast path: returns nil right away when @state.bytes_in_memory is at or below
# @flow_control_resume_bytes.
#
# Otherwise the caller takes @flow_control_semaphore and re-reads the level.
# If it is above @flow_control_suspend_bytes, the caller polls (GC.start, then
# sleep for @flow_control_delay) until the level is at or below
# @flow_control_resume_bytes, reporting progress through display_msg.
# Because the polling happens while the mutex is held, other callers that
# missed the fast path wait on the mutex in the meantime.
#
# @return [Object, nil] nil when no suspension happened; otherwise the result
#   of the final display_msg call
def pass_or_wait
  return if @state.bytes_in_memory <= @flow_control_resume_bytes

  @flow_control_semaphore.synchronize do
    # Re-read under the lock: the level may have changed while waiting for it.
    next unless @state.bytes_in_memory > @flow_control_suspend_bytes

    display_msg("suspend receiving event, memory above #{@flow_control_suspend_bytes} bytes, wait till memory below #{@flow_control_resume_bytes} bytes")

    current_level = nil
    loop do
      GC.start
      sleep(@flow_control_delay)

      current_level = @state.bytes_in_memory
      break if current_level <= @flow_control_resume_bytes

      display_msg("continue to suspend receiving event, memory level #{current_level} bytes, wait till memory below #{@flow_control_resume_bytes} bytes ")
    end

    display_msg("resume receiving event, memory level #{current_level}, below #{@flow_control_resume_bytes} bytes")
  end
end