
Commit bd81c53

dev(puterfs): move write_new to extension
1 parent 689fb91

File tree

3 files changed (+232, -113 lines)

extensions/puterfs/main.js

Lines changed: 228 additions & 0 deletions
@@ -17,12 +17,18 @@
  * along with this program. If not, see <https://www.gnu.org/licenses/>.
  */

+const STUCK_STATUS_TIMEOUT = 10 * 1000;
+const STUCK_ALARM_TIMEOUT = 20 * 1000;
+
 const uuidv4 = require('uuid').v4;
 const path_ = require('node:path');

+const { db } = extension.import('data');
+
 const svc_metering = extension.import('service:meteringService');
 const svc_trace = extension.import('service:traceService');
 const svc_fs = extension.import('service:filesystem');
+const { stuck_detector_stream, hashing_stream } = extension.import('core').util.streamutil;

 // TODO: filesystem providers should not need to call EventService
 const svc_event = extension.import('service:event');
@@ -37,6 +43,9 @@ const svc_fsEntryFetcher = extension.import('service:fsEntryFetcher');
 const svc_resource = extension.import('service:resourceService');
 const svc_fsLock = extension.import('service:fslock');

+// Not sure where these really belong yet
+const svc_fileCache = extension.import('service:file-cache');
+
 // TODO: depending on mountpoint service will not be necessary
 // once the storage provider is moved to this extension
 const svc_mountpoint = extension.import('service:mountpoint');
@@ -496,6 +505,225 @@ class PuterFSProvider {
         return child_uuids;
     }

+    /**
+     * Write a new file to the filesystem. Throws an error if the destination
+     * already exists.
+     *
+     * @param {Object} param
+     * @param {Context} param.context
+     * @param {FSNode} param.parent: The parent directory of the file.
+     * @param {string} param.name: The name of the file.
+     * @param {File} param.file: The file to write.
+     * @returns {Promise<FSNode>}
+     */
+    async write_new ({ context, parent, name, file }) {
+        console.log('calling write new');
+        const {
+            tmp, fsentry_tmp, message, actor: inputActor, app_id,
+        } = context.values;
+        const actor = inputActor ?? Context.get('actor');
+
+        const uid = uuidv4();
+
+        // determine bucket region
+        let bucket_region = config.s3_region ?? config.region;
+        let bucket = config.s3_bucket;
+
+        if ( ! await svc_acl.check(actor, parent, 'write') ) {
+            throw await svc_acl.get_safe_acl_error(actor, parent, 'write');
+        }
+
+        const storage_resp = await this.#storage_upload({
+            uuid: uid,
+            bucket,
+            bucket_region,
+            file,
+            tmp: {
+                ...tmp,
+                path: path_.join(await parent.get('path'), name),
+            },
+        });
+
+        fsentry_tmp.thumbnail = await fsentry_tmp.thumbnail_promise;
+        delete fsentry_tmp.thumbnail_promise;
+
+        const timestamp = Math.round(Date.now() / 1000);
+        const raw_fsentry = {
+            uuid: uid,
+            is_dir: 0,
+            user_id: actor.type.user.id,
+            created: timestamp,
+            accessed: timestamp,
+            modified: timestamp,
+            parent_uid: await parent.get('uid'),
+            name,
+            size: file.size,
+            path: path_.join(await parent.get('path'), name),
+            ...fsentry_tmp,
+            bucket_region,
+            bucket,
+            associated_app_id: app_id ?? null,
+        };
+
+        svc_event.emit('fs.pending.file', {
+            fsentry: FSNodeContext.sanitize_pending_entry_info(raw_fsentry),
+            context,
+        });
+
+        svc_resource.register({
+            uid,
+            status: RESOURCE_STATUS_PENDING_CREATE,
+        });
+
+        const filesize = file.size;
+        svc_size.change_usage(actor.type.user.id, filesize);
+
+        // Meter ingress
+        const ownerId = await parent.get('user_id');
+        const ownerActor = new Actor({
+            type: new UserActorType({
+                user: await get_user({ id: ownerId }),
+            }),
+        });
+
+        svc_metering.incrementUsage(ownerActor, 'filesystem:ingress:bytes', filesize);
+
+        const entryOp = await svc_fsEntry.insert(raw_fsentry);
+
+        (async () => {
+            await entryOp.awaitDone();
+            svc_resource.free(uid);
+
+            const new_item_node = await svc_fs.node(new NodeUIDSelector(uid));
+            const new_item = await new_item_node.get('entry');
+            const store_version_id = storage_resp.VersionId;
+            if ( store_version_id ) {
+                // insert version into db
+                db.write('INSERT INTO `fsentry_versions` (`user_id`, `fsentry_id`, `fsentry_uuid`, `version_id`, `message`, `ts_epoch`) VALUES (?, ?, ?, ?, ?, ?)',
+                    [
+                        actor.type.user.id,
+                        new_item.id,
+                        new_item.uuid,
+                        store_version_id,
+                        message ?? null,
+                        timestamp,
+                    ]);
+            }
+        })();
+
+        const node = await svc_fs.node(new NodeUIDSelector(uid));
+
+        svc_event.emit('fs.create.file', {
+            node,
+            context,
+        });
+
+        return node;
+    }
+
+    /**
+     * @param {Object} param
+     * @param {File} param.file: The file to write.
+     * @returns
+     */
+    async #storage_upload ({
+        uuid,
+        bucket,
+        bucket_region,
+        file,
+        tmp,
+    }) {
+        const storage = svc_mountpoint.get_storage(this.constructor.name);
+
+        bucket ??= config.s3_bucket;
+        bucket_region ??= config.s3_region ?? config.region;
+
+        let upload_tracker = new UploadProgressTracker();
+
+        svc_event.emit('fs.storage.upload-progress', {
+            upload_tracker,
+            context: Context.get(),
+            meta: {
+                item_uid: uuid,
+                item_path: tmp.path,
+            },
+        });
+
+        if ( ! file.buffer ) {
+            let stream = file.stream;
+            let alarm_timeout = null;
+            stream = stuck_detector_stream(stream, {
+                timeout: STUCK_STATUS_TIMEOUT,
+                on_stuck: () => {
+                    this.frame.status = OperationFrame.FRAME_STATUS_STUCK;
+                    console.warn('Upload stream might be stuck', {
+                        bucket_region,
+                        bucket,
+                        uuid,
+                    });
+                    alarm_timeout = setTimeout(() => {
+                        extension.errors.report('fs.write.s3-upload', {
+                            message: 'Upload stream stuck for too long',
+                            alarm: true,
+                            extra: {
+                                bucket_region,
+                                bucket,
+                                uuid,
+                            },
+                        });
+                    }, STUCK_ALARM_TIMEOUT);
+                },
+                on_unstuck: () => {
+                    clearTimeout(alarm_timeout);
+                    this.frame.status = OperationFrame.FRAME_STATUS_WORKING;
+                },
+            });
+            file = { ...file, stream };
+        }
+
+        let hashPromise;
+        if ( file.buffer ) {
+            const hash = crypto.createHash('sha256');
+            hash.update(file.buffer);
+            hashPromise = Promise.resolve(hash.digest('hex'));
+        } else {
+            const hs = hashing_stream(file.stream);
+            file.stream = hs.stream;
+            hashPromise = hs.hashPromise;
+        }
+
+        hashPromise.then(hash => {
+            svc_event.emit('outer.fs.write-hash', {
+                hash, uuid,
+            });
+        });
+
+        const state_upload = storage.create_upload();
+
+        try {
+            await state_upload.run({
+                uid: uuid,
+                file,
+                storage_meta: { bucket, bucket_region },
+                storage_api: { progress_tracker: upload_tracker },
+            });
+        } catch (e) {
+            extension.errors.report('fs.write.storage-upload', {
+                source: e || new Error('unknown'),
+                trace: true,
+                alarm: true,
+                extra: {
+                    bucket_region,
+                    bucket,
+                    uuid,
+                },
+            });
+            throw APIError.create('upload_failed');
+        }

+        return state_upload;
+    }
+
     async #rmnode ({ node, options }) {
         // Services
         if ( !options.override_immutable && await node.get('immutable') ) {
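
Note on the two stream helpers pulled in from extension.import('core').util.streamutil: stuck_detector_stream watches an upload stream and fires on_stuck when no data arrives for a while (and on_unstuck when data resumes), while hashing_stream computes a SHA-256 digest of the data as it passes through. The sketch below is not Puter's implementation; it is a minimal, self-contained illustration of the contracts the new #storage_upload appears to rely on (timeout/on_stuck/on_unstuck on one side, { stream, hashPromise } on the other), written against plain Node.js streams. Names mirror the diff; behavioral details are assumptions.

    // Sketch only: assumed contracts, not the actual core.util.streamutil code.
    const { PassThrough } = require('node:stream');
    const crypto = require('node:crypto');

    // Passes data through unchanged; calls on_stuck() if no chunk arrives for
    // `timeout` ms, and on_unstuck() when data starts flowing again.
    // (Backpressure is ignored here for brevity.)
    function stuck_detector_stream (source, { timeout, on_stuck, on_unstuck }) {
        const out = new PassThrough();
        let stuck = false;
        let timer = null;
        const arm = () => {
            clearTimeout(timer);
            timer = setTimeout(() => { stuck = true; on_stuck(); }, timeout);
        };
        source.on('data', chunk => {
            if ( stuck ) { stuck = false; on_unstuck(); }
            arm();
            out.write(chunk);
        });
        source.on('end', () => { clearTimeout(timer); out.end(); });
        source.on('error', err => { clearTimeout(timer); out.destroy(err); });
        arm();
        return out;
    }

    // Passes data through unchanged and resolves hashPromise with the hex
    // SHA-256 digest of everything that went through.
    function hashing_stream (source) {
        const hash = crypto.createHash('sha256');
        const stream = new PassThrough();
        const hashPromise = new Promise((resolve, reject) => {
            source.on('data', chunk => { hash.update(chunk); stream.write(chunk); });
            source.on('end', () => { stream.end(); resolve(hash.digest('hex')); });
            source.on('error', err => { stream.destroy(err); reject(err); });
        });
        return { stream, hashPromise };
    }

In the committed code, on_stuck flips the operation frame status to OperationFrame.FRAME_STATUS_STUCK and arms an alarm that reports after STUCK_ALARM_TIMEOUT; on_unstuck clears the alarm and restores FRAME_STATUS_WORKING, which is why OperationFrame is newly exported from CoreModule.js below.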

src/backend/src/CoreModule.js

Lines changed: 2 additions & 0 deletions
@@ -28,6 +28,7 @@ const { RuntimeModule } = require('./extension/RuntimeModule.js');
 const { TYPE_DIRECTORY, TYPE_FILE } = require('./filesystem/FSNodeContext.js');
 const { TDetachable } = require('@heyputer/putility/src/traits/traits.js');
 const { MultiDetachable } = require('@heyputer/putility/src/libs/listener.js');
+const { OperationFrame } = require('./services/OperationTraceService');

 /**
  * Core module for the Puter platform that includes essential services including
@@ -107,6 +108,7 @@ const install = async ({ context, services, app, useapi, modapi }) => {
             LLRead,
             TYPE_DIRECTORY,
             TYPE_FILE,
+            OperationFrame,
         });
         def('core.fs.selectors', require('./filesystem/node/selectors'));
         def('core.util.stream', require('./util/streamutil'));
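
This export matters because the extension code above sets this.frame.status using OperationFrame.FRAME_STATUS_STUCK and FRAME_STATUS_WORKING, and the def()/import() registration shown here appears to be how extensions reach core values (compare extension.import('core').util.streamutil in main.js). The toy sketch below only illustrates that registration/lookup idea; the registry itself, the 'core.fs' key, and the OperationFrame stand-in values are all hypothetical, since the real def key for this object sits outside the visible hunk.

    // Illustration only: a toy registry showing the def()/import() idea.
    class OperationFrame {
        static FRAME_STATUS_STUCK = { label: 'stuck' };     // stand-in value
        static FRAME_STATUS_WORKING = { label: 'working' }; // stand-in value
    }

    const registry = {};
    const def = (key, value) => {
        const parts = key.split('.');
        let node = registry;
        for ( const part of parts.slice(0, -1) ) node = (node[part] ??= {});
        const last = parts[parts.length - 1];
        node[last] = Object.assign(node[last] ?? {}, value);
    };
    const importKey = key => registry[key];

    def('core.fs', { OperationFrame }); // hypothetical key

    // Extension side (cf. this.frame.status = OperationFrame.FRAME_STATUS_STUCK):
    const core = importKey('core');
    console.log(core.fs.OperationFrame.FRAME_STATUS_STUCK.label); // 'stuck'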

src/backend/src/modules/puterfs/lib/PuterFSProvider.js

Lines changed: 2 additions & 113 deletions
@@ -189,119 +189,8 @@ class PuterFSProvider extends putility.AdvancedBase {
      * @returns {Promise<FSNode>}
      */
     async write_new ({ context, parent, name, file }) {
-        const {
-            tmp, fsentry_tmp, message, actor: inputActor, app_id,
-        } = context.values;
-        const actor = inputActor ?? Context.get('actor');
-
-        const sizeService = this.#services.get('sizeService');
-        const resourceService = this.#services.get('resourceService');
-        const svc_fsEntry = this.#services.get('fsEntryService');
-        const svc_event = this.#services.get('event');
-        const fs = this.#services.get('filesystem');
-
-        // TODO: fs:decouple-versions
-        // add version hook externally so LLCWrite doesn't
-        // need direct database access
-        const db = this.#services.get('database').get(DB_WRITE, 'filesystem');
-
-        const uid = uuidv4();
-
-        // determine bucket region
-        let bucket_region = config.s3_region ?? config.region;
-        let bucket = config.s3_bucket;
-
-        const svc_acl = this.#services.get('acl');
-        if ( ! await svc_acl.check(actor, parent, 'write') ) {
-            throw await svc_acl.get_safe_acl_error(actor, parent, 'write');
-        }
-
-        const storage_resp = await this.#storage_upload({
-            uuid: uid,
-            bucket,
-            bucket_region,
-            file,
-            tmp: {
-                ...tmp,
-                path: path.join(await parent.get('path'), name),
-            },
-        });
-
-        fsentry_tmp.thumbnail = await fsentry_tmp.thumbnail_promise;
-        delete fsentry_tmp.thumbnail_promise;
-
-        const timestamp = Math.round(Date.now() / 1000);
-        const raw_fsentry = {
-            uuid: uid,
-            is_dir: 0,
-            user_id: actor.type.user.id,
-            created: timestamp,
-            accessed: timestamp,
-            modified: timestamp,
-            parent_uid: await parent.get('uid'),
-            name,
-            size: file.size,
-            path: path.join(await parent.get('path'), name),
-            ...fsentry_tmp,
-            bucket_region,
-            bucket,
-            associated_app_id: app_id ?? null,
-        };
-
-        svc_event.emit('fs.pending.file', {
-            fsentry: FSNodeContext.sanitize_pending_entry_info(raw_fsentry),
-            context,
-        });
-
-        resourceService.register({
-            uid,
-            status: RESOURCE_STATUS_PENDING_CREATE,
-        });
-
-        const filesize = file.size;
-        sizeService.change_usage(actor.type.user.id, filesize);
-
-        // Meter ingress
-        const ownerId = await parent.get('user_id');
-        const ownerActor = new Actor({
-            type: new UserActorType({
-                user: await get_user({ id: ownerId }),
-            }),
-        });
-
-        this.#meteringService.incrementUsage(ownerActor, 'filesystem:ingress:bytes', filesize);
-
-        const entryOp = await svc_fsEntry.insert(raw_fsentry);
-
-        (async () => {
-            await entryOp.awaitDone();
-            resourceService.free(uid);
-
-            const new_item_node = await fs.node(new NodeUIDSelector(uid));
-            const new_item = await new_item_node.get('entry');
-            const store_version_id = storage_resp.VersionId;
-            if ( store_version_id ) {
-                // insert version into db
-                db.write('INSERT INTO `fsentry_versions` (`user_id`, `fsentry_id`, `fsentry_uuid`, `version_id`, `message`, `ts_epoch`) VALUES (?, ?, ?, ?, ?, ?)',
-                    [
-                        actor.type.user.id,
-                        new_item.id,
-                        new_item.uuid,
-                        store_version_id,
-                        message ?? null,
-                        timestamp,
-                    ]);
-            }
-        })();
-
-        const node = await fs.node(new NodeUIDSelector(uid));
-
-        svc_event.emit('fs.create.file', {
-            node,
-            context,
-        });
-
-        return node;
+        console.error('This .write_new should not be called!');
+        throw new Error('This .write_new should not be called!');
     }

     /**

0 commit comments