ext_server_common.go 11.1 KB
Newer Older
1
2
3
package llm

/*
4
#cgo CFLAGS: -I${SRCDIR}/ext_server -I${SRCDIR}/llama.cpp -I${SRCDIR}/llama.cpp/common -I${SRCDIR}/llama.cpp/examples/server
5
#cgo CFLAGS: -DNDEBUG -DLLAMA_SERVER_LIBRARY=1 -D_XOPEN_SOURCE=600 -DACCELERATE_NEW_LAPACK -DACCELERATE_LAPACK_ILP64
6
7
#cgo CFLAGS: -Wmissing-noreturn -Wextra -Wcast-qual -Wno-unused-function -Wno-array-bounds
#cgo CPPFLAGS: -Ofast -Wextra -Wno-unused-function -Wno-unused-variable -Wno-deprecated-declarations -Wno-unused-but-set-variable
8
9
#cgo darwin CFLAGS: -D_DARWIN_C_SOURCE
#cgo darwin CPPFLAGS:  -DGGML_USE_ACCELERATE
Daniel Hiltgen's avatar
Daniel Hiltgen committed
10
#cgo darwin CPPFLAGS: -DGGML_USE_METAL -DGGML_METAL_NDEBUG
11
#cgo darwin LDFLAGS: -lc++ -framework Accelerate
Daniel Hiltgen's avatar
Daniel Hiltgen committed
12
#cgo darwin LDFLAGS: -framework Foundation -framework Metal -framework MetalKit -framework MetalPerformanceShaders
13
14
15
16
#cgo darwin LDFLAGS: ${SRCDIR}/llama.cpp/build/darwin/metal/lib/libcommon.a
#cgo darwin LDFLAGS: ${SRCDIR}/llama.cpp/build/darwin/metal/lib/libext_server.a
#cgo darwin LDFLAGS: ${SRCDIR}/llama.cpp/build/darwin/metal/lib/libllama.a
#cgo darwin LDFLAGS: ${SRCDIR}/llama.cpp/build/darwin/metal/lib/libggml_static.a
17
18
19
#cgo linux CFLAGS: -D_GNU_SOURCE
#cgo linux windows CFLAGS: -DGGML_CUDA_DMMV_X=32 -DGGML_CUDA_MMV_Y=1 -DGGML_CUDA_PEER_MAX_BATCH_SIZE=128 -DGGML_USE_CUBLAS
#cgo linux LDFLAGS: -L/usr/local/cuda/targets/x86_64-linux/lib -L/usr/local/cuda/lib64 -L/usr/local/cuda/targets/x86_64-linux/lib/stubs
20
21
22
23
#cgo linux LDFLAGS: ${SRCDIR}/llama.cpp/build/linux/cpu/lib/libext_server.a
#cgo linux LDFLAGS: ${SRCDIR}/llama.cpp/build/linux/cpu/lib/libcommon.a
#cgo linux LDFLAGS: ${SRCDIR}/llama.cpp/build/linux/cpu/lib/libllama.a
#cgo linux LDFLAGS: ${SRCDIR}/llama.cpp/build/linux/cpu/lib/libggml_static.a
24
25
#cgo linux LDFLAGS: -lrt -ldl -lstdc++ -lm
#cgo linux windows LDFLAGS: -lpthread
26
27

#include <stdlib.h>
28
#include "ext_server.h"
29
30
31
32
33
34
35
36
37

*/
import "C"
import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"log"
38
	"strings"
39
40
41
42
43
44
45
	"sync"
	"time"
	"unsafe"

	"github.com/jmorganca/ollama/api"
)

46
47
48
49
50
51
52
53
54
55
56
57
58
type extServer interface {
	LLM
	llama_server_init(sparams *C.ext_server_params_t, err *C.ext_server_resp_t)
	llama_server_start()
	llama_server_stop()
	llama_server_completion(json_req *C.char, resp *C.ext_server_resp_t)
	llama_server_completion_next_result(task_id C.int, resp *C.ext_server_task_result_t)
	llama_server_completion_cancel(task_id C.int, err *C.ext_server_resp_t)
	llama_server_release_task_result(result *C.ext_server_task_result_t)
	llama_server_tokenize(json_req *C.char, json_resp **C.char, err *C.ext_server_resp_t)
	llama_server_detokenize(json_req *C.char, json_resp **C.char, err *C.ext_server_resp_t)
	llama_server_embedding(json_req *C.char, json_resp **C.char, err *C.ext_server_resp_t)
	llama_server_release_json_resp(json_resp **C.char)
59
60
61
62
63
}

// mutex serializes use of the external server: newExtServer acquires it
// before initializing a server and close releases it after stopping one.
// Note: current implementation does not support concurrent instantiations
var mutex sync.Mutex

64
65
66
67
68
69
// newExtServerResp returns a response struct whose msg field points at a
// zero-filled, C-allocated buffer of size bytes and whose msg_len equals size.
// Release the buffer with freeExtServerResp.
func newExtServerResp(size C.size_t) C.ext_server_resp_t {
	// C.CBytes copies the zeroed Go slice into newly allocated C memory.
	buf := C.CBytes(make([]byte, size))

	var out C.ext_server_resp_t
	out.msg = (*C.char)(buf)
	out.msg_len = size
	return out
}

72
73
74
75
76
// freeExtServerResp frees the C message buffer held by resp. A response whose
// msg_len is zero is left untouched.
func freeExtServerResp(resp C.ext_server_resp_t) {
	if resp.msg_len != 0 {
		C.free(unsafe.Pointer(resp.msg))
	}
}

79
80
// extServerResponseToErr converts the message carried by resp into a Go error.
// The message is copied out of C memory, so the returned error stays valid
// after resp is freed.
func extServerResponseToErr(resp C.ext_server_resp_t) error {
	// The message comes from the C side and may contain '%'; never use it as
	// the format string itself.
	return fmt.Errorf("%s", C.GoString(resp.msg))
}

83
// newExtServer initializes and starts server for the given model.
//
// It first acquires the package-level mutex, blocking until any previous
// server has been closed. On success the mutex stays held and server is
// returned; it is released later by close. If initialization fails the mutex
// is released here and an error built from the server's response is returned.
//
// adapters are passed to the server as a linked list of LoRA adapters, each
// with scale 1.0. Only the first entry of projectors is used. All C strings
// and structs allocated here are freed when the function returns.
func newExtServer(server extServer, model string, adapters, projectors []string, opts api.Options) (extServer, error) {
	if !mutex.TryLock() {
		log.Printf("concurrent llm servers not yet supported, waiting for prior server to complete")
		mutex.Lock()
	}

	var sparams C.ext_server_params_t
	sparams.model = C.CString(model)
	defer C.free(unsafe.Pointer(sparams.model))

	sparams.embedding = true
	sparams.n_ctx = C.uint(opts.NumCtx)
	sparams.n_batch = C.uint(opts.NumBatch)
	sparams.n_gpu_layers = C.int(opts.NumGPU)
	sparams.main_gpu = C.int(opts.MainGPU)
	sparams.n_parallel = 1 // TODO - wire up concurrency

	// Always use the value encoded in the model
	sparams.rope_freq_base = 0.0
	sparams.rope_freq_scale = 0.0
	sparams.memory_f16 = C.bool(opts.F16KV)
	sparams.use_mlock = C.bool(opts.UseMLock)
	sparams.use_mmap = C.bool(opts.UseMMap)
	sparams.numa = C.bool(opts.UseNUMA)

	// Build the adapter list in input order, remembering the tail so each
	// append is O(1) instead of re-walking the list.
	sparams.lora_adapters = nil
	var tail *C.ext_server_lora_adapter_t
	for _, adapter := range adapters {
		la := (*C.ext_server_lora_adapter_t)(C.malloc(C.sizeof_ext_server_lora_adapter_t))
		defer C.free(unsafe.Pointer(la))
		la.adapter = C.CString(adapter)
		defer C.free(unsafe.Pointer(la.adapter))
		la.scale = C.float(1.0) // TODO expose scale/weights up through ollama UX
		la.next = nil
		if tail == nil {
			sparams.lora_adapters = la
		} else {
			tail.next = la
		}
		tail = la
	}

	if len(projectors) > 0 {
		// TODO: applying multiple projectors is not supported by the llama.cpp server yet
		sparams.mmproj = C.CString(projectors[0])
		defer C.free(unsafe.Pointer(sparams.mmproj))
	} else {
		sparams.mmproj = nil
	}

	sparams.n_threads = C.uint(opts.NumThread)

	log.Printf("Initializing internal llama server")
	resp := newExtServerResp(128)
	defer freeExtServerResp(resp)
	server.llama_server_init(&sparams, &resp)
	if resp.id < 0 {
		// No server is handed back, so close will never run for this attempt;
		// release the lock here or every later call would block forever.
		mutex.Unlock()
		return nil, extServerResponseToErr(resp)
	}

	log.Printf("Starting internal llama main loop")
	server.llama_server_start()
	return server, nil
}

149
func predict(ctx context.Context, llm extServer, predict PredictOpts, fn func(PredictResult)) error {
150
151
152
153
154
155
156
157
158
	resp := newExtServerResp(128)
	defer freeExtServerResp(resp)
	var imageData []ImageData
	if len(predict.Images) > 0 {
		for cnt, i := range predict.Images {
			imageData = append(imageData, ImageData{Data: i, ID: cnt})
		}
	}
	log.Printf("loaded %d images", len(imageData))
159
160
161
162

	request := map[string]any{
		"prompt":            predict.Prompt,
		"stream":            true,
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
		"n_predict":         predict.Options.NumPredict,
		"n_keep":            predict.Options.NumKeep,
		"temperature":       predict.Options.Temperature,
		"top_k":             predict.Options.TopK,
		"top_p":             predict.Options.TopP,
		"tfs_z":             predict.Options.TFSZ,
		"typical_p":         predict.Options.TypicalP,
		"repeat_last_n":     predict.Options.RepeatLastN,
		"repeat_penalty":    predict.Options.RepeatPenalty,
		"presence_penalty":  predict.Options.PresencePenalty,
		"frequency_penalty": predict.Options.FrequencyPenalty,
		"mirostat":          predict.Options.Mirostat,
		"mirostat_tau":      predict.Options.MirostatTau,
		"mirostat_eta":      predict.Options.MirostatEta,
		"penalize_nl":       predict.Options.PenalizeNewline,
		"seed":              predict.Options.Seed,
		"stop":              predict.Options.Stop,
180
181
		"image_data":        imageData,
		"cache_prompt":      true,
182
183
184
185
186
187
	}

	if predict.Format == "json" {
		request["grammar"] = jsonGrammar
	}

188
189
190
191
192
193
	retryDelay := 100 * time.Microsecond
	for retries := 0; retries < maxRetries; retries++ {
		if retries > 0 {
			time.Sleep(retryDelay) // wait before retrying
			retryDelay *= 2        // exponential backoff
		}
194

195
196
197
198
		// Handling JSON marshaling with special characters unescaped.
		buffer := &bytes.Buffer{}
		enc := json.NewEncoder(buffer)
		enc.SetEscapeHTML(false)
199

200
201
202
		if err := enc.Encode(request); err != nil {
			return fmt.Errorf("failed to marshal data: %w", err)
		}
203

204
205
		req := C.CString(buffer.String())
		defer C.free(unsafe.Pointer(req))
206

207
208
209
210
		llm.llama_server_completion(req, &resp)
		if resp.id < 0 {
			return extServerResponseToErr(resp)
		}
211

212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
		retryNeeded := false
	out:
		for {
			select {
			case <-ctx.Done():
				// This handles the request cancellation
				llm.llama_server_completion_cancel(resp.id, &resp)
				if resp.id < 0 {
					return extServerResponseToErr(resp)
				} else {
					return nil
				}
			default:
				var result C.ext_server_task_result_t
				llm.llama_server_completion_next_result(resp.id, &result)
				json_resp := C.GoString(result.json_resp)
				llm.llama_server_release_task_result(&result)

				var p prediction
				if err := json.Unmarshal([]byte(json_resp), &p); err != nil {
					llm.llama_server_completion_cancel(resp.id, &resp)
					if resp.id < 0 {
						return fmt.Errorf("error unmarshaling llm prediction response: %w and cancel %s", err, C.GoString(resp.msg))
					} else {
						return fmt.Errorf("error unmarshaling llm prediction response: %w", err)
					}
				}

				if bool(result.error) && strings.Contains(json_resp, "slot unavailable") {
					retryNeeded = true
					// task will already be canceled
					break out
				}

				if p.Content != "" {
					fn(PredictResult{
						Content: p.Content,
					})
				}

				if p.Stop {
					fn(PredictResult{
						Done:               true,
						PromptEvalCount:    p.Timings.PromptN,
						PromptEvalDuration: parseDurationMs(p.Timings.PromptMS),
						EvalCount:          p.Timings.PredictedN,
						EvalDuration:       parseDurationMs(p.Timings.PredictedMS),
					})
					return nil
				}
262
263
			}
		}
264
265
266
		if !retryNeeded {
			return nil // success
		}
267
268
	}

269
270
271
272
273
	// should never reach here ideally
	return fmt.Errorf("max retries exceeded")
}

// encode asks llm to tokenize prompt and returns the resulting token ids.
// ctx is accepted but not consulted. An error is returned when the request
// cannot be marshaled, the server rejects it, or its reply is not valid JSON.
func encode(llm extServer, ctx context.Context, prompt string) ([]int, error) {
	payload, err := json.Marshal(TokenizeRequest{Content: prompt})
	if err != nil {
		return nil, fmt.Errorf("marshaling encode data: %w", err)
	}
	cReq := C.CString(string(payload))
	defer C.free(unsafe.Pointer(cReq))

	status := newExtServerResp(128)
	defer freeExtServerResp(status)

	var cResp *C.char
	llm.llama_server_tokenize(cReq, &cResp, &status)
	if status.id < 0 {
		return nil, extServerResponseToErr(status)
	}
	// The reply is only released when the call succeeded.
	defer llm.llama_server_release_json_resp(&cResp)

	var out TokenizeResponse
	if uerr := json.Unmarshal([]byte(C.GoString(cResp)), &out); uerr != nil {
		return nil, fmt.Errorf("unmarshal encode response: %w", uerr)
	}
	return out.Tokens, nil
}

297
// decode asks llm to turn tokens back into text. An empty token slice yields
// an empty string without contacting the server. ctx is accepted but not
// consulted. An error is returned when the request cannot be marshaled, the
// server rejects it, or its reply is not valid JSON.
func decode(llm extServer, ctx context.Context, tokens []int) (string, error) {
	if len(tokens) == 0 {
		return "", nil
	}
	data, err := json.Marshal(DetokenizeRequest{Tokens: tokens})
	if err != nil {
		return "", fmt.Errorf("marshaling decode data: %w", err)
	}

	req := C.CString(string(data))
	defer C.free(unsafe.Pointer(req))

	var jsonResp *C.char
	resp := newExtServerResp(128)
	defer freeExtServerResp(resp)
	llm.llama_server_detokenize(req, &jsonResp, &resp)
	if resp.id < 0 {
		return "", extServerResponseToErr(resp)
	}
	// The reply is only released when the call succeeded.
	defer llm.llama_server_release_json_resp(&jsonResp)

	var decoded DetokenizeResponse
	if err := json.Unmarshal([]byte(C.GoString(jsonResp)), &decoded); err != nil {
		return "", fmt.Errorf("unmarshal decode response: %w", err)
	}

	return decoded.Content, nil
}

325
// embedding asks llm for the embedding vector of input. The request body is
// built from TokenizeRequest with input as its Content. ctx is accepted but
// not consulted. An error is returned when the request cannot be marshaled,
// the server rejects it, or its reply is not valid JSON.
func embedding(llm extServer, ctx context.Context, input string) ([]float64, error) {
	data, err := json.Marshal(TokenizeRequest{Content: input})
	if err != nil {
		return nil, fmt.Errorf("error marshaling embed data: %w", err)
	}

	req := C.CString(string(data))
	defer C.free(unsafe.Pointer(req))

	var jsonResp *C.char
	resp := newExtServerResp(128)
	defer freeExtServerResp(resp)
	llm.llama_server_embedding(req, &jsonResp, &resp)
	if resp.id < 0 {
		return nil, extServerResponseToErr(resp)
	}
	// The reply is only released when the call succeeded.
	defer llm.llama_server_release_json_resp(&jsonResp)

	// Named so it does not shadow this function.
	var embedResp EmbeddingResponse
	if err := json.Unmarshal([]byte(C.GoString(jsonResp)), &embedResp); err != nil {
		return nil, fmt.Errorf("unmarshal embedding response: %w", err)
	}

	return embedResp.Embedding, nil
}

350
351
// close stops llm and then releases the package-level mutex that
// newExtServer acquired, letting the next server be created. It must be
// called once per successful newExtServer; unlocking an unheld mutex is a
// fatal runtime error.
func close(llm extServer) {
	// Stop first: the lock must stay held until the server is down.
	llm.llama_server_stop()
	mutex.Unlock()
}