Class: Hoss::Transport::Worker Private
- Inherits:
- Object
- Object
- Hoss::Transport::Worker
- Includes:
- Logging
- Defined in:
- lib/hoss/transport/worker.rb
This class is part of a private API. You should avoid using this class if possible, as it may be removed or be changed in the future.
Defined Under Namespace
Classes: FlushMessage, StopMessage
Constant Summary
Constants included from Logging
Logging::LEVELS, Logging::PREFIX
Class Attribute Summary
- .adapter ⇒ Object private
Instance Attribute Summary
- #config ⇒ Object readonly private
- #connection ⇒ Object readonly private
- #filters ⇒ Object readonly private
- #name ⇒ Object readonly private
- #queue ⇒ Object readonly private
- #serializers ⇒ Object readonly private
Instance Method Summary
- #initialize(config, queue, serializers:, filters:) ⇒ Worker constructor private
  A new instance of Worker.
- #work_forever ⇒ Object private
Methods included from Logging
#debug, #error, #fatal, #info, #warn
Constructor Details
#initialize(config, queue, serializers:, filters:) ⇒ Worker
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Returns a new instance of Worker.
# File 'lib/hoss/transport/worker.rb', line 44

def initialize(
  config,
  queue,
  serializers:,
  filters:
)
  @config = config
  @queue = queue

  @serializers = serializers
  @filters = filters

  @connection = self.class.adapter.new(config)
end
Class Attribute Details
.adapter ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
# File 'lib/hoss/transport/worker.rb', line 31

def adapter
  @adapter ||= Connection
end
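The constructor builds its connection from whatever class `.adapter` returns, and `.adapter` falls back to `Connection` when nothing else has been assigned. The attribute is not marked readonly in the summary, which suggests a writer exists, although this page does not show it. The following is a minimal sketch of that injection pattern under those assumptions. `SketchWorker`, `NullConnection`, and `RecordingConnection` are hypothetical stand-ins and are not part of Hoss.

```ruby
# Hypothetical default transport: accepts a config and discards writes.
class NullConnection
  def initialize(config)
    @config = config
  end

  def write(_payload)
    nil
  end
end

# Hypothetical test transport: records payloads instead of sending them.
class RecordingConnection
  attr_reader :writes

  def initialize(config)
    @config = config
    @writes = []
  end

  def write(payload)
    @writes << payload
  end
end

# Hypothetical worker that mirrors only the adapter lookup shown above.
class SketchWorker
  class << self
    # Assumed writer, so callers can swap the transport class.
    attr_writer :adapter

    # Lazily falls back to a default when nothing was injected.
    def adapter
      @adapter ||= NullConnection
    end
  end

  attr_reader :connection

  def initialize(config)
    # One connection per worker, built from the current adapter class.
    @connection = self.class.adapter.new(config)
  end
end

SketchWorker.adapter = RecordingConnection
worker = SketchWorker.new({ batch_size: 512 })
worker.connection.write('{"events":[]}')
```

Because the adapter is resolved at construction time, swapping it only affects workers created afterwards.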
Instance Attribute Details
#config ⇒ Object (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
# File 'lib/hoss/transport/worker.rb', line 59

def config
  @config
end
#connection ⇒ Object (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
# File 'lib/hoss/transport/worker.rb', line 59

def connection
  @connection
end
#filters ⇒ Object (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
# File 'lib/hoss/transport/worker.rb', line 59

def filters
  @filters
end
#name ⇒ Object (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
# File 'lib/hoss/transport/worker.rb', line 59

def name
  @name
end
#queue ⇒ Object (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
# File 'lib/hoss/transport/worker.rb', line 59

def queue
  @queue
end
#serializers ⇒ Object (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
# File 'lib/hoss/transport/worker.rb', line 59

def serializers
  @serializers
end
Instance Method Details
#work_forever ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
# File 'lib/hoss/transport/worker.rb', line 60

def work_forever
  done = false
  while (!done && msg = queue.pop)
    begin
      debug 'working message', msg

      # wait if we don't have a config
      while config.agentConfig.nil?
        sleep 0.1
      end

      case msg
      when StopMessage
        debug 'Stopping worker -- %s', self
        done = true
        break
      else
        batch = []
        current_batch_size = 0

        # Use this as the first message in the batch
        event = msg.filtered ? msg : filter_resource(msg)
        unless event.nil?
          event_size = resource_size(event)
          if current_batch_size + event_size <= @config.batch_size
            unless host_blacklisted(event)
              current_batch_size += event_size
              if event.retries < @config.max_event_retries
                batch.push(event)
              else
                debug "max retries hit for event"
              end
            end
          else
            debug "Event is too large, body needs to be truncated"
          end
        end

        # Do inner loop reading queue to build report
         = []
        while current_batch_size < @config.batch_size && !queue.empty?
          next_msg = queue.pop
          case next_msg
          when StopMessage
            debug 'Stopping worker -- %s', self
            done = true
            break
          else
            event = next_msg.filtered ? next_msg : filter_resource(next_msg)
            unless event.nil?
              event_size = resource_size(event)
              if current_batch_size + event_size <= @config.batch_size
                unless host_blacklisted(event)
                  current_batch_size += event_size
                  if event.retries < @config.max_event_retries
                    batch.push(event)
                  else
                    debug "max retries hit for event"
                  end
                end
              else
                debug "Event too large for this batch, requeue"
                .push(event)
              end
            end
          end
        end

        if batch.length == 0
          debug "batch is empty, breaking"
          break
        end

        debug "Requeue #{.length} messages" if .length > 0
        .each {|msg| queue.push(msg, false) }

        report = Report.new
        report.events = batch.map {|event| serializers.serialize(event) }

        debug "Finished building report"
        data = serializers.serialize(report)
        json = JSON.fast_generate(data)
        begin
          debug json
        rescue Exception => e
          debug 'unable to print body'
          puts json if config.debug
        end
        begin
          if config.disable_reporting
            debug "Reprting disabled, skipping"
          else
            connection.write(json)
          end
        rescue Exception => e
          error format('Failed send report: %s %s', e.inspect, e.backtrace)
          batch.each do |m|
            m.retries += 1
            queue.push(m, false)
          end
          sleep 1
        end
      end
    rescue Exception => e
      debug "error in worker #{e.inspect}"
    end
  end
rescue Exception => e
  warn 'Worker died with exception: %s', e.inspect
  debug e.backtrace.join("\n")
end
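The core of `#work_forever` is a size-bounded batching loop: it takes one message from the queue, keeps draining while the batch has room, sets aside events that do not fit, and pushes those back onto the queue before the batch is sent. The sketch below isolates that idea in a simplified form. `SketchEvent`, `StopSignal`, `build_batch`, and the `bytes` field are hypothetical names for illustration only. Filtering, host blacklisting, retry counting, serialization, and sending are deliberately left out.

```ruby
# Hypothetical stand-ins for queued events and the stop sentinel.
SketchEvent = Struct.new(:name, :bytes)
StopSignal = Class.new

# Blocks for one message, then drains the queue while the batch has room.
# Events that would overflow the current batch are pushed back afterwards.
# Returns [batch, stop_seen].
def build_batch(queue, max_size)
  batch = []
  deferred = []
  used = 0
  stop_seen = false

  msg = queue.pop
  loop do
    if msg.is_a?(StopSignal)
      stop_seen = true
      break
    end

    if used + msg.bytes <= max_size
      batch << msg
      used += msg.bytes
    elsif msg.bytes <= max_size
      # Fits in some later batch, so set it aside for requeueing.
      deferred << msg
    end
    # An event larger than max_size can never fit and is dropped here.

    break if used >= max_size || queue.empty?
    msg = queue.pop
  end

  deferred.each { |event| queue.push(event) }
  [batch, stop_seen]
end

queue = Queue.new
[SketchEvent.new('a', 40), SketchEvent.new('b', 70), SketchEvent.new('c', 50)].each do |event|
  queue << event
end

batch, stop_seen = build_batch(queue, 100)
```

With a limit of 100, events `a` and `c` fit together, while `b` is returned to the queue for the next batch. Collecting deferred events in a separate array and requeueing them only after the drain loop ends keeps the loop from popping the same oversized event again immediately.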