Class: DatTCP::Server

Inherits:
Object
  • Object
show all
Defined in:
lib/dat-tcp.rb

Defined Under Namespace

Modules: TCPServer Classes: LoggerProxy, NullLoggerProxy, State

Constant Summary collapse

DEFAULT_BACKLOG_SIZE =
1024
DEFAULT_SHUTDOWN_TIMEOUT =
15
DEFAULT_NUM_WORKERS =
2
SIGNAL =
'.'.freeze

Instance Method Summary collapse

Constructor Details

#initialize(worker_class, options = nil) ⇒ Server

Returns a new instance of Server.



18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
# File 'lib/dat-tcp.rb', line 18

def initialize(worker_class, options = nil)
  if !worker_class.kind_of?(Class) || !worker_class.include?(DatTCP::Worker)
    raise ArgumentError, "worker class must include `#{DatTCP::Worker}`"
  end

  options ||= {}
  @backlog_size     = options[:backlog_size]     || DEFAULT_BACKLOG_SIZE
  @shutdown_timeout = options[:shutdown_timeout] || DEFAULT_SHUTDOWN_TIMEOUT

  @signal_reader, @signal_writer = IO.pipe

  @logger_proxy = if options[:logger]
    LoggerProxy.new(options[:logger])
  else
    NullLoggerProxy.new
  end

  @worker_pool = DatWorkerPool.new(worker_class, {
    :num_workers   => (options[:num_workers] || DEFAULT_NUM_WORKERS),
    :logger        => options[:logger],
    :worker_params => options[:worker_params]
  })

  @tcp_server = nil
  @thread     = nil
  @state      = State.new(:stop)
end

Instance Method Details

#client_file_descriptorsObject



58
59
60
# File 'lib/dat-tcp.rb', line 58

def client_file_descriptors
  @worker_pool.work_items.map(&:fileno)
end

#file_descriptorObject



54
55
56
# File 'lib/dat-tcp.rb', line 54

def file_descriptor
  @tcp_server.fileno if self.listening?
end

#halt(wait = false) ⇒ Object



103
104
105
106
107
108
# File 'lib/dat-tcp.rb', line 103

def halt(wait = false)
  return unless self.running?
  @state.set :halt
  wakeup_thread
  wait_for_shutdown if wait
end

#inspectObject



110
111
112
113
114
115
116
# File 'lib/dat-tcp.rb', line 110

def inspect
  reference = '0x0%x' % (self.object_id << 1)
  "#<#{self.class}:#{reference}".tap do |s|
    s << " @ip=#{ip.inspect} @port=#{port.inspect}"
    s << ">"
  end
end

#ipObject



46
47
48
# File 'lib/dat-tcp.rb', line 46

def ip
  @tcp_server.addr[3] if self.listening?
end

#listen(*args) {|@tcp_server| ... } ⇒ Object

Yields:

  • (@tcp_server)

Raises:

  • (ArgumentError)


70
71
72
73
74
75
76
# File 'lib/dat-tcp.rb', line 70

def listen(*args)
  @state.set :listen
  @tcp_server = TCPServer.build(*args)
  raise ArgumentError, "takes ip and port or file descriptor" if !@tcp_server
  yield @tcp_server if block_given?
  @tcp_server.listen(@backlog_size)
end

#listening?Boolean

Returns:

  • (Boolean)


62
63
64
# File 'lib/dat-tcp.rb', line 62

def listening?
  !!@tcp_server
end

#pause(wait = false) ⇒ Object



89
90
91
92
93
94
# File 'lib/dat-tcp.rb', line 89

def pause(wait = false)
  return unless self.running?
  @state.set :pause
  wakeup_thread
  wait_for_shutdown if wait
end

#portObject



50
51
52
# File 'lib/dat-tcp.rb', line 50

def port
  @tcp_server.addr[1] if self.listening?
end

#running?Boolean

Returns:

  • (Boolean)


66
67
68
# File 'lib/dat-tcp.rb', line 66

def running?
  !!(@thread && @thread.alive?)
end

#start(passed_client_fds = nil) ⇒ Object

Raises:



83
84
85
86
87
# File 'lib/dat-tcp.rb', line 83

def start(passed_client_fds = nil)
  raise NotListeningError.new unless listening?
  @state.set :run
  @thread = Thread.new{ work_loop(passed_client_fds) }
end

#stop(wait = false) ⇒ Object



96
97
98
99
100
101
# File 'lib/dat-tcp.rb', line 96

def stop(wait = false)
  return unless self.running?
  @state.set :stop
  wakeup_thread
  wait_for_shutdown if wait
end

#stop_listenObject



78
79
80
81
# File 'lib/dat-tcp.rb', line 78

def stop_listen
  @tcp_server.close rescue false
  @tcp_server = nil
end