Class: STAN::Subscription

Inherits:
Object
  • Object
show all
Defined in:
lib/stan/client.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(subject, opts = {}, cb) ⇒ Subscription

Returns a new instance of Subscription.



431
432
433
434
435
436
437
438
439
440
441
# File 'lib/stan/client.rb', line 431

# Builds the local state for a subscription on +subject+.
#
# @param subject the subject this subscription is for
# @param opts options; the :queue, :durable_name and :stan entries are
#   read here and the whole object is retained as #options
# @param cb callback object, stored as-is
def initialize(subject, opts={}, cb)
  # Values handed in directly by the caller.
  @subject = subject
  @options = opts
  @cb = cb

  # Settings pulled out of the options.
  @stan, @queue, @durable_name = opts.values_at(:stan, :queue, :durable_name)

  # A fresh inbox is requested for every subscription.
  @inbox = STAN.create_inbox

  # Neither of these is known at construction time.
  @sid = nil # inbox subscription sid
  @ack_inbox = nil
end

Instance Attribute Details

#ack_inbox ⇒ Object

Returns the value of attribute ack_inbox.



429
430
431
# File 'lib/stan/client.rb', line 429

# Reader for @ack_inbox. The constructor sets it to nil; #close and
# #unsubscribe use the value as the key removed from stan.sub_map and as
# the `inbox` field of the request they send.
def ack_inbox
  @ack_inbox
end

#cb ⇒ Object (readonly)

Returns the value of attribute cb.



428
429
430
# File 'lib/stan/client.rb', line 428

# Reader for @cb, the callback object passed as the constructor's +cb+
# argument.
def cb
  @cb
end

#durable_name ⇒ Object (readonly)

Returns the value of attribute durable_name.



428
429
430
# File 'lib/stan/client.rb', line 428

# Reader for @durable_name, taken from opts[:durable_name] in the
# constructor. When it is set, #unsubscribe includes it in its request.
def durable_name
  @durable_name
end

#inbox ⇒ Object (readonly)

Returns the value of attribute inbox.



428
429
430
# File 'lib/stan/client.rb', line 428

# Reader for @inbox, the value returned by STAN.create_inbox when this
# subscription was constructed.
def inbox
  @inbox
end

#options ⇒ Object (readonly)

Returns the value of attribute options.



428
429
430
# File 'lib/stan/client.rb', line 428

# Reader for @options: the very +opts+ object given to the constructor
# (stored by reference, not copied).
def options
  @options
end

#queue ⇒ Object (readonly)

Returns the value of attribute queue.



428
429
430
# File 'lib/stan/client.rb', line 428

# Reader for @queue, taken from opts[:queue] in the constructor.
def queue
  @queue
end

#sid ⇒ Object

Returns the value of attribute sid.



429
430
431
# File 'lib/stan/client.rb', line 429

# Reader for @sid, the inbox subscription sid. The constructor sets it to
# nil; #close and #unsubscribe hand the value to stan.nats.unsubscribe.
def sid
  @sid
end

#stan ⇒ Object (readonly)

Returns the value of attribute stan.



428
429
430
# File 'lib/stan/client.rb', line 428

# Reader for @stan, taken from opts[:stan] in the constructor. #close and
# #unsubscribe use it as the client: they call its nats, sub_map,
# client_id, options and synchronize methods.
def stan
  @stan
end

#subject ⇒ Object (readonly)

Returns the value of attribute subject.



428
429
430
# File 'lib/stan/client.rb', line 428

# Reader for @subject, the subject passed as the constructor's first
# argument.
def subject
  @subject
end

Instance Method Details

#close ⇒ Object



480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
# File 'lib/stan/client.rb', line 480

# Closes the subscription.
#
# Drops the NATS subscription identified by #sid, removes #ack_inbox from
# the client's sub_map, then sends an UnsubscribeRequest to the client's
# sub_close_req_subject and checks the reply. The request carries the
# client id, subject and ack inbox only; no durable name is set here.
#
# @raise [Error] when the decoded SubscriptionResponse has a non-empty error
def close
  # Drop the NATS subscription while holding this object's lock.
  synchronize { stan.nats.unsubscribe(sid) }

  # Under the client's lock: stop tracking the ack inbox and read the
  # subject that close requests are sent to.
  close_subject = nil
  stan.synchronize do
    stan.sub_map.delete(ack_inbox)
    close_subject = stan.sub_close_req_subject
  end

  fields = {
    clientID: stan.client_id,
    subject: subject,
    inbox: ack_inbox
  }
  payload = STAN::Protocol::UnsubscribeRequest.new(fields).to_proto
  request_opts = { timeout: stan.options[:connect_timeout] }

  reply = stan.nats.request(close_subject, payload, request_opts)
  error = STAN::Protocol::SubscriptionResponse.decode(reply.data).error
  return if error.empty?

  # FIXME: Error handling on unsubscribe/close
  raise Error.new(error)
end

#unsubscribe ⇒ Object

Unsubscribe removes interest in the subscription. For durable subscriptions, the durable interest is also removed from the server: restarting a durable with the same name will not resume the subscription; it will be treated as a new one.



447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
# File 'lib/stan/client.rb', line 447

# Removes interest in the subscription.
#
# Drops the NATS subscription identified by #sid, removes #ack_inbox from
# the client's sub_map, then sends an UnsubscribeRequest to the client's
# unsub_req_subject and checks the reply. When #durable_name is set it is
# included in the request as `durableName`.
#
# @raise [Error] when the decoded SubscriptionResponse has a non-empty error
def unsubscribe
  synchronize do
    stan.nats.unsubscribe(sid)
  end

  # Make client stop tracking the subscription inbox
  # and grab unsub request subject under the lock.
  unsub_subject = nil
  stan.synchronize do
    stan.sub_map.delete(ack_inbox)
    unsub_subject = stan.unsub_req_subject
  end

  unsub_req = STAN::Protocol::UnsubscribeRequest.new({
    clientID: stan.client_id,
    subject: subject,
    inbox: ack_inbox
  })

  # Assign through the field setter instead of `unsub_req[:durableName] = ...`:
  # protobuf message indexing is documented to take a String field name, so a
  # Symbol key is outside that contract, while the setter is always valid.
  unsub_req.durableName = durable_name if durable_name

  raw = stan.nats.request(unsub_subject, unsub_req.to_proto, {
    timeout: stan.options[:connect_timeout]
  })
  response = STAN::Protocol::SubscriptionResponse.decode(raw.data)

  # FIXME: Error handling on unsubscribe
  raise Error, response.error unless response.error.empty?
end