Class: Liebre::Actor::RPC::Client::Core

Inherits:
Object
  • Object
show all
Defined in:
lib/liebre/actor/rpc/client/core.rb

Constant Summary collapse

# Options hash passed to response_queue.subscribe in #start.
OPTS = { :block => false, :manual_ack => false }
# Interval passed to task.every in #start to schedule client.expire
# (presumably seconds -- confirm against the task implementation).
EXPIRE_INTERVAL = 60

Instance Method Summary collapse

Constructor Details

#initialize(client, resources, context, pending, task) ⇒ Core

Returns a new instance of Core.



10
11
12
13
14
15
16
# File 'lib/liebre/actor/rpc/client/core.rb', line 10

# Stores the five collaborators on the instance without touching them.
#
# @param client    object that #start calls back with reply/expire
# @param resources stored as @resources (not used directly by the methods shown here)
# @param context   object whose logger is used by #request
# @param pending   store that receives add/finish/expire calls
# @param task      scheduler that receives every/cancel_all calls
def initialize(client, resources, context, pending, task)
  @client = client
  @resources = resources
  @context = context
  @pending = pending
  @task = task
end

Instance Method Details

#clean ⇒ Object



50
51
52
53
# File 'lib/liebre/actor/rpc/client/core.rb', line 50

# Deletes the request exchange first, then the response queue.
#
# @return whatever the response queue's delete call returns
def clean
  exchange = request_exchange
  exchange.delete

  queue = response_queue
  queue.delete
end

#expire ⇒ Object



46
47
48
# File 'lib/liebre/actor/rpc/client/core.rb', line 46

# Forwards to the pending store's expire and returns its result.
def expire
  pending.expire
end

#reply(meta, response) ⇒ Object



42
43
44
# File 'lib/liebre/actor/rpc/client/core.rb', line 42

# Finishes the pending entry identified by the correlation id carried in meta.
#
# @param meta     object responding to correlation_id
# @param response value handed to pending.finish alongside the correlation id
# @return whatever pending.finish returns
def reply(meta, response)
  correlation_id = meta.correlation_id
  pending.finish(correlation_id, response)
end

#request(payload, opts, timeout) ⇒ Object



32
33
34
35
36
37
38
39
40
# File 'lib/liebre/actor/rpc/client/core.rb', line 32

# Registers a pending request and publishes it to the request exchange.
#
# pending.add(timeout) is given a block that receives a correlation id. Inside
# that block the caller's opts are merged with :reply_to (the response queue
# name) and :correlation_id, the payload is published with the merged options,
# and an info line is logged.
#
# @param payload message body passed to request_exchange.publish
# @param opts    publish options; the caller's hash is not modified
# @param timeout value forwarded unchanged to pending.add
# @return whatever pending.add returns
def request(payload, opts, timeout)
  pending.add(timeout) do |correlation_id|
    # Build a block-local hash instead of rebinding the method parameter.
    publish_opts = opts.merge(:reply_to       => response_queue.name,
                              :correlation_id => correlation_id)

    request_exchange.publish(payload, publish_opts)
    context.logger.info("request pending - correlation_id: #{correlation_id}")
  end
end

#start ⇒ Object



18
19
20
21
22
23
24
25
# File 'lib/liebre/actor/rpc/client/core.rb', line 18

# Subscribes to the response queue and schedules periodic expiry.
#
# @return whatever task.every returns
def start
  # Every delivery's metadata and payload are handed to client.reply.
  response_queue.subscribe(OPTS) { |_info, meta, payload| client.reply(meta, payload) }

  # The return value is discarded; presumably this call exists only to make
  # sure the exchange is set up before the first request -- confirm.
  request_exchange

  task.every(EXPIRE_INTERVAL) do
    client.expire
  end
end

#stop ⇒ Object



27
28
29
30
# File 'lib/liebre/actor/rpc/client/core.rb', line 27

# Cancels all scheduled tasks, then closes the channel.
#
# @return whatever chan.close returns
def stop
  scheduler = task
  scheduler.cancel_all

  channel = chan
  channel.close
end