Skip to content

fix: Resolve event delivery configuration header passing between frontend and backend #7514

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 16 commits into from
Apr 9, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
16 commits
Select commit Hold shift + click to select a range
67e2f7d
📝 (chat.py): add support for passing request object to build_flow fun…
Cristhianzl Apr 8, 2025
7ad1570
📝 (chat.py): add comment explaining the purpose of setting event deli…
Cristhianzl Apr 9, 2025
3b963a4
📝 (chat.py): add request parameter to build_flow function to access F…
Cristhianzl Apr 9, 2025
9539d5c
📝 (build.py): Add EventDeliveryType enum to support different event d…
Cristhianzl Apr 9, 2025
c66152e
🐛 (build.py): fix condition to check if event_delivery is in a list o…
Cristhianzl Apr 9, 2025
c5b9347
[autofix.ci] apply automated fixes
autofix-ci[bot] Apr 9, 2025
fe20983
♻️ (buildUtils.ts): remove debugger statement from buildFlowVertices …
Cristhianzl Apr 9, 2025
dc871fc
Merge branch 'cz/fix-event-delivery-direct' of github.com:langflow-ai…
Cristhianzl Apr 9, 2025
74d1f61
Update src/frontend/src/utils/buildUtils.ts
Cristhianzl Apr 9, 2025
8cda145
Update src/frontend/src/stores/utilityStore.ts
Cristhianzl Apr 9, 2025
0c069c8
Update src/frontend/src/controllers/API/queries/config/use-get-config.ts
Cristhianzl Apr 9, 2025
02baf74
Update src/backend/base/langflow/api/v1/chat.py
Cristhianzl Apr 9, 2025
0494fa0
Update src/backend/base/langflow/api/v1/chat.py
Cristhianzl Apr 9, 2025
13bd3e4
♻️ (NodeStatus/index.tsx, new-modal.tsx, flowStore.ts, index.ts, buil…
Cristhianzl Apr 9, 2025
b785778
Merge branch 'cz/fix-event-delivery-direct' of github.com:langflow-ai…
Cristhianzl Apr 9, 2025
b241868
♻️ (chat.py): remove unused import and dependency on SettingsService …
Cristhianzl Apr 9, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions src/backend/base/langflow/api/build.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from langflow.api.disconnect import DisconnectHandlerStreamingResponse
from langflow.api.utils import (
CurrentActiveUser,
EventDeliveryType,
build_graph_from_data,
build_graph_from_db,
format_elapsed_time,
Expand Down Expand Up @@ -84,12 +85,12 @@ async def get_flow_events_response(
*,
job_id: str,
queue_service: JobQueueService,
stream: bool = True,
event_delivery: EventDeliveryType,
):
"""Get events for a specific build job, either as a stream or single event."""
try:
main_queue, event_manager, event_task, _ = queue_service.get_queue_data(job_id)
if stream:
if event_delivery in (EventDeliveryType.STREAMING, EventDeliveryType.DIRECT):
if event_task is None:
logger.error(f"No event task found for job {job_id}")
raise HTTPException(status_code=404, detail="No event task found for job")
Expand Down
7 changes: 7 additions & 0 deletions src/backend/base/langflow/api/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import uuid
from datetime import timedelta
from enum import Enum
from typing import TYPE_CHECKING, Annotated, Any

from fastapi import Depends, HTTPException, Query
Expand Down Expand Up @@ -34,6 +35,12 @@
DbSession = Annotated[AsyncSession, Depends(get_session)]


class EventDeliveryType(str, Enum):
STREAMING = "streaming"
DIRECT = "direct"
POLLING = "polling"


def has_api_terms(word: str):
return "api" in word and ("key" in word or ("token" in word and "tokens" not in word))

Expand Down
16 changes: 9 additions & 7 deletions src/backend/base/langflow/api/v1/chat.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
from langflow.api.utils import (
CurrentActiveUser,
DbSession,
EventDeliveryType,
build_and_cache_graph_from_data,
build_graph_from_db,
format_elapsed_time,
Expand Down Expand Up @@ -55,12 +56,10 @@
get_chat_service,
get_queue_service,
get_session,
get_settings_service,
get_telemetry_service,
session_scope,
)
from langflow.services.job_queue.service import JobQueueNotFoundError, JobQueueService
from langflow.services.settings.service import SettingsService
from langflow.services.telemetry.schema import ComponentPayload, PlaygroundPayload

if TYPE_CHECKING:
Expand Down Expand Up @@ -154,7 +153,7 @@ async def build_flow(
current_user: CurrentActiveUser,
queue_service: Annotated[JobQueueService, Depends(get_queue_service)],
flow_name: str | None = None,
settings_service: Annotated[SettingsService, Depends(get_settings_service)],
event_delivery: EventDeliveryType = EventDeliveryType.POLLING,
):
"""Build and process a flow, returning a job ID for event polling.

Expand All @@ -174,6 +173,7 @@ async def build_flow(
queue_service: Queue service for job management
flow_name: Optional name for the flow
settings_service: Settings service
event_delivery: Optional event delivery type - default is streaming

Returns:
Dict with job_id that can be used to poll for build status
Expand All @@ -197,12 +197,14 @@ async def build_flow(
queue_service=queue_service,
flow_name=flow_name,
)
if settings_service.settings.event_delivery != "direct":

# This is required to support FE tests - we need to be able to set the event delivery to direct
if event_delivery == EventDeliveryType.DIRECT:
return {"job_id": job_id}
return await get_flow_events_response(
job_id=job_id,
queue_service=queue_service,
stream=True,
event_delivery=event_delivery,
)


Expand All @@ -211,13 +213,13 @@ async def get_build_events(
job_id: str,
queue_service: Annotated[JobQueueService, Depends(get_queue_service)],
*,
stream: bool = True,
event_delivery: EventDeliveryType = EventDeliveryType.STREAMING,
):
"""Get events for a specific build job."""
return await get_flow_events_response(
job_id=job_id,
queue_service=queue_service,
stream=stream,
event_delivery=event_delivery,
)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ export default function NodeStatus({
const isBuilding = useFlowStore((state) => state.isBuilding);
const setNode = useFlowStore((state) => state.setNode);
const version = useDarkStore((state) => state.version);
const config = useGetConfig();
const eventDeliveryConfig = useUtilityStore((state) => state.eventDelivery);
const setErrorData = useAlertStore((state) => state.setErrorData);

const postTemplateValue = usePostTemplateValue({
Expand All @@ -96,10 +96,6 @@ export default function NodeStatus({
node: data.node,
});

const shouldStreamEvents = () => {
return config.data?.event_delivery === EventDeliveryType.STREAMING;
};

// Start polling when connection is initiated
const startPolling = () => {
window.open(connectionLink, "_blank");
Expand Down Expand Up @@ -169,8 +165,7 @@ export default function NodeStatus({
setValidationStatus(null);
buildFlow({
stopNodeId: nodeId,
stream: shouldStreamEvents(),
eventDelivery: config.data?.event_delivery,
eventDelivery: eventDeliveryConfig,
});
}

Expand Down Expand Up @@ -265,8 +260,7 @@ export default function NodeStatus({
if (buildStatus === BuildStatus.BUILDING || isBuilding) return;
buildFlow({
stopNodeId: nodeId,
stream: shouldStreamEvents(),
eventDelivery: config.data?.event_delivery,
eventDelivery: eventDeliveryConfig,
});
track("Flow Build - Clicked", { stopNodeId: nodeId });
};
Expand Down
3 changes: 2 additions & 1 deletion src/frontend/src/controllers/API/api.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import axios, { AxiosError, AxiosInstance, AxiosRequestConfig } from "axios";
import * as fetchIntercept from "fetch-intercept";
import { useEffect } from "react";
import { Cookies } from "react-cookie";
import { BuildStatus } from "../../constants/enums";
import { BuildStatus, EventDeliveryType } from "../../constants/enums";
import useAlertStore from "../../stores/alertStore";
import useFlowStore from "../../stores/flowStore";
import { checkDuplicateRequestAndStoreRequest } from "./helpers/check-duplicate-requests";
Expand Down Expand Up @@ -263,6 +263,7 @@ export type StreamingRequestParams = {
onError?: (statusCode: number) => void;
onNetworkError?: (error: Error) => void;
buildController: AbortController;
eventDeliveryConfig?: EventDeliveryType;
};

async function performStreamingRequest({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ export const useGetConfig: useQueryFunctionType<undefined, ConfigResponse> = (
const setWebhookPollingInterval = useUtilityStore(
(state) => state.setWebhookPollingInterval,
);

const setEventDelivery = useUtilityStore((state) => state.setEventDelivery);
const { query } = UseRequestProcessor();

const getConfigFn = async () => {
Expand All @@ -59,6 +59,7 @@ export const useGetConfig: useQueryFunctionType<undefined, ConfigResponse> = (
setWebhookPollingInterval(
data.webhook_polling_interval ?? DEFAULT_POLLING_INTERVAL,
);
setEventDelivery(data.event_delivery ?? EventDeliveryType.POLLING);
}
return data;
};
Expand Down
9 changes: 2 additions & 7 deletions src/frontend/src/modals/IOModal/new-modal.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -155,11 +155,7 @@ export default function IOModal({

const chatValue = useUtilityStore((state) => state.chatValueStore);
const setChatValue = useUtilityStore((state) => state.setChatValueStore);
const config = useGetConfig();

function shouldStreamEvents() {
return config.data?.event_delivery === EventDeliveryType.STREAMING;
}
const eventDeliveryConfig = useUtilityStore((state) => state.eventDelivery);

const sendMessage = useCallback(
async ({
Expand All @@ -178,8 +174,7 @@ export default function IOModal({
files: files,
silent: true,
session: sessionId,
stream: shouldStreamEvents(),
eventDelivery: config.data?.event_delivery,
eventDelivery: eventDeliveryConfig,
}).catch((err) => {
console.error(err);
});
Expand Down
1 change: 0 additions & 1 deletion src/frontend/src/stores/flowStore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -835,7 +835,6 @@ const useFlowStore = create<FlowStoreType>((set, get) => ({
edges: get().edges || undefined,
logBuilds: get().onFlowPage,
playgroundPage,
stream,
eventDelivery,
});
get().setIsBuilding(false);
Expand Down
4 changes: 4 additions & 0 deletions src/frontend/src/stores/utilityStore.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { EventDeliveryType } from "@/constants/enums";
import { Pagination, Tag } from "@/types/utils/types";
import { UtilityStoreType } from "@/types/zustand/utility";
import { create } from "zustand";
Expand Down Expand Up @@ -43,4 +44,7 @@ export const useUtilityStore = create<UtilityStoreType>((set, get) => ({
currentSessionId: "",
setCurrentSessionId: (sessionId: string) =>
set({ currentSessionId: sessionId }),
eventDelivery: EventDeliveryType.POLLING,
setEventDelivery: (eventDelivery: EventDeliveryType) =>
set({ eventDelivery }),
}));
2 changes: 0 additions & 2 deletions src/frontend/src/types/zustand/flow/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,6 @@ export type FlowStoreType = {
files,
silent,
session,
stream,
eventDelivery,
}: {
startNodeId?: string;
Expand All @@ -158,7 +157,6 @@ export type FlowStoreType = {
files?: string[];
silent?: boolean;
session?: string;
stream?: boolean;
eventDelivery?: EventDeliveryType;
}) => Promise<void>;
getFlow: () => { nodes: Node[]; edges: EdgeType[]; viewport: Viewport };
Expand Down
3 changes: 3 additions & 0 deletions src/frontend/src/types/zustand/utility/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { EventDeliveryType } from "@/constants/enums";
import { Pagination, Tag } from "@/types/utils/types";

export type UtilityStoreType = {
Expand Down Expand Up @@ -25,4 +26,6 @@ export type UtilityStoreType = {
setCurrentSessionId: (sessionId: string) => void;
setClientId: (clientId: string) => void;
clientId: string;
eventDelivery: EventDeliveryType;
setEventDelivery: (eventDelivery: EventDeliveryType) => void;
};
35 changes: 19 additions & 16 deletions src/frontend/src/utils/buildUtils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,7 @@ type BuildVerticesParams = {
logBuilds?: boolean;
session?: string;
playgroundPage?: boolean;
stream?: boolean;
eventDelivery?: EventDeliveryType;
eventDelivery: EventDeliveryType;
};

function getInactiveVertexData(vertexId: string): VertexBuildTypeAPI {
Expand Down Expand Up @@ -148,7 +147,10 @@ export async function buildFlowVerticesWithFallback(
e.message === POLLING_MESSAGES.STREAMING_NOT_SUPPORTED
) {
// Fallback to polling
return await buildFlowVertices({ ...params, stream: false });
return await buildFlowVertices({
...params,
eventDelivery: EventDeliveryType.POLLING,
});
}
throw e;
}
Expand Down Expand Up @@ -176,13 +178,16 @@ async function pollBuildEvents(
): Promise<void> {
let isDone = false;
while (!isDone) {
const response = await fetch(`${url}?stream=false`, {
method: "GET",
headers: {
"Content-Type": "application/json",
const response = await fetch(
`${url}?event_delivery=${EventDeliveryType.POLLING}`,
{
method: "GET",
headers: {
"Content-Type": "application/json",
},
signal: abortController.signal, // Add abort signal to fetch
},
signal: abortController.signal, // Add abort signal to fetch
});
);

if (!response.ok) {
const errorData = await response.json().catch(() => ({}));
Expand Down Expand Up @@ -241,7 +246,6 @@ export async function buildFlowVertices({
logBuilds,
session,
playgroundPage,
stream = true,
eventDelivery,
}: BuildVerticesParams) {
const inputs = {};
Expand All @@ -260,10 +264,10 @@ export async function buildFlowVertices({
queryParams.append("log_builds", logBuilds.toString());
}

// Add stream parameter when using direct event delivery
if (eventDelivery === EventDeliveryType.DIRECT) {
queryParams.append("stream", "true");
}
queryParams.append(
"event_delivery",
eventDelivery ?? EventDeliveryType.POLLING,
);

if (queryParams.toString()) {
buildUrl = `${buildUrl}?${queryParams.toString()}`;
Expand Down Expand Up @@ -376,13 +380,12 @@ export async function buildFlowVertices({
}
});
useFlowStore.getState().setBuildController(buildController);

// Then stream the events
const eventsUrl = `${BASE_URL_API}build/${job_id}/events`;
const buildResults: Array<boolean> = [];
const verticesStartTimeMs: Map<string, number> = new Map();

if (stream) {
if (eventDelivery === EventDeliveryType.STREAMING) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#7526 will help with this. let's push this PR and then we fix this

return performStreamingRequest({
method: "GET",
url: eventsUrl,
Expand Down
Loading