Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions apps/mail/components/mail/reply-composer.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -242,10 +242,10 @@ export default function ReplyCompose({ messageId }: ReplyComposeProps) {
if (!mode || !emailData) return null;

return (
<div className="w-full rounded-xl">
<div className="w-full rounded-xl overflow-visible">
<EmailComposer
editorClassName="min-h-[50px]"
className="w-full !max-w-none border pb-1"
className="w-full !max-w-none border pb-1 overflow-visible"
onSendEmail={handleSendEmail}
onClose={async () => {
setMode(null);
Expand Down
76 changes: 34 additions & 42 deletions apps/server/src/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -711,51 +711,43 @@ export default class extends WorkerEntrypoint<typeof env> {
switch (true) {
case batch.queue.startsWith('subscribe-queue'): {
console.log('batch', batch);
try {
await Promise.all(
batch.messages.map(async (msg: Message<ISubscribeBatch>) => {
const connectionId = msg.body.connectionId;
const providerId = msg.body.providerId;
try {
await enableBrainFunction({ id: connectionId, providerId });
} catch (error) {
console.error(
`Failed to enable brain function for connection ${connectionId}:`,
error,
);
}
}),
);
console.log('[SUBSCRIBE_QUEUE] batch done');
} finally {
batch.ackAll();
}
await Promise.all(
batch.messages.map(async (msg: Message<ISubscribeBatch>) => {
const connectionId = msg.body.connectionId;
const providerId = msg.body.providerId;
try {
await enableBrainFunction({ id: connectionId, providerId });
} catch (error) {
console.error(
`Failed to enable brain function for connection ${connectionId}:`,
error,
);
}
}),
);
console.log('[SUBSCRIBE_QUEUE] batch done');
return;
}
case batch.queue.startsWith('thread-queue'): {
try {
await Promise.all(
batch.messages.map(async (msg: Message<IThreadBatch>) => {
const providerId = msg.body.providerId;
const historyId = msg.body.historyId;
const subscriptionName = msg.body.subscriptionName;
const workflow = runWorkflow(EWorkflowType.MAIN, {
providerId,
historyId,
subscriptionName,
});

try {
const result = await Effect.runPromise(workflow);
console.log('[THREAD_QUEUE] result', result);
} catch (error) {
console.error('Error running workflow', error);
}
}),
);
} finally {
batch.ackAll();
}
await Promise.all(
batch.messages.map(async (msg: Message<IThreadBatch>) => {
const providerId = msg.body.providerId;
const historyId = msg.body.historyId;
const subscriptionName = msg.body.subscriptionName;
const workflow = runWorkflow(EWorkflowType.MAIN, {
providerId,
historyId,
subscriptionName,
});

try {
const result = await Effect.runPromise(workflow);
console.log('[THREAD_QUEUE] result', result);
} catch (error) {
console.error('Error running workflow', error);
}
}),
);
break;
}
}
Expand Down
103 changes: 60 additions & 43 deletions apps/server/src/pipelines.effect.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ import {
analyzeEmailIntent,
} from './thread-workflow-utils';
import { defaultLabels, EPrompts, EProviders, type ParsedMessage, type Sender } from './types';
import { EWorkflowType, getPromptName, runWorkflow } from './pipelines';
import { getZeroAgent } from './lib/server-utils';
import { type gmail_v1 } from '@googleapis/gmail';
import { getPromptName } from './pipelines';
import { env } from 'cloudflare:workers';
import { connection } from './db/schema';
import { Effect, Console } from 'effect';
Expand Down Expand Up @@ -140,7 +140,7 @@ export const runMainWorkflow = (
nextHistoryId: historyId,
};

const result = yield* runZeroWorkflow(zeroWorkflowParams).pipe(
const result = yield* runWorkflow(EWorkflowType.ZERO, zeroWorkflowParams).pipe(
Effect.mapError(
(error): MainWorkflowError => ({ _tag: 'WorkflowCreationFailed' as const, error }),
),
Expand Down Expand Up @@ -185,16 +185,22 @@ export const runZeroWorkflow = (
const { connectionId, historyId, nextHistoryId } = params;

const historyProcessingKey = `history_${connectionId}__${historyId}`;
const isProcessing = yield* Effect.tryPromise({
try: () => env.gmail_processing_threads.get(historyProcessingKey),

// Atomic lock acquisition to prevent race conditions
const lockAcquired = yield* Effect.tryPromise({
try: async () => {
const response = await env.gmail_processing_threads.put(historyProcessingKey, 'true', {
expirationTtl: 3600,
});
return response !== null; // null means key already existed
},
catch: (error) => ({ _tag: 'WorkflowCreationFailed' as const, error }),
});

if (isProcessing === 'true') {
if (!lockAcquired) {
yield* Console.log('[ZERO_WORKFLOW] History already being processed:', {
connectionId,
historyId,
processingStatus: isProcessing,
});
return yield* Effect.fail({
_tag: 'HistoryAlreadyProcessing' as const,
Expand All @@ -203,12 +209,10 @@ export const runZeroWorkflow = (
});
}

yield* Effect.tryPromise({
try: () =>
env.gmail_processing_threads.put(historyProcessingKey, 'true', { expirationTtl: 3600 }),
catch: (error) => ({ _tag: 'WorkflowCreationFailed' as const, error }),
});
yield* Console.log('[ZERO_WORKFLOW] Set processing flag for history:', historyProcessingKey);
yield* Console.log(
'[ZERO_WORKFLOW] Acquired processing lock for history:',
historyProcessingKey,
);

const { db, conn } = createDb(env.HYPERDRIVE.connectionString);

Expand Down Expand Up @@ -257,11 +261,6 @@ export const runZeroWorkflow = (
catch: (error) => ({ _tag: 'GmailApiError' as const, error }),
});

if (!history.length) {
yield* Console.log('[ZERO_WORKFLOW] No history found, skipping');
return 'No history found';
}

yield* Effect.tryPromise({
try: () => {
console.log('[ZERO_WORKFLOW] Updating next history ID:', nextHistoryId);
Expand All @@ -270,29 +269,34 @@ export const runZeroWorkflow = (
catch: (error) => ({ _tag: 'WorkflowCreationFailed' as const, error }),
});

if (!history.length) {
yield* Console.log('[ZERO_WORKFLOW] No history found, skipping');
return 'No history found';
}

// Extract thread IDs from history
const threadsChanged = new Set<string>();
const threadsAdded = new Set<string>();
history.forEach((historyItem) => {
if (historyItem.messagesAdded) {
historyItem.messagesAdded.forEach((messageAdded) => {
if (messageAdded.message?.threadId) {
threadsChanged.add(messageAdded.message.threadId);
// threadsChanged.add(messageAdded.message.threadId);
threadsAdded.add(messageAdded.message.threadId);
}
});
}
if (historyItem.labelsAdded) {
historyItem.labelsAdded.forEach((labelAdded) => {
if (labelAdded.message?.threadId) {
threadsChanged.add(labelAdded.message.threadId);
// threadsChanged.add(labelAdded.message.threadId);
}
});
}
if (historyItem.labelsRemoved) {
historyItem.labelsRemoved.forEach((labelRemoved) => {
if (labelRemoved.message?.threadId) {
threadsChanged.add(labelRemoved.message.threadId);
// threadsChanged.add(labelRemoved.message.threadId);
}
});
}
Expand Down Expand Up @@ -355,6 +359,17 @@ export const runZeroWorkflow = (
const threadResults = yield* Effect.all(
threadWorkflowParams.map((params) =>
Effect.gen(function* () {
// Check if thread is already processing
const isProcessing = yield* Effect.tryPromise({
try: () => env.gmail_processing_threads.get(params.threadId.toString()),
catch: (error) => ({ _tag: 'WorkflowCreationFailed' as const, error }),
});

if (isProcessing === 'true') {
yield* Console.log('[ZERO_WORKFLOW] Thread already processing:', params.threadId);
return 'Thread already processing';
}

// Set processing flag for thread
yield* Effect.tryPromise({
try: () => {
Expand All @@ -369,19 +384,8 @@ export const runZeroWorkflow = (
catch: (error) => ({ _tag: 'WorkflowCreationFailed' as const, error }),
});

// Check if thread is already processing
const isProcessing = yield* Effect.tryPromise({
try: () => env.gmail_processing_threads.get(params.threadId.toString()),
catch: (error) => ({ _tag: 'WorkflowCreationFailed' as const, error }),
});

if (isProcessing === 'true') {
yield* Console.log('[ZERO_WORKFLOW] Thread already processing:', params.threadId);
return 'Thread already processing';
}

// Run the thread workflow
return yield* runThreadWorkflow(params).pipe(
return yield* runWorkflow(EWorkflowType.THREAD, params).pipe(
Effect.mapError(
(error): ZeroWorkflowError => ({
_tag: 'WorkflowCreationFailed' as const,
Expand All @@ -394,19 +398,22 @@ export const runZeroWorkflow = (
{ concurrency: 1, discard: true }, // Process up to 5 threads concurrently
);

yield* Console.log('[ZERO_WORKFLOW] All thread workflows completed:', threadResults.length);
yield* Console.log('[ZERO_WORKFLOW] All thread workflows completed:', threadResults);
} else {
yield* Console.log('[ZERO_WORKFLOW] No threads to process');
}

// // Clean up processing flag
// yield* Effect.tryPromise({
// try: () => {
// console.log('[ZERO_WORKFLOW] Clearing processing flag for history:', historyProcessingKey);
// return env.gmail_processing_threads.delete(historyProcessingKey);
// },
// catch: (error) => ({ _tag: 'WorkflowCreationFailed' as const, error }),
// }).pipe(Effect.orElse(() => Effect.succeed(null)));
// Clean up processing flag
yield* Effect.tryPromise({
try: () => {
console.log(
'[ZERO_WORKFLOW] Clearing processing flag for history:',
historyProcessingKey,
);
return env.gmail_processing_threads.delete(historyProcessingKey);
},
catch: (error) => ({ _tag: 'WorkflowCreationFailed' as const, error }),
}).pipe(Effect.orElse(() => Effect.succeed(null)));

yield* Console.log('[ZERO_WORKFLOW] Processing complete');
return 'Zero workflow completed successfully';
Expand Down Expand Up @@ -887,8 +894,18 @@ export const runThreadWorkflow = (
add: labelsToAdd,
remove: labelsToRemove,
});
await agent.modifyLabels([threadId.toString()], labelsToAdd, labelsToRemove);
await agent.syncThread({ threadId: threadId.toString() });
await agent.modifyThreadLabelsInDB(
threadId.toString(),
labelsToAdd,
labelsToRemove,
);
await agent.modifyLabels(
[threadId.toString()],
labelsToAdd,
labelsToRemove,
true,
);
// await agent.syncThread({ threadId: threadId.toString() });
console.log('[THREAD_WORKFLOW] Successfully modified thread labels');
} else {
console.log('[THREAD_WORKFLOW] No label changes needed - labels already match');
Expand Down
Loading