/**
iZ³ | Izzzio blockchain - https://izzz.io
@author: iZ³ Team (info@izzz.io)
*/
/**
* moment js required
* @type {number}
*/
'use strict';
const MESSAGE_MUTEX_TIMEOUT = 1000;
const LATENCY_TIME = 100 * 1000; //time on obsolescence of message
//unify browser and node
if(typeof _this === 'undefined') {
var _this = this;
}
/**
* StarWave Protocol
* StarWave is a self-configurable hi-speed messages interaction protocol
*/
class starwaveProtocol {
constructor(candy, messageType) {
this.candy = candy;
this.candy.MessageType = messageType;
/**
* Input message mutex
* @type {{}}
* @private
*/
this._messageMutex = {};
if(_this.window === undefined) {
this.moment = require('moment');
} else {
this.moment = moment;
}
}
/**
* Create message of starwave type
* @param {object} data Message Data
* @param {string} reciver Receiver address
* @param {string} sender Sender address
* @param {string} id Message id
* @param {number} timestamp Message timestamp
* @param {number} TTL Message TTL
* @param {number} relevancyTime Message lifetime
* @param {object} route Routes object
* @param {number} type Messages type
* @param {number} timestampOfStart Initial message timestamp
* @returns {{data: *, reciver: *, sender: *, id: *, timestamp: number, TTL: number, index: *, mutex: string, relevancyTime: Array, route: Array, type: number, timestampOfStart: number}}
*/
createMessage(data, reciver, sender, id, timestamp, TTL, relevancyTime, route, type, timestampOfStart) {
return {
data: data,
reciver: reciver,
sender: sender !== undefined ? sender : this.candy.recieverAddress,
id: id,
timestamp: timestamp !== undefined ? timestamp : this.moment().utc().valueOf(),
TTL: typeof TTL !== 'undefined' ? TTL : 0,
mutex: this.candy.getid() + this.candy.getid() + this.candy.getid(),
relevancyTime: relevancyTime !== undefined ? relevancyTime : LATENCY_TIME, //time of message's relevancy
route: route !== undefined ? route : [],
type: type !== undefined ? type : this.candy.MessageType.SW_BROADCAST,
timestampOfStart: timestampOfStart !== undefined ? timestampOfStart : this.moment().utc().valueOf()
};
};
/**
* Register message handler
* @param {string} message Message ID
* @param {function} handler Handler function
* @return {boolean}
*/
registerMessageHandler(message, handler) {
let that = this;
if(typeof that.candy !== 'undefined') {
this.candy.registerMessageHandler(message, function (messageBody) {
if(messageBody.id === message || message.length === 0) {
if(typeof messageBody.mutex !== 'undefined' && typeof that._messageMutex[messageBody.mutex] === 'undefined') {
if(handler(messageBody)) {
that.handleMessageMutex(messageBody);
return true;
} else {
return false;
}
}
}
});
return true;
}
return false;
};
/**
* Send message to peer directly(using busAddress)
* @param {string} messageBusAddress Receiver address
* @param {object} message Message object
*/
sendMessageToPeer(messageBusAddress, message) {
let that = this;
if(typeof that.candy !== 'undefined') {
if(messageBusAddress === this.getAddress()) { //message to yourself
this.handleMessage(message, this.candy.messagesHandlers, null);
return true;
} else {
let socket = this.getSocketByBusAddress(messageBusAddress);
if(!socket) { //no such connected socket
return false;
} else {
//add this node address if the route isn't complete
if(!this.routeIsComplete(message)) {
message.route.push(this.candy.recieverAddress);
}
//send the message
this.write(socket, message);
this.handleMessageMutex(message);
return true; //message has been sended
}
}
}
};
/**
* Send broadcasting messages to all peers excluding previous sender
* @param {object} message
*/
broadcastMessage(message) {
let that = this;
//if the route is empty then it is the first sending and we send it to all
if(typeof that.candy !== 'undefined') {
let prevSender; //previous sender og the message
if(message.route.length > 0) { //if the route is empty then it's the first sending of the message and we send it to all connected peers without exclusions
//saving previous sender (last element in route array)
prevSender = that.getSocketByBusAddress(message.route[message.route.length - 1]);
}
//adding our address to route
message.route.push(this.candy.recieverAddress);
//set message type
message.type = this.candy.MessageType.SW_BROADCAST;
//send to all except previous sender(if it's not the first sending)
this.broadcast(message, prevSender);
this.handleMessageMutex(message);
}
};
/**
* Send message using StarWave protocol
* @param {object} message Message object
*/
sendMessage(message) {
if(!this.sendMessageToPeer(message.reciver, message)) { //can't send directly, no such connected peer, then send to all
//clear route starting from our address
this.broadcastMessage(message);
return 2; //sended broadcast
}
return 1; //sended directly to peer
};
/**
* Disassemble incoming message and decide what we should do with it
* @param {object} message
* @returns {Boolean}
*/
manageIncomingMessage(message) {
//message from ourselves
if(message.sender === this.getAddress()) {
try { //trying to close connection
this.getSocketByBusAddress(message.sender).close();
} catch (e) {
}
return 0;
}
//check if the message is't too old
let m = this.moment().utc().valueOf();
if(m > (message.timestamp + message.relevancyTime + LATENCY_TIME)) {
return 0; //do nothing
}
//is it an endpoint of the message
if(this.endpointForMessage(message)) {
//save the route
if(message.route.length > 1) { //if the route consist of the only element we don't save the route becase of direct connection to peer
message.route.push(this.candy.recieverAddress);//reverse the route to use it in future sendings
this.candy.routes[message.sender] = message.route.reverse().filter((v, i, a) => a.indexOf(v) === i);
}
return 1; //message delivered
} else { //if the message shoud be forwarded
//there should be foreward processing
return 0;
}
};
/**
* Retranslate incoming message
* @param message
* @returns {*} sended message
* @private
*/
retranslateMessage(message) {
//change some information in message
let newMessage = message;
if(this.routeIsComplete(newMessage)) {
let ind = newMessage.route.indexOf(this.candy.recieverAddress); // find index of this node in route array
if(!this.sendMessageToPeer(newMessage.route[ind + 1], newMessage)) { //can't send directly, no such connected peer, then send to all
//clear route starting from our address because the toute is wrong and we should rebuild it
newMessage.route = newMessage.route.splice(ind);
this.broadcastMessage(newMessage);
}
} else {//if the route isn't complete
this.sendMessage(newMessage);
}
return newMessage;
};
/**
* Full message processing according to the Protocol
* @private
* @param message
* @param messagesHandlers
* @param ws
* @returns {*} //id of processed message
*/
handleMessage(message, messagesHandlers, ws) {
if(message.type === this.candy.MessageType.SW_BROADCAST) {
if(this.manageIncomingMessage(message) === 1) {
//this.starwaveCrypto.handleIncomingMessage(message);
//message is on the endpoint and we execute handlers
for (let a in messagesHandlers) {
if(messagesHandlers.hasOwnProperty(a)) {
message._socket = ws;
if(messagesHandlers[a].handle(message)) {
return message.id; //if the message is processed we return
}
}
}
}
}
}
/**
* Process the message mutex
* @private
* @param messageBody
*/
handleMessageMutex(messageBody) {
this._messageMutex[messageBody.mutex] = true;
setTimeout(() => {
if(typeof this._messageMutex[messageBody.mutex] !== 'undefined') {
delete this._messageMutex[messageBody.mutex];
}
}, MESSAGE_MUTEX_TIMEOUT);
};
/**
* Check if our node is the reciver
* @private
* @param message
* @returns {boolean}
*/
endpointForMessage(message) {
return message.reciver === this.candy.recieverAddress;
};
/**
* Check if our route is complete
* @private
* @param message
* @returns {boolean}
*/
routeIsComplete(message) {
return (message.route[message.route.length - 1] === message.reciver);
};
/**
* Returns address
* @return {string}
*/
getAddress() {
return this.candy.recieverAddress;
};
/**
* Write to socket
* @param ws
* @param message
*/
write(ws, message) {
try {
ws.send(JSON.stringify(message))
} catch (e) { //send error. it's possibly that socket is inactive
console.log('Send error: ' + e);
}
}
/**
* Get the list of connected peers(sockets)
* @returns {Array}
*/
getCurrentPeers(fullSockets) {
return this.candy.sockets.map(function (s) {
if(s && s.readyState === 1) {
if(fullSockets) {
return s;
} else {
return 'ws://' + s._socket.remoteAddress + ':' + /*s._socket.remotePort*/ config.p2pPort
}
}
}).filter((v, i, a) => a.indexOf(v) === i);
}
/**
* find socket using bus address
* @param address
* @return {*}
*/
getSocketByBusAddress(address) {
const sockets = this.getCurrentPeers(true);
for (let i in sockets) {
if(sockets.hasOwnProperty(i)) {
if(sockets[i] && sockets[i].nodeMetaInfo) {
if(sockets[i].nodeMetaInfo.messageBusAddress === address) {
return sockets[i];
}
}
}
}
return false;
}
/**
* Broadcast message
* @private
* @param message
* @param excludeIp
*/
broadcast(message, excludeIp) {
let i;
for (i = 0; i < this.candy.sockets.length; i++) {
let socket = this.candy.sockets[i];
if(typeof excludeIp === 'undefined' || socket !== excludeIp) {
this.write(socket, message);
} else {
}
}
}
/**
* close connection with socket if there are more then one url on that busaddress
* @private
* @returns {number} //status of the operation
*/
preventMultipleSockets(socket) {
let busAddress;
if(socket.nodeMetaInfo) {
busAddress = socket.nodeMetaInfo.messageBusAddress;
if(busAddress === undefined) {
return 2; //socket without busAddress
}
} else {
return 3; //socket has no meta info
}
//if there are more than 1 socket on busaddress we close connection
const sockets = this.getCurrentPeers(true);
let socketsOnBus = 0;
const socketsNumber = sockets.length;
for (let i = 0; i < socketsNumber; i++) {
if(sockets[i] && sockets[i].nodeMetaInfo) {
if(sockets[i].nodeMetaInfo.messageBusAddress === busAddress) {
socketsOnBus++;
}
}
}
if(socketsOnBus > 1) {
socket.close();
return 0; //close connection
} else {
return 1; // no other connections
}
}
}
//unify browser and node
if(this.window === undefined) {
module.exports = starwaveProtocol;
}