Class: Kinetic::Worker

Inherits:
Object
  • Object
show all
Defined in:
lib/kinetic/worker.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(id, app) ⇒ Worker

Returns a new instance of Worker.



8
9
10
11
12
13
# File 'lib/kinetic/worker.rb', line 8

def initialize(id, app)
  @app = app
  logger.debug "Initializing worker #{id}"
  @id = id
  @to_io, @master = Kgio::Pipe.new.each { |io| io.fcntl(Fcntl::F_SETFD, Fcntl::FD_CLOEXEC) }
end

Instance Attribute Details

#appObject (readonly)

Returns the value of attribute app.



6
7
8
# File 'lib/kinetic/worker.rb', line 6

def app
  @app
end

#channelObject (readonly)

Returns the value of attribute channel.



6
7
8
# File 'lib/kinetic/worker.rb', line 6

def channel
  @channel
end

#exchangesObject (readonly)

Returns the value of attribute exchanges.



6
7
8
# File 'lib/kinetic/worker.rb', line 6

def exchanges
  @exchanges
end

#idObject (readonly)

Returns the value of attribute id.



6
7
8
# File 'lib/kinetic/worker.rb', line 6

def id
  @id
end

Instance Method Details

#atfork_childObject



20
21
22
23
# File 'lib/kinetic/worker.rb', line 20

def atfork_child
  logger.debug "Worker #{id} closing pipe to master"
  @master = @master.close
end

#atfork_parentObject



15
16
17
18
# File 'lib/kinetic/worker.rb', line 15

def atfork_parent
  logger.debug "Worker #{id} closing pipe to child"
  @to_io = @to_io.close
end

#runObject



25
26
27
28
29
30
31
32
33
34
# File 'lib/kinetic/worker.rb', line 25

def run
  logger.debug "Establishing connection host: '#{config.host}', port: '#{config.port}'"
  AMQP.start(host: config.host, port: config.port) do |connection|
    logger.debug "AMQP started with conneciton #{connection}"
    initialize_channel!(connection)
    initialize_exchanges!
    initialize_subscribers!
    initialize_timed_events!
  end
end

#soft_kill(sig) ⇒ Object

master sends fake signals to children



37
38
39
40
41
42
43
44
45
46
47
48
49
50
# File 'lib/kinetic/worker.rb', line 37

def soft_kill(sig) # :nodoc:
  case sig
  when Integer
    signum = sig
  else
    signum = Signal.list[sig.to_s] or
      raise ArgumentError, "BUG: bad signal: #{sig.inspect}"
  end
  # writing and reading 4 bytes on a pipe is atomic on all POSIX platforms
  # Do not care in the odd case the buffer is full, here.
  @master.kgio_trywrite([signum].pack('l'))
rescue Errno::EPIPE
  # worker will be reaped soon
end