Skip to content

Update SFTP filename #267

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 9 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions plugins/sftp/service/src/adapters/adapters.sftp.mongoose.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ const SftpObservationsSchema = new mongoose.Schema({
eventId: { type: Number, required: true, unique: true },
observationId: { type: String, required: true },
status: { type: String, enum: Object.values(SftpStatus), required: true }
},{
timestamps: { createdAt: 'createdAt',updatedAt: 'updatedAt' }
}, {
timestamps: { createdAt: 'createdAt', updatedAt: 'updatedAt' }
});

export interface SftpAttrs {
Expand Down Expand Up @@ -48,17 +48,17 @@ export class MongooseSftpObservationRepository implements SftpObservationReposit
}

async findAll(eventId: MageEventId): Promise<SftpAttrs[]> {
const documents = await this.model.find({eventId: eventId})
const documents = await this.model.find({ eventId: eventId })
return documents.map(document => document.toJSON())
}

async findAllByStatus(eventId: MageEventId, status: SftpStatus[]): Promise<SftpAttrs[]> {
const documents = await this.model.find({eventId: eventId, status: { $in: status}})
const documents = await this.model.find({ eventId: eventId, status: { $in: status } })
return documents.map(document => document.toJSON())
}

async findLatest(eventId: MageEventId): Promise<SftpAttrs | null> {
const document = await this.model.findOne({ eventId: eventId }, { updatedAt: true }, { sort: { updatedAt: -1 }, limit: 1 })
const document = await this.model.findOne({ eventId: eventId }, { updatedAt: true }, { sort: { updatedAt: -1 }, limit: 1 })
return document ? (document.toJSON() as SftpAttrs) : null
}

Expand Down
32 changes: 32 additions & 0 deletions plugins/sftp/service/src/adapters/adapters.sftp.teams.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import mongoose from 'mongoose';

const TeamSchema = new mongoose.Schema({
name: { type: String, required: true },
userIds: [{ type: mongoose.Schema.Types.ObjectId }],
id: { type: String, required: true },
acl: { type: Object, required: true }
});

export interface TeamDoc extends mongoose.Document {
name: string;
id: mongoose.Types.ObjectId;
userIds: mongoose.Types.ObjectId[];
teamEventId?: string;
}

export class MongooseTeamsRepository {
readonly model: mongoose.Model<TeamDoc>;

constructor(connection: mongoose.Connection) {
this.model = connection.model<TeamDoc>('teams', TeamSchema);
}

async findTeamsByUserId(userId: string | undefined): Promise<TeamDoc[]> {
if (!userId) {
return [];
}
const userObjectId = new mongoose.Types.ObjectId(userId);
const teams = await this.model.find({ userIds: userObjectId }).exec();
return teams.map(team => team.toJSON() as TeamDoc);
}
}
30 changes: 30 additions & 0 deletions plugins/sftp/service/src/adapters/adapters.sftp.users.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import mongoose from 'mongoose';

const UserSchema = new mongoose.Schema({
username: { type: String, required: true },
_id: [{ type: mongoose.Schema.Types.ObjectId }],
active: { type: Boolean, required: true }
});

export interface UserDoc extends mongoose.Document {
username: string;
_id: mongoose.Types.ObjectId;
active: boolean;
}

export class MongooseUsersRepository {
readonly model: mongoose.Model<UserDoc>;

constructor(connection: mongoose.Connection) {
this.model = connection.model<UserDoc>('users', UserSchema);
}

async findUserById(userId: string | undefined): Promise<UserDoc | null> {
if (!userId) {
return null;
}
const userObjectId = new mongoose.Types.ObjectId(userId);
const user = await this.model.findOne({ _id: userObjectId }).exec();
return user ? (user.toJSON() as UserDoc) : null;
}
}
86 changes: 74 additions & 12 deletions plugins/sftp/service/src/controller/controller.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import SFTPClient from 'ssh2-sftp-client';
import { PageOf } from '@ngageoint/mage.service/lib/entities/entities.global'
import { ArchiveResult, ArchiveStatus, ArchiverFactory, ObservationArchiver, TriggerRule } from '../format/entities.format'
import archiver from 'archiver'
import { MongooseTeamsRepository } from 'src/adapters/adapters.sftp.teams'
import { MongooseUsersRepository } from 'src/adapters/adapters.sftp.users'

function newEvent(id: MageEventId): MageEventAttrs {
return {
Expand Down Expand Up @@ -39,7 +41,7 @@ function newEvent(id: MageEventId): MageEventAttrs {
}

function newObservation(event: MageEvent, lastModified: Date): ObservationAttrs {
return {
return {
id: "1",
eventId: event.id,
userId: "test",
Expand Down Expand Up @@ -88,7 +90,7 @@ describe('automated processing', () => {
let allEvents: Map<MageEventId, MageEvent>
let stateRepository: TestPluginStateRepository
let eventObservationRepositories: Map<MageEventId, jasmine.SpyObj<EventScopedObservationRepository>>
let observationRepository: (event: MageEventId) => Promise<jasmine.SpyObj<EventScopedObservationRepository>>
let observationRepository: (event: MageEventId) => Promise<jasmine.SpyObj<EventScopedObservationRepository>>
let archiveFactory: jasmine.SpyObj<ArchiverFactory>
let sftpClient: jasmine.SpyObj<SFTPClient>
let clock: jasmine.Clock
Expand Down Expand Up @@ -127,6 +129,8 @@ describe('automated processing', () => {
eventRepository.findActiveEvents.and.resolveTo([])

const sftpRepository = jasmine.createSpyObj<SftpObservationRepository>('sftpRespository', ['findAll'])
const teamRepo = jasmine.createSpyObj<MongooseTeamsRepository>('teamRepo', ['findTeamsByUserId'])
const userRepo = jasmine.createSpyObj<MongooseUsersRepository>('userRepo', ['findUserById'])

stateRepository.state = { ...defaultSFTPPluginConfig, interval: 10, enabled: true }
const clockTickMillis = stateRepository.state.interval * 1000 + 1
Expand All @@ -140,7 +144,9 @@ describe('automated processing', () => {
sftpRepository,
sftpClient,
archiveFactory,
console
console,
teamRepo,
userRepo
)

await controller.start()
Expand Down Expand Up @@ -174,6 +180,12 @@ describe('automated processing', () => {
sftpRepository.findAllByStatus.and.resolveTo([])
sftpRepository.findLatest.and.resolveTo(null)

const teamRepo = jasmine.createSpyObj<MongooseTeamsRepository>('teamRepo', ['findTeamsByUserId'])
teamRepo.findTeamsByUserId.and.resolveTo([])

const userRepo = jasmine.createSpyObj<MongooseUsersRepository>('userRepo', ['findUserById'])
userRepo.findUserById.and.resolveTo(null)

stateRepository.state = { ...defaultSFTPPluginConfig, interval: 10, enabled: true }
const clockTickMillis = stateRepository.state.interval * 1000 + 1

Expand All @@ -186,7 +198,9 @@ describe('automated processing', () => {
sftpRepository,
sftpClient,
archiveFactory,
console
console,
teamRepo,
userRepo
)

await controller.start()
Expand Down Expand Up @@ -233,6 +247,12 @@ describe('automated processing', () => {
stateRepository.state = { ...defaultSFTPPluginConfig, interval: 10, enabled: true }
const clockTickMillis = stateRepository.state.interval * 1000 + 1

const teamRepo = jasmine.createSpyObj<MongooseTeamsRepository>('teamRepo', ['findTeamsByUserId'])
teamRepo.findTeamsByUserId.and.resolveTo([])

const userRepo = jasmine.createSpyObj<MongooseUsersRepository>('userRepo', ['findUserById'])
userRepo.findUserById.and.resolveTo(null)

archiveFactory.createArchiver.and.returnValue(newArchiver(ArchiveStatus.Complete))

const controller = new SftpController(
Expand All @@ -242,7 +262,9 @@ describe('automated processing', () => {
sftpRepository,
sftpClient,
archiveFactory,
console
console,
teamRepo,
userRepo
)

await controller.start()
Expand Down Expand Up @@ -290,6 +312,12 @@ describe('automated processing', () => {
stateRepository.state = { ...defaultSFTPPluginConfig, interval: 10, enabled: true }
const clockTickMillis = stateRepository.state.interval * 1000 + 1

const teamRepo = jasmine.createSpyObj<MongooseTeamsRepository>('teamRepo', ['findTeamsByUserId'])
teamRepo.findTeamsByUserId.and.resolveTo([])

const userRepo = jasmine.createSpyObj<MongooseUsersRepository>('userRepo', ['findUserById'])
userRepo.findUserById.and.resolveTo(null)

archiveFactory.createArchiver.and.returnValue(newArchiver(ArchiveStatus.Complete))

const controller = new SftpController(
Expand All @@ -299,7 +327,9 @@ describe('automated processing', () => {
sftpRepository,
sftpClient,
archiveFactory,
console
console,
teamRepo,
userRepo
)

await controller.start()
Expand Down Expand Up @@ -347,6 +377,12 @@ describe('automated processing', () => {
})
sftpRepository.findLatest.and.resolveTo(null)

const teamRepo = jasmine.createSpyObj<MongooseTeamsRepository>('teamRepo', ['findTeamsByUserId'])
teamRepo.findTeamsByUserId.and.resolveTo([])

const userRepo = jasmine.createSpyObj<MongooseUsersRepository>('userRepo', ['findUserById'])
userRepo.findUserById.and.resolveTo(null)

archiveFactory.createArchiver.and.returnValue(newArchiver(ArchiveStatus.Incomplete))

const controller = new SftpController(
Expand All @@ -356,7 +392,9 @@ describe('automated processing', () => {
sftpRepository,
sftpClient,
archiveFactory,
console
console,
teamRepo,
userRepo
)

await controller.start()
Expand Down Expand Up @@ -399,6 +437,12 @@ describe('automated processing', () => {
sftpRepository.findLatest.and.resolveTo(null)
sftpRepository.isProcessed.and.resolveTo(false)

const teamRepo = jasmine.createSpyObj<MongooseTeamsRepository>('teamRepo', ['findTeamsByUserId'])
teamRepo.findTeamsByUserId.and.resolveTo([])

const userRepo = jasmine.createSpyObj<MongooseUsersRepository>('userRepo', ['findUserById'])
userRepo.findUserById.and.resolveTo(null)

const archiverSpy = jasmine.createSpyObj<ObservationArchiver>('archiver', ['createArchive'])
archiverSpy.createArchive.and.resolveTo(ArchiveResult.complete(archiver('zip')))
archiveFactory.createArchiver.and.returnValue(archiverSpy)
Expand All @@ -410,7 +454,9 @@ describe('automated processing', () => {
sftpRepository,
sftpClient,
archiveFactory,
console
console,
teamRepo,
userRepo
)

await controller.start()
Expand All @@ -424,7 +470,7 @@ describe('automated processing', () => {
expect(sftpRepository.postStatus).toHaveBeenCalledWith(event1.id, observation.id, SftpStatus.SUCCESS)
expect(archiverSpy.createArchive).toHaveBeenCalled()
})

it('processes updated observations w/ create/update trigger', async () => {
stateRepository.state = { ...defaultSFTPPluginConfig, interval: 10, enabled: true }
const clockTickMillis = stateRepository.state.interval * 1000 + 1
Expand Down Expand Up @@ -454,6 +500,12 @@ describe('automated processing', () => {
sftpRepository.findLatest.and.resolveTo(null)
sftpRepository.isProcessed.and.resolveTo(false)

const teamRepo = jasmine.createSpyObj<MongooseTeamsRepository>('teamRepo', ['findTeamsByUserId'])
teamRepo.findTeamsByUserId.and.resolveTo([])

const userRepo = jasmine.createSpyObj<MongooseUsersRepository>('userRepo', ['findUserById'])
userRepo.findUserById.and.resolveTo(null)

const archiverSpy = jasmine.createSpyObj<ObservationArchiver>('archiver', ['createArchive'])
archiverSpy.createArchive.and.resolveTo(ArchiveResult.complete(archiver('zip')))
archiveFactory.createArchiver.and.returnValue(archiverSpy)
Expand All @@ -465,7 +517,9 @@ describe('automated processing', () => {
sftpRepository,
sftpClient,
archiveFactory,
console
console,
teamRepo,
userRepo
)

await controller.start()
Expand All @@ -481,7 +535,7 @@ describe('automated processing', () => {
})

it('skips processing of updated observations w/ create trigger', async () => {
stateRepository.state = { ...defaultSFTPPluginConfig, interval: 10, enabled: true, initiation: {rule: TriggerRule.Create,timeout: 60 } }
stateRepository.state = { ...defaultSFTPPluginConfig, interval: 10, enabled: true, initiation: { rule: TriggerRule.Create, timeout: 60 } }
const clockTickMillis = stateRepository.state.interval * 1000 + 1

const eventRepository = jasmine.createSpyObj<MageEventRepository>('eventRepository', ['findActiveEvents'])
Expand Down Expand Up @@ -509,6 +563,12 @@ describe('automated processing', () => {
sftpRepository.findLatest.and.resolveTo(null)
sftpRepository.isProcessed.and.resolveTo(true)

const teamRepo = jasmine.createSpyObj<MongooseTeamsRepository>('teamRepo', ['findTeamsByUserId'])
teamRepo.findTeamsByUserId.and.resolveTo([])

const userRepo = jasmine.createSpyObj<MongooseUsersRepository>('userRepo', ['findUserById'])
userRepo.findUserById.and.resolveTo(null)

const archiverSpy = jasmine.createSpyObj<ObservationArchiver>('archiver', ['createArchive'])
archiverSpy.createArchive.and.resolveTo(ArchiveResult.complete(archiver('zip')))
archiveFactory.createArchiver.and.returnValue(archiverSpy)
Expand All @@ -520,7 +580,9 @@ describe('automated processing', () => {
sftpRepository,
sftpClient,
archiveFactory,
console
console,
teamRepo,
userRepo
)

await controller.start()
Expand Down
24 changes: 19 additions & 5 deletions plugins/sftp/service/src/controller/controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import { PassThrough } from 'stream';
import { SFTPPluginConfig, defaultSFTPPluginConfig, encryptDecrypt } from '../configuration/SFTPPluginConfig';
import { ArchiveFormat, ArchiveStatus, ArchiverFactory, ArchiveResult, TriggerRule } from '../format/entities.format';
import { SftpAttrs, SftpObservationRepository, SftpStatus } from '../adapters/adapters.sftp.mongoose';
import { MongooseTeamsRepository } from '../adapters/adapters.sftp.teams';
import { MongooseUsersRepository } from 'src/adapters/adapters.sftp.users';

/**
* Class used to process observations for SFTP
Expand Down Expand Up @@ -63,6 +65,10 @@ export class SftpController {
*/
private console: Console;

private mongooseTeamsRepository: MongooseTeamsRepository;

private userRepository: MongooseUsersRepository

/**
* Constructor.
* @param stateRepository The plugins configuration.
Expand All @@ -78,7 +84,9 @@ export class SftpController {
sftpObservationRepository: SftpObservationRepository,
sftpClient: SFTPClient,
archiverFactory: ArchiverFactory,
console: Console
console: Console,
mongooseTeamsRepository: MongooseTeamsRepository,
userRepository: MongooseUsersRepository
) {
this.stateRepository = stateRepository;
this.eventRepository = eventRepository;
Expand All @@ -88,6 +96,8 @@ export class SftpController {
this.archiveFactory = archiverFactory
this.configuration = null
this.console = console;
this.mongooseTeamsRepository = mongooseTeamsRepository;
this.userRepository = userRepository;
}

/**
Expand Down Expand Up @@ -244,15 +254,19 @@ export class SftpController {

if (result instanceof ArchiveResult) {
if (result.status === ArchiveStatus.Complete || (result.status === ArchiveStatus.Incomplete && (observation.lastModified.getTime() + timeout) > Date.now())) {
this.console.log(`posting status of success`)
const stream = new PassThrough()
result.archive.pipe(stream)
await result.archive.finalize()
await this.sftpClient.put(stream, `${sftpPath}/${observation.id}.zip`)
const teams = await this.mongooseTeamsRepository.findTeamsByUserId(observation.userId);
// Filter out events from the teams response (bug) and teams that are not in the event
const newTeams = teams.filter((team) => team.teamEventId == null && event.teamIds?.map((teamId) => teamId.toString()).includes(team._id.toString()))
const teamNames = newTeams.length > 0 ? `${newTeams.map(team => team.name).join('_')}_` : '';
const user = await this.userRepository.findUserById(observation.userId);
const filename = (`${event.name}_${teamNames}${user?.username || observation.userId}_${observation.id}`)
this.console.info(`Adding sftp observation ${observation.id} to ${sftpPath}/${filename}.zip`)
await this.sftpClient.put(stream, `${sftpPath}/${filename}.zip`)
await this.sftpObservationRepository.postStatus(event.id, observation.id, SftpStatus.SUCCESS)
} else {
this.console.log(`posting status of pending`)

this.console.info(`pending observation ${observation.id}`)
await this.sftpObservationRepository.postStatus(event.id, observation.id, SftpStatus.PENDING)
}
Expand Down
Loading