Class: ConcurrentPipeline::Pipeline

Inherits:
Object
  • Object
show all
Defined in:
lib/concurrent_pipeline/pipeline.rb

Defined Under Namespace

Classes: PipelineStep, Wrapper

Class Attribute Summary collapse

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(target:, store:, changeset:, stream:) ⇒ Pipeline

Returns a new instance of Pipeline.



203
204
205
206
207
208
# File 'lib/concurrent_pipeline/pipeline.rb', line 203

# Builds a pipeline bound to one target and the collaborators it works with.
#
# @param target [Object, nil] the object this pipeline operates on
# @param store [Object] the store handed in by the caller
# @param changeset [Object] the changeset handed in by the caller
# @param stream [Object] an object responding to +push(type, payload)+
def initialize(target:, store:, changeset:, stream:)
  # These three are exposed through same-named readers.
  @target, @store, @changeset = target, store, changeset
  # No plain reader for this one: #stream pushes onto it instead.
  @stream = stream
end

Class Attribute Details

.target_type ⇒ Object (readonly)

Returns the value of attribute target_type.



145
146
147
# File 'lib/concurrent_pipeline/pipeline.rb', line 145

# Reader for the type registered through +each+.
#
# @return [Object, nil] the stored type, or nil when none has been set
def target_type = @target_type

Instance Attribute Details

#changeset ⇒ Object (readonly)

Returns the value of attribute changeset.



202
203
204
# File 'lib/concurrent_pipeline/pipeline.rb', line 202

# Reader for the changeset given to the constructor.
#
# @return [Object] the value held in +@changeset+
def changeset = @changeset

#store ⇒ Object (readonly)

Returns the value of attribute store.



202
203
204
# File 'lib/concurrent_pipeline/pipeline.rb', line 202

# Reader for the store given to the constructor.
#
# @return [Object] the value held in +@store+
def store = @store

#target ⇒ Object (readonly)

Returns the value of attribute target.



202
203
204
# File 'lib/concurrent_pipeline/pipeline.rb', line 202

# Reader for the target given to the constructor.
#
# @return [Object, nil] the value held in +@target+
def target = @target

Class Method Details

.build_pipelines(store:, stream:, pool:) ⇒ Object



147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
# File 'lib/concurrent_pipeline/pipeline.rb', line 147

# Instantiates this pipeline class and wraps each instance for execution.
#
# With a +target_type+ set, one wrapped pipeline is built per record returned
# by +store.all(target_type)+ and the result is an Array. Without one, a
# single wrapped pipeline whose target is nil is returned (not an Array).
#
# @param store [Object] must respond to +all(type)+ and +changeset+
# @param stream [Object] passed through to every pipeline instance
# @param pool [Object] passed through to every Wrapper
# @return [Array<Wrapper>, Wrapper]
def build_pipelines(store:, stream:, pool:)
  # store.changeset is requested once per pipeline, as each one is built.
  wrap = lambda do |record|
    pipeline = new(
      target: record,
      store: store,
      changeset: store.changeset,
      stream: stream
    )
    Wrapper.new(pipeline: pipeline, pool: pool)
  end

  return wrap.call(nil) unless target_type

  store.all(target_type).map { |record| wrap.call(record) }
end

.concurrency(size = nil) ⇒ Object



196
197
198
199
# File 'lib/concurrent_pipeline/pipeline.rb', line 196

# Combined getter/setter for the concurrency setting.
#
# Called with a truthy +size+ it stores that value; called with no argument
# (or nil/false) it leaves the stored value untouched. Either way the
# current value is returned.
#
# @param size [Object, nil] new value to store, if any
# @return [Object, nil] the stored value, nil when never set
def concurrency(size = nil)
  return @concurrency unless size

  @concurrency = size
end

.done(...) ⇒ Object



183
184
185
# File 'lib/concurrent_pipeline/pipeline.rb', line 183

# Defines the instance method +done?+ on this class.
#
# All arguments and any block are forwarded unchanged to +define_method+,
# so the body of +done?+ is whatever the caller supplies here.
#
# @return [Symbol] the value returned by +define_method+
def done(...)
  define_method(:done?, ...)
end

.each(type, as: nil) ⇒ Object



173
174
175
176
177
# File 'lib/concurrent_pipeline/pipeline.rb', line 173

# Records +type+ as this class's target type and defines instance readers
# that return the pipeline's +target+.
#
# +record+ is always defined; when +as+ is given, a second reader with that
# name is defined as well.
#
# @param type [Object] stored in +@target_type+
# @param as [Symbol, String, nil] optional extra reader name
# @return [Symbol] the value returned by the final +define_method+ call
def each(type, as: nil)
  @target_type = type

  # One body serves both readers; define_method evaluates it on the instance.
  reader = proc { target }
  define_method(as, &reader) if as
  define_method(:record, &reader)
end

.perform(...) ⇒ Object



187
188
189
190
# File 'lib/concurrent_pipeline/pipeline.rb', line 187

# Shorthand for a pipeline with a single step named +perform+.
#
# Declares +:perform+ as the only step via +steps+, then defines the
# +perform+ instance method from the forwarded arguments and block.
#
# @return [Symbol] the value returned by +define_method+
def perform(...)
  # Makes the instance-level +steps+ return [:perform].
  steps(:perform)
  define_method(:perform, ...)
end

.ready(...) ⇒ Object



179
180
181
# File 'lib/concurrent_pipeline/pipeline.rb', line 179

# Defines the instance method +ready?+ on this class.
#
# All arguments and any block are forwarded unchanged to +define_method+,
# so the body of +ready?+ is whatever the caller supplies here.
#
# @return [Symbol] the value returned by +define_method+
def ready(...)
  define_method(:ready?, ...)
end

.steps(*sequence) ⇒ Object



192
193
194
# File 'lib/concurrent_pipeline/pipeline.rb', line 192

# Declares the step names for this class by defining an instance-level
# +steps+ method.
#
# @param sequence [Array<Object>] step names, in the order given
# @return [Symbol] the value returned by +define_method+
def steps(*sequence)
  define_method(:steps) do
    # Every call hands back the same captured array.
    sequence
  end
end

Instance Method Details

#stream(type, payload) ⇒ Object



210
211
212
# File 'lib/concurrent_pipeline/pipeline.rb', line 210

# Pushes one message onto the stream given to the constructor.
#
# @param type [Object] first argument handed to +@stream.push+
# @param payload [Object] second argument handed to +@stream.push+
# @return [Object] whatever +@stream.push+ returns
def stream(type, payload) = @stream.push(type, payload)