Module: Classifier::Streaming

Included in:
Bayes, KNN, LSI, LogisticRegression, TFIDF
Defined in:
lib/classifier/streaming.rb,
lib/classifier/streaming/progress.rb,
lib/classifier/streaming/line_reader.rb

Overview

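The Streaming module adds memory-efficient training to classifiers. Include it in a classifier to gain streaming and batch training methods, plus helpers for saving, listing, and deleting checkpoints. The module's own train_batch and train_from_stream raise NotImplementedError, so each including classifier must supply its own implementations.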

Examples:

Including in a classifier

class MyClassifier
  include Classifier::Streaming
end

Streaming training

classifier.train_from_stream(:category, File.open('corpus.txt'))

Batch training with progress

classifier.train_batch(:category, documents, batch_size: 100) do |progress|
  puts "#{progress.percent}% complete"
end

Defined Under Namespace

Classes: LineReader, Progress

Constant Summary

DEFAULT_BATCH_SIZE = 100

Default batch size for streaming operations.

Instance Method Details

#delete_checkpoint(checkpoint_id) ⇒ Object

Deletes a checkpoint if it exists. Requires a storage backend to be configured.

Raises:

  • (ArgumentError)


# File 'lib/classifier/streaming.rb', line 91

def delete_checkpoint(checkpoint_id)
  raise ArgumentError, 'No storage configured' unless respond_to?(:storage) && storage

  checkpoint_storage = checkpoint_storage_for(checkpoint_id)
  checkpoint_storage.delete if checkpoint_storage.exists?
end

#list_checkpoints ⇒ Object

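Lists the IDs of available checkpoints in sorted order. Requires a storage backend to be configured. Only file-based storage (Storage::File) is searched; any other backend returns an empty array.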

Raises:

  • (ArgumentError)


# File 'lib/classifier/streaming.rb', line 69

def list_checkpoints
  raise ArgumentError, 'No storage configured' unless respond_to?(:storage) && storage

  case storage
  when Storage::File
    file_storage = storage #: Storage::File
    dir = File.dirname(file_storage.path)
    base = File.basename(file_storage.path, '.*')
    ext = File.extname(file_storage.path)

    pattern = File.join(dir, "#{base}_checkpoint_*#{ext}")
    Dir.glob(pattern).map do |path|
      File.basename(path, ext).sub(/^#{Regexp.escape(base)}_checkpoint_/, '')
    end.sort
  else
    []
  end
end
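The file-name convention used above can be tried in isolation. The following is a minimal sketch that copies the glob-and-strip logic into a standalone helper; the helper name `checkpoint_ids_for`, the `model.json` path, and the checkpoint IDs are all hypothetical and not part of the library.

```ruby
require 'tmpdir'
require 'fileutils'

# Hypothetical helper mirroring the Storage::File branch of list_checkpoints.
# Given a storage path such as "dir/model.json", it finds sibling files named
# "model_checkpoint_<id>.json" and returns the sorted <id> parts.
def checkpoint_ids_for(storage_path)
  dir  = File.dirname(storage_path)
  base = File.basename(storage_path, '.*')
  ext  = File.extname(storage_path)

  Dir.glob(File.join(dir, "#{base}_checkpoint_*#{ext}")).map do |path|
    File.basename(path, ext).sub(/^#{Regexp.escape(base)}_checkpoint_/, '')
  end.sort
end

Dir.mktmpdir do |dir|
  # Made-up files: one main model, two checkpoints, one unrelated file.
  %w[model.json model_checkpoint_50pct.json model_checkpoint_25pct.json other.json].each do |name|
    FileUtils.touch(File.join(dir, name))
  end

  p checkpoint_ids_for(File.join(dir, 'model.json')) # prints ["25pct", "50pct"]
end
```

Because the IDs are recovered purely from file names, any file that matches the pattern is reported as a checkpoint, whether or not it was written by save_checkpoint.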

#save_checkpoint(checkpoint_id) ⇒ Object

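Saves a checkpoint of the current training state. Requires a storage backend to be configured. The classifier's storage is temporarily pointed at the checkpoint location and is restored afterwards, even if saving fails.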

Raises:

  • (ArgumentError)


# File 'lib/classifier/streaming.rb', line 52

def save_checkpoint(checkpoint_id)
  raise ArgumentError, 'No storage configured' unless respond_to?(:storage) && storage

  original_storage = storage

  begin
    self.storage = checkpoint_storage_for(checkpoint_id)
    save
  ensure
    self.storage = original_storage
  end
end
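The swap-and-restore pattern in save_checkpoint can be illustrated without the library. This is a sketch only: `DemoStorage`, `DemoModel`, and `save_to` are hypothetical stand-ins, and the real storage classes are assumed to behave differently in detail.

```ruby
# Hypothetical in-memory storage that records writes and can be told to fail.
class DemoStorage
  attr_reader :name, :writes

  def initialize(name, fail_on_write: false)
    @name = name
    @fail_on_write = fail_on_write
    @writes = []
  end

  def write(data)
    raise IOError, "cannot write to #{name}" if @fail_on_write

    @writes << data
  end
end

# Hypothetical model with the same storage/save shape the module relies on.
class DemoModel
  attr_accessor :storage

  def initialize(storage)
    @storage = storage
  end

  def save
    storage.write('model state')
  end

  # Same structure as save_checkpoint: swap storage, save, always restore.
  def save_to(temporary_storage)
    original_storage = storage
    begin
      self.storage = temporary_storage
      save
    ensure
      self.storage = original_storage
    end
  end
end

model = DemoModel.new(DemoStorage.new('main'))

begin
  model.save_to(DemoStorage.new('broken', fail_on_write: true))
rescue IOError => e
  puts "save failed: #{e.message}"
end

puts model.storage.name # prints "main": the original storage is back in place
```

The `ensure` clause is what keeps a failed checkpoint write from leaving the classifier pointed at the checkpoint location.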

#train_batch(category = nil, documents = nil, batch_size: DEFAULT_BATCH_SIZE, **categories, &block) ⇒ Object

Trains the classifier with an array of documents in batches. Supports both positional and keyword argument styles.

Examples:

Positional style

classifier.train_batch(:spam, documents, batch_size: 100)

Keyword style

classifier.train_batch(spam: documents, ham: other_docs, batch_size: 100)

Raises:

  • (NotImplementedError)


# File 'lib/classifier/streaming.rb', line 44

def train_batch(category = nil, documents = nil, batch_size: DEFAULT_BATCH_SIZE, **categories, &block)
  raise NotImplementedError, "#{self.class} must implement train_batch"
end
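Since the module's version only raises, an including class has to provide the real behavior. The sketch below shows one way a class might accept both calling styles. It is not the library's implementation: `SketchClassifier`, its `train` method, and the `(done, total)` block arguments are assumptions made for illustration, whereas the real method yields a progress object.

```ruby
# Hypothetical classifier that stores documents per category.
class SketchClassifier
  DEFAULT_BATCH_SIZE = 100

  attr_reader :trained

  def initialize
    @trained = Hash.new { |hash, key| hash[key] = [] }
  end

  def train(category, document)
    @trained[category] << document
  end

  # Accepts train_batch(:spam, docs) or train_batch(spam: docs, ham: more).
  # Yields (documents_done, documents_total) after each batch.
  def train_batch(category = nil, documents = nil, batch_size: DEFAULT_BATCH_SIZE, **categories, &block)
    # Normalize the positional style into the keyword-style hash.
    categories = { category => documents } if category && documents

    total = categories.values.sum(&:size)
    done = 0

    categories.each do |cat, docs|
      docs.each_slice(batch_size) do |batch|
        batch.each { |doc| train(cat, doc) }
        done += batch.size
        block&.call(done, total)
      end
    end
    self
  end
end

classifier = SketchClassifier.new
classifier.train_batch(:spam, %w[a b c], batch_size: 2) do |done, total|
  puts "#{done}/#{total} documents"
end
```

With three documents and a batch size of two, the block runs twice, first with 2 of 3 and then with 3 of 3.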

#train_from_stream(category, io, batch_size: DEFAULT_BATCH_SIZE, &block) ⇒ Object

Trains the classifier from an IO stream. Each line in the stream is treated as a separate document.

Raises:

  • (NotImplementedError)


# File 'lib/classifier/streaming.rb', line 30

def train_from_stream(category, io, batch_size: DEFAULT_BATCH_SIZE, &block)
  raise NotImplementedError, "#{self.class} must implement train_from_stream"
end
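The line-per-document behavior described above might be implemented along the following lines. This is a rough sketch under stated assumptions, not the library's code: `StreamSketch` is hypothetical, it yields the size of each batch rather than a progress object, and it does not use the library's LineReader class.

```ruby
require 'stringio'

# Hypothetical classifier that collects one document per input line.
class StreamSketch
  DEFAULT_BATCH_SIZE = 100

  attr_reader :documents

  def initialize
    @documents = Hash.new { |hash, key| hash[key] = [] }
  end

  # Reads the IO line by line, so only one batch of lines is held at a time.
  # Yields the number of lines in each batch once it has been processed.
  def train_from_stream(category, io, batch_size: DEFAULT_BATCH_SIZE)
    io.each_line(chomp: true).each_slice(batch_size) do |batch|
      batch.each { |line| @documents[category] << line }
      yield batch.size if block_given?
    end
    self
  end
end

stream = StringIO.new("first doc\nsecond doc\nthird doc\n")
sketch = StreamSketch.new
sketch.train_from_stream(:category, stream, batch_size: 2) do |count|
  puts "processed a batch of #{count} lines"
end
```

Any object that responds to `each_line` works as the `io` argument here, including an open `File`, which is what keeps memory use independent of corpus size.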