Class: ConcurrentPipeline::Pipeline
- Inherits:
-
Object
- Object
- ConcurrentPipeline::Pipeline
show all
- Defined in:
- lib/concurrent_pipeline/pipeline.rb
Defined Under Namespace
Classes: PipelineStep, Wrapper
Class Attribute Summary collapse
Instance Attribute Summary collapse
Class Method Summary
collapse
Instance Method Summary
collapse
Constructor Details
#initialize(target:, store:, changeset:, stream:) ⇒ Pipeline
Returns a new instance of Pipeline.
203
204
205
206
207
208
|
# File 'lib/concurrent_pipeline/pipeline.rb', line 203
# Builds a pipeline instance by storing each keyword argument in the
# instance variable of the same name.
#
# @param target [Object, nil] value kept in @target
# @param store [Object] value kept in @store
# @param changeset [Object] value kept in @changeset
# @param stream [Object] value kept in @stream
def initialize(target:, store:, changeset:, stream:)
  @target, @store, @changeset = target, store, changeset
  # Assigned last, on its own, so the method still evaluates to +stream+.
  @stream = stream
end
|
Class Attribute Details
.target_type ⇒ Object
Returns the value of attribute target_type.
145
146
147
|
# File 'lib/concurrent_pipeline/pipeline.rb', line 145
# Reader for @target_type.
#
# @return [Object, nil] the current value of @target_type (nil when unset)
def target_type
  @target_type
end
|
Instance Attribute Details
#changeset ⇒ Object
Returns the value of attribute changeset.
202
203
204
|
# File 'lib/concurrent_pipeline/pipeline.rb', line 202
# Reader for @changeset.
#
# @return [Object] the current value of @changeset
def changeset
  @changeset
end
|
#store ⇒ Object
Returns the value of attribute store.
202
203
204
|
# File 'lib/concurrent_pipeline/pipeline.rb', line 202
# Reader for @store.
#
# @return [Object] the current value of @store
def store
  @store
end
|
#target ⇒ Object
Returns the value of attribute target.
202
203
204
|
# File 'lib/concurrent_pipeline/pipeline.rb', line 202
# Reader for @target.
#
# @return [Object, nil] the current value of @target
def target
  @target
end
|
Class Method Details
.build_pipelines(store:, stream:, pool:) ⇒ Object
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
|
# File 'lib/concurrent_pipeline/pipeline.rb', line 147
# Wraps pipeline instances of this class in Wrapper objects.
#
# When +target_type+ is set, one pipeline is built per element of
# +store.all(target_type)+ and the mapped collection is returned. Otherwise a
# single pipeline with a nil target is built and its Wrapper is returned
# directly (not inside a collection).
#
# @param store [Object] must respond to +changeset+ and, when a target type
#   is set, to +all+
# @param stream [Object] handed unchanged to every pipeline
# @param pool [Object] handed unchanged to every Wrapper
# @return [Object] the mapped collection of Wrappers, or a single Wrapper
def build_pipelines(store:, stream:, pool:)
  # Single construction path shared by both branches; +store.changeset+ is
  # read once per pipeline, exactly as each branch did before.
  wrap = lambda do |record|
    pipeline = new(target: record, store: store, changeset: store.changeset, stream: stream)
    Wrapper.new(pipeline: pipeline, pool: pool)
  end

  return wrap.call(nil) unless target_type

  store.all(target_type).map { |record| wrap.call(record) }
end
|
.concurrency(size = nil) ⇒ Object
196
197
198
199
|
# File 'lib/concurrent_pipeline/pipeline.rb', line 196
# Combined reader/writer for @concurrency.
#
# Called with a truthy +size+ it stores that value; called with no argument
# (or nil/false) it leaves the stored value untouched. Either way the current
# @concurrency is returned.
#
# @param size [Object, nil] new value to store, or nil to only read
# @return [Object, nil] the value of @concurrency after the call
def concurrency(size = nil)
  return @concurrency unless size

  @concurrency = size
end
|
.done ⇒ Object
183
184
185
|
# File 'lib/concurrent_pipeline/pipeline.rb', line 183
# Defines the instance method +done?+ on the receiver, forwarding every
# argument (including a block) straight to +define_method+.
#
# @return [Symbol] whatever +define_method+ returns
def done(...)
  define_method(:done?, ...)
end
|
.each(type, as: nil) ⇒ Object
173
174
175
176
177
|
# File 'lib/concurrent_pipeline/pipeline.rb', line 173
# Records +type+ in @target_type and defines instance readers that return
# +target+: always +record+, plus one named by +as+ when +as+ is given.
#
# @param type [Object] value stored in @target_type
# @param as [Symbol, String, nil] optional extra reader name
# @return [Symbol] result of defining +record+
def each(type, as: nil)
  @target_type = type

  # One body serves both readers; it runs against the instance, so +target+
  # resolves there.
  target_reader = proc { target }
  define_method(as, &target_reader) if as
  define_method(:record, &target_reader)
end
|
.perform ⇒ Object
187
188
189
190
|
# File 'lib/concurrent_pipeline/pipeline.rb', line 187
# Declares a single-step pipeline: sets the step sequence to +:perform+ via
# +steps+, then defines the instance method +perform+ from the forwarded
# arguments (including a block).
#
# @return [Symbol] whatever +define_method+ returns
def perform(...)
  steps(:perform)
  define_method(:perform, ...)
end
|
.ready ⇒ Object
179
180
181
|
# File 'lib/concurrent_pipeline/pipeline.rb', line 179
# Defines the instance method +ready?+ on the receiver, forwarding every
# argument (including a block) straight to +define_method+.
#
# @return [Symbol] whatever +define_method+ returns
def ready(...)
  define_method(:ready?, ...)
end
|
.steps(*sequence) ⇒ Object
192
193
194
|
# File 'lib/concurrent_pipeline/pipeline.rb', line 192
# Defines an instance method +steps+ that returns the given names as an
# Array, in the order they were passed.
#
# @param sequence [Array<Object>] step names captured by the defined method
# @return [Symbol] whatever +define_method+ returns
def steps(*sequence)
  define_method(:steps) do
    sequence
  end
end
|
Instance Method Details
#stream(type, payload) ⇒ Object
210
211
212
|
# File 'lib/concurrent_pipeline/pipeline.rb', line 210
# Pushes an event onto the object held in @stream.
#
# @param type [Object] passed as the first argument to +@stream.push+
# @param payload [Object] passed as the second argument to +@stream.push+
# @return [Object] whatever +@stream.push+ returns
def stream(type, payload)
  @stream.push(type, payload)
end
|