aboutsummaryrefslogtreecommitdiff
path: root/src/services
diff options
context:
space:
mode:
authorMax Isom <[email protected]>2021-11-28 19:42:32 -0500
committerGitHub <[email protected]>2021-11-28 19:42:32 -0500
commit9afca25866830dacc668b1861bab5c12260de639 (patch)
tree282a853f37e21af72d355c38a8acca480011d82b /src/services
parentf146a2a57c83aa37b4601cf093acb1cfa700a41e (diff)
parent9f9469f682e68c45d2c20251bfa5726e7c415b80 (diff)
downloadmuse-9afca25866830dacc668b1861bab5c12260de639.tar.xz
muse-9afca25866830dacc668b1861bab5c12260de639.zip
Merge pull request #415 from codetheweb/feature/better-file-caching
Diffstat (limited to 'src/services')
-rw-r--r--src/services/config.ts12
-rw-r--r--src/services/file-cache.ts117
-rw-r--r--src/services/get-songs.ts6
-rw-r--r--src/services/key-value-cache.ts (renamed from src/services/cache.ts)8
-rw-r--r--src/services/player.ts71
5 files changed, 157 insertions, 57 deletions
diff --git a/src/services/config.ts b/src/services/config.ts
index 759a27a3..96b161c 100644
--- a/src/services/config.ts
+++ b/src/services/config.ts
@@ -1,6 +1,8 @@
import dotenv from 'dotenv';
import {injectable} from 'inversify';
import path from 'path';
+import xbytes from 'xbytes';
+import {ConditionalKeys} from 'type-fest';
dotenv.config();
export const DATA_DIR = path.resolve(process.env.DATA_DIR ? process.env.DATA_DIR : './data');
@@ -12,6 +14,7 @@ const CONFIG_MAP = {
SPOTIFY_CLIENT_SECRET: process.env.SPOTIFY_CLIENT_SECRET,
DATA_DIR,
CACHE_DIR: path.join(DATA_DIR, 'cache'),
+ CACHE_LIMIT_IN_BYTES: xbytes.parseSize(process.env.CACHE_LIMIT ?? '2GB'),
} as const;
@injectable()
@@ -22,6 +25,7 @@ export default class Config {
readonly SPOTIFY_CLIENT_SECRET!: string;
readonly DATA_DIR!: string;
readonly CACHE_DIR!: string;
+ readonly CACHE_LIMIT_IN_BYTES!: number;
constructor() {
for (const [key, value] of Object.entries(CONFIG_MAP)) {
@@ -30,7 +34,13 @@ export default class Config {
process.exit(1);
}
- this[key as keyof typeof CONFIG_MAP] = value;
+ if (typeof value === 'number') {
+ this[key as ConditionalKeys<typeof CONFIG_MAP, number>] = value;
+ } else if (typeof value === 'string') {
+ this[key as ConditionalKeys<typeof CONFIG_MAP, string>] = value;
+ } else {
+ throw new Error(`Unsupported type for ${key}`);
+ }
}
}
}
diff --git a/src/services/file-cache.ts b/src/services/file-cache.ts
new file mode 100644
index 0000000..fa4f1f7
--- /dev/null
+++ b/src/services/file-cache.ts
@@ -0,0 +1,117 @@
+import {promises as fs, createWriteStream} from 'fs';
+import path from 'path';
+import {inject, injectable} from 'inversify';
+import sequelize from 'sequelize';
+import {FileCache} from '../models/index.js';
+import {TYPES} from '../types.js';
+import Config from './config.js';
+
+@injectable()
+export default class FileCacheProvider {
+ private readonly config: Config;
+
+ constructor(@inject(TYPES.Config) config: Config) {
+ this.config = config;
+ }
+
+ /**
+ * Returns path to cached file if it exists, otherwise throws an error.
+ * Updates the `accessedAt` property of the cached file.
+ * @param hash lookup key
+ */
+ async getPathFor(hash: string): Promise<string> {
+ const model = await FileCache.findByPk(hash);
+
+ if (!model) {
+ throw new Error('File is not cached');
+ }
+
+ const resolvedPath = path.join(this.config.CACHE_DIR, hash);
+
+ try {
+ await fs.access(resolvedPath);
+ } catch (_: unknown) {
+ await FileCache.destroy({where: {hash}});
+
+ throw new Error('File is not cached');
+ }
+
+ await model.update({accessedAt: new Date()});
+
+ return resolvedPath;
+ }
+
+ /**
+ * Returns a write stream for the given hash key.
+ * The stream handles saving a new file and will
+ * update the database after the stream is closed.
+ * @param hash lookup key
+ */
+ createWriteStream(hash: string) {
+ const tmpPath = path.join(this.config.CACHE_DIR, 'tmp', hash);
+ const finalPath = path.join(this.config.CACHE_DIR, hash);
+
+ const stream = createWriteStream(tmpPath);
+
+ stream.on('close', async () => {
+ // Only move if size is non-zero (may have errored out)
+ const stats = await fs.stat(tmpPath);
+
+ if (stats.size !== 0) {
+ await fs.rename(tmpPath, finalPath);
+ }
+
+ await FileCache.create({hash, bytes: stats.size, accessedAt: new Date()});
+
+ await this.evictOldestIfNecessary();
+ });
+
+ return stream;
+ }
+
+ /**
+ * Deletes orphaned cache files and evicts files if
+ * necessary. Should be run on program startup so files
+ * will be evicted if the cache limit has changed.
+ */
+ async cleanup() {
+ await this.removeOrphans();
+ await this.evictOldestIfNecessary();
+ }
+
+ private async evictOldestIfNecessary() {
+ const [{dataValues: {totalSizeBytes}}] = await FileCache.findAll({
+ attributes: [
+ [sequelize.fn('sum', sequelize.col('bytes')), 'totalSizeBytes'],
+ ],
+ }) as unknown as [{dataValues: {totalSizeBytes: number}}];
+
+ if (totalSizeBytes > this.config.CACHE_LIMIT_IN_BYTES) {
+ const oldest = await FileCache.findOne({
+ order: [
+ ['accessedAt', 'ASC'],
+ ],
+ });
+
+ if (oldest) {
+ await oldest.destroy();
+ await fs.unlink(path.join(this.config.CACHE_DIR, oldest.hash));
+ }
+
+ // Continue to evict until we're under the limit
+ await this.evictOldestIfNecessary();
+ }
+ }
+
+ private async removeOrphans() {
+ for await (const dirent of await fs.opendir(this.config.CACHE_DIR)) {
+ if (dirent.isFile()) {
+ const model = await FileCache.findByPk(dirent.name);
+
+ if (!model) {
+ await fs.unlink(path.join(this.config.CACHE_DIR, dirent.name));
+ }
+ }
+ }
+ }
+}
diff --git a/src/services/get-songs.ts b/src/services/get-songs.ts
index ba4401c..e7e3d5a 100644
--- a/src/services/get-songs.ts
+++ b/src/services/get-songs.ts
@@ -14,7 +14,7 @@ import {TYPES} from '../types.js';
import {cleanUrl} from '../utils/url.js';
import ThirdParty from './third-party.js';
import Config from './config.js';
-import CacheProvider from './cache.js';
+import KeyValueCacheProvider from './key-value-cache.js';
type QueuedSongWithoutChannel = Except<QueuedSong, 'addedInChannelId'>;
@@ -26,14 +26,14 @@ export default class {
private readonly youtube: YouTube;
private readonly youtubeKey: string;
private readonly spotify: Spotify;
- private readonly cache: CacheProvider;
+ private readonly cache: KeyValueCacheProvider;
private readonly ytsrQueue: PQueue;
constructor(
@inject(TYPES.ThirdParty) thirdParty: ThirdParty,
@inject(TYPES.Config) config: Config,
- @inject(TYPES.Cache) cache: CacheProvider) {
+ @inject(TYPES.KeyValueCache) cache: KeyValueCacheProvider) {
this.youtube = thirdParty.youtube;
this.youtubeKey = config.YOUTUBE_API_KEY;
this.spotify = thirdParty.spotify;
diff --git a/src/services/cache.ts b/src/services/key-value-cache.ts
index e877c56..7f1164d 100644
--- a/src/services/cache.ts
+++ b/src/services/key-value-cache.ts
@@ -1,5 +1,5 @@
import {injectable} from 'inversify';
-import {Cache} from '../models/index.js';
+import {KeyValueCache} from '../models/index.js';
import debug from '../utils/debug.js';
type Seconds = number;
@@ -12,7 +12,7 @@ type Options = {
const futureTimeToDate = (time: Seconds) => new Date(new Date().getTime() + (time * 1000));
@injectable()
-export default class CacheProvider {
+export default class KeyValueCacheProvider {
async wrap<T extends [...any[], Options], F>(func: (...options: any) => Promise<F>, ...options: T): Promise<F> {
if (options.length === 0) {
throw new Error('Missing cache options');
@@ -29,7 +29,7 @@ export default class CacheProvider {
throw new Error(`Cache key ${key} is too short.`);
}
- const cachedResult = await Cache.findByPk(key);
+ const cachedResult = await KeyValueCache.findByPk(key);
if (cachedResult) {
if (new Date() < cachedResult.expiresAt) {
@@ -45,7 +45,7 @@ export default class CacheProvider {
const result = await func(...options as any[]);
// Save result
- await Cache.upsert({
+ await KeyValueCache.upsert({
key,
value: JSON.stringify(result),
expiresAt: futureTimeToDate(expiresIn),
diff --git a/src/services/player.ts b/src/services/player.ts
index 33cb4c2..67b8b38 100644
--- a/src/services/player.ts
+++ b/src/services/player.ts
@@ -1,7 +1,5 @@
import {VoiceChannel, Snowflake, Client, TextChannel} from 'discord.js';
-import {promises as fs, createWriteStream} from 'fs';
-import {Readable, PassThrough} from 'stream';
-import path from 'path';
+import {Readable} from 'stream';
import hasha from 'hasha';
import ytdl from 'ytdl-core';
import {WriteStream} from 'fs-capacitor';
@@ -9,6 +7,7 @@ import ffmpeg from 'fluent-ffmpeg';
import shuffle from 'array-shuffle';
import errorMsg from '../utils/error-msg.js';
import {AudioPlayer, AudioPlayerStatus, createAudioPlayer, createAudioResource, joinVoiceChannel, StreamType, VoiceConnection, VoiceConnectionStatus} from '@discordjs/voice';
+import FileCacheProvider from './file-cache.js';
export interface QueuedPlaylist {
title: string;
@@ -35,7 +34,6 @@ export default class {
public voiceConnection: VoiceConnection | null = null;
private queue: QueuedSong[] = [];
private queuePosition = 0;
- private readonly cacheDir: string;
private audioPlayer: AudioPlayer | null = null;
private nowPlaying: QueuedSong | null = null;
private playPositionInterval: NodeJS.Timeout | undefined;
@@ -44,10 +42,11 @@ export default class {
private positionInSeconds = 0;
private readonly discordClient: Client;
+ private readonly fileCache: FileCacheProvider;
- constructor(cacheDir: string, client: Client) {
- this.cacheDir = cacheDir;
+ constructor(client: Client, fileCache: FileCacheProvider) {
this.discordClient = client;
+ this.fileCache = fileCache;
}
async connect(channel: VoiceChannel): Promise<void> {
@@ -287,40 +286,24 @@ export default class {
return this.queueSize() === 0;
}
- private getCachedPath(url: string): string {
- return path.join(this.cacheDir, hasha(url));
- }
-
- private getCachedPathTemp(url: string): string {
- return path.join(this.cacheDir, 'tmp', hasha(url));
- }
-
- private async isCached(url: string): Promise<boolean> {
- try {
- await fs.access(this.getCachedPath(url));
-
- return true;
- } catch (_: unknown) {
- return false;
- }
+ private getHashForCache(url: string): string {
+ return hasha(url);
}
private async getStream(url: string, options: {seek?: number} = {}): Promise<Readable> {
- const cachedPath = this.getCachedPath(url);
-
let ffmpegInput = '';
const ffmpegInputOptions: string[] = [];
let shouldCacheVideo = false;
let format: ytdl.videoFormat | undefined;
- if (await this.isCached(url)) {
- ffmpegInput = cachedPath;
+ try {
+ ffmpegInput = await this.fileCache.getPathFor(this.getHashForCache(url));
if (options.seek) {
ffmpegInputOptions.push('-ss', options.seek.toString());
}
- } else {
+ } catch {
// Not yet cached, must download
const info = await ytdl.getInfo(url);
@@ -371,7 +354,6 @@ export default class {
'1',
'-reconnect_delay_max',
'5',
- '-re',
]);
if (options.seek) {
@@ -382,6 +364,17 @@ export default class {
// Create stream and pipe to capacitor
return new Promise((resolve, reject) => {
+ const capacitor = new WriteStream();
+
+ // Cache video if necessary
+ if (shouldCacheVideo) {
+ const cacheStream = this.fileCache.createWriteStream(this.getHashForCache(url));
+
+ capacitor.createReadStream().pipe(cacheStream);
+ } else {
+ ffmpegInputOptions.push('-re');
+ }
+
const youtubeStream = ffmpeg(ffmpegInput)
.inputOptions(ffmpegInputOptions)
.noVideo()
@@ -390,29 +383,9 @@ export default class {
.on('error', error => {
console.error(error);
reject(error);
- })
- .pipe() as PassThrough;
-
- const capacitor = new WriteStream();
-
- youtubeStream.pipe(capacitor);
-
- // Cache video if necessary
- if (shouldCacheVideo) {
- const cacheTempPath = this.getCachedPathTemp(url);
- const cacheStream = createWriteStream(cacheTempPath);
-
- cacheStream.on('finish', async () => {
- // Only move if size is non-zero (may have errored out)
- const stats = await fs.stat(cacheTempPath);
-
- if (stats.size !== 0) {
- await fs.rename(cacheTempPath, cachedPath);
- }
});
- capacitor.createReadStream().pipe(cacheStream);
- }
+ youtubeStream.pipe(capacitor);
resolve(capacitor.createReadStream());
});