Skip to content

Commit 521a24b

Browse files
authored
fix unhandled promise reject during fetch process (teambit#3968)
Fetch process sends the objects to a queue to be written to the filesystem and doesn't wait to completion. Originally it was done to make the process faster and let the queue control the concurrency. However, it causes issues with error handling. This PR changes the implementation to wait for the queue to complete processing before continuing to the next object. The performance had been tested before and after this change and there was no noticeable difference. (on bit-bin with more than 10K objects it took 1:10 min in both cases, which is mostly the download time)
1 parent 6cd472b commit 521a24b

File tree

6 files changed

+25
-53
lines changed

6 files changed

+25
-53
lines changed

e2e/functionalities/merge.e2e.3.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,8 @@ describe('merge functionality', function () {
5252
it('should throw MergeConflict error when importing the component', () => {
5353
const importFunc = () => helper.command.importComponent('bar/foo');
5454
const error = new MergeConflict(`${helper.scopes.remote}/bar/foo`, ['0.0.2']);
55-
helper.general.expectToThrow(importFunc, error);
55+
expect(importFunc).to.throw(error.message);
56+
expect(importFunc).to.not.throw('unhandled rejection found');
5657
});
5758
});
5859
describe('importing a component with --merge flag', () => {

src/cli/default-error-handler.ts

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,6 @@ import {
8080
HashNotFound,
8181
HeadNotFound,
8282
InvalidIndexJson,
83-
MergeConflict,
8483
OutdatedIndexJson,
8584
ParentNotFound,
8685
ResolutionException,
@@ -219,18 +218,6 @@ const errorsMap: Array<[Class<Error>, (err: Class<Error>) => string]> = [
219218
],
220219
[HashNotFound, (err) => `hash ${chalk.bold(err.hash)} not found`],
221220
[HeadNotFound, (err) => `head snap ${chalk.bold(err.headHash)} was not found for a component ${chalk.bold(err.id)}`],
222-
[
223-
MergeConflict,
224-
(err) =>
225-
`error: merge conflict occurred while importing the component ${err.id}. conflict version(s): ${err.versions.join(
226-
', '
227-
)}
228-
to resolve it and merge your local and remote changes, please do the following:
229-
1) bit untag ${err.id} ${err.versions.join(' ')}
230-
2) bit import
231-
3) bit checkout ${err.versions.join(' ')} ${err.id}
232-
once your changes are merged with the new remote version, you can tag and export a new version of the component to the remote scope.`,
233-
],
234221
[
235222
OutdatedIndexJson,
236223
(err) => `error: ${chalk.bold(err.id)} found in the index.json file, however, is missing from the scope.

src/scope/exceptions/merge-conflict.ts

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,18 @@
1-
import AbstractError from '../../error/abstract-error';
1+
import { BitError } from '@teambit/bit-error';
22

3-
export default class MergeConflict extends AbstractError {
3+
export default class MergeConflict extends BitError {
44
id: string;
55
versions: string[];
66

77
constructor(id: string, versions: string[]) {
8-
super();
8+
super(`error: merge conflict occurred while importing the component ${id}. conflict version(s): ${versions.join(
9+
', '
10+
)}
11+
to resolve it and merge your local and remote changes, please do the following:
12+
1) bit untag ${id} ${versions.join(' ')}
13+
2) bit import
14+
3) bit checkout ${versions.join(' ')} ${id}
15+
once your changes are merged with the new remote version, you can tag and export a new version of the component to the remote scope.`);
916
this.id = id;
1017
this.versions = versions;
1118
}

src/scope/objects-fetcher/objects-writable-stream.ts

Lines changed: 9 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
1-
import pMap from 'p-map';
21
import { Writable } from 'stream';
32
import { BitObject, Repository } from '../objects';
4-
import { CONCURRENT_COMPONENTS_LIMIT, DEFAULT_LANE } from '../../constants';
3+
import { DEFAULT_LANE } from '../../constants';
54
import { RemoteLaneId } from '../../lane-id/lane-id';
65
import logger from '../../logger/logger';
76
import { ModelComponentMerger } from '../component-ops/model-components-merger';
@@ -21,7 +20,6 @@ import { WriteComponentsQueue } from './write-components-queue';
2120
* remotes are processed. see @writeManyObjectListToModel.
2221
*/
2322
export class ObjectsWritable extends Writable {
24-
private mutableObjects: BitObject[] = [];
2523
constructor(
2624
private repo: Repository,
2725
private sources: SourceRepository,
@@ -37,46 +35,27 @@ export class ObjectsWritable extends Writable {
3735
return callback(new Error('objectItem expected to have "ref" and "buffer" props'));
3836
}
3937
try {
40-
await this.writeImmutableObjectToFs(obj);
38+
await this.writeObjectToFs(obj);
4139
return callback();
4240
} catch (err) {
4341
return callback(err);
4442
}
4543
}
46-
async _final(callback) {
47-
try {
48-
await this.writeMutableObjectsToFS();
49-
callback();
50-
} catch (err) {
51-
callback(err);
52-
}
53-
}
54-
private async writeImmutableObjectToFs(obj: ObjectItem) {
44+
45+
private async writeObjectToFs(obj: ObjectItem) {
5546
const bitObject = await BitObject.parseObject(obj.buffer);
5647
if (bitObject instanceof Lane) {
5748
throw new Error('ObjectsWritable does not support lanes');
5849
}
5950
if (bitObject instanceof ModelComponent) {
60-
this.componentsQueue.addComponent(bitObject.id(), () => this.writeComponentObject(bitObject));
51+
await this.componentsQueue.addComponent(bitObject.id(), () => this.writeComponentObject(bitObject));
6152
} else {
62-
this.objectsQueue.addImmutableObject(obj.ref.toString(), () => this.repo.writeObjectsToTheFS([bitObject]));
53+
await this.objectsQueue.addImmutableObject(obj.ref.toString(), () => this.writeImmutableObject(bitObject));
6354
}
64-
// else this.mutableObjects.push(bitObject);
6555
}
66-
private async writeMutableObjectsToFS() {
67-
const components = this.mutableObjects.filter((obj) => obj instanceof ModelComponent);
68-
const mergedComponents = await pMap(
69-
components,
70-
(component) => this.mergeModelComponent(component as ModelComponent, this.remoteName),
71-
{
72-
concurrency: CONCURRENT_COMPONENTS_LIMIT,
73-
}
74-
);
75-
await this.repo.writeObjectsToTheFS(mergedComponents);
76-
await this.repo.remoteLanes.addEntriesFromModelComponents(
77-
RemoteLaneId.from(DEFAULT_LANE, this.remoteName),
78-
mergedComponents
79-
);
56+
57+
private async writeImmutableObject(bitObject: BitObject) {
58+
await this.repo.writeObjectsToTheFS([bitObject]);
8059
}
8160

8261
private async writeComponentObject(modelComponent: ModelComponent) {

src/scope/objects-fetcher/write-components-queue.ts

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,7 @@ export class WriteComponentsQueue {
88
}
99
addComponent(id: string, fn: () => Promise<void>) {
1010
this.processedIds.push(id);
11-
// eslint-disable-next-line @typescript-eslint/no-floating-promises
12-
this.add(fn);
11+
return this.add(fn);
1312
}
1413
getQueue() {
1514
return this.queue;

src/scope/objects-fetcher/write-objects-queue.ts

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,12 @@ export class WriteObjectsQueue {
77
constructor(concurrency = CONCURRENT_IO_LIMIT) {
88
this.queue = new PQueue({ concurrency, autoStart: true });
99
}
10-
addImmutableObject(hash: string, fn: () => Promise<void>) {
10+
addImmutableObject<T>(hash: string, fn: () => Promise<T | null>) {
1111
if (this.addedHashes.includes(hash)) {
12-
return;
12+
return null;
1313
}
1414
this.addedHashes.push(hash);
15-
// eslint-disable-next-line @typescript-eslint/no-floating-promises
16-
this.add(fn);
15+
return this.add(fn);
1716
}
1817
getQueue() {
1918
return this.queue;

0 commit comments

Comments
 (0)