Class: Nifi
- Inherits:
-
Object
- Object
- Nifi
- Defined in:
- lib/nifi_sdk_ruby.rb
Constant Summary collapse
- DEFAULT_HOST =
'localhost'
- DEFAULT_SCHEMA =
'http'
- DEFAULT_PORT =
8080
- DEFAULT_DEBUG =
false
- DEFAULT_ACYNC =
false
Instance Method Summary collapse
- #create_process_group(*args) ⇒ Object
- #create_template_instance(*args) ⇒ Object
- #delete_process_group(id = nil) ⇒ Object
- #delete_template(id = nil) ⇒ Object
- #get_api_key ⇒ Object
- #get_async ⇒ Object
- #get_base_url ⇒ Object
- #get_conection_status(id = nil) ⇒ Object
- #get_conection_status_history(id = nil) ⇒ Object
- #get_debug ⇒ Object
- #get_flow_status ⇒ Object
- #get_host ⇒ Object
- #get_process(id = nil) ⇒ Object
- #get_process_group(id = nil) ⇒ Object
- #get_process_group_by_name(name = nil) ⇒ Object
- #get_resources ⇒ Object
- #get_schema ⇒ Object
- #get_template_by_name(name = nil) ⇒ Object
-
#initialize(*args) ⇒ Nifi
constructor
A new instance of Nifi.
- #process_group_by_name?(name = nil) ⇒ Boolean
- #set_async(async = nil) ⇒ Object
- #set_debug(debug = nil) ⇒ Object
- #start_process(*args) ⇒ Object
- #stop_process(*args) ⇒ Object
- #template_by_id?(id = nil) ⇒ Boolean
- #template_by_name?(name = nil) ⇒ Boolean
-
#update_process(*args) ⇒ Object
NiFi has a classic Optimistic Locking pattern implementation which means that your call will only be accepted and processed as valid if you specify the right version of the component in the revision -> version field.
- #upload_template(*args) ⇒ Object
Constructor Details
#initialize(*args) ⇒ Nifi
Returns a new instance of Nifi.
35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 |
# File 'lib/nifi_sdk_ruby.rb', line 35
# Builds a client from one or more option hashes (merged left to right).
#
# Recognised keys: :schema, :host, :port (defaults come from DEFAULT_SCHEMA,
# DEFAULT_HOST and DEFAULT_PORT), plus :cert, :cert_key and :cert_password.
# All settings are stored in class variables, so they are shared by every
# instance of the class.
#
# @param args [Array<Hash>] option hashes
def initialize(*args)
  args = args.reduce Hash.new, :merge

  @@schema = args[:schema] || DEFAULT_SCHEMA
  @@host = args[:host] || DEFAULT_HOST
  @@port = args[:port] || DEFAULT_PORT
  @@base_url = @@schema + '://' + @@host + ':' + @@port.to_s + '/nifi-api'
  @@debug = DEFAULT_DEBUG
  @@async = DEFAULT_ACYNC
  @@sdk_name = 'ruby'
  @@sdk_version = NifiSdkRuby::VERSION
  @@client_id = SecureRandom.uuid
  @@cert = args[:cert] || nil
  @@cert_key = args[:cert_key] || nil
  # Fixed: the password used to be copied from :cert_key instead of :cert_password.
  @@cert_password = args[:cert_password] || nil
end
Instance Method Details
#create_process_group(*args) ⇒ Object
140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 |
# File 'lib/nifi_sdk_ruby.rb', line 140
# Creates a process group named :name under the process group :id
# (default 'root') by POSTing a JSON entity through self.class.http_client.
#
# The request body is generated with #to_json so that names containing
# quotes or backslashes still produce valid JSON.
#
# @param args [Array<Hash>] option hashes; :name is required, :id is optional
# @return whatever self.class.http_client returns
# @raise [ArgumentError] if :name is missing or process_group_by_name? reports
#   that a group with that name already exists
def create_process_group(*args)
  args = args.reduce Hash.new, :merge

  raise ArgumentError.new('name params is mandatory.') if args[:name].nil?

  name = args[:name].to_s
  if process_group_by_name?(name)
    raise ArgumentError.new("The process group #{name} already exists")
  end

  params = {
    revision: { clientId: @@client_id, version: 0 },
    component: {
      name: name,
      # Fixed canvas position used for every newly created group.
      position: { x: 274.54776144527517, y: -28.886681059739686 }
    }
  }.to_json

  process_group = args[:id] ? args[:id].to_s : 'root'
  base_url = "#{@@base_url}/process-groups/#{process_group}/process-groups"
  self.class.http_client(base_url, 'POSTRAW', params)
end
#create_template_instance(*args) ⇒ Object
292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 |
# File 'lib/nifi_sdk_ruby.rb', line 292
# Instantiates a template inside a process group.
#
# The template is selected by :name (resolved through get_template_by_name)
# or directly by :id. :originX / :originY default to '0.0' and
# :process_group_id defaults to 'root'.
#
# @param args [Array<Hash>] option hashes
# @return whatever self.class.http_client returns
# @raise [ArgumentError] if neither :id nor :name is given
# @raise [StandardError] if the template cannot be found
def create_template_instance(*args)
  args = args.reduce Hash.new, :merge

  if args[:id].nil? && args[:name].nil?
    raise ArgumentError.new("either specify id of the template or it's name ")
  end

  if args[:name]
    unless template_by_name?(args[:name])
      raise StandardError.new('Could not find template called ' << args[:name])
    end
    # Fixed: get_template_by_name already returns the id String; the former
    # trailing [0][0] reduced that id to its first character.
    id = get_template_by_name(args[:name])
  else
    unless template_by_id?(args[:id])
      raise StandardError.new('Could not find template with id ' << args[:id])
    end
    id = args[:id]
  end

  origin_x = args[:originX] ? args[:originX].to_s : '0.0'
  origin_y = args[:originY] ? args[:originY].to_s : '0.0'
  process_group = args[:process_group_id] ? args[:process_group_id] : 'root'

  params = %({"templateId": "#{id}", "originX": #{origin_x}, "originY": #{origin_y}})
  base_url = "#{@@base_url}/process-groups/#{process_group}/template-instance"
  self.class.http_client(base_url, 'POSTRAW', params)
end
#delete_process_group(id = nil) ⇒ Object
159 160 161 162 163 164 165 166 167 |
# File 'lib/nifi_sdk_ruby.rb', line 159
# Issues a DELETE for the process group with the given id, passing the
# client id and a revision version as query parameters.
#
# NOTE(review): the revision is hard-coded to version=1 — confirm this is
# acceptable for groups whose revision has moved on.
#
# @param id [String] process group id
# @return whatever self.class.http_client returns
# @raise [ArgumentError] if id is nil
def delete_process_group(id = nil)
  raise ArgumentError.new('id is mandatory.') if id.nil?

  target = "#{@@base_url}/process-groups/" << id << "?clientId=#{@@client_id}&version=1"
  self.class.http_client(target, 'DELETE')
end
#delete_template(id = nil) ⇒ Object
355 356 357 358 359 360 361 362 363 |
# File 'lib/nifi_sdk_ruby.rb', line 355
# Issues a DELETE for the template with the given id.
#
# @param id [String] template id
# @return whatever self.class.http_client returns
# @raise [ArgumentError] if id is nil
def delete_template(id = nil)
  raise ArgumentError.new('id is mandatory.') if id.nil?

  self.class.http_client("#{@@base_url}/templates/" << id, 'DELETE')
end
#get_api_key ⇒ Object
85 86 87 |
# File 'lib/nifi_sdk_ruby.rb', line 85
# Returns the value of the @@api_key class variable.
#
# NOTE(review): the constructor shown on this page never assigns @@api_key —
# confirm it is initialised somewhere else before this getter is used.
def get_api_key
  @@api_key
end
#get_async ⇒ Object
81 82 83 |
# File 'lib/nifi_sdk_ruby.rb', line 81
# Returns the current async flag (the @@async class variable, assigned by
# the constructor and by #set_async).
def get_async
  @@async
end
#get_base_url ⇒ Object
97 98 99 |
# File 'lib/nifi_sdk_ruby.rb', line 97
# Returns the API base URL that the constructor assembled from schema,
# host and port (ending in '/nifi-api').
def get_base_url
  @@base_url
end
#get_conection_status(id = nil) ⇒ Object
112 113 114 115 116 117 118 119 120 |
# File 'lib/nifi_sdk_ruby.rb', line 112
# Requests <base_url>/flow/connections/<id>/status through
# self.class.http_client.
#
# @param id [String] connection id
# @return whatever self.class.http_client returns
# @raise [ArgumentError] if id is nil
def get_conection_status(id = nil)
  # Fixed: the message used to complain about a "name" parameter although
  # the missing argument is the id.
  raise ArgumentError.new('id is mandatory.') if id.nil?

  self.class.http_client("#{@@base_url}/flow/connections/#{id}/status")
end
#get_conection_status_history(id = nil) ⇒ Object
122 123 124 125 126 127 128 129 130 |
# File 'lib/nifi_sdk_ruby.rb', line 122
# Requests <base_url>/flow/connections/<id>/status/history through
# self.class.http_client.
#
# @param id [String] connection id
# @return whatever self.class.http_client returns
# @raise [ArgumentError] if id is nil
def get_conection_status_history(id = nil)
  # Fixed: the message used to complain about a "name" parameter although
  # the missing argument is the id.
  raise ArgumentError.new('id is mandatory.') if id.nil?

  self.class.http_client("#{@@base_url}/flow/connections/#{id}/status/history")
end
#get_debug ⇒ Object
65 66 67 |
# File 'lib/nifi_sdk_ruby.rb', line 65
# Returns the current debug flag (the @@debug class variable, assigned by
# the constructor and by #set_debug).
def get_debug
  @@debug
end
#get_flow_status ⇒ Object
106 107 108 109 110 |
# File 'lib/nifi_sdk_ruby.rb', line 106
# Requests <base_url>/flow/status through self.class.http_client and
# returns its result.
def get_flow_status
  self.class.http_client("#{@@base_url}/flow/status")
end
#get_host ⇒ Object
93 94 95 |
# File 'lib/nifi_sdk_ruby.rb', line 93
# Returns the host name stored by the constructor (@@host).
def get_host
  @@host
end
#get_process(id = nil) ⇒ Object
188 189 190 191 192 193 194 195 |
# File 'lib/nifi_sdk_ruby.rb', line 188
# Requests <base_url>/processors/<id> through self.class.http_client.
#
# @param id [String] processor id
# @return whatever self.class.http_client returns
# @raise [ArgumentError] if id is nil
def get_process(id = nil)
  raise ArgumentError.new('id is mandatory.') if id.nil?

  self.class.http_client("#{@@base_url}/processors/" << id)
end
#get_process_group(id = nil) ⇒ Object
132 133 134 135 136 137 138 |
# File 'lib/nifi_sdk_ruby.rb', line 132
# Requests <base_url>/process-groups/<id> through self.class.http_client;
# a nil (or false) id selects the 'root' group.
#
# @param id [String, nil] process group id
# @return whatever self.class.http_client returns
def get_process_group(id = nil)
  group = id || 'root'
  self.class.http_client("#{@@base_url}/process-groups/" << group)
end
#get_process_group_by_name(name = nil) ⇒ Object
169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 |
# File 'lib/nifi_sdk_ruby.rb', line 169
# Looks a process group up by name among the entries returned by
# self.class.exists and fetches it with self.class.get.
#
# An entry matches when its 'name' equals the given name and its
# 'identifier' contains "process-groups". Exactly one match is required.
#
# @param name [String] process group name
# @return whatever self.class.get returns for the matching identifier
# @raise [ArgumentError] if name is nil or the number of matches is not one
def get_process_group_by_name(name = nil)
  raise ArgumentError.new('name is mandatory.') if name.nil?

  matches = self.class.exists.select do |resource|
    resource['name'] == name && resource['identifier'] =~ /process-groups/
  end

  unless matches.count == 1
    raise ArgumentError.new('Unable to locate group with name ' << name)
  end

  self.class.get matches[0]['identifier']
end
#get_resources ⇒ Object
101 102 103 104 |
# File 'lib/nifi_sdk_ruby.rb', line 101
# Requests <base_url>/resources through self.class.http_client and returns
# its result.
def get_resources
  self.class.http_client("#{@@base_url}/resources")
end
#get_schema ⇒ Object
89 90 91 |
# File 'lib/nifi_sdk_ruby.rb', line 89
# Returns the URL schema stored by the constructor (@@schema).
def get_schema
  @@schema
end
#get_template_by_name(name = nil) ⇒ Object
365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 |
# File 'lib/nifi_sdk_ruby.rb', line 365
# Resolves a template name to its id using the entries returned by
# self.class.exists.
#
# An entry matches when its 'name' equals the given name and its
# 'identifier' starts with "/templates/"; the id is the part of the
# identifier after that prefix. Exactly one match is required.
#
# @param name [String] template name
# @return [String] the template id
# @raise [ArgumentError] if name is nil
# @raise [StandardError] if the number of matches is not one
def get_template_by_name(name = nil)
  raise ArgumentError.new('name is mandatory.') if name.nil?

  matches = self.class.exists.select do |resource|
    resource['name'] == name && resource['identifier'] =~ /^\/templates\//
  end

  unless matches.count == 1
    raise StandardError.new('Unable to locate template with name ' << name)
  end

  captures = matches[0]['identifier'].scan(/\/templates\/(.*)/)
  captures[0][0]
end
#process_group_by_name?(name = nil) ⇒ Boolean
277 278 279 280 281 282 283 284 285 286 287 288 289 290 |
# File 'lib/nifi_sdk_ruby.rb', line 277
# Tells whether exactly one entry returned by self.class.exists has the
# given 'name' and an 'identifier' containing "process-groups".
#
# @param name [String] process group name
# @return [Boolean] true only when there is exactly one match
# @raise [ArgumentError] if name is nil
def process_group_by_name?(name = nil)
  raise ArgumentError.new('name is mandatory.') if name.nil?

  matches = self.class.exists.select do |resource|
    resource['name'] == name && resource['identifier'] =~ /process-groups/
  end

  matches.count == 1
end
#set_async(async = nil) ⇒ Object
69 70 71 72 73 74 75 76 77 78 79 |
# File 'lib/nifi_sdk_ruby.rb', line 69
# Stores the async flag in the @@async class variable.
#
# @param async [Boolean] must be exactly true or false
# @return [Boolean] the value that was stored
# @raise [ArgumentError] if async is nil
# @raise [TypeError] if async is neither true nor false
def set_async(async = nil)
  raise ArgumentError.new('missing async') if async.nil?
  raise TypeError.new('async must be a boolean') unless [true, false].include?(async)

  @@async = async
end
#set_debug(debug = nil) ⇒ Object
53 54 55 56 57 58 59 60 61 62 63 |
# File 'lib/nifi_sdk_ruby.rb', line 53
# Stores the debug flag in the @@debug class variable.
#
# @param debug [Boolean] must be exactly true or false
# @return [Boolean] the value that was stored
# @raise [ArgumentError] if debug is nil
# @raise [TypeError] if debug is neither true nor false
def set_debug(debug = nil)
  raise ArgumentError.new('missing debug') if debug.nil?
  raise TypeError.new('debug must be a boolean') unless [true, false].include?(debug)

  @@debug = debug
end
#start_process(*args) ⇒ Object
197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 |
# File 'lib/nifi_sdk_ruby.rb', line 197
# Asks for a processor to be started by sending an update whose component
# state is 'RUNNING' through #update_process.
#
# @param args [Array<Hash>] option hashes; :id and :version are required
#   (both are converted with #to_s before being sent)
# @return whatever #update_process returns
# @raise [ArgumentError] if :id or :version is missing
def start_process(*args)
  opts = args.reduce(Hash.new, :merge)

  if opts[:id].nil? || opts[:version].nil?
    raise ArgumentError.new('id and version params are mandatory')
  end

  processor_id = opts[:id].to_s
  revision = { version: opts[:version].to_s }
  component = { id: processor_id, state: 'RUNNING' }
  status = { runStatus: 'Running' }

  update_process(
    id: processor_id,
    update_json: { revision: revision, id: processor_id, component: component, status: status }
  )
end
#stop_process(*args) ⇒ Object
222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 |
# File 'lib/nifi_sdk_ruby.rb', line 222
# Asks for a processor to be stopped by sending an update whose component
# state is 'STOPPED' through #update_process.
#
# @param args [Array<Hash>] option hashes; :id and :version are required
#   (both are converted with #to_s before being sent)
# @return whatever #update_process returns
# @raise [ArgumentError] if :id or :version is missing
def stop_process(*args)
  opts = args.reduce(Hash.new, :merge)

  if opts[:id].nil? || opts[:version].nil?
    raise ArgumentError.new('id and version params are mandatory')
  end

  processor_id = opts[:id].to_s
  revision = { version: opts[:version].to_s }
  component = { id: processor_id, state: 'STOPPED' }
  status = { runStatus: 'Stopped' }

  update_process(
    id: processor_id,
    update_json: { revision: revision, id: processor_id, component: component, status: status }
  )
end
#template_by_id?(id = nil) ⇒ Boolean
401 402 403 404 405 406 407 408 409 410 411 412 413 414 |
# File 'lib/nifi_sdk_ruby.rb', line 401
# Tells whether exactly one entry returned by self.class.exists has the
# identifier "/templates/<id>".
#
# @param id [String] template id
# @return [Boolean] true only when there is exactly one match
# @raise [ArgumentError] if id is nil
def template_by_id?(id = nil)
  raise ArgumentError.new('id is mandatory.') if id.nil?

  matches = self.class.exists.select do |resource|
    resource['identifier'] == '/templates/' << id
  end

  matches.count == 1
end
#template_by_name?(name = nil) ⇒ Boolean
386 387 388 389 390 391 392 393 394 395 396 397 398 399 |
# File 'lib/nifi_sdk_ruby.rb', line 386
# Tells whether exactly one entry returned by self.class.exists has the
# given 'name' and an 'identifier' containing "templates".
#
# @param name [String] template name
# @return [Boolean] true only when there is exactly one match
# @raise [ArgumentError] if name is nil
def template_by_name?(name = nil)
  raise ArgumentError.new('name is mandatory.') if name.nil?

  matches = self.class.exists.select do |resource|
    resource['name'] == name && resource['identifier'] =~ /templates/
  end

  matches.count == 1
end
#update_process(*args) ⇒ Object
NiFi has a classic Optimistic Locking pattern implementation which means that your call will only be accepted and processed as valid if you specify the right version of the component in the revision -> version field
ArgumentError is raised if you haven’t specified required arguments or they are invalid
Updates the process identified by id using the given JSON string or Hash. Params: :id => the id of the process; :update_json => JSON with the updated values, given either as a JSON string or as a Ruby Hash.
257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 |
# File 'lib/nifi_sdk_ruby.rb', line 257
# Sends a PUT to <base_url>/processors/<id> with the supplied update.
#
# :update_json may be a Hash (serialised with #to_json) or a String that is
# sent unchanged.
#
# @param args [Array<Hash>] option hashes; :id and :update_json are required
# @return whatever self.class.http_client returns
# @raise [ArgumentError] if a required key is missing or :update_json is
#   neither a Hash nor a String
def update_process(*args)
  opts = args.reduce(Hash.new, :merge)
  update = opts[:update_json]

  if opts[:id].nil? || update.nil?
    raise ArgumentError.new('id and update_json params are mandatory')
  end

  processor_id = opts[:id].to_s
  payload =
    case update
    when Hash then update.to_json
    when String then update
    else raise ArgumentError.new('update_json param must be either a Hash or a String')
    end

  self.class.http_client("#{@@base_url}/processors/" << processor_id, 'PUT', payload)
end
#upload_template(*args) ⇒ Object
315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 |
# File 'lib/nifi_sdk_ruby.rb', line 315
# Uploads a template XML file to a process group (default 'root').
#
# :path may be a local file or a URL; a URL is first downloaded into /tmp
# under the last segment of its address. When a template with the same name
# (read from //template/name in the XML) already exists, it is deleted
# before the upload.
#
# @param args [Array<Hash>] option hashes; :path is required, :id is the
#   optional target process group
# @return the ['templateEntity']['template'] part of the upload response
# @raise [ArgumentError] if :path is missing
# @raise [IOError] if the (downloaded) file is not a readable regular file
def upload_template(*args)
  args = args.reduce Hash.new, :merge

  raise ArgumentError.new('path params is mandatory.') if args[:path].nil?

  path = args[:path]

  if path =~ URI::regexp
    # Kernel#open stopped handling URLs in Ruby 3.0; prefer URI.open when
    # open-uri provides it and fall back to the legacy call otherwise.
    remote = URI.respond_to?(:open) ? URI.open(path) : open(path)
    begin
      local_copy = '/tmp/' << remote.base_uri.to_s.split('/')[-1]
      IO.copy_stream(remote, local_copy)
    ensure
      # The download handle used to be left open.
      remote.close if remote.respond_to?(:close)
    end
    path = local_copy
  end

  unless File.file?(path) && File.readable?(path)
    raise IOError.new("Access to #{path} failed")
  end

  document = File.open(path) { |f| Nokogiri::XML(f) }
  name = document.xpath('//template/name').text

  # Replace an existing template of the same name instead of failing.
  delete_template(get_template_by_name(name)) if template_by_name?(name)

  params = [Curl::PostField.file('template', path)]
  process_group = args[:id] ? args[:id] : 'root'
  base_url = "#{@@base_url}/process-groups/#{process_group}/templates/upload"
  res = self.class.http_client(base_url, 'POST', params)

  res['templateEntity']['template']
end