diff --git a/src/cmds/ghMirrorUpload.ts b/src/cmds/ghMirrorUpload.ts index a4728bf..0c35b34 100644 --- a/src/cmds/ghMirrorUpload.ts +++ b/src/cmds/ghMirrorUpload.ts @@ -139,27 +139,56 @@ async function processMirrorQueue(configAuth: any, client: Octokit) { logger.info('Mirroring process completed and pending list cleared'); } -async function processMirrorResQueue(configAuth: any, client: Octokit) { +function getSelectedTag(tags: string[], dbs: { mirror: string }[][]) { + const regexp = /github\.com\/.+?\/.+?\/releases\/download\/(.+?)\//; + const uniqueMirrors = [...new Set(dbs.flat().map((e) => e.mirror))]; + for (const tag of tags) { + if (uniqueMirrors.filter((m) => m.match(regexp)?.[1] === tag).length <= 997) return tag; + } + return false; +} + +async function processGenericResQueue< + TAsset extends { size: number; url: string }, + TEntry extends { mirror: string; chunk: { start: number; length: number } | null }, +>( + client: Octokit, + owner: string, + repo: string, + tags: string[], + dbs: { current: TEntry[]; other: { mirror: string }[] }, + paths: { db: string; pending: string }, + options: { + isPatch: boolean; + chunkPrefix: string; + getBigFileName: (asset: TAsset) => string; + isDuplicate: (asset: TAsset, db: TEntry[]) => boolean; + pushToDb: ( + asset: TAsset, + db: TEntry[], + mirror: string, + chunk: { start: number; length: number } | null, + ) => void; + logPrefix: string; + downloadLogFn: (asset: TAsset, index: number, total: number, dataLength: number) => string; + bigFileDownloadLogFn: (asset: TAsset) => string; + }, +) { const ghFileSizeLimit = 2 * 1024 ** 3 - 1; - const owner = configAuth.github.relArchiveRes.owner; - const repo = configAuth.github.relArchiveRes.repo; - const outputDir = argvUtils.getArgv()['outputDir']; - const dbPath = path.join(outputDir, 'mirror_file_res_list.json.zst'); - const patchDbPath = path.join(outputDir, 'mirror_file_res_patch_list.json.zst'); - const pendingDbPath = path.join(outputDir, 'mirror_file_res_list_pending.json'); - const patchPendingDbPath = path.join(outputDir, 'mirror_file_res_patch_list_pending.json'); - const db: MirrorFileResEntry[] = JSON.parse(Bun.zstdDecompressSync(await Bun.file(dbPath).bytes()).toString('utf-8')); - const patchDb: MirrorFileResPatchEntry[] = JSON.parse( - Bun.zstdDecompressSync(await Bun.file(patchDbPath).bytes()).toString('utf-8'), - ); - const pendingDb: AssetToMirrorRes[] = (await Bun.file(pendingDbPath).json()) ?? []; - const validPendingDb: AssetToMirrorRes[] = []; - const newPendingDb: AssetToMirrorRes[] = []; + const chunkThresholdSize = 500 * 1024 ** 2; + + if (!(await Bun.file(paths.pending).exists())) return; + const pendingDb: TAsset[] = (await Bun.file(paths.pending).json()) ?? []; + + const validPendingDb: TAsset[] = []; + const newPendingDb: TAsset[] = []; for (const entry of pendingDb) { - if (db.some((e) => e.md5 === entry.md5)) continue; + if (options.isDuplicate(entry, dbs.current)) continue; if (entry.size >= ghFileSizeLimit) { - logger.warn(`File size is larger than limit. Skipped: ${entry.name}`); + logger.warn( + `File size is larger than limit. Skipped${options.isPatch ? ' patch' : ''}: ${options.getBigFileName(entry)}`, + ); newPendingDb.push(entry); continue; } @@ -167,148 +196,130 @@ async function processMirrorResQueue(configAuth: any, client: Octokit) { } if (validPendingDb.length === 0) { - logger.info('Res valid pending list is empty'); + logger.info(`${options.logPrefix} valid pending list is empty`); + await Bun.write(paths.pending, JSON.stringify([...newPendingDb], null, 2)); + return; + } + + logger.info(`Processing ${validPendingDb.length} pending ${options.logPrefix.toLowerCase()} ...`); + + const tagPicker = () => getSelectedTag(tags, [dbs.current, dbs.other]); + + if (!tagPicker()) { + logger.error('GitHub tag assets file count limit reached'); + return; + } + + const chunkedFiles = validPendingDb.filter((e) => e.size < chunkThresholdSize); + const pendingFileChunks = chunkedFiles.reduce( + (acc, item) => { + const lastChunk = acc.at(-1)!; + const currentChunkSize = lastChunk.reduce((sum, i) => sum + i.size, 0); + if (currentChunkSize + item.size <= ghFileSizeLimit) { + lastChunk.push(item); + } else { + acc.push([item]); + } + return acc; + }, + [[]] as TAsset[][], + ); + + if (pendingFileChunks.length === 1 && pendingFileChunks[0]!.length === 0) { + logger.info(`${options.isPatch ? 'Patch chunk' : 'Chunk'} upload skipped`); } else { - logger.info(`Processing ${validPendingDb.length} pending res ...`); - - const getSelectedTag = () => { - const regexp = /github\.com\/.+?\/.+?\/releases\/download\/(.+?)\//; - for (const tag of configAuth.github.relArchiveRes.tags) { - if ( - [...new Set([...db, ...patchDb].map((e) => e.mirror))].filter( - (e) => e.match(regexp) && e.match(regexp)![1] && e.match(regexp)![1] === tag, - ).length <= 997 - ) - return tag as string; - } - return false; - }; - - if (!getSelectedTag()) { - logger.error('GitHub tag assets file count limit reached'); - return; - } - - const pendingFileChunkSizeLimit = ghFileSizeLimit; - const chunkThresholdSize = 500 * 1024 ** 2; - const pendingFileChunks = validPendingDb - .filter((e) => e.size < chunkThresholdSize) - .reduce( - (acc, item) => { - const lastChunk = acc.at(-1)!; - const currentChunkSize = lastChunk.reduce((sum, i) => sum + i.size, 0); - if (currentChunkSize + item.size <= pendingFileChunkSizeLimit) { - lastChunk.push(item); - } else { - acc.push([item]); - } - return acc; - }, - [[]] as AssetToMirrorRes[][], - ); - if (pendingFileChunks.length === 1 && pendingFileChunks[0]!.length === 0) { - logger.info('Chunk upload skipped'); - await Bun.write(pendingDbPath, JSON.stringify(validPendingDb, null, 2)); - } else { - for (const chunk of pendingFileChunks) { - const buffers: { index: number; data: Uint8Array }[] = []; - console.log(''); - chunk.forEach((e, index) => { - networkQueue.add(async () => { - const data = await ky - .get(e.url, { - headers: { 'User-Agent': appConfig.network.userAgent.minimum }, - timeout: appConfig.network.timeout, - retry: { limit: appConfig.network.retryCount }, - }) - .bytes(); - buffers.push({ index, data }); - process.stdout.write('\x1b[1A\x1b[2K'); - logger.trace( - `Downloaded: ${buffers.length.toString().padStart(chunk.length.toString().length, ' ')} / ${chunk.length}, ${new URL(e.url).pathname.split('/').at(-1)}, ${formatBytes(data.length)}`, - ); - }); - }); - await networkQueue.onIdle(); - buffers.sort((a, b) => a.index - b.index); - - const chunkTotalSize = mathUtils.arrayTotal(buffers.map((e) => e.data.length)); - const combinedBuffer = new Uint8Array(chunkTotalSize); - let offset = 0; - for (const item of buffers) { - combinedBuffer.set(item.data, offset); - offset += item.data.length; - } - const combinedBufferMd5 = new Bun.CryptoHasher('md5').update(combinedBuffer).digest('hex'); - const chunkFileName = `VFS_Chunk_${combinedBufferMd5}.bin`; - if (getSelectedTag() === false) throw new Error('GitHub tag assets file count limit reached'); - await githubUtils.uploadAssetWithBuffer( - client, - owner, - repo, - getSelectedTag() as string, - chunkFileName, - combinedBuffer, - ); - - offset = 0; - for (const item of chunk) { - db.push({ - md5: item.md5, - mirror: `https://github.com/${owner}/${repo}/releases/download/${getSelectedTag()}/${chunkFileName}`, - chunk: { start: offset, length: item.size }, - }); - offset += item.size; - } - await Bun.write(dbPath, Bun.zstdCompressSync(JSON.stringify(db), { level: 16 })); - } - } - - const bigFiles = validPendingDb.filter((e) => e.size >= chunkThresholdSize); - await Bun.write(pendingDbPath, JSON.stringify([...newPendingDb, ...bigFiles], null, 2)); - - { - if (bigFiles.length > 0) logger.info('Processing big pending res ...'); - networkQueue.concurrency = 4; - for (const file of bigFiles) { + for (const chunk of pendingFileChunks) { + const buffers: { index: number; data: Uint8Array }[] = []; + console.log(''); + chunk.forEach((e, index) => { networkQueue.add(async () => { - const buffer: Uint8Array = await ky - .get(file.url, { + const data = await ky + .get(e.url, { headers: { 'User-Agent': appConfig.network.userAgent.minimum }, timeout: appConfig.network.timeout, retry: { limit: appConfig.network.retryCount }, }) .bytes(); - logger.trace('Downloaded: ' + file.name); - if (getSelectedTag() === false) throw new Error('GitHub tag assets file count limit reached'); - - await githubUtils.uploadAssetWithBuffer(client, owner, repo, getSelectedTag() as string, file.name, buffer); - db.push({ - md5: file.md5, - mirror: `https://github.com/${owner}/${repo}/releases/download/${getSelectedTag()}/${file.name}`, - chunk: null, - }); - await Bun.write(dbPath, Bun.zstdCompressSync(JSON.stringify(db), { level: 16 })); + buffers.push({ index, data }); + process.stdout.write('\x1b[1A\x1b[2K'); + logger.trace(options.downloadLogFn(e, buffers.length, chunk.length, data.length)); }); - } + }); await networkQueue.onIdle(); - networkQueue.concurrency = appConfig.threadCount.network; + buffers.sort((a, b) => a.index - b.index); + + const chunkTotalSize = mathUtils.arrayTotal(buffers.map((e) => e.data.length)); + const combinedBuffer = new Uint8Array(chunkTotalSize); + let offset = 0; + for (const item of buffers) { + combinedBuffer.set(item.data, offset); + offset += item.data.length; + } + const combinedBufferMd5 = new Bun.CryptoHasher('md5').update(combinedBuffer).digest('hex'); + const chunkFileName = `${options.chunkPrefix}${combinedBufferMd5}.bin`; + const currentTag = tagPicker(); + if (!currentTag) throw new Error('GitHub tag assets file count limit reached'); + + await githubUtils.uploadAssetWithBuffer(client, owner, repo, currentTag, chunkFileName, combinedBuffer); + + offset = 0; + for (const item of chunk) { + options.pushToDb( + item, + dbs.current, + `https://github.com/${owner}/${repo}/releases/download/${currentTag}/${chunkFileName}`, + { start: offset, length: item.size }, + ); + offset += item.size; + } + await Bun.write(paths.db, Bun.zstdCompressSync(JSON.stringify(dbs.current), { level: 16 })); } } - await Bun.write(pendingDbPath, JSON.stringify([...newPendingDb], null, 2)); + const bigFiles = validPendingDb.filter((e) => e.size >= chunkThresholdSize); + await Bun.write(paths.pending, JSON.stringify([...newPendingDb, ...bigFiles], null, 2)); + + if (bigFiles.length > 0) { + logger.info(`Processing big pending ${options.logPrefix.toLowerCase()} ...`); + networkQueue.concurrency = 4; + for (const file of bigFiles) { + networkQueue.add(async () => { + const buffer: Uint8Array = await ky + .get(file.url, { + headers: { 'User-Agent': appConfig.network.userAgent.minimum }, + timeout: appConfig.network.timeout, + retry: { limit: appConfig.network.retryCount }, + }) + .bytes(); + logger.trace(options.bigFileDownloadLogFn(file)); + const currentTag = tagPicker(); + if (!currentTag) throw new Error('GitHub tag assets file count limit reached'); + + const fileName = options.getBigFileName(file); + await githubUtils.uploadAssetWithBuffer(client, owner, repo, currentTag, fileName, buffer); + options.pushToDb( + file, + dbs.current, + `https://github.com/${owner}/${repo}/releases/download/${currentTag}/${fileName}`, + null, + ); + await Bun.write(paths.db, Bun.zstdCompressSync(JSON.stringify(dbs.current), { level: 16 })); + }); + } + await networkQueue.onIdle(); + networkQueue.concurrency = appConfig.threadCount.network; + } + + await Bun.write(paths.pending, JSON.stringify([...newPendingDb], null, 2)); } -async function processMirrorResPatchQueue(configAuth: any, client: Octokit) { - const ghFileSizeLimit = 2 * 1024 ** 3 - 1; +async function processMirrorResQueue(configAuth: any, client: Octokit) { const owner = configAuth.github.relArchiveRes.owner; const repo = configAuth.github.relArchiveRes.repo; const outputDir = argvUtils.getArgv()['outputDir']; const dbPath = path.join(outputDir, 'mirror_file_res_list.json.zst'); const patchDbPath = path.join(outputDir, 'mirror_file_res_patch_list.json.zst'); - const pendingDbPath = path.join(outputDir, 'mirror_file_res_patch_list_pending.json'); - - if (!(await Bun.file(pendingDbPath).exists())) return; + const pendingDbPath = path.join(outputDir, 'mirror_file_res_list_pending.json'); const db: MirrorFileResEntry[] = (await Bun.file(dbPath).exists()) ? JSON.parse(Bun.zstdDecompressSync(await Bun.file(dbPath).bytes()).toString('utf-8')) @@ -316,146 +327,66 @@ async function processMirrorResPatchQueue(configAuth: any, client: Octokit) { const patchDb: MirrorFileResPatchEntry[] = (await Bun.file(patchDbPath).exists()) ? JSON.parse(Bun.zstdDecompressSync(await Bun.file(patchDbPath).bytes()).toString('utf-8')) : []; - const pendingDb: AssetToMirrorResPatch[] = (await Bun.file(pendingDbPath).json()) ?? []; - const validPendingDb: AssetToMirrorResPatch[] = []; - const newPendingDb: AssetToMirrorResPatch[] = []; - for (const entry of pendingDb) { - if (patchDb.some((e) => e.md5Old === entry.md5Old && e.md5New === entry.md5New)) continue; - if (entry.size >= ghFileSizeLimit) { - logger.warn(`File size is larger than limit. Skipped patch: ${entry.md5Old} -> ${entry.md5New}`); - newPendingDb.push(entry); - continue; - } - validPendingDb.push(entry); - } + await processGenericResQueue( + client, + owner, + repo, + configAuth.github.relArchiveRes.tags, + { current: db, other: patchDb }, + { db: dbPath, pending: pendingDbPath }, + { + isPatch: false, + chunkPrefix: 'VFS_Chunk_', + getBigFileName: (file) => file.name, + isDuplicate: (entry, db) => db.some((e) => e.md5 === entry.md5), + pushToDb: (item, db, mirror, chunk) => { + db.push({ md5: item.md5, mirror, chunk }); + }, + logPrefix: 'Res', + downloadLogFn: (e, current, total, dataLength) => + `Downloaded: ${current.toString().padStart(total.toString().length, ' ')} / ${total}, ${new URL(e.url).pathname.split('/').at(-1)}, ${formatBytes(dataLength)}`, + bigFileDownloadLogFn: (file) => `Downloaded: ${file.name}`, + }, + ); +} - if (validPendingDb.length === 0) { - logger.info('Res patch valid pending list is empty'); - } else { - logger.info(`Processing ${validPendingDb.length} pending res patches ...`); +async function processMirrorResPatchQueue(configAuth: any, client: Octokit) { + const owner = configAuth.github.relArchiveRes.owner; + const repo = configAuth.github.relArchiveRes.repo; + const outputDir = argvUtils.getArgv()['outputDir']; + const dbPath = path.join(outputDir, 'mirror_file_res_list.json.zst'); + const patchDbPath = path.join(outputDir, 'mirror_file_res_patch_list.json.zst'); + const pendingDbPath = path.join(outputDir, 'mirror_file_res_patch_list_pending.json'); - const getSelectedTag = () => { - const regexp = /github\.com\/.+?\/.+?\/releases\/download\/(.+?)\//; - for (const tag of configAuth.github.relArchiveRes.tags) { - if ( - [...new Set([...db, ...patchDb].map((e) => e.mirror))].filter( - (e) => e.match(regexp) && e.match(regexp)![1] && e.match(regexp)![1] === tag, - ).length <= 997 - ) - return tag as string; - } - return false; - }; + const db: MirrorFileResEntry[] = (await Bun.file(dbPath).exists()) + ? JSON.parse(Bun.zstdDecompressSync(await Bun.file(dbPath).bytes()).toString('utf-8')) + : []; + const patchDb: MirrorFileResPatchEntry[] = (await Bun.file(patchDbPath).exists()) + ? JSON.parse(Bun.zstdDecompressSync(await Bun.file(patchDbPath).bytes()).toString('utf-8')) + : []; - if (!getSelectedTag()) { - logger.error('GitHub tag assets file count limit reached'); - return; - } - - const chunkThresholdSize = 500 * 1024 ** 2; - const pendingFileChunks = validPendingDb - .filter((e) => e.size < chunkThresholdSize) - .reduce( - (acc, item) => { - const lastChunk = acc.at(-1)!; - const currentChunkSize = lastChunk.reduce((sum, i) => sum + i.size, 0); - if (currentChunkSize + item.size <= ghFileSizeLimit) { - lastChunk.push(item); - } else { - acc.push([item]); - } - return acc; - }, - [[]] as AssetToMirrorResPatch[][], - ); - - if (pendingFileChunks.length === 1 && pendingFileChunks[0]!.length === 0) { - logger.info('Patch chunk upload skipped'); - } else { - for (const chunk of pendingFileChunks) { - const buffers: { index: number; data: Uint8Array }[] = []; - console.log(''); - chunk.forEach((e, index) => { - networkQueue.add(async () => { - const data = await ky - .get(e.url, { - headers: { 'User-Agent': appConfig.network.userAgent.minimum }, - timeout: appConfig.network.timeout, - retry: { limit: appConfig.network.retryCount }, - }) - .bytes(); - buffers.push({ index, data }); - process.stdout.write('\x1b[1A\x1b[2K'); - logger.trace( - `Downloaded Patch: ${buffers.length.toString().padStart(chunk.length.toString().length, ' ')} / ${chunk.length}, ${e.md5Old.slice(0, 8)}... -> ${e.md5New.slice(0, 8)}..., ${formatBytes(data.length)}`, - ); - }); - }); - await networkQueue.onIdle(); - buffers.sort((a, b) => a.index - b.index); - - const combinedBuffer = new Uint8Array(mathUtils.arrayTotal(buffers.map((e) => e.data.length))); - let offset = 0; - for (const item of buffers) { - combinedBuffer.set(item.data, offset); - offset += item.data.length; - } - const combinedBufferMd5 = new Bun.CryptoHasher('md5').update(combinedBuffer).digest('hex'); - const chunkFileName = `VFS_Patch_Chunk_${combinedBufferMd5}.bin`; - const tag = getSelectedTag(); - if (!tag) throw new Error('GitHub tag assets file count limit reached'); - - await githubUtils.uploadAssetWithBuffer(client, owner, repo, tag, chunkFileName, combinedBuffer); - - offset = 0; - for (const item of chunk) { - patchDb.push({ - md5Old: item.md5Old, - md5New: item.md5New, - mirror: `https://github.com/${owner}/${repo}/releases/download/${tag}/${chunkFileName}`, - chunk: { start: offset, length: item.size }, - }); - offset += item.size; - } - await Bun.write(patchDbPath, Bun.zstdCompressSync(JSON.stringify(patchDb), { level: 16 })); - } - } - - const bigFiles = validPendingDb.filter((e) => e.size >= chunkThresholdSize); - if (bigFiles.length > 0) { - logger.info('Processing big pending patches ...'); - networkQueue.concurrency = 4; - for (const file of bigFiles) { - networkQueue.add(async () => { - const buffer = await ky - .get(file.url, { - headers: { 'User-Agent': appConfig.network.userAgent.minimum }, - timeout: appConfig.network.timeout, - retry: { limit: appConfig.network.retryCount }, - }) - .bytes(); - logger.trace(`Downloaded Patch: ${file.md5Old} -> ${file.md5New}`); - const tag = getSelectedTag(); - if (!tag) throw new Error('GitHub tag assets file count limit reached'); - - const fileName = `VFS_Patch_${file.md5Old}_${file.md5New}.bin`; - await githubUtils.uploadAssetWithBuffer(client, owner, repo, tag, fileName, buffer); - patchDb.push({ - md5Old: file.md5Old, - md5New: file.md5New, - mirror: `https://github.com/${owner}/${repo}/releases/download/${tag}/${fileName}`, - chunk: null, - }); - await Bun.write(patchDbPath, Bun.zstdCompressSync(JSON.stringify(patchDb), { level: 16 })); - }); - } - await networkQueue.onIdle(); - networkQueue.concurrency = appConfig.threadCount.network; - } - } - - await Bun.write(pendingDbPath, JSON.stringify([...newPendingDb], null, 2)); + await processGenericResQueue( + client, + owner, + repo, + configAuth.github.relArchiveRes.tags, + { current: patchDb, other: db }, + { db: patchDbPath, pending: pendingDbPath }, + { + isPatch: true, + chunkPrefix: 'VFS_Patch_Chunk_', + getBigFileName: (file) => `VFS_Patch_${file.md5Old}_${file.md5New}.bin`, + isDuplicate: (entry, db) => db.some((e) => e.md5Old === entry.md5Old && e.md5New === entry.md5New), + pushToDb: (item, db, mirror, chunk) => { + db.push({ md5Old: item.md5Old, md5New: item.md5New, mirror, chunk }); + }, + logPrefix: 'Res patch', + downloadLogFn: (e, current, total, dataLength) => + `Downloaded Patch: ${current.toString().padStart(total.toString().length, ' ')} / ${total}, ${e.md5Old.slice(0, 8)}... -> ${e.md5New.slice(0, 8)}..., ${formatBytes(dataLength)}`, + bigFileDownloadLogFn: (file) => `Downloaded Patch: ${file.md5Old} -> ${file.md5New}`, + }, + ); } async function mainCmdHandler() {