Class: Workling::Clients::SqsClient
- Inherits: BrokerBase
  - Object
  - Base
  - BrokerBase
  - Workling::Clients::SqsClient
- Defined in: lib/workling/clients/sqs_client.rb
Constant Summary
- AWS_MAX_QUEUE_NAME = 80
- DEFAULT_MESSAGES_PER_REQ = 10
  Note that 10 is the maximum number of messages that can be retrieved in a single request.
- DEFAULT_VISIBILITY_TIMEOUT = 30
- DEFAULT_VISIBILITY_RESERVE = 10
Instance Attribute Summary
- #messages_per_req ⇒ Object (readonly)
  Returns the value of attribute messages_per_req.
- #sqs_options ⇒ Object (readonly)
  Mainly exposed for testing purposes.
- #visibility_timeout ⇒ Object (readonly)
  Returns the value of attribute visibility_timeout.
Instance Method Summary
- #close ⇒ Object
  No need for explicit closing, since there is no persistent connection to SQS.
- #connect ⇒ Object
  Starts the client.
- #queue_for_key(key) ⇒ Object
  Returns the queue that corresponds to the specified key.
- #queue_name(key) ⇒ Object
  Returns the queue name for the specified key.
- #request(key, value) ⇒ Object
  Request work.
- #retrieve(key) ⇒ Object
  Retrieve work.
Methods inherited from BrokerBase
Methods inherited from Base
#dispatch, installed?, load, #subscribe
Instance Attribute Details
#messages_per_req ⇒ Object (readonly)
Returns the value of attribute messages_per_req.
    # File 'lib/workling/clients/sqs_client.rb', line 43
    def messages_per_req
      @messages_per_req
    end
#sqs_options ⇒ Object (readonly)
Mainly exposed for testing purposes.
    # File 'lib/workling/clients/sqs_client.rb', line 42
    def sqs_options
      @sqs_options
    end
#visibility_timeout ⇒ Object (readonly)
Returns the value of attribute visibility_timeout.
    # File 'lib/workling/clients/sqs_client.rb', line 44
    def visibility_timeout
      @visibility_timeout
    end
Instance Method Details
#close ⇒ Object
No need for explicit closing, since there is no persistent connection to SQS.
    # File 'lib/workling/clients/sqs_client.rb', line 73
    def close
      true
    end
#connect ⇒ Object
Starts the client.
    # File 'lib/workling/clients/sqs_client.rb', line 47
    def connect
      @sqs_options = Workling.config[:sqs_options]

      # Make sure that required options were specified
      unless (@sqs_options.include?('aws_access_key_id') &&
              @sqs_options.include?('aws_secret_access_key'))
        raise WorklingError, 'Unable to start SqsClient due to missing SQS options'
      end

      # Optional settings
      @messages_per_req = @sqs_options['messages_per_req'] || DEFAULT_MESSAGES_PER_REQ
      @visibility_timeout = @sqs_options['visibility_timeout'] || DEFAULT_VISIBILITY_TIMEOUT
      @visibility_reserve = @sqs_options['visibility_reserve'] || DEFAULT_VISIBILITY_RESERVE

      begin
        @sqs = RightAws::SqsGen2.new(
          @sqs_options['aws_access_key_id'],
          @sqs_options['aws_secret_access_key'],
          :multi_thread => true)
      rescue => e
        raise WorklingError, "Unable to connect to SQS. Error: #{e}"
      end
    end
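To illustrate the option handling in connect, the following sketch applies the same rules to a plain hash: two required string keys, three optional ones with the defaults from the constant summary. The helper name `resolve_sqs_options`, the use of ArgumentError, and the sample credentials are assumptions made for this example; they are not part of Workling.

```ruby
# Hypothetical helper (not part of Workling): mirrors the checks and
# defaults applied by SqsClient#connect, without contacting SQS.
def resolve_sqs_options(options)
  # The two credentials are mandatory, as in #connect.
  required = %w[aws_access_key_id aws_secret_access_key]
  missing = required.reject { |name| options.include?(name) }
  unless missing.empty?
    raise ArgumentError, "missing SQS options: #{missing.join(', ')}"
  end

  # Optional settings fall back to the documented defaults.
  {
    'messages_per_req'   => options['messages_per_req']   || 10,
    'visibility_timeout' => options['visibility_timeout'] || 30,
    'visibility_reserve' => options['visibility_reserve'] || 10
  }
end

# Placeholder credentials, for illustration only.
settings = resolve_sqs_options(
  'aws_access_key_id'     => 'EXAMPLE_KEY_ID',
  'aws_secret_access_key' => 'EXAMPLE_SECRET',
  'visibility_timeout'    => 60
)
puts settings.inspect
```

Note that the option keys are strings, not symbols, so a configuration hash keyed with symbols would fail the required-options check.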
#queue_for_key(key) ⇒ Object
Returns the queue that corresponds to the specified key. Creates the queue if it doesn’t exist yet.
    # File 'lib/workling/clients/sqs_client.rb', line 137
    def queue_for_key(key)
      # Use thread local for storing queues, for the same reason as for buffers
      Thread.current["queue_#{key}"] ||= @sqs.queue(queue_name(key), true, @visibility_timeout)
    end
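The per-thread caching used by queue_for_key can be tried in isolation. In this sketch, `build_queue` is a hypothetical stand-in for the `@sqs.queue(...)` call and simply returns a new object each time, so that reuse within a thread and separation between threads are both visible.

```ruby
# Hypothetical stand-in for @sqs.queue(...): returns a fresh object per call.
def build_queue(name)
  Object.new
end

# Same memoization pattern as queue_for_key. Thread.current[...] is
# fiber-local in current Ruby versions; for a thread that does not switch
# fibers it behaves like per-thread storage.
def cached_queue(key)
  Thread.current["queue_#{key}"] ||= build_queue(key)
end

first  = cached_queue('demo')
second = cached_queue('demo')
other  = Thread.new { cached_queue('demo') }.value

puts first.equal?(second) # same thread: the cached object is reused
puts first.equal?(other)  # other thread: it built its own object
```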
#queue_name(key) ⇒ Object
Returns the queue name for the specified key. The name consists of an optional prefix, followed by the environment and the key itself. With a long worker class or method name, the result can exceed the 80-character maximum for SQS queue names, so the name is truncated to AWS_MAX_QUEUE_NAME characters. Truncation can make two distinct keys map to the same queue name, so a more robust naming scheme may be needed.
    # File 'lib/workling/clients/sqs_client.rb', line 148
    def queue_name(key)
      "#{@sqs_options['prefix'] || ''}#{env}_#{key}"[0, AWS_MAX_QUEUE_NAME]
    end
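The truncation risk described for queue_name can be reproduced with a standalone version of the naming rule. In the sketch below, `build_queue_name` copies the expression from the listing but takes the prefix and environment as arguments; the sample prefix, environment, and keys are made up. `build_unique_queue_name` is only one possible way to keep truncated names distinct (appending a short SHA-1 digest of the full name) and is not something the library does.

```ruby
require 'digest/sha1'

# Same rule as SqsClient#queue_name, with prefix and env passed in.
def build_queue_name(prefix, env, key, max = 80)
  "#{prefix || ''}#{env}_#{key}"[0, max]
end

# Hypothetical alternative (not in Workling): when the full name is too
# long, replace its tail with a short digest so distinct keys stay distinct.
def build_unique_queue_name(prefix, env, key, max = 80)
  full = "#{prefix || ''}#{env}_#{key}"
  return full if full.length <= max

  suffix = Digest::SHA1.hexdigest(full)[0, 8]
  "#{full[0, max - suffix.length - 1]}_#{suffix}"
end

# Two made-up keys that differ only after the 80-character cut-off.
key_a = ('x' * 90) + '_a'
key_b = ('x' * 90) + '_b'

puts build_queue_name('myapp_', 'production', 'emailworker__deliver')
puts build_queue_name(nil, 'test', key_a) == build_queue_name(nil, 'test', key_b)
puts build_unique_queue_name(nil, 'test', key_a) ==
     build_unique_queue_name(nil, 'test', key_b)
```

With plain truncation both long keys resolve to the same queue name, so messages for two different workers would share one queue.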
#request(key, value) ⇒ Object
Request work.
    # File 'lib/workling/clients/sqs_client.rb', line 126
    def request(key, value)
      begin
        # Serialize the payload as JSON and push it onto the queue for this key
        queue_for_key(key).send_message(value.to_json)
      rescue => e
        logger.error "SQS Client: Error sending msg for key: #{key}, value: #{value.inspect}; Error: #{e}"
        raise WorklingError, "Error sending msg for key: #{key}, value: #{value.inspect}; Error: #{e}"
      end
    end
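Because request serializes the value with to_json, the payload that reaches a worker is whatever survives a JSON round trip. The following sketch uses only Ruby's standard json library and a made-up payload to show the main consequence: symbol keys come back as strings.

```ruby
require 'json'

# Made-up payload, as a caller might pass it to #request.
payload = { :uid => 42, :options => { :retry => true } }

wire     = payload.to_json    # what would be sent as the message body
restored = JSON.parse(wire)   # what a plain parse yields on the other side

puts wire
puts restored.key?(:uid)      # false: the symbol key did not survive
puts restored['uid']          # 42: the value is available under a string key
```

This is why retrieve wraps the parsed message in HashWithIndifferentAccess, so workers can keep using symbol keys.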
#retrieve(key) ⇒ Object
Retrieve work.
    # File 'lib/workling/clients/sqs_client.rb', line 78
    def retrieve(key)
      begin
        # We're using a buffer per key to retrieve several messages at once,
        # then return them one at a time until the buffer is empty.
        # Workling seems to create one thread per worker class, each with its own
        # client. But to be sure (and to be less dependent on workling internals),
        # we store each buffer in a thread local variable.
        buffer = Thread.current["buffer_#{key}"]
        if buffer.nil? || buffer.empty?
          Thread.current["buffer_#{key}"] = buffer = queue_for_key(key).receive_messages(
            @messages_per_req, @visibility_timeout)
        end

        if buffer.empty?
          nil
        else
          msg = buffer.shift

          # We need to protect against the case that processing one of the
          # messages in the buffer took so much time that the visibility
          # timeout for the remaining messages has expired. To be on the
          # safe side (since we need to leave enough time to delete the
          # message), we drop it if fewer than @visibility_reserve seconds
          # of the visibility timeout remain.
          if msg.received_at < (Time.now - (@visibility_timeout - @visibility_reserve))
            nil
          else
            # Need to wrap in HashWithIndifferentAccess, as JSON serialization
            # loses symbol keys.
            parsed_msg = HashWithIndifferentAccess.new(JSON.parse(msg.body))

            # Delete the msg from SQS, so we don't re-retrieve it after the
            # visibility timeout. Ideally we would defer deleting a msg until
            # after Workling has successfully processed it, but it currently
            # doesn't provide the necessary hooks for this.
            msg.delete

            parsed_msg
          end
        end

      rescue => e
        logger.error "Error retrieving msg for key: #{key}; Error: #{e}\n#{e.backtrace.join("\n")}"
      end
    end
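The staleness check in retrieve is easier to follow with concrete numbers. The sketch below isolates the comparison in a hypothetical helper, `stale?`, which takes the current time as an argument so the result is deterministic; the default values match DEFAULT_VISIBILITY_TIMEOUT (30) and DEFAULT_VISIBILITY_RESERVE (10).

```ruby
# Hypothetical helper (not part of Workling): same comparison as in
# SqsClient#retrieve. A buffered message is dropped once fewer than
# `visibility_reserve` seconds of its visibility timeout remain.
def stale?(received_at, now, visibility_timeout = 30, visibility_reserve = 10)
  received_at < now - (visibility_timeout - visibility_reserve)
end

now = Time.at(1_000)

puts stale?(Time.at(985), now) # received 15 s ago: still usable
puts stale?(Time.at(975), now) # received 25 s ago: dropped
```

With the defaults, a message is dropped once more than 20 seconds have passed since it was received, which leaves at least 10 seconds to delete it before SQS makes it visible again.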