refactor: unify resource and patch mirroring logic into a generic function

This commit is contained in:
daydreamer-json
2026-04-04 01:32:53 +09:00
parent c815a012c0
commit d2ebb64455

View File

@@ -139,27 +139,56 @@ async function processMirrorQueue(configAuth: any, client: Octokit) {
logger.info('Mirroring process completed and pending list cleared');
}
async function processMirrorResQueue(configAuth: any, client: Octokit) {
function getSelectedTag(tags: string[], dbs: { mirror: string }[][]) {
const regexp = /github\.com\/.+?\/.+?\/releases\/download\/(.+?)\//;
const uniqueMirrors = [...new Set(dbs.flat().map((e) => e.mirror))];
for (const tag of tags) {
if (uniqueMirrors.filter((m) => m.match(regexp)?.[1] === tag).length <= 997) return tag;
}
return false;
}
async function processGenericResQueue<
TAsset extends { size: number; url: string },
TEntry extends { mirror: string; chunk: { start: number; length: number } | null },
>(
client: Octokit,
owner: string,
repo: string,
tags: string[],
dbs: { current: TEntry[]; other: { mirror: string }[] },
paths: { db: string; pending: string },
options: {
isPatch: boolean;
chunkPrefix: string;
getBigFileName: (asset: TAsset) => string;
isDuplicate: (asset: TAsset, db: TEntry[]) => boolean;
pushToDb: (
asset: TAsset,
db: TEntry[],
mirror: string,
chunk: { start: number; length: number } | null,
) => void;
logPrefix: string;
downloadLogFn: (asset: TAsset, index: number, total: number, dataLength: number) => string;
bigFileDownloadLogFn: (asset: TAsset) => string;
},
) {
const ghFileSizeLimit = 2 * 1024 ** 3 - 1;
const owner = configAuth.github.relArchiveRes.owner;
const repo = configAuth.github.relArchiveRes.repo;
const outputDir = argvUtils.getArgv()['outputDir'];
const dbPath = path.join(outputDir, 'mirror_file_res_list.json.zst');
const patchDbPath = path.join(outputDir, 'mirror_file_res_patch_list.json.zst');
const pendingDbPath = path.join(outputDir, 'mirror_file_res_list_pending.json');
const patchPendingDbPath = path.join(outputDir, 'mirror_file_res_patch_list_pending.json');
const db: MirrorFileResEntry[] = JSON.parse(Bun.zstdDecompressSync(await Bun.file(dbPath).bytes()).toString('utf-8'));
const patchDb: MirrorFileResPatchEntry[] = JSON.parse(
Bun.zstdDecompressSync(await Bun.file(patchDbPath).bytes()).toString('utf-8'),
);
const pendingDb: AssetToMirrorRes[] = (await Bun.file(pendingDbPath).json()) ?? [];
const validPendingDb: AssetToMirrorRes[] = [];
const newPendingDb: AssetToMirrorRes[] = [];
const chunkThresholdSize = 500 * 1024 ** 2;
if (!(await Bun.file(paths.pending).exists())) return;
const pendingDb: TAsset[] = (await Bun.file(paths.pending).json()) ?? [];
const validPendingDb: TAsset[] = [];
const newPendingDb: TAsset[] = [];
for (const entry of pendingDb) {
if (db.some((e) => e.md5 === entry.md5)) continue;
if (options.isDuplicate(entry, dbs.current)) continue;
if (entry.size >= ghFileSizeLimit) {
logger.warn(`File size is larger than limit. Skipped: ${entry.name}`);
logger.warn(
`File size is larger than limit. Skipped${options.isPatch ? ' patch' : ''}: ${options.getBigFileName(entry)}`,
);
newPendingDb.push(entry);
continue;
}
@@ -167,48 +196,37 @@ async function processMirrorResQueue(configAuth: any, client: Octokit) {
}
if (validPendingDb.length === 0) {
logger.info('Res valid pending list is empty');
} else {
logger.info(`Processing ${validPendingDb.length} pending res ...`);
const getSelectedTag = () => {
const regexp = /github\.com\/.+?\/.+?\/releases\/download\/(.+?)\//;
for (const tag of configAuth.github.relArchiveRes.tags) {
if (
[...new Set([...db, ...patchDb].map((e) => e.mirror))].filter(
(e) => e.match(regexp) && e.match(regexp)![1] && e.match(regexp)![1] === tag,
).length <= 997
)
return tag as string;
logger.info(`${options.logPrefix} valid pending list is empty`);
await Bun.write(paths.pending, JSON.stringify([...newPendingDb], null, 2));
return;
}
return false;
};
if (!getSelectedTag()) {
logger.info(`Processing ${validPendingDb.length} pending ${options.logPrefix.toLowerCase()} ...`);
const tagPicker = () => getSelectedTag(tags, [dbs.current, dbs.other]);
if (!tagPicker()) {
logger.error('GitHub tag assets file count limit reached');
return;
}
const pendingFileChunkSizeLimit = ghFileSizeLimit;
const chunkThresholdSize = 500 * 1024 ** 2;
const pendingFileChunks = validPendingDb
.filter((e) => e.size < chunkThresholdSize)
.reduce(
const chunkedFiles = validPendingDb.filter((e) => e.size < chunkThresholdSize);
const pendingFileChunks = chunkedFiles.reduce(
(acc, item) => {
const lastChunk = acc.at(-1)!;
const currentChunkSize = lastChunk.reduce((sum, i) => sum + i.size, 0);
if (currentChunkSize + item.size <= pendingFileChunkSizeLimit) {
if (currentChunkSize + item.size <= ghFileSizeLimit) {
lastChunk.push(item);
} else {
acc.push([item]);
}
return acc;
},
[[]] as AssetToMirrorRes[][],
[[]] as TAsset[][],
);
if (pendingFileChunks.length === 1 && pendingFileChunks[0]!.length === 0) {
logger.info('Chunk upload skipped');
await Bun.write(pendingDbPath, JSON.stringify(validPendingDb, null, 2));
logger.info(`${options.isPatch ? 'Patch chunk' : 'Chunk'} upload skipped`);
} else {
for (const chunk of pendingFileChunks) {
const buffers: { index: number; data: Uint8Array }[] = [];
@@ -224,9 +242,7 @@ async function processMirrorResQueue(configAuth: any, client: Octokit) {
.bytes();
buffers.push({ index, data });
process.stdout.write('\x1b[1A\x1b[2K');
logger.trace(
`Downloaded: ${buffers.length.toString().padStart(chunk.length.toString().length, ' ')} / ${chunk.length}, ${new URL(e.url).pathname.split('/').at(-1)}, ${formatBytes(data.length)}`,
);
logger.trace(options.downloadLogFn(e, buffers.length, chunk.length, data.length));
});
});
await networkQueue.onIdle();
@@ -240,35 +256,31 @@ async function processMirrorResQueue(configAuth: any, client: Octokit) {
offset += item.data.length;
}
const combinedBufferMd5 = new Bun.CryptoHasher('md5').update(combinedBuffer).digest('hex');
const chunkFileName = `VFS_Chunk_${combinedBufferMd5}.bin`;
if (getSelectedTag() === false) throw new Error('GitHub tag assets file count limit reached');
await githubUtils.uploadAssetWithBuffer(
client,
owner,
repo,
getSelectedTag() as string,
chunkFileName,
combinedBuffer,
);
const chunkFileName = `${options.chunkPrefix}${combinedBufferMd5}.bin`;
const currentTag = tagPicker();
if (!currentTag) throw new Error('GitHub tag assets file count limit reached');
await githubUtils.uploadAssetWithBuffer(client, owner, repo, currentTag, chunkFileName, combinedBuffer);
offset = 0;
for (const item of chunk) {
db.push({
md5: item.md5,
mirror: `https://github.com/${owner}/${repo}/releases/download/${getSelectedTag()}/${chunkFileName}`,
chunk: { start: offset, length: item.size },
});
options.pushToDb(
item,
dbs.current,
`https://github.com/${owner}/${repo}/releases/download/${currentTag}/${chunkFileName}`,
{ start: offset, length: item.size },
);
offset += item.size;
}
await Bun.write(dbPath, Bun.zstdCompressSync(JSON.stringify(db), { level: 16 }));
await Bun.write(paths.db, Bun.zstdCompressSync(JSON.stringify(dbs.current), { level: 16 }));
}
}
const bigFiles = validPendingDb.filter((e) => e.size >= chunkThresholdSize);
await Bun.write(pendingDbPath, JSON.stringify([...newPendingDb, ...bigFiles], null, 2));
await Bun.write(paths.pending, JSON.stringify([...newPendingDb, ...bigFiles], null, 2));
{
if (bigFiles.length > 0) logger.info('Processing big pending res ...');
if (bigFiles.length > 0) {
logger.info(`Processing big pending ${options.logPrefix.toLowerCase()} ...`);
networkQueue.concurrency = 4;
for (const file of bigFiles) {
networkQueue.add(async () => {
@@ -279,36 +291,35 @@ async function processMirrorResQueue(configAuth: any, client: Octokit) {
retry: { limit: appConfig.network.retryCount },
})
.bytes();
logger.trace('Downloaded: ' + file.name);
if (getSelectedTag() === false) throw new Error('GitHub tag assets file count limit reached');
logger.trace(options.bigFileDownloadLogFn(file));
const currentTag = tagPicker();
if (!currentTag) throw new Error('GitHub tag assets file count limit reached');
await githubUtils.uploadAssetWithBuffer(client, owner, repo, getSelectedTag() as string, file.name, buffer);
db.push({
md5: file.md5,
mirror: `https://github.com/${owner}/${repo}/releases/download/${getSelectedTag()}/${file.name}`,
chunk: null,
});
await Bun.write(dbPath, Bun.zstdCompressSync(JSON.stringify(db), { level: 16 }));
const fileName = options.getBigFileName(file);
await githubUtils.uploadAssetWithBuffer(client, owner, repo, currentTag, fileName, buffer);
options.pushToDb(
file,
dbs.current,
`https://github.com/${owner}/${repo}/releases/download/${currentTag}/${fileName}`,
null,
);
await Bun.write(paths.db, Bun.zstdCompressSync(JSON.stringify(dbs.current), { level: 16 }));
});
}
await networkQueue.onIdle();
networkQueue.concurrency = appConfig.threadCount.network;
}
await Bun.write(paths.pending, JSON.stringify([...newPendingDb], null, 2));
}
await Bun.write(pendingDbPath, JSON.stringify([...newPendingDb], null, 2));
}
async function processMirrorResPatchQueue(configAuth: any, client: Octokit) {
const ghFileSizeLimit = 2 * 1024 ** 3 - 1;
async function processMirrorResQueue(configAuth: any, client: Octokit) {
const owner = configAuth.github.relArchiveRes.owner;
const repo = configAuth.github.relArchiveRes.repo;
const outputDir = argvUtils.getArgv()['outputDir'];
const dbPath = path.join(outputDir, 'mirror_file_res_list.json.zst');
const patchDbPath = path.join(outputDir, 'mirror_file_res_patch_list.json.zst');
const pendingDbPath = path.join(outputDir, 'mirror_file_res_patch_list_pending.json');
if (!(await Bun.file(pendingDbPath).exists())) return;
const pendingDbPath = path.join(outputDir, 'mirror_file_res_list_pending.json');
const db: MirrorFileResEntry[] = (await Bun.file(dbPath).exists())
? JSON.parse(Bun.zstdDecompressSync(await Bun.file(dbPath).bytes()).toString('utf-8'))
@@ -316,146 +327,66 @@ async function processMirrorResPatchQueue(configAuth: any, client: Octokit) {
const patchDb: MirrorFileResPatchEntry[] = (await Bun.file(patchDbPath).exists())
? JSON.parse(Bun.zstdDecompressSync(await Bun.file(patchDbPath).bytes()).toString('utf-8'))
: [];
const pendingDb: AssetToMirrorResPatch[] = (await Bun.file(pendingDbPath).json()) ?? [];
const validPendingDb: AssetToMirrorResPatch[] = [];
const newPendingDb: AssetToMirrorResPatch[] = [];
for (const entry of pendingDb) {
if (patchDb.some((e) => e.md5Old === entry.md5Old && e.md5New === entry.md5New)) continue;
if (entry.size >= ghFileSizeLimit) {
logger.warn(`File size is larger than limit. Skipped patch: ${entry.md5Old} -> ${entry.md5New}`);
newPendingDb.push(entry);
continue;
}
validPendingDb.push(entry);
}
if (validPendingDb.length === 0) {
logger.info('Res patch valid pending list is empty');
} else {
logger.info(`Processing ${validPendingDb.length} pending res patches ...`);
const getSelectedTag = () => {
const regexp = /github\.com\/.+?\/.+?\/releases\/download\/(.+?)\//;
for (const tag of configAuth.github.relArchiveRes.tags) {
if (
[...new Set([...db, ...patchDb].map((e) => e.mirror))].filter(
(e) => e.match(regexp) && e.match(regexp)![1] && e.match(regexp)![1] === tag,
).length <= 997
)
return tag as string;
}
return false;
};
if (!getSelectedTag()) {
logger.error('GitHub tag assets file count limit reached');
return;
}
const chunkThresholdSize = 500 * 1024 ** 2;
const pendingFileChunks = validPendingDb
.filter((e) => e.size < chunkThresholdSize)
.reduce(
(acc, item) => {
const lastChunk = acc.at(-1)!;
const currentChunkSize = lastChunk.reduce((sum, i) => sum + i.size, 0);
if (currentChunkSize + item.size <= ghFileSizeLimit) {
lastChunk.push(item);
} else {
acc.push([item]);
}
return acc;
await processGenericResQueue<AssetToMirrorRes, MirrorFileResEntry>(
client,
owner,
repo,
configAuth.github.relArchiveRes.tags,
{ current: db, other: patchDb },
{ db: dbPath, pending: pendingDbPath },
{
isPatch: false,
chunkPrefix: 'VFS_Chunk_',
getBigFileName: (file) => file.name,
isDuplicate: (entry, db) => db.some((e) => e.md5 === entry.md5),
pushToDb: (item, db, mirror, chunk) => {
db.push({ md5: item.md5, mirror, chunk });
},
logPrefix: 'Res',
downloadLogFn: (e, current, total, dataLength) =>
`Downloaded: ${current.toString().padStart(total.toString().length, ' ')} / ${total}, ${new URL(e.url).pathname.split('/').at(-1)}, ${formatBytes(dataLength)}`,
bigFileDownloadLogFn: (file) => `Downloaded: ${file.name}`,
},
[[]] as AssetToMirrorResPatch[][],
);
}
if (pendingFileChunks.length === 1 && pendingFileChunks[0]!.length === 0) {
logger.info('Patch chunk upload skipped');
} else {
for (const chunk of pendingFileChunks) {
const buffers: { index: number; data: Uint8Array }[] = [];
console.log('');
chunk.forEach((e, index) => {
networkQueue.add(async () => {
const data = await ky
.get(e.url, {
headers: { 'User-Agent': appConfig.network.userAgent.minimum },
timeout: appConfig.network.timeout,
retry: { limit: appConfig.network.retryCount },
})
.bytes();
buffers.push({ index, data });
process.stdout.write('\x1b[1A\x1b[2K');
logger.trace(
`Downloaded Patch: ${buffers.length.toString().padStart(chunk.length.toString().length, ' ')} / ${chunk.length}, ${e.md5Old.slice(0, 8)}... -> ${e.md5New.slice(0, 8)}..., ${formatBytes(data.length)}`,
async function processMirrorResPatchQueue(configAuth: any, client: Octokit) {
const owner = configAuth.github.relArchiveRes.owner;
const repo = configAuth.github.relArchiveRes.repo;
const outputDir = argvUtils.getArgv()['outputDir'];
const dbPath = path.join(outputDir, 'mirror_file_res_list.json.zst');
const patchDbPath = path.join(outputDir, 'mirror_file_res_patch_list.json.zst');
const pendingDbPath = path.join(outputDir, 'mirror_file_res_patch_list_pending.json');
const db: MirrorFileResEntry[] = (await Bun.file(dbPath).exists())
? JSON.parse(Bun.zstdDecompressSync(await Bun.file(dbPath).bytes()).toString('utf-8'))
: [];
const patchDb: MirrorFileResPatchEntry[] = (await Bun.file(patchDbPath).exists())
? JSON.parse(Bun.zstdDecompressSync(await Bun.file(patchDbPath).bytes()).toString('utf-8'))
: [];
await processGenericResQueue<AssetToMirrorResPatch, MirrorFileResPatchEntry>(
client,
owner,
repo,
configAuth.github.relArchiveRes.tags,
{ current: patchDb, other: db },
{ db: patchDbPath, pending: pendingDbPath },
{
isPatch: true,
chunkPrefix: 'VFS_Patch_Chunk_',
getBigFileName: (file) => `VFS_Patch_${file.md5Old}_${file.md5New}.bin`,
isDuplicate: (entry, db) => db.some((e) => e.md5Old === entry.md5Old && e.md5New === entry.md5New),
pushToDb: (item, db, mirror, chunk) => {
db.push({ md5Old: item.md5Old, md5New: item.md5New, mirror, chunk });
},
logPrefix: 'Res patch',
downloadLogFn: (e, current, total, dataLength) =>
`Downloaded Patch: ${current.toString().padStart(total.toString().length, ' ')} / ${total}, ${e.md5Old.slice(0, 8)}... -> ${e.md5New.slice(0, 8)}..., ${formatBytes(dataLength)}`,
bigFileDownloadLogFn: (file) => `Downloaded Patch: ${file.md5Old} -> ${file.md5New}`,
},
);
});
});
await networkQueue.onIdle();
buffers.sort((a, b) => a.index - b.index);
const combinedBuffer = new Uint8Array(mathUtils.arrayTotal(buffers.map((e) => e.data.length)));
let offset = 0;
for (const item of buffers) {
combinedBuffer.set(item.data, offset);
offset += item.data.length;
}
const combinedBufferMd5 = new Bun.CryptoHasher('md5').update(combinedBuffer).digest('hex');
const chunkFileName = `VFS_Patch_Chunk_${combinedBufferMd5}.bin`;
const tag = getSelectedTag();
if (!tag) throw new Error('GitHub tag assets file count limit reached');
await githubUtils.uploadAssetWithBuffer(client, owner, repo, tag, chunkFileName, combinedBuffer);
offset = 0;
for (const item of chunk) {
patchDb.push({
md5Old: item.md5Old,
md5New: item.md5New,
mirror: `https://github.com/${owner}/${repo}/releases/download/${tag}/${chunkFileName}`,
chunk: { start: offset, length: item.size },
});
offset += item.size;
}
await Bun.write(patchDbPath, Bun.zstdCompressSync(JSON.stringify(patchDb), { level: 16 }));
}
}
const bigFiles = validPendingDb.filter((e) => e.size >= chunkThresholdSize);
if (bigFiles.length > 0) {
logger.info('Processing big pending patches ...');
networkQueue.concurrency = 4;
for (const file of bigFiles) {
networkQueue.add(async () => {
const buffer = await ky
.get(file.url, {
headers: { 'User-Agent': appConfig.network.userAgent.minimum },
timeout: appConfig.network.timeout,
retry: { limit: appConfig.network.retryCount },
})
.bytes();
logger.trace(`Downloaded Patch: ${file.md5Old} -> ${file.md5New}`);
const tag = getSelectedTag();
if (!tag) throw new Error('GitHub tag assets file count limit reached');
const fileName = `VFS_Patch_${file.md5Old}_${file.md5New}.bin`;
await githubUtils.uploadAssetWithBuffer(client, owner, repo, tag, fileName, buffer);
patchDb.push({
md5Old: file.md5Old,
md5New: file.md5New,
mirror: `https://github.com/${owner}/${repo}/releases/download/${tag}/${fileName}`,
chunk: null,
});
await Bun.write(patchDbPath, Bun.zstdCompressSync(JSON.stringify(patchDb), { level: 16 }));
});
}
await networkQueue.onIdle();
networkQueue.concurrency = appConfig.threadCount.network;
}
}
await Bun.write(pendingDbPath, JSON.stringify([...newPendingDb], null, 2));
}
async function mainCmdHandler() {