Skip to content

#130 AsyncIterator.return() #131

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions packages/aws-lambda-graphql/src/DynamoDBConnectionManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import {
IConnectionManager,
ISubscriptionManager,
IConnectionData,
ISubscriber,
HydrateConnectionOptions,
} from './types';
import { computeTTL } from './helpers';
Expand Down Expand Up @@ -211,8 +212,10 @@ export class DynamoDBConnectionManager implements IConnectionManager {
}
};

unregisterConnection = async ({ id }: DynamoDBConnection): Promise<void> => {
await Promise.all([
unregisterConnection = async ({
id,
}: DynamoDBConnection): Promise<ISubscriber[]> => {
const [, subscribers] = await Promise.all([
this.db
.delete({
Key: {
Expand All @@ -223,6 +226,8 @@ export class DynamoDBConnectionManager implements IConnectionManager {
.promise(),
this.subscriptions.unsubscribeAllByConnectionId(id),
]);

return subscribers;
};

closeConnection = async ({ id, data }: DynamoDBConnection): Promise<void> => {
Expand Down
10 changes: 8 additions & 2 deletions packages/aws-lambda-graphql/src/DynamoDBSubscriptionManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,10 @@ export class DynamoDBSubscriptionManager implements ISubscriptionManager {
}
};

unsubscribeAllByConnectionId = async (connectionId: string) => {
unsubscribeAllByConnectionId = async (
connectionId: string,
): Promise<ISubscriber[]> => {
const subscribers: ISubscriber[] = [];
let cursor: DynamoDB.DocumentClient.Key | undefined;

do {
Expand All @@ -324,9 +327,10 @@ export class DynamoDBSubscriptionManager implements ISubscriptionManager {
Limit: 12, // Maximum of 25 request items sent to DynamoDB a time
})
.promise();
subscribers.push(...(Items as ISubscriber[]));

if (Items == null || (LastEvaluatedKey == null && Items.length === 0)) {
return;
return subscribers;
}

if (Items.length > 0) {
Expand All @@ -353,6 +357,8 @@ export class DynamoDBSubscriptionManager implements ISubscriptionManager {

cursor = LastEvaluatedKey;
} while (cursor);

return subscribers;
};

generateSubscriptionId = (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,6 @@ export class MemorySubscriptionManager implements ISubscriptionManager {
);
}

return Promise.resolve();
return Promise.resolve([]);
};
}
5 changes: 3 additions & 2 deletions packages/aws-lambda-graphql/src/RedisConnectionManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -144,12 +144,13 @@ export class RedisConnectionManager implements IConnectionManager {
}
};

unregisterConnection = async ({ id }: IConnection): Promise<void> => {
unregisterConnection = async ({ id }: IConnection) => {
const key = prefixRedisKey(`connection:${id}`);
await Promise.all([
const [, subscribers] = await Promise.all([
this.redisClient.del(key),
this.subscriptions.unsubscribeAllByConnectionId(id),
]);
return subscribers;
};

closeConnection = async ({ id, data }: IConnection): Promise<void> => {
Expand Down
3 changes: 3 additions & 0 deletions packages/aws-lambda-graphql/src/RedisSubscriptionManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,7 @@ export class RedisSubscriptionManager implements ISubscriptionManager {
};

unsubscribeAllByConnectionId = async (connectionId: string) => {
const subscribers: ISubscriber[] = [];
let done = false;
const limit = 50;
let offset = 0;
Expand All @@ -244,6 +245,7 @@ export class RedisSubscriptionManager implements ISubscriptionManager {
const result = await this.redisClient.get(key);
if (result) {
subscriber = JSON.parse(result);
subscribers.push(subscriber as ISubscriber);
const subscriptionId = this.generateSubscriptionId(
connectionId,
subscriber.operationId,
Expand All @@ -270,6 +272,7 @@ export class RedisSubscriptionManager implements ISubscriptionManager {
}
} while (!done);
await this.redisClient.del(subscriptionListKey);
return subscribers;
};

generateSubscriptionId = (
Expand Down
40 changes: 38 additions & 2 deletions packages/aws-lambda-graphql/src/Server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import {
Context as LambdaContext,
Handler as LambdaHandler,
} from 'aws-lambda';
import { isAsyncIterable } from 'iterall';
import { isAsyncIterable, getAsyncIterator } from 'iterall';
import { ExecutionResult } from 'graphql';
import { PubSub } from 'graphql-subscriptions';
import {
Expand Down Expand Up @@ -357,7 +357,43 @@ export class Server<
onDisconnect(connection);
}

await this.connectionManager.unregisterConnection(connection);
const subscribers = await this.connectionManager.unregisterConnection(
connection,
);

const promises = subscribers.map(async (subscriber) => {
const pubSub = new PubSub();

const options = await this.createGraphQLServerOptions(
event,
lambdaContext,
{
connection,
operation: subscriber.operation,
pubSub,
},
);

const iterable = await execute({
...options,
connection,
connectionManager: this.connectionManager,
event,
lambdaContext,
operation: subscriber.operation,
pubSub,
registerSubscriptions: false,
subscriptionManager: this.subscriptionManager,
});

if (!isAsyncIterable(iterable)) {
return;
}

const iterator = getAsyncIterator(iterable);
await iterator.return?.();
});
await Promise.all(promises);

return {
body: '',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,9 @@ export class WebSocketConnectionManager implements IConnectionManager {
});
};

unregisterConnection = async (connection: IConnection): Promise<void> => {
unregisterConnection = async (connection: IConnection) => {
this.connections.delete(connection.id);
return [];
};

closeConnection = async (connection: WSConnection): Promise<void> => {
Expand Down
5 changes: 4 additions & 1 deletion packages/aws-lambda-graphql/src/__tests__/Server.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -437,7 +437,7 @@ describe('Server', () => {
{},
);
(connectionManager.unregisterConnection as jest.Mock).mockResolvedValueOnce(
undefined,
[],
);

await expect(
Expand Down Expand Up @@ -478,6 +478,9 @@ describe('Server', () => {
(connectionManager.hydrateConnection as jest.Mock).mockResolvedValueOnce(
{},
);
(connectionManager.unregisterConnection as jest.Mock).mockResolvedValueOnce(
[],
);

await expect(
handlerWithOnDisconnect(
Expand Down
28 changes: 28 additions & 0 deletions packages/aws-lambda-graphql/src/types/connection.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
export interface IConnection {
/**
* Unique connection id
*/
readonly id: string;

/**
* Extra connection data, this data is stored only upon registration
* All values should be JSON serializable
*/
readonly data: IConnectionData;
}

export interface IConnectionData {
[key: string]: any;

/**
* Connection context data provided from GQL_CONNECTION_INIT message or from onConnect method
* This data is passed to graphql resolvers' context
* All values should be JSON serializable
*/
context: Object;

/**
* Indicates whether connection sent GQL_CONNECTION_INIT message or
*/
readonly isInitialized: boolean;
}
32 changes: 4 additions & 28 deletions packages/aws-lambda-graphql/src/types/connections.ts
Original file line number Diff line number Diff line change
@@ -1,31 +1,7 @@
export interface IConnection {
/**
* Unique connection id
*/
readonly id: string;

/**
* Extra connection data, this data is stored only upon registration
* All values should be JSON serializable
*/
readonly data: IConnectionData;
}

export interface IConnectionData {
[key: string]: any;
import { IConnection, IConnectionData } from './connection';
import { ISubscriber } from './subscriptions';

/**
* Connection context data provided from GQL_CONNECTION_INIT message or from onConnect method
* This data is passed to graphql resolvers' context
* All values should be JSON serializable
*/
context: Object;

/**
* Indicates whether connection sent GQL_CONNECTION_INIT message or
*/
readonly isInitialized: boolean;
}
export { IConnection, IConnectionData };

export interface HydrateConnectionOptions {
/**
Expand Down Expand Up @@ -58,6 +34,6 @@ export interface IConnectionManager {
connection: IConnection,
payload: string | Buffer,
): Promise<void>;
unregisterConnection(connection: IConnection): Promise<void>;
unregisterConnection(connection: IConnection): Promise<ISubscriber[]>;
closeConnection(connection: IConnection): Promise<void>;
}
4 changes: 2 additions & 2 deletions packages/aws-lambda-graphql/src/types/subscriptions.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { IConnection } from './connections';
import { IConnection } from './connection';
import { IdentifiedOperationRequest, OperationRequest } from './operations';

export interface ISubscriptionEvent {
Expand Down Expand Up @@ -51,5 +51,5 @@ export interface ISubscriptionManager {
*
* @param connectionId
*/
unsubscribeAllByConnectionId(connectionId: string): Promise<any>;
unsubscribeAllByConnectionId(connectionId: string): Promise<ISubscriber[]>;
}