Class: Workato::Connector::Sdk::Trigger
Instance Attribute Summary
Attributes inherited from Operation
#streams
Instance Method Summary
collapse
-
#dedup(input = {}) ⇒ T.untyped
-
#initialize(trigger:, methods: {}, connection: Connection.new, object_definitions: nil, streams: ProhibitedStreams.new) ⇒ void
constructor
-
#invoke(input = {}, payload = {}, headers = {}, params = {}, webhook_subscribe_output = {}) ⇒ SorbetTypes::WebhookNotificationOutputHash, SorbetTypes::PollOutputHash
-
#poll(settings = nil, input = {}, closure = nil, extended_input_schema = [], extended_output_schema = []) ⇒ SorbetTypes::PollOutputHash
-
#poll_page(settings = nil, input = {}, closure = nil, extended_input_schema = [], extended_output_schema = []) ⇒ SorbetTypes::PollOutputHash
-
#webhook_notification(input = {}, payload = {}, extended_input_schema = [], extended_output_schema = [], headers = {}, params = {}, settings = nil, webhook_subscribe_output = {}) ⇒ SorbetTypes::WebhookNotificationOutputHash
-
#webhook_subscribe(webhook_url = '', settings = nil, input = {}, recipe_id = recipe_id!) ) ⇒ SorbetTypes::WebhookSubscribeOutput
-
#webhook_unsubscribe(webhook_subscribe_output = {}) ⇒ T.untyped
Methods inherited from Operation
#execute, #extended_schema, #input_fields, #output_fields, #sample_output, #summarize_input, #summarize_output
#execution_context, #recipe_id!
Methods included from Dsl::Error
#error
Methods included from Dsl::HTTP
#copy, #delete, #get, #head, #move, #options, #parallel, #patch, #post, #put
Methods included from Dsl::AWS
#aws
#blank, #clear, #decrypt, #encrypt, #null, #puts, #skip, #sleep, #workato
#workato_schema
#lookup
#account_property
Methods included from Dsl::Time
#now, #today
Constructor Details
#initialize(trigger:, methods: {}, connection: Connection.new, object_definitions: nil, streams: ProhibitedStreams.new) ⇒ void
46
47
48
49
50
51
52
53
54
55
|
# File 'lib/workato/connector/sdk/trigger.rb', line 46
def initialize(trigger:, methods: {}, connection: Connection.new, object_definitions: nil,
streams: ProhibitedStreams.new)
super(
operation: trigger,
connection: connection,
methods: methods,
object_definitions: object_definitions,
streams: streams
)
end
|
Instance Method Details
#dedup(input = {}) ⇒ T.untyped
121
122
123
|
# File 'lib/workato/connector/sdk/trigger.rb', line 121
def dedup(input = {})
trigger[:dedup].call(input)
end
|
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
|
# File 'lib/workato/connector/sdk/trigger.rb', line 208
def invoke(input = {}, payload = {}, = {}, params = {}, webhook_subscribe_output = {})
extended_schema = extended_schema(nil, input)
config_schema = Schema.new(schema: config_fields_schema)
input_schema = Schema.new(schema: extended_schema[:input])
output_schema = Schema.new(schema: extended_schema[:output])
input = apply_input_schema(input, config_schema + input_schema)
if webhook_notification?
webhook_notification(
input,
payload,
input_schema,
output_schema,
,
params,
nil,
webhook_subscribe_output
).tap do |event|
apply_output_schema(event, output_schema)
end
else
output = poll(nil, input, nil, input_schema, output_schema)
output[:events].each do |event|
apply_output_schema(event, output_schema)
end
output
end
end
|
#poll(settings = nil, input = {}, closure = nil, extended_input_schema = [], extended_output_schema = []) ⇒ SorbetTypes::PollOutputHash
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
|
# File 'lib/workato/connector/sdk/trigger.rb', line 102
def poll(settings = nil, input = {}, closure = nil, extended_input_schema = [], extended_output_schema = [])
events = T.let([], T::Array[SorbetTypes::TriggerEventHash])
loop do
output = poll_page(settings, input, closure, extended_input_schema, extended_output_schema)
events = output[:events] + events
closure = output[:next_poll]
break unless output[:can_poll_more]
end
{
events: events,
can_poll_more: false,
next_poll: closure
}.with_indifferent_access
end
|
#poll_page(settings = nil, input = {}, closure = nil, extended_input_schema = [], extended_output_schema = []) ⇒ SorbetTypes::PollOutputHash
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
|
# File 'lib/workato/connector/sdk/trigger.rb', line 68
def poll_page(settings = nil, input = {}, closure = nil, extended_input_schema = [],
extended_output_schema = [])
poll_proc = trigger[:poll]
output = execute(
settings,
{ input: input, closure: closure },
extended_input_schema,
extended_output_schema
) do |connection, payload, eis, eos|
instance_exec(connection, payload[:input], payload[:closure], eis, eos, &poll_proc) || {}
end
unless T.unsafe(output).is_a?(::Hash)
Kernel.raise Workato::Connector::Sdk::InvalidTriggerPollOutputError
end
output[:events] = Array.wrap(output[:events])
.reverse!
.map! { |event| ::Hash.try_convert(event) || event }
output[:next_poll] = output[:next_poll].presence || closure
output
end
|
#webhook_notification(input = {}, payload = {}, extended_input_schema = [], extended_output_schema = [], headers = {}, params = {}, settings = nil, webhook_subscribe_output = {}) ⇒ SorbetTypes::WebhookNotificationOutputHash
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
|
# File 'lib/workato/connector/sdk/trigger.rb', line 139
def webhook_notification(
input = {},
payload = {},
extended_input_schema = [],
extended_output_schema = [],
= {},
params = {},
settings = nil,
webhook_subscribe_output = {}
)
connection.merge_settings!(settings) if settings
output = global_dsl_context.execute(
Utilities::HashWithIndifferentAccess.wrap(input),
payload,
Array.wrap(extended_input_schema).map { |i| Utilities::HashWithIndifferentAccess.wrap(i) },
Array.wrap(extended_output_schema).map { |i| Utilities::HashWithIndifferentAccess.wrap(i) },
Utilities::HashWithIndifferentAccess.wrap(),
Utilities::HashWithIndifferentAccess.wrap(params),
connection.settings,
Utilities::HashWithIndifferentAccess.wrap(webhook_subscribe_output),
&trigger[:webhook_notification]
)
if output.is_a?(::Array)
output.map! { |event| ::Hash.try_convert(event) || event }
else
::Hash.try_convert(output) || output
end
end
|
#webhook_subscribe(webhook_url = '', settings = nil, input = {}, recipe_id = recipe_id!)
) ⇒ SorbetTypes::WebhookSubscribeOutput
176
177
178
179
180
181
182
183
184
185
186
187
|
# File 'lib/workato/connector/sdk/trigger.rb', line 176
def webhook_subscribe(webhook_url = '', settings = nil, input = {}, recipe_id = recipe_id!)
webhook_subscribe_proc = trigger[:webhook_subscribe]
execute(settings, { input: input, webhook_url: webhook_url, recipe_id: recipe_id }) do |connection, payload|
instance_exec(
payload[:webhook_url],
connection,
payload[:input],
payload[:recipe_id],
&webhook_subscribe_proc
)
end
end
|
#webhook_unsubscribe(webhook_subscribe_output = {}) ⇒ T.untyped
190
191
192
193
194
195
|
# File 'lib/workato/connector/sdk/trigger.rb', line 190
def webhook_unsubscribe(webhook_subscribe_output = {})
webhook_unsubscribe_proc = trigger[:webhook_unsubscribe]
execute(nil, webhook_subscribe_output) do |_connection, input|
instance_exec(input, &webhook_unsubscribe_proc)
end
end
|