profile_handler.go 9.64 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/*
Copyright 2025 NVIDIA Corporation.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package disagg

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
24
25
	"os"
	"strings"
26
27
28
29
30
31
32
33

	log "sigs.k8s.io/controller-runtime/pkg/log"
	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/plugins"
	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework"
	schedtypes "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types"
	logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
)

34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
func getEnvBoolOrDefault(key string, defaultVal bool) bool {
	val, ok := os.LookupEnv(key)
	if !ok {
		return defaultVal
	}
	switch strings.ToLower(val) {
	case "true", "1", "yes":
		return true
	case "false", "0", "no":
		return false
	default:
		return defaultVal
	}
}

49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
const (
	DisaggProfileHandlerType = "disagg-profile-handler"
)

// compile-time type assertion
var _ framework.ProfileHandler = &DisaggProfileHandler{}

// DisaggProfileHandlerConfig holds the configuration for the DisaggProfileHandler.
type DisaggProfileHandlerConfig struct{}

// DisaggProfileHandlerFactory defines the factory function for DisaggProfileHandler.
func DisaggProfileHandlerFactory(name string, rawParameters json.RawMessage, _ plugins.Handle) (plugins.Plugin, error) {
	cfg := DisaggProfileHandlerConfig{}
	if rawParameters != nil {
		if err := json.Unmarshal(rawParameters, &cfg); err != nil {
			return nil, fmt.Errorf("failed to parse %s plugin parameters: %w", DisaggProfileHandlerType, err)
		}
	}
67
68
	enforceDisagg := getEnvBoolOrDefault("DYN_ENFORCE_DISAGG", false)
	return NewDisaggProfileHandler(enforceDisagg).WithName(name), nil
69
70
71
}

// NewDisaggProfileHandler initializes a new DisaggProfileHandler.
72
func NewDisaggProfileHandler(enforceDisagg bool) *DisaggProfileHandler {
73
	return &DisaggProfileHandler{
74
75
		typedName:     plugins.TypedName{Type: DisaggProfileHandlerType, Name: DisaggProfileHandlerType},
		enforceDisagg: enforceDisagg,
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
	}
}

// DisaggProfileHandler is a ProfileHandler that orchestrates prefill/decode disaggregated serving.
//
// # Disaggregated mode detection
//
// In Dynamo's native architecture, disaggregated mode is determined by whether prefill workers
// actually exist at runtime (the is_disaggregated flag in the Rust KV router). However, the
// GAIE EPP framework determines profile availability at configuration time, not at runtime.
// To bridge this gap, DisaggProfileHandler uses the EPP profile mechanism as a proxy:
// it checks whether a "prefill" scheduling profile is registered in the config. If prefill
// workers are configured but none are actually running, the prefill profile's label-filter
// will find zero pods, causing the profile to fail — and the handler gracefully degrades
// to aggregated mode (see below).
//
// # Scheduling flow
//
// On each scheduling cycle it:
//  1. Checks whether a "prefill" profile is registered in the config.
//  2. Writes PrefillEnabledState into CycleState so scorer plugins can read it.
//  3. If a prefill profile exists: runs the "prefill" profile first, then the "decode" profile.
//     The "decode" profile is the primary (the pod the request is ultimately sent to).
//  4. If no prefill profile exists: runs only the "decode" profile (pure aggregated mode).
//
// # Graceful degradation
//
// When a prefill profile is configured but no prefill workers are available at runtime,
// the handler degrades gracefully to aggregated mode on a per-request basis:
//
//  1. Pick (iteration 1): prefill profile exists → writes PrefillEnabled=true → runs prefill profile.
//  2. Prefill profile runs: label-filter finds 0 prefill pods → profile fails → result is nil.
//  3. Pick (iteration 2): sees prefill result is nil → overwrites PrefillEnabled=false → runs decode profile.
//  4. Decode scorer runs: reads PrefillEnabled=false → passes isDisaggregated=false to the Rust
//     decode router → full KV cache overlap scoring is used (overlap_score_weight=1.0).
//
// This means the same YAML config works transparently for both aggregated and disaggregated
// deployments. If prefill workers come up later, subsequent requests automatically use
// disaggregated routing. If they go down, requests fall back to aggregated mode.
type DisaggProfileHandler struct {
116
117
	typedName     plugins.TypedName
	enforceDisagg bool
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
}

// TypedName returns the type and name tuple of this plugin instance.
func (h *DisaggProfileHandler) TypedName() plugins.TypedName {
	return h.typedName
}

// WithName sets the name of the profile handler.
func (h *DisaggProfileHandler) WithName(name string) *DisaggProfileHandler {
	h.typedName.Name = name
	return h
}

// Pick selects which profiles to run in the current iteration.
//
// Iteration 1 (no results yet):
//   - Writes PrefillEnabledState into CycleState.
//   - If a "prefill" profile exists → returns it alone (run prefill first).
//   - Otherwise → returns the "decode" profile.
//
// Iteration 2 (prefill result exists, decode not yet):
//   - Returns the "decode" profile.
//
// Iteration 3+ (all results collected):
//   - Returns empty map to stop the loop.
func (h *DisaggProfileHandler) Pick(ctx context.Context, cycleState *schedtypes.CycleState, _ *schedtypes.LLMRequest,
	profiles map[string]*framework.SchedulerProfile, profileResults map[string]*schedtypes.ProfileRunResult) map[string]*framework.SchedulerProfile {

	logger := log.FromContext(ctx).V(logutil.VERBOSE)

	// First call: determine if prefill is enabled and write state.
	if len(profileResults) == 0 {
		_, prefillExists := profiles[PrefillProfileName]
		state := &PrefillEnabledState{Enabled: prefillExists}
		cycleState.Write(PrefillEnabledStateKey, state)
		logger.Info("DisaggProfileHandler: prefill enabled state determined", "prefillEnabled", prefillExists)

		if prefillExists {
			// Run prefill profile first.
			return map[string]*framework.SchedulerProfile{
				PrefillProfileName: profiles[PrefillProfileName],
			}
		}
		// No prefill profile — run decode only.
		if decodeProfile, ok := profiles[DecodeProfileName]; ok {
			return map[string]*framework.SchedulerProfile{
				DecodeProfileName: decodeProfile,
			}
		}
		// Fallback: return all profiles.
		return profiles
	}

	// Second call: prefill has run, now run decode.
	if prefillResult, prefillDone := profileResults[PrefillProfileName]; prefillDone {
		if _, decodeDone := profileResults[DecodeProfileName]; !decodeDone {
			if prefillResult == nil {
175
176
177
178
179
180
181
				if h.enforceDisagg {
					// enforce_disagg=true: do not fall back to aggregated mode.
					// Stop the scheduling loop — ProcessResults will reject the request.
					logger.Info("DisaggProfileHandler: prefill profile failed and enforce_disagg=true, rejecting request")
					return map[string]*framework.SchedulerProfile{}
				}
				// enforce_disagg=false: fall back to aggregated decode.
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
				logger.Info("DisaggProfileHandler: prefill profile failed (no workers?), falling back to aggregated decode")
				cycleState.Write(PrefillEnabledStateKey, &PrefillEnabledState{Enabled: false})
			}

			if decodeProfile, ok := profiles[DecodeProfileName]; ok {
				return map[string]*framework.SchedulerProfile{
					DecodeProfileName: decodeProfile,
				}
			}
		}
	}

	// All profiles have been executed.
	return map[string]*framework.SchedulerProfile{}
}

// ProcessResults aggregates the profile run results and designates the primary profile.
// The "decode" profile is always the primary (the pod that handles the request).
200
func (h *DisaggProfileHandler) ProcessResults(_ context.Context, _ *schedtypes.CycleState, req *schedtypes.LLMRequest,
201
202
	profileResults map[string]*schedtypes.ProfileRunResult) (*schedtypes.SchedulingResult, error) {

203
204
205
206
207
208
209
210
211
212
213
214
215
216
	// When enforce_disagg=true and the prefill worker ID header was not set
	// (prefill router not activated or scorer failed), reject the request
	// at the EPP level instead of forwarding it to the sidecar without
	// routing headers.
	if h.enforceDisagg && (req.Headers == nil || req.Headers[PrefillWorkerIDHeader] == "") {
		// Only enforce if a prefill profile was configured and ran.
		if _, prefillRan := profileResults[PrefillProfileName]; prefillRan {
			return nil, errors.New(
				"disaggregated mode enforced (DYN_ENFORCE_DISAGG=true) but prefill workers " +
					"are not available; request rejected. Either wait for prefill workers " +
					"to register or set DYN_ENFORCE_DISAGG=false to allow aggregated fallback")
		}
	}

217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
	if len(profileResults) == 0 {
		return nil, errors.New("disagg profile handler received no profile results")
	}

	// Determine primary profile name.
	primaryProfile := DecodeProfileName
	if _, ok := profileResults[DecodeProfileName]; !ok {
		// If there's no decode result, pick whichever profile ran.
		for name := range profileResults {
			primaryProfile = name
			break
		}
	}

	if profileResults[primaryProfile] == nil {
232
233
234
235
236
237
		if h.enforceDisagg {
			return nil, errors.New(
				"disaggregated mode enforced (DYN_ENFORCE_DISAGG=true) but prefill workers " +
					"are not available; request rejected. Either wait for prefill workers " +
					"to register or set DYN_ENFORCE_DISAGG=false to allow aggregated fallback")
		}
238
239
240
241
242
243
244
245
		return nil, fmt.Errorf("primary profile '%s' failed to produce a result", primaryProfile)
	}

	return &schedtypes.SchedulingResult{
		ProfileResults:     profileResults,
		PrimaryProfileName: primaryProfile,
	}, nil
}