@@ -99,6 +99,178 @@ function extractBillingHeaders(headers) {
9999 */
100100const limiter = rateLimiter . create ( ) ;
101101
102+ const ET_WARNING_THRESHOLDS = [ 50 , 75 , 90 , 95 ] ;
103+ const ET_DEFAULT_WEIGHTS = Object . freeze ( {
104+ input : 1.0 ,
105+ cacheRead : 0.1 ,
106+ output : 4.0 ,
107+ reasoning : 4.0 ,
108+ } ) ;
109+ let etGuardState = {
110+ configKey : null ,
111+ totalEffectiveTokens : 0 ,
112+ emittedThresholds : new Set ( ) ,
113+ } ;
114+ const effectiveTokenConfigCache = {
115+ rawMax : undefined ,
116+ rawMultipliers : undefined ,
117+ parsed : { max : null , multipliers : { } } ,
118+ } ;
119+
120+ function createEffectiveTokenState ( configKey = null ) {
121+ return {
122+ configKey,
123+ totalEffectiveTokens : 0 ,
124+ emittedThresholds : new Set ( ) ,
125+ } ;
126+ }
127+
128+ function parseMaxEffectiveTokens ( raw ) {
129+ if ( raw === undefined || raw === null || String ( raw ) . trim ( ) === '' ) return null ;
130+ const parsed = Number ( raw ) ;
131+ if ( ! Number . isInteger ( parsed ) || parsed <= 0 ) return null ;
132+ return parsed ;
133+ }
134+
135+ function parseModelMultipliers ( raw ) {
136+ if ( ! raw || String ( raw ) . trim ( ) === '' ) return { } ;
137+ try {
138+ const parsed = JSON . parse ( raw ) ;
139+ if ( ! parsed || typeof parsed !== 'object' || Array . isArray ( parsed ) ) return { } ;
140+ const result = { } ;
141+ for ( const [ model , value ] of Object . entries ( parsed ) ) {
142+ const num = Number ( value ) ;
143+ if ( Number . isFinite ( num ) && num > 0 ) {
144+ result [ model ] = num ;
145+ }
146+ }
147+ return result ;
148+ } catch {
149+ return { } ;
150+ }
151+ }
152+
153+ function getEffectiveTokenConfig ( ) {
154+ const rawMax = process . env . AWF_MAX_EFFECTIVE_TOKENS ;
155+ const rawMultipliers = process . env . AWF_EFFECTIVE_TOKEN_MODEL_MULTIPLIERS ;
156+ if ( effectiveTokenConfigCache . rawMax === rawMax && effectiveTokenConfigCache . rawMultipliers === rawMultipliers ) {
157+ return effectiveTokenConfigCache . parsed ;
158+ }
159+
160+ effectiveTokenConfigCache . rawMax = rawMax ;
161+ effectiveTokenConfigCache . rawMultipliers = rawMultipliers ;
162+ const parsedMultipliers = Object . freeze ( parseModelMultipliers ( rawMultipliers ) ) ;
163+ effectiveTokenConfigCache . parsed = {
164+ max : parseMaxEffectiveTokens ( rawMax ) ,
165+ multipliers : parsedMultipliers ,
166+ } ;
167+ return effectiveTokenConfigCache . parsed ;
168+ }
169+
170+ function getEffectiveTokenState ( config ) {
171+ if ( ! config . max ) return null ;
172+ const configKey = `${ config . max } |${ JSON . stringify ( config . multipliers ) } ` ;
173+ if ( etGuardState . configKey !== configKey ) {
174+ etGuardState = createEffectiveTokenState ( configKey ) ;
175+ }
176+ return etGuardState ;
177+ }
178+
179+ function calculateEffectiveTokens ( normalizedUsage , model , config ) {
180+ const multiplier = config . multipliers [ model ] ?? 1 ;
181+ const baseWeightedTokens =
182+ ( ET_DEFAULT_WEIGHTS . input * ( normalizedUsage . input_tokens || 0 ) ) +
183+ ( ET_DEFAULT_WEIGHTS . cacheRead * ( normalizedUsage . cache_read_tokens || 0 ) ) +
184+ ( ET_DEFAULT_WEIGHTS . output * ( normalizedUsage . output_tokens || 0 ) ) +
185+ ( ET_DEFAULT_WEIGHTS . reasoning * ( normalizedUsage . reasoning_tokens || 0 ) ) ;
186+ return {
187+ multiplier,
188+ baseWeightedTokens,
189+ effectiveTokens : multiplier * baseWeightedTokens ,
190+ } ;
191+ }
192+
193+ function applyEffectiveTokenUsage ( normalizedUsage , model ) {
194+ const config = getEffectiveTokenConfig ( ) ;
195+ const state = getEffectiveTokenState ( config ) ;
196+ if ( ! state || ! normalizedUsage ) return null ;
197+
198+ const previousTotal = state . totalEffectiveTokens ;
199+ const calc = calculateEffectiveTokens ( normalizedUsage , model || 'unknown' , config ) ;
200+ state . totalEffectiveTokens += calc . effectiveTokens ;
201+ const percentUsed = ( state . totalEffectiveTokens / config . max ) * 100 ;
202+
203+ const crossedThresholds = [ ] ;
204+ for ( const threshold of ET_WARNING_THRESHOLDS ) {
205+ if ( percentUsed >= threshold && ! state . emittedThresholds . has ( threshold ) ) {
206+ state . emittedThresholds . add ( threshold ) ;
207+ crossedThresholds . push ( threshold ) ;
208+ }
209+ }
210+
211+ return {
212+ maxEffectiveTokens : config . max ,
213+ previousTotalEffectiveTokens : previousTotal ,
214+ totalEffectiveTokens : state . totalEffectiveTokens ,
215+ effectiveTokensThisResponse : calc . effectiveTokens ,
216+ modelMultiplier : calc . multiplier ,
217+ crossedThresholds,
218+ maxExceeded : state . totalEffectiveTokens >= config . max ,
219+ } ;
220+ }
221+
222+ function getEffectiveTokenBlockState ( ) {
223+ const config = getEffectiveTokenConfig ( ) ;
224+ const state = getEffectiveTokenState ( config ) ;
225+ if ( ! state ) return null ;
226+ return {
227+ maxEffectiveTokens : config . max ,
228+ totalEffectiveTokens : state . totalEffectiveTokens ,
229+ maxExceeded : state . totalEffectiveTokens >= config . max ,
230+ } ;
231+ }
232+
233+ function getEffectiveTokenReflectState ( ) {
234+ const config = getEffectiveTokenConfig ( ) ;
235+ const state = getEffectiveTokenState ( config ) ;
236+ if ( ! state ) {
237+ return {
238+ enabled : false ,
239+ max_effective_tokens : null ,
240+ total_effective_tokens : 0 ,
241+ remaining_effective_tokens : null ,
242+ percent_used : 0 ,
243+ thresholds_crossed : [ ] ,
244+ } ;
245+ }
246+ return {
247+ enabled : true ,
248+ max_effective_tokens : config . max ,
249+ total_effective_tokens : state . totalEffectiveTokens ,
250+ remaining_effective_tokens : Math . max ( 0 , config . max - state . totalEffectiveTokens ) ,
251+ percent_used : Math . round ( ( state . totalEffectiveTokens / config . max ) * 10000 ) / 100 ,
252+ thresholds_crossed : [ ...state . emittedThresholds ] . sort ( ( a , b ) => a - b ) ,
253+ } ;
254+ }
255+
256+ function resetEffectiveTokenGuardForTests ( ) {
257+ etGuardState = createEffectiveTokenState ( ) ;
258+ effectiveTokenConfigCache . rawMax = undefined ;
259+ effectiveTokenConfigCache . rawMultipliers = undefined ;
260+ effectiveTokenConfigCache . parsed = { max : null , multipliers : { } } ;
261+ }
262+
263+ function buildEffectiveTokenLimitError ( etState ) {
264+ return {
265+ error : {
266+ type : 'effective_tokens_limit_reached' ,
267+ message : `Maximum effective tokens reached (${ etState . totalEffectiveTokens . toFixed ( 2 ) } / ${ etState . maxEffectiveTokens } ).` ,
268+ total_effective_tokens : etState . totalEffectiveTokens ,
269+ max_effective_tokens : etState . maxEffectiveTokens ,
270+ } ,
271+ } ;
272+ }
273+
102274// ── Utility ───────────────────────────────────────────────────────────────────
103275
104276/**
@@ -305,6 +477,23 @@ function proxyRequest(req, res, targetHost, injectHeaders, provider, basePath =
305477 } ) ;
306478 }
307479
480+ const etBlock = getEffectiveTokenBlockState ( ) ;
481+ if ( etBlock && etBlock . maxExceeded ) {
482+ const duration = Date . now ( ) - startTime ;
483+ metrics . gaugeDec ( 'active_requests' , { provider } ) ;
484+ metrics . increment ( 'requests_total' , { provider, method : req . method , status_class : '4xx' } ) ;
485+ metrics . observe ( 'request_duration_ms' , duration , { provider } ) ;
486+ logRequest ( 'warn' , 'effective_tokens_limit_reached' , {
487+ request_id : requestId ,
488+ provider,
489+ total_effective_tokens : etBlock . totalEffectiveTokens ,
490+ max_effective_tokens : etBlock . maxEffectiveTokens ,
491+ } ) ;
492+ res . writeHead ( 429 , { 'Content-Type' : 'application/json' , 'X-Request-ID' : requestId } ) ;
493+ res . end ( JSON . stringify ( buildEffectiveTokenLimitError ( etBlock ) ) ) ;
494+ return ;
495+ }
496+
308497 const options = {
309498 hostname : targetHost , port : 443 , path : upstreamPath ,
310499 method : req . method , headers,
@@ -362,8 +551,18 @@ function proxyRequest(req, res, targetHost, injectHeaders, provider, basePath =
362551
363552 res . writeHead ( proxyRes . statusCode , resHeaders ) ;
364553 proxyRes . pipe ( res ) ;
365-
366- trackTokenUsage ( proxyRes , { requestId, provider, path : sanitizeForLog ( req . url ) , startTime, metrics, billingInfo, initiatorSent } ) ;
554+ trackTokenUsage ( proxyRes , {
555+ requestId,
556+ provider,
557+ path : sanitizeForLog ( req . url ) ,
558+ startTime,
559+ metrics,
560+ billingInfo,
561+ initiatorSent,
562+ onUsage : ( normalizedUsage , model ) => {
563+ applyEffectiveTokenUsage ( normalizedUsage , model ) ;
564+ } ,
565+ } ) ;
367566 } ) ;
368567
369568 proxyReq . on ( 'error' , ( err ) => {
@@ -427,6 +626,20 @@ function proxyWebSocket(req, socket, head, targetHost, injectHeaders, provider,
427626
428627 const upstreamPath = buildUpstreamPath ( req . url , targetHost , basePath ) ;
429628
629+ const etBlock = getEffectiveTokenBlockState ( ) ;
630+ if ( etBlock && etBlock . maxExceeded ) {
631+ logRequest ( 'warn' , 'effective_tokens_limit_reached' , {
632+ request_id : requestId ,
633+ provider,
634+ total_effective_tokens : etBlock . totalEffectiveTokens ,
635+ max_effective_tokens : etBlock . maxEffectiveTokens ,
636+ } ) ;
637+ socket . write ( 'HTTP/1.1 429 Too Many Requests\r\nContent-Type: application/json\r\nConnection: close\r\n\r\n' ) ;
638+ socket . write ( JSON . stringify ( buildEffectiveTokenLimitError ( etBlock ) ) ) ;
639+ socket . destroy ( ) ;
640+ return ;
641+ }
642+
430643 const rateCheck = limiter . check ( provider , 0 ) ;
431644 if ( ! rateCheck . allowed ) {
432645 metrics . increment ( 'rate_limit_rejected_total' , { provider, limit_type : rateCheck . limitType } ) ;
@@ -530,7 +743,16 @@ function proxyWebSocket(req, socket, head, targetHost, injectHeaders, provider,
530743 tlsSocket . pipe ( socket ) ;
531744 socket . pipe ( tlsSocket ) ;
532745
533- trackWebSocketTokenUsage ( tlsSocket , { requestId, provider, path : sanitizeForLog ( req . url ) , startTime, metrics } ) ;
746+ trackWebSocketTokenUsage ( tlsSocket , {
747+ requestId,
748+ provider,
749+ path : sanitizeForLog ( req . url ) ,
750+ startTime,
751+ metrics,
752+ onUsage : ( normalizedUsage , model ) => {
753+ applyEffectiveTokenUsage ( normalizedUsage , model ) ;
754+ } ,
755+ } ) ;
534756
535757 socket . once ( 'close' , ( ) => { finalize ( false ) ; tlsSocket . destroy ( ) ; } ) ;
536758 tlsSocket . once ( 'close' , ( ) => { finalize ( false ) ; socket . destroy ( ) ; } ) ;
@@ -552,4 +774,7 @@ module.exports = {
552774 limiter,
553775 proxyAgent,
554776 HTTPS_PROXY ,
777+ getEffectiveTokenReflectState,
778+ // Exported for tests
779+ resetEffectiveTokenGuardForTests,
555780} ;
0 commit comments