Skip to content

Commit 59a7440

Browse files
Cristhianzlogabrielluizautofix-ci[bot]
authored
fix: Resolve event delivery configuration header passing between frontend and backend (#7514)
Co-authored-by: Gabriel Luiz Freitas Almeida <[email protected]> Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
1 parent 467ae9e commit 59a7440

File tree

12 files changed

+54
-46
lines changed

12 files changed

+54
-46
lines changed

src/backend/base/langflow/api/build.py

+3-2
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
from langflow.api.disconnect import DisconnectHandlerStreamingResponse
1414
from langflow.api.utils import (
1515
CurrentActiveUser,
16+
EventDeliveryType,
1617
build_graph_from_data,
1718
build_graph_from_db,
1819
format_elapsed_time,
@@ -84,12 +85,12 @@ async def get_flow_events_response(
8485
*,
8586
job_id: str,
8687
queue_service: JobQueueService,
87-
stream: bool = True,
88+
event_delivery: EventDeliveryType,
8889
):
8990
"""Get events for a specific build job, either as a stream or single event."""
9091
try:
9192
main_queue, event_manager, event_task, _ = queue_service.get_queue_data(job_id)
92-
if stream:
93+
if event_delivery in (EventDeliveryType.STREAMING, EventDeliveryType.DIRECT):
9394
if event_task is None:
9495
logger.error(f"No event task found for job {job_id}")
9596
raise HTTPException(status_code=404, detail="No event task found for job")

src/backend/base/langflow/api/utils.py

+7
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import uuid
44
from datetime import timedelta
5+
from enum import Enum
56
from typing import TYPE_CHECKING, Annotated, Any
67

78
from fastapi import Depends, HTTPException, Query
@@ -34,6 +35,12 @@
3435
DbSession = Annotated[AsyncSession, Depends(get_session)]
3536

3637

38+
class EventDeliveryType(str, Enum):
39+
STREAMING = "streaming"
40+
DIRECT = "direct"
41+
POLLING = "polling"
42+
43+
3744
def has_api_terms(word: str):
3845
return "api" in word and ("key" in word or ("token" in word and "tokens" not in word))
3946

src/backend/base/langflow/api/v1/chat.py

+9-7
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
from langflow.api.utils import (
2828
CurrentActiveUser,
2929
DbSession,
30+
EventDeliveryType,
3031
build_and_cache_graph_from_data,
3132
build_graph_from_db,
3233
format_elapsed_time,
@@ -55,12 +56,10 @@
5556
get_chat_service,
5657
get_queue_service,
5758
get_session,
58-
get_settings_service,
5959
get_telemetry_service,
6060
session_scope,
6161
)
6262
from langflow.services.job_queue.service import JobQueueNotFoundError, JobQueueService
63-
from langflow.services.settings.service import SettingsService
6463
from langflow.services.telemetry.schema import ComponentPayload, PlaygroundPayload
6564

6665
if TYPE_CHECKING:
@@ -154,7 +153,7 @@ async def build_flow(
154153
current_user: CurrentActiveUser,
155154
queue_service: Annotated[JobQueueService, Depends(get_queue_service)],
156155
flow_name: str | None = None,
157-
settings_service: Annotated[SettingsService, Depends(get_settings_service)],
156+
event_delivery: EventDeliveryType = EventDeliveryType.POLLING,
158157
):
159158
"""Build and process a flow, returning a job ID for event polling.
160159
@@ -174,6 +173,7 @@ async def build_flow(
174173
queue_service: Queue service for job management
175174
flow_name: Optional name for the flow
176175
settings_service: Settings service
176+
event_delivery: Optional event delivery type - default is streaming
177177
178178
Returns:
179179
Dict with job_id that can be used to poll for build status
@@ -197,12 +197,14 @@ async def build_flow(
197197
queue_service=queue_service,
198198
flow_name=flow_name,
199199
)
200-
if settings_service.settings.event_delivery != "direct":
200+
201+
# This is required to support FE tests - we need to be able to set the event delivery to direct
202+
if event_delivery == EventDeliveryType.DIRECT:
201203
return {"job_id": job_id}
202204
return await get_flow_events_response(
203205
job_id=job_id,
204206
queue_service=queue_service,
205-
stream=True,
207+
event_delivery=event_delivery,
206208
)
207209

208210

@@ -211,13 +213,13 @@ async def get_build_events(
211213
job_id: str,
212214
queue_service: Annotated[JobQueueService, Depends(get_queue_service)],
213215
*,
214-
stream: bool = True,
216+
event_delivery: EventDeliveryType = EventDeliveryType.STREAMING,
215217
):
216218
"""Get events for a specific build job."""
217219
return await get_flow_events_response(
218220
job_id=job_id,
219221
queue_service=queue_service,
220-
stream=stream,
222+
event_delivery=event_delivery,
221223
)
222224

223225

src/frontend/src/CustomNodes/GenericNode/components/NodeStatus/index.tsx

+3-9
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ export default function NodeStatus({
8787
const isBuilding = useFlowStore((state) => state.isBuilding);
8888
const setNode = useFlowStore((state) => state.setNode);
8989
const version = useDarkStore((state) => state.version);
90-
const config = useGetConfig();
90+
const eventDeliveryConfig = useUtilityStore((state) => state.eventDelivery);
9191
const setErrorData = useAlertStore((state) => state.setErrorData);
9292

9393
const postTemplateValue = usePostTemplateValue({
@@ -96,10 +96,6 @@ export default function NodeStatus({
9696
node: data.node,
9797
});
9898

99-
const shouldStreamEvents = () => {
100-
return config.data?.event_delivery === EventDeliveryType.STREAMING;
101-
};
102-
10399
// Start polling when connection is initiated
104100
const startPolling = () => {
105101
window.open(connectionLink, "_blank");
@@ -169,8 +165,7 @@ export default function NodeStatus({
169165
setValidationStatus(null);
170166
buildFlow({
171167
stopNodeId: nodeId,
172-
stream: shouldStreamEvents(),
173-
eventDelivery: config.data?.event_delivery,
168+
eventDelivery: eventDeliveryConfig,
174169
});
175170
}
176171

@@ -265,8 +260,7 @@ export default function NodeStatus({
265260
if (buildStatus === BuildStatus.BUILDING || isBuilding) return;
266261
buildFlow({
267262
stopNodeId: nodeId,
268-
stream: shouldStreamEvents(),
269-
eventDelivery: config.data?.event_delivery,
263+
eventDelivery: eventDeliveryConfig,
270264
});
271265
track("Flow Build - Clicked", { stopNodeId: nodeId });
272266
};

src/frontend/src/controllers/API/api.tsx

+2-1
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ import axios, { AxiosError, AxiosInstance, AxiosRequestConfig } from "axios";
66
import * as fetchIntercept from "fetch-intercept";
77
import { useEffect } from "react";
88
import { Cookies } from "react-cookie";
9-
import { BuildStatus } from "../../constants/enums";
9+
import { BuildStatus, EventDeliveryType } from "../../constants/enums";
1010
import useAlertStore from "../../stores/alertStore";
1111
import useFlowStore from "../../stores/flowStore";
1212
import { checkDuplicateRequestAndStoreRequest } from "./helpers/check-duplicate-requests";
@@ -263,6 +263,7 @@ export type StreamingRequestParams = {
263263
onError?: (statusCode: number) => void;
264264
onNetworkError?: (error: Error) => void;
265265
buildController: AbortController;
266+
eventDeliveryConfig?: EventDeliveryType;
266267
};
267268

268269
async function performStreamingRequest({

src/frontend/src/controllers/API/queries/config/use-get-config.ts

+2-1
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ export const useGetConfig: useQueryFunctionType<undefined, ConfigResponse> = (
3939
const setWebhookPollingInterval = useUtilityStore(
4040
(state) => state.setWebhookPollingInterval,
4141
);
42-
42+
const setEventDelivery = useUtilityStore((state) => state.setEventDelivery);
4343
const { query } = UseRequestProcessor();
4444

4545
const getConfigFn = async () => {
@@ -59,6 +59,7 @@ export const useGetConfig: useQueryFunctionType<undefined, ConfigResponse> = (
5959
setWebhookPollingInterval(
6060
data.webhook_polling_interval ?? DEFAULT_POLLING_INTERVAL,
6161
);
62+
setEventDelivery(data.event_delivery ?? EventDeliveryType.POLLING);
6263
}
6364
return data;
6465
};

src/frontend/src/modals/IOModal/new-modal.tsx

+2-7
Original file line numberDiff line numberDiff line change
@@ -155,11 +155,7 @@ export default function IOModal({
155155

156156
const chatValue = useUtilityStore((state) => state.chatValueStore);
157157
const setChatValue = useUtilityStore((state) => state.setChatValueStore);
158-
const config = useGetConfig();
159-
160-
function shouldStreamEvents() {
161-
return config.data?.event_delivery === EventDeliveryType.STREAMING;
162-
}
158+
const eventDeliveryConfig = useUtilityStore((state) => state.eventDelivery);
163159

164160
const sendMessage = useCallback(
165161
async ({
@@ -178,8 +174,7 @@ export default function IOModal({
178174
files: files,
179175
silent: true,
180176
session: sessionId,
181-
stream: shouldStreamEvents(),
182-
eventDelivery: config.data?.event_delivery,
177+
eventDelivery: eventDeliveryConfig,
183178
}).catch((err) => {
184179
console.error(err);
185180
});

src/frontend/src/stores/flowStore.ts

-1
Original file line numberDiff line numberDiff line change
@@ -835,7 +835,6 @@ const useFlowStore = create<FlowStoreType>((set, get) => ({
835835
edges: get().edges || undefined,
836836
logBuilds: get().onFlowPage,
837837
playgroundPage,
838-
stream,
839838
eventDelivery,
840839
});
841840
get().setIsBuilding(false);

src/frontend/src/stores/utilityStore.ts

+4
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import { EventDeliveryType } from "@/constants/enums";
12
import { Pagination, Tag } from "@/types/utils/types";
23
import { UtilityStoreType } from "@/types/zustand/utility";
34
import { create } from "zustand";
@@ -43,4 +44,7 @@ export const useUtilityStore = create<UtilityStoreType>((set, get) => ({
4344
currentSessionId: "",
4445
setCurrentSessionId: (sessionId: string) =>
4546
set({ currentSessionId: sessionId }),
47+
eventDelivery: EventDeliveryType.POLLING,
48+
setEventDelivery: (eventDelivery: EventDeliveryType) =>
49+
set({ eventDelivery }),
4650
}));

src/frontend/src/types/zustand/flow/index.ts

-2
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,6 @@ export type FlowStoreType = {
149149
files,
150150
silent,
151151
session,
152-
stream,
153152
eventDelivery,
154153
}: {
155154
startNodeId?: string;
@@ -158,7 +157,6 @@ export type FlowStoreType = {
158157
files?: string[];
159158
silent?: boolean;
160159
session?: string;
161-
stream?: boolean;
162160
eventDelivery?: EventDeliveryType;
163161
}) => Promise<void>;
164162
getFlow: () => { nodes: Node[]; edges: EdgeType[]; viewport: Viewport };

src/frontend/src/types/zustand/utility/index.ts

+3
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import { EventDeliveryType } from "@/constants/enums";
12
import { Pagination, Tag } from "@/types/utils/types";
23

34
export type UtilityStoreType = {
@@ -25,4 +26,6 @@ export type UtilityStoreType = {
2526
setCurrentSessionId: (sessionId: string) => void;
2627
setClientId: (clientId: string) => void;
2728
clientId: string;
29+
eventDelivery: EventDeliveryType;
30+
setEventDelivery: (eventDelivery: EventDeliveryType) => void;
2831
};

src/frontend/src/utils/buildUtils.ts

+19-16
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,7 @@ type BuildVerticesParams = {
4040
logBuilds?: boolean;
4141
session?: string;
4242
playgroundPage?: boolean;
43-
stream?: boolean;
44-
eventDelivery?: EventDeliveryType;
43+
eventDelivery: EventDeliveryType;
4544
};
4645

4746
function getInactiveVertexData(vertexId: string): VertexBuildTypeAPI {
@@ -148,7 +147,10 @@ export async function buildFlowVerticesWithFallback(
148147
e.message === POLLING_MESSAGES.STREAMING_NOT_SUPPORTED
149148
) {
150149
// Fallback to polling
151-
return await buildFlowVertices({ ...params, stream: false });
150+
return await buildFlowVertices({
151+
...params,
152+
eventDelivery: EventDeliveryType.POLLING,
153+
});
152154
}
153155
throw e;
154156
}
@@ -176,13 +178,16 @@ async function pollBuildEvents(
176178
): Promise<void> {
177179
let isDone = false;
178180
while (!isDone) {
179-
const response = await fetch(`${url}?stream=false`, {
180-
method: "GET",
181-
headers: {
182-
"Content-Type": "application/json",
181+
const response = await fetch(
182+
`${url}?event_delivery=${EventDeliveryType.POLLING}`,
183+
{
184+
method: "GET",
185+
headers: {
186+
"Content-Type": "application/json",
187+
},
188+
signal: abortController.signal, // Add abort signal to fetch
183189
},
184-
signal: abortController.signal, // Add abort signal to fetch
185-
});
190+
);
186191

187192
if (!response.ok) {
188193
const errorData = await response.json().catch(() => ({}));
@@ -241,7 +246,6 @@ export async function buildFlowVertices({
241246
logBuilds,
242247
session,
243248
playgroundPage,
244-
stream = true,
245249
eventDelivery,
246250
}: BuildVerticesParams) {
247251
const inputs = {};
@@ -260,10 +264,10 @@ export async function buildFlowVertices({
260264
queryParams.append("log_builds", logBuilds.toString());
261265
}
262266

263-
// Add stream parameter when using direct event delivery
264-
if (eventDelivery === EventDeliveryType.DIRECT) {
265-
queryParams.append("stream", "true");
266-
}
267+
queryParams.append(
268+
"event_delivery",
269+
eventDelivery ?? EventDeliveryType.POLLING,
270+
);
267271

268272
if (queryParams.toString()) {
269273
buildUrl = `${buildUrl}?${queryParams.toString()}`;
@@ -376,13 +380,12 @@ export async function buildFlowVertices({
376380
}
377381
});
378382
useFlowStore.getState().setBuildController(buildController);
379-
380383
// Then stream the events
381384
const eventsUrl = `${BASE_URL_API}build/${job_id}/events`;
382385
const buildResults: Array<boolean> = [];
383386
const verticesStartTimeMs: Map<string, number> = new Map();
384387

385-
if (stream) {
388+
if (eventDelivery === EventDeliveryType.STREAMING) {
386389
return performStreamingRequest({
387390
method: "GET",
388391
url: eventsUrl,

0 commit comments

Comments
 (0)