Class: Thrift::SqsTransport

Inherits:
BaseTransport
  • Object
show all
Defined in:
lib/thrift-sqs-transport.rb

Instance Method Summary collapse

Constructor Details

#initialize(queue_name, aws_key, aws_secret, options = {}) ⇒ SqsTransport

Returns a new instance of SqsTransport.



30
31
32
33
34
35
36
37
# File 'lib/thrift-sqs-transport.rb', line 30

def initialize(queue_name, aws_key, aws_secret, options = {})
  @queue_name, @aws_key, @aws_secret = queue_name, aws_key, aws_secret
  
  @delete_after_read = options[:delete]
  @max_messages = options[:messages_to_read] || 10
  @region = options[:region]
  @host = options[:host]
end

Instance Method Details

#closeObject



50
51
52
53
54
55
# File 'lib/thrift-sqs-transport.rb', line 50

def close
  @connection = nil
  @queue_url = nil
  @in_buffer = nil
  @out_buffer = nil
end

#flushObject



97
98
99
100
101
102
103
# File 'lib/thrift-sqs-transport.rb', line 97

def flush
  data = @out_buffer.string
  @out_buffer = StringIO.new

  open unless open?
  @connection.send_message(@queue_url, data)
end

#openObject



39
40
41
42
43
44
45
46
47
48
# File 'lib/thrift-sqs-transport.rb', line 39

def open
  sqs_options = { :aws_access_key_id => @aws_key, :aws_secret_access_key => @aws_secret }
  sqs_options[:region] = @region if @region
  sqs_options[:host] = @host if @host

  @connection = Fog::AWS::SQS.new(sqs_options)
  response = @connection.create_queue(@queue_name)
  @queue_url = response.body["QueueUrl"] rescue nil
  @messages = []
end

#open?Boolean

Returns:

  • (Boolean)


57
58
59
# File 'lib/thrift-sqs-transport.rb', line 57

def open?
  !!@queue_url
end

#read(size) ⇒ Object



61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
# File 'lib/thrift-sqs-transport.rb', line 61

def read(size)
  open unless open?
  
  if @in_buffer
    data_read = @in_buffer.read(size)

    if data_read.nil?
      @in_buffer = nil
      return read(size)
    else
      return data_read
    end
  else
    @messages += @connection.receive_message(@queue_url, 'MaxNumberOfMessages' => @max_messages).body["Message"] if @messages.length == 0
    return "" if @messages.length == 0

    message = @messages.shift

    body = message["Body"]
    receipt = message["ReceiptHandle"]
    @connection.delete_message(@queue_url, receipt) if @delete_after_read

    if message.length > size
      @in_buffer = StringIO.new(body)
      return @in_buffer.read(size)
    else
      return body
    end
  end
end

#write(data) ⇒ Object



92
93
94
95
# File 'lib/thrift-sqs-transport.rb', line 92

def write(data)
  @out_buffer ||= StringIO.new
  @out_buffer.write(data)
end