Class: Carnivore::Source::HttpSource
- Inherits: Carnivore::Source
- Inheritance chain: Object, Carnivore::Source, Carnivore::Source::HttpSource
- Includes: Http::Utils::Params
- Defined in: lib/carnivore-http/http_source.rb
Overview
Carnivore HTTP source
Constant Summary
- BODY_TO_FILE_SIZE = 1024 * 10
Size limit for inline body.
Instance Attribute Summary
-
#args ⇒ Hash
readonly
Source arguments.
-
#auth_allowed_origins ⇒ Array<IPAddr>
readonly
Allowed request origin addresses.
- #auth_htpasswd ⇒ HTAuth::PasswdFile readonly
- #retry_delivery ⇒ Carnivore::Http::RetryDelivery readonly
Instance Method Summary
-
#allowed_credentials?(message) ⇒ TrueClass, FalseClass
Check if message is allowed based on config credentials.
-
#allowed_htpasswd?(message) ⇒ TrueClass, FalseClass
Check if message is allowed based on htpasswd file.
-
#allowed_origin?(message) ⇒ TrueClass, FalseClass
Check if message is allowed based on origin.
-
#authorized?(message) ⇒ TrueClass, FalseClass
Message is authorized for processing.
-
#auto_process? ⇒ Boolean
Auto start unless `:enable_processing` is set in the source arguments.
-
#build_listener(&block) ⇒ Reel::Server::HTTP, Reel::Server::HTTPS
Initialize http listener correctly based on configuration.
-
#build_message(con, req) ⇒ Hash
Build message hash from request.
-
#confirm(message, args = {}) ⇒ Object
Confirm processing of message.
-
#default_args(args = {}) ⇒ Hash
Default configuration arguments.
-
#perform_transmission(message_id, payload, method, url, headers = {}) ⇒ NilClass
Transmit message to HTTP endpoint.
-
#retry_delivery_failure(actor, reason) ⇒ NilClass
Handle failed retry deliveries.
-
#retry_directory ⇒ String, NilClass
Directory storing failed messages.
-
#retry_write_directory ⇒ String, NilClass
Cache directory for initial writes.
-
#setup(args = {}) ⇒ Object
Setup the source.
-
#transmit(message, *extra) ⇒ Object
Transmit message.
-
#write_for_retry(message_id, payload, method, url, headers) ⇒ TrueClass, FalseClass
Persist message if enabled for send retry.
Methods included from Http::Utils::Params
#format_query_args, #format_query_type, included, #parse_query_string
Instance Attribute Details
#args ⇒ Hash (readonly)
Returns source arguments.
# File 'lib/carnivore-http/http_source.rb', line 17
def args
  @args
end
#auth_allowed_origins ⇒ Array<IPAddr> (readonly)
Returns allowed request origin addresses.
# File 'lib/carnivore-http/http_source.rb', line 21
def auth_allowed_origins
  @auth_allowed_origins
end
#auth_htpasswd ⇒ HTAuth::PasswdFile (readonly)
# File 'lib/carnivore-http/http_source.rb', line 23
def auth_htpasswd
  @auth_htpasswd
end
#retry_delivery ⇒ Carnivore::Http::RetryDelivery (readonly)
# File 'lib/carnivore-http/http_source.rb', line 19
def retry_delivery
  @retry_delivery
end
Instance Method Details
#allowed_credentials?(message) ⇒ TrueClass, FalseClass
Check if message is allowed based on config credentials
# File 'lib/carnivore-http/http_source.rb', line 164
def allowed_credentials?(message)
  if(creds = args.get(:authorization, :credentials))
    creds[message[:message][:authentication][:username]] == message[:message][:authentication][:password]
  else
    true
  end
end
#allowed_htpasswd?(message) ⇒ TrueClass, FalseClass
Check if message is allowed based on htpasswd file
# File 'lib/carnivore-http/http_source.rb', line 147
def allowed_htpasswd?(message)
  if(auth_htpasswd)
    entry = auth_htpasswd.fetch(message[:message][:authentication][:username])
    if(entry)
      entry.authenticated?(message[:message][:authentication][:password])
    else
      false
    end
  else
    true
  end
end
#allowed_origin?(message) ⇒ TrueClass, FalseClass
Check if message is allowed based on origin
# File 'lib/carnivore-http/http_source.rb', line 176
def allowed_origin?(message)
  if(auth_allowed_origins)
    !!auth_allowed_origins.detect do |allowed_check|
      allowed_check.include?(message[:message][:origin])
    end
  else
    true
  end
end
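The origin check relies on `IPAddr#include?`, which matches both single addresses and CIDR ranges. The following is a minimal standalone sketch of that idea, not the source's own code: `origin_allowed?` is a hypothetical helper, and the addresses are taken from the sample configuration in this page.

```ruby
require 'ipaddr'

# Hypothetical helper (not part of carnivore-http): returns true when no
# origin list is configured, otherwise true only if some entry covers the origin.
def origin_allowed?(allowed_origins, origin)
  return true if allowed_origins.nil?
  checks = Array(allowed_origins).compact.map { |entry| IPAddr.new(entry) }
  checks.any? { |check| check.include?(IPAddr.new(origin)) }
end

origins = ['127.0.0.1', '192.168.0.2', '192.168.6.0/24']
puts origin_allowed?(origins, '192.168.6.17') # inside the /24 range
puts origin_allowed?(origins, '10.0.0.5')     # not listed
puts origin_allowed?(nil, '10.0.0.5')         # no restriction configured
```

As in the method above, an unconfigured list is treated as "allowed", so the check only restricts traffic once origins are supplied.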
#authorized?(message) ⇒ TrueClass, FalseClass
Message is authorized for processing. Authorization is driven via the source configuration. Valid structure looks like:
{
  :type => 'http',
  :args => {
    :authorization => {
      :allowed_origins => ['127.0.0.1', '192.168.0.2', '192.168.6.0/24'],
      :htpasswd => '/path/to/htpasswd.file',
      :credentials => {
        :username1 => 'password1'
      },
      :valid_on => :all # or :any
    }
  }
}
When multiple authorization items are provided, `:valid_on` defines how their results are combined. It defaults to `:all`.
# File 'lib/carnivore-http/http_source.rb', line 121
def authorized?(message)
  if(args.fetch(:authorization))
    valid_on = args.fetch(:authorization, :valid_on, :all).to_sym
    case valid_on
    when :all
      allowed_origin?(message) &&
        allowed_htpasswd?(message) &&
        allowed_credentials?(message)
    when :any
      allowed_origin?(message) ||
        allowed_htpasswd?(message) ||
        allowed_credentials?(message)
    when :none
      true
    else
      raise ArgumentError.new "Unknown authorization `:valid_on` provided! Given: #{valid_on}. Allowed: `any` or `all`"
    end
  else
    true
  end
end
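To make the `:valid_on` behavior concrete, here is a small sketch that combines three already-computed check results. `combine_checks` is a hypothetical helper introduced only for illustration; unlike the method above, it takes an array of results and therefore does not short-circuit.

```ruby
# Hypothetical helper (not part of carnivore-http): combine the results of
# the origin, htpasswd and credentials checks according to :valid_on.
def combine_checks(results, valid_on = :all)
  case valid_on.to_sym
  when :all then results.all?
  when :any then results.any?
  when :none then true
  else
    raise ArgumentError, "Unknown :valid_on value: #{valid_on}"
  end
end

# Order: [origin, htpasswd, credentials]
puts combine_checks([true, false, true], :all)
puts combine_checks([true, false, true], :any)
```

One consequence worth keeping in mind, judging from the listings in this page: each individual check returns `true` when its item is not configured, so under `:any` a message passes as soon as any one of the three items is left unconfigured.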
#auto_process? ⇒ Boolean
Auto start by default. If `:enable_processing` is present in the source arguments, its value is used instead.
# File 'lib/carnivore-http/http_source.rb', line 96
def auto_process?
  args.has_key?(:enable_processing) ? args[:enable_processing] : true
end
#build_listener(&block) ⇒ Reel::Server::HTTP, Reel::Server::HTTPS
Initialize http listener correctly based on configuration
# File 'lib/carnivore-http/http_source.rb', line 295
def build_listener(&block)
  if(args[:ssl])
    # Copy the SSL settings so the configured hash is not modified
    ssl_config = Smash.new(args[:ssl].dup)
    [:key, :cert].each do |key|
      if(ssl_config[key])
        ssl_config[key] = File.open(ssl_config.delete(key))
      end
    end
    Reel::Server::HTTPS.supervise(args[:bind], args[:port], ssl_config, &block)
  else
    Reel::Server::HTTP.supervise(args[:bind], args[:port], &block)
  end
end
#build_message(con, req) ⇒ Hash
Build message hash from request. If the request's Content-Length is greater than BODY_TO_FILE_SIZE, the body will be a temp file instead of a string. JSON and form-encoded bodies are always parsed inline.
# File 'lib/carnivore-http/http_source.rb', line 320
def build_message(con, req)
  msg = Smash.new(
    :request => req,
    :headers => Smash[
      req.headers.map{ |k,v| [k.downcase.tr('-', '_'), v]}
    ],
    :connection => con,
    :query => parse_query_string(req.query_string),
    :origin => req.remote_addr,
    :authentication => {}
  )
  if(msg[:headers][:content_type] == 'application/json')
    msg[:body] = MultiJson.load(
      req.body.to_s
    )
  elsif(msg[:headers][:content_type] == 'application/x-www-form-urlencoded')
    msg[:body] = parse_query_string(
      req.body.to_s
    )
    if(msg[:body].size == 1 && msg[:body].values.first.is_a?(Array) && msg[:body].values.first.empty?)
      msg[:body] = msg[:body].keys.first
    end
  elsif(msg[:headers][:content_length].to_i > BODY_TO_FILE_SIZE)
    msg[:body] = Tempfile.new('carnivore-http')
    while((chunk = req.body.readpartial(2048)))
      msg[:body] << chunk
    end
    msg[:body].rewind
  else
    msg[:body] = req.body.to_s
  end
  if(msg[:headers][:authorization])
    user, pass = Base64.urlsafe_decode64(
      msg[:headers][:authorization].split(' ').last
    ).split(':', 2)
    msg[:authentication] = {
      :username => user,
      :password => pass
    }
  end
  if(msg[:body].is_a?(Hash) && msg[:body][:id])
    Smash.new(
      :raw => msg,
      :content => msg[:body].to_smash
    )
  else
    msg
  end
end
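Two steps in the listing above are easy to try in isolation: header names are lower-cased with dashes turned into underscores, and a Basic `Authorization` value is Base64-decoded and split on the first colon. The sketch below reproduces both with plain hashes; `normalize_headers` and `parse_basic_auth` are hypothetical helpers, and symbol keys stand in for the source's `Smash` access.

```ruby
require 'base64'

# Hypothetical helper: 'Content-Type' becomes :content_type, and so on.
def normalize_headers(headers)
  headers.map { |k, v| [k.downcase.tr('-', '_').to_sym, v] }.to_h
end

# Hypothetical helper: split a Basic Authorization header value into
# username and password. Splitting with a limit of 2 keeps any colons
# that are part of the password.
def parse_basic_auth(header_value)
  encoded = header_value.split(' ').last
  user, pass = Base64.urlsafe_decode64(encoded).split(':', 2)
  { :username => user, :password => pass }
end

header = "Basic #{Base64.strict_encode64('alice:s3cret:with:colons')}"
headers = normalize_headers(
  'Content-Type' => 'application/json',
  'Authorization' => header
)
p headers[:content_type]
p parse_basic_auth(headers[:authorization])
```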
#confirm(message, args = {}) ⇒ Object
Confirm processing of message
# File 'lib/carnivore-http/http_source.rb', line 277
def confirm(message, args={})
  unless(message[:message][:confirmed])
    code = args.delete(:code) || :ok
    args[:response_body] = 'Thanks' if code == :ok && args.empty?
    body = args.delete(:response_body)
    debug "Confirming #{message} with: Code: #{code.inspect} Args: #{args.inspect} Body: #{body}"
    message[:message][:request].respond(code, *(args.empty? ? [body] : [args, body]))
    message[:message][:connection].close
    message[:message][:confirmed] = true
  else
    warn "Message was already confirmed. Confirmation not sent! (#{message})"
  end
end
#default_args(args = {}) ⇒ Hash
Default configuration arguments. If hash is provided, it will be merged into the default arguments.
# File 'lib/carnivore-http/http_source.rb', line 86
def default_args(args={})
  Smash.new(
    :bind => '0.0.0.0',
    :port => '3000',
    :auto_respond => true,
    :retry_directory => '/tmp/.carnivore-resend'
  ).merge(args)
end
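The merge semantics can be illustrated with a plain `Hash` in place of `Smash` (an assumption made so the sketch runs without extra gems). `with_defaults` is a hypothetical stand-in for the method above: supplied keys replace defaults, everything else is kept.

```ruby
# Hypothetical stand-in for default_args, using a plain Hash instead of Smash.
def with_defaults(args = {})
  {
    :bind => '0.0.0.0',
    :port => '3000',
    :auto_respond => true,
    :retry_directory => '/tmp/.carnivore-resend'
  }.merge(args)
end

config = with_defaults(:port => 8080, :endpoint => 'http://127.0.0.1:9000/')
p config[:port]     # overridden by the caller
p config[:bind]     # default retained
p config[:endpoint] # extra key passed through
```

Note that the default port is the string `'3000'`, so callers comparing ports numerically may want to normalize the value first.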
#perform_transmission(message_id, payload, method, url, headers = {}) ⇒ NilClass
Transmit message to HTTP endpoint
# File 'lib/carnivore-http/http_source.rb', line 236
def perform_transmission(message_id, payload, method, url, headers={})
  unless(retry_delivery.redeliver(message_id, payload, method, url, headers))
    write_for_retry(message_id, payload, method, url, headers)
    retry_delivery.async.attempt_redelivery(message_id)
  end
  nil
end
#retry_delivery_failure(actor, reason) ⇒ NilClass
Handle failed retry deliveries
# File 'lib/carnivore-http/http_source.rb', line 52
def retry_delivery_failure(actor, reason)
  if(actor == retry_delivery)
    if(reason)
      error "Failed RetryDelivery encountered: #{reason}. Rebuilding."
      @retry_delivery = Carnivore::Http::RetryDelivery.new(retry_directory)
    else
      info 'Encountered RetryDelivery failure. No reason so assuming teardown.'
    end
  else
    error "Unknown actor failure encountered: #{reason}"
  end
  nil
end
#retry_directory ⇒ String, NilClass
Returns directory storing failed messages.
# File 'lib/carnivore-http/http_source.rb', line 67
def retry_directory
  if(args[:retry_directory])
    FileUtils.mkdir_p(File.join(args[:retry_directory], name.to_s)).first
  end
end
#retry_write_directory ⇒ String, NilClass
Returns cache directory for initial writes.
# File 'lib/carnivore-http/http_source.rb', line 74
def retry_write_directory
  base = retry_directory
  if(base)
    FileUtils.mkdir_p(File.join(base, '.write')).first
  end
end
#setup(args = {}) ⇒ Object
Setup the source
# File 'lib/carnivore-http/http_source.rb', line 28
def setup(args={})
  require 'fileutils'
  @args = default_args(args)
  @retry_delivery = Carnivore::Http::RetryDelivery.new(retry_directory)
  self.link retry_delivery
  if(args.get(:authorization, :allowed_origins))
    require 'ipaddr'
    # Stored under the name read by the auth_allowed_origins attribute
    @auth_allowed_origins = [args.get(:authorization, :allowed_origins)].flatten.compact.map do |origin_check|
      IPAddr.new(origin_check)
    end
  end
  if(args.get(:authorization, :htpasswd))
    require 'htauth'
    @auth_htpasswd = HTAuth::PasswdFile.open(
      args.get(:authorization, :htpasswd)
    )
  end
end
#transmit(message, *extra) ⇒ Object
Transmit message. The transmission can be a response back to an open connection, or a request to a remote source (generally a remote carnivore-http source).
# File 'lib/carnivore-http/http_source.rb', line 192
def transmit(message, *extra)
  options = extra.detect{|x| x.is_a?(Hash)} || {}
  orig = extra.detect{|x| x.is_a?(Carnivore::Message)}
  con = options[:connection]
  if(orig && con.nil?)
    con = orig[:message][:connection]
  end
  if(con) # response
    payload = message.is_a?(String) ? message : MultiJson.dump(message)
    # TODO: add `options` options for marshaling: json/xml/etc
    code = options.fetch(:code, :ok)
    info "Transmit response type with code: #{code}"
    con.respond(code, payload)
  else # request
    if(args[:endpoint])
      url = args[:endpoint]
    else
      url = "http#{'s' if args[:ssl]}://#{args[:bind]}"
      if(args[:port])
        url << ":#{args[:port]}"
      end
      url = URI.join(url, args.fetch(:path, '/')).to_s
    end
    if(options[:path])
      url = URI.join(url, options[:path].to_s)
    end
    method = options.fetch(:method,
      args.fetch(:method, :post)
    ).to_s.downcase.to_sym
    message_id = message.is_a?(Hash) ? message.fetch(:id, Celluloid.uuid) : Celluloid.uuid
    payload = message.is_a?(String) ? message : MultiJson.dump(message)
    info "Transmit request type for Message ID: #{message_id}"
    async.perform_transmission(message_id.to_s, payload, method, url, options.fetch(:headers, {}))
  end
end
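The request branch derives its target URL either from `:endpoint` or from `:bind`, `:port`, `:ssl` and `:path`, and then lets a per-call `:path` override the path. A standalone sketch of that resolution follows; `build_url` is a hypothetical helper using plain hashes, and it always returns a `String`, which is a simplification of this sketch.

```ruby
require 'uri'

# Hypothetical helper: resolve the target URL from source arguments (args)
# and per-transmission options, following the rules described above.
def build_url(args, options = {})
  if args[:endpoint]
    url = args[:endpoint]
  else
    url = "http#{'s' if args[:ssl]}://#{args[:bind]}"
    url += ":#{args[:port]}" if args[:port]
    url = URI.join(url, args.fetch(:path, '/')).to_s
  end
  url = URI.join(url, options[:path].to_s).to_s if options[:path]
  url
end

puts build_url({ :bind => '127.0.0.1', :port => '3000' })
puts build_url({ :bind => '127.0.0.1', :port => '3000', :ssl => {} }, { :path => '/events' })
puts build_url({ :endpoint => 'http://example.com/hook' })
```

Because `URI.join` resolves the second argument relative to the first, an absolute per-call path such as `/events` replaces the path of a configured endpoint rather than appending to it.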
#write_for_retry(message_id, payload, method, url, headers) ⇒ TrueClass, FalseClass
Persist message if enabled for send retry
# File 'lib/carnivore-http/http_source.rb', line 252
def write_for_retry(message_id, payload, method, url, headers)
  data = {
    :message_id => message_id,
    :payload => payload,
    :method => method,
    :url => url,
    :headers => headers
  }
  if(retry_directory)
    stage_path = File.join(retry_write_directory, "#{message_id}.json")
    final_path = File.join(retry_directory, File.basename(stage_path))
    File.open(stage_path, 'w+') do |file|
      file.write MultiJson.dump(data)
    end
    FileUtils.move(stage_path, final_path)
    info "Failed message (ID: #{message_id}) persisted for resend"
    true
  end
end
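The persistence step writes the file into a staging directory first and only then moves it into the retry directory, so a reader of the retry directory should not see a half-written file. Below is a minimal sketch of that stage-then-move pattern. `persist_for_retry` is a hypothetical helper, the standard `json` library stands in for MultiJson, and a temporary directory stands in for the configured retry directory.

```ruby
require 'fileutils'
require 'json'
require 'tmpdir'

# Hypothetical helper: write data to <retry_dir>/.write/<id>.json, then move
# the finished file to <retry_dir>/<id>.json and return the final path.
def persist_for_retry(retry_dir, message_id, data)
  stage_dir = File.join(retry_dir, '.write')
  FileUtils.mkdir_p(stage_dir)
  stage_path = File.join(stage_dir, "#{message_id}.json")
  final_path = File.join(retry_dir, File.basename(stage_path))
  File.open(stage_path, 'w+') { |file| file.write(JSON.generate(data)) }
  FileUtils.move(stage_path, final_path)
  final_path
end

Dir.mktmpdir('carnivore-retry') do |dir|
  data = { :message_id => 'abc-123', :payload => '{"ok":true}', :method => :post,
           :url => 'http://127.0.0.1:3000/', :headers => {} }
  path = persist_for_retry(dir, 'abc-123', data)
  puts File.basename(path)
  puts JSON.parse(File.read(path))['url']
end
```

Keeping the staging directory inside the retry directory, as the listings in this page do, means both paths normally sit on the same filesystem, where a move is a rename rather than a copy.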