package llm

/*
#cgo CFLAGS: -I${SRCDIR}/ext_server -I${SRCDIR}/llama.cpp -I${SRCDIR}/llama.cpp/common -I${SRCDIR}/llama.cpp/examples/server
#cgo CFLAGS: -DNDEBUG -DLLAMA_SERVER_LIBRARY=1 -D_XOPEN_SOURCE=600 -DACCELERATE_NEW_LAPACK -DACCELERATE_LAPACK_ILP64
#cgo CFLAGS: -Wmissing-noreturn -Wextra -Wcast-qual -Wno-unused-function -Wno-array-bounds
#cgo CPPFLAGS: -Ofast -Wextra -Wno-unused-function -Wno-unused-variable -Wno-deprecated-declarations -Wno-unused-but-set-variable
#cgo darwin CFLAGS: -D_DARWIN_C_SOURCE
#cgo darwin CPPFLAGS: -DGGML_USE_ACCELERATE
#cgo darwin CPPFLAGS: -DGGML_USE_METAL -DGGML_METAL_NDEBUG
#cgo darwin LDFLAGS: -lc++ -framework Accelerate
#cgo darwin LDFLAGS: -framework Foundation -framework Metal -framework MetalKit -framework MetalPerformanceShaders
#cgo darwin LDFLAGS: ${SRCDIR}/llama.cpp/build/darwin/metal/lib/libcommon.a
#cgo darwin LDFLAGS: ${SRCDIR}/llama.cpp/build/darwin/metal/lib/libext_server.a
#cgo darwin LDFLAGS: ${SRCDIR}/llama.cpp/build/darwin/metal/lib/libllama.a
#cgo darwin LDFLAGS: ${SRCDIR}/llama.cpp/build/darwin/metal/lib/libggml_static.a
#cgo linux CFLAGS: -D_GNU_SOURCE
#cgo linux LDFLAGS: -lrt -ldl -lstdc++ -lm
#cgo linux windows LDFLAGS: -lpthread

#include <stdlib.h>
#include "ext_server.h"

*/
import "C"
import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"log"
	"strings"
	"sync"
	"time"
	"unsafe"

	"github.com/jmorganca/ollama/api"
)

// TODO switch Linux to always be dynamic
// If that works out, then look at the impact of doing the same for Mac
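
// extServer is implemented by each build flavor of the embedded llama.cpp
// server library and mirrors the C entry points it exports.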
type extServer interface {
	LLM
	llama_server_init(sparams *C.ext_server_params_t, err *C.ext_server_resp_t)
	llama_server_start()
	llama_server_stop()
	llama_server_completion(json_req *C.char, resp *C.ext_server_resp_t)
	llama_server_completion_next_result(task_id C.int, resp *C.ext_server_task_result_t)
	llama_server_completion_cancel(task_id C.int, err *C.ext_server_resp_t)
	llama_server_release_task_result(result *C.ext_server_task_result_t)
	llama_server_tokenize(json_req *C.char, json_resp **C.char, err *C.ext_server_resp_t)
	llama_server_detokenize(json_req *C.char, json_resp **C.char, err *C.ext_server_resp_t)
	llama_server_embedding(json_req *C.char, json_resp **C.char, err *C.ext_server_resp_t)
	llama_server_release_json_resp(json_resp **C.char)
}

// Note: current implementation does not support concurrent instantiations
var mutex sync.Mutex

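// newExtServerResp allocates a zeroed C message buffer of the given length.
// The caller must release it with freeExtServerResp.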
func newExtServerResp(len C.size_t) C.ext_server_resp_t {
	var resp C.ext_server_resp_t
	resp.msg_len = len
	bytes := make([]byte, len)
	resp.msg = (*C.char)(C.CBytes(bytes))
	return resp
}

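// freeExtServerResp releases the C message buffer held by resp.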
func freeExtServerResp(resp C.ext_server_resp_t) {
	if resp.msg_len == 0 {
		return
	}
	C.free(unsafe.Pointer(resp.msg))
}

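// extServerResponseToErr converts the C error response into a Go error.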
func extServerResponseToErr(resp C.ext_server_resp_t) error {
	return fmt.Errorf("%s", C.GoString(resp.msg))
}

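// newExtServer configures and starts the embedded llama.cpp server for the
// given model, optional LoRA adapters, and optional multimodal projectors.
// Because concurrent instantiations are not supported, it blocks until any
// prior server has been closed.
//
// A minimal usage sketch (hypothetical caller; impl stands in for one of the
// concrete extServer implementations and the options are illustrative):
//
//	srv, err := newExtServer(impl, "/path/to/model.gguf", nil, nil, api.DefaultOptions())
//	if err != nil {
//		return err
//	}
//	defer close(srv)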
func newExtServer(server extServer, model string, adapters, projectors []string, opts api.Options) (extServer, error) {
	if !mutex.TryLock() {
		log.Printf("concurrent llm servers not yet supported, waiting for prior server to complete")
		mutex.Lock()
	}

	var sparams C.ext_server_params_t
	sparams.model = C.CString(model)
	defer C.free(unsafe.Pointer(sparams.model))

	sparams.embedding = true
	sparams.n_ctx = C.uint(opts.NumCtx)
	sparams.n_batch = C.uint(opts.NumBatch)
	sparams.n_gpu_layers = C.int(opts.NumGPU)
	sparams.main_gpu = C.int(opts.MainGPU)
	sparams.n_parallel = 1 // TODO - wire up concurrency

	// Always use the value encoded in the model
	sparams.rope_freq_base = 0.0
	sparams.rope_freq_scale = 0.0
	sparams.memory_f16 = C.bool(opts.F16KV)
	sparams.use_mlock = C.bool(opts.UseMLock)
	sparams.use_mmap = C.bool(opts.UseMMap)
	sparams.numa = C.bool(opts.UseNUMA)

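	// Chain the adapters into the C-side linked list; the deferred frees
	// below release the list when this function returns.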
	sparams.lora_adapters = nil
	for i := 0; i < len(adapters); i++ {
		la := (*C.ext_server_lora_adapter_t)(C.malloc(C.sizeof_ext_server_lora_adapter_t))
		defer C.free(unsafe.Pointer(la))
		la.adapter = C.CString(adapters[i])
		defer C.free(unsafe.Pointer(la.adapter))
		la.scale = C.float(1.0) // TODO expose scale/weights up through ollama UX
		la.next = nil
		if i == 0 {
			sparams.lora_adapters = la
		} else {
			tmp := sparams.lora_adapters
			for ; tmp.next != nil; tmp = tmp.next {
			}
			tmp.next = la
		}
	}

	if len(projectors) > 0 {
		// TODO: applying multiple projectors is not supported by the llama.cpp server yet
		sparams.mmproj = C.CString(projectors[0])
		defer C.free(unsafe.Pointer(sparams.mmproj))
	} else {
		sparams.mmproj = nil
	}

	sparams.n_threads = C.uint(opts.NumThread)

	log.Printf("Initializing internal llama server")
	resp := newExtServerResp(128)
	defer freeExtServerResp(resp)
	server.llama_server_init(&sparams, &resp)
	if resp.id < 0 {
		return nil, extServerResponseToErr(resp)
	}

	log.Printf("Starting internal llama main loop")
	server.llama_server_start()
	return server, nil
}

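// predict streams a completion for predict.Prompt through the embedded
// server, calling fn once per generated chunk and a final time with Done set
// and timing stats. Cancelling ctx cancels the in-flight completion, and
// "slot unavailable" responses are retried with exponential backoff.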
func predict(ctx context.Context, llm extServer, predict PredictOpts, fn func(PredictResult)) error {
	resp := newExtServerResp(128)
	defer freeExtServerResp(resp)
	var imageData []ImageData
	if len(predict.Images) > 0 {
		for cnt, i := range predict.Images {
			imageData = append(imageData, ImageData{Data: i, ID: cnt})
		}
	}
	log.Printf("loaded %d images", len(imageData))

	request := map[string]any{
		"prompt":            predict.Prompt,
		"stream":            true,
		"n_predict":         predict.Options.NumPredict,
		"n_keep":            predict.Options.NumKeep,
		"temperature":       predict.Options.Temperature,
		"top_k":             predict.Options.TopK,
		"top_p":             predict.Options.TopP,
		"tfs_z":             predict.Options.TFSZ,
		"typical_p":         predict.Options.TypicalP,
		"repeat_last_n":     predict.Options.RepeatLastN,
		"repeat_penalty":    predict.Options.RepeatPenalty,
		"presence_penalty":  predict.Options.PresencePenalty,
		"frequency_penalty": predict.Options.FrequencyPenalty,
		"mirostat":          predict.Options.Mirostat,
		"mirostat_tau":      predict.Options.MirostatTau,
		"mirostat_eta":      predict.Options.MirostatEta,
		"penalize_nl":       predict.Options.PenalizeNewline,
		"seed":              predict.Options.Seed,
		"stop":              predict.Options.Stop,
		"image_data":        imageData,
		"cache_prompt":      true,
	}

	if predict.Format == "json" {
		request["grammar"] = jsonGrammar
	}

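	// Retry with exponential backoff when the server reports "slot
	// unavailable"; maxRetries is declared elsewhere in this package.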
	retryDelay := 100 * time.Microsecond
	for retries := 0; retries < maxRetries; retries++ {
		if retries > 0 {
			time.Sleep(retryDelay) // wait before retrying
			retryDelay *= 2        // exponential backoff
		}

		// Encode the request ourselves so special characters in the
		// prompt are not HTML-escaped.
		buffer := &bytes.Buffer{}
		enc := json.NewEncoder(buffer)
		enc.SetEscapeHTML(false)

		if err := enc.Encode(request); err != nil {
			return fmt.Errorf("failed to marshal data: %w", err)
		}

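		// Note: the deferred free runs when predict returns, not per
		// iteration, so each retry's request buffer stays allocated
		// until then.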
		req := C.CString(buffer.String())
		defer C.free(unsafe.Pointer(req))

		llm.llama_server_completion(req, &resp)
		if resp.id < 0 {
			return extServerResponseToErr(resp)
		}

		retryNeeded := false
	out:
		for {
			select {
			case <-ctx.Done():
				// This handles the request cancellation
				llm.llama_server_completion_cancel(resp.id, &resp)
				if resp.id < 0 {
					return extServerResponseToErr(resp)
				} else {
					return nil
				}
			default:
				var result C.ext_server_task_result_t
				llm.llama_server_completion_next_result(resp.id, &result)
				json_resp := C.GoString(result.json_resp)
				llm.llama_server_release_task_result(&result)

				var p prediction
				if err := json.Unmarshal([]byte(json_resp), &p); err != nil {
					llm.llama_server_completion_cancel(resp.id, &resp)
					if resp.id < 0 {
						return fmt.Errorf("error unmarshaling llm prediction response: %w and cancel %s", err, C.GoString(resp.msg))
					} else {
						return fmt.Errorf("error unmarshaling llm prediction response: %w", err)
					}
				}

				if bool(result.error) && strings.Contains(json_resp, "slot unavailable") {
					retryNeeded = true
					// task will already be canceled
					break out
				}

				if p.Content != "" {
					fn(PredictResult{
						Content: p.Content,
					})
				}

				if p.Stop {
					fn(PredictResult{
						Done:               true,
						PromptEvalCount:    p.Timings.PromptN,
						PromptEvalDuration: parseDurationMs(p.Timings.PromptMS),
						EvalCount:          p.Timings.PredictedN,
						EvalDuration:       parseDurationMs(p.Timings.PredictedMS),
					})
					return nil
				}
			}
		}
		if !retryNeeded {
			return nil // success
		}
	}

	// Only reached when every retry failed with "slot unavailable".
	return fmt.Errorf("max retries exceeded")
}

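// encode asks the embedded server to tokenize prompt and returns the token ids.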
func encode(llm extServer, ctx context.Context, prompt string) ([]int, error) {
	data, err := json.Marshal(TokenizeRequest{Content: prompt})
	if err != nil {
		return nil, fmt.Errorf("marshaling encode data: %w", err)
	}
	req := C.CString(string(data))
	defer C.free(unsafe.Pointer(req))
	var json_resp *C.char
	resp := newExtServerResp(128)
	defer freeExtServerResp(resp)
	llm.llama_server_tokenize(req, &json_resp, &resp)
	if resp.id < 0 {
		return nil, extServerResponseToErr(resp)
	}
	defer llm.llama_server_release_json_resp(&json_resp)

	var encoded TokenizeResponse
	if err2 := json.Unmarshal([]byte(C.GoString(json_resp)), &encoded); err2 != nil {
		return nil, fmt.Errorf("unmarshal encode response: %w", err2)
	}

	return encoded.Tokens, nil
}

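// decode asks the embedded server to detokenize tokens back into text.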
func decode(llm extServer, ctx context.Context, tokens []int) (string, error) {
	if len(tokens) == 0 {
		return "", nil
	}
	data, err := json.Marshal(DetokenizeRequest{Tokens: tokens})
	if err != nil {
		return "", fmt.Errorf("marshaling decode data: %w", err)
	}

	req := C.CString(string(data))
	defer C.free(unsafe.Pointer(req))
	var json_resp *C.char
	resp := newExtServerResp(128)
	defer freeExtServerResp(resp)
	llm.llama_server_detokenize(req, &json_resp, &resp)
	if resp.id < 0 {
		return "", extServerResponseToErr(resp)
	}
	defer llm.llama_server_release_json_resp(&json_resp)

	var decoded DetokenizeResponse
	if err2 := json.Unmarshal([]byte(C.GoString(json_resp)), &decoded); err2 != nil {
		return "", fmt.Errorf("unmarshal encode response: %w", err2)
	}

	return decoded.Content, nil
}

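// embedding computes an embedding vector for input. The request reuses
// TokenizeRequest because the embedding endpoint takes the same
// {"content": ...} payload shape.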
func embedding(llm extServer, ctx context.Context, input string) ([]float64, error) {
	data, err := json.Marshal(TokenizeRequest{Content: input})
	if err != nil {
		return nil, fmt.Errorf("error marshaling embed data: %w", err)
	}

	req := C.CString(string(data))
	defer C.free(unsafe.Pointer(req))
	var json_resp *C.char
	resp := newExtServerResp(128)
	defer freeExtServerResp(resp)
	llm.llama_server_embedding(req, &json_resp, &resp)
	if resp.id < 0 {
		return nil, extServerResponseToErr(resp)
	}
	defer llm.llama_server_release_json_resp(&json_resp)

	var embedding EmbeddingResponse
	if err := json.Unmarshal([]byte(C.GoString(json_resp)), &embedding); err != nil {
		return nil, fmt.Errorf("unmarshal tokenize response: %w", err)
	}

	return embedding.Embedding, nil
}

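// close stops the embedded server and releases the instantiation lock taken
// in newExtServer.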
func close(llm extServer) {
	llm.llama_server_stop()
	mutex.Unlock()
}