Implement Sequential Rate Limiting

This commit is contained in:
Amish Shah 2016-08-19 18:01:24 +01:00
parent 392133f927
commit dcba580d89
9 changed files with 122 additions and 84 deletions

View file

@ -30,7 +30,7 @@
"object.values": "^1.0.3",
"superagent": "^1.5.0",
"unpipe": "^1.0.0",
"ws": "^0.8.1"
"ws": "^1.1.1"
},
"devDependencies": {
"babel-preset-es2015": "^6.6.0",
@ -40,15 +40,15 @@
"eslint-plugin-import": "^1.13.0",
"eslint-plugin-jsx-a11y": "^2.1.0",
"eslint-plugin-react": "^6.0.0",
"fs-extra": "^0.30.0",
"grunt": "^0.4.5",
"grunt-babel": "^6.0.0",
"grunt-browserify": "^4.0.1",
"grunt-contrib-uglify": "^0.11.0",
"grunt-jscs": "^2.8.0",
"jscs": "^2.11.0",
"load-grunt-tasks": "^3.3.0",
"fs-extra": "^0.30.0",
"jsdoc-parse": "^1.2.7"
"jsdoc-parse": "^1.2.7",
"load-grunt-tasks": "^3.3.0"
},
"optionalDependencies": {
"node-opus": "^0.1.11"

View file

@ -11,7 +11,7 @@ class APIRequest {
this.file = file;
}
getBucketName() {
getEndpoint() {
return `${this.method} ${this.url}`;
}

View file

@ -1,62 +0,0 @@
class Bucket {
constructor(rest, limit, remainingRequests = 1, resetTime) {
this.rest = rest;
this.limit = limit;
this.remainingRequests = remainingRequests;
this.resetTime = resetTime;
this.locked = false;
this.queue = [];
this.nextCheck = null;
}
setCheck(time) {
clearTimeout(this.nextCheck);
console.log('going to iterate in', time, 'remaining:', this.queue.length);
this.nextCheck = setTimeout(() => {
this.remainingRequests = this.limit - 1;
this.locked = false;
this.process();
}, time);
}
process() {
if (this.locked) {
return;
}
this.locked = true;
if (this.queue.length === 0) {
return;
}
if (this.remainingRequests === 0) {
return;
}
console.log('bucket is going to iterate', Math.min(this.remainingRequests, this.queue.length), 'items with max', this.limit, 'and remaining', this.remainingRequests);
while (Math.min(this.remainingRequests, this.queue.length) > 0) {
const item = this.queue.shift();
item.request.gen().end((err, res) => {
if (res && res.headers) {
this.limit = res.headers['x-ratelimit-limit'];
this.resetTime = Number(res.headers['x-ratelimit-reset']) * 1000;
this.setCheck((Math.max(500, this.resetTime - Date.now())) + 1000);
}
if (err) {
console.log(err.status, this.remainingRequests);
item.reject(err);
} else {
item.resolve(res && res.body ? res.body : {});
}
});
this.remainingRequests--;
}
}
add(method) {
this.queue.push(method);
this.process();
}
}
module.exports = Bucket;

View file

@ -1,23 +1,21 @@
const request = require('superagent');
const Constants = require('../../util/Constants');
const UserAgentManager = require('./UserAgentManager');
const RESTMethods = require('./RESTMethods');
const Bucket = require('./Bucket');
const SequentialRequestHandler = require('./RequestHandlers/Sequential');
const APIRequest = require('./APIRequest');
class RESTManager {
constructor(client) {
this.client = client;
this.buckets = {};
this.handlers = {};
this.userAgentManager = new UserAgentManager(this);
this.methods = new RESTMethods(this);
this.rateLimitedEndpoints = {};
}
addToBucket(bucket, apiRequest) {
push(handler, apiRequest) {
return new Promise((resolve, reject) => {
bucket.add({
handler.push({
request: apiRequest,
resolve,
reject,
@ -26,17 +24,13 @@ class RESTManager {
}
makeRequest(method, url, auth, data, file) {
/*
file is {file, name}
*/
const apiRequest = new APIRequest(this, method, url, auth, data, file);
if (!this.buckets[apiRequest.getBucketName()]) {
console.log('new bucket', apiRequest.getBucketName());
this.buckets[apiRequest.getBucketName()] = new Bucket(this, 1, 1);
if (!this.handlers[apiRequest.getEndpoint()]) {
this.handlers[apiRequest.getEndpoint()] = new SequentialRequestHandler(this);
}
return this.addToBucket(this.buckets[apiRequest.getBucketName()], apiRequest);
return this.push(this.handlers[apiRequest.getEndpoint()], apiRequest);
}
}

View file

View file

@ -0,0 +1,34 @@
/**
* A base class for different types of rate limiting handlers for the REST API.
* @private
*/
module.exports = class RequestHandler {
constructor(restManager) {
/**
* The RESTManager that instantiated this RequestHandler
* @type {RESTManager}
*/
this.restManager = restManager;
/**
* A list of requests that have yet to be processed.
* @type {Array<APIRequest>}
*/
this.queue = [];
}
/**
* Push a new API request into this bucket
* @param {APIRequest} request the new request to push into the queue
*/
push(request) {
this.queue.push(request);
}
/**
* Attempts to get this RequestHandler to process its current queue
*/
handle() {
}
};

View file

@ -0,0 +1,71 @@
const RequestHandler = require('./RequestHandler');
/**
* Handles API Requests sequentially, i.e. we wait until the current request is finished before moving onto
* the next. This plays a _lot_ nicer in terms of avoiding 429's when there is more than one session of the account,
* but it can be slower.
* @extends {RequestHandler}
*/
module.exports = class SequentialRequestHandler extends RequestHandler {
constructor(restManager) {
super(restManager);
/**
* Whether this rate limiter is waiting for a response from a request
* @type {Boolean}
*/
this.waiting = false;
}
push(request) {
super.push(request);
this.handle();
}
/**
* Performs a request then resolves a promise to indicate its readiness for a new request
* @param {APIRequest} item the item to execute
* @returns {Promise<Object, Error>}
*/
execute(item) {
return new Promise((resolve, reject) => {
item.request.gen().end((err, res) => {
if (res && res.headers) {
this.requestLimit = res.headers['x-ratelimit-limit'];
this.requestResetTime = Number(res.headers['x-ratelimit-reset']) * 1000;
this.requestRemaining = Number(res.headers['x-ratelimit-remaining']);
}
if (err) {
this.waiting = false;
item.reject(err);
reject(err);
} else {
this.queue.shift();
const data = res && res.body ? res.body : {};
item.resolve(data);
if (this.requestRemaining === 0) {
setTimeout(() => {
this.waiting = false;
resolve(data);
}, (this.requestResetTime - Date.now()) + 4000);
} else {
this.waiting = false;
resolve(data);
}
}
});
});
}
handle() {
super.handle();
if (this.waiting || this.queue.length === 0) {
return;
}
this.waiting = true;
const item = this.queue[0];
this.execute(item).then(() => this.handle()).catch(console.log);
}
};

View file

@ -151,7 +151,8 @@ class WebSocketManager {
* Run whenever an error occurs with the WebSocket connection. Tries to reconnect
* @returns {null}
*/
eventError() {
eventError(e) {
console.log(e);
this.tryReconnect();
}

View file

@ -142,8 +142,8 @@ client.on('message', message => {
}
if (message.content === 'ratelimittest') {
let i = 0;
while (i < 20) {
let i = 1;
while (i <= 20) {
message.channel.sendMessage(`Testing my rates, item ${i} of 20`);
i++;
}