From 289c5d251882c8e78df9f962307c4a6c48dbb641 Mon Sep 17 00:00:00 2001
From: Josh Gross <jogros@microsoft.com>
Date: Tue, 17 Dec 2019 16:59:18 -0500
Subject: [PATCH] Concurrency take 2

---
 dist/restore/index.js  | 65 ++++++++++++++++-----------------------
 dist/save/index.js     | 65 ++++++++++++++++-----------------------
 src/cacheHttpClient.ts | 69 +++++++++++++++++-------------------------
 3 files changed, 80 insertions(+), 119 deletions(-)

diff --git a/dist/restore/index.js b/dist/restore/index.js
index cb0ace4..ca3896b 100644
--- a/dist/restore/index.js
+++ b/dist/restore/index.js
@@ -1615,53 +1615,40 @@ function commitCache(restClient, cacheId, filesize) {
         return yield restClient.create(`caches/${cacheId.toString()}`, commitCacheRequest, requestOptions);
     });
 }
-function parallelAwait(queue, concurrency) {
-    var _a;
+function uploadFile(restClient, cacheId, archivePath) {
     return __awaiter(this, void 0, void 0, function* () {
-        const workQueue = queue.reverse();
-        let completedWork = [];
-        let entries = queue.length;
-        while (entries > 0) {
-            if (entries < concurrency) {
-                completedWork.push(yield Promise.all(workQueue));
-            }
-            else {
-                let promises = [];
-                let i;
-                for (i = 0; i < concurrency; i++) {
-                    promises.push((_a = workQueue.pop(), (_a !== null && _a !== void 0 ? _a : Promise.resolve())));
-                }
-                completedWork.push(yield Promise.all(promises));
+        // Upload Chunks
+        const fileSize = fs.statSync(archivePath).size;
+        const resourceUrl = getCacheApiUrl() + "caches/" + cacheId.toString();
+        const responses = [];
+        const fd = fs.openSync(archivePath, "r"); // Use the same fd for serial reads? Will this work for parallel too?
+        const concurrency = 4; // # of HTTP requests in parallel
+        const threads = new Array(concurrency).fill(undefined); // fill() so map() visits every slot (map skips holes in a sparse array)
+        core.debug("Awaiting all uploads");
+        let offset = 0;
+        yield Promise.all(threads.map(() => __awaiter(this, void 0, void 0, function* () {
+            while (offset < fileSize) {
+                const chunkSize = offset + MAX_CHUNK_SIZE > fileSize ? fileSize - offset : MAX_CHUNK_SIZE;
+                const start = offset;
+                const end = offset + chunkSize - 1;
+                offset += MAX_CHUNK_SIZE; // Claim this range before awaiting so no other worker picks it up
+                const chunk = fs.createReadStream(archivePath, { fd, start, end, autoClose: false });
+                responses.push(yield uploadChunk(restClient, resourceUrl, chunk, start, end));
             }
+        })));
+        fs.closeSync(fd);
+        const failedResponse = responses.find(x => !isSuccessStatusCode(x.statusCode));
+        if (failedResponse) {
+            throw new Error(`Cache service responded with ${failedResponse.statusCode} during chunk upload.`);
         }
-        return completedWork;
+        return;
     });
 }
 function saveCache(cacheId, archivePath) {
     return __awaiter(this, void 0, void 0, function* () {
         const restClient = createRestClient();
-        core.debug("Uploading chunks");
-        // Upload Chunks
-        const fileSize = fs.statSync(archivePath).size;
-        const resourceUrl = getCacheApiUrl() + "caches/" + cacheId.toString();
-        const uploads = [];
-        const fd = fs.openSync(archivePath, "r"); // Use the same fd for serial reads? Will this work for parallel too?
-        let offset = 0;
-        while (offset < fileSize) {
-            const chunkSize = offset + MAX_CHUNK_SIZE > fileSize ? fileSize - offset : MAX_CHUNK_SIZE;
-            const end = offset + chunkSize - 1;
-            const chunk = fs.createReadStream(archivePath, { fd, start: offset, end, autoClose: false });
-            uploads.push(uploadChunk(restClient, resourceUrl, chunk, offset, end));
-            offset += MAX_CHUNK_SIZE;
-        }
-        core.debug("Awaiting all uploads");
-        const responses = yield parallelAwait(uploads, 4);
-        fs.closeSync(fd);
-        //const responses = await Promise.all(uploads);
-        const failedResponse = responses.find(x => !isSuccessStatusCode(x.statusCode));
-        if (failedResponse) {
-            throw new Error(`Cache service responded with ${failedResponse.statusCode} during chunk upload.`);
-        }
+        core.debug("Upload cache");
+        yield uploadFile(restClient, cacheId, archivePath);
         core.debug("Commiting cache");
         // Commit Cache
         const cacheSize = utils.getArchiveFileSize(archivePath);
diff --git a/dist/save/index.js b/dist/save/index.js
index 0451f3f..7cd1df9 100644
--- a/dist/save/index.js
+++ b/dist/save/index.js
@@ -1615,53 +1615,40 @@ function commitCache(restClient, cacheId, filesize) {
         return yield restClient.create(`caches/${cacheId.toString()}`, commitCacheRequest, requestOptions);
     });
 }
-function parallelAwait(queue, concurrency) {
-    var _a;
+function uploadFile(restClient, cacheId, archivePath) {
     return __awaiter(this, void 0, void 0, function* () {
-        const workQueue = queue.reverse();
-        let completedWork = [];
-        let entries = queue.length;
-        while (entries > 0) {
-            if (entries < concurrency) {
-                completedWork.push(yield Promise.all(workQueue));
-            }
-            else {
-                let promises = [];
-                let i;
-                for (i = 0; i < concurrency; i++) {
-                    promises.push((_a = workQueue.pop(), (_a !== null && _a !== void 0 ? _a : Promise.resolve())));
-                }
-                completedWork.push(yield Promise.all(promises));
+        // Upload Chunks
+        const fileSize = fs.statSync(archivePath).size;
+        const resourceUrl = getCacheApiUrl() + "caches/" + cacheId.toString();
+        const responses = [];
+        const fd = fs.openSync(archivePath, "r"); // Use the same fd for serial reads? Will this work for parallel too?
+        const concurrency = 4; // # of HTTP requests in parallel
+        const threads = new Array(concurrency).fill(undefined); // fill() so map() visits every slot (map skips holes in a sparse array)
+        core.debug("Awaiting all uploads");
+        let offset = 0;
+        yield Promise.all(threads.map(() => __awaiter(this, void 0, void 0, function* () {
+            while (offset < fileSize) {
+                const chunkSize = offset + MAX_CHUNK_SIZE > fileSize ? fileSize - offset : MAX_CHUNK_SIZE;
+                const start = offset;
+                const end = offset + chunkSize - 1;
+                offset += MAX_CHUNK_SIZE; // Claim this range before awaiting so no other worker picks it up
+                const chunk = fs.createReadStream(archivePath, { fd, start, end, autoClose: false });
+                responses.push(yield uploadChunk(restClient, resourceUrl, chunk, start, end));
             }
+        })));
+        fs.closeSync(fd);
+        const failedResponse = responses.find(x => !isSuccessStatusCode(x.statusCode));
+        if (failedResponse) {
+            throw new Error(`Cache service responded with ${failedResponse.statusCode} during chunk upload.`);
         }
-        return completedWork;
+        return;
     });
 }
 function saveCache(cacheId, archivePath) {
     return __awaiter(this, void 0, void 0, function* () {
         const restClient = createRestClient();
-        core.debug("Uploading chunks");
-        // Upload Chunks
-        const fileSize = fs.statSync(archivePath).size;
-        const resourceUrl = getCacheApiUrl() + "caches/" + cacheId.toString();
-        const uploads = [];
-        const fd = fs.openSync(archivePath, "r"); // Use the same fd for serial reads? Will this work for parallel too?
-        let offset = 0;
-        while (offset < fileSize) {
-            const chunkSize = offset + MAX_CHUNK_SIZE > fileSize ? fileSize - offset : MAX_CHUNK_SIZE;
-            const end = offset + chunkSize - 1;
-            const chunk = fs.createReadStream(archivePath, { fd, start: offset, end, autoClose: false });
-            uploads.push(uploadChunk(restClient, resourceUrl, chunk, offset, end));
-            offset += MAX_CHUNK_SIZE;
-        }
-        core.debug("Awaiting all uploads");
-        const responses = yield parallelAwait(uploads, 4);
-        fs.closeSync(fd);
-        //const responses = await Promise.all(uploads);
-        const failedResponse = responses.find(x => !isSuccessStatusCode(x.statusCode));
-        if (failedResponse) {
-            throw new Error(`Cache service responded with ${failedResponse.statusCode} during chunk upload.`);
-        }
+        core.debug("Upload cache");
+        yield uploadFile(restClient, cacheId, archivePath);
         core.debug("Commiting cache");
         // Commit Cache
         const cacheSize = utils.getArchiveFileSize(archivePath);
diff --git a/src/cacheHttpClient.ts b/src/cacheHttpClient.ts
index db74710..00dcc7d 100644
--- a/src/cacheHttpClient.ts
+++ b/src/cacheHttpClient.ts
@@ -174,55 +174,30 @@ async function commitCache(
     );
 }
 
-async function parallelAwait(queue: Promise<any>[], concurrency: number): Promise<any[]> {
-    const workQueue = queue.reverse();
-    let completedWork: any[] = [];
-    let entries = queue.length;
-    while (entries > 0) {
-        if (entries < concurrency) {
-            completedWork.push(await Promise.all(workQueue));
-        } else {
-            let promises: Promise<any>[] = [];
-            let i: number;
-            for (i = 0; i < concurrency; i++) {
-                promises.push(workQueue.pop() ?? Promise.resolve());
-            }
-            completedWork.push(await Promise.all(promises));
-        }
-    }
-
-    return completedWork;
-}
-
-export async function saveCache(
-    cacheId: number,
-    archivePath: string
-): Promise<void> {
-    const restClient = createRestClient();
-
-    core.debug("Uploading chunks");
+async function uploadFile(restClient: RestClient, cacheId: number, archivePath: string): Promise<void> {
     // Upload Chunks
     const fileSize = fs.statSync(archivePath).size;
     const resourceUrl = getCacheApiUrl() + "caches/" + cacheId.toString();
-    const uploads: Promise<IRestResponse<void>>[] = [];
-
+    const responses: IRestResponse<void>[] = [];
     const fd = fs.openSync(archivePath, "r"); // Use the same fd for serial reads? Will this work for parallel too?
-    let offset = 0;
-    while (offset < fileSize) {
-        const chunkSize = offset + MAX_CHUNK_SIZE > fileSize ? fileSize - offset : MAX_CHUNK_SIZE;
-        const end = offset + chunkSize - 1;
-        const chunk = fs.createReadStream(archivePath, { fd, start: offset, end, autoClose: false });
-        uploads.push(uploadChunk(restClient, resourceUrl, chunk, offset, end));
-        offset += MAX_CHUNK_SIZE;
-    }
 
+    const concurrency = 4; // # of HTTP requests in parallel
+    const threads = new Array(concurrency).fill(undefined); // fill() so map() visits every slot (map skips holes in a sparse array)
     core.debug("Awaiting all uploads");
-    const responses = await parallelAwait(uploads, 4);
+    let offset = 0;
+    await Promise.all(threads.map(async () => { // each worker shares the same offset cursor through this closure
+        while (offset < fileSize) {
+            const chunkSize = offset + MAX_CHUNK_SIZE > fileSize ? fileSize - offset : MAX_CHUNK_SIZE;
+            const start = offset;
+            const end = offset + chunkSize - 1;
+            offset += MAX_CHUNK_SIZE; // Claim this range before awaiting so no other worker picks it up
+            const chunk = fs.createReadStream(archivePath, { fd, start, end, autoClose: false });
+            responses.push(await uploadChunk(restClient, resourceUrl, chunk, start, end));
+        }
+    }));
+
     fs.closeSync(fd);
 
-
-    //const responses = await Promise.all(uploads);
-
     const failedResponse = responses.find(
         x => !isSuccessStatusCode(x.statusCode)
     );
@@ -232,6 +207,18 @@ export async function saveCache(
         );
     }
 
+    return;
+}
+
+export async function saveCache(
+    cacheId: number,
+    archivePath: string
+): Promise<void> {
+    const restClient = createRestClient();
+
+    core.debug("Upload cache");
+    await uploadFile(restClient, cacheId, archivePath);
+
     core.debug("Commiting cache");
     // Commit Cache
     const cacheSize = utils.getArchiveFileSize(archivePath);
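
For reference, below is a standalone sketch of the worker-pool pattern this patch adopts in uploadFile: a fixed number of async workers pull byte ranges off a shared offset cursor until the file is exhausted. The names here (sendChunk, uploadFileConcurrently) and the 4 MB chunk size are illustrative stand-ins, not part of the patch; sendChunk is a stub for the real uploadChunk HTTP call.

import * as fs from "fs";

const MAX_CHUNK_SIZE = 4 * 1024 * 1024; // illustrative 4 MB per request

// Stub standing in for the real uploadChunk() HTTP call; resolves to a status code.
async function sendChunk(
    chunk: fs.ReadStream,
    start: number,
    end: number
): Promise<number> {
    chunk.resume(); // drain the stream in this sketch
    return 204;
}

async function uploadFileConcurrently(
    archivePath: string,
    concurrency = 4
): Promise<void> {
    const fileSize = fs.statSync(archivePath).size;
    const fd = fs.openSync(archivePath, "r"); // one fd; each stream reads its own byte range
    const statusCodes: number[] = [];
    let offset = 0; // shared cursor; read and advanced before any await, so workers never collide

    try {
        await Promise.all(
            new Array(concurrency).fill(undefined).map(async () => {
                while (offset < fileSize) {
                    const start = offset;
                    const end = Math.min(offset + MAX_CHUNK_SIZE, fileSize) - 1;
                    offset += MAX_CHUNK_SIZE; // claim the range before yielding to other workers
                    const chunk = fs.createReadStream(archivePath, {
                        fd,
                        start,
                        end,
                        autoClose: false
                    });
                    statusCodes.push(await sendChunk(chunk, start, end));
                }
            })
        );
    } finally {
        fs.closeSync(fd);
    }

    const failed = statusCodes.find(code => code < 200 || code > 299);
    if (failed !== undefined) {
        throw new Error(`Chunk upload failed with status ${failed}`);
    }
}

Because JavaScript runs these workers on a single thread, reading and advancing offset before the first await in each loop iteration is effectively atomic with respect to the other workers, so every worker claims a distinct range without any extra locking, and at most `concurrency` requests are in flight at once.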