diff --git a/latencypredictor/prediction_server.py b/latencypredictor/prediction_server.py
index 581f83421c..5b8ae0fc64 100644
--- a/latencypredictor/prediction_server.py
+++ b/latencypredictor/prediction_server.py
@@ -219,14 +219,25 @@ def is_ready(self) -> bool:
     def _prepare_features_with_interaction(self, df: pd.DataFrame, model_type: str) -> pd.DataFrame:
         """
         Prepare features with interaction terms to match training server.
-
         Args:
             df: DataFrame with raw features
            model_type: 'ttft' or 'tpot'
-
         Returns:
             DataFrame with engineered features including interactions
         """
+        # Encode pod_type as categorical (common for both TTFT and TPOT)
+        # Convert to categorical with known categories for consistent encoding
+        if 'pod_type' in df.columns:
+            df['pod_type'] = df['pod_type'].fillna('')  # Handle NaN
+            df['pod_type_cat'] = pd.Categorical(
+                df['pod_type'],
+                categories=['', 'prefill', 'decode'],  # '' = monolithic, prefill, decode
+                ordered=False
+            )
+        else:
+            # If pod_type column doesn't exist, create it as empty (monolithic)
+            df['pod_type_cat'] = pd.Categorical([''] * len(df), categories=['', 'prefill', 'decode'], ordered=False)
+
         if model_type == "ttft":
             # Create interaction: prefix score * input length
             df['effective_input_tokens'] = (1-df['prefix_cache_score']) * df['input_token_length']
@@ -238,9 +249,9 @@ def _prepare_features_with_interaction(self, df: pd.DataFrame, model_type: str)
             # make it categorical for tree models (safe for LGB, XGB with enable_categorical)
             df['prefill_score_bucket'] = pd.Categorical(df['prefill_score_bucket'], categories=[0,1,2,3], ordered=True)
-
-
-            # Return TTFT features with interaction
+
+
+            # Return TTFT features with interaction and pod_type
             feature_cols = [
                 'kv_cache_percentage',
                 'input_token_length',
@@ -248,11 +259,12 @@ def _prepare_features_with_interaction(self, df: pd.DataFrame, model_type: str)
                 'num_request_waiting',
                 'num_request_running',
                 'prefix_cache_score',
                 'effective_input_tokens',
-                'prefill_score_bucket'
+                'prefill_score_bucket',
+                'pod_type_cat'
             ]
-
+
             return df[feature_cols]
-
+
         else:  # tpot
             # TPOT doesn't use prefix_cache_score, so no interaction needed
             feature_cols = [
@@ -260,9 +272,9 @@ def _prepare_features_with_interaction(self, df: pd.DataFrame, model_type: str)
                 'input_token_length',
                 'num_request_waiting',
                 'num_request_running',
-                'num_tokens_generated'
+                'num_tokens_generated',
+                'pod_type_cat'
             ]
-
             return df[feature_cols]
 
     def load_models(self) -> bool:
@@ -333,10 +345,17 @@ def predict(self, features: dict) -> Tuple[float, float]:
         #df_tpot = pd.DataFrame([tpot_raw_data])
 
         if self.model_type == ModelType.BAYESIAN_RIDGE:
-
+            # Bayesian Ridge can't handle categorical features directly
+            # Drop categorical bucket, but one-hot encode pod_type
             ttft_for_scale = df_ttft.drop(columns=['prefill_score_bucket'], errors='ignore')
+            if 'pod_type_cat' in ttft_for_scale.columns:
+                ttft_for_scale = pd.get_dummies(ttft_for_scale, columns=['pod_type_cat'], prefix='pod_type', drop_first=False)
             ttft_scaled = self.ttft_scaler.transform(ttft_for_scale)
-            tpot_scaled = self.tpot_scaler.transform(df_tpot)
+
+            tpot_for_scale = df_tpot.copy()
+            if 'pod_type_cat' in tpot_for_scale.columns:
+                tpot_for_scale = pd.get_dummies(tpot_for_scale, columns=['pod_type_cat'], prefix='pod_type', drop_first=False)
+            tpot_scaled = self.tpot_scaler.transform(tpot_for_scale)
 
             ttft_pred_mean, ttft_std = self.ttft_model.predict(ttft_scaled, return_std=True)
             tpot_pred_mean, tpot_std = self.tpot_model.predict(tpot_scaled, return_std=True)
@@ -416,9 +435,17 @@ def predict_batch(self, features_list: List[dict]) -> Tuple[np.ndarray, np.ndarray]:
         #df_tpot_batch = pd.DataFrame(tpot_raw_data)
 
         if self.model_type == ModelType.BAYESIAN_RIDGE:
+            # Bayesian Ridge can't handle categorical features directly
+            # Drop categorical bucket, but one-hot encode pod_type
             ttft_for_scale = df_ttft_batch.drop(columns=['prefill_score_bucket'], errors='ignore')
+            if 'pod_type_cat' in ttft_for_scale.columns:
+                ttft_for_scale = pd.get_dummies(ttft_for_scale, columns=['pod_type_cat'], prefix='pod_type', drop_first=False)
             ttft_scaled = self.ttft_scaler.transform(ttft_for_scale)
-            tpot_scaled = self.tpot_scaler.transform(df_tpot_batch)
+
+            tpot_for_scale = df_tpot_batch.copy()
+            if 'pod_type_cat' in tpot_for_scale.columns:
+                tpot_for_scale = pd.get_dummies(tpot_for_scale, columns=['pod_type_cat'], prefix='pod_type', drop_first=False)
+            tpot_scaled = self.tpot_scaler.transform(tpot_for_scale)
 
             ttft_pred_mean, ttft_std = self.ttft_model.predict(ttft_scaled, return_std=True)
             tpot_pred_mean, tpot_std = self.tpot_model.predict(tpot_scaled, return_std=True)
@@ -471,6 +498,7 @@ class PredictionRequest(BaseModel):
     num_request_running: int = Field(..., ge=0)
     num_tokens_generated: int = Field(..., ge=0)
     prefix_cache_score: float = Field(..., ge=0.0, le=1.0, description="Prefix cache hit ratio score (0.0 to 1.0)")
+    pod_type: Optional[str] = Field(default="", description="Pod type: 'prefill', 'decode', or '' for monolithic")
 
 class PredictionResponse(BaseModel):
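The prediction-server half of this change hinges on the pinned category list: because pd.Categorical fixes the categories to ['', 'prefill', 'decode'], the later pd.get_dummies call always emits the same three indicator columns in the same order, even when a batch contains only one pod type, so the Bayesian Ridge scaler sees a stable column layout. A minimal standalone sketch of that behavior (illustration only, not part of the patch):

import pandas as pd

# A batch of only decode requests still yields all three indicator columns,
# because the categories are pinned rather than inferred from the data.
df = pd.DataFrame({"pod_type": ["decode", "decode"]})
df["pod_type_cat"] = pd.Categorical(
    df["pod_type"], categories=["", "prefill", "decode"], ordered=False
)
encoded = pd.get_dummies(df[["pod_type_cat"]], columns=["pod_type_cat"],
                         prefix="pod_type", drop_first=False)
print(list(encoded.columns))
# ['pod_type_', 'pod_type_prefill', 'pod_type_decode']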
diff --git a/latencypredictor/training_server.py b/latencypredictor/training_server.py
index 3e1e2751f4..d2e8cb1917 100644
--- a/latencypredictor/training_server.py
+++ b/latencypredictor/training_server.py
@@ -336,14 +336,25 @@ def _store_descaled_coefficients(self, model, scaler, feature_names, model_name)
     def _prepare_features_with_interaction(self, df: pd.DataFrame, model_type: str) -> pd.DataFrame:
         """
         Prepare features with interaction terms for better model learning.
-
         Args:
             df: DataFrame with raw features
             model_type: 'ttft' or 'tpot'
-
         Returns:
             DataFrame with engineered features including interactions
         """
+        # Encode pod_type as categorical (common for both TTFT and TPOT)
+        # Convert to categorical with known categories for consistent encoding
+        if 'pod_type' in df.columns:
+            df['pod_type'] = df['pod_type'].fillna('')  # Handle NaN
+            df['pod_type_cat'] = pd.Categorical(
+                df['pod_type'],
+                categories=['', 'prefill', 'decode'],  # '' = monolithic, prefill, decode
+                ordered=False
+            )
+        else:
+            # If pod_type column doesn't exist, create it as empty (monolithic)
+            df['pod_type_cat'] = pd.Categorical([''] * len(df), categories=['', 'prefill', 'decode'], ordered=False)
+
         if model_type == "ttft":
             # Create interaction: prefix score * input length
             # This captures that prefix caching benefit scales with input size
@@ -358,7 +369,7 @@ def _prepare_features_with_interaction(self, df: pd.DataFrame, model_type: str)
             df['prefill_score_bucket'] = pd.Categorical(df['prefill_score_bucket'], categories=[0,1,2,3], ordered=True)
 
-            # Return TTFT features with interaction
+            # Return TTFT features with interaction and pod_type
             feature_cols = [
                 'kv_cache_percentage',
                 'input_token_length',
@@ -366,11 +377,10 @@ def _prepare_features_with_interaction(self, df: pd.DataFrame, model_type: str)
                 'num_request_waiting',
                 'num_request_running',
                 'prefix_cache_score',
                 'effective_input_tokens',
-                'prefill_score_bucket'
+                'prefill_score_bucket',
+                'pod_type_cat'
             ]
-
             return df[feature_cols]
-
         else:  # tpot
             # TPOT doesn't use prefix_cache_score, so no interaction needed
             feature_cols = [
@@ -378,12 +388,11 @@ def _prepare_features_with_interaction(self, df: pd.DataFrame, model_type: str)
                 'input_token_length',
                 'num_request_waiting',
                 'num_request_running',
-                'num_tokens_generated'
+                'num_tokens_generated',
+                'pod_type_cat'
             ]
-
             return df[feature_cols]
-
     def shutdown(self):
         """Signal the training thread to exit and join it."""
         self._shutdown_event.set()
@@ -419,11 +428,22 @@ def _train_model_with_scaling(self, features: pd.DataFrame, target: pd.Series, m
             raise ValueError("Empty training data")
         if features.isnull().any().any() or target.isnull().any():
             raise ValueError("Training data contains NaN values")
-        if np.isinf(features.values).any() or np.isinf(target.values).any():
+        # Check only numeric columns for infinity (categorical columns cause isinf to fail)
+        numeric_features = features.select_dtypes(include=[np.number])
+        if len(numeric_features.columns) > 0 and np.isinf(numeric_features.values).any():
             raise ValueError("Training data contains infinite values")
+        if np.isinf(target.values).any():
+            raise ValueError("Target data contains infinite values")
 
         if self.model_type == ModelType.BAYESIAN_RIDGE:
+            # Bayesian Ridge can't handle categorical features directly
+            # Drop categorical bucket, but one-hot encode pod_type to preserve the information
            features = features.drop(columns=['prefill_score_bucket'], errors='ignore')
+
+            # One-hot encode pod_type_cat if it exists (converts to numeric 0/1 columns)
+            if 'pod_type_cat' in features.columns:
+                features = pd.get_dummies(features, columns=['pod_type_cat'], prefix='pod_type', drop_first=False)
+
             scaler = StandardScaler()
             features_scaled = scaler.fit_transform(features)
             if np.isnan(features_scaled).any() or np.isinf(features_scaled).any():
@@ -438,10 +458,10 @@ def _train_model_with_scaling(self, features: pd.DataFrame, target: pd.Series, m
         elif self.model_type == ModelType.XGBOOST:
             # XGBoost with quantile regression
             if model_name == "ttft":
-                # enforce your TTFT feature order
+                # enforce your TTFT feature order (including pod_type_cat)
                 ttft_order = [
                     "kv_cache_percentage", "input_token_length", "num_request_waiting",
-                    "num_request_running", "prefix_cache_score", "effective_input_tokens", "prefill_score_bucket"
+                    "num_request_running", "prefix_cache_score", "effective_input_tokens", "prefill_score_bucket", "pod_type_cat"
                 ]
                 if list(features.columns) != ttft_order:
                     try:
@@ -491,15 +511,15 @@ def _train_model_with_scaling(self, features: pd.DataFrame, target: pd.Series, m
             elif model_name == "tpot":
-                tpot_order = ["kv_cache_percentage","input_token_length","num_request_waiting","num_request_running","num_tokens_generated"]
+                tpot_order = ["kv_cache_percentage","input_token_length","num_request_waiting","num_request_running","num_tokens_generated","pod_type_cat"]
                 if list(features.columns) != tpot_order:
                     try:
                         features = features[tpot_order]
                     except Exception as _:
                         raise ValueError(f"TPOT features must be exactly {tpot_order}; got {list(features.columns)}")
-                mono_str = "(1,1,1,1,1)"
+                mono_str = "(1,1,1,1,1,0)"  # pod_type_cat has no monotone constraint
             else:
-                mono_str = "(0,0,0,0,0)"  # default
+                mono_str = "(0,0,0,0,0,0)"  # default (6 features with pod_type_cat)
 
             model = xgb.XGBRegressor(
                 n_estimators=200,  # Number of trees to build (moderate value for balanced accuracy and speed)
                 max_depth=6,  # Depth of trees; 6 is typically a sweet spot balancing bias/variance
@@ -569,23 +589,24 @@ def _calculate_quantile_metrics_on_test(self, model, scaler, test_data, model_na
         if model_name == "ttft":
             if self.model_type == ModelType.BAYESIAN_RIDGE:
                 feature_cols = [
                     'kv_cache_percentage','input_token_length','num_request_waiting',
-                    'num_request_running','prefix_cache_score','effective_input_tokens'
+                    'num_request_running','prefix_cache_score','effective_input_tokens','pod_type_cat'
                 ]
             else:  # XGBoost or LightGBM
                 feature_cols = [
                     'kv_cache_percentage','input_token_length','num_request_waiting',
-                    'num_request_running','prefix_cache_score','effective_input_tokens','prefill_score_bucket'
+                    'num_request_running','prefix_cache_score','effective_input_tokens','prefill_score_bucket','pod_type_cat'
                 ]
         else:  # tpot
-            feature_cols = ['kv_cache_percentage', 'input_token_length',
-                            'num_request_waiting', 'num_request_running', 'num_tokens_generated']
+            feature_cols = ['kv_cache_percentage', 'input_token_length',
+                            'num_request_waiting', 'num_request_running', 'num_tokens_generated', 'pod_type_cat']
 
-        X = df_features[feature_cols]  # ✅ Now has properly typed categorical!
-
-
-
+        X = df_features[feature_cols]
+        # For Bayesian Ridge, one-hot encode pod_type_cat before scaling
         if self.model_type == ModelType.BAYESIAN_RIDGE and scaler is not None:
+            # One-hot encode pod_type_cat (Bayesian Ridge can't handle categorical features)
+            if 'pod_type_cat' in X.columns:
+                X = pd.get_dummies(X, columns=['pod_type_cat'], prefix='pod_type', drop_first=False)
             X = scaler.transform(X)
 
         y_true = df_raw[target_col].values
@@ -635,6 +656,7 @@ def _create_default_model(self, model_type: str) -> Union[Tuple[BayesianRidge, S
                     'num_request_running': [0, ],
                     'num_tokens_generated': [1,]
                 })
+                features = self._prepare_features_with_interaction(features, "tpot")
                 target = pd.Series([10.0])
             return self._train_model_with_scaling(features, target, model_name=model_type)
         except Exception as e:
@@ -662,10 +684,10 @@ def train(self):
                 df_ttft = self._prepare_features_with_interaction(raw_ttft.copy(), model_type="ttft")
                 print(f"TTFT training data size: {len(df_ttft)} with sample data: {df_ttft.columns.tolist()}")
                 if len(df_ttft) >= settings.MIN_SAMPLES_FOR_RETRAIN:
-                    # Updated TTFT features to include prefix_cache_score
+                    # Updated TTFT features to include prefix_cache_score and pod_type_cat
                     ttft_feature_cols_tree = [
                         'kv_cache_percentage','input_token_length','num_request_waiting',
-                        'num_request_running','prefix_cache_score','effective_input_tokens','prefill_score_bucket'
+                        'num_request_running','prefix_cache_score','effective_input_tokens','prefill_score_bucket','pod_type_cat'
                     ]
                     ttft_feature_cols_br = [
                         'kv_cache_percentage','input_token_length','num_request_waiting',
@@ -675,7 +697,8 @@ def train(self):
                     # Build X_ttft for all model types, then trim for BR
                     X_ttft = df_ttft[ttft_feature_cols_tree]
                     if self.model_type == ModelType.BAYESIAN_RIDGE:
-                        X_ttft = X_ttft[ttft_feature_cols_br]
+                        # For Bayesian Ridge, drop categorical features (handled by one-hot encoding in _train_model_with_scaling)
+                        X_ttft = df_ttft  # Use full df_ttft which will be processed by _train_model_with_scaling
 
                     y_ttft = raw_ttft['actual_ttft_ms']
@@ -734,8 +757,8 @@ def train(self):
                 df_tpot = pd.DataFrame(tpot_snap).dropna()
                 df_tpot = df_tpot[df_tpot['actual_tpot_ms'] > 0]
                 if len(df_tpot) >= settings.MIN_SAMPLES_FOR_RETRAIN:
-                    # TPOT features remain unchanged
-                    X_tpot = df_tpot[['kv_cache_percentage', 'input_token_length', 'num_request_waiting', 'num_request_running', 'num_tokens_generated']]
+                    # TPOT features - use feature preparation to add pod_type_cat
+                    X_tpot = self._prepare_features_with_interaction(df_tpot.copy(), model_type="tpot")
                     y_tpot = df_tpot['actual_tpot_ms']
                     try:
                         result = self._train_model_with_scaling(X_tpot, y_tpot, model_name="tpot")
@@ -816,20 +839,33 @@ def predict(self, features: dict) -> Tuple[float, float, float, float]:
             if not isinstance(features[f], (int, float)):
                 raise ValueError(f"Invalid type for feature {f}: expected number")
 
-        # Updated TTFT features to include prefix_cache_score
+        # Updated TTFT features to include prefix_cache_score and pod_type
         ttft_cols = ['kv_cache_percentage','input_token_length','num_request_waiting','num_request_running','prefix_cache_score']
         tpot_cols = ['kv_cache_percentage','input_token_length','num_request_waiting','num_request_running','num_tokens_generated']
-
         # Create DataFrames for predictions
         df_ttft = pd.DataFrame([{col: features[col] for col in ttft_cols}])
-        # Add interaction term for TTFT
+        # Add pod_type if present (otherwise _prepare_features_with_interaction will default to '')
+        if 'pod_type' in features:
+            df_ttft['pod_type'] = features['pod_type']
+        # Add interaction term for TTFT (includes pod_type encoding)
         df_ttft = self._prepare_features_with_interaction(df_ttft, model_type="ttft")
+
         df_tpot = pd.DataFrame([{col: features[col] for col in tpot_cols}])
+        # Add pod_type if present
+        if 'pod_type' in features:
+            df_tpot['pod_type'] = features['pod_type']
+        # Add pod_type encoding for TPOT
+        df_tpot = self._prepare_features_with_interaction(df_tpot, model_type="tpot")
 
         if self.model_type == ModelType.BAYESIAN_RIDGE:
-            # Use scaling for Bayesian Ridge
+            # Use scaling for Bayesian Ridge - drop categorical bucket, one-hot encode pod_type
             df_ttft = df_ttft.drop(columns=['prefill_score_bucket'], errors='ignore')
+            if 'pod_type_cat' in df_ttft.columns:
+                df_ttft = pd.get_dummies(df_ttft, columns=['pod_type_cat'], prefix='pod_type', drop_first=False)
             ttft_scaled = self.ttft_scaler.transform(df_ttft)
+
+            if 'pod_type_cat' in df_tpot.columns:
+                df_tpot = pd.get_dummies(df_tpot, columns=['pod_type_cat'], prefix='pod_type', drop_first=False)
             tpot_scaled = self.tpot_scaler.transform(df_tpot)
 
             ttft_pred_mean, ttft_std = self.ttft_model.predict(ttft_scaled, return_std=True)
@@ -1169,12 +1205,13 @@ def emit_metrics(model, coefficients, feats, prefix):
         if self.model_type == ModelType.BAYESIAN_RIDGE:
             ttft_feats = ["kv_cache_percentage","input_token_length","num_request_waiting",
                           "num_request_running","prefix_cache_score","effective_input_tokens"]
+            tpot_feats = ["kv_cache_percentage","input_token_length","num_request_waiting",
+                          "num_request_running","num_tokens_generated"]
         else:
             ttft_feats = ["kv_cache_percentage","input_token_length","num_request_waiting",
-                          "num_request_running","prefix_cache_score","effective_input_tokens","prefill_score_bucket"]
-
-        tpot_feats = ["kv_cache_percentage","input_token_length","num_request_waiting",
-                      "num_request_running","num_tokens_generated"]
+                          "num_request_running","prefix_cache_score","effective_input_tokens","prefill_score_bucket","pod_type_cat"]
+            tpot_feats = ["kv_cache_percentage","input_token_length","num_request_waiting",
+                          "num_request_running","num_tokens_generated","pod_type_cat"]
 
         emit_metrics(ttft_model, self.ttft_coefficients, ttft_feats, "ttft")
         emit_metrics(tpot_model, self.tpot_coefficients, tpot_feats, "tpot")
@@ -1302,6 +1339,7 @@ class TrainingEntry(BaseModel):
     actual_tpot_ms: float = Field(..., ge=0.0)
     num_tokens_generated: int = Field(..., ge=0)
     prefix_cache_score: float = Field(..., ge=0.0, le=1.0, description="Prefix cache hit ratio score (0.0 to 1.0)")
+    pod_type: Optional[str] = Field(default="", description="Pod type: 'prefill', 'decode', or '' for monolithic")
     timestamp: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
 
 class PredictionRequest(BaseModel):
@@ -1311,6 +1349,7 @@ class PredictionRequest(BaseModel):
     num_request_running: int = Field(..., ge=0)
     num_tokens_generated: int = Field(..., ge=0)
     prefix_cache_score: float = Field(..., ge=0.0, le=1.0, description="Prefix cache hit ratio score (0.0 to 1.0)")
+    pod_type: Optional[str] = Field(default="", description="Pod type: 'prefill', 'decode', or '' for monolithic")
 
 class PredictionResponse(BaseModel):
     ttft_ms: float = Field(..., description=f"Predicted {settings.QUANTILE_ALPHA:.0%} quantile TTFT in milliseconds")
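On the training side, appending pod_type_cat changes the arity of the XGBoost monotone-constraint string: each position maps to one column of tpot_order (or the TTFT order), and the trailing 0 leaves the new categorical feature unconstrained while the numeric features keep their monotone-increasing constraints. A sketch of the resulting model construction — the hyperparameters here are illustrative, not the patch's full configuration:

import pandas as pd
import xgboost as xgb

tpot_order = ["kv_cache_percentage", "input_token_length", "num_request_waiting",
              "num_request_running", "num_tokens_generated", "pod_type_cat"]

model = xgb.XGBRegressor(
    n_estimators=200,
    max_depth=6,
    tree_method="hist",                    # required for enable_categorical
    enable_categorical=True,               # consume pd.Categorical columns directly
    monotone_constraints="(1,1,1,1,1,0)",  # one entry per column; 0 = unconstrained pod_type_cat
)

# Toy two-row frame just to show the call shape; real training data comes
# from _prepare_features_with_interaction.
X = pd.DataFrame({
    "kv_cache_percentage": [0.1, 0.5],
    "input_token_length": [128, 2048],
    "num_request_waiting": [0, 4],
    "num_request_running": [1, 8],
    "num_tokens_generated": [16, 256],
    "pod_type_cat": pd.Categorical(["", "decode"], categories=["", "prefill", "decode"]),
})[tpot_order]
model.fit(X, [5.0, 12.0])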
diff --git a/sidecars/latencypredictorasync/types.go b/sidecars/latencypredictorasync/types.go
index c8eadefe23..2d2ebb67c7 100644
--- a/sidecars/latencypredictorasync/types.go
+++ b/sidecars/latencypredictorasync/types.go
@@ -133,6 +133,7 @@ type TrainingEntry struct {
 	ActualTTFT       float64   `json:"actual_ttft_ms"`
 	ActualTPOT       float64   `json:"actual_tpot_ms"`
 	PrefixCacheScore float64   `json:"prefix_cache_score"`
+	PodType          string    `json:"pod_type,omitempty"` // "prefill", "decode", or "" for monolithic
 	Timestamp        time.Time `json:"timestamp"`
 }
 
@@ -147,6 +148,7 @@ type PredictionRequest struct {
 	NumRequestRunning  int     `json:"num_request_running"`
 	NumTokensGenerated int     `json:"num_tokens_generated"`
 	PrefixCacheScore   float64 `json:"prefix_cache_score"`
+	PodType            string  `json:"pod_type,omitempty"` // "prefill", "decode", or "" for monolithic
 }
 
 type PredictionResponse struct {
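End to end, the new field lines up across the three layers: the Go sidecar's omitempty tag and the Pydantic default="" both map a missing pod_type to the empty-string (monolithic) category. A hedged client-side sketch of the resulting wire format — the host, port, and /predict path are assumptions, not shown in this diff:

import json
import urllib.request

payload = {
    "kv_cache_percentage": 0.35,
    "input_token_length": 512,
    "num_request_waiting": 2,
    "num_request_running": 1,
    "num_tokens_generated": 64,
    "prefix_cache_score": 0.8,
    "pod_type": "prefill",  # omit the key (or send "") for a monolithic pod
}
req = urllib.request.Request(
    "http://localhost:8000/predict",  # assumed endpoint; adjust to your deployment
    data=json.dumps(payload).encode(),
    headers={"Content-Type": "application/json"},
)
with urllib.request.urlopen(req) as resp:
    print(json.load(resp))  # PredictionResponse JSON, e.g. quantile ttft_ms / tpot_ms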