Class: OxAiWorkers::DelayedRequest
Instance Attribute Summary
Attributes inherited from StateBatch
#batch_id, #file_id
#client, #custom_id, #errors, #max_tokens, #messages, #model, #result, #temperature, #tool_calls, #tool_calls_raw, #tools
Instance Method Summary
collapse
Methods inherited from StateBatch
#cleanup
#log_me
#append, #cleanup, #initialize_requests, #not_found_is_ok, #params, #parse_choices
Constructor Details
#initialize(batch_id: nil, model: nil, max_tokens: nil, temperature: nil) ⇒ DelayedRequest
Returns a new instance of DelayedRequest.
5
6
7
8
9
10
11
|
# File 'lib/oxaiworkers/delayed_request.rb', line 5
# Builds a delayed request, optionally attached to an already-created batch.
#
# @param batch_id [String, nil] id of an existing batch; when present the
#   instance's +@custom_id+ is reset to nil
# @param model [String, nil] forwarded to +initialize_requests+
# @param max_tokens [Integer, nil] forwarded to +initialize_requests+
# @param temperature [Numeric, nil] forwarded to +initialize_requests+
def initialize(batch_id: nil, model: nil, max_tokens: nil, temperature: nil)
  # Request defaults come first; the reset below deliberately runs after it.
  initialize_requests(model:, max_tokens:, temperature:)
  @custom_id = nil if batch_id.present?

  @file_id = nil
  @batch_id = batch_id

  # Parent (StateBatch) initialisation runs last, once all ivars are in place.
  super()
end
|
Instance Method Details
#cancel ⇒ Object
78
79
80
|
# File 'lib/oxaiworkers/delayed_request.rb', line 78
# Public entry point for cancelling this request.
#
# Delegates to +cancel_batch!+, which is defined outside this class body
# (presumably a state-machine event from StateBatch — confirm).
#
# @return [Object] whatever +cancel_batch!+ returns
def cancel = cancel_batch!
|
#cancel_batch ⇒ Object
24
25
26
|
# File 'lib/oxaiworkers/delayed_request.rb', line 24
# Asks the API client to cancel the batch identified by +@batch_id+.
#
# The call is wrapped in +not_found_is_ok+ (inherited helper; by its name it
# tolerates a missing remote batch — confirm against its definition).
#
# @return [Object] whatever +not_found_is_ok+ returns for the wrapped call
def cancel_batch
  not_found_is_ok do
    @client.batches.cancel(id: @batch_id)
  end
end
|
#clean_storage ⇒ Object
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
|
# File 'lib/oxaiworkers/delayed_request.rb', line 28
# Deletes every remote file associated with this request.
#
# When a batch id is known, the batch is retrieved and its output, error and
# input files are all scheduled for deletion. A batch may carry both an
# output file and an error file, so both are removed rather than only the
# first one found. The locally tracked +@file_id+ is added as well. Missing
# (nil) ids are skipped and each distinct id is deleted only once.
#
# Every delete is wrapped in +not_found_is_ok+ so an already-removed file
# does not abort the cleanup of the remaining ones (per the helper's name —
# confirm against its definition).
#
# @return [Array<String>] the file ids for which deletion was attempted
def clean_storage
  file_ids = []

  if @batch_id.present?
    batch = @client.batches.retrieve(id: @batch_id)
    file_ids.push(batch['output_file_id'], batch['error_file_id'], batch['input_file_id'])
  end
  file_ids << @file_id if @file_id.present?

  # compact drops ids the batch does not have; uniq avoids deleting the
  # input file twice when @file_id is that same file.
  file_ids.compact.uniq.each do |file_id|
    not_found_is_ok { @client.files.delete(id: file_id) }
  end
end
|
#completed? ⇒ Boolean
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
|
# File 'lib/oxaiworkers/delayed_request.rb', line 82
# Polls the remote batch and, once it has finished, records its outcome.
#
# * No batch id: returns false without contacting the API.
# * Status 'failed': stores the error messages in +@errors+, calls
#   +fail_batch!+ and returns true. A failed batch without an +errors+
#   payload yields an empty list instead of raising NoMethodError.
# * Any status other than 'completed': returns false.
# * Status 'completed' with an output file: for each output line, stores its
#   +custom_id+, feeds the response body to +parse_choices+ and calls
#   +complete_batch!+.
# * Status 'completed' with only an error file: stores that file's content
#   in +@errors+ and calls +fail_batch!+.
#
# NOTE(review): statuses other than 'failed'/'completed' always return false
# here, so a batch that ends in another final state would be polled
# indefinitely — confirm whether callers handle that.
#
# @return [Boolean] true when the batch reached 'failed' or 'completed'
def completed?
  return false if @batch_id.nil?

  batch = @client.batches.retrieve(id: @batch_id)
  status = batch['status']

  if status == 'failed'
    # dig + Array() tolerate a missing/null 'errors' object on the batch.
    @errors = Array(batch.dig('errors', 'data')).map { |e| e['message'] }
    fail_batch!
    return true
  end
  return false unless status == 'completed'

  if !batch['output_file_id'].nil?
    output = @client.files.content(id: batch['output_file_id'])
    output.each do |line|
      @custom_id = line['custom_id']
      parse_choices(line.dig('response', 'body'))
      complete_batch!
    end
  elsif !batch['error_file_id'].nil?
    # NOTE(review): the error file is only read when there is no output file.
    @errors = @client.files.content(id: batch['error_file_id'])
    fail_batch!
  end
  true
end
|
#finish ⇒ Object
47
48
49
50
|
# File 'lib/oxaiworkers/delayed_request.rb', line 47
# Assigns a fresh random +@custom_id+ and, unless +batch_idle?+ reports true,
# calls +end_batch!+.
#
# @return [Object, nil] the result of +end_batch!+, or nil when the batch is
#   idle
def finish
  @custom_id = SecureRandom.uuid
  return if batch_idle?

  end_batch!
end
|
#post_batch ⇒ Object
13
14
15
16
17
18
19
20
21
22
|
# File 'lib/oxaiworkers/delayed_request.rb', line 13
# Creates a remote batch from the previously uploaded input file and stores
# the id from the response in +@batch_id+.
#
# The batch targets the chat-completions endpoint with a 24h completion
# window.
#
# @return [Object] the value of the response's 'id' entry
def post_batch
  batch_parameters = {
    input_file_id: @file_id,
    endpoint: '/v1/chat/completions',
    completion_window: '24h'
  }
  @batch_id = @client.batches.create(parameters: batch_parameters)['id']
end
|
#request! ⇒ Object
74
75
76
|
# File 'lib/oxaiworkers/delayed_request.rb', line 74
# Calls +prepare_batch!+ when there is at least one message; does nothing
# otherwise.
#
# @return [Object, nil] the result of +prepare_batch!+, or nil when
#   +@messages+ has no (truthy) entries
def request!
  return unless @messages.any?

  prepare_batch!
end
|
#upload_to_storage ⇒ Object
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
|
# File 'lib/oxaiworkers/delayed_request.rb', line 52
# Writes the request as a single JSON line to a temporary .jsonl file,
# uploads it with purpose 'batch', stores the returned file id in +@file_id+
# and calls +process_batch!+.
#
# An OpenAI::Error raised during upload or +process_batch!+ is logged at
# debug level and turned into +fail_batch!+. Any other exception propagates
# to the caller. In every case the temporary file is closed and removed, so
# an unexpected failure no longer leaves it behind.
#
# @return [Object] the result of +process_batch!+, or of +fail_batch!+ when
#   an OpenAI::Error was rescued
def upload_to_storage
  item = {
    custom_id: @custom_id,
    method: 'POST',
    url: '/v1/chat/completions',
    body: params
  }
  file = Tempfile.new(["batch_#{@custom_id}", '.jsonl'])
  begin
    file.write(item.to_json)
    # Close first so the content is flushed before the upload reads the path.
    file.close
    response = @client.files.upload(parameters: { file: file.path, purpose: 'batch' })
    @file_id = response['id']
    process_batch!
  rescue OpenAI::Error => e
    OxAiWorkers.logger.debug(e.inspect, for: self.class)
    fail_batch!
  ensure
    # close! closes (a no-op if already closed) and unlinks the temp file.
    file.close!
  end
end
|