Class: Flydata::Output::TcpForwarder

Inherits:
Object
  • Object
show all
Defined in:
lib/flydata/command/sync.rb

Direct Known Subclasses

SslForwarder

Constant Summary collapse

FORWARD_HEADER =
[0x92].pack('C')
BUFFER_SIZE =
# 32M (bytes)
1024 * 1024 * 32
DEFUALT_SEND_TIMEOUT =
# 1 minute (seconds)
60
RETRY_INTERVAL =
2
RETRY_LIMIT =
10

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(tag, servers, options = {}) ⇒ TcpForwarder

Returns a new instance of TcpForwarder.



507
508
509
510
511
512
513
514
515
516
# File 'lib/flydata/command/sync.rb', line 507

# Builds a forwarder that sends buffered records for +tag+ to one of +servers+.
#
# @param tag [Object] tag stored in @tag; serialized and written ahead of each batch
# @param servers [Array] non-empty list of server entries (handed to #connect one at a time)
# @param options [Hash] forwarded unchanged to #set_options
# @raise [RuntimeError] when +servers+ is nil/false, not an Array, or empty
def initialize(tag, servers, options = {})
  @tag = tag
  usable = servers && servers.kind_of?(Array) && !servers.empty?
  raise "Servers must not be empty." unless usable
  @servers = servers
  # Rotation through @servers starts at the first entry.
  @server_index = 0
  set_options(options)
  reset
end

Instance Attribute Details

#buffer_record_count ⇒ Object (readonly)

Returns the value of attribute buffer_record_count.



526
527
528
# File 'lib/flydata/command/sync.rb', line 526

# Reader for @buffer_record_count: the number of records appended by #emit
# since the counter was last zeroed by #reset.
def buffer_record_count
  @buffer_record_count
end

#buffer_size ⇒ Object (readonly)

Returns the value of attribute buffer_size.



526
527
528
# File 'lib/flydata/command/sync.rb', line 526

# Reader for @buffer_size: the summed byte size of the serialized records
# appended by #emit since the counter was last zeroed by #reset.
def buffer_size
  @buffer_size
end

Instance Method Details

#close ⇒ Object



620
621
622
# File 'lib/flydata/command/sync.rb', line 620

# Delegates to #flush so that anything still buffered is sent.
# Returns whatever #flush returns.
def close
  flush
end

#connect(server) ⇒ Object



597
598
599
600
601
602
603
604
605
606
607
608
# File 'lib/flydata/command/sync.rb', line 597

# Opens a TCP connection to +server+ and applies the send-side socket options.
#
# If configuring the socket fails, the freshly opened socket is closed before
# the error is re-raised; otherwise it would leak, because the caller never
# receives a reference to it.
#
# @param server [String] "host:port" (split on ':'; the port is converted with to_i)
# @return [TCPSocket] the connected, configured socket; the caller must close it
# @raise [StandardError] whatever TCPSocket.new or setsockopt raises
def connect(server)
  host, port = server.split(':')
  sock = TCPSocket.new(host, port.to_i)

  begin
    # SO_LINGER: packed as two native ints (on/off flag, linger seconds).
    opt = [1, DEFUALT_SEND_TIMEOUT].pack('I!I!')
    sock.setsockopt(Socket::SOL_SOCKET, Socket::SO_LINGER, opt)
    # SO_SNDTIMEO: packed as two native longs (seconds, microseconds).
    opt = [DEFUALT_SEND_TIMEOUT, 0].pack('L!L!')
    sock.setsockopt(Socket::SOL_SOCKET, Socket::SO_SNDTIMEO, opt)
  rescue => e
    # Best-effort cleanup; the configuration error is the one worth reporting.
    begin
      sock.close
    rescue StandardError
      nil
    end
    raise e
  end

  sock
end

#emit(records, time = Time.now.to_i) ⇒ Object



528
529
530
531
532
533
534
535
536
537
538
539
540
541
# File 'lib/flydata/command/sync.rb', line 528

# Serializes one record or an Array of records and appends them to the buffer.
#
# Each record is wrapped as [time, record] and serialized with #to_msgpack.
# The record counter and byte counter are updated per record. Once the byte
# counter exceeds @buffer_size_limit the buffer is handed to #send.
#
# @param records [Object, Array] a single record, or an Array of records
# @param time [Integer] timestamp paired with every record (defaults to now, epoch seconds)
# @return [Object] the result of #send when the limit is exceeded, otherwise false
def emit(records, time = Time.now.to_i)
  batch = records.kind_of?(Array) ? records : [records]
  batch.each do |entry|
    packed = [time, entry].to_msgpack
    @buffer_records << packed
    @buffer_record_count += 1
    @buffer_size += packed.bytesize
  end
  return false unless @buffer_size > @buffer_size_limit
  send
end

#flush ⇒ Object



616
617
618
# File 'lib/flydata/command/sync.rb', line 616

# Sends whatever is currently buffered by calling this class's own #send
# (a zero-argument method defined in this file, not Object#send dispatch).
# Returns whatever #send returns.
def flush
  send
end

#pickup_server ⇒ Object

TODO: Check server status



588
589
590
591
592
593
594
595
# File 'lib/flydata/command/sync.rb', line 588

# Returns the server at the current rotation position and advances the
# position, wrapping back to the first entry after the last one.
#
# @return [Object] the entry of @servers selected for this call
def pickup_server
  chosen = @servers[@server_index]
  following = @server_index + 1
  @server_index = following >= @servers.count ? 0 : following
  chosen
end

#reset ⇒ Object



610
611
612
613
614
# File 'lib/flydata/command/sync.rb', line 610

# Discards the buffered payload and zeroes both buffer counters.
#
# @return [Integer] 0 (the value of the final assignment)
def reset
  @buffer_records = ''
  @buffer_size = @buffer_record_count = 0
end

#send ⇒ Object

TODO retry logic



544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
# File 'lib/flydata/command/sync.rb', line 544

# Sends the buffered records to a server, retrying with exponential backoff.
#
# Wire layout written per attempt: FORWARD_HEADER, the serialized tag, a
# 5-byte prefix (0xdb followed by the payload byte length as a 32-bit
# big-endian integer), then the raw buffered payload.
#
# Each attempt opens a new connection to the next server from #pickup_server.
# A socket whose attempt failed is closed immediately in the rescue clause:
# `ensure` does not run on `retry`, so without this the failed socket would
# stay open during the backoff sleep and its reference would be overwritten
# by the next #connect.
#
# NOTE: this method shadows Object#send for instances of the class.
#
# @return [Boolean] false when nothing is buffered; true after a successful
#   send (or when ENV['FLYDATA_BENCHMARK'] is set, in which case the buffer is
#   dropped without any network I/O)
# @raise [StandardError] the last error once more than RETRY_LIMIT retries failed
def send
  return false unless @buffer_size > 0

  # Benchmark mode: pretend the send succeeded and just clear the buffer.
  if ENV['FLYDATA_BENCHMARK']
    reset
    return true
  end

  sock = nil
  retry_count = 0
  begin
    sock = connect(pickup_server)

    # Write header
    sock.write FORWARD_HEADER
    # Write tag
    sock.write @tag.to_msgpack
    # Write records
    sock.write [0xdb, @buffer_records.bytesize].pack('CN')
    StringIO.open(@buffer_records) do |i|
      FileUtils.copy_stream(i, sock)
    end
  rescue => e
    # Release the failed connection now; see the method comment.
    if sock
      sock.close rescue nil
      sock = nil
    end
    retry_count += 1
    if retry_count > RETRY_LIMIT
      puts "! Error: Failed to send data. Exceeded the retry limit. retry_count:#{retry_count}"
      raise e
    end
    puts "! Warn: Retring to send data. retry_count:#{retry_count} error=#{e.to_s}"
    # Exponential backoff: RETRY_INTERVAL raised to the attempt number.
    wait_time = RETRY_INTERVAL ** retry_count
    puts "  Now waiting for next retry. time=#{wait_time}sec"
    sleep wait_time
    retry
  ensure
    # Runs once when the begin block is finally left (success or final failure).
    if sock
      sock.close rescue nil
    end
  end
  reset
  true
end

#set_options(options) ⇒ Object



518
519
520
521
522
523
524
# File 'lib/flydata/command/sync.rb', line 518

# Applies constructor options.
#
# @param options [Hash] recognizes :buffer_size_limit — the byte threshold
#   above which #emit triggers a send; BUFFER_SIZE is used when the key is
#   missing or its value is nil/false
# @return [Object] the limit that was stored in @buffer_size_limit
def set_options(options)
  @buffer_size_limit = options[:buffer_size_limit] || BUFFER_SIZE
end