Skip to content

Commit ef51450

Browse files
committed
Merge commit '5eceb31'
2 parents f7b2057 + 5eceb31 commit ef51450

File tree

18 files changed

+382
-26
lines changed

18 files changed

+382
-26
lines changed

backend/src/api/index.ts

Lines changed: 27 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -65,18 +65,39 @@ setImmediate(async () => {
6565
// Store chat socket globally for service access
6666
global.devtelWebSocket = chatSocketInstance
6767

68-
// Fix stuck integrations on startup
68+
// Fix stuck integrations on startup - only mark as error if truly stuck
6969
try {
7070
const { sequelize } = await databaseInit()
71+
72+
// Only mark integrations as stuck if:
73+
// 1. They have no active runs (PENDING, PROCESSING, DELAYED)
74+
// 2. Their last run is in a final state (PROCESSED, ERROR)
75+
// 3. They've been in this state for over 1 hour
7176
const stuckIntegrations = await sequelize.query(
72-
`UPDATE integrations
73-
SET status = 'done', "updatedAt" = NOW()
74-
WHERE status IN ('mapping', 'in-progress', 'processing')
75-
AND "updatedAt" < NOW() - INTERVAL '30 minutes'
76-
AND "deletedAt" IS NULL
77+
`UPDATE integrations i
78+
SET status = CASE
79+
WHEN EXISTS (
80+
SELECT 1 FROM "integrationRuns" ir
81+
WHERE ir."integrationId" = i.id
82+
AND ir.state = 'processed'
83+
ORDER BY ir."createdAt" DESC
84+
LIMIT 1
85+
) THEN 'done'
86+
ELSE 'error'
87+
END,
88+
"updatedAt" = NOW()
89+
WHERE i.status IN ('mapping', 'in-progress', 'processing')
90+
AND i."updatedAt" < NOW() - INTERVAL '1 hour'
91+
AND i."deletedAt" IS NULL
92+
AND NOT EXISTS (
93+
SELECT 1 FROM "integrationRuns" ir2
94+
WHERE ir2."integrationId" = i.id
95+
AND ir2.state IN ('pending', 'processing', 'delayed')
96+
)
7797
RETURNING id, platform, status`,
7898
{ type: QueryTypes.UPDATE }
7999
)
100+
80101
if (stuckIntegrations && stuckIntegrations[0] && stuckIntegrations[0].length > 0) {
81102
serviceLogger.info({ count: stuckIntegrations[0].length, integrations: stuckIntegrations[0] }, 'Fixed stuck integrations on startup')
82103
}

backend/src/api/integration/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ export default (app) => {
2727
)
2828
app.get(`/tenant/:tenantId/integration`, safeWrap(require('./integrationList').default))
2929
app.get(`/tenant/:tenantId/integration/:id`, safeWrap(require('./integrationFind').default))
30+
app.get(`/tenant/:tenantId/integration/:id/progress`, safeWrap(require('./integrationProgress').default))
3031

3132
app.put(
3233
`/authenticate/:tenantId/:code`,
Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
import { IntegrationRunState } from '@gitmesh/types'
2+
import Permissions from '../../security/permissions'
3+
import PermissionChecker from '../../services/user/permissionChecker'
4+
import IntegrationRunRepository from '../../database/repositories/integrationRunRepository'
5+
import IntegrationStreamRepository from '../../database/repositories/integrationStreamRepository'
6+
import { IntegrationStreamState } from '../../types/integrationStreamTypes'
7+
8+
export default async (req, res) => {
9+
new PermissionChecker(req).validateHas(Permissions.values.integrationRead)
10+
11+
const { id } = req.params
12+
13+
const runRepo = new IntegrationRunRepository(req)
14+
const streamRepo = new IntegrationStreamRepository(req)
15+
16+
// Get the last run for this integration
17+
const lastRun = await runRepo.findLastRun(id)
18+
19+
if (!lastRun) {
20+
return req.responseHandler.success(req, res, {
21+
hasRun: false,
22+
status: 'no-run',
23+
message: 'No integration run found',
24+
})
25+
}
26+
27+
// Get stream statistics
28+
const allStreams = await streamRepo.findByRunId(lastRun.id, 1, 1000)
29+
30+
const stats = {
31+
total: allStreams.length,
32+
processed: allStreams.filter(s => s.state === IntegrationStreamState.PROCESSED).length,
33+
processing: allStreams.filter(s => s.state === IntegrationStreamState.PROCESSING).length,
34+
pending: allStreams.filter(s => s.state === IntegrationStreamState.PENDING).length,
35+
error: allStreams.filter(s => s.state === IntegrationStreamState.ERROR).length,
36+
}
37+
38+
// Calculate progress percentage
39+
const progress = stats.total > 0
40+
? Math.round(((stats.processed + stats.error) / stats.total) * 100)
41+
: 0
42+
43+
// Determine status message
44+
let statusMessage = ''
45+
let isStuck = false
46+
47+
const now = new Date()
48+
const lastUpdate = new Date(lastRun.updatedAt)
49+
const minutesSinceUpdate = (now.getTime() - lastUpdate.getTime()) / 1000 / 60
50+
51+
if (lastRun.state === IntegrationRunState.PROCESSED) {
52+
statusMessage = `Completed successfully. Processed ${stats.processed} streams.`
53+
} else if (lastRun.state === IntegrationRunState.ERROR) {
54+
statusMessage = `Completed with errors. ${stats.error} streams failed after retries.`
55+
} else if (lastRun.state === IntegrationRunState.DELAYED) {
56+
const delayedUntil = lastRun.delayedUntil ? new Date(lastRun.delayedUntil) : null
57+
if (delayedUntil) {
58+
const minutesUntilResume = Math.max(0, Math.round((delayedUntil.getTime() - now.getTime()) / 1000 / 60))
59+
statusMessage = `Delayed due to rate limiting. Will resume in ${minutesUntilResume} minutes.`
60+
} else {
61+
statusMessage = 'Delayed, waiting to resume...'
62+
}
63+
} else if (lastRun.state === IntegrationRunState.PROCESSING) {
64+
if (stats.total === 0) {
65+
statusMessage = 'Detecting data streams to process...'
66+
} else if (stats.processing > 0) {
67+
statusMessage = `Processing stream ${stats.processed + stats.processing} of ${stats.total}...`
68+
} else if (stats.pending > 0) {
69+
statusMessage = `Processed ${stats.processed} of ${stats.total} streams. ${stats.pending} pending...`
70+
} else {
71+
statusMessage = `Finalizing... ${stats.processed} streams processed.`
72+
}
73+
74+
// Check if stuck
75+
if (minutesSinceUpdate > 60) {
76+
isStuck = true
77+
statusMessage += ` (Warning: No progress for ${Math.round(minutesSinceUpdate)} minutes)`
78+
}
79+
} else if (lastRun.state === IntegrationRunState.PENDING) {
80+
statusMessage = 'Waiting to start...'
81+
}
82+
83+
// Get recent error streams for debugging
84+
const errorStreams = allStreams
85+
.filter(s => s.state === IntegrationStreamState.ERROR)
86+
.slice(0, 5)
87+
.map(s => ({
88+
name: s.name,
89+
error: s.error,
90+
retries: s.retries,
91+
}))
92+
93+
const payload = {
94+
hasRun: true,
95+
runId: lastRun.id,
96+
runState: lastRun.state,
97+
status: lastRun.state,
98+
progress,
99+
stats,
100+
statusMessage,
101+
isStuck,
102+
createdAt: lastRun.createdAt,
103+
updatedAt: lastRun.updatedAt,
104+
processedAt: lastRun.processedAt,
105+
delayedUntil: lastRun.delayedUntil,
106+
errorStreams: errorStreams.length > 0 ? errorStreams : undefined,
107+
}
108+
109+
return req.responseHandler.success(req, res, payload)
110+
}

backend/src/serverless/integrations/services/integrationRunProcessor.ts

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,12 @@ export class IntegrationRunProcessor extends LoggerBase {
148148

149149
await this.integrationRunRepository.markProcessing(req.runId)
150150
run.state = IntegrationRunState.PROCESSING
151+
152+
logger.info({
153+
runId: req.runId,
154+
integrationId: integration.id,
155+
platform: integration.platform
156+
}, 'Integration run marked as PROCESSING, starting stream detection and processing')
151157

152158
if (integration.settings.updateMemberAttributes) {
153159
logger.trace('Updating member attributes!')
@@ -242,6 +248,12 @@ export class IntegrationRunProcessor extends LoggerBase {
242248
logger.trace('Detecting streams!')
243249
try {
244250
const pendingStreams = await intService.getStreams(stepContext)
251+
logger.info({ count: pendingStreams.length }, 'Detected streams for integration run')
252+
253+
if (pendingStreams.length === 0) {
254+
logger.warn('No streams detected for integration run! This may indicate a configuration issue.')
255+
}
256+
245257
const createStreams: DbIntegrationStreamCreateData[] = pendingStreams.map((s) => ({
246258
runId: req.runId,
247259
tenantId: run.tenantId,
@@ -252,6 +264,8 @@ export class IntegrationRunProcessor extends LoggerBase {
252264
}))
253265
await this.integrationStreamRepository.bulkCreate(createStreams)
254266
await this.integrationRunRepository.touch(run.id)
267+
268+
logger.info({ count: createStreams.length }, 'Created streams in database')
255269
} catch (err) {
256270
if (err.rateLimitResetSeconds) {
257271
// need to delay integration processing
@@ -260,6 +274,7 @@ export class IntegrationRunProcessor extends LoggerBase {
260274
return
261275
}
262276

277+
logger.error(err, 'Error detecting streams for integration run')
263278
throw err
264279
}
265280
}
@@ -275,6 +290,12 @@ export class IntegrationRunProcessor extends LoggerBase {
275290
} else {
276291
nextStream = await this.integrationStreamRepository.getNextStreamToProcess(req.runId)
277292
}
293+
294+
if (!nextStream) {
295+
logger.warn('No streams to process for this integration run')
296+
} else {
297+
logger.info('Starting stream processing loop')
298+
}
278299

279300
while (nextStream) {
280301
if ((req as any).exiting) {
@@ -501,10 +522,18 @@ export class IntegrationRunProcessor extends LoggerBase {
501522
case IntegrationRunState.ERROR:
502523
status = 'error'
503524
break
525+
case IntegrationRunState.DELAYED:
526+
status = 'in-progress' // Keep as in-progress when delayed
527+
break
528+
case IntegrationRunState.PROCESSING:
529+
status = 'in-progress' // Keep as in-progress when processing
530+
break
504531
default:
505532
status = integration.status
506533
}
507534

535+
logger.info({ newState, status, runId: req.runId }, 'Integration run completed, updating integration status')
536+
508537
await IntegrationRepository.update(
509538
integration.id,
510539
{

frontend/components.d.ts

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,31 +15,55 @@ declare module '@vue/runtime-core' {
1515
CommunityHeader: typeof import('./src/components/landing/CommunityHeader.vue')['default']
1616
CommunityHero: typeof import('./src/components/landing/CommunityHero.vue')['default']
1717
CommunityQuickStart: typeof import('./src/components/landing/CommunityQuickStart.vue')['default']
18+
ElAlert: typeof import('element-plus/es')['ElAlert']
1819
ElAside: typeof import('element-plus/es')['ElAside']
1920
ElAvatar: typeof import('element-plus/es')['ElAvatar']
2021
ElButton: typeof import('element-plus/es')['ElButton']
22+
ElButtonGroup: typeof import('element-plus/es')['ElButtonGroup']
2123
ElCard: typeof import('element-plus/es')['ElCard']
2224
ElCheckbox: typeof import('element-plus/es')['ElCheckbox']
25+
ElCheckboxGroup: typeof import('element-plus/es')['ElCheckboxGroup']
26+
ElCol: typeof import('element-plus/es')['ElCol']
2327
ElCollapse: typeof import('element-plus/es')['ElCollapse']
2428
ElCollapseItem: typeof import('element-plus/es')['ElCollapseItem']
2529
ElContainer: typeof import('element-plus/es')['ElContainer']
30+
ElDatePicker: typeof import('element-plus/es')['ElDatePicker']
2631
ElDialog: typeof import('element-plus/es')['ElDialog']
2732
ElDivider: typeof import('element-plus/es')['ElDivider']
2833
ElDrawer: typeof import('element-plus/es')['ElDrawer']
2934
ElDropdown: typeof import('element-plus/es')['ElDropdown']
3035
ElDropdownItem: typeof import('element-plus/es')['ElDropdownItem']
36+
ElDropdownMenu: typeof import('element-plus/es')['ElDropdownMenu']
37+
ElEmpty: typeof import('element-plus/es')['ElEmpty']
3138
ElFooter: typeof import('element-plus/es')['ElFooter']
3239
ElForm: typeof import('element-plus/es')['ElForm']
3340
ElFormItem: typeof import('element-plus/es')['ElFormItem']
41+
ElIcon: typeof import('element-plus/es')['ElIcon']
3442
ElInput: typeof import('element-plus/es')['ElInput']
43+
ElInputNumber: typeof import('element-plus/es')['ElInputNumber']
3544
ElMain: typeof import('element-plus/es')['ElMain']
3645
ElMenu: typeof import('element-plus/es')['ElMenu']
3746
ElOption: typeof import('element-plus/es')['ElOption']
47+
ElOptionGroup: typeof import('element-plus/es')['ElOptionGroup']
3848
ElPagination: typeof import('element-plus/es')['ElPagination']
3949
ElPopover: typeof import('element-plus/es')['ElPopover']
50+
ElRadio: typeof import('element-plus/es')['ElRadio']
51+
ElRadioButton: typeof import('element-plus/es')['ElRadioButton']
52+
ElRadioGroup: typeof import('element-plus/es')['ElRadioGroup']
53+
ElRow: typeof import('element-plus/es')['ElRow']
54+
ElScrollbar: typeof import('element-plus/es')['ElScrollbar']
4055
ElSelect: typeof import('element-plus/es')['ElSelect']
56+
ElSkeleton: typeof import('element-plus/es')['ElSkeleton']
57+
ElSkeletonItem: typeof import('element-plus/es')['ElSkeletonItem']
58+
ElSlider: typeof import('element-plus/es')['ElSlider']
4159
ElSwitch: typeof import('element-plus/es')['ElSwitch']
60+
ElTable: typeof import('element-plus/es')['ElTable']
61+
ElTableColumn: typeof import('element-plus/es')['ElTableColumn']
62+
ElTabPane: typeof import('element-plus/es')['ElTabPane']
63+
ElTabs: typeof import('element-plus/es')['ElTabs']
4264
ElTag: typeof import('element-plus/es')['ElTag']
65+
ElTimeline: typeof import('element-plus/es')['ElTimeline']
66+
ElTimelineItem: typeof import('element-plus/es')['ElTimelineItem']
4367
ElTooltip: typeof import('element-plus/es')['ElTooltip']
4468
RouterLink: typeof import('vue-router')['RouterLink']
4569
RouterView: typeof import('vue-router')['RouterView']

frontend/src/modules/integration/components/integration-list-item.vue

Lines changed: 54 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,14 @@
3737

3838
<span class="text-xs text-gray-200 mr-2">In progress</span>
3939
<el-tooltip
40+
v-if="progressData && progressData.hasRun && progressData.statusMessage"
41+
:content="progressData.statusMessage"
42+
placement="top"
43+
>
44+
<i class="ri-question-line text-gray-400" />
45+
</el-tooltip>
46+
<el-tooltip
47+
v-else
4048
content="Fetching first activities from an integration might take a few minutes"
4149
placement="top"
4250
>
@@ -52,6 +60,17 @@
5260
>
5361
<span class="text-3xs italic text-gray-500">{{ lastSynced.relative }}</span>
5462
</el-tooltip>
63+
<div v-else-if="isConnected && progressData && progressData.hasRun" class="text-3xs italic text-gray-500">
64+
<div v-if="progressData.stats && progressData.stats.total > 0 && progressData.progress > 0">
65+
{{ progressData.progress }}% complete ({{ progressData.stats.processed }}/{{ progressData.stats.total }} streams)
66+
</div>
67+
<div v-else-if="progressData.stats && progressData.stats.total === 0">
68+
Initializing...
69+
</div>
70+
<div v-if="progressData.isStuck" class="text-yellow-600 mt-1">
71+
⚠ May be stuck - check logs
72+
</div>
73+
</div>
5574
</div>
5675
</div>
5776
</div>
@@ -115,11 +134,12 @@
115134

116135
<script setup>
117136
import { useStore } from 'vuex';
118-
import { computed, onMounted, ref } from 'vue';
137+
import { computed, onMounted, onUnmounted, ref } from 'vue';
119138
import { FeatureFlag } from '@/utils/featureFlag';
120139
import AppIntegrationConnect from '@/modules/integration/components/integration-connect.vue';
121140
import { isCurrentDateAfterGivenWorkingDays } from '@/utils/date';
122141
import { ERROR_BANNER_WORKING_DAYS_DISPLAY } from '@/modules/integration/integration-store';
142+
import { IntegrationService } from '@/modules/integration/integration-service';
123143
import moment from 'moment';
124144
125145
const store = useStore();
@@ -130,6 +150,26 @@ const props = defineProps({
130150
},
131151
});
132152
153+
const progressData = ref(null);
154+
let progressInterval = null;
155+
156+
const fetchProgress = async () => {
157+
if (!isConnected.value || isDone.value || isError.value) {
158+
return;
159+
}
160+
161+
try {
162+
const data = await IntegrationService.getProgress(props.integration.id);
163+
// Only update if we got valid data
164+
if (data && typeof data === 'object') {
165+
progressData.value = data;
166+
}
167+
} catch (error) {
168+
// Silently fail - integration might not have a run yet
169+
console.debug('Integration progress not available yet:', error.message);
170+
}
171+
};
172+
133173
onMounted(() => {
134174
moment.updateLocale('en', {
135175
relativeTime: {
@@ -143,6 +183,19 @@ onMounted(() => {
143183
dd: '%dd',
144184
},
145185
});
186+
187+
// Fetch progress immediately if in progress
188+
if (isConnected.value && !isDone.value && !isError.value) {
189+
fetchProgress();
190+
// Poll every 10 seconds
191+
progressInterval = setInterval(fetchProgress, 10000);
192+
}
193+
});
194+
195+
onUnmounted(() => {
196+
if (progressInterval) {
197+
clearInterval(progressInterval);
198+
}
146199
});
147200
148201
const computedClass = computed(() => ({

0 commit comments

Comments
 (0)