Class: KubernetesOperator

Inherits:
Object
  • Object
show all
Defined in:
lib/kubernetes-operator.rb

Instance Method Summary collapse

Constructor Details

#initialize(crdGroup, crdVersion, crdPlural, options = {}) ⇒ KubernetesOperator

Operator Class to run the core operator functions for your crd

Parameters:

  • group (string)

    Group from crd

  • version (string)

    Api Version from crd

  • plural (string)

    Name (plural) from crd

  • options (Hash) (defaults to: {})

    Additional options

Options Hash (options):

  • sleepTimer (Hash)

    Time to wait for retry if the watch event stops

  • namespace (Hash)

    Watch only an namespace, default watch all namespaces

  • persistence_location (Hash)

    Location for the yaml store, default is /tmp/persistence

See Also:



121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
# File 'lib/kubernetes-operator.rb', line 121

def initialize(crdGroup, crdVersion, crdPlural, options = {} )
    # parameter
    @crdGroup = crdGroup
    @crdVersion = crdVersion
    @crdPlural = crdPlural

    # default config
    @options = options
    @options[:sleepTimer] ||= 10
    @options[:namespace] ||= nil

    # create persistence
    @options[:persistence_location] ||= "/tmp/persistence"
    Dir.mkdir(@options[:persistence_location]) unless File.exists?(@options[:persistence_location])
    @store = YAML::Store.new("#{@options[:persistence_location]}/#{@crdGroup}_#{@crdVersion}_#{@crdPlural}.yaml")

    # logging
    @logger = Log4r::Logger.new('Log4RTest')
    outputter = Log4r::StdoutOutputter.new(
        "console",
        :formatter => Log4r::JSONFormatter::Base.new("#{crdPlural}.#{@crdGroup}/#{@crdVersion}")
    )
    @logger.add(outputter)

    # kubeconfig
    # (for local development it's nice to use .kube/config)
    if File.exist?("#{Dir.home}/.kube/config")
        @logger.info("use local kube config")
        config = Kubeclient::Config.read(ENV['KUBECONFIG'] || "#{ENV['HOME']}/.kube/config")
        context = config.context
        @k8sclient = Kubeclient::Client.new(
            context.api_endpoint+"/apis/"+@crdGroup,
            @crdVersion,
            ssl_options: context.ssl_options,
            auth_options: context.auth_options
        )
    else
        @logger.info("use incluster config")
        auth_options = {
            bearer_token_file: '/var/run/secrets/kubernetes.io/serviceaccount/token'
        }
        ssl_options = {}
        if File.exist?("/var/run/secrets/kubernetes.io/serviceaccount/ca.crt")
            ssl_options[:ca_file] = "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt"
        end
        @k8sclient = Kubeclient::Client.new(
            'https://kubernetes.default.svc/apis/'+@crdGroup,
            @crdVersion,
            auth_options: auth_options,
            ssl_options:  ssl_options
        )
    end

    # event helper
    @eventHelper = EventHelper.new(@logger)
end

Instance Method Details

#getEventHelperEventHelper

get the event helper from the operatore framework

Returns:



216
217
218
# File 'lib/kubernetes-operator.rb', line 216

def getEventHelper()
    return @eventHelper
end

#getLoggerLog4r::Logger

get the logger from the operatore framework

Returns:

  • (Log4r::Logger)

    logger



210
211
212
# File 'lib/kubernetes-operator.rb', line 210

def getLogger()
    return @logger
end

#runObject

start the operator to watch your cr



221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
# File 'lib/kubernetes-operator.rb', line 221

def run
    @logger.info("start the operator")
    # load methods
    @addMethod = method(:defaultActionMethod) unless @addMethod
    @updateMethod = method(:defaultActionMethod) unless @updateMethod
    @deleteMethod = method(:defaultActionMethod) unless @deleteMethod

    while true
        begin
            if @options[:namespace]
                watcher = @k8sclient.watch_entities(@crdPlural,@options[:namespace])
            else
                watcher = @k8sclient.watch_entities(@crdPlural)
            end
            watcher.each do |notice|
                begin
                    isCached = @store.transaction{@store[notice[:object][:metadata][:uid]]}
                    case notice[:type]
                    # new cr was added
                    when "ADDED"
                        # check if version is already processed
                        unless isCached
                            # trigger action
                            @logger.info("trigger add action for #{notice[:object][:metadata][:name]} (#{notice[:object][:metadata][:uid]})")
                            resp = @addMethod.call(notice[:object])
                            # update status
                            if resp.is_a?(Hash) && resp[:status]
                                @k8sclient.patch_entity(@crdPlural,notice[:object][:metadata][:name]+"/status", {status: resp[:status]},'merge-patch',@options[:namespace])
                            end
                            # add finalizer
                            @logger.info("add finalizer to #{notice[:object][:metadata][:name]} (#{notice[:object][:metadata][:uid]})")
                            patched = @k8sclient.patch_entity(@crdPlural,notice[:object][:metadata][:name], {metadata: {finalizers: ["#{@crdPlural}.#{@crdVersion}.#{@crdGroup}"]}},'merge-patch',@options[:namespace])
                            # save version
                            @store.transaction do
                                @store[patched[:metadata][:uid]] = patched[:metadata][:resourceVersion]
                                @store.commit
                            end
                        else
                            @logger.info("skip add action for #{notice[:object][:metadata][:name]} (#{notice[:object][:metadata][:uid]}), found version in cache")
                        end
                    # cr was change or deleted (if finalizer is set, it an modified call, not an delete)
                    when "MODIFIED"
                        # check if version is already processed
                        if isCached.to_i < notice[:object][:metadata][:resourceVersion].to_i
                            # check if it's an delete event
                            unless notice[:object][:metadata][:deletionTimestamp]
                                # trigger action
                                @logger.info("trigger update action for #{notice[:object][:metadata][:name]} (#{notice[:object][:metadata][:uid]})")
                                resp = @updateMethod.call(notice[:object])
                                # update status
                                if resp[:status]
                                    @k8sclient.patch_entity(@crdPlural,notice[:object][:metadata][:name]+"/status", {status: resp[:status]},'merge-patch',@options[:namespace])
                                end
                                # save version
                                @store.transaction do
                                    @store[notice[:object][:metadata][:uid]] = notice[:object][:metadata][:resourceVersion]
                                    @store.commit
                                end
                            else
                                # trigger action
                                @logger.info("trigger delete action for #{notice[:object][:metadata][:name]} (#{notice[:object][:metadata][:uid]})")
                                @deleteMethod.call(notice[:object])
                                # remove finalizer
                                @logger.info("remove finalizer to #{notice[:object][:metadata][:name]} (#{notice[:object][:metadata][:uid]})")
                                patched = @k8sclient.patch_entity(@crdPlural,notice[:object][:metadata][:name], {metadata: {finalizers: nil}},'merge-patch',@options[:namespace])
                            end
                        else
                            @logger.info("skip update action for #{notice[:object][:metadata][:name]} (#{notice[:object][:metadata][:uid]}), found version in cache")
                        end
                    when "DELETED"
                        # clear events, if delete was successfuly
                        @logger.info("#{notice[:object][:metadata][:name]} (#{notice[:object][:metadata][:uid]}) is done, clean up events")
                        @eventHelper.deleteAll(notice[:object])
                    else
                        # upsi, something wrong
                        @logger.info("strange things are going on here, I found the type "+notice[:type])
                    end
                rescue => exception
                    @logger.error(exception.inspect)
                end
            end
            # watcher is done, lost connection ...
            watcher.finish
        rescue => exception
            @logger.error(exception.inspect)
        end

        # done
        sleep @options[:sleepTimer]

    end
end

#setAddMethod(callback) ⇒ Object

Set the method that was triggert then an add cr event occurred

Parameters:

  • callback (methode)

    Ruby methode



182
183
184
# File 'lib/kubernetes-operator.rb', line 182

def setAddMethod(callback)
    @addMethod = callback
end

#setDeleteMethod(callback) ⇒ Object

Set the method that was triggert then an delete cr event occurred

Parameters:

  • callback (methode)

    Ruby methode



202
203
204
# File 'lib/kubernetes-operator.rb', line 202

def setDeleteMethod(callback)
    @deleteMethod = callback
end

#setUpdateMethod(callback) ⇒ Object

Set the method that was triggert then an update cr event occurred

Parameters:

  • callback (methode)

    Ruby methode



188
189
190
# File 'lib/kubernetes-operator.rb', line 188

def setUpdateMethod(callback)
    @updateMethod = callback
end

#setUpsertMethod(callback) ⇒ Object

Set the method that was triggert then an add or update cr event occurred This function overwrite setAddMethod and setUpdateMethod

Parameters:

  • callback (methode)

    Ruby methode



195
196
197
198
# File 'lib/kubernetes-operator.rb', line 195

def setUpsertMethod(callback)
    @updateMethod = callback
    @addMethod = callback
end