index.ts 2.48 KB
Newer Older
1
2
3
4
5
6
7
8
// TextStreamUpdate is a single item emitted by the text stream generators in this file.
type TextStreamUpdate = {
	// Text carried by this update; the generators in this file emit '' together with done: true.
	value: string;
	// True on the update that marks the end of the stream.
	done: boolean;
};

// createOpenAITextStream wraps the reader of an SSE response in an async generator of delta updates.
// When splitLargeDeltas is true, the generator is additionally passed through
// streamLargeDeltasAsRandomChunks so that large deltas are re-emitted as small random sized chunks.
export async function createOpenAITextStream(
	messageStream: ReadableStreamDefaultReader,
	splitLargeDeltas: boolean
): Promise<AsyncGenerator<TextStreamUpdate>> {
	const deltas = openAIStreamToIterator(messageStream);
	return splitLargeDeltas ? streamLargeDeltasAsRandomChunks(deltas) : deltas;
}

// openAIStreamToIterator reads string chunks from `reader`, splits them into SSE lines and yields
// one update per event line:
//   - empty lines and comment lines (starting with ':') are skipped,
//   - a `[DONE]` payload, or the end of the reader, yields a single { done: true, value: '' }
//     and ends the generator; nothing is read from `reader` after `[DONE]`,
//   - every other line is parsed as JSON (after stripping an optional `data:` prefix) and yields
//     `choices[0].delta.content`, or '' when that path is missing,
//   - lines that fail to parse are logged with console.error and skipped.
// The reader is neither cancelled nor released here; that is left to the caller.
async function* openAIStreamToIterator(
	reader: ReadableStreamDefaultReader
): AsyncGenerator<TextStreamUpdate> {
	while (true) {
		const { value, done } = await reader.read();
		if (done) {
			yield { done: true, value: '' };
			return;
		}
		for (const rawLine of value.split('\n')) {
			// SSE permits CRLF line endings; drop the trailing '\r' so blank lines and the
			// [DONE] sentinel are still recognised.
			const line = rawLine.endsWith('\r') ? rawLine.slice(0, -1) : rawLine;
			if (line === '') {
				continue;
			}
			if (line.startsWith(':')) {
				// Events starting with : are comments https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events#event_stream_format
				// OpenRouter sends heartbeats like ": OPENROUTER PROCESSING"
				continue;
			}
			// The single space after "data:" is optional in the SSE format.
			const payload = line.replace(/^data: ?/, '');
			if (payload === '[DONE]') {
				// End of stream: stop here so that no update follows the done marker and
				// done is not emitted a second time when the reader itself finishes.
				yield { done: true, value: '' };
				return;
			}
			let content: string;
			try {
				content = JSON.parse(payload).choices?.[0]?.delta?.content ?? '';
			} catch (e) {
				console.error('Error extracting delta from SSE event:', e);
				continue;
			}
			yield { done: false, value: content };
		}
	}
}

// streamLargeDeltasAsRandomChunks re-emits the updates of `iterator`, splitting large deltas
// (length >= 5) into random sized chunks of 1-3 characters and pausing briefly after each chunk.
// This is to simulate a more fluid streaming, even though some providers may send large chunks of text at once.
// Shorter deltas are forwarded unchanged; the generator ends after forwarding the first done update.
async function* streamLargeDeltasAsRandomChunks(
	iterator: AsyncGenerator<TextStreamUpdate>
): AsyncGenerator<TextStreamUpdate> {
	for await (const textStreamUpdate of iterator) {
		if (textStreamUpdate.done) {
			yield textStreamUpdate;
			return;
		}
		const content = textStreamUpdate.value;
		if (content.length < 5) {
			yield { done: false, value: content };
			continue;
		}
		// Split by code point instead of by UTF-16 code unit, so a surrogate pair (e.g. an emoji)
		// is never cut in half and emitted as a lone surrogate.
		const characters = Array.from(content);
		let offset = 0;
		while (offset < characters.length) {
			const chunkSize = Math.min(Math.floor(Math.random() * 3) + 1, characters.length - offset);
			yield { done: false, value: characters.slice(offset, offset + chunkSize).join('') };
			await sleep(5);
			offset += chunkSize;
		}
	}
}

// sleep returns a promise that resolves (with no value) once a setTimeout of `ms` milliseconds fires.
const sleep = (ms: number) =>
	new Promise((resolve) => {
		setTimeout(resolve, ms);
	});