Class: BBFS::ContentServer::QueueIndexer

Inherits:
Object
  • Object
show all
Defined in:
lib/content_server/queue_indexer.rb

Overview

Simple indexer, gets inputs events (files to index) and outputs content data updates into output queue.

Instance Method Summary collapse

Constructor Details

#initialize(input_queue, output_queue, content_data_path) ⇒ QueueIndexer

Returns a new instance of QueueIndexer.



12
13
14
15
16
# File 'lib/content_server/queue_indexer.rb', line 12

def initialize(input_queue, output_queue, content_data_path)
  @input_queue = input_queue
  @output_queue = output_queue
  @content_data_path = content_data_path
end

Instance Method Details

#runObject



18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
# File 'lib/content_server/queue_indexer.rb', line 18

def run
  server_content_data = ContentData::ContentData.new
  # Shallow check content data files.
  tmp_content_data = ContentData::ContentData.new
  tmp_content_data.from_file(@content_data_path) if File.exists?(@content_data_path)
  tmp_content_data.instances.each_value do |instance|
    # Skipp instances (files) which did not pass the shallow check.
    Log.info('Shallow checking content data:')
    if shallow_check(instance)
      Log.info("exists: #{instance.full_path}")
      server_content_data.add_content(tmp_content_data.contents[instance.checksum])
      server_content_data.add_instance(instance)
    else
      Log.info("changed: #{instance.full_path}")
      # Add non existing and changed files to index queue.
      @input_queue.push([FileMonitoring::FileStatEnum::STABLE, instance.full_path])
    end
  end

  # Start indexing on demand and write changes to queue
  thread = Thread.new do
    while true do
      Log.info 'Waiting on index input queue.'
      state, is_dir, path = @input_queue.pop
      Log.info "event: #{state}, #{is_dir}, #{path}."

      # index files and add to copy queue
      # delete directory with it's sub files
      # delete file
      if state == FileMonitoring::FileStatEnum::STABLE && !is_dir
        Log.info "Indexing content #{path}."
        index_agent = FileIndexing::IndexAgent.new
        indexer_patterns = FileIndexing::IndexerPatterns.new
        indexer_patterns.add_pattern(path)
        index_agent.index(indexer_patterns, server_content_data)
        Log.info("Failed files: #{index_agent.failed_files.to_a.join(',')}.") \
            if !index_agent.failed_files.empty?
        Log.info("indexed content #{index_agent.indexed_content}.")
        server_content_data.merge index_agent.indexed_content
      elsif ((state == FileMonitoring::FileStatEnum::NON_EXISTING ||
        state == FileMonitoring::FileStatEnum::CHANGED) && !is_dir)
        # If file content changed, we should remove old instance.
        key = FileIndexing::IndexAgent.global_path(path)
        # Check if deleted file exists at content data.
        Log.info("Instance to remove: #{key}")
        if server_content_data.instances.key?(key)
          instance_to_remove = server_content_data.instances[key]
          # Remove file from content data only if it does not pass the shallow check, i.e.,
          # content has changed/removed.
          if !shallow_check(instance_to_remove)
            content_to_remove = server_content_data.contents[instance_to_remove.checksum]
            # Remove the deleted instance.
            content_data_to_remove = ContentData::ContentData.new
            content_data_to_remove.add_content(content_to_remove)
            content_data_to_remove.add_instance(instance_to_remove)
            # Remove the file.
            server_content_data = ContentData::ContentData.remove_instances(
                content_data_to_remove, server_content_data)
          end
        end
      elsif state == FileMonitoring::FileStatEnum::NON_EXISTING && is_dir
        Log.info("NonExisting/Changed: #{path}")
        # Remove directory but only when non-existing.
        Log.info("Directory to remove: #{path}")
        global_dir = FileIndexing::IndexAgent.global_path(path)
        server_content_data = ContentData::ContentData.remove_directory(
        server_content_data, global_dir)
      else
        Log.info("This case should not be handled: #{state}, #{is_dir}, #{path}.")
      end
      # TODO(kolman): Don't write to file each change?
      Log.info "Writing server content data to #{@content_data_path}."
      server_content_data.to_file(@content_data_path)

      Log.info 'Adding server content data to queue.'
      @output_queue.push(ContentData::ContentData.new(server_content_data))
    end  # while true do
  end  # Thread.new do
  thread
end

#shallow_check(instance) ⇒ Object

Check file existence, check it’s size and modification date. If something wrong reindex the file and update content data.



101
102
103
104
105
106
# File 'lib/content_server/queue_indexer.rb', line 101

def shallow_check(instance)
  shallow_instance = FileIndexing::IndexAgent.create_shallow_instance(instance.full_path)
  return false unless shallow_instance
  return (shallow_instance.size == instance.size &&
          shallow_instance.modification_time == instance.modification_time)
end