Class: Karafka::Swarm::Pidfd

Inherits:
Object
Extended by:
FFI::Library
Defined in:
lib/karafka/swarm/pidfd.rb

Overview

Linux pidfd representation wrapped in Ruby for communication within Swarm. It is more stable than using `#pid` and `#ppid` + signals, and it is cheaper.

Constructor Details

#initialize(pid) ⇒ Pidfd

Returns a new instance of Pidfd.

Parameters:

  • pid (Integer)

    pid of the node we want to work with



# File 'lib/karafka/swarm/pidfd.rb', line 63

def initialize(pid)
  @mutex = Mutex.new

  @pid = pid
  @pidfd = open(pid)
  @pidfd_io = IO.new(@pidfd)
end

Class Method Details

.supported? ⇒ Boolean

Returns true if syscall is supported via FFI.

Returns:

  • (Boolean)

    true if syscall is supported via FFI



# File 'lib/karafka/swarm/pidfd.rb', line 43

def supported?
  # If we were not even able to load the FFI C lib, it won't be supported
  return false unless API_SUPPORTED
  # Won't work on macOS because it does not support pidfd
  return false if RUBY_DESCRIPTION.include?('darwin')
  # Won't work on Windows for the same reason as on macOS
  return false if RUBY_DESCRIPTION.match?(/mswin|ming|cygwin/)

  # There are some OSes like BSD that will have C lib for FFI bindings but will not support
  # the needed syscalls. In such cases, we can just try and fail, which will indicate it
  # won't work. The same applies to using new glibc on an old kernel.
  new(::Process.pid)

  true
rescue Errors::PidfdOpenFailedError
  false
end
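
The "try and fail" probe above can be sketched in isolation. This is a minimal illustration, not Karafka's implementation: `ProbeOpenFailedError`, `ProbedResource`, and the injected `opener` callable are hypothetical stand-ins, and the assumption that a negative handle signals failure mirrors how raw syscalls report errors rather than anything stated on this page.

```ruby
# Hypothetical error standing in for Errors::PidfdOpenFailedError.
class ProbeOpenFailedError < StandardError; end

# Hypothetical wrapper whose constructor performs the operation being probed.
class ProbedResource
  class << self
    # Tries to build an instance; a failed open means "not supported".
    # `opener` is an illustrative injection point, not part of Karafka.
    def supported?(opener)
      new(opener)
      true
    rescue ProbeOpenFailedError
      false
    end
  end

  def initialize(opener)
    @handle = opener.call
    # Assumption: a negative handle indicates that the open call failed.
    raise ProbeOpenFailedError, @handle.to_s if @handle.negative?
  end
end

working_opener = -> { 3 }   # pretends the open call returned descriptor 3
broken_opener  = -> { -1 }  # pretends the open call failed

supported_on_working = ProbedResource.supported?(working_opener)
supported_on_broken  = ProbedResource.supported?(broken_opener)
```

Probing by attempting the real operation avoids maintaining a list of kernel and libc versions, at the cost of performing one open call per check.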

Instance Method Details

#alive? ⇒ Boolean

Returns true if the given process is alive, false otherwise.

Returns:

  • (Boolean)

    true if the given process is alive, false otherwise



# File 'lib/karafka/swarm/pidfd.rb', line 72

def alive?
  @pidfd_select ||= [@pidfd_io]

  if @mutex.owned?
    return false if @cleaned

    IO.select(@pidfd_select, nil, nil, 0).nil?
  else
    @mutex.synchronize do
      return false if @cleaned

      IO.select(@pidfd_select, nil, nil, 0).nil?
    end
  end
end
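
The two ideas in `#alive?` (a zero-timeout `IO.select` poll and the `Mutex#owned?` guard) can be tried without a pidfd. The sketch below swaps in a pipe for the pidfd: the read end becomes readable once the writer is closed, much as a pidfd becomes readable once its process terminates. `PipeLiveness` and `with_lock` are hypothetical names used only for illustration.

```ruby
# Hypothetical stand-in for Pidfd that watches a pipe instead of a process.
class PipeLiveness
  def initialize(io)
    @mutex = Mutex.new
    @io = io
  end

  # Ruby's Mutex is not reentrant, so skip locking when this thread
  # already holds the lock (for example when called from with_lock).
  def alive?
    return poll if @mutex.owned?

    @mutex.synchronize { poll }
  end

  # Runs the block while holding the lock, like #signal does in Pidfd.
  def with_lock(&block)
    @mutex.synchronize(&block)
  end

  private

  # A timeout of 0 makes IO.select return immediately: nil means the
  # descriptor is not readable yet, which we treat as "still alive".
  def poll
    IO.select([@io], nil, nil, 0).nil?
  end
end

reader, writer = IO.pipe
probe = PipeLiveness.new(reader)

alive_before = probe.alive?
alive_nested = probe.with_lock { probe.alive? } # no ThreadError thanks to owned?
writer.close                                    # EOF makes the read end readable
alive_after = probe.alive?
```

Without the `owned?` branch, the nested call would raise `ThreadError`, because `Mutex#synchronize` refuses to lock a mutex the current thread already holds.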

#cleanup ⇒ Object

Note:

Run this only on processes that have already exited; otherwise the call will block until the process exits.

Cleans up the zombie process



# File 'lib/karafka/swarm/pidfd.rb', line 90

def cleanup
  @mutex.synchronize do
    return if @cleaned

    waitid(P_PIDFD, @pidfd, nil, WEXITED)

    @pidfd_io.close
    @pidfd_select = nil
    @pidfd_io = nil
    @pidfd = nil
    @cleaned = true
  end
end
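
Reaping can be illustrated with Ruby's standard library alone. Note the swap: the sketch below uses `Process.wait2` addressed by pid, not the `waitid` call with `P_PIDFD` shown above, and it assumes a platform where `fork` is available. It shows that waiting collects the exit status once, after which the kernel has no child left to report.

```ruby
# Start a child that exits immediately with status 7. exit! skips
# at_exit handlers so the child does not run the parent's hooks.
child_pid = fork { exit!(7) }

# Blocks until the child exits, then reaps it. This is why the wait
# should only be issued for processes that are known to have exited.
_reaped_pid, status = Process.wait2(child_pid)
exit_code = status.exitstatus

# A second wait on the same pid fails: the zombie is already gone.
second_wait_succeeded =
  begin
    Process.wait(child_pid)
    true
  rescue Errno::ECHILD
    false
  end
```

The `@cleaned` flag in `#cleanup` serves the same purpose as the `Errno::ECHILD` rescue here: it keeps a repeated call from waiting on something that was already reaped.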

#signal(sig_name) ⇒ Boolean

Note:

It will not send signals to dead processes

Sends given signal to the process using its pidfd

Parameters:

  • sig_name (String)

    signal name

Returns:

  • (Boolean)

    true if signal was sent, otherwise false or error raised. `false` is returned when we attempt to send a signal to a dead process



# File 'lib/karafka/swarm/pidfd.rb', line 109

def signal(sig_name)
  @mutex.synchronize do
    return false if @cleaned
    # Never signal processes that are dead
    return false unless alive?

    result = fdpid_signal(
      pidfd_signal_syscall,
      @pidfd,
      Signal.list.fetch(sig_name),
      nil,
      0
    )

    return true if result.zero?

    raise Errors::PidfdSignalFailedError, result
  end
end
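
Because `#signal` resolves the name with `Signal.list.fetch`, the expected `sig_name` format follows from Ruby's signal table, whose keys carry no `SIG` prefix. A short sketch of that lookup, where `signal_number` is a hypothetical helper and not part of Karafka:

```ruby
# Hypothetical helper mirroring the lookup done inside #signal.
def signal_number(sig_name)
  # fetch raises KeyError for unknown names instead of returning nil
  Signal.list.fetch(sig_name)
end

term_number = signal_number('TERM')

# Names with the "SIG" prefix are not keys of Signal.list.
prefixed_name_accepted =
  begin
    signal_number('SIGTERM')
    true
  rescue KeyError
    false
  end
```

Passing `'TERM'` rather than `'SIGTERM'` is therefore the safe choice; a prefixed or misspelled name would surface as a `KeyError` before any syscall is made.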