Class: LogStash::Outputs::Application_insights::Upload_pipe

Inherits:
Blob
  • Object
show all
Defined in:
lib/logstash/outputs/application_insights/upload_pipe.rb

Constant Summary

Constants inherited from Blob

Blob::CREATE_EXIST_ERRORS

Instance Attribute Summary

Attributes inherited from Blob

#last_io_exception

Instance Method Summary collapse

Methods inherited from Blob

close, #create_container_exist_recovery, #create_exist_recovery, #create_table_exist_recovery, #state_table_delete, #state_table_insert, #state_table_query, #state_table_update, stopped?, #update_commited_or_uncommited_list

Methods inherited from Context

#clear_context, #context_to_table_entity, #context_to_tuple, #table_entity_to_context, #table_entity_to_tuple, #tuple_to_context

Constructor Details

#initialize(channel = nil, id = nil, tuple = nil) ⇒ Upload_pipe

Returns a new instance of Upload_pipe.



25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
# File 'lib/logstash/outputs/application_insights/upload_pipe.rb', line 25

def initialize ( channel = nil, id = nil, tuple = nil )

  # super first parameter must be nil. blob first parameter is channel, otherwise it will pass storage_account_name as channel
  super( tuple )
  @channel = channel
  if @channel
    @id = id
    @instrumentation_key = @channel.instrumentation_key
    @table_id = @channel.table_id
    @blob_max_delay = @channel.blob_max_delay
    @blob_extension = @channel.blob_extension
    @event_format = @channel.event_format
    @file_pipe = @channel.file_pipe?

    @io_queue = Queue.new

    # create a thread that handles the IO of the blob
    if @file_pipe
      launch_file_pipe_thread
    else
      launch_block_pipe_thread
    end
  end

end

Instance Method Details

#<<(block) ⇒ Object



150
151
152
# File 'lib/logstash/outputs/application_insights/upload_pipe.rb', line 150

def << ( block )
  @io_queue << block
end

#busy?Boolean

Returns:

  • (Boolean)


146
147
148
# File 'lib/logstash/outputs/application_insights/upload_pipe.rb', line 146

def busy?
  0 < @io_queue.length  ||  0 == @io_queue.num_waiting
end

#closeObject

close blob. It will finish whatever was already on the queue, and if necessary commit called on shutdown



137
138
139
# File 'lib/logstash/outputs/application_insights/upload_pipe.rb', line 137

def close
  @io_queue << :close
end

#commitObject



155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
# File 'lib/logstash/outputs/application_insights/upload_pipe.rb', line 155

def commit
  unless @uploaded_block_ids.empty?
    @action = :commit
    @recoverable = [ :invalid_storage_key, :io_failure, :service_unavailable ]
    success =  storage_io_block {
      @info = "#{@action.to_s} #{@storage_account_name}/#{@container_name}/#{@blob_name}, events: #{@uploaded_events_count}, size: #{@uploaded_bytesize}, blocks: #{@uploaded_block_numbers}, delay: #{Time.now.utc - @oldest_event_time}"
      # assume that exceptions can be raised due to this method:
      @client.blobClient.commit_blob_blocks( @container_name, @blob_name, @uploaded_block_ids ) unless @configuration[:disable_blob_upload]
      @log_state = :committed
    }
    if success
      # next stage
      state_table_update
    else
      @storage_recovery.recover_later( context_to_tuple, :commit, @storage_account_name )
    end
  end
end

#launch_block_pipe_threadObject



52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
# File 'lib/logstash/outputs/application_insights/upload_pipe.rb', line 52

def launch_block_pipe_thread
  Thread.new do
    timer = Timer.new
    next_block = nil
    loop do
      block_to_upload = nil # release reference to resource for GC
      block_to_upload = next_block || @io_queue.pop
      next_block = nil

      if :trigger == timer.state
        next_block = block_to_upload unless :wakeup == block_to_upload
        block_to_upload = :timeout
        to_commit = :commit

      elsif :close == block_to_upload
        to_commit = :commit

      # ignore :trigger as they are only to casue check timeout
      elsif :wakeup == block_to_upload # ignore :wakeup
        next

      else
        while @io_queue.length > 0
          next_block = @io_queue.pop
          next if :wakeup == next_block # ignore :wakeup
          break if :close == next_block
          break if blob_full?( next_block )
          break unless block_to_upload.concat( next_block )
          next_block = nil 
        end
      end

      unless to_commit
        timer.set( block_to_upload.oldest_event_time + @blob_max_delay, nil ) {|object| @io_queue << :wakeup if 0 == @io_queue.length } if blob_empty?
        upload( block_to_upload )
        block_to_upload = nil # release reference to resource for GC
        to_commit = :commit if blob_full?
      end

      if to_commit
        commit unless @uploaded_block_ids.empty?
        to_commit = nil
        @uploaded_block_ids = [  ]
        timer.cancel
        break if :close == block_to_upload
      end
    end
  end
end

#launch_file_pipe_threadObject



103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
# File 'lib/logstash/outputs/application_insights/upload_pipe.rb', line 103

def launch_file_pipe_thread
  Thread.new do
    loop do
      file_to_upload = @io_queue.pop

      break if :close == file_to_upload

      file_to_upload.open_read
      @file_size = file_to_upload.file_size

      while block = file_to_upload.get_next_block
        unless upload( block )
          # start the file from the begining
          file_to_upload.close_read
          @channel.recover_later_file_upload( file_to_upload )
          file_to_upload = nil
          break
        end
      end

      if file_to_upload
        commit unless @uploaded_block_ids.empty?
        file_to_upload.dispose
        file_to_upload = nil
      end

      @uploaded_block_ids = [  ]
    end
  end
end

#queue_sizeObject



142
143
144
# File 'lib/logstash/outputs/application_insights/upload_pipe.rb', line 142

def queue_size
  @io_queue.length
end