Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 116 additions & 7 deletions sdks/typescript/src/apache_beam/worker/state.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,102 @@ export interface StateProvider {
}

// TODO: (Advanced) Cross-bundle caching.
/**
 * One slot of the state cache: a cached result together with the weight
 * that is charged against the cache's budget for holding it.
 */
interface WeightedCacheEntry<T> {
  /** Estimated size charged for this slot. */
  weight: number;
  /** The cached result, either already resolved or still pending. */
  entry: MaybePromise<T>;
}

/**
 * Estimates the memory footprint of a value in bytes.
 *
 * This is a rough heuristic, not a measurement: primitives are charged a
 * fixed cost, strings two bytes per UTF-16 code unit plus a fixed overhead,
 * binary views their byte length plus overhead, and arrays/plain objects the
 * sum of their members plus overhead.
 *
 * The traversal is iterative and remembers every object it has already
 * charged, so cyclic structures (e.g. `a.b = a`) and very deeply nested
 * values terminate instead of overflowing the call stack. A repeated
 * reference to an already-counted object is charged as a bare reference.
 *
 * @param value - Any value, including `null`/`undefined`.
 * @returns The estimated size in bytes.
 */
function estimateSize(value: any): number {
  const REFERENCE_SIZE = 8;
  const CONTAINER_OVERHEAD = 40;

  const seen = new Set<object>();
  const pending: any[] = [value];
  let total = 0;

  while (pending.length > 0) {
    const current = pending.pop();

    if (current === null || current === undefined) {
      total += REFERENCE_SIZE;
      continue;
    }

    const type = typeof current;
    if (type === "boolean") {
      total += 4;
      continue;
    }
    if (type === "number") {
      total += 8;
      continue;
    }
    if (type === "string") {
      // Each character is 2 bytes in JavaScript (UTF-16) + overhead.
      total += CONTAINER_OVERHEAD + current.length * 2;
      continue;
    }
    if (type !== "object") {
      // Functions, symbols, bigints: flat default.
      total += 64;
      continue;
    }

    if (seen.has(current)) {
      // Already charged in full; only the extra reference costs anything.
      total += REFERENCE_SIZE;
      continue;
    }
    seen.add(current);

    if (ArrayBuffer.isView(current)) {
      // Covers Uint8Array, Node Buffers (a Uint8Array subclass) and every
      // other typed array / DataView, sized by actual bytes held.
      total += CONTAINER_OVERHEAD + current.byteLength;
      continue;
    }

    total += CONTAINER_OVERHEAD;
    if (Array.isArray(current)) {
      for (const item of current) {
        pending.push(item);
      }
    } else {
      for (const key of Object.keys(current)) {
        total += CONTAINER_OVERHEAD + key.length * 2; // the key string itself
        pending.push(current[key]);
      }
    }
  }

  return total;
}
Comment on lines +61 to +98
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

The recursive implementation of estimateSize for objects and arrays does not handle circular references. If a cached object contains a cycle (e.g., a.b = a), this function will recurse without bound until the call stack overflows, crashing the worker. This is a critical stability risk.

/**
 * Estimates the memory footprint of a value in bytes.
 *
 * This is a rough heuristic, not a measurement. Objects are tracked in
 * `visited` so that cycles terminate and an object reachable through several
 * paths is charged in full only once; later encounters cost one reference.
 *
 * @param value - Any value, including `null`/`undefined`.
 * @param visited - Objects already charged during this estimation. Callers
 *   normally omit it; it is threaded through the recursive calls.
 * @returns The estimated size in bytes.
 */
function estimateSize(value: any, visited = new Set<any>()): number {
  if (value === null || value === undefined) {
    return 8;
  }

  const type = typeof value;

  if (type === "boolean") {
    return 4;
  }
  if (type === "number") {
    return 8;
  }
  if (type === "string") {
    // Each character is 2 bytes in JavaScript (UTF-16) + overhead
    return 40 + value.length * 2;
  }
  if (type !== "object") {
    // Default for remaining types (functions, symbols, bigints)
    return 64;
  }

  if (visited.has(value)) {
    return 8; // Account for reference size, not the full object again.
  }
  visited.add(value);

  if (ArrayBuffer.isView(value)) {
    // Any typed array or DataView (Node Buffers included, as a Uint8Array
    // subclass), sized by bytes held rather than by element count.
    return 40 + value.byteLength;
  }
  if (value instanceof ArrayBuffer) {
    return 40 + value.byteLength;
  }
  if (Array.isArray(value)) {
    let size = 40; // Array overhead
    for (const item of value) {
      size += estimateSize(item, visited);
    }
    return size;
  }
  if (value instanceof Map) {
    // Object.keys() sees nothing inside a Map, so walk its entries directly.
    let size = 40;
    for (const [k, v] of value) {
      size += estimateSize(k, visited) + estimateSize(v, visited);
    }
    return size;
  }
  if (value instanceof Set) {
    let size = 40;
    for (const member of value) {
      size += estimateSize(member, visited);
    }
    return size;
  }

  let size = 40; // Object overhead
  for (const key of Object.keys(value)) {
    size += estimateSize(key, visited) + estimateSize(value[key], visited);
  }
  return size;
}


// Default cache capacity: 100 MiB, expressed in bytes.
const DEFAULT_MAX_CACHE_WEIGHT = 100 * 2 ** 20;

export class CachingStateProvider implements StateProvider {
underlying: StateProvider;
cache: Map<string, MaybePromise<any>> = new Map();
cache: Map<string, WeightedCacheEntry<any>> = new Map();
maxCacheWeight: number;
currentWeight: number = 0;

constructor(underlying: StateProvider) {
constructor(
underlying: StateProvider,
maxCacheWeight: number = DEFAULT_MAX_CACHE_WEIGHT,
) {
this.underlying = underlying;
this.maxCacheWeight = maxCacheWeight;
}

/**
 * Drops entries from the front of the cache until the tracked weight no
 * longer exceeds `maxCacheWeight` or nothing is left to drop.
 * A Map iterates in insertion order, so the front entry is the one that was
 * inserted (or re-inserted) longest ago.
 */
private evictIfNeeded() {
  for (const [oldestKey, oldest] of this.cache) {
    if (!(this.currentWeight > this.maxCacheWeight)) {
      return;
    }
    // Release the weight charged for this entry, then drop it.
    this.currentWeight -= oldest.weight;
    this.cache.delete(oldestKey);
  }
}
Comment on lines +121 to +133
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The evictIfNeeded method can be made more efficient and readable. Currently, it gets the first key using this.cache.keys().next().value and then performs a separate this.cache.get(firstKey) lookup. You can achieve this in a single operation by iterating over this.cache.entries().

  /**
   * Evicts the oldest entries until the tracked weight is back under
   * `maxCacheWeight`, or the cache is empty.
   */
  private evictIfNeeded() {
    while (this.currentWeight > this.maxCacheWeight) {
      // JavaScript Maps preserve insertion order, so the first entry is the oldest.
      const oldest = this.cache.entries().next();
      if (oldest.done) {
        // Cache is empty: nothing left to evict. Checking `done` also narrows
        // `value` to a [key, entry] tuple, so the destructuring below is
        // type-safe under strict built-in iterator typing.
        break;
      }
      const [firstKey, evicted] = oldest.value;
      this.currentWeight -= evicted.weight;
      this.cache.delete(firstKey);
    }
  }


/**
 * Marks a cache entry as most recently used.
 * Removing the key and inserting it again moves it to the end of the Map's
 * insertion order, which is the order eviction walks from the front.
 * Does nothing when the key is not cached.
 */
private touchCacheEntry(cacheKey: string) {
  const existing = this.cache.get(cacheKey);
  if (existing === undefined) {
    return;
  }
  this.cache.delete(cacheKey);
  this.cache.set(cacheKey, existing);
}

getState<T>(stateKey: fnApi.StateKey, decode: (data: Uint8Array) => T) {
Expand All @@ -62,21 +152,40 @@ export class CachingStateProvider implements StateProvider {
"base64",
);
if (this.cache.has(cacheKey)) {
return this.cache.get(cacheKey)!;
// Cache hit: move to end (most recently used)
this.touchCacheEntry(cacheKey);
return this.cache.get(cacheKey)!.entry;
}
// Cache miss: fetch from underlying provider
let result = this.underlying.getState(stateKey, decode);
const this_ = this;
if (result.type === "promise") {
result = {
type: "promise",
promise: result.promise.then((value) => {
this_.cache.set(cacheKey, { type: "value", value });
// When promise resolves, update cache with resolved value
// Get the current entry to update its weight
const currentEntry = this.cache.get(cacheKey);
if (currentEntry !== undefined) {
// Remove old weight from total
this.currentWeight -= currentEntry.weight;
}
const resolvedWeight = estimateSize(value);
this.cache.set(cacheKey, {
entry: { type: "value", value },
weight: resolvedWeight,
});
this.currentWeight += resolvedWeight;
this.evictIfNeeded();
return value;
}),
};
}
// TODO: (Perf) Cache eviction.
this.cache.set(cacheKey, result);
// Estimate weight for the new entry
const weight = result.type === "value" ? estimateSize(result.value) : 64; // Promise placeholder weight
// Evict if needed before adding new entry
this.currentWeight += weight;
this.evictIfNeeded();
this.cache.set(cacheKey, { entry: result, weight });
Comment on lines +185 to +188
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

There is a bug in the ordering of operations for a synchronous cache miss. evictIfNeeded() is called before the new item is added to the cache via this.cache.set(). If the cache is empty and a new item's weight exceeds maxCacheWeight, evictIfNeeded() will not run (as this.cache.size is 0). The oversized item is then added, causing the cache to violate its weight limit until the next eviction event. The item should be added to the cache before eviction is checked.

    // Add new entry and then evict if needed
    this.currentWeight += weight;
    this.cache.set(cacheKey, { entry: result, weight });
    this.evictIfNeeded();

return result;
}
}
Expand Down
Loading