Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 41 additions & 13 deletions latencypredictor/prediction_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,14 +219,25 @@ def is_ready(self) -> bool:
def _prepare_features_with_interaction(self, df: pd.DataFrame, model_type: str) -> pd.DataFrame:
"""
Prepare features with interaction terms to match training server.
Args:
df: DataFrame with raw features
model_type: 'ttft' or 'tpot'
Returns:
DataFrame with engineered features including interactions
"""
# Encode pod_type as categorical (common for both TTFT and TPOT)
# Convert to categorical with known categories for consistent encoding
if 'pod_type' in df.columns:
df['pod_type'] = df['pod_type'].fillna('') # Handle NaN
df['pod_type_cat'] = pd.Categorical(
df['pod_type'],
categories=['', 'prefill', 'decode'], # '' = monolithic, prefill, decode
ordered=False
)
else:
# If pod_type column doesn't exist, create it as empty (monolithic)
df['pod_type_cat'] = pd.Categorical([''] * len(df), categories=['', 'prefill', 'decode'], ordered=False)

if model_type == "ttft":
# Create interaction: prefix score * input length
df['effective_input_tokens'] = (1-df['prefix_cache_score']) * df['input_token_length']
Expand All @@ -238,31 +249,32 @@ def _prepare_features_with_interaction(self, df: pd.DataFrame, model_type: str)

# make it categorical for tree models (safe for LGB, XGB with enable_categorical)
df['prefill_score_bucket'] = pd.Categorical(df['prefill_score_bucket'], categories=[0,1,2,3], ordered=True)
# Return TTFT features with interaction


# Return TTFT features with interaction and pod_type
feature_cols = [
'kv_cache_percentage',
'input_token_length',
'num_request_waiting',
'num_request_running',
'prefix_cache_score',
'effective_input_tokens',
'prefill_score_bucket'
'prefill_score_bucket',
'pod_type_cat'
]

return df[feature_cols]

else: # tpot
# TPOT doesn't use prefix_cache_score, so no interaction needed
feature_cols = [
'kv_cache_percentage',
'input_token_length',
'num_request_waiting',
'num_request_running',
'num_tokens_generated'
'num_tokens_generated',
'pod_type_cat'
]

return df[feature_cols]

def load_models(self) -> bool:
Expand Down Expand Up @@ -333,10 +345,17 @@ def predict(self, features: dict) -> Tuple[float, float]:
#df_tpot = pd.DataFrame([tpot_raw_data])

if self.model_type == ModelType.BAYESIAN_RIDGE:

# Bayesian Ridge can't handle categorical features directly
# Drop categorical bucket, but one-hot encode pod_type
ttft_for_scale = df_ttft.drop(columns=['prefill_score_bucket'], errors='ignore')
if 'pod_type_cat' in ttft_for_scale.columns:
ttft_for_scale = pd.get_dummies(ttft_for_scale, columns=['pod_type_cat'], prefix='pod_type', drop_first=False)
ttft_scaled = self.ttft_scaler.transform(ttft_for_scale)
tpot_scaled = self.tpot_scaler.transform(df_tpot)

tpot_for_scale = df_tpot.copy()
if 'pod_type_cat' in tpot_for_scale.columns:
tpot_for_scale = pd.get_dummies(tpot_for_scale, columns=['pod_type_cat'], prefix='pod_type', drop_first=False)
tpot_scaled = self.tpot_scaler.transform(tpot_for_scale)

ttft_pred_mean, ttft_std = self.ttft_model.predict(ttft_scaled, return_std=True)
tpot_pred_mean, tpot_std = self.tpot_model.predict(tpot_scaled, return_std=True)
Expand Down Expand Up @@ -416,9 +435,17 @@ def predict_batch(self, features_list: List[dict]) -> Tuple[np.ndarray, np.ndarr
#df_tpot_batch = pd.DataFrame(tpot_raw_data)

if self.model_type == ModelType.BAYESIAN_RIDGE:
# Bayesian Ridge can't handle categorical features directly
# Drop categorical bucket, but one-hot encode pod_type
ttft_for_scale = df_ttft_batch.drop(columns=['prefill_score_bucket'], errors='ignore')
if 'pod_type_cat' in ttft_for_scale.columns:
ttft_for_scale = pd.get_dummies(ttft_for_scale, columns=['pod_type_cat'], prefix='pod_type', drop_first=False)
ttft_scaled = self.ttft_scaler.transform(ttft_for_scale)
tpot_scaled = self.tpot_scaler.transform(df_tpot_batch)

tpot_for_scale = df_tpot_batch.copy()
if 'pod_type_cat' in tpot_for_scale.columns:
tpot_for_scale = pd.get_dummies(tpot_for_scale, columns=['pod_type_cat'], prefix='pod_type', drop_first=False)
tpot_scaled = self.tpot_scaler.transform(tpot_for_scale)

ttft_pred_mean, ttft_std = self.ttft_model.predict(ttft_scaled, return_std=True)
tpot_pred_mean, tpot_std = self.tpot_model.predict(tpot_scaled, return_std=True)
Expand Down Expand Up @@ -471,6 +498,7 @@ class PredictionRequest(BaseModel):
num_request_running: int = Field(..., ge=0)
num_tokens_generated: int = Field(..., ge=0)
prefix_cache_score: float = Field(..., ge=0.0, le=1.0, description="Prefix cache hit ratio score (0.0 to 1.0)")
pod_type: Optional[str] = Field(default="", description="Pod type: 'prefill', 'decode', or '' for monolithic")


class PredictionResponse(BaseModel):
Expand Down
Loading