// common/broker.js

/**
 * Event Broker
 * @module broker
 * @desc Publish/subscribe broker for sensor events.
 */


  // Logging helper; this module calls its info, error, sensorLog and sensorStop functions.
  const brewlog = require("./brewlog.js");
  
  // Single in-process emitter: every published reading is fired as one "sensor" event
  // and each subscription filters on the sensor name it cares about.
  const EventEmitter = require("events").EventEmitter;
  const emitter = new EventEmitter();

  // Every subscription adds its own "sensor" listener, so lift the listener limit
  // well above what the emitter allows by default.
  emitter.setMaxListeners(1000);

  // Sockets
  // clients: sockets registered through attach().
  const clients = [];
  // _socket: the socket passed to the latest attach() call; publish functions send
  // through it when it is set, and detach() resets it to null.
  let _socket = null;

  // Names of the sensors registered through create(); subscribe() only accepts these.
  /** @type {string[]} */
  const sensorNames = [];

  // Flag toggled by the exported debug(onOff).
  // NOTE(review): not read anywhere in the visible code — confirm it is still needed.
  let debug = false;
  
  /**
   * Report whether a socket with the same remote address is already registered.
   * @param {Object} s - Socket whose `conn.remoteAddress` is compared against the registered clients.
   * @return {boolean} true when at least one entry of `clients` shares that address.
   */
  function exists(s) {
	return clients.some((client) => s.conn.remoteAddress == client.conn.remoteAddress);
  }

 let _emit;
 module.exports = {
	 setEmitFn(emit){
		_emit = emit;
	 },

	 /**
	  * @param {String} sensorName
	  * @return {Function} Function to be used to publish this event  
	  */
 	create(sensorName) {
		sensorNames.push(sensorName);	
		
		//return a publish function
		return (value, emit=true) => {    
			let timeStamp = new Date().getTime();
			brewlog.sensorLog(sensorName, value);
			if (_socket){
				//_socket.broadcast.emit(sensorName, {name: sensorName, date: timeStamp, value});
				_socket.broadcast.emit(sensorName,  {date: timeStamp, value});
				_socket.emit(sensorName,  {date: timeStamp, value});
			}
			if (emit){
				emitter.emit("sensor", {
					name: sensorName, 
					date: timeStamp, 
					value});
			}
			
			// clients.forEach(sk => {
			// 	if (sk.connected){
			// 		sk.broadcast.emit(sensorName,  {date: timeStamp, value});
			// 	}
			// });
		};		
	},		  
	
	debug(onOff){
		debug = onOff;
	},
	
	/**
	 * @desc Remove publish function.
	 * @param {String} sensorName
	 */
	destroy(sensorName) {	
		brewlog.info("DESTROY", sensorName)
		brewlog.sensorStop(sensorName);
		
		//Remove entry from array
		let index = sensorNames.indexOf(sensorName);
		if (index !== -1) {
			sensorNames.splice(index, 1);
		}	
	},
	
	/**
	 * @desc Pass in a function to call when a sensor event fires.
	 * @param {String} sensorName 
	 * @param {Function} cb - Publish callback 
	 */
    subscribe(sensorName, cb) {
		if (sensorNames.includes(sensorName)){
			brewlog.info("subscribe", sensorName);
			emitter.on("sensor", value => {
				if (value.name == sensorName){
					cb(value);
				}
			});
						
		}else{
			brewlog.error(`Broker cannot subscribe, sensor=${sensorName} does not exist.`);
		}

		return listener;
		
		function listener(value) {
			if (value.name == sensorName){
				cb(value);
			}
		}
	},

	/**
	 * Stop subscribing to a sensor 
	 * @param {*} listener 
	 */
    unSubscribe(listener) {	
		if (listener) emitter.removeListener("sensor", listener);
		listener = null;
		emitter.removeAllListeners("sensor");
    },

	exists,

	/**
	 * @desc Attach a socket that listens to all events
	 * @param {Object} socket - Socket 
	 */
	attach(socket) {
		_socket = socket;
		// console.log(`Attached client ${socket.conn.remoteAddress}`);
		// return;
        if (exists(socket) === false) {
			clients.push(socket);
			console.log(`New Attached client ${socket.conn.remoteAddress}`);
        }
    },
		
	/**
	 * @desc Remove a previously attached socket
	 * @param {Object} c - Client 
	 */
	detach({conn}) {
		console.log(`Detached client ${conn.remoteAddress}`);
		// return;
		clients.forEach(({conn}, index) => {
			if (conn.remoteAddress == conn.remoteAddress){
				clients[index] = null;
				clients.splice(index, 1);

				console.log(`Detached client ${conn.remoteAddress}`);
			}
		});
		_socket = null;
	},

  }