errory voice

This commit is contained in:
Amish Shah 2016-08-24 21:38:48 +01:00
parent 48444a5444
commit 8683f45816
20 changed files with 476 additions and 13 deletions

File diff suppressed because one or more lines are too long

View file

@ -26,14 +26,15 @@
},
"homepage": "https://github.com/hydrabolt/discord.js#readme",
"dependencies": {
"node-opus": "^0.1.13",
"object.values": "^1.0.3",
"superagent": "^1.5.0",
"tweetnacl": "^0.14.3",
"ws": "^1.1.1"
},
"devDependencies": {
"fs-extra": "^0.30.0",
"jsdoc-parse": "^1.2.7",
"load-grunt-tasks": "^3.3.0"
"jsdoc-parse": "^1.2.7"
},
"optionalDependencies": {
"node-opus": "^0.1.11"

View file

@ -39,8 +39,9 @@ class ClientVoiceManager {
if (pendingRequest.token && pendingRequest.sessionID && pendingRequest.endpoint) {
const { channel, token, sessionID, endpoint, resolve, reject } = pendingRequest;
const voiceConnection = new VoiceConnection(this, channel, token, sessionID, endpoint, resolve, reject);
this.pending.delete(guildID);
this.connections.set(guildID, voiceConnection);
voiceConnection.on('disconnected', () => {
voiceConnection.once('disconnected', () => {
this.connections.delete(guildID);
});
}
@ -102,6 +103,17 @@ class ClientVoiceManager {
*/
joinChannel(channel) {
return new Promise((resolve, reject) => {
if (this.pending.get(channel.guild.id)) {
return reject(new Error('already connecting to a channel in this guild'));
}
const existingConn = this.connections.get(channel.guild.id);
if (existingConn) {
if (existingConn.channel.id !== channel.id) {
this._sendWSJoin(channel);
this.connections.get(channel.guild.id).channel = channel;
}
resolve(existingConn);
}
this.pending.set(channel.guild.id, {
channel,
sessionID: null,

View file

@ -2,6 +2,7 @@ const VoiceConnectionWebSocket = require('./VoiceConnectionWebSocket');
const VoiceConnectionUDPClient = require('./VoiceConnectionUDPClient');
const Constants = require('../../util/Constants');
const EventEmitter = require('events').EventEmitter;
const DefaultPlayer = require('./player/DefaultPlayer');
/**
* Represents a connection to a Voice Channel in Discord
@ -16,6 +17,11 @@ class VoiceConnection extends EventEmitter {
* @private
*/
this.manager = manager;
/**
* The player
* @type {BasePlayer}
*/
this.player = new DefaultPlayer(this);
/**
* The endpoint of the connection
* @type {String}
@ -83,12 +89,15 @@ class VoiceConnection extends EventEmitter {
}
_onClose(e) {
e = e && e.code === 1000 ? null : e;
return this._shutdown(e);
}
_shutdown(e) {
console.log('being shut down! D:');
this.ready = false;
this.websocket._shutdown();
this.player._shutdown();
if (this.udp) {
this.udp._shutdown();
}
@ -105,14 +114,20 @@ class VoiceConnection extends EventEmitter {
this.websocket.on('close', err => this._onClose(err));
this.websocket.on('ready-for-udp', data => {
this.udp = new VoiceConnectionUDPClient(this, data);
this.data = data;
this.udp.on('error', err => this._onError(err));
this.udp.on('close', err => this._onClose(err));
});
this.websocket.on('ready', () => {
this.websocket.on('ready', secretKey => {
this.data.secret = secretKey;
this.ready = true;
this.emit('ready');
this._resolve(this);
});
this.websocket.on('speaking', data => {
const guild = this.channel.guild;
guild._memberSpeakUpdate(data.user_id, data.speaking);
});
}
}

View file

@ -54,8 +54,13 @@ class VoiceConnectionWebSocket extends EventEmitter {
this.heartbeat = setInterval(() => {
this.send({
op: Constants.VoiceOPCodes.HEARTBEAT,
d: null,
});
}, interval);
this.send({
op: Constants.VoiceOPCodes.HEARTBEAT,
d: null,
});
}
_onMessage(event) {
@ -77,7 +82,14 @@ class VoiceConnectionWebSocket extends EventEmitter {
for (const index in packet.d.secret_key) {
this.secretKey[index] = packet.d.secret_key[index];
}
this.emit('ready');
this.emit('ready', this.secretKey);
break;
case Constants.VoiceOPCodes.SPEAKING:
/*
{ op: 5,
d: { user_id: '123123', ssrc: 1, speaking: true } }
*/
this.emit('speaking', packet.d);
break;
default:
this.emit('unknown', packet);

View file

@ -0,0 +1,161 @@
const EventEmitter = require('events').EventEmitter;
const NaCl = require('tweetnacl');
const nonce = new Buffer(24);
nonce.fill(0);
class StreamDispatcher extends EventEmitter {
constructor(player, stream) {
super();
this.player = player;
this.stream = stream;
this.streamingData = {
channels: 2,
};
this._startStreaming();
this._triggered = false;
}
_setSpeaking(value) {
this.speaking = value;
this.emit('speaking', value);
}
_sendBuffer(buffer, sequence, timestamp) {
this.player.connection.udp.send(
this._createPacket(sequence, timestamp, this.player.opusEncoder.encode(buffer))
);
}
_createPacket(sequence, timestamp, buffer) {
const packetBuffer = new Buffer(buffer.length + 28);
packetBuffer.fill(0);
packetBuffer[0] = 0x80;
packetBuffer[1] = 0x78;
packetBuffer.writeUIntBE(sequence, 2, 2);
packetBuffer.writeUIntBE(timestamp, 4, 4);
packetBuffer.writeUIntBE(this.player.connection.data.ssrc, 8, 4);
packetBuffer.copy(nonce, 0, 0, 12);
buffer = NaCl.secretbox(buffer, nonce, this.player.connection.data.secret);
for (let i = 0; i < buffer.length; i++) {
packetBuffer[i + 12] = buffer[i];
}
return packetBuffer;
}
_send() {
try {
if (this._triggered) {
return this._setSpeaking(false);
}
const data = this.streamingData;
if (data.missed >= 5) {
return this._triggerTerminalState('error', new Error('stream is not generating fast enough'));
}
if (this.paused) {
data.timestamp = (data.timestamp + 4294967295) ? data.timestamp + 960 : 0;
return setTimeout(() => this._send(), data.length * 10);
}
const bufferLength = 1920 * data.channels;
this._setSpeaking(true);
let buffer = this.stream.read(bufferLength);
if (!buffer) {
data.missed++;
return setTimeout(() => this._send(), data.length * 10);
}
data.missed = 0;
if (buffer.length !== bufferLength) {
const newBuffer = new Buffer(bufferLength).fill(0);
buffer.copy(newBuffer);
buffer = newBuffer;
}
data.count++;
data.sequence = (data.sequence + 1) < (65536) ? data.sequence + 1 : 0;
data.timestamp = (data.timestamp + 4294967295) ? data.timestamp + 960 : 0;
this._sendBuffer(buffer, data.sequence, data.timestamp);
const nextTime = data.startTime + (data.count * data.length);
setTimeout(() => this._send(), data.length + (nextTime - Date.now()));
} catch (e) {
this._triggerTerminalState('error', e);
}
}
_triggerEnd() {
this.emit('end');
}
_triggerError(e) {
this.emit('error', e);
}
_triggerTerminalState(state, e) {
if (this._triggered) {
return;
}
this.emit('debug', `triggered terminal state ${state} - stream is now dead`);
this._triggered = true;
this._setSpeaking(false);
switch (state) {
case 'end':
this._triggerEnd(e);
break;
case 'error':
this._triggerError(e);
break;
default:
this.emit('error', 'unknown trigger state');
break;
}
}
_startStreaming() {
if (!this.stream) {
return this.emit('error', 'no stream');
}
this.stream.on('end', e => this._triggerTerminalState('end', e));
this.stream.on('error', e => this._triggerTerminalState('error', e));
const data = this.streamingData;
data.count = 0;
data.sequence = 0;
data.timestamp = 0;
data.length = 20;
data.missed = 0;
data.startTime = Date.now();
this._send();
}
_pause(value) {
if (value) {
this.paused = true;
this._setSpeaking(false);
} else {
this.paused = false;
this._setSpeaking(true);
}
}
end() {
this._triggerTerminalState('end', 'user requested');
}
pause() {
this._pause(true);
}
resume() {
this._pause(false);
}
}
module.exports = StreamDispatcher;

View file

@ -0,0 +1,15 @@
class BaseOpus {
constructor(player) {
this.player = player;
}
encode(buffer) {
return buffer;
}
decode(buffer) {
return buffer;
}
}
module.exports = BaseOpus;

View file

@ -0,0 +1,27 @@
const OpusEngine = require('./BaseOpusEngine');
let opus;
class NodeOpusEngine extends OpusEngine {
constructor(player) {
super(player);
try {
opus = require('node-opus');
} catch (err) {
throw err;
}
this.encoder = new opus.OpusEncoder(48000, 2);
}
encode(buffer) {
super.encode(buffer);
return this.encoder.encode(buffer, 1920);
}
decode(buffer) {
super.encode(buffer);
return this.encoder.decode(buffer, 1920);
}
}
module.exports = NodeOpusEngine;

View file

@ -0,0 +1,25 @@
const list = [
require('./NodeOpusEngine'),
];
exports.add = encoder => {
list.push(encoder);
};
function fetch(Encoder) {
try {
return new Encoder();
} catch (err) {
return;
}
}
exports.fetch = () => {
for (const encoder of list) {
const success = fetch(encoder);
if (success) {
return success;
}
}
throw new Error('could not find an opus engine');
};

View file

@ -0,0 +1,13 @@
class ConverterEngine {
constructor(player) {
this.player = player;
}
createConvertStream() {
return;
}
}
module.exports = ConverterEngine;

View file

@ -0,0 +1 @@
exports.fetch = () => require('./FfmpegConverterEngine');

View file

@ -0,0 +1,33 @@
const ConverterEngine = require('./ConverterEngine');
const ChildProcess = require('child_process');
function chooseCommand() {
for (const cmd of ['ffmpeg', 'avconv', './ffmpeg', './avconv']) {
if (!ChildProcess.spawnSync(cmd, ['-h']).error) {
return cmd;
}
}
}
class FfmpegConverterEngine extends ConverterEngine {
constructor(player) {
super(player);
this.command = chooseCommand();
}
createConvertStream() {
super.createConvertStream();
const encoder = ChildProcess.spawn(this.command, [
'-analyzeduration', '0',
'-loglevel', '0',
'-i', '-',
'-f', 's16le',
'-ar', '48000',
'-ss', '0',
'pipe:1',
], { stdio: ['pipe', 'pipe', 'ignore'] });
return encoder;
}
}
module.exports = FfmpegConverterEngine;

View file

@ -0,0 +1,93 @@
const OpusEngines = require('../opus/OpusEngineList');
const ConverterEngines = require('../pcm/ConverterEngineList');
const Constants = require('../../../util/Constants');
const StreamDispatcher = require('../dispatcher/StreamDispatcher');
const EventEmitter = require('events').EventEmitter;
class VoiceConnectionPlayer extends EventEmitter {
constructor(connection) {
super();
this.connection = connection;
this.opusEncoder = OpusEngines.fetch();
const Engine = ConverterEngines.fetch();
this.converterEngine = new Engine(this);
this.speaking = false;
this.processMap = new Map();
}
convertStream(stream) {
const encoder = this.converterEngine.createConvertStream();
stream.pipe(encoder.stdin);
this.processMap.set(encoder.stdout, {
pcmConverter: encoder,
inputStream: stream,
});
return encoder.stdout;
}
_shutdown() {
for (const stream of this.processMap.keys()) {
this.killStream(stream);
}
}
killStream(stream) {
const streams = this.processMap.get(stream);
this.emit('debug', 'cleaning up streams after end/error');
if (streams) {
if (streams.inputStream && streams.pcmConverter) {
try {
if (streams.pcmConverter.stdin) {
streams.pcmConverter.stdin.end();
this.emit('debug', 'stream kill part 1/5 pass');
}
if (streams.pcmConverter.stdout.destroy) {
streams.pcmConverter.stdout.destroy();
this.emit('debug', 'stream kill part 2/5 pass');
}
if (streams.pcmConverter && streams.pcmConverter.kill) {
streams.pcmConverter.kill('SIGINT');
this.emit('debug', 'stream kill part 3/5 pass');
}
if (streams.inputStream.unpipe) {
streams.inputStream.unpipe(streams.pcmConverter.stdin);
this.emit('debug', 'stream kill part 4/5 pass');
}
if (streams.inputStream.destroy) {
streams.inputStream.destroy();
this.emit('debug', 'stream kill part 5/5 pass');
}
} catch (e) {
console.log(e);
return e;
}
}
}
}
setSpeaking(value) {
if (this.speaking === value) {
return;
}
this.speaking = value;
this.connection.websocket.send({
op: Constants.VoiceOPCodes.SPEAKING,
d: {
speaking: value,
delay: 0,
},
});
}
playPCMStream(pcmStream) {
const dispatcher = new StreamDispatcher(this, pcmStream);
dispatcher.on('speaking', value => this.setSpeaking(value));
dispatcher.on('end', () => this.killStream(pcmStream));
dispatcher.on('error', () => this.killStream(pcmStream));
return dispatcher;
}
}
module.exports = VoiceConnectionPlayer;

View file

@ -0,0 +1,15 @@
const BasePlayer = require('./BasePlayer');
const fs = require('fs-extra');
class DefaultPlayer extends BasePlayer {
playFile(file) {
const fileStream = fs.createReadStream(file).on('error', console.log);
const pcmStream = this.convertStream(fileStream).on('error', console.log);
const dispatcher = this.playPCMStream(pcmStream);
return dispatcher;
}
}
module.exports = DefaultPlayer;

View file

@ -18,7 +18,12 @@ class VoiceStateUpdateHandler extends AbstractHandler {
member.voiceChannel.members.delete(oldVoiceChannelMember.id);
}
if (client.voice.pending.has(guild.id)) {
// if the member left the voice channel, unset their speaking property
if (!data.channel_id) {
member.speaking = null;
}
if (client.voice.pending.has(guild.id) && member.user.id === client.user.id && data.channel_id) {
client.voice._receivedVoiceStateUpdate(data.guild_id, data.session_id);
}

View file

@ -192,6 +192,14 @@ class Guild {
return base;
}
_memberSpeakUpdate(user, speaking) {
const member = this.members.get(user);
if (member) {
member.speaking = speaking;
}
this.client.emit(Constants.Events.GUILD_MEMBER_SPEAKING, member, speaking);
}
/**
* Sets up the Guild
* @param {any} data
@ -288,12 +296,6 @@ class Guild {
}
}
/**
* The embed channel of the Guild.
* @type {GuildChannel}
*/
this.embedChannel = this.channels.get(data.embed_channel_id);
if (data.roles) {
this.roles.clear();
for (const role of data.roles) {

View file

@ -64,6 +64,11 @@ class GuildMember {
* @type {Date}
*/
this.joinDate = new Date(data.joined_at);
/**
* Whether this meember is speaking
* @type {?Boolean}
*/
this.speaking = this.speaking;
this._roles = data.roles;
}

View file

@ -55,6 +55,15 @@ class VoiceChannel extends GuildChannel {
join() {
return this.client.voice.joinChannel(this);
}
leave() {
const exists = this.client.voice.connections.get(this.guild.id);
if (exists) {
if (exists.channel.id === this.id) {
exists.disconnect();
}
}
}
}
module.exports = VoiceChannel;

View file

@ -159,6 +159,7 @@ exports.Events = {
MESSAGE_DELETE: 'messageDelete',
MESSAGE_UPDATE: 'messageUpdate',
RECONNECTING: 'reconnecting',
GUILD_MEMBER_SPEAKING: 'guildMemberSpeaking',
};
exports.WSEvents = {

View file

@ -177,6 +177,11 @@ client.on('messageUpdate', (old, message) => {
console.log('Message updated from', old.content, 'to', message.content);
});
client.on('guildMemberSpeaking', (member, speaking) => {
const message = speaking ? `${member.user.username} is speaking` : `${member.user.username} is not speaking`;
member.guild.channels.get(member.guild.id).sendMessage(message);
});
client.on('message', msg => {
if (msg.content.startsWith('?raw')) {
msg.channel.sendMessage('```' + msg.content + '```');
@ -191,3 +196,16 @@ client.on('message', msg => {
}
}
});
let disp;
client.on('message', msg => {
if (msg.content.startsWith('/join')) {
const chan = msg.content.split(' ').slice(1).join(' ');
msg.channel.guild.channels.get(chan).join()
.then(conn => {
msg.reply('done');
})
.catch(console.log);
}
})