diff options
| author | Max Isom <[email protected]> | 2021-11-28 19:42:32 -0500 |
|---|---|---|
| committer | GitHub <[email protected]> | 2021-11-28 19:42:32 -0500 |
| commit | 9afca25866830dacc668b1861bab5c12260de639 (patch) | |
| tree | 282a853f37e21af72d355c38a8acca480011d82b /src/services | |
| parent | f146a2a57c83aa37b4601cf093acb1cfa700a41e (diff) | |
| parent | 9f9469f682e68c45d2c20251bfa5726e7c415b80 (diff) | |
| download | muse-9afca25866830dacc668b1861bab5c12260de639.tar.xz muse-9afca25866830dacc668b1861bab5c12260de639.zip | |
Merge pull request #415 from codetheweb/feature/better-file-caching
Diffstat (limited to 'src/services')
| -rw-r--r-- | src/services/config.ts | 12 | ||||
| -rw-r--r-- | src/services/file-cache.ts | 117 | ||||
| -rw-r--r-- | src/services/get-songs.ts | 6 | ||||
| -rw-r--r-- | src/services/key-value-cache.ts (renamed from src/services/cache.ts) | 8 | ||||
| -rw-r--r-- | src/services/player.ts | 71 |
5 files changed, 157 insertions, 57 deletions
diff --git a/src/services/config.ts b/src/services/config.ts index 759a27a3..96b161c 100644 --- a/src/services/config.ts +++ b/src/services/config.ts @@ -1,6 +1,8 @@ import dotenv from 'dotenv'; import {injectable} from 'inversify'; import path from 'path'; +import xbytes from 'xbytes'; +import {ConditionalKeys} from 'type-fest'; dotenv.config(); export const DATA_DIR = path.resolve(process.env.DATA_DIR ? process.env.DATA_DIR : './data'); @@ -12,6 +14,7 @@ const CONFIG_MAP = { SPOTIFY_CLIENT_SECRET: process.env.SPOTIFY_CLIENT_SECRET, DATA_DIR, CACHE_DIR: path.join(DATA_DIR, 'cache'), + CACHE_LIMIT_IN_BYTES: xbytes.parseSize(process.env.CACHE_LIMIT ?? '2GB'), } as const; @injectable() @@ -22,6 +25,7 @@ export default class Config { readonly SPOTIFY_CLIENT_SECRET!: string; readonly DATA_DIR!: string; readonly CACHE_DIR!: string; + readonly CACHE_LIMIT_IN_BYTES!: number; constructor() { for (const [key, value] of Object.entries(CONFIG_MAP)) { @@ -30,7 +34,13 @@ export default class Config { process.exit(1); } - this[key as keyof typeof CONFIG_MAP] = value; + if (typeof value === 'number') { + this[key as ConditionalKeys<typeof CONFIG_MAP, number>] = value; + } else if (typeof value === 'string') { + this[key as ConditionalKeys<typeof CONFIG_MAP, string>] = value; + } else { + throw new Error(`Unsupported type for ${key}`); + } } } } diff --git a/src/services/file-cache.ts b/src/services/file-cache.ts new file mode 100644 index 0000000..fa4f1f7 --- /dev/null +++ b/src/services/file-cache.ts @@ -0,0 +1,117 @@ +import {promises as fs, createWriteStream} from 'fs'; +import path from 'path'; +import {inject, injectable} from 'inversify'; +import sequelize from 'sequelize'; +import {FileCache} from '../models/index.js'; +import {TYPES} from '../types.js'; +import Config from './config.js'; + +@injectable() +export default class FileCacheProvider { + private readonly config: Config; + + constructor(@inject(TYPES.Config) config: Config) { + this.config = config; + } + + /** + * Returns path to cached file if it exists, otherwise throws an error. + * Updates the `accessedAt` property of the cached file. + * @param hash lookup key + */ + async getPathFor(hash: string): Promise<string> { + const model = await FileCache.findByPk(hash); + + if (!model) { + throw new Error('File is not cached'); + } + + const resolvedPath = path.join(this.config.CACHE_DIR, hash); + + try { + await fs.access(resolvedPath); + } catch (_: unknown) { + await FileCache.destroy({where: {hash}}); + + throw new Error('File is not cached'); + } + + await model.update({accessedAt: new Date()}); + + return resolvedPath; + } + + /** + * Returns a write stream for the given hash key. + * The stream handles saving a new file and will + * update the database after the stream is closed. + * @param hash lookup key + */ + createWriteStream(hash: string) { + const tmpPath = path.join(this.config.CACHE_DIR, 'tmp', hash); + const finalPath = path.join(this.config.CACHE_DIR, hash); + + const stream = createWriteStream(tmpPath); + + stream.on('close', async () => { + // Only move if size is non-zero (may have errored out) + const stats = await fs.stat(tmpPath); + + if (stats.size !== 0) { + await fs.rename(tmpPath, finalPath); + } + + await FileCache.create({hash, bytes: stats.size, accessedAt: new Date()}); + + await this.evictOldestIfNecessary(); + }); + + return stream; + } + + /** + * Deletes orphaned cache files and evicts files if + * necessary. Should be run on program startup so files + * will be evicted if the cache limit has changed. + */ + async cleanup() { + await this.removeOrphans(); + await this.evictOldestIfNecessary(); + } + + private async evictOldestIfNecessary() { + const [{dataValues: {totalSizeBytes}}] = await FileCache.findAll({ + attributes: [ + [sequelize.fn('sum', sequelize.col('bytes')), 'totalSizeBytes'], + ], + }) as unknown as [{dataValues: {totalSizeBytes: number}}]; + + if (totalSizeBytes > this.config.CACHE_LIMIT_IN_BYTES) { + const oldest = await FileCache.findOne({ + order: [ + ['accessedAt', 'ASC'], + ], + }); + + if (oldest) { + await oldest.destroy(); + await fs.unlink(path.join(this.config.CACHE_DIR, oldest.hash)); + } + + // Continue to evict until we're under the limit + await this.evictOldestIfNecessary(); + } + } + + private async removeOrphans() { + for await (const dirent of await fs.opendir(this.config.CACHE_DIR)) { + if (dirent.isFile()) { + const model = await FileCache.findByPk(dirent.name); + + if (!model) { + await fs.unlink(path.join(this.config.CACHE_DIR, dirent.name)); + } + } + } + } +} diff --git a/src/services/get-songs.ts b/src/services/get-songs.ts index ba4401c..e7e3d5a 100644 --- a/src/services/get-songs.ts +++ b/src/services/get-songs.ts @@ -14,7 +14,7 @@ import {TYPES} from '../types.js'; import {cleanUrl} from '../utils/url.js'; import ThirdParty from './third-party.js'; import Config from './config.js'; -import CacheProvider from './cache.js'; +import KeyValueCacheProvider from './key-value-cache.js'; type QueuedSongWithoutChannel = Except<QueuedSong, 'addedInChannelId'>; @@ -26,14 +26,14 @@ export default class { private readonly youtube: YouTube; private readonly youtubeKey: string; private readonly spotify: Spotify; - private readonly cache: CacheProvider; + private readonly cache: KeyValueCacheProvider; private readonly ytsrQueue: PQueue; constructor( @inject(TYPES.ThirdParty) thirdParty: ThirdParty, @inject(TYPES.Config) config: Config, - @inject(TYPES.Cache) cache: CacheProvider) { + @inject(TYPES.KeyValueCache) cache: KeyValueCacheProvider) { this.youtube = thirdParty.youtube; this.youtubeKey = config.YOUTUBE_API_KEY; this.spotify = thirdParty.spotify; diff --git a/src/services/cache.ts b/src/services/key-value-cache.ts index e877c56..7f1164d 100644 --- a/src/services/cache.ts +++ b/src/services/key-value-cache.ts @@ -1,5 +1,5 @@ import {injectable} from 'inversify'; -import {Cache} from '../models/index.js'; +import {KeyValueCache} from '../models/index.js'; import debug from '../utils/debug.js'; type Seconds = number; @@ -12,7 +12,7 @@ type Options = { const futureTimeToDate = (time: Seconds) => new Date(new Date().getTime() + (time * 1000)); @injectable() -export default class CacheProvider { +export default class KeyValueCacheProvider { async wrap<T extends [...any[], Options], F>(func: (...options: any) => Promise<F>, ...options: T): Promise<F> { if (options.length === 0) { throw new Error('Missing cache options'); @@ -29,7 +29,7 @@ export default class CacheProvider { throw new Error(`Cache key ${key} is too short.`); } - const cachedResult = await Cache.findByPk(key); + const cachedResult = await KeyValueCache.findByPk(key); if (cachedResult) { if (new Date() < cachedResult.expiresAt) { @@ -45,7 +45,7 @@ export default class CacheProvider { const result = await func(...options as any[]); // Save result - await Cache.upsert({ + await KeyValueCache.upsert({ key, value: JSON.stringify(result), expiresAt: futureTimeToDate(expiresIn), diff --git a/src/services/player.ts b/src/services/player.ts index 33cb4c2..67b8b38 100644 --- a/src/services/player.ts +++ b/src/services/player.ts @@ -1,7 +1,5 @@ import {VoiceChannel, Snowflake, Client, TextChannel} from 'discord.js'; -import {promises as fs, createWriteStream} from 'fs'; -import {Readable, PassThrough} from 'stream'; -import path from 'path'; +import {Readable} from 'stream'; import hasha from 'hasha'; import ytdl from 'ytdl-core'; import {WriteStream} from 'fs-capacitor'; @@ -9,6 +7,7 @@ import ffmpeg from 'fluent-ffmpeg'; import shuffle from 'array-shuffle'; import errorMsg from '../utils/error-msg.js'; import {AudioPlayer, AudioPlayerStatus, createAudioPlayer, createAudioResource, joinVoiceChannel, StreamType, VoiceConnection, VoiceConnectionStatus} from '@discordjs/voice'; +import FileCacheProvider from './file-cache.js'; export interface QueuedPlaylist { title: string; @@ -35,7 +34,6 @@ export default class { public voiceConnection: VoiceConnection | null = null; private queue: QueuedSong[] = []; private queuePosition = 0; - private readonly cacheDir: string; private audioPlayer: AudioPlayer | null = null; private nowPlaying: QueuedSong | null = null; private playPositionInterval: NodeJS.Timeout | undefined; @@ -44,10 +42,11 @@ export default class { private positionInSeconds = 0; private readonly discordClient: Client; + private readonly fileCache: FileCacheProvider; - constructor(cacheDir: string, client: Client) { - this.cacheDir = cacheDir; + constructor(client: Client, fileCache: FileCacheProvider) { this.discordClient = client; + this.fileCache = fileCache; } async connect(channel: VoiceChannel): Promise<void> { @@ -287,40 +286,24 @@ export default class { return this.queueSize() === 0; } - private getCachedPath(url: string): string { - return path.join(this.cacheDir, hasha(url)); - } - - private getCachedPathTemp(url: string): string { - return path.join(this.cacheDir, 'tmp', hasha(url)); - } - - private async isCached(url: string): Promise<boolean> { - try { - await fs.access(this.getCachedPath(url)); - - return true; - } catch (_: unknown) { - return false; - } + private getHashForCache(url: string): string { + return hasha(url); } private async getStream(url: string, options: {seek?: number} = {}): Promise<Readable> { - const cachedPath = this.getCachedPath(url); - let ffmpegInput = ''; const ffmpegInputOptions: string[] = []; let shouldCacheVideo = false; let format: ytdl.videoFormat | undefined; - if (await this.isCached(url)) { - ffmpegInput = cachedPath; + try { + ffmpegInput = await this.fileCache.getPathFor(this.getHashForCache(url)); if (options.seek) { ffmpegInputOptions.push('-ss', options.seek.toString()); } - } else { + } catch { // Not yet cached, must download const info = await ytdl.getInfo(url); @@ -371,7 +354,6 @@ export default class { '1', '-reconnect_delay_max', '5', - '-re', ]); if (options.seek) { @@ -382,6 +364,17 @@ export default class { // Create stream and pipe to capacitor return new Promise((resolve, reject) => { + const capacitor = new WriteStream(); + + // Cache video if necessary + if (shouldCacheVideo) { + const cacheStream = this.fileCache.createWriteStream(this.getHashForCache(url)); + + capacitor.createReadStream().pipe(cacheStream); + } else { + ffmpegInputOptions.push('-re'); + } + const youtubeStream = ffmpeg(ffmpegInput) .inputOptions(ffmpegInputOptions) .noVideo() @@ -390,29 +383,9 @@ export default class { .on('error', error => { console.error(error); reject(error); - }) - .pipe() as PassThrough; - - const capacitor = new WriteStream(); - - youtubeStream.pipe(capacitor); - - // Cache video if necessary - if (shouldCacheVideo) { - const cacheTempPath = this.getCachedPathTemp(url); - const cacheStream = createWriteStream(cacheTempPath); - - cacheStream.on('finish', async () => { - // Only move if size is non-zero (may have errored out) - const stats = await fs.stat(cacheTempPath); - - if (stats.size !== 0) { - await fs.rename(cacheTempPath, cachedPath); - } }); - capacitor.createReadStream().pipe(cacheStream); - } + youtubeStream.pipe(capacitor); resolve(capacitor.createReadStream()); }); |
