-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathindex.js
More file actions
151 lines (134 loc) · 3.53 KB
/
index.js
File metadata and controls
151 lines (134 loc) · 3.53 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
'use strict';
const request = require('request-promise');
const EventEmitter = require('events');
const assert = require('assert');
const EVENTS = {
RESOLVED: 'resolved',
REJECTED: 'rejected',
COMPLETED: 'completed'
};
/**
* Utility to perform multiple requests [see: http://www.npmjs.com/request]
* Configurable with number of parallel requests to be performed at once
* and a waitTime between start-up of each new request.
*
* emits events:
* 'resolved' : When request was resolved.
* 'rejected' : When request was rejected.
* 'completed' : When the queue is empty.
*
* @extends EventEmitter
*/
class RequestQueue extends EventEmitter {
/**
* Create a new RequestQueue with a fixed number of parallel requests
* and waitTime between each request.
* @param {Number} [parallel=1] Number of parallel requests to be run at once.
* @param {Number} [waitTime=0] Time in ms before staring a new request when one is completed.
*/
constructor(parallel = 1, waitTime = 0) {
super();
assert.ok(Number.isInteger(parallel) && parallel >= 1, 'Parallel needs to be a integer >= 1.');
assert.ok(Number.isInteger(waitTime) && waitTime >= 0, 'waitTime needs to be a integer >= 0.');
this._requests = [];
this._parallel = parallel;
this._waitTime = waitTime;
this._running = 0; // The number of currently running requests.
this._completed = false; // To tell if we need to emit 'completed' event.
}
/**
* PRIVATE
* Handle the next request.
*/
_next() {
while (this._running < this._parallel && this._requests.length !== 0) {
this._completed = false; // We have a new request about to start so we are not completed.
this._running++;
request(this._requests.shift())
.then(res => {
this._running--;
this.emit(EVENTS.RESOLVED, res);
this._wait().then(() => this._next());
})
.catch(err => {
this._running--;
this.emit(EVENTS.REJECTED, err);
this._wait().then(() => this._next());
});
}
this._emitIfCompleted();
}
/**
* PRIVATE
* Helper to emit when we have emptied the queue.
*/
_emitIfCompleted() {
if (this._running === 0 && this._requests.length === 0 && !this._completed) {
this._completed = true;
this.emit(EVENTS.COMPLETED);
}
}
/**
* PRIVATE
* Wait for specific time then resolve promise.
*/
_wait() {
if (this._waitTime === 0) return Promise.resolve();
return new Promise(resolve => {
setTimeout(resolve, this._waitTime);
});
}
/**
* Add a new request to the queue of requests to be executed.
* @param {Object} req Can be any request object or string to perform the request to.
* @return {this}
*/
push(req) {
this._requests.push(req);
this._next();
return this;
}
/**
* Add multiple requests to the queue.
* @param {Array} reqs Array of requests to be executed.
* @return {this}
*/
pushAll(reqs) {
this._requests = this._requests.concat(reqs);
this._next();
return this;
}
/**
* Get the current size of the queue.
* @return {Number} length of the queue.
*/
size() {
return this._requests.length;
}
/**
* Add a new item to the front of the queue.
* @return {this}
*/
unshift(req) {
this._requests.unshift(req);
this._next();
return this;
}
/**
* Add a new items to the front of the queue.
* @return {this}
*/
unshiftAll(reqs) {
this._requests = reqs.concat(this._requests);
this._next();
return this;
}
/**
* Clears the queue (all ongoing requests will still be completed though)
* @return {this}
*/
clear() {
this._requests = [];
}
}
module.exports = RequestQueue;