/// HTTP Server logic
use crate::infer::{InferError, InferStreamResponse};
use crate::{
    CompatGenerateRequest, Details, ErrorResponse, FinishReason, GenerateParameters,
    GenerateRequest, GenerateResponse, Infer, PrefillToken, StreamDetails, StreamResponse, Token,
    Validation,
};
use axum::extract::Extension;
use axum::http::{HeaderMap, Method, StatusCode};
use axum::response::sse::{Event, KeepAlive, Sse};
use axum::response::IntoResponse;
use axum::routing::{get, post};
use axum::{http, Json, Router};
use axum_tracing_opentelemetry::opentelemetry_tracing_layer;
use futures::Stream;
use metrics_exporter_prometheus::{PrometheusBuilder, PrometheusHandle};
use std::convert::Infallible;
use std::net::SocketAddr;
use text_generation_client::ShardedClient;
use tokenizers::Tokenizer;
use tokio::signal;
use tokio::time::Instant;
use tokio_stream::StreamExt;
use tower_http::cors::{AllowOrigin, CorsLayer};
use tracing::{info_span, instrument, Instrument};
use utoipa::OpenApi;
use utoipa_swagger_ui::SwaggerUi;

/// Compatibility route with api-inference and AzureML
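/// Dispatches to the streaming or unary handler based on the request's
/// `stream` flag; unary generations are wrapped in a single-element array to
/// match the api-inference response shape.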
#[instrument(skip(infer))]
async fn compat_generate(
    infer: Extension<Infer>,
    req: Json<CompatGenerateRequest>,
) -> Result<impl IntoResponse, (StatusCode, Json<ErrorResponse>)> {
    // switch on stream
    let req = req.0;
    if req.stream {
        Ok(generate_stream(infer, Json(req.into()))
            .await
            .into_response())
    } else {
        let (headers, generation) = generate(infer, Json(req.into())).await?;
        // wrap generation inside a Vec to match api-inference
        Ok((headers, Json(vec![generation.0])).into_response())
    }
}
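
// A minimal sketch of a call to this compatibility route, assuming the server
// listens on localhost:8080 (the address and port are configuration-dependent):
//
//   curl localhost:8080/ \
//     -X POST \
//     -H 'Content-Type: application/json' \
//     -d '{"inputs": "Hello", "stream": false}'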

/// Health check method
#[instrument(skip(infer))]
async fn health(infer: Extension<Infer>) -> Result<(), (StatusCode, Json<ErrorResponse>)> {
    // TODO: while this is the best health check we can do, it is a bit on the heavy side and might
    //       be a bit too slow for a health check.
    //       What we should do instead is check if the gRPC channels are still healthy.

    // Send a small inference request
    infer
        .generate(GenerateRequest {
            inputs: "liveness".to_string(),
            parameters: GenerateParameters {
                temperature: None,
                repetition_penalty: None,
                top_k: None,
                top_p: None,
                do_sample: false,
                max_new_tokens: 1,
                stop: Vec::new(),
                details: false,
                seed: None,
            },
        })
        .await?;
    Ok(())
}

/// Generate tokens
#[utoipa::path(
    post,
    tag = "Text Generation Inference",
    path = "/generate",
    request_body = GenerateRequest,
    responses(
        (status = 200, description = "Generated Text", body = GenerateResponse),
        (status = 424, description = "Generation Error", body = ErrorResponse,
            example = json!({"error": "Request failed during generation"})),
        (status = 429, description = "Model is overloaded", body = ErrorResponse,
            example = json!({"error": "Model is overloaded"})),
        (status = 422, description = "Input validation error", body = ErrorResponse,
            example = json!({"error": "Input validation error"})),
        (status = 500, description = "Incomplete generation", body = ErrorResponse,
            example = json!({"error": "Incomplete generation"})),
    )
)]
#[instrument(
    skip(infer),
    fields(
        total_time,
        validation_time,
        queue_time,
        inference_time,
        time_per_token,
        seed,
    )
)]
async fn generate(
    infer: Extension<Infer>,
    req: Json<GenerateRequest>,
) -> Result<(HeaderMap, Json<GenerateResponse>), (StatusCode, Json<ErrorResponse>)> {
    let span = tracing::Span::current();
    let start_time = Instant::now();

    // Inference
    let details = req.0.parameters.details;
    let response = infer.generate(req.0).await?;

    // Token details
    let details = match details {
        true => Some(Details {
            finish_reason: FinishReason::from(response.generated_text.finish_reason),
            generated_tokens: response.generated_text.generated_tokens,
            prefill: Some(response.prefill),
            tokens: Some(response.tokens),
            seed: response.generated_text.seed,
        }),
        false => None,
    };

    // Timings
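    // `response.queued` marks when validation handed the request to the queue
    // and `response.start` marks when a shard began decoding, so the intervals
    // below partition the request lifetime end to end.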
    let total_time = start_time.elapsed();
    let validation_time = response.queued - start_time;
    let queue_time = response.start - response.queued;
    let inference_time = Instant::now() - response.start;
    let time_per_token = inference_time / response.generated_text.generated_tokens;

    // Headers
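    // Expose the timings to clients as custom `x-*` response headers, encoded
    // as integer milliseconds.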
    let mut headers = HeaderMap::new();
    headers.insert(
        "x-total-time",
        total_time.as_millis().to_string().parse().unwrap(),
    );
    headers.insert(
        "x-validation-time",
        validation_time.as_millis().to_string().parse().unwrap(),
    );
    headers.insert(
        "x-queue-time",
        queue_time.as_millis().to_string().parse().unwrap(),
    );
    headers.insert(
        "x-inference-time",
        inference_time.as_millis().to_string().parse().unwrap(),
    );
    headers.insert(
        "x-time-per-token",
        time_per_token.as_millis().to_string().parse().unwrap(),
    );

    // Tracing metadata
    span.record("total_time", format!("{total_time:?}"));
    span.record("validation_time", format!("{validation_time:?}"));
    span.record("queue_time", format!("{queue_time:?}"));
    span.record("inference_time", format!("{inference_time:?}"));
    span.record("time_per_token", format!("{time_per_token:?}"));
    span.record("seed", format!("{:?}", response.generated_text.seed));
    tracing::info!("Output: {}", response.generated_text.text);

    // Metrics
    metrics::increment_counter!("tgi_request_success");
    metrics::histogram!("tgi_request_duration", total_time);
    metrics::histogram!("tgi_request_validation_duration", validation_time);
    metrics::histogram!("tgi_request_queue_duration", queue_time);
    metrics::histogram!("tgi_request_inference_duration", inference_time);
    metrics::histogram!("tgi_request_mean_time_per_token_duration", time_per_token);
    metrics::histogram!(
        "tgi_request_generated_tokens",
        response.generated_text.generated_tokens as f64
    );

    // Send response
    let response = GenerateResponse {
        generated_text: response.generated_text.text,
        details,
    };
    Ok((headers, Json(response)))
}

/// Generate a stream of tokens using Server-Sent Events
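///
/// Emits one SSE event per generated token; the final event also carries the
/// full `generated_text` and, when requested, the generation `details`.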
#[utoipa::path(
    post,
    tag = "Text Generation Inference",
    path = "/generate_stream",
    request_body = GenerateRequest,
    responses(
        (status = 200, description = "Generated Text", body = StreamResponse,
            content_type="text/event-stream"),
        (status = 424, description = "Generation Error", body = ErrorResponse,
            example = json!({"error": "Request failed during generation"}),
            content_type="text/event-stream"),
        (status = 429, description = "Model is overloaded", body = ErrorResponse,
            example = json!({"error": "Model is overloaded"}),
            content_type="text/event-stream"),
        (status = 422, description = "Input validation error", body = ErrorResponse,
            example = json!({"error": "Input validation error"}),
            content_type="text/event-stream"),
        (status = 500, description = "Incomplete generation", body = ErrorResponse,
            example = json!({"error": "Incomplete generation"}),
            content_type="text/event-stream"),
    )
)]
#[instrument(
    skip(infer),
    fields(
        total_time,
        validation_time,
        queue_time,
        inference_time,
        time_per_token,
        seed,
    )
)]
async fn generate_stream(
    infer: Extension<Infer>,
    req: Json<GenerateRequest>,
) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
    let span = tracing::Span::current();
    let start_time = Instant::now();

    let stream = async_stream::stream! {
        // Inference
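        // Track whether a final `End` message or an error was emitted so an
        // `IncompleteGeneration` error can be sent if the stream ends early.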
        let mut end_reached = false;
        let mut error = false;
        let details = req.0.parameters.details;

        match infer.generate_stream(req.0).instrument(info_span!(parent: &span, "async_stream")).await {
            Ok(mut response_stream) => {
                // Server-Sent Event stream
                while let Some(response) = response_stream.next().await {
                    match response {
                        Ok(response) => {
                            match response {
                                // Prefill is ignored
                                InferStreamResponse::Prefill(_) => {}
                                // Yield event for every new token
                                InferStreamResponse::Token(token) => {
                                    // StreamResponse
                                    let stream_token = StreamResponse {
                                        token,
                                        generated_text: None,
                                        details: None,
                                    };

                                    yield Ok(Event::default().json_data(stream_token).unwrap())
                                }
                                // Yield event for last token and compute timings
                                InferStreamResponse::End {
                                    token,
                                    generated_text,
                                    start,
                                    queued,
                                } => {
                                    // Token details
                                    let details = match details {
                                        true => Some(StreamDetails {
                                            finish_reason: FinishReason::from(generated_text.finish_reason),
                                            generated_tokens: generated_text.generated_tokens,
                                            seed: generated_text.seed,
                                        }),
                                        false => None,
                                    };

                                    // Timings
                                    let total_time = start_time.elapsed();
                                    let validation_time = queued - start_time;
                                    let queue_time = start - queued;
                                    let inference_time = Instant::now() - start;
                                    let time_per_token = inference_time / generated_text.generated_tokens;

                                    // Tracing metadata
                                    span.record("total_time", format!("{total_time:?}"));
                                    span.record("validation_time", format!("{validation_time:?}"));
                                    span.record("queue_time", format!("{queue_time:?}"));
                                    span.record("inference_time", format!("{inference_time:?}"));
                                    span.record("time_per_token", format!("{time_per_token:?}"));
                                    span.record("seed", format!("{:?}", generated_text.seed));
                                    tracing::info!(parent: &span, "Output: {}", generated_text.text);

                                    // Metrics
                                    metrics::increment_counter!("tgi_request_success");
                                    metrics::histogram!("tgi_request_duration", total_time);
                                    metrics::histogram!("tgi_request_validation_duration", validation_time);
                                    metrics::histogram!("tgi_request_queue_duration", queue_time);
                                    metrics::histogram!("tgi_request_inference_duration", inference_time);
                                    metrics::histogram!("tgi_request_mean_time_per_token_duration", time_per_token);
                                    metrics::histogram!("tgi_request_generated_tokens", generated_text.generated_tokens as f64);

                                    // StreamResponse
                                    end_reached = true;
                                    let stream_token = StreamResponse {
                                        token,
                                        generated_text: Some(generated_text.text),
                                        details
                                    };

                                    yield Ok(Event::default().json_data(stream_token).unwrap())
                                }
                            }
                        }
                        // yield error
                        Err(err) => {
                            error = true;
                            yield Ok(Event::from(err))
                        }
                    }
                }
            },
            // yield error
            Err(err) => {
                error = true;
                yield Ok(Event::from(err))
            }
        }
        // Check if generation reached the end
        // Skip if we already sent an error
        if !end_reached && !error {
            let err = InferError::IncompleteGeneration;
            metrics::increment_counter!("tgi_request_failure", "err" => "incomplete");
            tracing::error!("{err}");
            yield Ok(Event::from(err))
        }
    };

    Sse::new(stream).keep_alive(KeepAlive::default())
}

/// Prometheus metrics scrape endpoint
#[utoipa::path(
    get,
    tag = "Text Generation Inference",
    path = "/metrics",
    responses((status = 200, description = "Prometheus Metrics", body = String))
)]
async fn metrics(prom_handle: Extension<PrometheusHandle>) -> String {
    prom_handle.render()
}

/// Serving method
#[allow(clippy::too_many_arguments)]
pub async fn run(
    max_concurrent_requests: usize,
    max_stop_sequences: usize,
    max_input_length: usize,
    max_total_tokens: usize,
    max_batch_size: usize,
    max_waiting_tokens: usize,
    client: ShardedClient,
    tokenizer: Tokenizer,
    validation_workers: usize,
    addr: SocketAddr,
    allow_origin: Option<AllowOrigin>,
) {
    // OpenAPI documentation
    #[derive(OpenApi)]
    #[openapi(
        paths(
            generate,
            generate_stream,
            metrics,
        ),
        components(
            schemas(
                GenerateRequest,
                GenerateParameters,
                PrefillToken,
                Token,
                GenerateResponse,
                Details,
                FinishReason,
                StreamResponse,
                StreamDetails,
                ErrorResponse,
            )
        ),
        tags(
            (name = "Text Generation Inference", description = "Hugging Face Text Generation Inference API")
        ),
        info(
            title = "Text Generation Inference",
            license(
                name = "Apache 2.0",
                url = "https://www.apache.org/licenses/LICENSE-2.0"
            )
        )
    )]
    struct ApiDoc;

    // Create state
    let validation = Validation::new(
        validation_workers,
        tokenizer,
        max_stop_sequences,
        max_input_length,
        max_total_tokens,
    );
    let infer = Infer::new(
        client,
        validation,
        max_batch_size,
        max_waiting_tokens,
        max_concurrent_requests,
    );

    // Prometheus handler
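    // `install_recorder` registers a global recorder backing the `metrics::`
    // macros used above and returns a handle the `/metrics` route renders.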
    let builder = PrometheusBuilder::new();
    let prom_handle = builder
        .install_recorder()
        .expect("failed to install metrics recorder");

    // CORS layer
    let allow_origin = allow_origin.unwrap_or(AllowOrigin::any());
    let cors_layer = CorsLayer::new()
        .allow_methods([Method::GET, Method::POST])
        .allow_headers([http::header::CONTENT_TYPE])
        .allow_origin(allow_origin);

    // Create router
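    // Note: axum layers only wrap the routes registered before them, so the
    // `infer` extension is not visible to `/metrics`, while the tracing and
    // CORS layers wrap every route above them.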
    let app = Router::new()
        .merge(SwaggerUi::new("/docs").url("/api-doc/openapi.json", ApiDoc::openapi()))
        .route("/", post(compat_generate))
        .route("/generate", post(generate))
        .route("/generate_stream", post(generate_stream))
        .route("/", get(health))
        .route("/health", get(health))
        .layer(Extension(infer))
        .route("/metrics", get(metrics))
        .layer(Extension(prom_handle))
        .layer(opentelemetry_tracing_layer())
        .layer(cors_layer);

    // Run server
    axum::Server::bind(&addr)
        .serve(app.into_make_service())
        // Wait until all requests are finished to shut down
        .with_graceful_shutdown(shutdown_signal())
        .await
        .unwrap();
}

/// Shutdown signal handler
async fn shutdown_signal() {
    let ctrl_c = async {
        signal::ctrl_c()
            .await
            .expect("failed to install Ctrl+C handler");
    };

    #[cfg(unix)]
    let terminate = async {
        signal::unix::signal(signal::unix::SignalKind::terminate())
            .expect("failed to install signal handler")
            .recv()
            .await;
    };

    #[cfg(not(unix))]
    let terminate = std::future::pending::<()>();

    tokio::select! {
        _ = ctrl_c => {},
        _ = terminate => {},
    }

    tracing::info!("signal received, starting graceful shutdown");
    opentelemetry::global::shutdown_tracer_provider();
}

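/// Map the shard protobuf `FinishReason` to the HTTP API `FinishReason`.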
impl From<i32> for FinishReason {
    fn from(finish_reason: i32) -> Self {
        let finish_reason = text_generation_client::FinishReason::from_i32(finish_reason).unwrap();
        match finish_reason {
            text_generation_client::FinishReason::Length => FinishReason::Length,
            text_generation_client::FinishReason::EosToken => FinishReason::EndOfSequenceToken,
            text_generation_client::FinishReason::StopSequence => FinishReason::StopSequence,
        }
    }
}

/// Convert to Axum supported formats
impl From<InferError> for (StatusCode, Json<ErrorResponse>) {
    fn from(err: InferError) -> Self {
        let status_code = match err {
            InferError::GenerationError(_) => StatusCode::FAILED_DEPENDENCY,
            InferError::Overloaded(_) => StatusCode::TOO_MANY_REQUESTS,
            InferError::ValidationError(_) => StatusCode::UNPROCESSABLE_ENTITY,
            InferError::IncompleteGeneration => StatusCode::INTERNAL_SERVER_ERROR,
        };

        (
            status_code,
            Json(ErrorResponse {
                error: err.to_string(),
            }),
        )
    }
}

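/// Convert internal inference errors into SSE error events for streaming
/// responses.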
impl From<InferError> for Event {
    fn from(err: InferError) -> Self {
        Event::default()
            .json_data(ErrorResponse {
                error: err.to_string(),
            })
            .unwrap()
    }
}