diff --git a/.editorconfig b/.editorconfig new file mode 100644 index 0000000..e8b23f8 --- /dev/null +++ b/.editorconfig @@ -0,0 +1,9 @@ +# This is the top-most editorconfig +root = true + +[*.js] +charset = utf-8 +end_of_line = lf +insert_final_newline = true +indent_style = space +indent_size = 2 diff --git a/.eslintrc.json b/.eslintrc.json new file mode 100644 index 0000000..c242d84 --- /dev/null +++ b/.eslintrc.json @@ -0,0 +1,7 @@ +{ + "env": { + "es6": true, + "node": true + }, + "extends": ["airbnb-base", "prettier"] +} diff --git a/.prettierrc.json b/.prettierrc.json new file mode 100644 index 0000000..798dc79 --- /dev/null +++ b/.prettierrc.json @@ -0,0 +1,12 @@ +{ + "parser": "babel", + "printWidth": 100, + "tabWidth": 2, + "useTabs": false, + "semi": true, + "singleQuote": true, + "trailingComma": "all", + "bracketSpacing": true, + "arrowParens": "always", + "endOfLine": "lf" +} diff --git a/index.js b/index.js index a2d5592..cc25f64 100644 --- a/index.js +++ b/index.js @@ -1,3 +1,3 @@ // This file is the ES6 module loader, don't edit it, go to src/app.js -require = require("esm")(module/*, options*/) -module.exports = require("./src/app.js") +require = require('esm')(module /*, options*/); +module.exports = require('./src/app.js'); diff --git a/package.json b/package.json index 08969cc..f5d7c49 100644 --- a/package.json +++ b/package.json @@ -9,6 +9,7 @@ "author": "Maxime Baconnais", "license": "MIT", "dependencies": { + "aws-sdk": "^2.418.0", "color": "^3.1.0", "cors": "^2.8.4", "debug": "^4.0.1", @@ -26,5 +27,11 @@ "sharp": "^0.21.2", "sqlite3": "^4.0.2", "uniqid": "^5.0.3" + }, + "devDependencies": { + "eslint": "^5.15.1", + "eslint-config-airbnb-base": "^13.1.0", + "eslint-config-prettier": "^4.1.0", + "eslint-plugin-import": "^2.16.0" } } diff --git a/src/app.js b/src/app.js index 2757928..e8a5a04 100644 --- a/src/app.js +++ b/src/app.js @@ -14,7 +14,7 @@ import debug from 'debug'; const D = debug('UnicornLoadBalancer'); // Welcome -D('Version: ' + config.version) +D('Version: ' + config.version); // Init Express const app = express(); @@ -24,13 +24,22 @@ app.use(cors()); // Body parsing app.use(bodyParser.json()); -app.use(bodyParser.urlencoded({ - extended: true -})); +app.use( + bodyParser.urlencoded({ + extended: true, + }), +); app.use((err, _, res, next) => { - if (err instanceof SyntaxError && err.status >= 400 && err.status < 500 && err.message.indexOf('JSON')) - return (res.status(400).send({ error: { code: 'INVALID_BODY', message: 'Syntax error in the JSON body' } })); - next(); + if ( + err instanceof SyntaxError && + err.status >= 400 && + err.status < 500 && + err.message.indexOf('JSON') + ) + return res.status(400).send({ + error: { code: 'INVALID_BODY', message: 'Syntax error in the JSON body' }, + }); + next(); }); // Init routes @@ -40,25 +49,27 @@ D('Initializing API routes...'); Router(app); // Load servers available in configuration -((Array.isArray(config.custom.servers.list)) ? config.custom.servers.list : []).map(e => ({ +(Array.isArray(config.custom.servers.list) ? config.custom.servers.list : []) + .map((e) => ({ name: e, - url: ((e.substr(-1) === '/') ? e.substr(0, e.length - 1) : e), + url: e.substr(-1) === '/' ? e.substr(0, e.length - 1) : e, sessions: [], settings: { - maxSessions: 0, - maxDownloads: 0, - maxTranscodes: 0 - } -})).forEach(e => { + maxSessions: 0, + maxDownloads: 0, + maxTranscodes: 0, + }, + })) + .forEach((e) => { ServersManager.update(e); -}); + }); // Create HTTP server const httpServer = app.listen(config.server.port); // Forward websockets httpServer.on('upgrade', (req, res) => { - Proxy.ws(req, res); + Proxy.ws(req, res); }); // Debug diff --git a/src/config.js b/src/config.js index cf24cf0..2638ae3 100644 --- a/src/config.js +++ b/src/config.js @@ -3,52 +3,68 @@ import env from 'getenv'; env.disableErrors(); export default { - version: '2.0.0', - server: { - port: env.int('SERVER_PORT', 3001), - public: env.string('SERVER_PUBLIC', 'http://127.0.0.1:3001/'), - host: env.string('SERVER_HOST', '127.0.0.1') - }, - plex: { - host: env.string('PLEX_HOST', '127.0.0.1'), - port: env.int('PLEX_PORT', 32400), - path: { - usr: env.string('PLEX_PATH_USR', '/usr/lib/plexmediaserver/'), - sessions: env.string('PLEX_PATH_SESSIONS', '/var/lib/plexmediaserver/Library/Application Support/Plex Media Server/Cache/Transcode/Sessions/') - } - }, - database: { - mode: env.string('DATABASE_MODE', 'sqlite'), - sqlite: { - path: env.string('DATABASE_SQLITE_PATH', '/var/lib/plexmediaserver/Library/Application Support/Plex Media Server/Plug-in Support/Databases/com.plexapp.plugins.library.db') - }, - postgresql: { - host: env.string('DATABASE_POSTGRESQL_HOST', ''), - database: env.string('DATABASE_POSTGRESQL_DATABASE', ''), - user: env.string('DATABASE_POSTGRESQL_USER', ''), - password: env.string('DATABASE_POSTGRESQL_PASSWORD', ''), - port: env.int('DATABASE_POSTGRESQL_PORT', 5432) - } - }, - redis: { - host: env('REDIS_HOST', undefined), - port: env.int('REDIS_PORT', 6379), - password: env.string('REDIS_PASSWORD', ''), - db: env.int('REDIS_DB', 0) - }, - custom: { - scores: { - timeout: env.int('CUSTOM_SCORES_TIMEOUT', 10) - }, - image: { - resizer: env.boolish('CUSTOM_IMAGE_RESIZER', false), - proxy: env.string('CUSTOM_IMAGE_PROXY', '') - }, - download: { - forward: env.boolish('CUSTOM_DOWNLOAD_FORWARD', false) - }, - servers: { - list: env.array('CUSTOM_SERVERS_LIST', 'string', []) - } - } + version: '2.0.0', + server: { + port: env.int('SERVER_PORT', 3001), + public: env.string('SERVER_PUBLIC', 'http://127.0.0.1:3001/'), + host: env.string('SERVER_HOST', '127.0.0.1'), + }, + aws: { + s3: { + bucket: env.string('AWS_S3_BUCKET', ''), + mountPath: env.string('AWS_S3_MOUNTPOINT', ''), + }, + cloudFront: { + distributionUrl: env.string('AWS_CF_URL', ''), + keypairParameterPath: env.string('AWS_CF_KEYPAIR_SSM_PATH', ''), + }, + }, + plex: { + host: env.string('PLEX_HOST', '127.0.0.1'), + port: env.int('PLEX_PORT', 32400), + path: { + usr: env.string('PLEX_PATH_USR', '/usr/lib/plexmediaserver/'), + sessions: env.string( + 'PLEX_PATH_SESSIONS', + '/var/lib/plexmediaserver/Library/Application Support/Plex Media Server/Cache/Transcode/Sessions/', + ), + }, + }, + database: { + mode: env.string('DATABASE_MODE', 'sqlite'), + sqlite: { + path: env.string( + 'DATABASE_SQLITE_PATH', + '/var/lib/plexmediaserver/Library/Application Support/Plex Media Server/Plug-in Support/Databases/com.plexapp.plugins.library.db', + ), + }, + postgresql: { + host: env.string('DATABASE_POSTGRESQL_HOST', ''), + database: env.string('DATABASE_POSTGRESQL_DATABASE', ''), + user: env.string('DATABASE_POSTGRESQL_USER', ''), + password: env.string('DATABASE_POSTGRESQL_PASSWORD', ''), + port: env.int('DATABASE_POSTGRESQL_PORT', 5432), + }, + }, + redis: { + host: env('REDIS_HOST', undefined), + port: env.int('REDIS_PORT', 6379), + password: env.string('REDIS_PASSWORD', ''), + db: env.int('REDIS_DB', 0), + }, + custom: { + scores: { + timeout: env.int('CUSTOM_SCORES_TIMEOUT', 10), + }, + image: { + resizer: env.boolish('CUSTOM_IMAGE_RESIZER', false), + proxy: env.string('CUSTOM_IMAGE_PROXY', ''), + }, + download: { + forward: env.boolish('CUSTOM_DOWNLOAD_FORWARD', false), + }, + servers: { + list: env.array('CUSTOM_SERVERS_LIST', 'string', []), + }, + }, }; diff --git a/src/core/aws.js b/src/core/aws.js new file mode 100644 index 0000000..50bf922 --- /dev/null +++ b/src/core/aws.js @@ -0,0 +1,263 @@ +import util from 'util'; +import debug from 'debug'; + +import aws from 'aws-sdk'; + +import config from '../config'; + +// Debugger +const D = debug('UnicornLoadBalancer:AWS'); + +const SIGNED_URL_EXPIRE_TIME = 60; // How long a signed URL is valid for in seconds -- 1 minute + +/** + * AWSInterface provides an interface for interacting with AWS. + */ +class AWSInterface { + /** + * Constructs and initializes a new AWSInterface + */ + constructor() { + this.initialize(); + } + + /** + * Initializes the AWSInterface to prepare it for use. This is for internal + * use and is called by the constructor. + */ + async initialize() { + this.initialized = false; + + this.metadataService = new aws.MetadataService(); + this.region = await this.getRegion(); + this.ssm = new aws.SSM({ apiVersion: '2014-11-06', region: this.region }); + this.s3 = new aws.S3({ apiVersion: '2006-03-01', region: this.region }); + + const initializationPromises = []; + initializationPromises.push(this.initializeCloudFront()); + initializationPromises.push(this.initializeS3()); + + await Promise.all(initializationPromises); + + this.initialized = true; + } + + /** + * @return {Boolean} True if the AWSInterface is done initializing, false otherwise. + */ + isInitialized() { + return this.initialized; + } + + /** + * Initializes CloudFront signing. This is for internal use and is called by + * initialize. + */ + async initializeCloudFront() { + if ( + config.aws.cloudFront.distributionUrl === '' || + config.aws.cloudFront.keypairParameterPath === '' + ) { + return; + } + + this.cloudFront = {}; + + this.cloudFront.url = config.aws.cloudFront.distributionUrl; + if (this.cloudFront.url.endsWith('/')) { + // Make sure the distribution URL ends with a slash so that we can simply add a key to it later + this.cloudFront.url = this.cloudFront.url.slice(0, -1); + } + + const keypairPath = config.aws.cloudFront.keypairParameterPath; + const ssmRequest = this.ssm.getParameters({ + Names: [`${keypairPath}/keyId`, `${keypairPath}/privkey`], + WithDecryption: true, + }); + ssmRequest.send(); + const data = await ssmRequest.promise(); + + if (data.InvalidParameters && data.InvalidParameters.length > 0) { + const invalidParameters = data.InvalidParameters.join(', '); + D( + `During request for CloudFront signing key, received invalid parameters: ${invalidParameters}`, + ); + } + + const parameters = data.Parameters; + if (parameters.length !== 2) { + throw new Error( + `Requested 2 parameters from Parameter Store for CloudFront signing key, but received ${ + parameters.length + }`, + ); + } + + let keypairId = null; + let privateKey = null; + + parameters.forEach((parameter) => { + if (parameter.Name.endsWith('/privkey')) { + // This is the private key + privateKey = parameter.Value; + } else if (parameter.Name.endsWith('/keyId')) { + // This is the key ID + keypairId = parameter.Value; + } else { + D(`Received unexpected result from SSM Parameter Store: ${parameter.Name}`); + } + }); + + if (keypairId === null && privateKey === null) { + throw new Error( + 'Did not receive keypair ID or private key for CloudFront signing from Parameter Store', + ); + } else if (keypairId === null) { + throw new Error('Did not receive keypair ID for CloudFront signing from Parameter Store'); + } else if (privateKey === null) { + throw new Error('Did not receive private key for CloudFront signing from Parameter Store'); + } + + this.cloudFront.signer = new aws.CloudFront.Signer(keypairId, privateKey); + } + + /** + * Initiailizes parameters for S3 signing. This is for internal use and is + * called by initialize. + */ + async initializeS3() { + this.s3Bucket = config.aws.s3.bucket !== '' ? config.aws.s3.bucket : null; + this.s3MountPoint = config.aws.s3.mountPath !== '' ? config.aws.s3.mountPath : null; + } + + /** + * @return {Promise} A promise that resolves to a string containing the region + * that this load balancer is in. + */ + async getRegion() { + const requestMetadata = util.promisify(this.metadataService.request.bind(this.metadataService)); + + const availabilityZone = await requestMetadata( + '/2018-09-24/meta-data/placement/availability-zone', + ); + let region; + if (/[a-zA-Z]/.test(availabilityZone)) { + // If it ends with a letter, remove the letter to get the region + region = availabilityZone.slice(0, -1); + } else { + // If it doesn't end with a letter, it's already a region name + region = availabilityZone; + } + + return region; + } + + /** + * getSignedUrlForFile returns a signed S3 URL for a given media file. The given file must be in the S3 mount point. + * @param {string} filePath The file path to get the URL for. + * @return {string} A promise that resolves to the signed URL corresponding to the given path. + */ + async getSignedUrlForFile(filePath) { + if (!this.isInitialized()) { + throw new Error('AWS is still initializing'); + } + + if (this.s3MountPoint === null) { + // If there's no mount path, we can't determine the path to add to a URL + throw new Error('AWS S3 mount path is not configured'); + } + + if (!filePath.startsWith(config.aws.s3.mountPath)) { + throw new Error('Given file path is not in the S3 mount point'); + } + + const key = AWSInterface.getEncodedKey(filePath.slice(config.aws.s3.mountPath.length)); + + try { + return await this.getCloudFrontSignedUrl(key); + } catch (ex) { + if (!(ex instanceof Error && ex.message === 'Unavailable')) { + // This is not a simple Unavailable error from the signing function. + throw ex; + } + } + + try { + return await this.getS3SignedUrl(key); + } catch (ex) { + if (!(ex instanceof Error && ex.message === 'Unavailable')) { + // This is not a simple Unavailable error from the signing function. + throw ex; + } + } + + throw new Error('No AWS signing services are available'); + } + + /** + * Generates a signed URL for the CloudFront distribution fronting the key. + * @param {string} key The key for which a signed URL should be created. + * @return {Promise} A Promise that resolves to the signed URL. + */ + async getCloudFrontSignedUrl(key) { + if (!this.cloudFront || !this.cloudFront.signer) { + throw new Error('Unavailable'); + } + + const expiresTimestamp = Math.ceil((Date.now() + SIGNED_URL_EXPIRE_TIME * 1000) / 1000); + + return new Promise((resolve, reject) => { + this.cloudFront.signer.getSignedUrl( + { url: this.cloudFront.url + key, expires: expiresTimestamp }, + (err, signedUrl) => { + if (err) { + reject(err); + } + + resolve(signedUrl); + }, + ); + }); + } + + /** + * Generates a signed URL for the S3 bucket at the given key. + * @param {string} key The key for which a signed URL should be created. + * @return {Promise} A Promise that resolves to the signed URL. + */ + async getS3SignedUrl(key) { + if (this.s3Bucket === null) { + throw new Error('Unavailable'); + } + + return new Promise((resolve, reject) => { + this.s3.getSignedUrl( + 'getObject', + { Bucket: this.s3Bucket, Key: key, Expires: SIGNED_URL_EXPIRE_TIME }, + (err, signedUrl) => { + if (err) { + reject(err); + return; + } + + resolve(signedUrl); + }, + ); + }); + } + + /** + * Returns an encoded key, as the browser would encode it. It's critical that + * this matches the browser's encoding, because the signed URL will fail + * otherwise. + * @param {string} key The key to be encoded. + * @return {[type]} The encoded key. + */ + static getEncodedKey(key) { + let encodedKey = encodeURI(key); + encodedKey = encodedKey.replace(/'/g, '%27'); + return encodedKey; + } +} + +export default new AWSInterface(); diff --git a/src/core/images.js b/src/core/images.js index 4105068..bb22087 100644 --- a/src/core/images.js +++ b/src/core/images.js @@ -5,171 +5,182 @@ import md5 from 'md5'; import { parseUserAgent } from 'detect-browser'; export const parseArguments = (query, basepath = '/', useragent = '') => { - - // Parse url - let url = query.url || ''; - url = url.replace('http://127.0.0.1/', '/'); - url = url.replace('http://127.0.0.1:32400/', '/'); - url = url.replace(basepath, '/'); - if (query['X-Plex-Token'] && url && url[0] === '/') { - url += (url.indexOf('?') === -1) ? `?X-Plex-Token=${query['X-Plex-Token']}` : `&X-Plex-Token=${query['X-Plex-Token']}` - } - if (url && url[0] === '/') - url = basepath + url.substring(1); - - // Extract parameters - const params = { - ...((query.width) ? { width: parseInt(query.width) } : {}), - ...((query.height) ? { height: parseInt(query.height) } : {}), - ...((query.background) ? { background: query.background } : {}), - ...((query.opacity) ? { opacity: parseInt(query.opacity) } : {}), - ...((query.minSize) ? { minSize: parseInt(query.minSize) } : {}), - ...((query.blur) ? { blur: parseInt(query.blur) } : {}), - ...((query.format && (query.format === 'webp' || query.format === 'png')) ? { format: query.format } : { format: 'jpg' }), - ...((query.upscale) ? { upscale: parseInt(query.upscale) } : {}), - ...((query.quality) ? { quality: parseInt(query.quality) } : ((query.blur) ? { quality: 100 } : { quality: 70 })), - alpha: (query.format === 'png'), - ...((query['X-Plex-Token']) ? { "X-Plex-Token": query['X-Plex-Token'] } : {}), - url - }; - - // Auto select WebP if user-agent support it - const browser = parseUserAgent(useragent); - if (browser && browser.name && browser.name === 'chrome' && !query.format) { - params.format = 'webp'; - } - - // Generate key - params.key = md5(`${(query.url || '').split('?')[0]}|${params.width || ''}|${params.height || ''}|${params.background || ''}|${params.opacity || ''}|${params.minSize || ''}|${params.blur || ''}|${params.format || ''}|${params.upscale || ''}|${params.quality || ''}`.toLowerCase()) - - // Return params - return params; -} + // Parse url + let url = query.url || ''; + url = url.replace('http://127.0.0.1/', '/'); + url = url.replace('http://127.0.0.1:32400/', '/'); + url = url.replace(basepath, '/'); + if (query['X-Plex-Token'] && url && url[0] === '/') { + url += + url.indexOf('?') === -1 + ? `?X-Plex-Token=${query['X-Plex-Token']}` + : `&X-Plex-Token=${query['X-Plex-Token']}`; + } + if (url && url[0] === '/') url = basepath + url.substring(1); + + // Extract parameters + const params = { + ...(query.width ? { width: parseInt(query.width) } : {}), + ...(query.height ? { height: parseInt(query.height) } : {}), + ...(query.background ? { background: query.background } : {}), + ...(query.opacity ? { opacity: parseInt(query.opacity) } : {}), + ...(query.minSize ? { minSize: parseInt(query.minSize) } : {}), + ...(query.blur ? { blur: parseInt(query.blur) } : {}), + ...(query.format && (query.format === 'webp' || query.format === 'png') + ? { format: query.format } + : { format: 'jpg' }), + ...(query.upscale ? { upscale: parseInt(query.upscale) } : {}), + ...(query.quality + ? { quality: parseInt(query.quality) } + : query.blur + ? { quality: 100 } + : { quality: 70 }), + alpha: query.format === 'png', + ...(query['X-Plex-Token'] ? { 'X-Plex-Token': query['X-Plex-Token'] } : {}), + url, + }; + + // Auto select WebP if user-agent support it + const browser = parseUserAgent(useragent); + if (browser && browser.name && browser.name === 'chrome' && !query.format) { + params.format = 'webp'; + } + + // Generate key + params.key = md5( + `${(query.url || '').split('?')[0]}|${params.width || ''}|${params.height || + ''}|${params.background || ''}|${params.opacity || ''}|${params.minSize || + ''}|${params.blur || ''}|${params.format || ''}|${params.upscale || ''}|${params.quality || + ''}`.toLowerCase(), + ); + + // Return params + return params; +}; export const resize = (parameters, headers = {}) => { - return new Promise(async (resolve, reject) => { + return new Promise(async (resolve, reject) => { + try { + const params = { + // Width of the image (px value) + width: false, + + // Height of the image (px value) + height: false, + + // Background color + background: false, + + // Background opacity + opacity: false, + + // Resize constraint (0:height / 1:width) + minSize: 0, + + // Blur on picture (between 0 and 10000) + blur: 0, + + // Output format + format: false, // png / jpg / webp + + // Force upscale + upscale: false, + + // User parameters + ...parameters, + }; + + if (!params.width || !params.height) return reject('Size not provided'); + + // Erase previous host header + delete headers.host; + + // Get image content + const body = await fetch(parameters.url, { + headers, + }).then((res) => res.buffer()); + + // Load body + let s = false; + try { + s = sharp(body).on('error', (err) => { + return reject(err); + }); + } catch (e) { + return reject(e); + } + if (!s) return reject(e); + + // Resize parameters + const opt = { + ...(params.upscale ? { withoutEnlargement: !!params.upscale } : {}), + }; + + // Resize based on width + try { + if (params.minSize === 1) s.resize(params.width, null, opt); + else s.resize(null, params.height, opt); + } catch (e) { + return reject(e); + } + + // Background & opacity support + if (params.background && params.opacity) { + let bgd = false; try { - const params = { - - // Width of the image (px value) - width: false, - - // Height of the image (px value) - height: false, - - // Background color - background: false, - - // Background opacity - opacity: false, - - // Resize constraint (0:height / 1:width) - minSize: 0, - - // Blur on picture (between 0 and 10000) - blur: 0, - - // Output format - format: false, // png / jpg / webp - - // Force upscale - upscale: false, - - // User parameters - ...parameters - } - - if (!params.width || !params.height) - return reject('Size not provided'); - - // Erase previous host header - delete headers.host; - - // Get image content - const body = await fetch(parameters.url, { - headers - }).then(res => res.buffer()); - - // Load body - let s = false; - try { - s = sharp(body).on('error', err => { return reject(err); }); - } - catch (e) { - return reject(e) - } - if (!s) - return reject(e) - - // Resize parameters - const opt = { - ...((params.upscale) ? { withoutEnlargement: !!params.upscale } : {}) - } - - // Resize based on width - try { - if (params.minSize === 1) - s.resize(params.width, null, opt); - else - s.resize(null, params.height, opt); - } - catch (e) { - return reject(e) - } - - // Background & opacity support - if (params.background && params.opacity) { - let bgd = false; - try { - const buff = await s.png().toBuffer(); - s = sharp(buff).on('error', err => { return reject(err); }); - const meta = await s.metadata(); - bgd = await sharp({ - create: { - width: meta.width, - height: meta.height, - channels: 4, - background: { - r: color(`#${params.background}`).r, - g: color(`#${params.background}`).g, - b: color(`#${params.background}`).b, - alpha: ((100 - params.opacity) / 100) - } - } - }).on('error', err => { return reject(err); }).png().toBuffer(); - } - catch (e) { - return reject(e) - } - s.overlayWith(bgd); - } - - // Blur - if (params.blur > 0 && params.blur <= 1000) - s.blur(params.blur * 1.25).gamma(2); - - // Output format - if (params.format === 'jpg') - s.jpeg({ - quality: params.quality - }) - else if (params.format === 'png') - s.png({ - quality: params.quality, - progressive: true, - compressionLevel: 9 - }) - else if (params.format === 'webp') - s.webp({ - quality: params.quality, - ...((parameters.alpha) ? {} : { alphaQuality: 0 }) - }) - - // Return stream - resolve(s); - } - catch (err) { - reject(err); + const buff = await s.png().toBuffer(); + s = sharp(buff).on('error', (err) => { + return reject(err); + }); + const meta = await s.metadata(); + bgd = await sharp({ + create: { + width: meta.width, + height: meta.height, + channels: 4, + background: { + r: color(`#${params.background}`).r, + g: color(`#${params.background}`).g, + b: color(`#${params.background}`).b, + alpha: (100 - params.opacity) / 100, + }, + }, + }) + .on('error', (err) => { + return reject(err); + }) + .png() + .toBuffer(); + } catch (e) { + return reject(e); } - }); -} \ No newline at end of file + s.overlayWith(bgd); + } + + // Blur + if (params.blur > 0 && params.blur <= 1000) s.blur(params.blur * 1.25).gamma(2); + + // Output format + if (params.format === 'jpg') + s.jpeg({ + quality: params.quality, + }); + else if (params.format === 'png') + s.png({ + quality: params.quality, + progressive: true, + compressionLevel: 9, + }); + else if (params.format === 'webp') + s.webp({ + quality: params.quality, + ...(parameters.alpha ? {} : { alphaQuality: 0 }), + }); + + // Return stream + resolve(s); + } catch (err) { + reject(err); + } + }); +}; diff --git a/src/core/servers.js b/src/core/servers.js index 0f478aa..898c88d 100644 --- a/src/core/servers.js +++ b/src/core/servers.js @@ -8,113 +8,135 @@ let ServersManager = {}; // Add or update a server ServersManager.update = (e) => { - const name = (e.name) ? e.name : (e.url) ? e.url : ''; - if (!name) - return (ServersManager.list()); - servers[name] = { - name, - sessions: ((!Array.isArray(e.sessions)) ? [] : e.sessions.map((s) => ({ - id: ((s.id) ? s.id : false), - status: ((s.status && ['DONE', 'DOWNLOAD', 'TRANSCODE'].indexOf(s.status.toUpperCase()) !== -1) ? s.status.toUpperCase() : false), - codec: ((s.codec) ? s.codec : false), - lastChunkDownload: ((s.lastChunkDownload) ? s.lastChunkDownload : 0) - }))).filter((s) => (s.id !== false && s.status !== false)), - settings: { - maxSessions: ((typeof (e.settings) !== 'undefined' && typeof (e.settings.maxSessions) !== 'undefined') ? parseInt(e.settings.maxSessions) : 0), - maxDownloads: ((typeof (e.settings) !== 'undefined' && typeof (e.settings.maxDownloads) !== 'undefined') ? parseInt(e.settings.maxDownloads) : 0), - maxTranscodes: ((typeof (e.settings) !== 'undefined' && typeof (e.settings.maxTranscodes) !== 'undefined') ? parseInt(e.settings.maxTranscodes) : 0), - }, - url: ((e.url) ? e.url : false), - time: time() - }; - return (ServersManager.list()); + const name = e.name ? e.name : e.url ? e.url : ''; + if (!name) return ServersManager.list(); + servers[name] = { + name, + sessions: (!Array.isArray(e.sessions) + ? [] + : e.sessions.map((s) => ({ + id: s.id ? s.id : false, + status: + s.status && ['DONE', 'DOWNLOAD', 'TRANSCODE'].indexOf(s.status.toUpperCase()) !== -1 + ? s.status.toUpperCase() + : false, + codec: s.codec ? s.codec : false, + lastChunkDownload: s.lastChunkDownload ? s.lastChunkDownload : 0, + })) + ).filter((s) => s.id !== false && s.status !== false), + settings: { + maxSessions: + typeof e.settings !== 'undefined' && typeof e.settings.maxSessions !== 'undefined' + ? parseInt(e.settings.maxSessions) + : 0, + maxDownloads: + typeof e.settings !== 'undefined' && typeof e.settings.maxDownloads !== 'undefined' + ? parseInt(e.settings.maxDownloads) + : 0, + maxTranscodes: + typeof e.settings !== 'undefined' && typeof e.settings.maxTranscodes !== 'undefined' + ? parseInt(e.settings.maxTranscodes) + : 0, + }, + url: e.url ? e.url : false, + time: time(), + }; + return ServersManager.list(); }; // Remove a server ServersManager.remove = (e) => { - const name = (e.name) ? e.name : (e.url) ? e.url : ''; - delete servers[name]; - return (ServersManager.list()); + const name = e.name ? e.name : e.url ? e.url : ''; + delete servers[name]; + return ServersManager.list(); }; // List all the servers with scores ServersManager.list = () => { - let output = {}; - Object.keys(servers).forEach((i) => { - output[i] = { ...servers[i], score: ServersManager.score(servers[i]) }; - }); - return (output); -} + let output = {}; + Object.keys(servers).forEach((i) => { + output[i] = { ...servers[i], score: ServersManager.score(servers[i]) }; + }); + return output; +}; // Chose best server ServersManager.chooseServer = (session, ip = false) => { - return (new Promise((resolve, reject) => { - let tab = []; - const list = ServersManager.list(); - Object.keys(list).forEach((i) => { - tab.push(list[i]); - }); - tab.sort((a, b) => (a.score - b.score)); - if (typeof (tab[0]) === 'undefined') - return resolve(false); - const origin = encodeURIComponent(publicUrl()) - fetch(`${tab[0].url}/api/resolve?session=${session}&ip=${ip}&origin=${origin}`) - .then(res => res.json()) - .then(body => { - return resolve(body.client) - }).catch((err) => { return reject(err) }); - })); + return new Promise((resolve, reject) => { + let tab = []; + const list = ServersManager.list(); + Object.keys(list).forEach((i) => { + tab.push(list[i]); + }); + tab.sort((a, b) => a.score - b.score); + if (typeof tab[0] === 'undefined') return resolve(false); + const origin = encodeURIComponent(publicUrl()); + fetch(`${tab[0].url}/api/resolve?session=${session}&ip=${ip}&origin=${origin}`) + .then((res) => res.json()) + .then((body) => { + return resolve(body.client); + }) + .catch((err) => { + return reject(err); + }); + }); }; // Calculate server score ServersManager.score = (e) => { - // The configuration wasn't updated since X seconds, the server is probably unavailable - if (time() - e.time > config.custom.scores.timeout) - return (100); - - // Default load 0 - let load = 0; - - // Add load value for each session - e.sessions.forEach((s) => { - - // Transcode streams - if (s.status === 'TRANSCODE') { - load += 1; - if (s.codec === 'hevc') { - load += 1.5; - } - if (s.codec === 'copy') { - load -= 0.5; - } - } - - // Serving streams - if (s.status === 'DONE') { - load += 0.5; - } - - // Download streams - if (s.status === 'DOWNLOAD') { - load += 0.25; - } - }) - - // Server already have too much sessions - if (e.sessions.filter((s) => (['TRANSCODE', 'DONE'].indexOf(s.status) !== -1)).length > e.settings.maxSessions) - load += 2.5; - - // Server already have too much transcodes - if (e.sessions.filter((s) => (['TRANSCODE'].indexOf(s.status) !== -1)).length > e.settings.maxTranscodes) - load += 5; - - // Server already have too much downloads - if (e.sessions.filter((s) => (['DOWNLOAD'].indexOf(s.status) !== -1)).length > e.settings.maxDownloads) - load += 1; - - // Return load - return (load); -} + // The configuration wasn't updated since X seconds, the server is probably unavailable + if (time() - e.time > config.custom.scores.timeout) return 100; + + // Default load 0 + let load = 0; + + // Add load value for each session + e.sessions.forEach((s) => { + // Transcode streams + if (s.status === 'TRANSCODE') { + load += 1; + if (s.codec === 'hevc') { + load += 1.5; + } + if (s.codec === 'copy') { + load -= 0.5; + } + } + + // Serving streams + if (s.status === 'DONE') { + load += 0.5; + } + + // Download streams + if (s.status === 'DOWNLOAD') { + load += 0.25; + } + }); + + // Server already have too much sessions + if ( + e.sessions.filter((s) => ['TRANSCODE', 'DONE'].indexOf(s.status) !== -1).length > + e.settings.maxSessions + ) + load += 2.5; + + // Server already have too much transcodes + if ( + e.sessions.filter((s) => ['TRANSCODE'].indexOf(s.status) !== -1).length > + e.settings.maxTranscodes + ) + load += 5; + + // Server already have too much downloads + if ( + e.sessions.filter((s) => ['DOWNLOAD'].indexOf(s.status) !== -1).length > e.settings.maxDownloads + ) + load += 1; + + // Return load + return load; +}; // Returns our ServersManager export default ServersManager; diff --git a/src/core/sessions.js b/src/core/sessions.js index fe43081..20b6a4d 100644 --- a/src/core/sessions.js +++ b/src/core/sessions.js @@ -14,135 +14,137 @@ let SessionsManager = {}; let cache = {}; // Table to link session to transcoder url -let urls = {} +let urls = {}; SessionsManager.chooseServer = async (session, ip = false) => { - if (urls[session]) - return (urls[session]); - let url = ''; - try { - url = await ServersManager.chooseServer(session, ip); - } - catch (err) { } - D('SERVER ' + session + ' [' + url + ']'); - if (url.length) - urls[session] = url; - return (url); + if (urls[session]) return urls[session]; + let url = ''; + try { + url = await ServersManager.chooseServer(session, ip); + } catch (err) {} + D('SERVER ' + session + ' [' + url + ']'); + if (url.length) urls[session] = url; + return url; }; SessionsManager.cacheSessionFromRequest = (req) => { - if (typeof (req.query['X-Plex-Session-Identifier']) !== 'undefined' && typeof (req.query.session) !== 'undefined') { - cache[req.query['X-Plex-Session-Identifier']] = req.query.session.toString(); - } -} + if ( + typeof req.query['X-Plex-Session-Identifier'] !== 'undefined' && + typeof req.query.session !== 'undefined' + ) { + cache[req.query['X-Plex-Session-Identifier']] = req.query.session.toString(); + } +}; SessionsManager.getCacheSession = (xplexsessionidentifier) => { - if (cache[xplexsessionidentifier]) - return (cache[xplexsessionidentifier]); - return (false); -} + if (cache[xplexsessionidentifier]) return cache[xplexsessionidentifier]; + return false; +}; SessionsManager.getSessionFromRequest = (req) => { - if (typeof (req.params.sessionId) !== 'undefined') - return (req.params.sessionId); - if (typeof (req.query.session) !== 'undefined') - return (req.query.session); - if (typeof (req.query['X-Plex-Session-Identifier']) !== 'undefined' && typeof (cache[req.query['X-Plex-Session-Identifier']]) !== 'undefined') - return (cache[req.query['X-Plex-Session-Identifier']]); - if (typeof (req.query['X-Plex-Session-Identifier']) !== 'undefined') - return (req.query['X-Plex-Session-Identifier']); - if (typeof (req.query['X-Plex-Client-Identifier']) !== 'undefined') - return (req.query['X-Plex-Client-Identifier']); - return (false); -} + if (typeof req.params.sessionId !== 'undefined') return req.params.sessionId; + if (typeof req.query.session !== 'undefined') return req.query.session; + if ( + typeof req.query['X-Plex-Session-Identifier'] !== 'undefined' && + typeof cache[req.query['X-Plex-Session-Identifier']] !== 'undefined' + ) + return cache[req.query['X-Plex-Session-Identifier']]; + if (typeof req.query['X-Plex-Session-Identifier'] !== 'undefined') + return req.query['X-Plex-Session-Identifier']; + if (typeof req.query['X-Plex-Client-Identifier'] !== 'undefined') + return req.query['X-Plex-Client-Identifier']; + return false; +}; // Parse FFmpeg parameters with internal bindings SessionsManager.parseFFmpegParameters = async (args = [], env = {}) => { - // Extract Session ID - const regex = /^http\:\/\/127.0.0.1:32400\/video\/:\/transcode\/session\/(.*)\/progress$/; - const sessions = args.filter(e => (regex.test(e))).map(e => (e.match(regex)[1])) - const sessionFull = (typeof (sessions[0]) !== 'undefined') ? sessions[0] : false; - const sessionId = (typeof (sessions[0]) !== 'undefined') ? sessions[0].split('/')[0] : false; - - // Check Session Id - if (!sessionId || !sessionFull) - return (false); - - // Debug - D('FFMPEG ' + sessionId + ' [' + sessionFull + ']'); - - // Parse arguments - const parsedArgs = args.map((e) => { - - // Progress - if (e.indexOf('/progress') !== -1) - return (e.replace(plexUrl(), '{INTERNAL_TRANSCODER}')); - - // Manifest and seglist - if (e.indexOf('/manifest') !== -1 || e.indexOf('/seglist') !== -1) - return (e.replace(plexUrl(), '{INTERNAL_TRANSCODER}')); - - // Other - return (e.replace(plexUrl(), publicUrl()).replace(config.plex.path.sessions, publicUrl() + 'api/sessions/').replace(config.plex.path.usr, '{INTERNAL_RESOURCES}')); - }); - - // Add seglist to arguments if needed and resolve links if needed - const segList = '{INTERNAL_TRANSCODER}video/:/transcode/session/' + sessionFull + '/seglist'; - let finalArgs = []; - let segListMode = false; - for (let i = 0; i < parsedArgs.length; i++) { - let e = parsedArgs[i]; - - // Seglist - if (e === '-segment_list') { - segListMode = true; - finalArgs.push(e); - continue; - } - if (segListMode) { - finalArgs.push(segList); - if (parsedArgs[i + 1] !== '-segment_list_type') - finalArgs.push('-segment_list_type', 'csv', '-segment_list_size', '2147483647'); - segListMode = false; - continue; - } - - // Link resolver (Replace filepath to http plex path) - if (i > 0 && parsedArgs[i - 1] === '-i' && !config.custom.download.forward) { - let file = parsedArgs[i]; - try { - const data = await Database.getPartFromPath(parsedArgs[i]); - if (typeof (data.id) !== 'undefined') - file = `${publicUrl()}library/parts/${data.id}/0/file.stream?download=1`; - } catch (e) { - file = parsedArgs[i] - } - finalArgs.push(file); - continue; - } - - // Ignore parameter - finalArgs.push(e); - }; - return ({ - args: finalArgs, - env, - session: sessionId, - sessionFull - }); + // Extract Session ID + const regex = /^http\:\/\/127.0.0.1:32400\/video\/:\/transcode\/session\/(.*)\/progress$/; + const sessions = args.filter((e) => regex.test(e)).map((e) => e.match(regex)[1]); + const sessionFull = typeof sessions[0] !== 'undefined' ? sessions[0] : false; + const sessionId = typeof sessions[0] !== 'undefined' ? sessions[0].split('/')[0] : false; + + // Check Session Id + if (!sessionId || !sessionFull) return false; + + // Debug + D('FFMPEG ' + sessionId + ' [' + sessionFull + ']'); + + // Parse arguments + const parsedArgs = args.map((e) => { + // Progress + if (e.indexOf('/progress') !== -1) return e.replace(plexUrl(), '{INTERNAL_TRANSCODER}'); + + // Manifest and seglist + if (e.indexOf('/manifest') !== -1 || e.indexOf('/seglist') !== -1) + return e.replace(plexUrl(), '{INTERNAL_TRANSCODER}'); + + // Other + return e + .replace(plexUrl(), publicUrl()) + .replace(config.plex.path.sessions, publicUrl() + 'api/sessions/') + .replace(config.plex.path.usr, '{INTERNAL_RESOURCES}'); + }); + + // Add seglist to arguments if needed and resolve links if needed + const segList = '{INTERNAL_TRANSCODER}video/:/transcode/session/' + sessionFull + '/seglist'; + let finalArgs = []; + let segListMode = false; + for (let i = 0; i < parsedArgs.length; i++) { + let e = parsedArgs[i]; + + // Seglist + if (e === '-segment_list') { + segListMode = true; + finalArgs.push(e); + continue; + } + if (segListMode) { + finalArgs.push(segList); + if (parsedArgs[i + 1] !== '-segment_list_type') + finalArgs.push('-segment_list_type', 'csv', '-segment_list_size', '2147483647'); + segListMode = false; + continue; + } + + // Link resolver (Replace filepath to http plex path) + if (i > 0 && parsedArgs[i - 1] === '-i' && !config.custom.download.forward) { + let file = parsedArgs[i]; + try { + const data = await Database.getPartFromPath(parsedArgs[i]); + if (typeof data.id !== 'undefined') + file = `${publicUrl()}library/parts/${data.id}/0/file.stream?download=1`; + } catch (e) { + file = parsedArgs[i]; + } + finalArgs.push(file); + continue; + } + + // Ignore parameter + finalArgs.push(e); + } + return { + args: finalArgs, + env, + session: sessionId, + sessionFull, + }; }; // Store the FFMPEG parameters in RedisCache SessionsManager.storeFFmpegParameters = async (args, env) => { - const parsed = await SessionsManager.parseFFmpegParameters(args, env); - console.log('FFMPEG', parsed.session, parsed); - SessionStore.set(parsed.session, parsed).then(() => { }).catch(() => { }) - return (parsed); + const parsed = await SessionsManager.parseFFmpegParameters(args, env); + console.log('FFMPEG', parsed.session, parsed); + SessionStore.set(parsed.session, parsed) + .then(() => {}) + .catch(() => {}); + return parsed; }; SessionsManager.cleanSession = (sessionId) => { - D('DELETE ' + sessionId); - return SessionStore.delete(sessionId) + D('DELETE ' + sessionId); + return SessionStore.delete(sessionId); }; // Export our SessionsManager diff --git a/src/database/index.js b/src/database/index.js index 37641dd..398d6fd 100644 --- a/src/database/index.js +++ b/src/database/index.js @@ -9,11 +9,11 @@ const D = debug('UnicornLoadBalancer'); let Database; if (config.database.mode === 'sqlite') { - D('Using sqlite as database'); - Database = SqliteDatabase; + D('Using sqlite as database'); + Database = SqliteDatabase; } else if (config.database.mode === 'postgresql') { - D('Using postgresql as database'); - Database = PostgresqlDatabase; + D('Using postgresql as database'); + Database = PostgresqlDatabase; } -export default Database; \ No newline at end of file +export default Database; diff --git a/src/database/postgresql.js b/src/database/postgresql.js index 1e07090..6489dc9 100644 --- a/src/database/postgresql.js +++ b/src/database/postgresql.js @@ -3,54 +3,60 @@ import config from '../config'; let PostgresqlDatabase = {}; -const _getClient = () => (new Promise(async (resolve, reject) => { +const _getClient = () => + new Promise(async (resolve, reject) => { const client = new Client({ - user: config.database.postgresql.user, - host: config.database.postgresql.host, - database: config.database.postgresql.database, - password: config.database.postgresql.password, - port: config.database.postgresql.port, - }) + user: config.database.postgresql.user, + host: config.database.postgresql.host, + database: config.database.postgresql.database, + password: config.database.postgresql.password, + port: config.database.postgresql.port, + }); client.on('error', (err) => { - return reject(err); - }) + return reject(err); + }); await client.connect(); return resolve(client); -})) + }); -PostgresqlDatabase.getPartFromId = (part_id) => (new Promise((resolve, reject) => { - _getClient().then((client) => { +PostgresqlDatabase.getPartFromId = (part_id) => + new Promise((resolve, reject) => { + _getClient() + .then((client) => { client.query('SELECT * FROM media_parts WHERE id=$1 LIMIT 1', [part_id], (err, res) => { - if (err) - return reject(err); - client.end() - if (res.rows.length) { - return resolve(res.rows[0]) - } else { - return reject('FILE_NOT_FOUND'); - } - }) - }).catch((err) => { + if (err) return reject(err); + client.end(); + if (res.rows.length) { + return resolve(res.rows[0]); + } else { + return reject('FILE_NOT_FOUND'); + } + }); + }) + .catch((err) => { return reject('DATABASE_ERROR'); - }) -})) + }); + }); -PostgresqlDatabase.getPartFromPath = (path) => (new Promise((resolve, reject) => { - _getClient().then((client) => { +PostgresqlDatabase.getPartFromPath = (path) => + new Promise((resolve, reject) => { + _getClient() + .then((client) => { client.query('SELECT * FROM media_parts WHERE file=$1 LIMIT 1', [path], (err, res) => { - if (err) { - return reject(err); - } - client.end() - if (res.rows.length) { - return resolve(res.rows[0]) - } else { - return reject('FILE_NOT_FOUND'); - } - }) - }).catch((err) => { + if (err) { + return reject(err); + } + client.end(); + if (res.rows.length) { + return resolve(res.rows[0]); + } else { + return reject('FILE_NOT_FOUND'); + } + }); + }) + .catch((err) => { return reject('DATABASE_ERROR'); - }) -})) + }); + }); -export default PostgresqlDatabase; \ No newline at end of file +export default PostgresqlDatabase; diff --git a/src/database/sqlite.js b/src/database/sqlite.js index 79b7b7f..07b40e9 100644 --- a/src/database/sqlite.js +++ b/src/database/sqlite.js @@ -3,36 +3,32 @@ import config from '../config'; let SqliteDatabase = {}; -SqliteDatabase.getPartFromId = (part_id) => (new Promise((resolve, reject) => { +SqliteDatabase.getPartFromId = (part_id) => + new Promise((resolve, reject) => { try { - const db = new (sqlite3.verbose().Database)(config.database.sqlite.path); - db.get('SELECT * FROM media_parts WHERE id=? LIMIT 0, 1', part_id, (err, row) => { - if (row && row.file) - resolve(row); - else - reject('FILE_NOT_FOUND'); - db.close(); - }); + const db = new (sqlite3.verbose()).Database(config.database.sqlite.path); + db.get('SELECT * FROM media_parts WHERE id=? LIMIT 0, 1', part_id, (err, row) => { + if (row && row.file) resolve(row); + else reject('FILE_NOT_FOUND'); + db.close(); + }); + } catch (err) { + return reject('DATABASE_ERROR'); } - catch (err) { - return reject('DATABASE_ERROR'); - } -})) + }); -SqliteDatabase.getPartFromPath = (path) => (new Promise((resolve, reject) => { +SqliteDatabase.getPartFromPath = (path) => + new Promise((resolve, reject) => { try { - const db = new (sqlite3.verbose().Database)(config.database.sqlite.path); - db.get('SELECT * FROM media_parts WHERE file=? LIMIT 0, 1', path, (err, row) => { - if (row && row.file) - resolve(row); - else - reject('FILE_NOT_FOUND'); - db.close(); - }); - } - catch (err) { - return reject('DATABASE_ERROR'); + const db = new (sqlite3.verbose()).Database(config.database.sqlite.path); + db.get('SELECT * FROM media_parts WHERE file=? LIMIT 0, 1', path, (err, row) => { + if (row && row.file) resolve(row); + else reject('FILE_NOT_FOUND'); + db.close(); + }); + } catch (err) { + return reject('DATABASE_ERROR'); } -})) + }); -export default SqliteDatabase; \ No newline at end of file +export default SqliteDatabase; diff --git a/src/routes/api.js b/src/routes/api.js index edf9bd4..0899fd6 100644 --- a/src/routes/api.js +++ b/src/routes/api.js @@ -14,57 +14,83 @@ let RoutesAPI = {}; // Returns all the stats of all the transcoders RoutesAPI.stats = (req, res) => { - res.send(ServersManager.list()); + res.send(ServersManager.list()); }; // Save the stats of a server RoutesAPI.update = (req, res) => { - res.send(ServersManager.update(req.body)); + res.send(ServersManager.update(req.body)); }; // Save the FFMPEG arguments // Body: {args: [], env: []} RoutesAPI.ffmpeg = (req, res) => { - console.log('FFMPEG CALLED 1') - if (!req.body || !req.body.arg || !req.body.env) - return (res.status(400).send({ error: { code: 'INVALID_ARGUMENTS', message: 'Invalid UnicornFFMPEG parameters' } })); - console.log('FFMPEG CALLED 2') - return (res.send(SessionsManager.storeFFmpegParameters(req.body.arg, req.body.env))); + console.log('FFMPEG CALLED 1'); + if (!req.body || !req.body.arg || !req.body.env) + return res.status(400).send({ + error: { + code: 'INVALID_ARGUMENTS', + message: 'Invalid UnicornFFMPEG parameters', + }, + }); + console.log('FFMPEG CALLED 2'); + return res.send(SessionsManager.storeFFmpegParameters(req.body.arg, req.body.env)); }; // Resolve path from file id RoutesAPI.path = (req, res) => { - Database.getPartFromId(req.params.id).then((data) => { - res.send(JSON.stringify(data)); - }).catch((err) => { - res.status(400).send({ error: { code: 'FILE_NOT_FOUND', message: 'File not found in Plex Database' } }); + Database.getPartFromId(req.params.id) + .then((data) => { + res.send(JSON.stringify(data)); }) + .catch((err) => { + res.status(400).send({ + error: { + code: 'FILE_NOT_FOUND', + message: 'File not found in Plex Database', + }, + }); + }); }; // Proxy to Plex RoutesAPI.plex = (req, res) => { - const proxy = httpProxy.createProxyServer({ - target: { - host: config.plex.host, - port: config.plex.port - } - }).on('error', (err) => { - if (err.code === 'HPE_UNEXPECTED_CONTENT_LENGTH') { - return (res.status(200).send()); - } - res.status(400).send({ error: { code: 'PROXY_TIMEOUT', message: 'Plex not respond in time, proxy request fails' } }); + const proxy = httpProxy + .createProxyServer({ + target: { + host: config.plex.host, + port: config.plex.port, + }, + }) + .on('error', (err) => { + if (err.code === 'HPE_UNEXPECTED_CONTENT_LENGTH') { + return res.status(200).send(); + } + res.status(400).send({ + error: { + code: 'PROXY_TIMEOUT', + message: 'Plex not respond in time, proxy request fails', + }, + }); }); - req.url = req.url.slice('/api/plex'.length); - return (proxy.web(req, res)); + req.url = req.url.slice('/api/plex'.length); + return proxy.web(req, res); }; // Returns session RoutesAPI.session = (req, res) => { - SessionStore.get(req.params.session).then((data) => { - res.send(data); - }).catch(() => { - res.status(400).send({ error: { code: 'SESSION_TIMEOUT', message: 'The session wasn\'t launched in time, request fails' } }); + SessionStore.get(req.params.session) + .then((data) => { + res.send(data); }) + .catch(() => { + res.status(400).send({ + error: { + code: 'SESSION_TIMEOUT', + message: "The session wasn't launched in time, request fails", + }, + }); + }); }; // Export all our API routes diff --git a/src/routes/index.js b/src/routes/index.js index aeb3813..3b7fd3f 100644 --- a/src/routes/index.js +++ b/src/routes/index.js @@ -7,56 +7,71 @@ import RoutesProxy from './proxy'; import RoutesResize from './resize'; export default (app) => { + // Note for future: + // We NEED to 302 the chunk requests because if Plex catchs it with fake transcoder, it stucks - // Note for future: - // We NEED to 302 the chunk requests because if Plex catchs it with fake transcoder, it stucks - - // UnicornLoadBalancer API - app.use('/api/sessions', express.static(config.plex.path.sessions)); - app.get('/api/stats', RoutesAPI.stats); - app.post('/api/ffmpeg', RoutesAPI.ffmpeg); - app.get('/api/path/:id', RoutesAPI.path); - app.post('/api/update', RoutesAPI.update); - app.get('/api/session/:session', RoutesAPI.session); - app.all('/api/plex/*', RoutesAPI.plex); - - // MPEG Dash support - app.get('/:formatType/:/transcode/universal/start.mpd', RoutesTranscode.dashStart); - app.get('/:formatType/:/transcode/universal/dash/:sessionId/:streamId/initial.mp4', RoutesTranscode.redirect); - app.get('/:formatType/:/transcode/universal/dash/:sessionId/:streamId/:partId.m4s', RoutesTranscode.redirect); - - // Long polling support - app.get('/:formatType/:/transcode/universal/start', RoutesTranscode.lpStart); - app.get('/:formatType/:/transcode/universal/subtitles', RoutesTranscode.redirect); - - // M3U8 support - app.get('/:formatType/:/transcode/universal/start.m3u8', RoutesTranscode.hlsStart); - app.get('/:formatType/:/transcode/universal/session/:sessionId/base/index.m3u8', RoutesTranscode.redirect); - app.get('/:formatType/:/transcode/universal/session/:sessionId/base-x-mc/index.m3u8', RoutesTranscode.redirect); - app.get('/:formatType/:/transcode/universal/session/:sessionId/:fileType/:partId.ts', RoutesTranscode.redirect); - app.get('/:formatType/:/transcode/universal/session/:sessionId/:fileType/:partId.vtt', RoutesTranscode.redirect); - - // Control support - app.get('/:formatType/:/transcode/universal/stop', RoutesTranscode.stop); - app.get('/:formatType/:/transcode/universal/ping', RoutesTranscode.ping); - app.get('/:/timeline', RoutesTranscode.timeline); - - // Download - if (config.custom.download.forward) { - app.get('/library/parts/:id1/:id2/file.*', RoutesTranscode.redirect); - } - if (!config.custom.download.forward) { - app.get('/library/parts/:id1/:id2/file.*', RoutesTranscode.download); - } - - // Image Proxy or Image Resizer - if (config.custom.image.proxy && config.custom.image.resizer) { - app.get('/photo/:/transcode', RoutesResize.proxy); - } - else if (config.custom.image.resizer) { - app.get('/photo/:/transcode', RoutesResize.resize); - } - - // Forward other to Plex - app.all('*', RoutesProxy.plex); + // UnicornLoadBalancer API + app.use('/api/sessions', express.static(config.plex.path.sessions)); + app.get('/api/stats', RoutesAPI.stats); + app.post('/api/ffmpeg', RoutesAPI.ffmpeg); + app.get('/api/path/:id', RoutesAPI.path); + app.post('/api/update', RoutesAPI.update); + app.get('/api/session/:session', RoutesAPI.session); + app.all('/api/plex/*', RoutesAPI.plex); + + // MPEG Dash support + app.get('/:formatType/:/transcode/universal/start.mpd', RoutesTranscode.dashStart); + app.get( + '/:formatType/:/transcode/universal/dash/:sessionId/:streamId/initial.mp4', + RoutesTranscode.redirect, + ); + app.get( + '/:formatType/:/transcode/universal/dash/:sessionId/:streamId/:partId.m4s', + RoutesTranscode.redirect, + ); + app.all( + '/:formatType/:/transcode/session/:sessionId/:streamId/progress', + RoutesTranscode.progress, + ); + + // Long polling support + app.get('/:formatType/:/transcode/universal/start', RoutesTranscode.lpStart); + app.get('/:formatType/:/transcode/universal/subtitles', RoutesTranscode.redirect); + + // M3U8 support + app.get('/:formatType/:/transcode/universal/start.m3u8', RoutesTranscode.hlsStart); + app.get( + '/:formatType/:/transcode/universal/session/:sessionId/base/index.m3u8', + RoutesTranscode.redirect, + ); + app.get( + '/:formatType/:/transcode/universal/session/:sessionId/base-x-mc/index.m3u8', + RoutesTranscode.redirect, + ); + app.get( + '/:formatType/:/transcode/universal/session/:sessionId/:fileType/:partId.ts', + RoutesTranscode.redirect, + ); + app.get( + '/:formatType/:/transcode/universal/session/:sessionId/:fileType/:partId.vtt', + RoutesTranscode.redirect, + ); + + // Control support + app.get('/:formatType/:/transcode/universal/stop', RoutesTranscode.stop); + app.get('/:formatType/:/transcode/universal/ping', RoutesTranscode.ping); + app.get('/:/timeline', RoutesTranscode.timeline); + + // Download + app.get('/library/parts/:id1/:id2/file.*', RoutesTranscode.download); + + // Image Proxy or Image Resizer + if (config.custom.image.proxy && config.custom.image.resizer) { + app.get('/photo/:/transcode', RoutesResize.proxy); + } else if (config.custom.image.resizer) { + app.get('/photo/:/transcode', RoutesResize.resize); + } + + // Forward other to Plex + app.all('*', RoutesProxy.plex); }; diff --git a/src/routes/proxy.js b/src/routes/proxy.js index 624d675..74d82d1 100644 --- a/src/routes/proxy.js +++ b/src/routes/proxy.js @@ -4,32 +4,40 @@ import config from '../config'; let RoutesProxy = {}; RoutesProxy.plex = (req, res) => { - const proxy = httpProxy.createProxyServer({ - target: { - host: config.plex.host, - port: config.plex.port - } - }).on('error', (err) => { - // On some Plex request from FFmpeg, Plex don't create a valid request - if (err.code === 'HPE_UNEXPECTED_CONTENT_LENGTH') - return (res.status(200).send()); + const proxy = httpProxy + .createProxyServer({ + target: { + host: config.plex.host, + port: config.plex.port, + }, + }) + .on('error', (err) => { + // On some Plex request from FFmpeg, Plex don't create a valid request + if (err.code === 'HPE_UNEXPECTED_CONTENT_LENGTH') return res.status(200).send(); - // Other error - return (res.status(400).send({ error: { code: 'PROXY_TIMEOUT', message: 'Plex not respond in time, proxy request fails' } })); + // Other error + return res.status(400).send({ + error: { + code: 'PROXY_TIMEOUT', + message: 'Plex not respond in time, proxy request fails', + }, + }); }); - return (proxy.web(req, res)); + return proxy.web(req, res); }; RoutesProxy.ws = (req, res) => { - const proxy = httpProxy.createProxyServer({ - target: { - host: config.plex.host, - port: config.plex.port - } - }).on('error', () => { - // Fail silently + const proxy = httpProxy + .createProxyServer({ + target: { + host: config.plex.host, + port: config.plex.port, + }, + }) + .on('error', () => { + // Fail silently }); - return (proxy.ws(req, res)); + return proxy.ws(req, res); }; export default RoutesProxy; diff --git a/src/routes/resize.js b/src/routes/resize.js index 2c726a6..9af5c4f 100644 --- a/src/routes/resize.js +++ b/src/routes/resize.js @@ -11,44 +11,61 @@ let RoutesResize = {}; /* Forward image request to the image transcode */ RoutesResize.proxy = (req, res) => { - const params = parseArguments(req.query, publicUrl(), req.get('User-Agent')); - const path = Object.keys(params).map(e => (`${e}=${encodeURIComponent(params[e])}`)).join('&'); - req.url = config.custom.image.proxy + 'photo/:/transcode?' + path; - const proxy = httpProxy.createProxyServer({ target: config.custom.image.proxy, changeOrigin: true }); - proxy.on('error', (e) => { - return (res.status(400).send({ error: { code: 'RESIZE_ERROR', message: 'Invalid parameters, resize request fails' } })); + const params = parseArguments(req.query, publicUrl(), req.get('User-Agent')); + const path = Object.keys(params) + .map((e) => `${e}=${encodeURIComponent(params[e])}`) + .join('&'); + req.url = config.custom.image.proxy + 'photo/:/transcode?' + path; + const proxy = httpProxy.createProxyServer({ + target: config.custom.image.proxy, + changeOrigin: true, + }); + proxy.on('error', (e) => { + return res.status(400).send({ + error: { + code: 'RESIZE_ERROR', + message: 'Invalid parameters, resize request fails', + }, }); - proxy.web(req, res); -} + }); + proxy.web(req, res); +}; /* Custom image transcoder */ RoutesResize.resize = (req, res) => { + // Parse params + const params = parseArguments(req.query, publicUrl(), req.get('User-Agent')); - // Parse params - const params = parseArguments(req.query, publicUrl(), req.get('User-Agent')); - - // Check size - if (!params.width || !params.height || !params.url) - return (res.status(400).send({ error: { code: 'RESIZE_ERROR', message: 'Invalid parameters, resize request fails' } })); - - // Debug - D('IMAGE ' + params.url + ' [' + params.format + ']'); + // Check size + if (!params.width || !params.height || !params.url) + return res.status(400).send({ + error: { + code: 'RESIZE_ERROR', + message: 'Invalid parameters, resize request fails', + }, + }); - // Process image - resize(params, req.headers).then((stream) => { + // Debug + D('IMAGE ' + params.url + ' [' + params.format + ']'); - // Mime type - if (params.format === 'webp') - res.type(`image/webp`); - else if (params.format === 'png') - res.type(`image/png`); - else - res.type(`image/jpeg`); + // Process image + resize(params, req.headers) + .then((stream) => { + // Mime type + if (params.format === 'webp') res.type(`image/webp`); + else if (params.format === 'png') res.type(`image/png`); + else res.type(`image/jpeg`); - return stream.pipe(res); - }).catch(err => { - return (res.status(400).send({ error: { code: 'RESIZE_ERROR', message: 'Invalid parameters, resize request fails' } })); + return stream.pipe(res); }) + .catch((err) => { + return res.status(400).send({ + error: { + code: 'RESIZE_ERROR', + message: 'Invalid parameters, resize request fails', + }, + }); + }); }; -export default RoutesResize; \ No newline at end of file +export default RoutesResize; diff --git a/src/routes/transcode.js b/src/routes/transcode.js index 9ceb0a2..0c55ee3 100644 --- a/src/routes/transcode.js +++ b/src/routes/transcode.js @@ -1,8 +1,12 @@ import debug from 'debug'; import fetch from 'node-fetch'; -import RoutesProxy from './proxy'; + +import config from '../config'; import Database from '../database'; import SessionsManager from '../core/sessions'; +import AWS from '../core/aws'; + +import RoutesProxy from './proxy'; // Debugger const D = debug('UnicornLoadBalancer'); @@ -11,170 +15,238 @@ let RoutesTranscode = {}; /* Extract IP */ const getIp = (req) => { - if (req.get('CF-Connecting-IP')) - return req.get('CF-Connecting-IP'); - if (req.get('x-forwarded-for')) - return req.get('x-forwarded-for').split(',')[0]; - return req.connection.remoteAddress + if (req.get('CF-Connecting-IP')) return req.get('CF-Connecting-IP'); + if (req.get('x-forwarded-for')) return req.get('x-forwarded-for').split(',')[0]; + return req.connection.remoteAddress; }; /* Route to send a 302 to another server */ RoutesTranscode.redirect = async (req, res) => { - const session = SessionsManager.getSessionFromRequest(req); - const server = await SessionsManager.chooseServer(session, getIp(req)); - if (server) { - res.redirect(302, server + req.url); - D('REDIRECT ' + session + ' [' + server + ']'); - } else { - res.status(500).send({ error: { code: 'SERVER_UNAVAILABLE', message: 'SERVER_UNAVAILABLE' } }); - D('REDIRECT ' + session + ' [UNKNOWN]'); - } + const session = SessionsManager.getSessionFromRequest(req); + const server = await SessionsManager.chooseServer(session, getIp(req)); + if (server) { + res.redirect(302, server + req.url); + D('REDIRECT ' + session + ' [' + server + ']'); + } else { + res.status(500).send({ + error: { code: 'SERVER_UNAVAILABLE', message: 'SERVER_UNAVAILABLE' }, + }); + D('REDIRECT ' + session + ' [UNKNOWN]'); + } }; /* Route called when a DASH stream starts */ RoutesTranscode.dashStart = (req, res) => { - // By default we don't have the session identifier - let sessionId = false; + // By default we don't have the session identifier + let sessionId = false; - // If we have a cached X-Plex-Session-Identifier, we use it - if (req.query['X-Plex-Session-Identifier'] && SessionsManager.getCacheSession(req.query['X-Plex-Session-Identifier'])) - sessionId = SessionsManager.getCacheSession(req.query['X-Plex-Session-Identifier']); + // If we have a cached X-Plex-Session-Identifier, we use it + if ( + req.query['X-Plex-Session-Identifier'] && + SessionsManager.getCacheSession(req.query['X-Plex-Session-Identifier']) + ) + sessionId = SessionsManager.getCacheSession(req.query['X-Plex-Session-Identifier']); - // Log - D('START ' + SessionsManager.getSessionFromRequest(req) + ' [DASH]'); + // Log + D('START ' + SessionsManager.getSessionFromRequest(req) + ' [DASH]'); - // Save session - SessionsManager.cacheSessionFromRequest(req); + // Save session + SessionsManager.cacheSessionFromRequest(req); - // If session id available - if (sessionId) - SessionsManager.cleanSession(sessionId); + // If session id available + if (sessionId) SessionsManager.cleanSession(sessionId); - // Redirect - RoutesTranscode.redirect(req, res); -} + // Redirect + RoutesTranscode.redirect(req, res); +}; /* Routes called when a long polling stream starts */ RoutesTranscode.lpStart = (req, res) => { - // Save session - SessionsManager.cacheSessionFromRequest(req); + // Save session + SessionsManager.cacheSessionFromRequest(req); - // Get sessionId - const sessionId = SessionsManager.getSessionFromRequest(req); + // Get sessionId + const sessionId = SessionsManager.getSessionFromRequest(req); - // Log - D('START ' + sessionId + ' [LP]'); + // Log + D('START ' + sessionId + ' [LP]'); - // If sessionId is defined - if (sessionId) - SessionsManager.cleanSession(sessionId); + // If sessionId is defined + if (sessionId) SessionsManager.cleanSession(sessionId); - // Redirect - RoutesTranscode.redirect(req, res); -} + // Redirect + RoutesTranscode.redirect(req, res); +}; /* Route called when a HLS stream starts */ RoutesTranscode.hlsStart = (req, res) => { - // Proxy to Plex - RoutesProxy.plex(req, res); + // Proxy to Plex + RoutesProxy.plex(req, res); - // Save session - SessionsManager.cacheSessionFromRequest(req); + // Save session + SessionsManager.cacheSessionFromRequest(req); - // Get sessionId - const sessionId = SessionsManager.getSessionFromRequest(req); + // Get sessionId + const sessionId = SessionsManager.getSessionFromRequest(req); - // Log - D('START ' + sessionId + ' [HLS]'); + // Log + D('START ' + sessionId + ' [HLS]'); - // If sessionId is defined - if (sessionId) - SessionsManager.cleanSession(sessionId); + // If sessionId is defined + if (sessionId) SessionsManager.cleanSession(sessionId); }; /* Route ping */ RoutesTranscode.ping = async (req, res) => { - // Proxy to Plex - RoutesProxy.plex(req, res); + // Proxy to Plex + RoutesProxy.plex(req, res); + + // Extract sessionId from request parameter + const sessionId = SessionsManager.getSessionFromRequest(req); + + // Choose or get the server url + const serverUrl = await SessionsManager.chooseServer(sessionId, getIp(req)); + + // If a server url is defined, we ping the session + if (serverUrl) { + D('PING ' + sessionId + ' [' + serverUrl + ']'); + fetch(serverUrl + '/api/ping?session=' + sessionId); + } else { + D('PING ' + sessionId + ' [UNKNOWN]'); + } +}; + +/* Route timeline */ +RoutesTranscode.timeline = async (req, res) => { + // Proxy to Plex + RoutesProxy.plex(req, res); - // Extract sessionId from request parameter - const sessionId = SessionsManager.getSessionFromRequest(req); + // Extract sessionId from request parameter + const sessionId = SessionsManager.getSessionFromRequest(req); - // Choose or get the server url - const serverUrl = await SessionsManager.chooseServer(sessionId, getIp(req)); + // Choose or get the server url + const serverUrl = await SessionsManager.chooseServer(sessionId, getIp(req)); - // If a server url is defined, we ping the session + // It's a stop request + if (req.query.state === 'stopped') { + // If a server url is defined, we stop the session + if (serverUrl) { + D('STOP ' + sessionId + ' [' + serverUrl + ']'); + fetch(serverUrl + '/api/stop?session=' + sessionId); + } else { + D('STOP ' + sessionId + ' [UNKNOWN]'); + } + } + // It's a ping request + else { if (serverUrl) { - D('PING ' + sessionId + ' [' + serverUrl + ']'); - fetch(serverUrl + '/api/ping?session=' + sessionId); + D('PING ' + sessionId + ' [' + serverUrl + ']'); + fetch(serverUrl + '/api/ping?session=' + sessionId); } else { - D('PING ' + sessionId + ' [UNKNOWN]'); + D('PING ' + sessionId + ' [UNKNOWN]'); } + } }; -/* Route timeline */ -RoutesTranscode.timeline = async (req, res) => { - // Proxy to Plex - RoutesProxy.plex(req, res); - - // Extract sessionId from request parameter - const sessionId = SessionsManager.getSessionFromRequest(req); - - // Choose or get the server url - const serverUrl = await SessionsManager.chooseServer(sessionId, getIp(req)); - - // It's a stop request - if (req.query.state === 'stopped') { - // If a server url is defined, we stop the session - if (serverUrl) { - D('STOP ' + sessionId + ' [' + serverUrl + ']'); - fetch(serverUrl + '/api/stop?session=' + sessionId); - } else { - D('STOP ' + sessionId + ' [UNKNOWN]'); - } - } - // It's a ping request - else { - if (serverUrl) { - D('PING ' + sessionId + ' [' + serverUrl + ']'); - fetch(serverUrl + '/api/ping?session=' + sessionId); - } else { - D('PING ' + sessionId + ' [UNKNOWN]'); - } - } +RoutesTranscode.progress = (req, res) => { + RoutesProxy.plex(req, res); + + D('Progress ' + req.method); + D('\tURL: ' + req.originalUrl); + D('\tBody: ' + req.body); }; /* Route stop */ RoutesTranscode.stop = async (req, res) => { - // Proxy to plex - RoutesProxy.plex(req, res); - - // Extract sessionId from request parameter - const sessionId = SessionsManager.getSessionFromRequest(req); + // Proxy to plex + RoutesProxy.plex(req, res); + + // Extract sessionId from request parameter + const sessionId = SessionsManager.getSessionFromRequest(req); + + // Choose or get the server url + const serverUrl = await SessionsManager.chooseServer(sessionId, getIp(req)); + + // If a server url is defined, we stop the session + if (serverUrl) { + D('STOP ' + sessionId + ' [' + serverUrl + ']'); + fetch(serverUrl + '/api/stop?session=' + sessionId); + } else { + D('STOP ' + sessionId + ' [UNKNOWN]'); + } +}; - // Choose or get the server url - const serverUrl = await SessionsManager.chooseServer(sessionId, getIp(req)); +/* Route download */ +RoutesTranscode.download = async (req, res) => { + let data; + try { + data = await Database.getPartFromId(req.params.id1); + } catch (ex) { + if (ex !== 'FILE_NOT_FOUND') { + // rethrow; we don't handle this + throw ex; + } - // If a server url is defined, we stop the session - if (serverUrl) { - D('STOP ' + sessionId + ' [' + serverUrl + ']'); - fetch(serverUrl + '/api/stop?session=' + sessionId); + res.status(400).send({ error: { code: 'NOT_FOUND', message: 'File not available' } }); + return; + } + + try { + const awsUrl = await AWS.getSignedUrlForFile(data.file); + D('DOWNLOAD ' + req.params.id1 + ' [AWS]'); + res.redirect(302, awsUrl); + return; + } catch (ex) { + if (ex instanceof Error) { + switch (ex.message) { + case 'AWS is still initializing': + // Retry in 3 seconds + await new Promise((resolve) => { + setTimeout(resolve, 3000); + }); + return RoutesTranscode.download(req, res); + break; + case 'No AWS signing services are available': + D('ERROR: DOWNLOAD ' + req.params.id1 + ' [AWS]: No signing services available'); + break; + case 'AWS S3 mount path is not configured': + // Ignore; AWS just isn't configured + break; + default: + D('ERROR: DOWNLOAD ' + req.params.id1 + ' [AWS]: ' + ex.message); + } } else { - D('STOP ' + sessionId + ' [UNKNOWN]'); + D('ERROR: DOWNLOAD ' + req.params.id1 + ' [AWS]: ' + ex); } + } + + // AWS can't handle the download; check if we should send the download to a transcoder + if (config.custom.download.forward) { + return RoutesTranscode.redirect(req, res); + } + + // We don't send downloads to transcoders; send the file ourselves + D('DOWNLOAD ' + req.params.id1 + ' [LB]'); + const sendFilePromise = new Promise((resolve, reject) => { + res.sendFile(data.file, {}, (err) => { + if (err) { + reject(err); + return; + } + + resolve(); + }); + }); + + try { + await sendFilePromise; + } catch (ex) { + if (ex.code !== 'ECONNABORTED') { + // rethrow; we don't handle this + D('DOWNLOAD FAILED ' + req.params.id1 + ' [LB]: ' + ex); + throw ex; + } + } }; -/* Route download */ -RoutesTranscode.download = (req, res) => { - D('DOWNLOAD ' + req.params.id1 + ' [LB]'); - Database.getPartFromId(req.params.id1).then((data) => { - res.sendFile(data.file, {}, (err) => { - if (err && err.code !== 'ECONNABORTED') - D('DOWNLOAD FAILED ' + req.params.id1 + ' [LB]'); - }) - }).catch((err) => { - res.status(400).send({ error: { code: 'NOT_FOUND', message: 'File not available' } }); - }) -} - export default RoutesTranscode; diff --git a/src/store/index.js b/src/store/index.js index b3cad86..22654bf 100644 --- a/src/store/index.js +++ b/src/store/index.js @@ -9,12 +9,12 @@ const D = debug('UnicornLoadBalancer'); let SessionStore; if (config.redis.host !== 'undefined') { - D('Using redis as session store'); - SessionStore = new RedisSessionStore(); + D('Using redis as session store'); + SessionStore = new RedisSessionStore(); } else { - D('Redis not found, fallback on LocalSessionStore'); - D('WARNING: On restart all sessions will be lost'); - SessionStore = new LocalSessionStore(); + D('Redis not found, fallback on LocalSessionStore'); + D('WARNING: On restart all sessions will be lost'); + SessionStore = new LocalSessionStore(); } export default SessionStore; diff --git a/src/store/local.js b/src/store/local.js index efa6993..5154a40 100644 --- a/src/store/local.js +++ b/src/store/local.js @@ -1,64 +1,63 @@ import EventEmitter from 'events'; class LocalSessionStore { - constructor() { - this.sessionEvents = new EventEmitter(); - this.sessionStore = {}; - } - - /** - * Get a session, or wait for it for 10s - * @param sessionId - * @returns {Promise} - */ - get(sessionId) { - return new Promise((resolve, reject) => { - if (sessionId in this.sessionStore) - return resolve(this.sessionStore[sessionId]); - - let timeout = null; - - let eventCb = (...args) => { - clearTimeout(timeout); - this.sessionEvents.removeListener(sessionId, eventCb); - resolve(...args); - }; - - let timeoutCb = () => { - this.sessionEvents.removeListener(sessionId, eventCb); - reject('timeout'); - }; - - timeout = setTimeout(timeoutCb, 20000); - this.sessionEvents.on(sessionId, eventCb); - }) - } - - /** - * Store a value in the store and trigger the pending gets - * @param sessionId - * @param value - * @returns {Promise} - */ - set(sessionId, value) { - return new Promise((resolve) => { - this.sessionStore[sessionId] = value; - this.sessionEvents.emit(sessionId, value); - resolve('OK'); - }) - } - - /** - * Delete a session from the store - * @param sessionId - * @returns {Promise} - */ - delete(sessionId) { - return new Promise((resolve) => { - delete this.sessionStore[sessionId]; - resolve('OK'); - }) - } + constructor() { + this.sessionEvents = new EventEmitter(); + this.sessionStore = {}; + } + + /** + * Get a session, or wait for it for 10s + * @param sessionId + * @returns {Promise} + */ + get(sessionId) { + return new Promise((resolve, reject) => { + if (sessionId in this.sessionStore) return resolve(this.sessionStore[sessionId]); + + let timeout = null; + + let eventCb = (...args) => { + clearTimeout(timeout); + this.sessionEvents.removeListener(sessionId, eventCb); + resolve(...args); + }; + + let timeoutCb = () => { + this.sessionEvents.removeListener(sessionId, eventCb); + reject('timeout'); + }; + + timeout = setTimeout(timeoutCb, 20000); + this.sessionEvents.on(sessionId, eventCb); + }); + } + + /** + * Store a value in the store and trigger the pending gets + * @param sessionId + * @param value + * @returns {Promise} + */ + set(sessionId, value) { + return new Promise((resolve) => { + this.sessionStore[sessionId] = value; + this.sessionEvents.emit(sessionId, value); + resolve('OK'); + }); + } + + /** + * Delete a session from the store + * @param sessionId + * @returns {Promise} + */ + delete(sessionId) { + return new Promise((resolve) => { + delete this.sessionStore[sessionId]; + resolve('OK'); + }); + } } -export default LocalSessionStore; \ No newline at end of file +export default LocalSessionStore; diff --git a/src/store/redis.js b/src/store/redis.js index c6fc52c..6a6245c 100644 --- a/src/store/redis.js +++ b/src/store/redis.js @@ -1,89 +1,83 @@ -import {getRedisClient} from '../utils'; -import config from "../config"; +import { getRedisClient } from '../utils'; +import config from '../config'; class RedisSessionStore { - constructor() { - this.redis = getRedisClient(); - this.redisSubscriber = this.redis.duplicate(); - } + constructor() { + this.redis = getRedisClient(); + this.redisSubscriber = this.redis.duplicate(); + } - _parseSession(session) { - return new Promise((resolve, reject) => { - try { - resolve(JSON.parse(session)) - } catch(err) { - reject(err) - } - }) - } + _parseSession(session) { + return new Promise((resolve, reject) => { + try { + resolve(JSON.parse(session)); + } catch (err) { + reject(err); + } + }); + } - /** - * Get a session, or wait for it for 10s - * @param sessionId - * @returns {Promise} - */ - get(sessionId) { - return new Promise((resolve, reject) => { - this.redis.get(sessionId, (err, session) => { - if (err) - return reject(err); - if (session != null) - return resolve(this._parseSession(session)); + /** + * Get a session, or wait for it for 10s + * @param sessionId + * @returns {Promise} + */ + get(sessionId) { + return new Promise((resolve, reject) => { + this.redis.get(sessionId, (err, session) => { + if (err) return reject(err); + if (session != null) return resolve(this._parseSession(session)); - let redisSubKey = "__keyspace@" + config.redis.db + "__:" + sessionId; + let redisSubKey = '__keyspace@' + config.redis.db + '__:' + sessionId; - let timeout = setTimeout(() => { - this.redisSubscriber.unsubscribe(redisSubKey); - reject('timeout'); - }, 20000); + let timeout = setTimeout(() => { + this.redisSubscriber.unsubscribe(redisSubKey); + reject('timeout'); + }, 20000); - this.redisSubscriber.on("message", (eventKey, action) => { - if (action !== 'set' || eventKey !== redisSubKey) - return; + this.redisSubscriber.on('message', (eventKey, action) => { + if (action !== 'set' || eventKey !== redisSubKey) return; - clearTimeout(timeout); - this.redisSubscriber.unsubscribe(redisSubKey); - this.redis.get(sessionId, (err, session) => { - if (err) - return reject(err); - return resolve(this._parseSession(session)); - }) - }); - this.redisSubscriber.subscribe(redisSubKey) - }) - }) - } + clearTimeout(timeout); + this.redisSubscriber.unsubscribe(redisSubKey); + this.redis.get(sessionId, (err, session) => { + if (err) return reject(err); + return resolve(this._parseSession(session)); + }); + }); + this.redisSubscriber.subscribe(redisSubKey); + }); + }); + } - /** - * Store a value in the store and trigger the pending gets - * @param sessionId - * @param value - * @returns {Promise} - */ - set(sessionId, value) { - return new Promise((resolve, reject) => { - this.redis.set(sessionId, JSON.stringify(value), (err) => { - if (err) - return reject(err); - resolve('OK'); - }) - }) - } + /** + * Store a value in the store and trigger the pending gets + * @param sessionId + * @param value + * @returns {Promise} + */ + set(sessionId, value) { + return new Promise((resolve, reject) => { + this.redis.set(sessionId, JSON.stringify(value), (err) => { + if (err) return reject(err); + resolve('OK'); + }); + }); + } - /** - * Delete a session from the store - * @param sessionId - * @returns {Promise} - */ - delete(sessionId) { - return new Promise((resolve, reject) => { - this.redis.del(sessionId, (err) => { - if (err) - return reject(err); - resolve('OK') - }) - }) - } + /** + * Delete a session from the store + * @param sessionId + * @returns {Promise} + */ + delete(sessionId) { + return new Promise((resolve, reject) => { + this.redis.del(sessionId, (err) => { + if (err) return reject(err); + resolve('OK'); + }); + }); + } } -export default RedisSessionStore; \ No newline at end of file +export default RedisSessionStore; diff --git a/src/utils.js b/src/utils.js index 1f39ad0..50fe038 100644 --- a/src/utils.js +++ b/src/utils.js @@ -3,32 +3,31 @@ import redisClient from 'redis'; import config from './config'; export const publicUrl = () => { - return (config.server.public) + return config.server.public; }; export const internalUrl = () => { - return ('http://127.0.0.1:' + config.server.port + '/') + return 'http://127.0.0.1:' + config.server.port + '/'; }; export const plexUrl = () => { - return ('http://' + config.plex.host + ':' + config.plex.port + '/') + return 'http://' + config.plex.host + ':' + config.plex.port + '/'; }; export const getRedisClient = () => { - if (config.redis.password === '') - delete config.redis.password; - - let redis = redisClient.createClient(config.redis); - redis.on('error', (err) => { - if (err.errno === 'ECONNREFUSED') - return console.error('Failed to connect to REDIS, please check your configuration'); - return console.error(err.errno); - }); - - redis.on('connect', () => { - redis.send_command('config', ['set', 'notify-keyspace-events', 'KEA']) - }); - return redis; + if (config.redis.password === '') delete config.redis.password; + + let redis = redisClient.createClient(config.redis); + redis.on('error', (err) => { + if (err.errno === 'ECONNREFUSED') + return console.error('Failed to connect to REDIS, please check your configuration'); + return console.error(err.errno); + }); + + redis.on('connect', () => { + redis.send_command('config', ['set', 'notify-keyspace-events', 'KEA']); + }); + return redis; }; -export const time = () => (Math.floor((new Date().getTime()) / 1000)); \ No newline at end of file +export const time = () => Math.floor(new Date().getTime() / 1000);