1717 * along with this program. If not, see <https://www.gnu.org/licenses/>.
1818 */
1919
20+ const STUCK_STATUS_TIMEOUT = 10 * 1000 ;
21+ const STUCK_ALARM_TIMEOUT = 20 * 1000 ;
22+
2023const uuidv4 = require ( 'uuid' ) . v4 ;
2124const path_ = require ( 'node:path' ) ;
2225
26+ const { db } = extension . import ( 'data' ) ;
27+
2328const svc_metering = extension . import ( 'service:meteringService' ) ;
2429const svc_trace = extension . import ( 'service:traceService' ) ;
2530const svc_fs = extension . import ( 'service:filesystem' ) ;
31+ const { stuck_detector_stream, hashing_stream } = extension . import ( 'core' ) . util . streamutil ;
2632
2733// TODO: filesystem providers should not need to call EventService
2834const svc_event = extension . import ( 'service:event' ) ;
@@ -37,6 +43,9 @@ const svc_fsEntryFetcher = extension.import('service:fsEntryFetcher');
3743const svc_resource = extension . import ( 'service:resourceService' ) ;
3844const svc_fsLock = extension . import ( 'service:fslock' ) ;
3945
// TODO: find a proper home for these service imports
47+ const svc_fileCache = extension . import ( 'service:file-cache' ) ;
48+
4049// TODO: depending on mountpoint service will not be necessary
4150// once the storage provider is moved to this extension
4251const svc_mountpoint = extension . import ( 'service:mountpoint' ) ;
@@ -496,6 +505,225 @@ class PuterFSProvider {
496505 return child_uuids ;
497506 }
498507
508+ /**
509+ * Write a new file to the filesystem. Throws an error if the destination
510+ * already exists.
511+ *
512+ * @param {Object } param
513+ * @param {Context } param.context
514+ * @param {FSNode } param.parent: The parent directory of the file.
515+ * @param {string } param.name: The name of the file.
516+ * @param {File } param.file: The file to write.
517+ * @returns {Promise<FSNode> }
518+ */
519+ async write_new ( { context, parent, name, file } ) {
520+ console . log ( 'calling write new' ) ;
521+ const {
522+ tmp, fsentry_tmp, message, actor : inputActor , app_id,
523+ } = context . values ;
524+ const actor = inputActor ?? Context . get ( 'actor' ) ;
525+
526+ const uid = uuidv4 ( ) ;
527+
528+ // determine bucket region
529+ let bucket_region = config . s3_region ?? config . region ;
530+ let bucket = config . s3_bucket ;
531+
532+ if ( ! await svc_acl . check ( actor , parent , 'write' ) ) {
533+ throw await svc_acl . get_safe_acl_error ( actor , parent , 'write' ) ;
534+ }
535+
536+ const storage_resp = await this . #storage_upload( {
537+ uuid : uid ,
538+ bucket,
539+ bucket_region,
540+ file,
541+ tmp : {
542+ ...tmp ,
543+ path : path_ . join ( await parent . get ( 'path' ) , name ) ,
544+ } ,
545+ } ) ;
546+
547+ fsentry_tmp . thumbnail = await fsentry_tmp . thumbnail_promise ;
548+ delete fsentry_tmp . thumbnail_promise ;
549+
550+ const timestamp = Math . round ( Date . now ( ) / 1000 ) ;
551+ const raw_fsentry = {
552+ uuid : uid ,
553+ is_dir : 0 ,
554+ user_id : actor . type . user . id ,
555+ created : timestamp ,
556+ accessed : timestamp ,
557+ modified : timestamp ,
558+ parent_uid : await parent . get ( 'uid' ) ,
559+ name,
560+ size : file . size ,
561+ path : path_ . join ( await parent . get ( 'path' ) , name ) ,
562+ ...fsentry_tmp ,
563+ bucket_region,
564+ bucket,
565+ associated_app_id : app_id ?? null ,
566+ } ;
567+
568+ svc_event . emit ( 'fs.pending.file' , {
569+ fsentry : FSNodeContext . sanitize_pending_entry_info ( raw_fsentry ) ,
570+ context,
571+ } ) ;
572+
573+ svc_resource . register ( {
574+ uid,
575+ status : RESOURCE_STATUS_PENDING_CREATE ,
576+ } ) ;
577+
578+ const filesize = file . size ;
579+ svc_size . change_usage ( actor . type . user . id , filesize ) ;
580+
581+ // Meter ingress
582+ const ownerId = await parent . get ( 'user_id' ) ;
583+ const ownerActor = new Actor ( {
584+ type : new UserActorType ( {
585+ user : await get_user ( { id : ownerId } ) ,
586+ } ) ,
587+ } ) ;
588+
589+ svc_metering . incrementUsage ( ownerActor , 'filesystem:ingress:bytes' , filesize ) ;
590+
591+ const entryOp = await svc_fsEntry . insert ( raw_fsentry ) ;
592+
593+ ( async ( ) => {
594+ await entryOp . awaitDone ( ) ;
595+ svc_resource . free ( uid ) ;
596+
597+ const new_item_node = await svc_fs . node ( new NodeUIDSelector ( uid ) ) ;
598+ const new_item = await new_item_node . get ( 'entry' ) ;
599+ const store_version_id = storage_resp . VersionId ;
600+ if ( store_version_id ) {
601+ // insert version into db
602+ db . write ( 'INSERT INTO `fsentry_versions` (`user_id`, `fsentry_id`, `fsentry_uuid`, `version_id`, `message`, `ts_epoch`) VALUES (?, ?, ?, ?, ?, ?)' ,
603+ [
604+ actor . type . user . id ,
605+ new_item . id ,
606+ new_item . uuid ,
607+ store_version_id ,
608+ message ?? null ,
609+ timestamp ,
610+ ] ) ;
611+ }
612+ } ) ( ) ;
613+
614+ const node = await svc_fs . node ( new NodeUIDSelector ( uid ) ) ;
615+
616+ svc_event . emit ( 'fs.create.file' , {
617+ node,
618+ context,
619+ } ) ;
620+
621+ return node ;
622+ }
623+
624+ /**
625+ * @param {Object } param
626+ * @param {File } param.file: The file to write.
627+ * @returns
628+ */
629+ async #storage_upload ( {
630+ uuid,
631+ bucket,
632+ bucket_region,
633+ file,
634+ tmp,
635+ } ) {
636+ const storage = svc_mountpoint . get_storage ( this . constructor . name ) ;
637+
638+ bucket ??= config . s3_bucket ;
639+ bucket_region ??= config . s3_region ?? config . region ;
640+
641+ let upload_tracker = new UploadProgressTracker ( ) ;
642+
643+ svc_event . emit ( 'fs.storage.upload-progress' , {
644+ upload_tracker,
645+ context : Context . get ( ) ,
646+ meta : {
647+ item_uid : uuid ,
648+ item_path : tmp . path ,
649+ } ,
650+ } ) ;
651+
652+ if ( ! file . buffer ) {
653+ let stream = file . stream ;
654+ let alarm_timeout = null ;
655+ stream = stuck_detector_stream ( stream , {
656+ timeout : STUCK_STATUS_TIMEOUT ,
657+ on_stuck : ( ) => {
658+ this . frame . status = OperationFrame . FRAME_STATUS_STUCK ;
659+ console . warn ( 'Upload stream stuck might be stuck' , {
660+ bucket_region,
661+ bucket,
662+ uuid,
663+ } ) ;
664+ alarm_timeout = setTimeout ( ( ) => {
665+ extension . errors . report ( 'fs.write.s3-upload' , {
666+ message : 'Upload stream stuck for too long' ,
667+ alarm : true ,
668+ extra : {
669+ bucket_region,
670+ bucket,
671+ uuid,
672+ } ,
673+ } ) ;
674+ } , STUCK_ALARM_TIMEOUT ) ;
675+ } ,
676+ on_unstuck : ( ) => {
677+ clearTimeout ( alarm_timeout ) ;
678+ this . frame . status = OperationFrame . FRAME_STATUS_WORKING ;
679+ } ,
680+ } ) ;
681+ file = { ...file , stream } ;
682+ }
683+
684+ let hashPromise ;
685+ if ( file . buffer ) {
686+ const hash = crypto . createHash ( 'sha256' ) ;
687+ hash . update ( file . buffer ) ;
688+ hashPromise = Promise . resolve ( hash . digest ( 'hex' ) ) ;
689+ } else {
690+ const hs = hashing_stream ( file . stream ) ;
691+ file . stream = hs . stream ;
692+ hashPromise = hs . hashPromise ;
693+ }
694+
695+ hashPromise . then ( hash => {
696+ svc_event . emit ( 'outer.fs.write-hash' , {
697+ hash, uuid,
698+ } ) ;
699+ } ) ;
700+
701+ const state_upload = storage . create_upload ( ) ;
702+
703+ try {
704+ await state_upload . run ( {
705+ uid : uuid ,
706+ file,
707+ storage_meta : { bucket, bucket_region } ,
708+ storage_api : { progress_tracker : upload_tracker } ,
709+ } ) ;
710+ } catch ( e ) {
711+ extension . errors . report ( 'fs.write.storage-upload' , {
712+ source : e || new Error ( 'unknown' ) ,
713+ trace : true ,
714+ alarm : true ,
715+ extra : {
716+ bucket_region,
717+ bucket,
718+ uuid,
719+ } ,
720+ } ) ;
721+ throw APIError . create ( 'upload_failed' ) ;
722+ }
723+
724+ return state_upload ;
725+ }
726+
499727 async #rmnode ( { node, options } ) {
500728 // Services
501729 if ( ! options . override_immutable && await node . get ( 'immutable' ) ) {
0 commit comments