Class: OxAiWorkers::DelayedRequest

Inherits:
StateBatch show all
Defined in:
lib/oxaiworkers/delayed_request.rb

Instance Attribute Summary

Attributes inherited from StateBatch

#batch_id, #file_id

Attributes inherited from ModuleRequest

#client, #custom_id, #errors, #max_tokens, #messages, #model, #result, #temperature, #tool_calls, #tool_calls_raw, #tools

Instance Method Summary collapse

Methods inherited from StateBatch

#cleanup

Methods included from StateHelper

#log_me

Methods inherited from ModuleRequest

#append, #cleanup, #initialize_requests, #not_found_is_ok, #params, #parse_choices

Constructor Details

#initialize(batch_id: nil, model: nil, max_tokens: nil, temperature: nil) ⇒ DelayedRequest

Returns a new instance of DelayedRequest.



5
6
7
8
9
10
11
# File 'lib/oxaiworkers/delayed_request.rb', line 5

# Builds a delayed request, optionally bound to an already existing batch.
#
# @param batch_id [Object, nil] id of an existing batch; stored in +@batch_id+
# @param model [Object, nil] forwarded to +initialize_requests+
# @param max_tokens [Object, nil] forwarded to +initialize_requests+
# @param temperature [Object, nil] forwarded to +initialize_requests+
# @return [DelayedRequest]
def initialize(batch_id: nil, model: nil, max_tokens: nil, temperature: nil)
  initialize_requests(model: model, max_tokens: max_tokens, temperature: temperature)
  # When a batch id is supplied, any custom id set up so far is discarded.
  @custom_id = nil if batch_id.present?
  @file_id = nil
  @batch_id = batch_id
  # NOTE(review): parent initializer is outside this view — presumably sets up the batch state; confirm.
  super()
end

Instance Method Details

#cancel ⇒ Object



78
79
80
# File 'lib/oxaiworkers/delayed_request.rb', line 78

# Cancels this request by firing +cancel_batch!+ and returning its result.
# NOTE(review): +cancel_batch!+ is defined outside this view — presumably a state-machine event; confirm.
def cancel = cancel_batch!

#cancel_batch ⇒ Object



24
25
26
# File 'lib/oxaiworkers/delayed_request.rb', line 24

# Asks the API client to cancel the batch identified by +@batch_id+.
# The call runs inside +not_found_is_ok+.
# NOTE(review): +not_found_is_ok+ is defined elsewhere — presumably tolerates a missing batch; confirm.
#
# @return [Object] whatever +not_found_is_ok+ returns
def cancel_batch
  not_found_is_ok do
    @client.batches.cancel(id: @batch_id)
  end
end

#clean_storage ⇒ Object



28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
# File 'lib/oxaiworkers/delayed_request.rb', line 28

# Deletes every remote file associated with this request.
#
# When a batch id is present the batch is retrieved and its output, error and
# input files are all scheduled for deletion; the locally tracked +@file_id+
# is scheduled as well. Blank ids are skipped and each id is deleted once.
# Every deletion runs inside +not_found_is_ok+.
#
# A batch may carry both an output file and an error file, so both are
# removed rather than only the first one that is set.
#
# @return [Array] the ids for which a deletion was attempted
def clean_storage
  file_ids = []
  if @batch_id.present?
    batch = @client.batches.retrieve(id: @batch_id)
    file_ids.push(batch['output_file_id'], batch['error_file_id'], batch['input_file_id'])
  end
  # The uploaded input file is usually the same as the batch's input file;
  # `uniq` below prevents deleting it twice.
  file_ids << @file_id

  file_ids.select(&:present?).uniq.each do |file_id|
    not_found_is_ok { @client.files.delete(id: file_id) }
  end
end

#completed? ⇒ Boolean

Returns:

  • (Boolean)


82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
# File 'lib/oxaiworkers/delayed_request.rb', line 82

# Checks the remote batch and records its outcome once it has finished.
#
# - Without a batch id, or while the status is neither 'failed' nor
#   'completed', nothing is recorded and +false+ is returned.
# - On 'failed', the error messages are stored in +@errors+ and +fail_batch!+
#   is fired.
# - On 'completed', each line of the output file is handed to
#   +parse_choices+ and +complete_batch!+ is fired; if there is no output
#   file but an error file exists, its content is stored in +@errors+ and
#   +fail_batch!+ is fired.
#
# @return [Boolean] true when the batch status was 'failed' or 'completed'
def completed?
  return false if @batch_id.nil?

  batch = @client.batches.retrieve(id: @batch_id)
  status = batch['status']

  if status == 'failed'
    # The payload may lack an error list; fall back to an empty list rather
    # than raising NoMethodError on nil.
    error_entries = batch.dig('errors', 'data') || []
    @errors = error_entries.map { |entry| entry['message'] }
    fail_batch!
    return true
  end
  return false unless status == 'completed'

  output_file_id = batch['output_file_id']
  error_file_id = batch['error_file_id']

  if !output_file_id.nil?
    @client.files.content(id: output_file_id).each do |line|
      @custom_id = line['custom_id']
      parse_choices(line.dig('response', 'body'))
      complete_batch!
    end
  elsif !error_file_id.nil?
    @errors = @client.files.content(id: error_file_id)
    fail_batch!
  end
  true
end

#finish ⇒ Object



47
48
49
50
# File 'lib/oxaiworkers/delayed_request.rb', line 47

# Assigns a fresh random UUID to +@custom_id+, then fires +end_batch!+
# unless +batch_idle?+ reports true.
#
# @return [Object, nil] the result of +end_batch!+, or nil when idle
def finish
  @custom_id = SecureRandom.uuid
  return if batch_idle?

  end_batch!
end

#post_batch ⇒ Object



13
14
15
16
17
18
19
20
21
22
# File 'lib/oxaiworkers/delayed_request.rb', line 13

# Creates a batch through the API client for the file in +@file_id+,
# targeting the chat completions endpoint with a 24h completion window,
# and remembers the id from the response in +@batch_id+.
#
# @return [Object] the value stored in +@batch_id+
def post_batch
  batch_parameters = {
    input_file_id: @file_id,
    endpoint: '/v1/chat/completions',
    completion_window: '24h'
  }
  @batch_id = @client.batches.create(parameters: batch_parameters)['id']
end

#request! ⇒ Object



74
75
76
# File 'lib/oxaiworkers/delayed_request.rb', line 74

# Fires +prepare_batch!+ when +@messages+ has at least one truthy element;
# otherwise does nothing.
#
# @return [Object, nil] the result of +prepare_batch!+, or nil when skipped
def request!
  return unless @messages.any?

  prepare_batch!
end

#upload_to_storage ⇒ Object



52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
# File 'lib/oxaiworkers/delayed_request.rb', line 52

# Serialises this request as a single JSON line into a temporary +.jsonl+
# file, uploads it with purpose 'batch', stores the returned id in
# +@file_id+ and fires +process_batch!+.
#
# An +OpenAI::Error+ raised while uploading or processing is logged at debug
# level and turned into +fail_batch!+. Any other exception propagates to the
# caller. In every case the temporary file is closed and removed.
#
# @return [Object] the result of +process_batch!+ or +fail_batch!+
def upload_to_storage
  item = {
    custom_id: @custom_id,
    method: 'POST',
    url: '/v1/chat/completions',
    body: params
  }

  file = Tempfile.new(["batch_#{@custom_id}", '.jsonl'])
  begin
    file.write(item.to_json)
    # Close before uploading so the content is flushed to disk; the client
    # is given the path, not the open handle.
    file.close
    response = @client.files.upload(parameters: { file: file.path, purpose: 'batch' })
    @file_id = response['id']
    process_batch!
  rescue OpenAI::Error => e
    OxAiWorkers.logger.debug(e.inspect, for: self.class)
    fail_batch!
  ensure
    # Runs on every exit path so the temp file never outlives the call.
    file.close!
  end
end