RSS Git Download  Clone
Raw Blame History
/*******************************************************************************
 *	Boap.cc	Boap RPC protocol
 *			T.Barnaby,	BEAM Ltd,	8/5/03
 *
 * History:
 *	4/2/05:		Updated addressing scheme
 *******************************************************************************
 */
#include <stdlib.h>
#include <stdio.h>
#include <errno.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <Boap.h>

// Boap NS
#include <BoapnsD.h>
#include <BoapnsC.h>


// Set DEBUG to 1 to route dprintf() to printf(); with 0 the calls compile away.
#define	DEBUG	0

// dprintf() expands WITHOUT a trailing ';' so that a call followed by the
// caller's own ';' is exactly one statement (safe in an un-braced if/else).
#if DEBUG
#define	dprintf(fmt, a...)       printf(fmt, ##a)
#else
#define	dprintf(fmt, a...)
#endif


// Allocation granularity, in bytes, of the BoapPacket buffer
const int roundSize = 256;

// Creates an empty packet: position 0 and an initial buffer of roundSize
// bytes obtained from malloc().
BoapPacket::BoapPacket(){
	osize = roundSize;
	opos = 0;
	odata = static_cast<char*>(malloc(osize));
}
// Releases the packet buffer.
// The buffer is allocated with malloc() in the constructor and grown with
// realloc() in resize(), so it has to be released with free(); releasing it
// with delete is undefined behaviour.
BoapPacket::~BoapPacket(){
	free(odata);
}

// Makes sure the buffer can hold at least `size` bytes. The capacity never
// shrinks and is rounded up to a whole multiple of roundSize.
// Always returns 0.
int BoapPacket::resize(int size){
	if(size <= osize)
		return 0;

	const int	blocks = (size + roundSize - 1) / roundSize;

	osize = roundSize * blocks;
	odata = (char*)realloc(odata, osize);
	return 0;
}

// Overwrites the start of the packet buffer with a copy of `nbytes` bytes
// taken from `data`, growing the buffer first when needed.
// Returns a default-constructed BError.
BError BoapPacket::setData(void* data, int nbytes){
	BError	status;

	resize(nbytes);
	memcpy(&odata[0], data, nbytes);
	return status;
}

int BoapPacket::nbytes(){
	return ((BoapPacketHead*)odata)->length;
}

char* BoapPacket::data(){
	return odata;
}

void BoapPacket::updateLen(){
	((BoapPacketHead*)odata)->length = opos;
}

// Restarts the packet: writes `head` at offset 0, leaves the position just
// after it and refreshes the stored length. Always returns 0.
int BoapPacket::pushHead(BoapPacketHead& head){
	const int	headSize = sizeof(head);

	opos = 0;
	resize(headSize);
	memcpy(odata, &head, headSize);
	opos = headSize;
	updateLen();
	return 0;
}

// Appends the raw bytes of an Int8 at the current position, advances the
// position and refreshes the stored length. Always returns 0.
int BoapPacket::push(Int8 v){
	const int	end = opos + sizeof(v);

	resize(end);
	memcpy(odata + opos, &v, sizeof(v));
	opos = end;
	updateLen();
	return 0;
}

// Appends the raw bytes of a UInt8 at the current position, advances the
// position and refreshes the stored length. Always returns 0.
int BoapPacket::push(UInt8 v){
	const int	end = opos + sizeof(v);

	resize(end);
	memcpy(odata + opos, &v, sizeof(v));
	opos = end;
	updateLen();
	return 0;
}

// Appends the raw (host byte order) bytes of an Int16 at the current
// position, advances the position and refreshes the stored length.
// Always returns 0.
int BoapPacket::push(Int16 v){
	const int	end = opos + sizeof(v);

	resize(end);
	memcpy(odata + opos, &v, sizeof(v));
	opos = end;
	updateLen();
	return 0;
}

// Appends the raw (host byte order) bytes of a UInt16 at the current
// position, advances the position and refreshes the stored length.
// Always returns 0.
int BoapPacket::push(UInt16 v){
	const int	end = opos + sizeof(v);

	resize(end);
	memcpy(odata + opos, &v, sizeof(v));
	opos = end;
	updateLen();
	return 0;
}

// Appends the raw (host byte order) bytes of an Int32 at the current
// position, advances the position and refreshes the stored length.
// Always returns 0.
int BoapPacket::push(Int32 v){
	const int	end = opos + sizeof(v);

	resize(end);
	memcpy(odata + opos, &v, sizeof(v));
	opos = end;
	updateLen();
	return 0;
}

// Appends the raw (host byte order) bytes of a UInt32 at the current
// position, advances the position and refreshes the stored length.
// Always returns 0.
int BoapPacket::push(UInt32 v){
	const int	end = opos + sizeof(v);

	resize(end);
	memcpy(odata + opos, &v, sizeof(v));
	opos = end;
	updateLen();
	return 0;
}

// Appends a string as an Int32 byte count followed by that many bytes taken
// from v.retStr(). The count is v.len() + 1 (presumably the extra byte is the
// terminating NUL). Always returns 0.
int BoapPacket::push(BString& v){
	Int32	nChars = v.len() + 1;
	char*	out;

	resize(opos + sizeof(nChars) + nChars);
	out = odata + opos;			// Fetched after resize(), which may move the buffer
	memcpy(out, &nChars, sizeof(nChars));
	memcpy(out + sizeof(nChars), v.retStr(), nChars);
	opos += sizeof(nChars);
	opos += nChars;
	updateLen();
	return 0;
}

// Appends the raw in-memory bytes of a Double at the current position,
// advances the position and refreshes the stored length. Always returns 0.
int BoapPacket::push(Double v){
	const int	end = opos + sizeof(v);

	resize(end);
	memcpy(odata + opos, &v, sizeof(v));
	opos = end;
	updateLen();
	return 0;
}

// Appends an error as its number (Int32) followed by its message string.
// Always returns 0.
int BoapPacket::push(BError& v){
	Int32	errorNo = v.getErrorNo();
	BString	errorStr = v.getString();

	push(errorNo);
	push(errorStr);
	return 0;
}

// Appends `nBytes` raw bytes from `data` at the current position, advances
// the position and refreshes the stored length. Always returns 0.
// The previous body was empty, so control fell off the end of a non-void
// function (undefined behaviour) and no data was stored.
int BoapPacket::push(UInt32 nBytes, const void* data){
	resize(opos + nBytes);
	if(nBytes)				// Never hand memcpy() a possibly-null pointer for an empty block
		memcpy(&odata[opos], data, nBytes);
	opos += nBytes;
	updateLen();
	return 0;
}


// Copies the packet head from the start of the buffer into `head` and leaves
// the read position just after it. Always returns 0.
int BoapPacket::popHead(BoapPacketHead& head){
	memcpy(&head, odata, sizeof(head));
	opos = sizeof(head);
	return 0;
}

// Reads an Int8 from the current position and advances past it.
// No bounds check is made. Always returns 0.
int BoapPacket::pop(Int8& v){
	const char*	src = odata + opos;

	memcpy(&v, src, sizeof(v));
	opos += sizeof(v);
	return 0;
}

// Reads a UInt8 from the current position and advances past it.
// No bounds check is made. Always returns 0.
int BoapPacket::pop(UInt8& v){
	const char*	src = odata + opos;

	memcpy(&v, src, sizeof(v));
	opos += sizeof(v);
	return 0;
}

// Reads an Int16 (raw host byte order) from the current position and
// advances past it. No bounds check is made. Always returns 0.
int BoapPacket::pop(Int16& v){
	const char*	src = odata + opos;

	memcpy(&v, src, sizeof(v));
	opos += sizeof(v);
	return 0;
}

// Reads a UInt16 (raw host byte order) from the current position and
// advances past it. No bounds check is made. Always returns 0.
int BoapPacket::pop(UInt16& v){
	const char*	src = odata + opos;

	memcpy(&v, src, sizeof(v));
	opos += sizeof(v);
	return 0;
}

// Reads an Int32 (raw host byte order) from the current position and
// advances past it. No bounds check is made. Always returns 0.
int BoapPacket::pop(Int32& v){
	const char*	src = odata + opos;

	memcpy(&v, src, sizeof(v));
	opos += sizeof(v);
	return 0;
}

// Reads a UInt32 (raw host byte order) from the current position and
// advances past it. No bounds check is made. Always returns 0.
int BoapPacket::pop(UInt32& v){
	const char*	src = odata + opos;

	memcpy(&v, src, sizeof(v));
	opos += sizeof(v);
	return 0;
}

// Reads a string stored as an Int32 byte count followed by that many bytes
// (the layout written by push(BString&)) and advances past it.
//
// The count and the bytes normally arrive from the network, so they are
// validated against the allocated buffer before use:
//   - the count itself must lie inside the buffer,
//   - the count must be positive and the bytes must lie inside the buffer,
//   - a NUL must occur within those bytes, because the string is assigned
//     from a C string pointer and would otherwise be read past the buffer.
//
// Returns 0 on success. Returns -1, leaving `v` and the read position
// untouched, if the stored string is malformed.
int BoapPacket::pop(BString& v){
	Int32	l;
	int	start;

	// The length prefix must be inside the buffer
	if((opos < 0) || (opos > osize - (int)sizeof(l)))
		return -1;
	memcpy(&l, &odata[opos], sizeof(l));
	start = opos + sizeof(l);

	// The string bytes must be inside the buffer and contain a terminator
	if((l <= 0) || (l > osize - start))
		return -1;
	if(memchr(&odata[start], 0, l) == 0)
		return -1;

	v = (char*)&odata[start];
	opos = start + l;
	return 0;
}

// Reads the raw bytes of a Double from the current position and advances
// past them. No bounds check is made. Always returns 0.
int BoapPacket::pop(Double& v){
	const char*	src = odata + opos;

	memcpy(&v, src, sizeof(v));
	opos += sizeof(v);
	return 0;
}

// Reads an error stored as a number (Int32) followed by a message string and
// loads both into `v`. Always returns 0.
int BoapPacket::pop(BError& v){
	Int32	errorNo;
	BString	errorStr;

	pop(errorNo);
	pop(errorStr);
	v.set(errorNo, errorStr);
	return 0;
}

// Copies `nBytes` raw bytes from the current position into `data` and
// advances past them. Like the fixed-size pop() overloads it makes no bounds
// check. Always returns 0.
// The previous body reported success without copying anything or advancing
// the position, leaving the caller's buffer unset.
int BoapPacket::pop(UInt32 nBytes, void* data){
	if(nBytes)				// Never hand memcpy() a possibly-null pointer for an empty block
		memcpy(data, &odata[opos], nBytes);
	opos += nBytes;
	return 0;
}



/*******************************************************************************
 *	Boap Client communications object
 *******************************************************************************
 */

/*******************************************************************************
 *	Base for all Client Classes
 *******************************************************************************
 */
// Creates an unconnected client for the object called `name` on a stream
// socket. The connection is made later by connectService().
BoapClientObject::BoapClientObject(BString name)
	: BSocket(STREAM), oname(name){
	oservice = 0;
	oconnected = 0;
}

// Connects this client to the object called `name`, unless it is already
// connected (in which case nothing is done and no error is returned).
//
// `name` is either a plain object name, looked up on "localhost", or of the
// form "//host/object". The special object "boapns" is connected to directly
// through the "boapns" tcp service entry of the host; any other object is
// first looked up through the host's boapns name service, which supplies the
// address, port and service number to connect to.
//
// The name-service client is constructed only on the lookup path.
// performSend() and performRecv() call this function for every packet, so
// the already-connected path must not create and destroy a client object
// each time.
//
// Returns the first error met, or a default BError on success.
BError BoapClientObject::connectService(BString name){
	BError			err;
	BString			host = "localhost";
	BSocketAddressINET	add;

	if(oconnected)
		return err;

	oname = name;

	// Parse name: strip a leading "//host" part if present
	if(name.subString(0, 2) == "//"){
		name.pullSeparators("/");
		host = name.pullToken("/");
	}

	if(name == "boapns"){
		// The name service itself lives at a well known service entry
		if(! (err = add.set(host, "boapns", "tcp"))){
			if(! (err = connect(add))){
				oservice = 0;
				oconnected = 1;
			}
		}
	}
	else {
		// Lookup name through the name service of the host
		Boapns::Boapns		boapns("boapns");
		Boapns::BoapEntry	entry;
		BString			boapnsName = BString("//") + host + "/boapns";

		if(! (err = boapns.connectService(boapnsName))){
			if(! (err = boapns.getEntry(name, entry))){
				if(! (err = add.set(entry.ipAddress, entry.port))){
					if(! (err = connect(add))){
						oservice = entry.service;
						oconnected = 1;
					}
				}
			}
		}
	}

	return err;
}

// Sends the whole of packet `tx` to the connected service, connecting first
// if required. send() is repeated until tx.nbytes() bytes have gone out.
// Returns the first error met, or a default BError on success.
BError BoapClientObject::performSend(BoapPacket& tx){
	BError	err;
	BSize	total = tx.nbytes();
	char*	buf = tx.data();
	BSize	sent = 0;
	BSize	chunk = 0;

	// Connect if required
	err = connectService(oname);
	if(err)
		return err;

	// Send packet, allowing for partial writes
	for(sent = 0; sent < total; sent = sent + chunk){
		err = send(&buf[sent], total - sent, chunk, MSG_NOSIGNAL);
		if(err)
			return err;
	}

	return err;
}

// Receives one complete packet into `rx`, connecting first if required.
//
// The fixed-size packet head is read first; its length field then tells how
// many more bytes belong to the packet. That length comes from the peer, so
// it is checked to cover at least the head itself: without the check the
// remaining byte count would wrap around to a huge value and the receive
// loop would write far beyond the packet buffer.
//
// Returns the first error met, or a default BError on success.
BError BoapClientObject::performRecv(BoapPacket& rx){
	BError			err;
	BSize			nb = 0;
	char*			data = 0;
	BSize			n = 0;
	BSize			nd = 0;
	BoapPacketHead		head;

	// Connect if required
	if(err = connectService(oname))
		return err;

	// Receive Packet
	// Read Head first
	rx.resize(sizeof(BoapPacketHead));
	nb = sizeof(BoapPacketHead);
	data = rx.data();
	n = 0;
	while(n < nb){
		if(err = recv(&data[n], nb - n, nd))
			return err;
		n = n + nd;
	}

	// Validate the advertised length before trusting it
	rx.popHead(head);
	if(head.length < sizeof(BoapPacketHead))
		return BError(1, "Boap: received packet length is smaller than the packet header");

	// Read rest of packet. resize() may move the buffer so data is re-fetched
	rx.resize(head.length);
	nb = head.length - sizeof(BoapPacketHead);
	data = &rx.data()[sizeof(BoapPacketHead)];
	n = 0;
	while(n < nb){
		if(err = recv(&data[n], nb - n, nd))
			return err;
		n = n + nd;
	}

	return err;
}

// Performs one request/reply exchange: sends `tx` and, only if that
// succeeded, receives the reply into `rx`. Returns the error of the step
// that failed, or the result of the receive.
BError BoapClientObject::performCall(BoapPacket& tx, BoapPacket& rx){
	BError	err = performSend(tx);

	if(err)
		return err;

	return performRecv(rx);
}

// Creates a signal sender on a datagram socket with broadcasting enabled.
BoapSignalObject::BoapSignalObject()
	: BSocket(DGRAM){
	this->setBroadCast(1);
}

// Broadcasts packet `tx` to INADDR_BROADCAST on port 14000, repeating
// sendTo() until tx.nbytes() bytes have gone out.
// Returns the first error met, or a default BError on success.
BError BoapSignalObject::performSend(BoapPacket& tx){
	BError			err;
	BSize			total = tx.nbytes();
	char*			buf = tx.data();
	BSize			sent = 0;
	BSize			chunk = 0;
	BSocketAddressINET	target;

	target.set(INADDR_BROADCAST, 14000);

	// Send packet
	for(sent = 0; sent < total; sent = sent + chunk){
		err = sendTo(target, &buf[sent], total - sent, chunk, MSG_NOSIGNAL);
		if(err)
			return err;
	}

	return err;
}



/*******************************************************************************
 *	Boap Server communications object
 *******************************************************************************
 */
// Creates the server with a stream socket for service requests (address
// reuse enabled) and a datagram socket for events.
BoapServer::BoapServer()
	: onet(BSocket::STREAM), onetEvent(BSocket::DGRAM){
	onet.setReuseAddress(1);
	oboapNs = 0;
}

// Binds the server sockets and, for an ordinary server, registers every
// service object added so far with the boapns name service.
//
// boapNs: non-zero when this server is the name service itself. It then
//	binds to the "boapns" tcp service entry and registers nothing;
//	otherwise it binds to port 0 and registers its objects.
//
// The event (datagram) socket is bound to the address obtained back from the
// service socket. Events are broadcast to the port of the "boapns" udp entry.
//
// Returns the first error met, or a default BError on success.
BError BoapServer::init(int boapNs){
	BError			err;
	Boapns::Boapns		boapns("boapns");
	BIter			i;
	BoapService		s;
	BList<uint32_t>		addresses;
	uint32_t		ipAddress = 0;
	BSocketAddressINET	nadd;

	oboapNs = boapNs;
	ohostName = BSocketAddressINET::getHostName();

	// Use the first local address for registration, 0 if there is none
	addresses = BSocketAddressINET::getIpAddresses();
	if(addresses.number())
		ipAddress = addresses[0];

	if(!boapNs)
		nadd.set("", 0);
	else
		nadd.set("", "boapns", "tcp");

	// Create service socket
	err = onet.bind(nadd);
	if(err)
		return err;

	// Create event socket on the address the service socket ended up with
	err = onet.getAddress(nadd);
	if(err)
		return err;
	err = onetEvent.bind(nadd);
	if(err)
		return err;

	// BOAP event broadcast address
	onetEventAddress.set("", "boapns", "udp");
	onetEventAddress.set(INADDR_BROADCAST, onetEventAddress.port());
	onetEvent.setBroadCast(1);

	// The name service itself has nothing to register
	if(boapNs)
		return err;

	// Register each object, numbering the services in list order
	s = 0;
	oservices.start(i);
	while(!oservices.isEnd(i)){
		err = boapns.addEntry(Boapns::BoapEntry(oservices[i].oobject->name(), ipAddress, nadd.port(), s));
		if(err)
			break;
		oservices.next(i);
		s++;
	}
	return err;
}

// Server main loop: polls the listening socket, the event socket and all
// accepted connections, and dispatches whichever becomes ready. Never returns
// in practice (the loop has no exit).
//
// Descriptors are reset to -1 before doPoll() and accept() fill them in.
// `fd` was previously uninitialised on the first pass and kept its old value
// afterwards, so a call that failed without writing it could cause a stale
// descriptor to be processed or registered with the poll set.
BError BoapServer::run(){
	BError			err;
	BSocketAddressINET	nadd;
	int			fd;
	int			client;

	opoll.append(onet.getFd());
	opoll.append(onetEvent.getFd());
	onet.listen();

	while(1){
		fd = -1;
		opoll.doPoll(fd);
		if(fd < 0)
			continue;

		if(fd == onet.getFd()){
			// New connection: only start polling it if accept() produced a descriptor
			client = -1;
			onet.accept(client, nadd);
			dprintf("Accept Connection: Fd: %d Add: %s\n", client, nadd.getString().retStr());

			if(client >= 0)
				opoll.append(client);
		}
		else if(fd == onetEvent.getFd()){
			processEvent(fd);
		}
		else {
			// Request on an accepted connection; drop the connection on any error
			if(process(fd)){
				opoll.delFd(fd);
				close(fd);
			}
		}
	}

	return err;
}

// Appends `object` to the service list. Its service number is the number of
// entries already in the list. Returns a default-constructed BError.
BError BoapServer::addObject(BoapServiceObject* object){
	BoapServiceEntry	entry(oservices.number(), object);

	oservices.append(entry);
	return BError();
}

// Handles one request on the accepted connection `fd`: receives a complete
// packet into orx, passes it to the service object whose number matches the
// packet head, and sends the reply built in otx back on the same descriptor.
//
// Two checks guard against malformed requests from the peer:
//   - the advertised length must cover at least the packet head,
//   - a service object must match. Otherwise otx would still hold whatever
//     was in it before (an earlier reply, or unset memory) and that would be
//     sent to the peer.
//
// Returns an error if a receive or send fails, the peer closes the
// connection or the request is malformed; run() closes the connection on any
// error. Returns a default BError on success.
BError BoapServer::process(int fd){
	BError			err;
	int			nb = 0;
	char*			data = 0;
	int			n = 0;
	int			nd = 0;
	int			found = 0;
	BoapPacketHead		head;
	BIter			i;

	// Receive Packet
	// Read Head first
	orx.resize(sizeof(BoapPacketHead));
	nb = sizeof(BoapPacketHead);
	data = orx.data();
	n = 0;
	while(n < nb){
		nd = recv(fd, &data[n], nb - n, 0);
		if(nd <= 0)
			return BError(1, strerror(errno));
		n = n + nd;
	}

	// Validate the advertised length before trusting it
	orx.popHead(head);
	if(head.length < sizeof(BoapPacketHead))
		return BError(1, "Boap: request length is smaller than the packet header");

	// Read rest of packet. resize() may move the buffer so data is re-fetched
	orx.resize(head.length);
	nb = head.length - sizeof(BoapPacketHead);
	data = &orx.data()[sizeof(BoapPacketHead)];
	n = 0;
	while(n < nb){
		nd = recv(fd, &data[n], nb - n, 0);
		if(nd <= 0)
			return BError(1, strerror(errno));
		n = n + nd;
	}

	dprintf("Command received: Len(%d) Type(%d) Service(%d) Cmd(%d)\n", head.length, head.type, head.service, head.cmd);

	// Dispatch to the first service object with a matching service number
	for(oservices.start(i); !oservices.isEnd(i); oservices.next(i)){
		if(oservices[i].oservice == head.service){
			oservices[i].oobject->process(orx, otx);
			found = 1;
			break;
		}
	}
	if(!found)
		return BError(1, "Boap: request for unknown service");

	// Send packet
	nb = otx.nbytes();
	data = otx.data();
	n = 0;
	while(n < nb){
		nd = send(fd, &data[n], nb - n, MSG_NOSIGNAL);
		if(nd <= 0)
			return BError(1, strerror(errno));
		n = n + nd;
	}

	return err;
}

// Hands the event packet `rx` to every registered service object in list
// order. The results of the individual objects are not collected; a
// default-constructed BError is returned.
BError BoapServer::processEvent(BoapPacket& rx){
	BError	err;
	BIter	i;

	dprintf("BoapServer::processEvent\n");
	oservices.start(i);
	while(!oservices.isEnd(i)){
		oservices[i].oobject->processEvent(rx);
		oservices.next(i);
	}

	return err;
}

// Receives one event datagram (up to 1500 bytes) from `fd` into a local
// packet and passes it to processEvent(BoapPacket&).
//
// The head is taken from the local packet that was just received. It was
// previously popped from orx, the request packet of the stream connections,
// which both reported the wrong head and reset the read position of orx.
// A datagram too short to contain a packet head is rejected, since the
// receive buffer beyond the received bytes is unset.
//
// Returns an error built from errno if recv() fails, an error for a short
// datagram, otherwise the result of processEvent(BoapPacket&).
BError BoapServer::processEvent(int fd){
	int			nb = 0;
	BoapPacketHead		head;
	BoapPacket		rx;

	// Receive Packet
	rx.resize(1500);
	if((nb = recv(fd, rx.data(), 1500, 0)) < 0)
		return BError(errno, strerror(errno));

	if(nb < (int)sizeof(BoapPacketHead))
		return BError(1, "Boap: event datagram is smaller than the packet header");

	rx.popHead(head);
	dprintf("Event received: Len(%d) Service(%d) Cmd(%d)\n", head.length, head.service, head.cmd);

	return processEvent(rx);
}

// Sends event packet `tx` to the event broadcast address through the event
// socket, repeating sendTo() until tx.nbytes() bytes have gone out.
// Returns the first error met, or a default BError on success.
BError BoapServer::sendEvent(BoapPacket& tx){
	BError			err;
	const int		total = tx.nbytes();
	char*			buf = tx.data();
	uint32_t		nsent;

	// Send packet
	for(int sent = 0; sent < total; sent = sent + nsent){
		dprintf("BoapServer::sendEvent: Address: %s\n", onetEventAddress.getString().retStr());
		err = onetEvent.sendTo(onetEventAddress, &buf[sent], total - sent, nsent);
		dprintf("BoapServer::sendEvent: result: %d:%s\n", err.getErrorNo(), err.getString().retStr());
		if(err)
			return err;
	}
	return err;
}

// Returns the stream socket on which service requests are accepted.
BSocket& BoapServer::getSocket(){
	return this->onet;
}

// Returns the datagram socket used to send and receive events.
BSocket& BoapServer::getEventSocket(){
	return this->onetEvent;
}

// Returns the host name recorded by init().
BString BoapServer::getHostName(){
	return this->ohostName;
}

/*******************************************************************************
 *	Base for all Server Objects
 *******************************************************************************
 */
// Pairs a command number with the member function that handles it.
BoapFuncEntry::BoapFuncEntry(int cmd, BoapFunc func)
	: ocmd(cmd), ofunc(func)
{
}

// Creates a service object called `name` and adds it to `server`'s list of
// service objects.
BoapServiceObject::BoapServiceObject(BoapServer& server, BString name)
	: oserver(server), oname(name)
{
	oserver.addObject(this);
}

// Nothing to release. The object is not removed from the server's list.
BoapServiceObject::~BoapServiceObject()
{
}

// Returns the name this object was created with.
BString BoapServiceObject::name(){
	return this->oname;
}

// Dispatches request `rx` to the first entry of ofuncList whose command
// number equals the one in the packet head, and returns that handler's
// result. If no entry matches, `tx` is left untouched and a
// default-constructed BError is returned.
BError BoapServiceObject::process(BoapPacket& rx, BoapPacket& tx){
	BoapPacketHead		head;
	BIter			i;

	rx.popHead(head);
	ofuncList.start(i);
	while(!ofuncList.isEnd(i)){
		if(ofuncList[i].ocmd == head.cmd)
			return (this->*ofuncList[i].ofunc)(rx, tx);
		ofuncList.next(i);
	}

	return BError();
}

// Builds and sends a signal packet carrying, in order: this object's full
// name ("//<host>/<object>"), the signal `name` and the integer `arg`.
// Returns the result of sendEvent(BoapPacket&).
BError BoapServiceObject::sendEvent(BString name, Int32 arg){
	BoapPacketHead	txhead;
	BoapPacket	tx;
	BString		fullName = BString("//") + oserver.getHostName() + "/" + oname;

	// Signals are not addressed to a particular service or command
	txhead.cmd = 0;
	txhead.service = 0;
	txhead.type = BoapTypeSignal;

	tx.pushHead(txhead);
	tx.push(fullName);
	tx.push(name);
	tx.push(arg);

	return sendEvent(tx);
}

// Decoded-event handler. This implementation does nothing with the event and
// returns a default-constructed BError.
BError BoapServiceObject::processEvent(BString objectName, BString signalName, Int32 arg){
	dprintf("BoapServiceObject::processEvent(BString objectName, BString signalName, Int32 arg)\n");
	return BError();
}

// Sends the ready-built event packet `tx` through the owning server.
BError BoapServiceObject::sendEvent(BoapPacket& tx){
	BError	result = oserver.sendEvent(tx);

	return result;
}

// Decodes event packet `rx` (head, object name, signal name, Int32 argument)
// and passes the decoded values to
// processEvent(BString, BString, Int32).
//
// Returns the result of that handler. The result used to be stored in a
// local and then dropped in favour of an always-successful BError, so a
// failure reported by the handler could never reach the caller.
BError BoapServiceObject::processEvent(BoapPacket& rx){
	BoapPacketHead	rxhead;
	BString		objectName;
	BString		signalName;
	Int32		arg = 0;

	dprintf("BoapServiceObject::processEvent\n");
	rx.popHead(rxhead);
	rx.pop(objectName);
	rx.pop(signalName);
	rx.pop(arg);
	return processEvent(objectName, signalName, arg);
}