Class: Grumlin::Client

Inherits:
Object
  • Object
show all
Defined in:
lib/grumlin/client.rb

Defined Under Namespace

Classes: PoolResource

Instance Method Summary collapse

Constructor Details

#initialize(url, parent: Async::Task.current, **client_options) ⇒ Client

A client is not reusable: once closed, it must be recreated.



42
43
44
45
46
47
48
# File 'lib/grumlin/client.rb', line 42

# Builds a client in the "not yet connected" state. No transport or request
# dispatcher is created here; both are set up later by #connect.
#
# A client is not reusable: once closed, it must be recreated.
#
# @param url stored as-is and shown by #inspect (presumably the server
#   endpoint handed to the transport; confirm against build_transport)
# @param parent the object whose #async is used by #connect to spawn the
#   response-reading task; defaults to the current Async task
# @param client_options extra keyword options, stored untouched
def initialize(url, parent: Async::Task.current, **client_options)
  @url = url
  @client_options = client_options
  @parent = parent
  @request_dispatcher = nil
  @transport = nil
  # Declared up front (like the other lazily assigned state) so that #close
  # and #connect never read an undeclared instance variable.
  @response_task = nil
  @closed = false
end

Instance Method Details

#close(check_requests: true) ⇒ Object

Before calling close, the user must ensure that: 1) there are no ongoing requests, and 2) there will be no new writes afterwards.



69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
# File 'lib/grumlin/client.rb', line 69

# Shuts the client down: closes the transport and waits for it, stops the
# response task and waits for it, then checks for requests that are still
# registered with the dispatcher.
#
# Before calling close the user must ensure that:
# 1) there are no ongoing requests
# 2) there will be no new writes afterwards
#
# Calling close on an already closed client does nothing except emit the
# debug log line. Closing a client that was never connected is safe.
#
# @param check_requests [Boolean] when true, leftover requests raise
#   Grumlin::ResourceLeakError; when false, they are cleared silently
# @return [nil]
# @raise [Grumlin::ResourceLeakError] if requests are still registered and
#   check_requests is true
def close(check_requests: true) # rubocop:disable Metrics/CyclomaticComplexity,Metrics/PerceivedComplexity
  return if @closed

  @closed = true

  @transport&.close
  @transport&.wait

  @response_task&.stop
  @response_task&.wait

  # The dispatcher only exists after #connect. Without this nil check a
  # never-connected client would fall through and call methods on nil.
  pending = @request_dispatcher&.requests
  return if pending.nil? || pending.empty?

  @request_dispatcher.clear unless check_requests

  raise Grumlin::ResourceLeakError, "Request list is not empty: #{pending}" if check_requests
ensure
  Console.debug(self, "Closed")
end

#connect ⇒ Object

Raises:

  • (ClientClosedError)


50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
# File 'lib/grumlin/client.rb', line 50

# Opens the connection: builds a transport, connects it, creates a fresh
# request dispatcher and spawns a task on the parent that feeds every
# incoming response to that dispatcher.
#
# If reading responses is interrupted (task stopped, timeout, or any
# StandardError), the client closes itself without checking for leftover
# requests.
#
# @raise [ClientClosedError] if the client has already been closed
def connect
  raise ClientClosedError if @closed

  @transport = build_transport
  incoming = @transport.connect
  @request_dispatcher = Grumlin::RequestDispatcher.new

  @response_task = @parent.async do
    incoming.each { |response| @request_dispatcher.add_response(response) }
  rescue Async::Stop, Async::TimeoutError, StandardError
    # The response stream is gone, so pending requests can never complete;
    # drop them instead of reporting a leak.
    close(check_requests: false)
  end

  Console.debug(self, "Connected")
end

#connected? ⇒ Boolean

Returns:

  • (Boolean)


89
90
91
# File 'lib/grumlin/client.rb', line 89

# Reports whether the transport exists and says it is connected.
#
# @return [Boolean] false when no transport has been built yet or when the
#   transport answers with a falsy value; otherwise the transport's answer
def connected?
  return false if @transport.nil?

  @transport.connected? || false
end

#inspect ⇒ Object



107
108
109
# File 'lib/grumlin/client.rb', line 107

# Short human-readable description of the client: its class, the url it was
# created with and the current result of #connected?.
#
# @return [String]
def inspect
  Kernel.format("<%s url=%s connected=%s>", self.class, @url, connected?)
end

#to_s ⇒ Object



111
112
113
# File 'lib/grumlin/client.rb', line 111

# String form of the client; delegates to #inspect so that string
# interpolation and explicit inspection produce the same text.
def to_s
  inspect
end

#write(query) ⇒ Object

TODO: support yielding

Raises:

  • (NotConnectedError)


94
95
96
97
98
99
100
101
102
103
104
105
# File 'lib/grumlin/client.rb', line 94

# Sends a query and blocks until its result is available.
#
# The query is first registered with the request dispatcher, which hands
# back a channel; the query is then written to the transport and the method
# returns whatever is dequeued from that channel.
#
# TODO: support yielding
#
# @param query the request to send (passed unchanged to both the dispatcher
#   and the transport)
# @return the value dequeued from the request's channel
# @raise [NotConnectedError] if the client is not connected
# @raise [Async::Stop, Async::TimeoutError] re-raised after the client has
#   closed itself
def write(query)
  raise NotConnectedError unless connected?

  reply_channel = @request_dispatcher.add_request(query)
  begin
    @transport.write(query)
    reply_channel.dequeue
  rescue Async::Stop, Async::TimeoutError
    # An interrupted request leaves the connection in an unknown state, so
    # the whole client is closed and pending requests are dropped.
    close(check_requests: false)
    raise
  end
end