Class: Bricolage::SQSDataSource
- Inherits:
-
DataSource
- Object
- DataSource
- Bricolage::SQSDataSource
show all
- Defined in:
- lib/bricolage/sqsdatasource.rb
Defined Under Namespace
Classes: DeleteMessageBuffer, MessageHandler
Instance Attribute Summary collapse
Class Method Summary
collapse
Instance Method Summary
collapse
-
#client ⇒ Object
-
#delete_message(msg) ⇒ Object
-
#delete_message_async(msg) ⇒ Object
-
#handle_messages(handler:, message_class:) ⇒ Object
High-Level Polling Interface.
-
#initialize(region:, url:, access_key_id: nil, secret_access_key: nil, visibility_timeout:, max_number_of_messages: 10, wait_time_seconds: 20, noop: false) ⇒ SQSDataSource
constructor
A new instance of SQSDataSource.
-
#initiate_terminate ⇒ Object
-
#poll ⇒ Object
-
#process_async_delete(now = Time.now) ⇒ Object
-
#process_async_delete_force ⇒ Object
-
#put(msg) ⇒ Object
-
#receive_messages ⇒ Object
-
#send_event(name, source: SQSMessage::SQS_EVENT_SOURCE, time: Time.now, **attrs) ⇒ Object
-
#send_message(msg) ⇒ Object
-
#send_object(obj) ⇒ Object
-
#terminating? ⇒ Boolean
Constructor Details
#initialize(region:, url:, access_key_id: nil, secret_access_key: nil, visibility_timeout:, max_number_of_messages: 10, wait_time_seconds: 20, noop: false) ⇒ SQSDataSource
Returns a new instance of SQSDataSource.
13
14
15
16
17
18
19
20
21
22
23
|
# File 'lib/bricolage/sqsdatasource.rb', line 13
def initialize(region:, url:, access_key_id: nil, secret_access_key: nil,
visibility_timeout:, max_number_of_messages: 10, wait_time_seconds: 20, noop: false)
@region = region
@url = url
@access_key_id = access_key_id
@secret_access_key = secret_access_key
@visibility_timeout = visibility_timeout
@max_number_of_messages = max_number_of_messages
@wait_time_seconds = wait_time_seconds
@noop = noop
end
|
Instance Attribute Details
#access_key_id ⇒ Object
Returns the value of attribute access_key_id.
27
28
29
|
# File 'lib/bricolage/sqsdatasource.rb', line 27
def access_key_id
@access_key_id
end
|
#max_number_of_messages ⇒ Object
Returns the value of attribute max_number_of_messages.
31
32
33
|
# File 'lib/bricolage/sqsdatasource.rb', line 31
def max_number_of_messages
@max_number_of_messages
end
|
#region ⇒ Object
Returns the value of attribute region.
25
26
27
|
# File 'lib/bricolage/sqsdatasource.rb', line 25
def region
@region
end
|
#secret_access_key ⇒ Object
Returns the value of attribute secret_access_key.
28
29
30
|
# File 'lib/bricolage/sqsdatasource.rb', line 28
def secret_access_key
@secret_access_key
end
|
#url ⇒ Object
Returns the value of attribute url.
26
27
28
|
# File 'lib/bricolage/sqsdatasource.rb', line 26
def url
@url
end
|
#visibility_timeout ⇒ Object
Returns the value of attribute visibility_timeout.
30
31
32
|
# File 'lib/bricolage/sqsdatasource.rb', line 30
def visibility_timeout
@visibility_timeout
end
|
#wait_time_seconds ⇒ Object
Returns the value of attribute wait_time_seconds.
32
33
34
|
# File 'lib/bricolage/sqsdatasource.rb', line 32
def wait_time_seconds
@wait_time_seconds
end
|
Class Method Details
.new_mock(**args) ⇒ Object
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
# File 'lib/bricolage/sqsmock.rb', line 6
def SQSDataSource.new_mock(**args)
SQSDataSource.new(
region: 'ap-northeast-1',
url: 'http://sqs/000000000000/queue-name',
access_key_id: 'access_key_id_1',
secret_access_key: 'secret_access_key_1',
visibility_timeout: 30
).tap {|ds|
logger = NullLogger.new
ds.__send__(:initialize_base, 'name', nil, logger)
ds.instance_variable_set(:@client, SQSMock::Client.new(**args))
}
end
|
Instance Method Details
#client ⇒ Object
34
35
36
37
38
39
|
# File 'lib/bricolage/sqsdatasource.rb', line 34
def client
@client ||= begin
c = @noop ? DummySQSClient.new : Aws::SQS::Client.new(region: @region, access_key_id: @access_key_id, secret_access_key: @secret_access_key)
SQSClientWrapper.new(c, logger: logger)
end
end
|
#delete_message(msg) ⇒ Object
184
185
186
187
188
189
|
# File 'lib/bricolage/sqsdatasource.rb', line 184
def delete_message(msg)
client.delete_message(
queue_url: @url,
receipt_handle: msg.receipt_handle
)
end
|
#delete_message_async(msg) ⇒ Object
191
192
193
|
# File 'lib/bricolage/sqsdatasource.rb', line 191
def delete_message_async(msg)
delete_message_buffer.put(msg)
end
|
#handle_messages(handler:, message_class:) ⇒ Object
High-Level Polling Interface
45
46
47
48
49
50
51
52
53
54
55
56
57
|
# File 'lib/bricolage/sqsdatasource.rb', line 45
def handle_messages(handler:, message_class:)
trap_signals
polling_loop do
result = poll or next true
msgs = message_class.for_sqs_result(result)
msgs.each do |msg|
handler.handle(msg)
end
handler.after_message_batch
break if terminating?
msgs.empty?
end
end
|
#initiate_terminate ⇒ Object
70
71
72
73
|
# File 'lib/bricolage/sqsdatasource.rb', line 70
def initiate_terminate
@terminating = true
end
|
#poll ⇒ Object
100
101
102
103
104
105
106
107
108
|
# File 'lib/bricolage/sqsdatasource.rb', line 100
def poll
result = receive_messages()
unless result and result.successful?
logger.error "ReceiveMessage failed: #{result ? result.error.message : '(result=nil)'}"
return nil
end
logger.info "receive #{result.messages.size} messages"
result
end
|
#process_async_delete(now = Time.now) ⇒ Object
195
196
197
|
# File 'lib/bricolage/sqsdatasource.rb', line 195
def process_async_delete(now = Time.now)
delete_message_buffer.flush(now)
end
|
#process_async_delete_force ⇒ Object
199
200
201
|
# File 'lib/bricolage/sqsdatasource.rb', line 199
def process_async_delete_force
delete_message_buffer.flush_force
end
|
#put(msg) ⇒ Object
161
162
163
|
# File 'lib/bricolage/sqsdatasource.rb', line 161
def put(msg)
send_message(msg)
end
|
#receive_messages ⇒ Object
151
152
153
154
155
156
157
158
159
|
# File 'lib/bricolage/sqsdatasource.rb', line 151
def receive_messages
result = client.receive_message(
queue_url: @url,
max_number_of_messages: @max_number_of_messages,
visibility_timeout: @visibility_timeout,
wait_time_seconds: @wait_time_seconds
)
result
end
|
#send_event(name, source: SQSMessage::SQS_EVENT_SOURCE, time: Time.now, **attrs) ⇒ Object
177
178
179
180
181
182
|
# File 'lib/bricolage/sqsdatasource.rb', line 177
def send_event(name, source: SQSMessage::SQS_EVENT_SOURCE, time: Time.now, **attrs)
attrs['eventName'] = name
attrs['eventSource'] = source
attrs['eventTime'] = time.iso8601
send_object(attrs)
end
|
#send_message(msg) ⇒ Object
165
166
167
168
169
170
171
|
# File 'lib/bricolage/sqsdatasource.rb', line 165
def send_message(msg)
client.send_message(
queue_url: @url,
message_body: { 'Records' => [msg.body] }.to_json,
delay_seconds: msg.delay_seconds
)
end
|
#send_object(obj) ⇒ Object
173
174
175
|
# File 'lib/bricolage/sqsdatasource.rb', line 173
def send_object(obj)
send_message(ObjectMessage.new(obj))
end
|
#terminating? ⇒ Boolean
75
76
77
|
# File 'lib/bricolage/sqsdatasource.rb', line 75
def terminating?
@terminating
end
|