RSS Git Download  Clone
Raw Blame History
/*******************************************************************************
 *	Boap.cc	Boap RPC protocol
 *			T.Barnaby,	BEAM Ltd,	8/5/03
 *	updated by	D.Korchagin,	CERN AB-BI-SW,	2007-08-31
 *
 * History:
 *	4/2/05:		Updated addressing scheme
 *******************************************************************************
 */
#include <stdlib.h>
#include <stdio.h>
#include <errno.h>
#include <unistd.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <Boap.h>
#ifndef __Lynx__
#include <byteswap.h>
#else
/// Reverses the byte order of a 32-bit value (fallback used when <byteswap.h>
/// is not available).
/// The shifts are performed on an unsigned copy: with a signed operand
/// `i >> 24` sign-extends (polluting the upper three result bytes whenever the
/// top bit of `i` is set) and `i << 24` can overflow a signed int.
static inline int bswap_32(int i){
	const unsigned int	u = (unsigned int)i;

	return (int)((u << 24) | ((u & 0xff00u) << 8) | ((u >> 8) & 0xff00u) | (u >> 24));
}
#endif

// Boap NS
#include <BoapnsD.h>
#include <BoapnsC.h>

// Set to non-zero to compile in the dprintf() trace output
#define	DEBUG			0

#define APIVERSION_TEST		1		// Tests the API version during a connect

// dprintf(): printf-style trace output, compiled out unless DEBUG is non-zero.
// NOTE(review): the DEBUG expansion already ends in ';', so a call written as
// "dprintf(...);" expands to two statements; avoid using it as the sole body of
// an un-braced if/else.
#if DEBUG
#define	dprintf(fmt, a...)       printf(fmt, ##a);
#else
#define	dprintf(fmt, a...)
#endif

// IS_BIG_ENDIAN selects the byte-swapping code paths in BoapPacket.
// NOTE(review): __BYTE_ORDER and __BIG_ENDIAN are not defined in this file;
// presumably they come from <endian.h> via the system headers. If neither is
// defined the preprocessor evaluates 0 == 0 and IS_BIG_ENDIAN gets defined
// regardless of the real byte order - confirm on each target platform.
#if __BYTE_ORDER == __BIG_ENDIAN
#define IS_BIG_ENDIAN	1
#endif


const int boapPort = 12000;		///< The default BOAP connection port
const int roundSize = 256;		///< BoapPacket buffers are sized in multiples of this many bytes

/// Copies one byte from s to d; a single byte has no byte order to reverse.
static inline void swap8(char* d, char* s){
	d[0] = s[0];
}
/// Copies a 2-byte item from s to d with the byte order reversed.
static inline void swap16(char* d, char* s){
	const int	last = 1;

	// Same write order as a hand-unrolled copy: highest destination byte first
	for(int k = 0; k <= last; k++)
		d[last - k] = s[k];
}
/// Copies a 4-byte item from s to d with the byte order reversed.
static inline void swap32(char* d, char* s){
	const int	last = 3;

	// Same write order as a hand-unrolled copy: highest destination byte first
	for(int k = 0; k <= last; k++)
		d[last - k] = s[k];
}
/// Copies an 8-byte item from s to d with the byte order reversed.
static inline void swap64(char* d, char* s){
	const int	last = 7;

	// Same write order as a hand-unrolled copy: highest destination byte first
	for(int k = 0; k <= last; k++)
		d[last - k] = s[k];
}

/// Creates an empty packet with an initial buffer of roundSize bytes and the
/// read/write position at the start of the buffer.
BoapPacket::BoapPacket(){
	osize = roundSize;
	odata = (char*)malloc(roundSize);
	opos = 0;
}
/// Releases the packet buffer.
/// The buffer is obtained from malloc()/realloc(), so it must be released with
/// free(); using delete on it is undefined behaviour.
BoapPacket::~BoapPacket(){
	free(odata);
}

/// Ensures the packet buffer can hold at least `size` bytes.
/// The buffer only ever grows; the new capacity is `size` rounded up to the
/// next multiple of roundSize. Always returns 0.
/// NOTE(review): the realloc() result is not checked for failure.
int BoapPacket::resize(int size){
	if(size <= osize)
		return 0;

	// Number of roundSize blocks needed to cover the request
	const int	blocks = (size + roundSize - 1) / roundSize;

	osize = roundSize * blocks;
	odata = (char*)realloc(odata, osize);
	return 0;
}

/// Replaces the start of the packet buffer with `nbytes` bytes copied from
/// `data`, growing the buffer first if required. The current position is not
/// changed. Returns a default (no error) BError.
BError BoapPacket::setData(void* data, int nbytes){
	resize(nbytes);
	memcpy(odata, data, nbytes);

	return BError();
}

int BoapPacket::nbytes(){
#if IS_BIG_ENDIAN
	return bswap_32(((BoapPacketHead*)odata)->length);
#else
	return ((BoapPacketHead*)odata)->length;
#endif
}

/// Returns a pointer to the start of the packet buffer. The pointer may be
/// invalidated by any later call that grows the buffer.
char* BoapPacket::data(){
	char*	buffer = odata;

	return buffer;
}

void BoapPacket::updateLen(){
#if IS_BIG_ENDIAN
	((BoapPacketHead*)odata)->length = bswap_32(opos);
#else
	((BoapPacketHead*)odata)->length = opos;
#endif
}

/// Returns the command field stored in the packet header at the start of the
/// buffer, converted to host byte order on big-endian builds.
UInt32 BoapPacket::getCmd(){
	const BoapPacketHead*	head = (const BoapPacketHead*)odata;

#if IS_BIG_ENDIAN
	return bswap_32(head->cmd);
#else
	return head->cmd;
#endif
}

/// Copies `nBytes` bytes from `src` to `dst`, reversing the byte order of each
/// item on big-endian builds; on other builds this is a plain memcpy().
///
/// `swapType` describes the item layout as a string of width digits:
///   - a single digit ("1", "2", "4" or "8") means an array of items of that
///     width;
///   - a longer string (e.g. "4428") describes a record of mixed-width fields;
///     the pattern repeats until nBytes are consumed. Unknown characters are
///     treated as single bytes.
///
/// Any tail that is shorter than the next item width is copied unchanged. The
/// byte count is unsigned, so subtracting a full item width from a shorter
/// remainder would wrap around and run far past both buffers.
inline void BoapPacket::copyWithSwap(void* dst, const void* src, UInt32 nBytes, char* swapType){
#if IS_BIG_ENDIAN
	char*	d = (char*)dst;
	char*	s = (char*)src;

//	printf("BoapPacket::copyWithSwap: BIG_ENDIAN: swapType: %s\n", swapType);
	// An empty pattern is treated like "1" (and swapType[1] is never read for it)
	if((swapType[0] == '\0') || (swapType[1] == '\0')){
		switch(swapType[0]){
		case '8':
			for(; nBytes >= 8; nBytes -= 8, d += 8, s += 8)
				swap64(d, s);
			break;

		case '4':
			for(; nBytes >= 4; nBytes -= 4, d += 4, s += 4)
				swap32(d, s);
			break;

		case '2':
			for(; nBytes >= 2; nBytes -= 2, d += 2, s += 2)
				swap16(d, s);
			break;

		case '1':
		default:
			break;
		}

		// Byte-wide data, or a tail shorter than one item: copy unchanged
		if(nBytes)
			memcpy(d, s, nBytes);
	}
	else {
		char*	t = swapType;

		while(nBytes){
			UInt32	l;

			switch(*t){
			case '8':	l = 8;	break;
			case '4':	l = 4;	break;
			case '2':	l = 2;	break;
			case '1':
			default:	l = 1;	break;
			}

			if(l > nBytes){
				// Incomplete final field: copy what is left unchanged
				memcpy(d, s, nBytes);
				break;
			}

			switch(l){
			case 8:		swap64(d, s);	break;
			case 4:		swap32(d, s);	break;
			case 2:		swap16(d, s);	break;
			default:	swap8(d, s);	break;
			}

			d += l;
			s += l;
			nBytes -= l;

			// Wrap around to the start of the pattern for the next record
			t++;
			if(*t == '\0')
				t = swapType;
		}
	}
#else
	memcpy(dst, src, nBytes);
#endif
}

/// Starts a new packet: resets the position to the start of the buffer, writes
/// `head` there as 32-bit fields and updates the stored length. Always returns 0.
int BoapPacket::pushHead(BoapPacketHead& head){
	const int	headSize = sizeof(head);

	opos = 0;
	resize(headSize);
	copyWithSwap(odata, &head, headSize, "4");
	opos = headSize;
	updateLen();
	return 0;
}

/// Appends a signed 8-bit value at the current position and updates the
/// stored packet length. Always returns 0.
int BoapPacket::push(Int8 v){
	const int	width = sizeof(v);

	resize(opos + width);
	copyWithSwap(odata + opos, &v, width, "1");
	opos += width;
	updateLen();
	return 0;
}

/// Appends an unsigned 8-bit value at the current position and updates the
/// stored packet length. Always returns 0.
int BoapPacket::push(UInt8 v){
	const int	width = sizeof(v);

	resize(opos + width);
	copyWithSwap(odata + opos, &v, width, "1");
	opos += width;
	updateLen();
	return 0;
}

/// Appends a signed 16-bit value at the current position and updates the
/// stored packet length. Always returns 0.
int BoapPacket::push(Int16 v){
	const int	width = sizeof(v);

	resize(opos + width);
	copyWithSwap(odata + opos, &v, width, "2");
	opos += width;
	updateLen();
	return 0;
}

/// Appends an unsigned 16-bit value at the current position and updates the
/// stored packet length. Always returns 0.
int BoapPacket::push(UInt16 v){
	const int	width = sizeof(v);

	resize(opos + width);
	copyWithSwap(odata + opos, &v, width, "2");
	opos += width;
	updateLen();
	return 0;
}

/// Appends a signed 32-bit value at the current position and updates the
/// stored packet length. Always returns 0.
int BoapPacket::push(Int32 v){
	const int	width = sizeof(v);

	resize(opos + width);
	copyWithSwap(odata + opos, &v, width, "4");
	opos += width;
	updateLen();
	return 0;
}

/// Appends an unsigned 32-bit value at the current position and updates the
/// stored packet length. Always returns 0.
int BoapPacket::push(UInt32 v){
	const int	width = sizeof(v);

	resize(opos + width);
	copyWithSwap(odata + opos, &v, width, "4");
	opos += width;
	updateLen();
	return 0;
}

/// Appends a signed 64-bit value at the current position and updates the
/// stored packet length. Always returns 0.
int BoapPacket::push(Int64 v){
	const int	width = sizeof(v);

	resize(opos + width);
	copyWithSwap(odata + opos, &v, width, "8");
	opos += width;
	updateLen();
	return 0;
}

/// Appends an unsigned 64-bit value at the current position and updates the
/// stored packet length. Always returns 0.
int BoapPacket::push(UInt64 v){
	const int	width = sizeof(v);

	resize(opos + width);
	copyWithSwap(odata + opos, &v, width, "8");
	opos += width;
	updateLen();
	return 0;
}

int BoapPacket::push(const BString& v){
	Int32	l = v.len() + 1;
	
	resize(opos + sizeof(l) + l);
	copyWithSwap(&odata[opos], &l, sizeof(l), "4");
	opos += sizeof(l);
	copyWithSwap(&odata[opos], v.retStr(), l, "1");
	opos += l;
	updateLen();
	return 0;
}

/// Appends a Double at the current position as one 8-byte item and updates the
/// stored packet length. Always returns 0.
int BoapPacket::push(Double v){
	const int	width = sizeof(v);

	resize(opos + width);
	copyWithSwap(odata + opos, &v, width, "8");
	opos += width;
	updateLen();
	return 0;
}

int BoapPacket::push(const BError& v){
	Int32	n = v.getErrorNo();
	BString	s = v.getString();
	push(n);
	push(s);
	return 0;
}

/// Appends `nBytes` bytes of raw data at the current position, byte-swapped as
/// described by `swapType` (see copyWithSwap), and updates the stored packet
/// length. Always returns 0.
int BoapPacket::push(UInt32 nBytes, const void* data, char* swapType){
	resize(opos + nBytes);

	char*	dest = odata + opos;

	copyWithSwap(dest, data, nBytes, swapType);
	opos += nBytes;
	updateLen();
	return 0;
}


/// Reads the packet header from the start of the buffer into `head`.
/// The position is reset to the start of the buffer and left there, so the
/// header is not consumed. Always returns 0.
int BoapPacket::peekHead(BoapPacketHead& head){
	opos = 0;
	copyWithSwap(&head, odata, sizeof(head), "4");
	return 0;
}

/// Reads the packet header from the start of the buffer into `head` and leaves
/// the position just after the header, ready for the payload to be popped.
/// Always returns 0.
int BoapPacket::popHead(BoapPacketHead& head){
	const int	headSize = sizeof(head);

	copyWithSwap(&head, odata, headSize, "4");
	opos = headSize;
	return 0;
}

/// Reads a signed 8-bit value from the current position and advances past it.
/// Always returns 0.
int BoapPacket::pop(Int8& v){
	const int	width = sizeof(v);

	copyWithSwap(&v, odata + opos, width, "1");
	opos += width;
	return 0;
}

/// Reads an unsigned 8-bit value from the current position and advances past
/// it. Always returns 0.
int BoapPacket::pop(UInt8& v){
	const int	width = sizeof(v);

	copyWithSwap(&v, odata + opos, width, "1");
	opos += width;
	return 0;
}

/// Reads a signed 16-bit value from the current position and advances past it.
/// Always returns 0.
int BoapPacket::pop(Int16& v){
	const int	width = sizeof(v);

	copyWithSwap(&v, odata + opos, width, "2");
	opos += width;
	return 0;
}

/// Reads an unsigned 16-bit value from the current position and advances past
/// it. Always returns 0.
int BoapPacket::pop(UInt16& v){
	const int	width = sizeof(v);

	copyWithSwap(&v, odata + opos, width, "2");
	opos += width;
	return 0;
}

/// Reads a signed 32-bit value from the current position and advances past it.
/// Always returns 0.
int BoapPacket::pop(Int32& v){
	const int	width = sizeof(v);

	copyWithSwap(&v, odata + opos, width, "4");
	opos += width;
	return 0;
}

/// Reads an unsigned 32-bit value from the current position and advances past
/// it. Always returns 0.
int BoapPacket::pop(UInt32& v){
	const int	width = sizeof(v);

	copyWithSwap(&v, odata + opos, width, "4");
	opos += width;
	return 0;
}

/// Reads a signed 64-bit value from the current position and advances past it.
/// Always returns 0.
int BoapPacket::pop(Int64& v){
	const int	width = sizeof(v);

	copyWithSwap(&v, odata + opos, width, "8");
	opos += width;
	return 0;
}

/// Reads an unsigned 64-bit value from the current position and advances past
/// it. Always returns 0.
int BoapPacket::pop(UInt64& v){
	const int	width = sizeof(v);

	copyWithSwap(&v, odata + opos, width, "8");
	opos += width;
	return 0;
}

/// Reads a string (32-bit byte count followed by the characters, as written
/// by push(const BString&)) from the current position and advances past it.
///
/// The byte count comes from the packet, which may have arrived from the
/// network, so it is validated against the allocated buffer before use and
/// the last byte of the field is forced to NUL so that the string conversion
/// cannot run off the end of the buffer.
///
/// Returns 0 on success. Returns 1, with `v` set to an empty string, when the
/// length field or the string data would lie outside the buffer.
int BoapPacket::pop(BString& v){
	Int32	l = 0;

	// The length field itself must be inside the buffer
	if((opos < 0) || (opos > osize - (int)sizeof(l))){
		v = "";
		return 1;
	}

	copyWithSwap(&l, &odata[opos], sizeof(l), "4");
	opos += sizeof(l);

	// A valid count is at least 1 (the terminator) and fits in what remains
	if((l <= 0) || (l > osize - opos)){
		v = "";
		return 1;
	}

	// Well formed packets already have the terminator here
	odata[opos + l - 1] = '\0';
	v = (char*)&odata[opos];
	opos += l;
	return 0;
}

/// Reads a Double (one 8-byte item) from the current position and advances
/// past it. Always returns 0.
int BoapPacket::pop(Double& v){
	const int	width = sizeof(v);

	copyWithSwap(&v, odata + opos, width, "8");
	opos += width;
	return 0;
}

/// Reads an error object (32-bit error number followed by a message string)
/// from the current position and stores it in `v`. Always returns 0.
int BoapPacket::pop(BError& v){
	Int32	errorNo;
	BString	message;

	pop(errorNo);
	pop(message);
	v.set(errorNo, message);

	return 0;
}

/// Reads `nBytes` bytes of raw data from the current position into `data`,
/// byte-swapped as described by `swapType` (see copyWithSwap), and advances
/// past them. Always returns 0.
int BoapPacket::pop(UInt32 nBytes, void* data, char* swapType){
	const char*	from = odata + opos;

	copyWithSwap(data, from, nBytes, swapType);
	opos += nBytes;
	return 0;
}



/*******************************************************************************
 *	Boap Client communications object
 *******************************************************************************
 */

/*******************************************************************************
 *	Base for all Client Classes
 *******************************************************************************
 */
/// Creates an unconnected client for the named service, using a stream socket.
/// Defaults: normal priority, no maximum packet length (0), timeout of -1 and
/// API version 0.
BoapClientObject::BoapClientObject(BString name): BSocket(STREAM), oname(name){
	// Connection state
	oconnected = 0;
	oreconnect = 0;
	oservice = 0;

	// Settings
	oapiVersion = 0;
	opriority = BoapPriorityNormal;
	omaxLength = 0;
	otimeout = -1;
}

/// Connects this client to the named BOAP service.
///
/// `name` is either a plain object name, looked up through the BOAP name
/// server on localhost, or "//host/object", looked up through the name server
/// on `host`. The special object name "boapns" connects directly to the name
/// server itself.
///
/// If the client is already connected to the same name and no reconnect has
/// been requested, nothing is done. Otherwise any existing connection is closed
/// first. When APIVERSION_TEST is set, the remote API version is checked after
/// a successful connect.
///
/// Returns the error from the last step that was attempted.
BError BoapClientObject::connectService(BString name){
	BError			err;
	Boapns::BoapEntry	entry;
	BString			host = "localhost";
	BString			boapnsName;
	BSocketAddressINET	add;

	// Check if already connected to same service
	if(!oreconnect && (name == oname) && oconnected){
		return err;
	}
	
	// Check if a re-connect
	if(oconnected){
		oconnected = 0;
		close();
		init(STREAM);
	}

	// The reconnect request is being serviced from here on. It must be cleared
	// before checkApiVersion() runs: that calls pingLocked(), which calls back
	// into connectService(oname), and with the flag still set that nested call
	// would tear down the new connection and recurse without end.
	oreconnect = 0;

	oname = name;
	
	// Parse name
	if(name.subString(0, 2) == "//"){
		name.pullSeparators("/");
		host = name.pullToken("/");
	}

	if(name == "boapns"){
		// Use the "boapns" service port if it can be resolved, else the default port
		if(!(err = add.set(host, "boapns", "tcp")) || !(err = add.set(host, boapPort))){
			if(! (err = connect(add))){
				oservice = 0;
				oconnected = 1;
			}
		}
	}
	else {
		Boapns::Boapns	boapns("boapns");
		BIter		i;

		// Lookup name
		boapnsName = BString("//") + host + "/boapns";

		if(! (err = boapns.connectService(boapnsName))){
			if(! (err = boapns.getEntry(name, entry))){
				// First try and connect to the host name
				dprintf("BoapClientObject::connectService: Try To Connect to: %s:%d\n", entry.hostName.retStr(), entry.port);
				if(! (err = add.set(entry.hostName, entry.port))){
					dprintf("BoapClientObject::connectService: Connect to hostname: %s:%d\n", entry.hostName.retStr(), entry.port);
					if(! (err = connect(add))){
						oservice = entry.service;
						oconnected = 1;
#if APIVERSION_TEST
						err = checkApiVersion();
#endif
					}
				}

				if(!oconnected){				
					// Try and connect on one of the available network addresses
					for(entry.addressList.end(i); !entry.addressList.isEnd(i); entry.addressList.prev(i)){
						if(! (err = add.set(entry.addressList[i], entry.port))){
							dprintf("BoapClientObject::connectService: Connect to ipaddress: %s:%d on %s\n", entry.hostName.retStr(), entry.port, entry.addressList[i].retStr());
							if(! (err = connect(add))){
								oservice = entry.service;
								oconnected = 1;
#if APIVERSION_TEST
								err = checkApiVersion();
#endif
								break;
							}
						}
					}
				}
			}
		}
		else {
			err.set(err.getErrorNo(), BString("Connection to BoapNs (") + boapnsName + ") Failed: " + err.getString());
		}
	}

	// As before, a receive failure during the version check does not leave a
	// reconnect pending
	oreconnect = 0;
	dprintf("BoapClientObject::connectService: Status: %s\n", err.getString().retStr());

	return err;
}

/// Closes the connection to the service, if there is one, and re-initialises
/// the socket as a stream socket. Always returns a default (no error) BError.
BError BoapClientObject::disconnectService(){
	BError	err;

	if(!oconnected)
		return err;

	oconnected = 0;
	close();
	init(STREAM);
	return err;
}

/// Returns the service name this client was created with or last asked to
/// connect to.
BString BoapClientObject::getServiceName(){
	BString	serviceName = oname;

	return serviceName;
}

/// Pings the service while holding the client lock; on success `apiVersion`
/// receives the API version reported by the remote end.
BError BoapClientObject::ping(BUInt32& apiVersion){
	olock.lock();
	BError	result = pingLocked(apiVersion);
	olock.unlock();

	return result;
}

/// Sends the ping command (command 0) to the service and reads the reply.
/// This function does not take the client lock itself.
///
/// Connects (or reconnects) first if required. Returns a connection or
/// transfer error if one occurs, otherwise the error object carried in the
/// reply; `apiVersion` receives the version carried in the reply.
BError BoapClientObject::pingLocked(BUInt32& apiVersion){
	BError		err;
	BError		remoteStatus;
	BoapPacketHead	request;
	BoapPacketHead	reply;

	err = connectService(oname);
	if(err)
		return err;

	// Build the request: header only, no arguments
	request.type = BoapMagic | BoapTypeRpc;
	request.service = oservice;
	request.cmd = 0;
	otx.pushHead(request);

	err = performCall(otx, orx);
	if(err)
		return err;

	// Unpack the reply
	orx.popHead(reply);
	orx.pop(apiVersion);
	orx.pop(remoteStatus);

	return remoteStatus;
}

/// Sets the priority of this connection, both on the local socket and, via
/// command 1, on the remote end.
///
/// Connects (or reconnects) first if required. The client lock is held for the
/// whole operation and is released on every exit path, including a failed
/// call to the server.
///
/// Returns the first error encountered, or the status returned by the server.
BError BoapClientObject::setConnectionPriority(BoapPriority priority){
	BError			err;
	BoapPacketHead		txhead;
	BoapPacketHead		rxhead;
	BSocket::Priority	pri;

	olock.lock();

	if(!(err = connectService(oname))){
		opriority = priority;

		// Map the BOAP priority onto the socket priority; unknown values are normal
		switch(opriority){
		case BoapPriorityLow:		pri = BSocket::PriorityLow;	break;
		case BoapPriorityNormal:	pri = BSocket::PriorityNormal;	break;
		case BoapPriorityHigh:		pri = BSocket::PriorityHigh;	break;
		default:			pri = BSocket::PriorityNormal;	break;
		}

		if(!(err = BSocket::setPriority(pri))){
			txhead.cmd = 1;
			txhead.type = BoapMagic | BoapTypeRpc;
			txhead.service = oservice;
			otx.pushHead(txhead);
			otx.push(priority);
			if(!(err = performCall(otx, orx))){
				orx.popHead(rxhead);
				orx.pop(err);
			}
		}
	}

	olock.unlock();

	return err;
}

/// Sets the maximum length accepted for a received packet; 0 disables the
/// check (see performRecv).
void BoapClientObject::setMaxLength(BUInt32 maxLength){
	omaxLength = maxLength;
}

void BoapClientObject::setTimeout(int timeout){
	otimeout = timeout;
}

/// Pings the service and compares the API version it reports with the local
/// one. Returns the ping error if the ping fails, error 1 if the versions
/// differ, otherwise no error.
BError BoapClientObject::checkApiVersion(){
	BUInt32	remoteVersion;
	BError	err = pingLocked(remoteVersion);

	if(err)
		return err;

#if DEBUG
	printf("CheckVersions: Local: %d == Rem: %d\n", oapiVersion, remoteVersion);
#endif
	if(oapiVersion == remoteVersion)
		return err;

	err.set(1, BString("BOAP API versions incorrect: Local: ") + oapiVersion + " Remote: " + remoteVersion);
	return err;
}

/// Sends the whole packet over the connection, repeating the send until all
/// tx.nbytes() bytes have been written. Returns the first send error, if any.
BError BoapClientObject::performSend(BoapPacket& tx){
	BError		err;
	const BSize	total = tx.nbytes();
	char*		bytes = tx.data();
	BSize		chunk = 0;

	// Send packet
	for(BSize sent = 0; sent < total; sent = sent + chunk){
		err = send(&bytes[sent], total - sent, chunk, MSG_NOSIGNAL);
		if(err)
			return err;
	}

	return err;
}

/// Receives one complete packet into `rx`: first the fixed-size header, then
/// the number of payload bytes the header declares.
///
/// The header is validated before the payload is read: the type must carry
/// the BOAP magic, the length must not exceed the configured maximum (when one
/// is set) and must be at least the size of the header. The last check matters
/// because the payload size is computed as an unsigned difference; a shorter
/// declared length would wrap to a huge count and read far past the buffer.
///
/// Unless ZAP is defined, a receive error (including a timeout) marks the
/// connection for reconnection on the next call.
BError BoapClientObject::performRecv(BoapPacket& rx){
	BError			err;
	BSize			nb = 0;
	char*			data = 0;
	BSize			n = 0;
	BSize			nd = 0;
	BoapPacketHead		head;

	// Receive Packet
	// Read Head first
	rx.resize(sizeof(BoapPacketHead));
	nb = sizeof(BoapPacketHead);
	data = rx.data();
	n = 0;
	while(n < nb){
#ifdef ZAP
		if(err = recv(&data[n], nb - n, nd)){
			return err;
		}
#else
		if(err = recvWithTimeout(&data[n], nb - n, nd, otimeout)){
			oreconnect = 1;
			return err;
		}
#endif
		n = n + nd;
	}

	// Read rest of packet
	rx.popHead(head);

	// Validate packet
	if((head.type & 0xFFFFFF00) != BoapMagic)
		return err.set(1, "A non BOAP packet was received");

	if(omaxLength && (head.length > omaxLength))
		return err.set(1, "BOAP packet length was to large");

	if(head.length < sizeof(BoapPacketHead))
		return err.set(1, "BOAP packet length was too small");

	rx.resize(head.length);
	nb = head.length - sizeof(BoapPacketHead);
	// Fetch the buffer again: the resize may have moved it
	data = &rx.data()[sizeof(BoapPacketHead)];
	n = 0;
	while(n < nb){
#ifdef ZAP
		if(err = recv(&data[n], nb - n, nd)){
			return err;
		}
#else
		if(err = recvWithTimeout(&data[n], nb - n, nd, otimeout)){
			oreconnect = 1;
			return err;
		}
#endif
		n = n + nd;
	}

	return err;
}

/// Performs one RPC round trip: sends `tx`, receives the reply into `rx` and
/// checks that the reply carries the same command number as the request.
/// Returns the send/receive error, or error 1 on a command mismatch.
BError BoapClientObject::performCall(BoapPacket& tx, BoapPacket& rx){
	BError	err = performSend(tx);

	if(!err)
		err = performRecv(rx);
	if(err)
		return err;

	if(tx.getCmd() == rx.getCmd())
		return err;

	err.set(1, BString("BOAP incorrect packet for command recieved: Expected: ") + tx.getCmd() + " Got: " + rx.getCmd());
	return err;
}

/// Creates a signal sender on a datagram socket with broadcasting enabled.
BoapSignalObject::BoapSignalObject()
	: BSocket(DGRAM)
{
	const int	enable = 1;

	setBroadCast(enable);
}

/// Broadcasts the packet to INADDR_BROADCAST on port 14000, repeating the send
/// until all tx.nbytes() bytes have been written. Returns the first send
/// error, if any.
BError BoapSignalObject::performSend(BoapPacket& tx){
	BError			err;
	const BSize		total = tx.nbytes();
	char*			bytes = tx.data();
	BSize			chunk = 0;
	BSocketAddressINET	broadcastAddress;

	broadcastAddress.set(INADDR_BROADCAST, 14000);

	// Send packet
	for(BSize sent = 0; sent < total; sent = sent + chunk){
		err = sendTo(broadcastAddress, &bytes[sent], total - sent, chunk, MSG_NOSIGNAL);
		if(err)
			return err;
	}

	return err;
}



/*******************************************************************************
 *	Boap Server communications object
 *******************************************************************************
 */
/// Creates the server-side end of one client connection on the already
/// accepted socket `fd`. No maximum packet length is set initially.
BoapServerConnection::BoapServerConnection(BoapServer& boapServer, int fd)
	: oboapServer(boapServer), osocket(fd)
{
	omaxLength = 0;		// 0: no limit on the received packet length
}

/// Handles one request on this connection: receives a complete packet, passes
/// it to the server for dispatch and sends back the reply packet.
///
/// The header is validated before the payload is read: the type must carry
/// the BOAP magic, the length must not exceed the configured maximum (when one
/// is set) and must be at least the size of the header. The last check matters
/// because the payload size is computed as an unsigned difference; a shorter
/// length declared by a client would wrap to a huge count and read far past
/// the buffer.
///
/// Returns the first receive, validation or send error; the caller treats any
/// error as the client having gone.
BError BoapServerConnection::process(){
	BError			err;
	unsigned int		nb = 0;
	char*			data = 0;
	unsigned int		n = 0;
	unsigned int		nd = 0;
	BoapPacketHead		head;

	// Receive Packet
	// Read Head first
	orx.resize(sizeof(BoapPacketHead));
	nb = sizeof(BoapPacketHead);
	data = orx.data();
	n = 0;
	while(n < nb){
		if(err = osocket.recv(&data[n], nb - n, nd, 0))
			return err;
		n = n + nd;
	}
	
	// Read rest of packet
	orx.peekHead(head);

	// Validate packet
	if((head.type & 0xFFFFFF00) != BoapMagic)
		return err.set(1, "A non BOAP packet was received");
	
	if(omaxLength && (head.length > omaxLength))
		return err.set(1, "BOAP packet length was to large");

	if(head.length < sizeof(BoapPacketHead))
		return err.set(1, "BOAP packet length was too small");

	orx.resize(head.length);
	nb = head.length - sizeof(BoapPacketHead);
	// Fetch the buffer again: the resize may have moved it
	data = &orx.data()[sizeof(BoapPacketHead)];
	n = 0;
	while(n < nb){
		if(err = osocket.recv(&data[n], nb - n, nd, 0))
			return err;
		n = n + nd;
	}

	dprintf("Command received: Type(%x) Len(%d) Service(%d) Cmd(%d)\n", head.type, head.length, head.service, head.cmd);

	oboapServer.process(this, orx, otx);

	// Send packet
	nb = otx.nbytes();
	data = otx.data();
	n = 0;
	while(n < nb){
		if(err = osocket.send(&data[n], nb - n, nd, MSG_NOSIGNAL))
			return err;
		n = n + nd;
	}

	return err;
}

/// Returns the socket this connection communicates over.
BSocket& BoapServerConnection::getSocket(){
	BSocket&	connectionSocket = osocket;

	return connectionSocket;
}

/// Sets the maximum length accepted for a received packet; 0 disables the
/// check (see process()).
void BoapServerConnection::setMaxLength(BUInt32 maxLength){
	this->omaxLength = maxLength;
}

/// Connection loop: processes requests until process() reports an error, then
/// notifies the server that this client has gone and returns 0.
void* BoapServerConnection::function(){
	BError	err;

	for(;;){
		err = process();
		if(!err)
			continue;

		dprintf("BoapServerConnection::function: Error: %s\n", err.getString().retStr());
		oboapServer.clientGone(this);
		return 0;
	}
}

/// Creates a server with a stream socket for RPC connections (address re-use
/// enabled) and a datagram socket for events, plus a client object for the
/// BOAP name server. The server is non-threaded and not a name server until
/// init() says otherwise.
BoapServer::BoapServer()
	: onet(BSocket::STREAM), onetEvent(BSocket::DGRAM)
{
	oisBoapns = 0;
	othreaded = 0;

	onet.setReuseAddress(1);
	oboapns = new Boapns::Boapns();
}

/// Destroys the name server client object created by the constructor.
BoapServer::~BoapServer()
{
	delete oboapns;
}

/// Initialises the server: binds the service and event sockets, sets up the
/// event broadcast address and, unless this server is itself the name server,
/// registers every added service object with the BOAP name server.
///
/// @param boapNsHost	Host running the name server; "" uses the plain
///			"boapns" object name.
/// @param threaded	Stored as the threaded flag used by run().
/// @param isBoapns	Non-zero when this server is the name server: it binds
///			to the "boapns" service port and registers nothing.
/// @return		The first bind, connect or registration error.
BError BoapServer::init(BString boapNsHost, int threaded, int isBoapns){
	BError			err;
	BIter			i;
	BoapService		serviceNo;
	BList<BString>		localAddresses;
	BSocketAddressINET	bindAddress;
	BString			nsObjectName = "boapns";
	BString			generatedName;

	othreaded = threaded;
	oisBoapns = isBoapns;

	ohostName = BSocketAddressINET::getHostName();
	localAddresses = BSocketAddressINET::getIpAddressListAll();

	// The name server uses the "boapns" service port, others pass port 0
	if(!oisBoapns)
		bindAddress.set("", 0);
	else
		bindAddress.set("", "boapns", "tcp");

	// Create service socket
	err = onet.bind(bindAddress);
	if(err)
		return err;

	// Create event socket, bound to the address the service socket reports
	err = onet.getAddress(bindAddress);
	if(err)
		return err;
	err = onetEvent.bind(bindAddress);
	if(err)
		return err;

	// BOAP event broadcast address
	onetEventAddress.set("", "boapns", "udp");
	onetEventAddress.set(INADDR_BROADCAST, onetEventAddress.port());
	onetEvent.setBroadCast(1);

	// The name server itself has nothing to register
	if(oisBoapns)
		return err;

	if(boapNsHost != "")
		nsObjectName = BString("//") + boapNsHost + "/" + nsObjectName;

	err = oboapns->connectService(nsObjectName);
	if(err){
		err.set(err.getErrorNo(), BString("Connection to BoapNs Failed: ") + err.getString());
		return err;
	}

	// Register each service object, numbering them in list order from 0
	serviceNo = 0;
	for(oservices.start(i); !oservices.isEnd(i); oservices.next(i)){
		// If a name is not given, get one from the Boapns
		if(oservices[i].oobject->name() == ""){
			oboapns->getNewName(generatedName);
			oservices[i].oobject->setName(generatedName);
		}

		Boapns::BoapEntry	e(oservices[i].oobject->name(), ohostName, localAddresses, bindAddress.port(), serviceNo);

		err = oboapns->addEntry(e);
		if(err)
			break;
		serviceNo++;
	}

	return err;
}

/// Reports that a client connection has finished by posting its socket file
/// descriptor on the client-gone event, which run() picks up.
void BoapServer::clientGone(BoapServerConnection* client){
	BSocket&	clientSocket = client->getSocket();

	oclientGoneEvent.sendEvent(clientSocket.getFd());
}

/// Thread entry point: runs the server loop in the calling context and
/// returns 0 if the loop ever returns.
void* BoapServer::function(){
	const int	inThread = 0;

	run(inThread);
	return 0;
}

/// Runs the server's main loop.
///
/// With `inThread` non-zero this only calls start() and returns at once;
/// function() then calls run(0). Otherwise the loop runs in the caller and
/// does not return: it polls the listening socket, the client-gone event and
/// the event socket, plus (in non-threaded mode) every client socket.
BError BoapServer::run(int inThread){
	BError			err;
	BError			e;
	BSocketAddressINET	nadd;
	int			fd;
	BIter			i;
	BoapServerConnection*	c;

	if(inThread){
		start();
		return err;
	}
	
	// Descriptors that are always polled
	opoll.append(onet.getFd());
	opoll.append(oclientGoneEvent.getFd());
	opoll.append(onetEvent.getFd());
	onet.listen();
	
	while(1){
		// doPoll() reports the descriptor to service in fd
		opoll.doPoll(fd);
		if(fd == onet.getFd()){
			// New client: fd is overwritten with the accepted connection's descriptor
			onet.accept(fd, nadd);
			dprintf("Accept Connection: Fd: %d Add: %s\n", fd, nadd.getString().retStr());
			oclients.append(c = new BoapServerConnection(*this, fd));
			if(othreaded){
				// The connection services itself via start()/function()
				c->start();
			}
			else {
				// The connection is serviced from this loop
				opoll.append(fd);
			}
		}
		else if(fd == oclientGoneEvent.getFd()){
			// A connection reported an error; fd becomes that client's descriptor
			oclientGoneEvent.getEvent(fd);
			dprintf("Client Gone Event: Fd: %d\n", fd);
			if(othreaded){
				for(oclients.start(i); !oclients.isEnd(i); oclients.next(i)){
					if(oclients[i]->getSocket().getFd() == fd){
						// Wait for the connection to complete before destroying it
						oclients[i]->waitForCompletion();
						delete oclients[i];
						oclients.del(i);
						close(fd);
						break;
					}
				}
			}
		}
		else if(fd == onetEvent.getFd()){
			// Event datagram received
			processEvent(fd);
		}
		else if(fd >= 0){
			// Non-threaded mode: a request is pending on one of the client sockets
			dprintf("Process Client: %d\n", fd);
			for(oclients.start(i); !oclients.isEnd(i); oclients.next(i)){
				if(oclients[i]->getSocket().getFd() == fd){
					if(e = oclients[i]->process()){
						// Any error is treated as the client having gone
						dprintf("ClientGone: %d Error: %s\n", fd, e.getString().retStr());
						opoll.delFd(fd);
						delete oclients[i];
						oclients.del(i);
						close(fd);
					}
					break;
				}
			}
		}
	}
	
	// Not reached: the loop above has no exit
	return err;
}

/// Adds a service object to the server. Its service number is the number of
/// services already in the list. Always returns a default (no error) BError.
BError BoapServer::addObject(BoapServiceObject* object){
	BoapServiceEntry	entry(oservices.number(), object);

	oservices.append(entry);

	return BError();
}

/// Dispatches a received request to the service object whose service number
/// matches the one in the packet header. If no service matches, nothing is
/// done and a default (no error) BError is returned.
BError BoapServer::process(BoapServerConnection* conn, BoapPacket& rx, BoapPacket& tx){
	BError		err;
	BIter		i;
	BoapPacketHead	head;

	rx.popHead(head);

	oservices.start(i);
	while(!oservices.isEnd(i)){
		if(oservices[i].oservice == head.service)
			return oservices[i].oobject->process(conn, rx, tx);
		oservices.next(i);
	}

	return err;
}

/// Passes a received event packet to every service object in turn.
/// Returns the result from the last service object (or no error when there
/// are none).
BError BoapServer::processEvent(BoapPacket& rx){
	BError	err;
	BIter	i;

	dprintf("BoapServer::processEvent\n");

	oservices.start(i);
	while(!oservices.isEnd(i)){
		err = oservices[i].oobject->processEvent(rx);
		oservices.next(i);
	}

	return err;
}

/// Receives one event datagram from `fd` and passes it to the service objects.
///
/// The datagram is checked before it is parsed: it must contain at least a
/// complete header, and the length declared in the header must cover the
/// header and must not exceed the number of bytes received. Without these
/// checks a short or malformed datagram would be parsed using whatever
/// happened to be in the rest of the buffer.
///
/// Returns the receive error (errno), error 1 for a malformed datagram,
/// otherwise the result of dispatching the event.
BError BoapServer::processEvent(int fd){
	const int		maxDatagram = 1500;
	int			nb = 0;
	BoapPacketHead		head;
	BoapPacket		rx;

	// Receive Packet
	rx.resize(maxDatagram);
	if((nb = recv(fd, rx.data(), maxDatagram, 0)) < 0)
		return BError(errno, strerror(errno));

	if(nb < (int)sizeof(BoapPacketHead))
		return BError(1, "A truncated BOAP event packet was received");

	rx.popHead(head);
	dprintf("Event received: Len(%d) Service(%d) Cmd(%d)\n", head.length, head.service, head.cmd);

	if(((BUInt32)head.length < sizeof(BoapPacketHead)) || ((BUInt32)head.length > (BUInt32)nb))
		return BError(1, "A BOAP event packet with an invalid length was received");

	return processEvent(rx);
}

/// Sends an event packet to the event broadcast address, repeating the send
/// until all tx.nbytes() bytes have been written. Returns the first send
/// error, if any.
BError BoapServer::sendEvent(BoapPacket& tx){
	BError		err;
	const int	total = tx.nbytes();
	char*		bytes = tx.data();
	uint32_t	nsent;

	// Send packet
	for(int done = 0; done < total; done = done + nsent){
		dprintf("BoapServer::sendEvent: Address: %s\n", onetEventAddress.getString().retStr());
		err = onetEvent.sendTo(onetEventAddress, &bytes[done], total - done, nsent);
		dprintf("BoapServer::sendEvent: result: %d:%s\n", err.getErrorNo(), err.getString().retStr());
		if(err)
			return err;
	}

	return err;
}

/// Returns the stream socket the server accepts RPC connections on.
BSocket& BoapServer::getSocket(){
	BSocket&	serviceSocket = onet;

	return serviceSocket;
}

/// Returns the datagram socket used to send and receive events.
BSocket& BoapServer::getEventSocket(){
	BSocket&	eventSocket = onetEvent;

	return eventSocket;
}

/// Returns the host name recorded by init().
BString	BoapServer::getHostName(){
	BString	hostName = ohostName;

	return hostName;
}

/// Returns the number of client connections currently in the server's list.
int BoapServer::getConnectionsNumber(){
	const int	count = oclients.number();

	return count;
}

/*******************************************************************************
 *	Base for all Server Objects
 *******************************************************************************
 */
/// Associates a command number with the member function that handles it.
BoapFuncEntry::BoapFuncEntry(int cmd, BoapFunc func)
	: ocmd(cmd),
	  ofunc(func)
{
}

/// Creates a service object, adds it to `server` and installs the two built-in
/// commands: 0 (ping) and 1 (connection priority). The API version starts at 0.
BoapServiceObject::BoapServiceObject(BoapServer& server, BString name)
	: oserver(server), oname(name)
{
	BoapFuncEntry	pingEntry(0, &BoapServiceObject::doPing);
	BoapFuncEntry	priorityEntry(1, &BoapServiceObject::doConnectionPriority);

	oapiVersion = 0;
	oserver.addObject(this);
	ofuncList.append(pingEntry);
	ofuncList.append(priorityEntry);
}

/// Destructor; performs no explicit cleanup.
BoapServiceObject::~BoapServiceObject()
{
}

/// Sets the name of this service object. Always returns a default (no error)
/// BError.
BError BoapServiceObject::setName(BString name){
	oname = name;

	return BError();
}
	
/// Returns the name of this service object.
BString	BoapServiceObject::name(){
	BString	objectName = oname;

	return objectName;
}

/// Handler for command 0 (ping): builds a reply carrying this object's API
/// version followed by a no-error status. Always returns no error.
BError	BoapServiceObject::doPing(BoapServerConnection* conn, BoapPacket& rx, BoapPacket& tx){
	BError		err;
	BoapPacketHead	request;
	BoapPacketHead	reply;

	rx.popHead(request);

	// The reply echoes the request's service and command numbers
	reply.cmd = request.cmd;
	reply.service = request.service;
	reply.type = BoapMagic | BoapTypeRpcReply;

	tx.pushHead(reply);
	tx.push(oapiVersion);
	tx.push(err);

	return err;
}

/// Handler for command 1 (connection priority): reads the requested priority
/// from the request, applies the matching socket priority to the connection
/// (unknown values are treated as normal) and builds a reply carrying a
/// no-error status. The result of setting the socket priority is not reported.
BError	BoapServiceObject::doConnectionPriority(BoapServerConnection* conn, BoapPacket& rx, BoapPacket& tx){
	BError			err;
	BoapPacketHead		request;
	BoapPacketHead		reply;
	BUInt32			priority;
	BSocket::Priority	pri = BSocket::PriorityNormal;

	rx.popHead(request);
	rx.pop(priority);

	// Only low and high differ from the default chosen above
	if(priority == BoapPriorityLow)
		pri = BSocket::PriorityLow;
	else if(priority == BoapPriorityHigh)
		pri = BSocket::PriorityHigh;

	conn->getSocket().setPriority(pri);

	// The reply echoes the request's service and command numbers
	reply.cmd = request.cmd;
	reply.service = request.service;
	reply.type = BoapMagic | BoapTypeRpcReply;

	tx.pushHead(reply);
	tx.push(err);

	return err;
}

/// Dispatches a request to the handler registered for the command number in
/// the packet header. If no handler matches, nothing is done and a default
/// (no error) BError is returned.
BError BoapServiceObject::process(BoapServerConnection* conn, BoapPacket& rx, BoapPacket& tx){
	BError		err;
	BoapPacketHead	head;
	BIter		i;

	rx.popHead(head);

	ofuncList.start(i);
	while(!ofuncList.isEnd(i)){
		if(ofuncList[i].ocmd == head.cmd)
			return (this->*ofuncList[i].ofunc)(conn, rx, tx);
		ofuncList.next(i);
	}

	return err;
}

/// Broadcasts a named signal with one integer argument. The packet carries
/// this object's full name ("//host/object"), the signal name and the
/// argument. Returns the result of sending the packet.
BError BoapServiceObject::sendEvent(BString name, Int32 arg){
	BoapPacketHead	signalHead;
	BoapPacket	packet;
	BString		fullName = BString("//") + oserver.getHostName() + "/" + oname;

	signalHead.cmd = 0;
	signalHead.service = 0;
	signalHead.type = BoapMagic | BoapTypeSignal;

	packet.pushHead(signalHead);
	packet.push(fullName);
	packet.push(name);
	packet.push(arg);

	BError	err = sendEvent(packet);

	return err;
}

/// Default handler for a received signal: does nothing beyond the debug trace
/// and returns no error.
BError BoapServiceObject::processEvent(BString objectName, BString signalName, Int32 arg){
	dprintf("BoapServiceObject::processEvent(BString objectName, BString signalName, Int32 arg)\n");

	return BError();
}

/// Sends an already built event packet through the owning server.
BError BoapServiceObject::sendEvent(BoapPacket& tx){
	BError	result = oserver.sendEvent(tx);

	return result;
}

/// Unpacks a received signal packet (object name, signal name, integer
/// argument) and passes it to processEvent(objectName, signalName, arg).
///
/// Returns the handler's result. Previously that result was stored and then
/// discarded, so an error reported by a handler could never reach the caller.
BError BoapServiceObject::processEvent(BoapPacket& rx){
	BoapPacketHead	rxhead;
	BString		objectName;
	BString		signalName;
	Int32		arg;

	dprintf("BoapServiceObject::processEvent\n");
	rx.popHead(rxhead);
	rx.pop(objectName);
	rx.pop(signalName);
	rx.pop(arg);

	return processEvent(objectName, signalName, arg);
}