Class: Newque::Http_plaintext

Inherits:
Object
  • Object
show all
Defined in:
lib/newque/http/http_plaintext.rb

Instance Method Summary collapse

Constructor Details

#initialize(http) ⇒ Http_plaintext

Returns a new instance of Http_plaintext.



6
7
8
# File 'lib/newque/http/http_plaintext.rb', line 6

def initialize http
  @http = http
end

Instance Method Details

#read(channel, mode, limit = nil) ⇒ Object



42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
# File 'lib/newque/http/http_plaintext.rb', line 42

def read channel, mode, limit=nil
  thread = Thread.new do
    res = @http.conn.get do |req|
      @http.send :set_req_options, req
      req.url "/v1/#{channel}"
      req.headers['newque-mode'] = mode
      req.headers['newque-read-max'] = limit unless limit.nil?
    end
    if res.status == 200
      Read_response.new(
        res.headers['newque-response-length'].to_i,
        res.headers['newque-response-last-id'],
        res.headers['newque-response-last-ts'].to_i,
        res.body.split(@http.options[:separator])
      )
    else
      @http.send :parse_json_response, res.body
    end
  end
  Future.new thread, @http.timeout
end

#write(channel, atomic, msgs, ids = nil) ⇒ Object

Raises:



10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
# File 'lib/newque/http/http_plaintext.rb', line 10

def write channel, atomic, msgs, ids=nil
  head, *tail = msgs
  raise NewqueError.new("No messages given") if head.nil?

  stream = StringIO.new
  stream.write head
  if msgs.size > 1
    tail.each do |msg|
      stream.write @http.options[:separator]
      stream.write msg
    end
  end

  thread = Thread.new do
    res = @http.conn.post do |req|
      @http.send :set_req_options, req
      req.url "/v1/#{channel}"
      req.body = stream.string

      req.headers['newque-mode'] = if atomic then 'atomic'
        elsif msgs.size == 1 then 'single'
        else 'multiple'
      end
      req.headers['newque-msg-id'] = ids unless ids.nil?
    end

    parsed = @http.send :parse_json_response, res.body
    Write_response.new parsed['saved']
  end
  Future.new thread, @http.timeout
end