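// NOTE: The text "RSS Git Download Clone" / "Raw Blame History" was captured from
// the web repository viewer together with this file; it is not part of the source.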
/*******************************************************************************
 *	Boap.cc	Boap RPC protocol
 *			T.Barnaby,	BEAM Ltd,	8/5/03
 *	updated by	D.Korchagin,	CERN AB-BI-SW,	2007-08-31
 *
 * History:
 *	4/2/05:		Updated addressing scheme
 *******************************************************************************
 */
#include <stdlib.h>
#include <stdio.h>
#include <errno.h>
#include <unistd.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <Boap.h>

#ifndef __Lynx__
#include <byteswap.h>
#else
/// Reverses the byte order of a 32-bit value (fallback used when <byteswap.h>
/// is not available, i.e. when __Lynx__ is defined).
///
/// The swap is performed on an unsigned copy of the argument: shifting the
/// signed value directly sign-extends `i >> 24` for inputs with the top bit set
/// (corrupting the upper three result bytes) and overflows on `i << 24`.
///
/// @param i	Value whose four bytes are to be reversed.
/// @return	The value with its bytes in reverse order.
static inline int bswap_32(int i){
	const unsigned int	u = static_cast<unsigned int>(i);

	return static_cast<int>((u << 24) | ((u & 0xff00u) << 8) | ((u >> 8) & 0xff00u) | (u >> 24));
}
#endif

// Boap NS
#include <BoapnsD.h>
#include <BoapnsC.h>

// Local debug switch: any DEBUG setting from the build is discarded and
// replaced by the value below. A non-zero value pulls in BDebug.h and turns
// the dprintf() trace calls in this file into printf() calls.
#undef DEBUG
#define	DEBUG			0

#if DEBUG
#include <BDebug.h>
#endif

#define APIVERSION_TEST		1		// Tests the API version during a connect

// dprintf(): trace output helper. It expands to nothing unless DEBUG is set.
// NOTE: the DEBUG variant carries its own trailing ';', so it must only be used
// as a complete statement (never as the body of an unbraced if/else).
#if DEBUG
#define	dprintf(fmt, a...)       printf(fmt, ##a);
#else
#define	dprintf(fmt, a...)
#endif

// IS_BIG_ENDIAN is only defined on big-endian builds; elsewhere the
// "#if IS_BIG_ENDIAN" tests in this file evaluate to false.
#if __BYTE_ORDER == __BIG_ENDIAN
#define IS_BIG_ENDIAN	1
#endif


const int boapPort = 12000;		///< The default BOAP connection port


/// Constructs a packet; this constructor performs no work of its own.
BoapPacket::BoapPacket()
{
}
/// Destroys the packet; this destructor performs no work of its own.
BoapPacket::~BoapPacket()
{
}

void BoapPacket::updateHead(){
#if IS_BIG_ENDIAN
	((BoapPacketHead*)odata)->length = bswap_32(opos);
#else
	((BoapPacketHead*)odata)->length = opos;
#endif
}

/// Returns the command field of the header stored at the start of the packet
/// buffer. On big-endian builds the stored value is byte-swapped first.
BUInt32 BoapPacket::getCmd(){
	BoapPacketHead*	packetHead = (BoapPacketHead*)odata;

#if IS_BIG_ENDIAN
	return bswap_32(packetHead->cmd);
#else
	return packetHead->cmd;
#endif
}

/// Starts a new packet with the given header.
///
/// The packet is sized to hold exactly one header, the header is copied to the
/// start of the buffer (byte-swapped on big-endian builds) and the packet
/// position is set to just past the header.
///
/// @param head	Header to store at the start of the packet.
/// @return	Always 0.
int BoapPacket::pushHead(BoapPacketHead& head){
	setSize(sizeof(BoapPacketHead));

#if IS_BIG_ENDIAN
	bswap_copy(1, &head, odata, sizeof(BoapPacketHead), "4");
#else
	memcpy(odata, &head, sizeof(BoapPacketHead));
#endif

	// Data pushed after this call follows the header
	opos = sizeof(BoapPacketHead);

	return 0;
}

/// Copies the header from the start of the packet buffer into `head`
/// (byte-swapped on big-endian builds) without changing the packet position.
///
/// @param head	Receives the packet header.
/// @return	Always 0.
int BoapPacket::peekHead(BoapPacketHead& head){
#if IS_BIG_ENDIAN
	bswap_copy(1, odata, &head, sizeof(BoapPacketHead), "4");
#else
	memcpy(&head, odata, sizeof(BoapPacketHead));
#endif

	return 0;
}

/// Copies the header from the start of the packet buffer into `head`
/// (byte-swapped on big-endian builds) and sets the packet position to just
/// past the header.
///
/// @param head	Receives the packet header.
/// @return	Always 0.
int BoapPacket::popHead(BoapPacketHead& head){
#if IS_BIG_ENDIAN
	bswap_copy(1, odata, &head, sizeof(BoapPacketHead), "4");
#else
	memcpy(&head, odata, sizeof(BoapPacketHead));
#endif

	// Subsequent accesses continue after the header
	opos = sizeof(BoapPacketHead);

	return 0;
}


/*******************************************************************************
 *	Boap Client communications object
 *******************************************************************************
 */

/*******************************************************************************
 *	Base for all Client Classes
 *******************************************************************************
 */
/// Creates an unconnected client object for the named service.
///
/// The underlying socket is created as a STREAM socket. The object starts
/// disconnected, with normal priority, no maximum packet length (0), a receive
/// timeout value of -1 and no pending reconnect request.
///
/// @param name	Service name stored for later connection.
BoapClientObject::BoapClientObject(BString name): BSocket(STREAM), oname(name){
	// Connection state
	oconnected = 0;
	oreconnect = 0;
	oservice = 0;
	oapiVersion = 0;

	// Connection settings
	opriority = BoapPriorityNormal;
	omaxLength = 0;
	otimeout = -1;
}

/// Destroys the client object; this destructor performs no work of its own.
BoapClientObject::~BoapClientObject()
{
}

/// Connects this client to the named BOAP service.
///
/// If the client is already connected to `name` and no reconnect has been
/// requested, the call returns immediately. Otherwise any existing connection
/// is closed and a new one is made:
///  - the name "boapns" (optionally prefixed "//host/") connects directly to
///    the name server on that host;
///  - any other name is looked up in the name server and the returned entry is
///    tried first by host name and then by each listed address.
/// When APIVERSION_TEST is enabled the API version is checked after a
/// successful connection to a looked-up service.
///
/// @param name	Service name, optionally of the form "//host/object".
/// @return	Error status of the connection attempt.
BError BoapClientObject::connectService(BString name){
	BError			err;
	Boapns::BoapEntry	entry;
	BString			host = "";
	BString			thisHostName;
	BString			boapnsName;
	BSocketAddressINET	add;

	dprintf("BoapClientObject::connectService: %s\n", name.retStr());

	// Check if already connected to same service
	if(!oreconnect && (name == oname) && oconnected){
		dprintf("BoapClientObject::connectService: Return already connected: %s\n", name.retStr());
		return err;
	}
	
	// Check if a re-connect
	if(oconnected){
		oconnected = 0;
		close();
		init(STREAM);
	}

	// The reconnect request is being serviced now, so clear it before any nested
	// call is made. checkApiVersion() below re-enters connectService(oname) via
	// pingLocked(); with the flag still set that nested call would tear down the
	// connection just made and recurse without end.
	oreconnect = 0;

	thisHostName = BSocketAddressINET::getHostName();
	
	oname = name;
	
	// Parse name
	if(name.subString(0, 2) == "//"){
		name.pullSeparators("/");
		host = name.pullToken("/");
	}

	if(name == "boapns"){
		// Try the "boapns" service entry first, then fall back to the default port
		if(!(err = add.set(host, "boapns", "tcp")) || !(err = add.set(host, boapPort))){
			if(! (err = connect(add))){
				oservice = 0;
				oconnected = 1;
			}
		}
	}
	else {
		Boapns::Boapns	boapns("boapns");
		BIter		i;

		// Lookup name
		boapnsName = BString("//") + host + "/boapns";

		if(! (err = boapns.connectService(boapnsName))){
			if(! (err = boapns.getEntry(name, entry))){
				// First try and connect to the host name
				dprintf("BoapClientObject::connectService: Try To Connect to: %s:%d from: %s\n", entry.hostName.retStr(), entry.port, thisHostName.retStr());
				if(! (err = add.set(entry.hostName, entry.port))){
					dprintf("BoapClientObject::connectService: Connect to hostname: %s:%d\n", entry.hostName.retStr(), entry.port);
					if(! (err = connect(add))){
						oservice = entry.service;
						oconnected = 1;
						dprintf("BoapClientObject::connectService: TCP/IP connected\n");

#if APIVERSION_TEST
						err = checkApiVersion();
#endif
						dprintf("BoapClientObject::connectService: ApiVersion checked: %s\n", err.getString().retStr());
					}
				}

				if(!oconnected){				
					// Try and connect on one of the available network addresses
					for(entry.addressList.end(i); !entry.addressList.isEnd(i); entry.addressList.prev(i)){
						if(! (err = add.set(entry.addressList[i], entry.port))){
							dprintf("BoapClientObject::connectService: Connect to ipaddress: %s:%d on %s\n", entry.hostName.retStr(), entry.port, entry.addressList[i].retStr());

							// Ignore localhost entry if not the same host
							if((entry.hostName != thisHostName) && (entry.addressList[i].compareWild("127.*")))
								continue;

							if(! (err = connect(add))){
								oservice = entry.service;
								oconnected = 1;
#if APIVERSION_TEST
								err = checkApiVersion();
#endif
								break;
							}
						}
					}
				}
			}
		}
		else {
			err.set(err.getErrorNo(), BString("Connection to BoapNs (") + boapnsName + ") Failed: " + err.getString());
		}
	}

	// Kept from the original flow: also discards any reconnect request raised
	// while the API version was being checked.
	oreconnect = 0;
	dprintf("BoapClientObject::connectService: End: Status: %s\n", err.getString().retStr());

	return err;
}

/// Closes the connection to the service, if one is open, and re-initialises
/// the socket as a STREAM socket.
///
/// @return	Always a default-constructed BError.
BError BoapClientObject::disconnectService(){
	BError	status;

	// Nothing to do when not connected
	if(!oconnected)
		return status;

	oconnected = 0;
	close();
	init(STREAM);

	return status;
}

/// Returns the service name currently stored in this client object.
BString BoapClientObject::getServiceName()
{
	return oname;
}

/// Pings the service while holding the client lock.
///
/// @param apiVersion	Passed through to pingLocked(), which fills it in.
/// @return		The status returned by pingLocked().
BError BoapClientObject::ping(BUInt32& apiVersion){
	olock.lock();
	BError	status = pingLocked(apiVersion);
	olock.unlock();

	return status;
}

/// Sends command 0 (ping) to the service and reads the reply.
///
/// The connection is (re-)established first via connectService(). This method
/// does not take olock itself; ping() wraps it with the lock held.
///
/// @param apiVersion	Set from the reply only when the reply type is
///			BoapTypeRpcReply; otherwise left untouched.
/// @return		A connection or transfer error if one occurred, otherwise
///			the error status carried in the reply packet.
BError BoapClientObject::pingLocked(BUInt32& apiVersion){
	BError		err;
	BError		remoteStatus;
	BoapPacketHead	request;
	BoapPacketHead	reply;

	err = connectService(oname);
	if(err)
		return err;

	// Build the ping request
	request.cmd = 0;
	request.type = BoapMagic | BoapTypeRpc;
	request.service = oservice;
	otx.pushHead(request);

	err = performCall(otx, orx);
	if(err)
		return err;

	// Decode the reply: header, remote status, then the version if present
	orx.popHead(reply);
	orx.pop(remoteStatus);
	if((reply.type & 0xFF) == BoapTypeRpcReply)
		orx.pop(apiVersion);

	return remoteStatus;
}

/// Sets the priority of this connection on both the local socket and, via
/// command 1, on the remote side.
///
/// The client lock is held for the duration of the call and is released on
/// every exit path, including a failed performCall() (the original code
/// returned from that path with the lock still held).
///
/// @param priority	Requested BOAP priority; unrecognised values are mapped to
///			the normal socket priority.
/// @return		A connection, socket or transfer error if one occurred,
///			otherwise the error status carried in the reply packet.
BError BoapClientObject::setConnectionPriority(BoapPriority priority){
	BError			err;
	BoapPacketHead		txhead;
	BoapPacketHead		rxhead;
	BSocket::Priority	pri;

	olock.lock();

	if(err = connectService(oname)){
		olock.unlock();
		return err;
	}

	opriority = priority;
	switch(opriority){
	case BoapPriorityLow:		pri = BSocket::PriorityLow;	break;
	case BoapPriorityNormal:	pri = BSocket::PriorityNormal;	break;
	case BoapPriorityHigh:		pri = BSocket::PriorityHigh;	break;
	default:			pri = BSocket::PriorityNormal;	break;
	}
	
	// Only tell the remote side once the local socket priority has been set
	err = BSocket::setPriority(pri);
	if(!err){
		txhead.cmd = 1;
		txhead.type = BoapMagic | BoapTypeRpc;
		txhead.service = oservice;
		otx.pushHead(txhead);
		otx.push(priority);
		if(err = performCall(otx, orx)){
			// Release the lock before reporting the failure
			olock.unlock();
			return err;
		}
		orx.popHead(rxhead);
		orx.pop(err);
	}
	
	olock.unlock();
	
	return err;
}

/// Sets the maximum packet length accepted by performRecv().
///
/// A value of 0 disables the check. (The original body assigned omaxLength to
/// itself, so the argument was silently ignored.)
///
/// @param maxLength	Maximum total packet length, or 0 for no limit.
void BoapClientObject::setMaxLength(BUInt32 maxLength){
	omaxLength = maxLength;
}

/// Stores the timeout value that performRecv() passes to recvWithTimeout().
///
/// @param timeout	Timeout value to store.
void BoapClientObject::setTimeout(int timeout)
{
	otimeout = timeout;
}

/// Pings the service and compares the API version it reports with the local
/// oapiVersion.
///
/// @return	The ping error if the ping failed, an ErrorMisc error if the two
///		versions differ, otherwise no error.
BError BoapClientObject::checkApiVersion(){
	BError	err;
	BUInt32	remoteVersion;
	
	dprintf("BoapClientObject::checkApiVersion\n");

	err = pingLocked(remoteVersion);
	if(!err){
#if DEBUG
		printf("CheckVersions: Local: %d == Rem: %d\n", oapiVersion, remoteVersion);
#endif
		if(oapiVersion != remoteVersion){
			err.set(ErrorMisc, BString("BOAP API versions incorrect: Local: ") + oapiVersion + " Remote: " + remoteVersion);
		}
	}

	dprintf("BoapClientObject::checkApiVersion: End\n");
	
	return err;
}

/// Sends the whole of a packet over the connection, repeating send() until
/// every byte has been written.
///
/// @param tx	Packet to transmit.
/// @return	The first send error encountered, otherwise no error.
BError BoapClientObject::performSend(BoapPacket& tx){
	BError			err;
	BSize			total = tx.size();
	char*			buffer = tx.data();
	BSize			sent = 0;
	BSize			chunk = 0;

	// Keep sending from where the previous send() stopped
	while(sent < total){
		err = send(&buffer[sent], total - sent, chunk, MSG_NOSIGNAL);
		if(err)
			return err;
		sent += chunk;
	}

	return err;
}

/// Receives one complete packet from the connection.
///
/// The fixed-size header is read first and validated, then the packet is
/// resized to the length given in the header and the remainder is read.
/// A receive failure (when not built with ZAP) flags the connection for
/// reconnection through oreconnect.
///
/// The header length is checked against the header size before it is used:
/// a smaller value would make the remaining byte count wrap around and the
/// payload loop would then write past the end of the packet buffer.
///
/// @param rx	Packet that receives the data.
/// @return	A receive error, an ErrorMisc error for a non-BOAP or badly sized
///		packet, otherwise no error.
BError BoapClientObject::performRecv(BoapPacket& rx){
	BError			err;
	BSize			nb = 0;
	char*			data = 0;
	BSize			n = 0;
	BSize			nd = 0;
	BoapPacketHead		head;

	// Receive Packet
	// Read Head first
	rx.resize(sizeof(BoapPacketHead));
	nb = sizeof(BoapPacketHead);
	data = rx.data();
	n = 0;
	while(n < nb){
#ifdef ZAP
		if(err = recv(&data[n], nb - n, nd)){
			return err;
		}
#else
		if(err = recvWithTimeout(&data[n], nb - n, nd, otimeout)){
			oreconnect = 1;
			return err;
		}
#endif
		n = n + nd;
	}

	// Read rest of packet
	rx.popHead(head);

	// Validate packet
	if((head.type & 0xFFFFFF00) != BoapMagic)
		return err.set(ErrorMisc, "A non BOAP packet was received");

	// The length field covers the header itself, so it can never be smaller
	if(head.length < sizeof(BoapPacketHead))
		return err.set(ErrorMisc, "BOAP packet length was too small");

	if(omaxLength && (head.length > omaxLength))
		return err.set(ErrorMisc, "BOAP packet length was to large");

	rx.resize(head.length);
	nb = head.length - sizeof(BoapPacketHead);
	data = &rx.data()[sizeof(BoapPacketHead)];
	n = 0;
	while(n < nb){
#ifdef ZAP
		if(err = recv(&data[n], nb - n, nd)){
			return err;
		}
#else
		if(err = recvWithTimeout(&data[n], nb - n, nd, otimeout)){
			oreconnect = 1;
			return err;
		}
#endif
		n = n + nd;
	}

	return err;
}

/// Performs one request/reply exchange: sends `tx` and receives the reply
/// into `rx`.
///
/// If the exchange fails with -EPIPE the connection is dropped and
/// handleReconnect() is called; when that handler reports no error the request
/// is sent again. After a successful exchange the command field of the reply
/// is checked against that of the request.
///
/// NOTE(review): olock is released around the handleReconnect() call and taken
/// again afterwards, so this presumably expects the caller to hold olock —
/// confirm against callers.
///
/// @param tx	Request packet; its header length is updated before sending.
/// @param rx	Receives the reply packet.
/// @return	A send/receive or reconnect error, an ErrorMisc error if the reply
///		command does not match the request, otherwise no error.
BError BoapClientObject::performCall(BoapPacket& tx, BoapPacket& rx){
	BError			err;

	tx.updateHead();
	while(1){
		// Only wait for a reply if the request was sent successfully
		if(! (err = performSend(tx))){
			err = performRecv(rx);
		}

		// Handle a disconnection by calling handleReconnect() if implemented
		if(err == -EPIPE){
			BoapPacket	ltx;
			
			// Save command
			ltx.setSize(tx.size());
			ltx.writeData(0, tx.data(), tx.size());
			olock.unlock();

			// Call handler
			disconnectService();
			oreconnect = 0;
			err = handleReconnect(err);
			
			// Resume command
			olock.lock();
			tx.setSize(ltx.size());
			tx.writeData(0, ltx.data(), ltx.size());

			// The handler could not recover: give up with its error
			if(err)
				break;
		}
		else {
			// Success or a non-disconnect error: no retry
			break;
		}
	}
	
	// A reply must carry the same command number as the request
	if(!err && (tx.getCmd() != rx.getCmd()))
		err.set(ErrorMisc, BString("BOAP incorrect packet for command recieved: Expected: ") + tx.getCmd() + " Got: " + rx.getCmd());

	return err;
}

/// Base implementation of the reconnect handler called by performCall() after
/// a disconnection: it does nothing and hands the error straight back.
///
/// @param err	The error that triggered the reconnect handling.
/// @return	The same error, unchanged.
BError BoapClientObject::handleReconnect(BError err)
{
	return err;
}


/// Creates a signal object on a DGRAM socket with broadcasting enabled.
BoapSignalObject::BoapSignalObject()
	: BSocket(DGRAM)
{
	setBroadCast(1);
}

/// Sends the whole of a packet to the broadcast address (INADDR_BROADCAST) on
/// port 14000, repeating sendTo() until every byte has been written.
///
/// @param tx	Packet to transmit.
/// @return	The first send error encountered, otherwise no error.
BError BoapSignalObject::performSend(BoapPacket& tx){
	BError			err;
	BSocketAddressINET	broadcastAddress;
	BSize			total = tx.size();
	char*			buffer = tx.data();
	BSize			sent = 0;
	BSize			chunk = 0;

	broadcastAddress.set(INADDR_BROADCAST, 14000);
	
	// Keep sending from where the previous sendTo() stopped
	while(sent < total){
		err = sendTo(broadcastAddress, &buffer[sent], total - sent, chunk, MSG_NOSIGNAL);
		if(err)
			return err;
		sent += chunk;
	}

	return err;
}

/*******************************************************************************
 *	Boap Server communications object
 *******************************************************************************
 */
/// Creates a connection object bound to its server and to the socket built
/// from the given file descriptor. No maximum packet length is set (0).
///
/// @param boapServer	Server that owns this connection.
/// @param fd		File descriptor used to construct the connection socket.
BoapServerConnection::BoapServerConnection(BoapServer& boapServer, int fd)
	: oboapServer(boapServer), osocket(fd)
{
	omaxLength = 0;
}

/// Destroys the connection; this destructor performs no work of its own.
BoapServerConnection::~BoapServerConnection()
{
}

/// Initialisation hook called by BoapServer::run() for each new connection.
/// This implementation does nothing.
///
/// @return	Always a default-constructed BError.
BError BoapServerConnection::init()
{
	BError	status;

	return status;
}

/// Receives one request packet from the client, dispatches it and sends the
/// reply.
///
/// The fixed-size header is read and validated first, then the rest of the
/// packet is read. Commands other than 0 are passed through validate(); a
/// validation failure is answered with a BoapTypeRpcError reply, otherwise the
/// request is handed to the server for processing. The contents of otx are
/// then sent back and the server's operation counter is incremented.
///
/// The header length is checked against the header size before it is used:
/// a smaller value (from a faulty or hostile client) would make the remaining
/// byte count wrap around and the payload loop would then write past the end
/// of the receive buffer.
///
/// @return	A receive/send error, an ErrorMisc error for a non-BOAP or badly
///		sized packet, otherwise no error.
BError BoapServerConnection::process(){
	BError			err;
	unsigned int		nb = 0;
	char*			data = 0;
	unsigned int		n = 0;
	BSize			nd = 0;
	BoapPacketHead		head;

	// Receive Packet
	// Read Head first
	orx.resize(sizeof(BoapPacketHead));
	nb = sizeof(BoapPacketHead);
	data = orx.data();
	n = 0;
	while(n < nb){
		if(err = osocket.recv(&data[n], nb - n, nd, 0))
			return err;
		n = n + nd;
	}
	
	// Read rest of packet
	orx.peekHead(head);

#if DEBUG
	fprintf(stderr, "PacketHead: "); hd8(&head, sizeof(head));
#endif

	// Validate packet
	if((head.type & 0xFFFFFF00) != BoapMagic){
		return err.set(ErrorMisc, "A non BOAP packet was received");
	}

	// The length field covers the header itself, so it can never be smaller
	if(head.length < sizeof(BoapPacketHead))
		return err.set(ErrorMisc, "BOAP packet length was too small");
	
	if(omaxLength && (head.length > omaxLength))
		return err.set(ErrorMisc, "BOAP packet length was to large");

	orx.resize(head.length);
	nb = head.length - sizeof(BoapPacketHead);
	data = &orx.data()[sizeof(BoapPacketHead)];
	n = 0;
	while(n < nb){
		if(err = osocket.recv(&data[n], nb - n, nd, 0))
			return err;
		n = n + nd;
	}

	dprintf("Command received: Type(%x) Len(%d) Service(%d) Cmd(%d)\n", head.type, head.length, head.service, head.cmd);

	// Command 0 (ping) bypasses validation
	if((head.cmd != 0) && (err = validate())){
		BoapPacketHead		txHead;

		dprintf("Command: return validation error\n");
		txHead.type = BoapMagic | BoapTypeRpcError;
		txHead.service = head.service;
		txHead.cmd = head.cmd;
		otx.pushHead(txHead);
		otx.push(err);
	}
	else {
		oboapServer.process(this, orx, otx);
	}

	// Send packet
	dprintf("Command reply: Len(%d)\n", otx.size());
	otx.updateHead();
	nb = otx.size();
	data = otx.data();
	n = 0;
	while(n < nb){
		dprintf("Command send: Len(%d)\n", nb - n);
		if(err = osocket.send(&data[n], nb - n, nd, MSG_NOSIGNAL))
			return err;
		n = n + nd;
	}
	dprintf("Command reply: End\n");

	oboapServer.onumOperations++;

	return err;
}

/// Returns a reference to the socket used by this connection.
BSocket& BoapServerConnection::getSocket()
{
	return osocket;
}

/// Sets the maximum packet length accepted by process(); 0 disables the check.
///
/// @param maxLength	Maximum total packet length, or 0 for no limit.
void BoapServerConnection::setMaxLength(BUInt32 maxLength)
{
	omaxLength = maxLength;
}

/// Copies the header of the current receive packet into `head` without
/// consuming it.
///
/// @param head	Receives the packet header.
/// @return	Always a default-constructed BError.
BError BoapServerConnection::getHead(BoapPacketHead& head)
{
	BError	status;

	orx.peekHead(head);

	return status;
}

/// Validation hook called by process() for every command other than 0.
/// This implementation accepts everything.
///
/// @return	Always a default-constructed BError.
BError BoapServerConnection::validate()
{
	BError	status;

	return status;
}

void* BoapServerConnection::function(){
	BError	err;
	
	while(1){
		if(err = process()){
			dprintf("BoapServerConnection::function: Error: %s\n", err.getString().retStr());
			oboapServer.clientGone(this);
			return 0;
		}
	}
}

/// Creates a server with a STREAM service socket and a DGRAM event socket.
/// Address re-use is enabled on the service socket and a name-server client
/// object is allocated (released in the destructor).
BoapServer::BoapServer()
	: onet(BSocket::STREAM), onetEvent(BSocket::DGRAM)
{
	// Default state
	onumOperations = 0;
	oisBoapns = 0;
	othreaded = 0;

	onet.setReuseAddress(1);
	oboapns = new Boapns::Boapns();
}

/// Releases the name-server client object allocated by the constructor.
BoapServer::~BoapServer()
{
	delete oboapns;
}

/// Binds the server sockets and, unless this server is itself the name server,
/// registers every added service object with the name server.
///
/// The service socket is bound to the "boapns" tcp service when `isBoapns` is
/// set, otherwise to `port`. The event socket is bound to the address the
/// service socket ended up with, and the event broadcast address is set from
/// the "boapns" udp service port.
///
/// @param boapNsHost	Host of the name server; empty to use the plain "boapns" object name.
/// @param port		Port to bind the service socket to when not the name server.
/// @param threaded	Stored in othreaded.
/// @param isBoapns	Non-zero when this server is the name server itself.
/// @return		The first bind, connection or registration error, otherwise no error.
BError BoapServer::init(BString boapNsHost, int port, int threaded, int isBoapns){
	BError			err;
	BIter			it;
	BoapService		serviceNum;
	BList<BString>		addresses;
	BSocketAddressINET	bindAddress;
	BString			nsObjectName = "boapns";
	BString			newName;

	othreaded = threaded;
	oisBoapns = isBoapns;

	ohostName = BSocketAddressINET::getHostName();
	addresses = BSocketAddressINET::getIpAddressListAll();

	if(!oisBoapns)
		bindAddress.set("", port);
	else
		bindAddress.set("", "boapns", "tcp");

	// Create service socket
	err = onet.bind(bindAddress);
	if(err)
		return err;

	// Create event socket on the address the service socket actually got
	err = onet.getAddress(bindAddress);
	if(err)
		return err;
	err = onetEvent.bind(bindAddress);
	if(err)
		return err;

	// BOAP event broadcast address
	onetEventAddress.set("", "boapns", "udp");
	onetEventAddress.set(INADDR_BROADCAST, onetEventAddress.port());
	onetEvent.setBroadCast(1);

	// The name server itself has nothing to register
	if(oisBoapns)
		return err;

	if(boapNsHost != "")
		nsObjectName = BString("//") + boapNsHost + "/" + nsObjectName;

	err = oboapns->connectService(nsObjectName);
	if(err){
		err.set(err.getErrorNo(), BString("Connection to BoapNs Failed: ") + err.getString());
		return err;
	}

	// Register each service object, numbering them in list order
	serviceNum = 0;
	oservices.start(it);
	while(!oservices.isEnd(it)){
		// If a name is not given, get one from the Boapns
		if(oservices[it].oobject->name() == ""){
			oboapns->getNewName(newName);
			oservices[it].oobject->setName(newName);
		}

		Boapns::BoapEntry	e(oservices[it].oobject->name(), ohostName, addresses, bindAddress.port(), serviceNum);

		err = oboapns->addEntry(e);
		if(err)
			break;

		oservices.next(it);
		serviceNum++;
	}

	return err;
}

/// Posts a "client gone" event carrying the file descriptor of the given
/// connection's socket; run() picks this event up.
///
/// @param client	Connection that has finished.
void BoapServer::clientGone(BoapServerConnection* client)
{
	BSocket&	clientSocket = client->getSocket();

	oclientGoneEvent.sendEvent(clientSocket.getFd());
}

/// Thread entry point: runs the server loop in the calling context.
///
/// @return	Always 0.
void* BoapServer::function()
{
	run(0);

	return 0;
}

/// Runs the server's main loop.
///
/// With `inThread` set the loop is started via start() and the call returns
/// at once. Otherwise the call polls the service socket, the client-gone event
/// and the event socket forever:
///  - a new connection is accepted, initialised and either started as its own
///    thread (threaded mode) or added to the poll set;
///  - a client-gone event (threaded mode) waits for that connection to finish
///    and deletes it;
///  - traffic on the event socket is passed to processEvent();
///  - traffic on a client descriptor is processed in place, and the client is
///    removed if processing fails.
///
/// A connection whose init() fails is deleted and skipped; the original code
/// went on to store and use the deleted pointer.
///
/// @param inThread	Non-zero to start the loop via start() instead of
///			running it in the caller.
/// @return		A default-constructed BError (only reached with `inThread` set).
BError BoapServer::run(int inThread){
	BError			err;
	BError			e;
	BSocketAddressINET	nadd;
	int			fd;
	BIter			i;
	BoapServerConnection*	c;

	if(inThread){
		start();
		return err;
	}
	
	opoll.append(onet.getFd());
	opoll.append(oclientGoneEvent.getFd());
	opoll.append(onetEvent.getFd());
	onet.listen();
	
	while(1){
		opoll.doPoll(fd);
		if(fd == onet.getFd()){
			onet.accept(fd, nadd);
			dprintf("Accept Connection: Fd: %d Add: %s\n", fd, nadd.getString().retStr());
			c = newConnection(fd, nadd);
			if(e = c->init()){
				// The connection could not be set up: drop it and do not
				// register or start the deleted object.
				// NOTE(review): this relies on deleting the connection to
				// release the accepted descriptor — confirm in BSocket.
				delete c;
				continue;
			}

			oclients.append(c);
			if(othreaded){
				c->start();
			}
			else {
				opoll.append(fd);
			}
		}
		else if(fd == oclientGoneEvent.getFd()){
			oclientGoneEvent.getEvent(fd);
			dprintf("Client Gone Event: Fd: %d\n", fd);
			if(othreaded){
				for(oclients.start(i); !oclients.isEnd(i); oclients.next(i)){
					if(oclients[i]->getSocket().getFd() == fd){
						oclients[i]->waitForCompletion();
						delete oclients[i];
						oclients.del(i);
						break;
					}
				}
			}
		}
		else if(fd == onetEvent.getFd()){
			processEvent(fd);
		}
		else if(fd >= 0){
			dprintf("Process Client: %d\n", fd);
			for(oclients.start(i); !oclients.isEnd(i); oclients.next(i)){
				if(oclients[i]->getSocket().getFd() == fd){
					if(e = oclients[i]->process()){
						dprintf("ClientGone: %d Error: %s\n", fd, e.getString().retStr());
						opoll.delFd(fd);
						delete oclients[i];
						oclients.del(i);
					}
					break;
				}
			}
		}
	}
	
	return err;
}

/// Adds a service object to the server. Its service number is the number of
/// services already registered at the time of the call.
///
/// @param object	Service object to add.
/// @return		Always a default-constructed BError.
BError BoapServer::addObject(BoapServiceObject* object)
{
	BError	status;

	oservices.append(BoapServiceEntry(oservices.number(), object));

	return status;
}

/// Dispatches a received request to the service object whose service number
/// matches the one in the packet header.
///
/// @param conn	Connection the request arrived on.
/// @param rx	Received request packet.
/// @param tx	Packet the service object fills with the reply.
/// @return	The result of the matching service object's process(), or a
///		default-constructed BError when no service matches.
BError BoapServer::process(BoapServerConnection* conn, BoapPacket& rx, BoapPacket& tx){
	BError		status;
	BIter		it;
	BoapPacketHead	rxHead;
	
	rx.popHead(rxHead);

	oservices.start(it);
	while(!oservices.isEnd(it)){
		// Only the first matching service handles the request
		if(oservices[it].oservice == rxHead.service){
			status = oservices[it].oobject->process(conn, rx, tx);
			return status;
		}
		oservices.next(it);
	}

	return status;
}

/// Passes a received event packet to every registered service object.
///
/// @param rx	Received event packet.
/// @return	The result from the last service object called, or a
///		default-constructed BError when there are no services.
BError BoapServer::processEvent(BoapPacket& rx){
	BError	status;
	BIter	it;
	
	dprintf("BoapServer::processEvent\n");

	oservices.start(it);
	while(!oservices.isEnd(it)){
		// Each result overwrites the previous one
		status = oservices[it].oobject->processEvent(rx);
		oservices.next(it);
	}
	
	return status;
}

/// Reads one event packet (up to 1500 bytes) from the given descriptor and
/// passes it to processEvent(BoapPacket&).
///
/// @param fd	Descriptor to receive from.
/// @return	An error built from errno if recv() fails, otherwise the result
///		of processing the packet.
BError BoapServer::processEvent(int fd){
	BoapPacketHead		head;
	BoapPacket		rx;
	int			received;

	// Receive Packet
	rx.resize(1500);
	received = recv(fd, rx.data(), 1500, 0);
	if(received < 0)
		return BError(errno, strerror(errno));

	rx.popHead(head);
	dprintf("Event received: Len(%d) Service(%d) Cmd(%d)\n", head.length, head.service, head.cmd);

	return processEvent(rx);
}

/// Sends the whole of an event packet to the event broadcast address,
/// repeating sendTo() until every byte has been written.
///
/// @param tx	Event packet to transmit.
/// @return	The first send error encountered, otherwise no error.
BError BoapServer::sendEvent(BoapPacket& tx){
	BError			err;
	int			total = tx.size();
	char*			buffer = tx.data();
	int			offset = 0;
	BSize			nsent;

	// Keep sending from where the previous sendTo() stopped
	while(offset < total){
		dprintf("BoapServer::sendEvent: Address: %s\n", onetEventAddress.getString().retStr());
		err = onetEvent.sendTo(onetEventAddress, &buffer[offset], total - offset, nsent);
		dprintf("BoapServer::sendEvent: result: %d:%s\n", err.getErrorNo(), err.getString().retStr());
		if(err)
			return err;
		offset = offset + nsent;
	}

	return err;
}

/// Returns a reference to the server's service socket.
BSocket& BoapServer::getSocket()
{
	return onet;
}

/// Returns a reference to the server's event socket.
BSocket& BoapServer::getEventSocket()
{
	return onetEvent;
}

/// Returns the host name stored by init().
BString BoapServer::getHostName()
{
	return ohostName;
}

/// Returns the number of client connections currently held in the client list.
int BoapServer::getConnectionsNumber()
{
	return oclients.number();
}

/// Allocates the connection object for a newly accepted client.
///
/// @param fd		Descriptor of the accepted connection.
/// @param address	Peer address; not used by this implementation.
/// @return		A newly allocated BoapServerConnection.
BoapServerConnection* BoapServer::newConnection(int fd, BSocketAddressINET address)
{
	return new BoapServerConnection(*this, fd);
}

/*******************************************************************************
 *	Base for all Server Objects
 *******************************************************************************
 */
/// Associates a command number with the member function that handles it.
///
/// @param cmd	Command number.
/// @param func	Handler for that command.
BoapFuncEntry::BoapFuncEntry(int cmd, BoapFunc func)
	: ocmd(cmd), ofunc(func)
{
}

/// Creates a service object, adds it to the given server and registers the
/// two built-in commands: 0 (ping) and 1 (connection priority).
///
/// @param server	Server this object belongs to.
/// @param name		Name of the service object.
BoapServiceObject::BoapServiceObject(BoapServer& server, BString name)
	: oserver(server), oname(name)
{
	oapiVersion = 0;
	oserver.addObject(this);

	// Built-in commands available on every service object
	ofuncList.append(BoapFuncEntry(0, &BoapServiceObject::doPing));
	ofuncList.append(BoapFuncEntry(1, &BoapServiceObject::doConnectionPriority));
}

/// Destroys the service object; this destructor performs no work of its own.
BoapServiceObject::~BoapServiceObject()
{
}

/// Replaces the name of this service object.
///
/// @param name	New name.
/// @return	Always a default-constructed BError.
BError BoapServiceObject::setName(BString name)
{
	BError	status;

	oname = name;

	return status;
}
	
/// Returns the name of this service object.
BString BoapServiceObject::name()
{
	return oname;
}

/// Handler for command 0 (ping): replies with a no-error status followed by
/// this object's API version.
///
/// @param conn	Connection the request arrived on (not used here).
/// @param rx	Received request packet.
/// @param tx	Packet filled with the reply.
/// @return	Always a default-constructed BError.
BError BoapServiceObject::doPing(BoapServerConnection* conn, BoapPacket& rx, BoapPacket& tx){
	BError			status;
	BoapPacketHead		request;
	BoapPacketHead		reply;

	rx.popHead(request);

	// The reply echoes the service and command of the request
	reply.cmd = request.cmd;
	reply.service = request.service;
	reply.type = BoapMagic | BoapTypeRpcReply;

	tx.pushHead(reply);
	tx.push(status);
	tx.push(oapiVersion);

	return status;
}

/// Handler for command 1 (connection priority): applies the requested priority
/// to the connection's socket and replies with a no-error status.
///
/// Unrecognised priority values are treated as normal priority. The result of
/// setting the socket priority is not reported back to the client.
///
/// @param conn	Connection whose socket priority is changed.
/// @param rx	Received request packet carrying the priority value.
/// @param tx	Packet filled with the reply.
/// @return	Always a default-constructed BError.
BError BoapServiceObject::doConnectionPriority(BoapServerConnection* conn, BoapPacket& rx, BoapPacket& tx){
	BError			status;
	BoapPacketHead		request;
	BoapPacketHead		reply;
	BUInt32			requested;
	BSocket::Priority	socketPriority;
	
	rx.popHead(request);
	rx.pop(requested);

	// Map the BOAP priority onto the socket priority
	switch(requested){
	case BoapPriorityLow:
		socketPriority = BSocket::PriorityLow;
		break;
	case BoapPriorityHigh:
		socketPriority = BSocket::PriorityHigh;
		break;
	case BoapPriorityNormal:
	default:
		socketPriority = BSocket::PriorityNormal;
		break;
	}

	conn->getSocket().setPriority(socketPriority);
	
	// The reply echoes the service and command of the request
	reply.cmd = request.cmd;
	reply.service = request.service;
	reply.type = BoapMagic | BoapTypeRpcReply;

	tx.pushHead(reply);
	tx.push(status);
	
	return status;
}

/// Dispatches a request to the handler registered for its command number.
///
/// @param conn	Connection the request arrived on.
/// @param rx	Received request packet.
/// @param tx	Packet the handler fills with the reply.
/// @return	The result of the matching handler, or a default-constructed
///		BError when no handler is registered for the command.
BError BoapServiceObject::process(BoapServerConnection* conn, BoapPacket& rx, BoapPacket& tx){
	BError			status;
	BoapPacketHead		rxHead;
	BIter			it;

	rx.popHead(rxHead);

	ofuncList.start(it);
	while(!ofuncList.isEnd(it)){
		// Only the first matching entry handles the request
		if(ofuncList[it].ocmd == rxHead.cmd){
			status = (this->*ofuncList[it].ofunc)(conn, rx, tx);
			return status;
		}
		ofuncList.next(it);
	}

	return status;
}

/// Builds and sends a signal packet carrying this object's full name
/// ("//host/object"), the signal name and one integer argument.
///
/// @param name	Signal name.
/// @param arg	Signal argument.
/// @return	The result of sending the packet.
BError BoapServiceObject::sendEvent(BString name, BInt32 arg){
	BoapPacketHead	signalHead;
	BoapPacket	packet;
	BString		fullName = BString("//") + oserver.getHostName() + "/" + oname;
	
	signalHead.cmd = 0;
	signalHead.service = 0;
	signalHead.type = BoapMagic | BoapTypeSignal;

	packet.pushHead(signalHead);
	packet.push(fullName);
	packet.push(name);
	packet.push(arg);

	return sendEvent(packet);
}

/// Handler for a decoded signal. This implementation ignores the signal.
///
/// @param objectName	Name of the object that sent the signal.
/// @param signalName	Name of the signal.
/// @param arg		Signal argument.
/// @return		Always a default-constructed BError.
BError BoapServiceObject::processEvent(BString objectName, BString signalName, BInt32 arg)
{
	BError		status;

	dprintf("BoapServiceObject::processEvent(BString objectName, BString signalName, BInt32 arg)\n");

	return status;
}

/// Sends a ready-built event packet through the owning server.
///
/// @param tx	Event packet to send.
/// @return	The result of the server's sendEvent().
BError BoapServiceObject::sendEvent(BoapPacket& tx)
{
	return oserver.sendEvent(tx);
}

/// Decodes a received signal packet (object name, signal name, integer
/// argument) and passes the values to the three-argument processEvent().
///
/// NOTE(review): the result of the three-argument handler is stored but not
/// returned; this function always returns a default-constructed BError —
/// confirm whether that is intended.
///
/// @param rx	Received signal packet.
/// @return	Always a default-constructed BError.
BError BoapServiceObject::processEvent(BoapPacket& rx){
	BError		status;
	BError		handlerResult;
	BoapPacketHead	signalHead;
	BString		objectName;
	BString		signalName;
	BInt32		arg;

	dprintf("BoapServiceObject::processEvent\n");

	// Decode in the order the fields were pushed by sendEvent()
	rx.popHead(signalHead);
	rx.pop(objectName);
	rx.pop(signalName);
	rx.pop(arg);

	handlerResult = processEvent(objectName, signalName, arg);

	return status;
}