Skip to content

Commit f37db98

Browse files
committed
Add grpc Server
1 parent 0a2bf7e commit f37db98

10 files changed

Lines changed: 686 additions & 37 deletions

File tree

.github/workflows/format.yml

Lines changed: 16 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -3,24 +3,22 @@ name: Run formatter on PR
33
on: [push, pull_request]
44

55
jobs:
6-
format:
7-
runs-on: ubuntu-latest
8-
steps:
9-
- name: Checkout repository
10-
uses: actions/checkout@v4
11-
with:
12-
fetch-depth: 0
6+
format:
7+
runs-on: ubuntu-latest
8+
steps:
9+
- name: Checkout repository
10+
uses: actions/checkout@v4
11+
with:
12+
fetch-depth: 0
1313

14-
- name: Setup Node.js
15-
uses: actions/setup-node@v4
16-
with:
17-
node-version: '20'
18-
cache: 'npm'
19-
20-
- name: Install dependencies
21-
run: npm ci
22-
23-
- name: Run formatter
24-
run: npm run format
14+
- name: Setup Node.js
15+
uses: actions/setup-node@v4
16+
with:
17+
node-version: '20'
18+
cache: 'npm'
2519

20+
- name: Install dependencies
21+
run: npm ci
2622

23+
- name: Run formatter
24+
run: npm run format

package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
"typescript-eslint": "^8.30.1"
3838
},
3939
"dependencies": {
40+
"@grpc/grpc-js": "^1.14.0",
4041
"@trpc/server": "^11.0.1",
4142
"cors": "^2.8.5",
4243
"express": "^5.1.0",

packages/client/src/pages/DashboardPage/RunPage/TracingComponent/TracePanel/latency.tsx

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ function nanoToSeconds(nanoTimestamp: string | number): number {
1414
return Number(nanoTimestamp) / 1_000_000_000;
1515
}
1616

17-
const NANOSECONDS_THRESHOLD = 1_000_000;
17+
const NANOSECONDS_THRESHOLD = 1_000_000_000;
1818

1919
const formatLatencyValue = (latencyNs: number) => {
2020
if (latencyNs > NANOSECONDS_THRESHOLD) {

packages/server/package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
"build": "tsc -p tsconfig.json"
99
},
1010
"dependencies": {
11+
"@grpc/grpc-js": "^1.14.0",
1112
"@opentelemetry/otlp-transformer": "^0.207.0",
1213
"@trpc/server": "^11.0.1",
1314
"@types/google-protobuf": "^3.15.12",

packages/server/src/index.ts

Lines changed: 27 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,16 @@
1+
import * as trpcExpress from '@trpc/server/adapters/express';
12
import express from 'express';
23
import { createServer } from 'http';
3-
import * as trpcExpress from '@trpc/server/adapters/express';
4-
import { initializeDatabase } from './database';
5-
import { appRouter } from './trpc/router';
6-
import { SocketManager } from './trpc/socket';
7-
import { ConfigManager } from '../../shared/src/config';
8-
import path from 'path';
94
import opener from 'opener';
10-
import { promptUser } from '../../shared/src/utils/terminal';
5+
import path from 'path';
116
import portfinder from 'portfinder';
7+
import { ConfigManager } from '../../shared/src/config';
8+
import { promptUser } from '../../shared/src/utils/terminal';
9+
import { initializeDatabase } from './database';
10+
import { OtelGrpcServer } from './otel/grpc-server';
1211
import otelRouter from './otel/router';
12+
import { appRouter } from './trpc/router';
13+
import { SocketManager } from './trpc/socket';
1314

1415
async function initializeServer() {
1516
try {
@@ -64,6 +65,13 @@ async function initializeServer() {
6465
// Initialize SocketManager
6566
SocketManager.init(httpServer);
6667

68+
// Initialize and start gRPC server
69+
70+
const port = configManager.getConfig().port;
71+
console.log('Starting gRPC server on port:', port);
72+
const otelGrpcServer = new OtelGrpcServer(port);
73+
await otelGrpcServer.start();
74+
6775
// Serve static files in development mode
6876
if (process.env.NODE_ENV === 'production') {
6977
const publicPath = path.join(__dirname, '../../public');
@@ -91,7 +99,7 @@ async function initializeServer() {
9199
}
92100
});
93101

94-
return httpServer;
102+
return { httpServer, otelGrpcServer };
95103
} catch (error) {
96104
console.error('Error initializing server:', error);
97105
console.error('Error stack:', (error as Error).stack);
@@ -101,14 +109,22 @@ async function initializeServer() {
101109

102110
// Set up the server and start listening
103111
initializeServer()
104-
.then((server) => {
112+
.then(({ httpServer, otelGrpcServer }) => {
105113
// Handle graceful shutdown
106-
const cleanup = () => {
114+
const cleanup = async () => {
107115
console.log('Closing Socket.IO connections');
108116
SocketManager.close();
109117

118+
console.log('Stopping gRPC server');
119+
try {
120+
await otelGrpcServer.stop();
121+
} catch (error) {
122+
console.error('Error stopping gRPC server:', error);
123+
otelGrpcServer.forceShutdown();
124+
}
125+
110126
console.log('Closing HTTP server');
111-
server.close(() => {
127+
httpServer.close(() => {
112128
console.log('HTTP server closed');
113129
process.exit(0);
114130
});

packages/server/src/otel/generate.sh

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,9 @@ protoc \
1818
--proto_path=./opentelemetry-proto/ \
1919
opentelemetry-proto/opentelemetry/proto/common/v1/*.proto \
2020
opentelemetry-proto/opentelemetry/proto/resource/v1/*.proto \
21-
opentelemetry-proto/opentelemetry/proto/trace/v1/*.proto
21+
opentelemetry-proto/opentelemetry/proto/trace/v1/*.proto \
22+
opentelemetry-proto/opentelemetry/proto/collector/trace/v1/*.proto \
2223

2324
# 3. Clean up OpenTelemetry proto files:
2425

25-
rm -rf opentelemetry-proto
26+
# rm -rf opentelemetry-proto
Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
import * as grpc from '@grpc/grpc-js';
2+
import { SpanDao } from '../dao/Trace';
3+
import { SocketManager } from '../trpc/socket';
4+
import { opentelemetry } from './opentelemetry/proto/collector/trace/v1/trace_service';
5+
import * as traceProto from './opentelemetry/proto/trace/v1/trace';
6+
import { SpanProcessor } from './processor';
7+
8+
/**
9+
* gRPC server implementation for OpenTelemetry TraceService
10+
*/
11+
export class OtelGrpcServer {
12+
private server: grpc.Server;
13+
private port: number;
14+
15+
constructor(port: number = 4317) {
16+
this.port = port;
17+
this.server = new grpc.Server();
18+
this.setupService();
19+
}
20+
21+
private setupService(): void {
22+
// Create TraceService implementation
23+
const traceServiceImpl = {
24+
Export: this.handleExport.bind(this),
25+
};
26+
27+
// Add the service to the server
28+
this.server.addService(
29+
opentelemetry.proto.collector.trace.v1
30+
.UnimplementedTraceServiceService.definition,
31+
traceServiceImpl,
32+
);
33+
}
34+
35+
/**
36+
* Handle Export RPC call
37+
*/
38+
private async handleExport(
39+
call: grpc.ServerUnaryCall<
40+
opentelemetry.proto.collector.trace.v1.ExportTraceServiceRequest,
41+
opentelemetry.proto.collector.trace.v1.ExportTraceServiceResponse
42+
>,
43+
callback: grpc.sendUnaryData<opentelemetry.proto.collector.trace.v1.ExportTraceServiceResponse>,
44+
): Promise<void> {
45+
try {
46+
console.debug('[OTEL gRPC] Received ExportTraceServiceRequest');
47+
48+
const request = call.request;
49+
const resourceSpans = request.resource_spans || [];
50+
51+
if (resourceSpans.length === 0) {
52+
console.warn('[OTEL gRPC] Empty resource_spans in request');
53+
const response =
54+
new opentelemetry.proto.collector.trace.v1.ExportTraceServiceResponse();
55+
callback(null, response);
56+
return;
57+
}
58+
59+
// Convert ResourceSpans to plain objects for processing
60+
const resourceSpansArray = resourceSpans.map(
61+
(rs: traceProto.opentelemetry.proto.trace.v1.ResourceSpans) =>
62+
rs.toObject(),
63+
);
64+
65+
// Process spans using the existing processor
66+
const spans =
67+
SpanProcessor.batchProcessOTLPTraces(resourceSpansArray);
68+
69+
// Save spans to the database
70+
await SpanDao.saveSpans(spans);
71+
72+
// Broadcast spans to the run room
73+
SocketManager.broadcastSpanDataToRunRoom(spans);
74+
75+
console.debug(
76+
`[OTEL gRPC] Successfully processed ${spans.length} spans`,
77+
);
78+
79+
// Create success response
80+
const response =
81+
new opentelemetry.proto.collector.trace.v1.ExportTraceServiceResponse();
82+
callback(null, response);
83+
} catch (error: unknown) {
84+
console.error('[OTEL gRPC] Error processing traces:', error);
85+
const errorMessage =
86+
error instanceof Error ? error.message : 'Unknown error';
87+
88+
const response =
89+
new opentelemetry.proto.collector.trace.v1.ExportTraceServiceResponse();
90+
const partialSuccess =
91+
new opentelemetry.proto.collector.trace.v1.ExportTracePartialSuccess();
92+
partialSuccess.error_message = errorMessage;
93+
response.partial_success = partialSuccess;
94+
95+
callback(
96+
{
97+
code: grpc.status.INTERNAL,
98+
details: errorMessage,
99+
metadata: new grpc.Metadata(),
100+
},
101+
response,
102+
);
103+
}
104+
}
105+
106+
/**
107+
* Start the gRPC server
108+
*/
109+
public start(): Promise<void> {
110+
return new Promise((resolve, reject) => {
111+
this.server.bindAsync(
112+
`0.0.0.0:${this.port}`,
113+
grpc.ServerCredentials.createInsecure(),
114+
(error: Error | null, port: number) => {
115+
if (error) {
116+
console.error(
117+
`[OTEL gRPC] Failed to start server on port ${this.port}:`,
118+
error,
119+
);
120+
reject(error);
121+
return;
122+
}
123+
124+
this.server.start();
125+
console.log(
126+
`[OTEL gRPC] Server started on port ${port} (0.0.0.0:${this.port})`,
127+
);
128+
resolve();
129+
},
130+
);
131+
});
132+
}
133+
134+
/**
135+
* Stop the gRPC server
136+
*/
137+
public stop(): Promise<void> {
138+
return new Promise((resolve, reject) => {
139+
this.server.tryShutdown((error?: Error) => {
140+
if (error) {
141+
console.error(
142+
'[OTEL gRPC] Error shutting down server:',
143+
error,
144+
);
145+
reject(error);
146+
} else {
147+
console.log('[OTEL gRPC] Server stopped');
148+
resolve();
149+
}
150+
});
151+
});
152+
}
153+
154+
/**
155+
* Force shutdown the gRPC server
156+
*/
157+
public forceShutdown(): void {
158+
this.server.forceShutdown();
159+
console.log('[OTEL gRPC] Server force shutdown');
160+
}
161+
}

0 commit comments

Comments
 (0)