/******************************************************************************* * Boap.cc Boap RPC protocol * T.Barnaby, BEAM Ltd, 8/5/03 * updated by D.Korchagin, CERN AB-BI-SW, 2007-08-31 * * History: * 4/2/05: Updated addressing scheme ******************************************************************************* */ #include #include #include #include #include #include #include #include #ifndef __Lynx__ #include #else static inline int bswap_32(int i){ return (i << 24) | ((i & 0xff00) << 8) | ((i >> 8) & 0xff00) | (i >> 24); } #endif // Boap NS #include #include #define DEBUG 0 #define APIVERSION_TEST 1 // Tests the API version during a connect #if DEBUG #define dprintf(fmt, a...) printf(fmt, ##a); #else #define dprintf(fmt, a...) #endif #if __BYTE_ORDER == __BIG_ENDIAN #define IS_BIG_ENDIAN 1 #endif const int boapPort = 12000; ///< The default BOAP connection port const int roundSize = 256; static inline void swap8(char* d, char* s){ *d = *s; } static inline void swap16(char* d, char* s){ d[1] = s[0]; d[0] = s[1]; } static inline void swap32(char* d, char* s){ d[3] = s[0]; d[2] = s[1]; d[1] = s[2]; d[0] = s[3]; } static inline void swap64(char* d, char* s){ d[7] = s[0]; d[6] = s[1]; d[5] = s[2]; d[4] = s[3]; d[3] = s[4]; d[2] = s[5]; d[1] = s[6]; d[0] = s[7]; } BoapPacket::BoapPacket(){ opos = 0; osize = roundSize; odata = (char*)malloc(osize); } BoapPacket::~BoapPacket(){ delete odata; } int BoapPacket::resize(int size){ if(size > osize){ // printf("BoapPacket::resize: from %d to %d\n", osize, size); osize = roundSize * ((size + roundSize - 1) / roundSize); odata = (char*)realloc(odata, osize); } return 0; } BError BoapPacket::setData(void* data, int nbytes){ BError err; resize(nbytes); memcpy(odata, data, nbytes); return err; } int BoapPacket::nbytes(){ #if IS_BIG_ENDIAN return bswap_32(((BoapPacketHead*)odata)->length); #else return ((BoapPacketHead*)odata)->length; #endif } char* BoapPacket::data(){ return odata; } void BoapPacket::updateLen(){ #if IS_BIG_ENDIAN ((BoapPacketHead*)odata)->length = bswap_32(opos); #else ((BoapPacketHead*)odata)->length = opos; #endif } UInt32 BoapPacket::getCmd(){ #if IS_BIG_ENDIAN return bswap_32(((BoapPacketHead*)odata)->cmd); #else return ((BoapPacketHead*)odata)->cmd; #endif } inline void BoapPacket::copyWithSwap(void* dst, const void* src, UInt32 nBytes, char* swapType){ #if IS_BIG_ENDIAN char* d = (char*)dst; char* s = (char*)src; // printf("BoapPacket::copyWithSwap: BIG_ENDIAN: swapType: %s\n", swapType); if(swapType[1] == '\0'){ switch(swapType[0]){ case '8': while(nBytes){ swap64(d, s); d += 8; s += 8; nBytes -= 8; } break; case '4': while(nBytes){ swap32(d, s); d += 4; s += 4; nBytes -= 4; } break; case '2': { while(nBytes){ swap16(d, s); d += 2; s += 2; nBytes -= 2; } break; } case '1': default: memcpy(dst, src, nBytes); break; } } else { char* t = swapType; char* d = (char*)dst; char* s = (char*)src; int l = 0; while(nBytes){ switch(*t){ case '8': swap64(d, s); l = 8; break; case '4': swap32(d, s); l = 4; break; case '2': swap16(d, s); l = 2; break; case '1': swap8(d, s); l = 1; break; default: swap8(d, s); l = 1; break; } d += l; s += l; nBytes -= l; t++; if(*t == '\0') t = swapType; } } #else memcpy(dst, src, nBytes); #endif } int BoapPacket::pushHead(BoapPacketHead& head){ opos = 0; resize(opos + sizeof(head)); copyWithSwap(&odata[opos], &head, sizeof(head), "4"); opos += sizeof(head); updateLen(); return 0; } int BoapPacket::push(Int8 v){ resize(opos + sizeof(v)); copyWithSwap(&odata[opos], &v, sizeof(v), "1"); opos += sizeof(v); updateLen(); return 0; } int BoapPacket::push(UInt8 v){ resize(opos + sizeof(v)); copyWithSwap(&odata[opos], &v, sizeof(v), "1"); opos += sizeof(v); updateLen(); return 0; } int BoapPacket::push(Int16 v){ resize(opos + sizeof(v)); copyWithSwap(&odata[opos], &v, sizeof(v), "2"); opos += sizeof(v); updateLen(); return 0; } int BoapPacket::push(UInt16 v){ resize(opos + sizeof(v)); copyWithSwap(&odata[opos], &v, sizeof(v), "2"); opos += sizeof(v); updateLen(); return 0; } int BoapPacket::push(Int32 v){ resize(opos + sizeof(v)); copyWithSwap(&odata[opos], &v, sizeof(v), "4"); opos += sizeof(v); updateLen(); return 0; } int BoapPacket::push(UInt32 v){ resize(opos + sizeof(v)); copyWithSwap(&odata[opos], &v, sizeof(v), "4"); opos += sizeof(v); updateLen(); return 0; } int BoapPacket::push(Int64 v){ resize(opos + sizeof(v)); copyWithSwap(&odata[opos], &v, sizeof(v), "8"); opos += sizeof(v); updateLen(); return 0; } int BoapPacket::push(UInt64 v){ resize(opos + sizeof(v)); copyWithSwap(&odata[opos], &v, sizeof(v), "8"); opos += sizeof(v); updateLen(); return 0; } int BoapPacket::push(const BString& v){ Int32 l = v.len() + 1; resize(opos + sizeof(l) + l); copyWithSwap(&odata[opos], &l, sizeof(l), "4"); opos += sizeof(l); copyWithSwap(&odata[opos], v.retStr(), l, "1"); opos += l; updateLen(); return 0; } int BoapPacket::push(Double v){ resize(opos + sizeof(v)); copyWithSwap(&odata[opos], &v, sizeof(v), "8"); opos += sizeof(v); updateLen(); return 0; } int BoapPacket::push(const BError& v){ Int32 n = v.getErrorNo(); BString s = v.getString(); push(n); push(s); return 0; } int BoapPacket::push(UInt32 nBytes, const void* data, char* swapType){ resize(opos + nBytes); copyWithSwap(&odata[opos], data, nBytes, swapType); opos += nBytes; updateLen(); return 0; } int BoapPacket::peekHead(BoapPacketHead& head){ opos = 0; copyWithSwap(&head, &odata[opos], sizeof(head), "4"); return 0; } int BoapPacket::popHead(BoapPacketHead& head){ opos = 0; copyWithSwap(&head, &odata[opos], sizeof(head), "4"); opos += sizeof(head); return 0; } int BoapPacket::pop(Int8& v){ copyWithSwap(&v, &odata[opos], sizeof(v), "1"); opos += sizeof(v); return 0; } int BoapPacket::pop(UInt8& v){ copyWithSwap(&v, &odata[opos], sizeof(v), "1"); opos += sizeof(v); return 0; } int BoapPacket::pop(Int16& v){ copyWithSwap(&v, &odata[opos], sizeof(v), "2"); opos += sizeof(v); return 0; } int BoapPacket::pop(UInt16& v){ copyWithSwap(&v, &odata[opos], sizeof(v), "2"); opos += sizeof(v); return 0; } int BoapPacket::pop(Int32& v){ copyWithSwap(&v, &odata[opos], sizeof(v), "4"); opos += sizeof(v); return 0; } int BoapPacket::pop(UInt32& v){ copyWithSwap(&v, &odata[opos], sizeof(v), "4"); opos += sizeof(v); return 0; } int BoapPacket::pop(Int64& v){ copyWithSwap(&v, &odata[opos], sizeof(v), "8"); opos += sizeof(v); return 0; } int BoapPacket::pop(UInt64& v){ copyWithSwap(&v, &odata[opos], sizeof(v), "8"); opos += sizeof(v); return 0; } int BoapPacket::pop(BString& v){ Int32 l; copyWithSwap(&l, &odata[opos], sizeof(l), "4"); opos += sizeof(l); v = (char*)&odata[opos]; opos += l; return 0; } int BoapPacket::pop(Double& v){ copyWithSwap(&v, &odata[opos], sizeof(v), "8"); opos += sizeof(v); return 0; } int BoapPacket::pop(BError& v){ Int32 n; BString s; pop(n); pop(s); v.set(n, s); return 0; } int BoapPacket::pop(UInt32 nBytes, void* data, char* swapType){ copyWithSwap(data, &odata[opos], nBytes, swapType); opos += nBytes; return 0; } /******************************************************************************* * Boap Client communications object ******************************************************************************* */ /******************************************************************************* * Base for all Client Classes ******************************************************************************* */ BoapClientObject::BoapClientObject(BString name): BSocket(STREAM), oname(name){ oconnected = 0; oservice = 0; oapiVersion = 0; opriority = BoapPriorityNormal; omaxLength = 0; otimeout = -1; oreconnect = 0; } BError BoapClientObject::connectService(BString name){ BError err; Boapns::BoapEntry entry; BString host = "localhost"; BString boapnsName; BSocketAddressINET add; // Check if already connected to same service if(!oreconnect && (name == oname) && oconnected){ return err; } // Check if a re-connect if(oconnected){ oconnected = 0; close(); init(STREAM); } oname = name; // Parse name if(name.subString(0, 2) == "//"){ name.pullSeparators("/"); host = name.pullToken("/"); } if(name == "boapns"){ if(!(err = add.set(host, "boapns", "tcp")) || !(err = add.set(host, boapPort))){ if(! (err = connect(add))){ oservice = 0; oconnected = 1; } } } else { Boapns::Boapns boapns("boapns"); BIter i; // Lookup name boapnsName = BString("//") + host + "/boapns"; if(! (err = boapns.connectService(boapnsName))){ if(! (err = boapns.getEntry(name, entry))){ // First try and connect to the host name dprintf("BoapClientObject::connectService: Try To Connect to: %s:%d\n", entry.hostName.retStr(), entry.port); if(! (err = add.set(entry.hostName, entry.port))){ dprintf("BoapClientObject::connectService: Connect to hostname: %s:%d\n", entry.hostName.retStr(), entry.port); if(! (err = connect(add))){ oservice = entry.service; oconnected = 1; #if APIVERSION_TEST err = checkApiVersion(); #endif } } if(!oconnected){ // Try and connect on one of the available network addresses for(entry.addressList.end(i); !entry.addressList.isEnd(i); entry.addressList.prev(i)){ if(! (err = add.set(entry.addressList[i], entry.port))){ dprintf("BoapClientObject::connectService: Connect to ipaddress: %s:%d on %s\n", entry.hostName.retStr(), entry.port, entry.addressList[i].retStr()); if(! (err = connect(add))){ oservice = entry.service; oconnected = 1; #if APIVERSION_TEST err = checkApiVersion(); #endif break; } } } } } } else { err.set(err.getErrorNo(), BString("Connection to BoapNs (") + boapnsName + ") Failed: " + err.getString()); } } oreconnect = 0; dprintf("BoapClientObject::connectService: Status: %s\n", err.getString().retStr()); return err; } BError BoapClientObject::disconnectService(){ BError err; if(oconnected){ oconnected = 0; close(); init(STREAM); } return err; } BString BoapClientObject::getServiceName(){ return oname; } BError BoapClientObject::ping(BUInt32& apiVersion){ BError err; olock.lock(); err = pingLocked(apiVersion); olock.unlock(); return err; } BError BoapClientObject::pingLocked(BUInt32& apiVersion){ BError err; BError ret; BoapPacketHead txhead; BoapPacketHead rxhead; if(err = connectService(oname)){ return err; } txhead.cmd = 0; txhead.type = BoapMagic | BoapTypeRpc; txhead.service = oservice; otx.pushHead(txhead); if(err = performCall(otx, orx)){ return err; } orx.popHead(rxhead); orx.pop(apiVersion); orx.pop(ret); return ret; } BError BoapClientObject::setConnectionPriority(BoapPriority priority){ BError err; BoapPacketHead txhead; BoapPacketHead rxhead; BSocket::Priority pri; olock.lock(); if(err = connectService(oname)){ olock.unlock(); return err; } opriority = priority; switch(opriority){ case BoapPriorityLow: pri = BSocket::PriorityLow; break; case BoapPriorityNormal: pri = BSocket::PriorityNormal; break; case BoapPriorityHigh: pri = BSocket::PriorityHigh; break; default: pri = BSocket::PriorityNormal; break; } err = BSocket::setPriority(pri); if(!err){ txhead.cmd = 1; txhead.type = BoapMagic | BoapTypeRpc; txhead.service = oservice; otx.pushHead(txhead); otx.push(priority); if(err = performCall(otx, orx)){ return err; } orx.popHead(rxhead); orx.pop(err); } olock.unlock(); return err; } void BoapClientObject::setMaxLength(BUInt32 maxLength){ omaxLength = omaxLength; } void BoapClientObject::setTimeout(int timeout){ otimeout = timeout; } BError BoapClientObject::checkApiVersion(){ BError err; BUInt32 apiVersion; if(! (err = pingLocked(apiVersion))){ #if DEBUG printf("CheckVersions: Local: %d == Rem: %d\n", oapiVersion, apiVersion); #endif if(oapiVersion != apiVersion) err.set(1, BString("BOAP API versions incorrect: Local: ") + oapiVersion + " Remote: " + apiVersion); } return err; } BError BoapClientObject::performSend(BoapPacket& tx){ BError err; BSize nb = tx.nbytes(); char* data = tx.data(); BSize n = 0; BSize nd = 0; // Send packet while(n < nb){ if(err = send(&data[n], nb - n, nd, MSG_NOSIGNAL)) return err; n = n + nd; } return err; } BError BoapClientObject::performRecv(BoapPacket& rx){ BError err; BSize nb = 0; char* data = 0; BSize n = 0; BSize nd = 0; BoapPacketHead head; // Recieve Packet // Read Head first rx.resize(sizeof(BoapPacketHead)); nb = sizeof(BoapPacketHead); data = rx.data(); n = 0; while(n < nb){ #ifdef ZAP if(err = recv(&data[n], nb - n, nd)){ return err; } #else if(err = recvWithTimeout(&data[n], nb - n, nd, otimeout)){ oreconnect = 1; return err; } #endif n = n + nd; } // Read rest of packet rx.popHead(head); // Validate packet if((head.type & 0xFFFFFF00) != BoapMagic) return err.set(1, "A non BOAP packet was received"); if(omaxLength && (head.length > omaxLength)) return err.set(1, "BOAP packet length was to large"); rx.resize(head.length); nb = head.length - sizeof(BoapPacketHead); data = &rx.data()[sizeof(BoapPacketHead)]; n = 0; while(n < nb){ #ifdef ZAP if(err = recv(&data[n], nb - n, nd)){ return err; } #else if(err = recvWithTimeout(&data[n], nb - n, nd, otimeout)){ oreconnect = 1; return err; } #endif n = n + nd; } return err; } BError BoapClientObject::performCall(BoapPacket& tx, BoapPacket& rx){ BError err; if(! (err = performSend(tx))){ err = performRecv(rx); } if(!err && (tx.getCmd() != rx.getCmd())) err.set(1, BString("BOAP incorrect packet for command recieved: Expected: ") + tx.getCmd() + " Got: " + rx.getCmd()); return err; } BoapSignalObject::BoapSignalObject(): BSocket(DGRAM){ setBroadCast(1); } BError BoapSignalObject::performSend(BoapPacket& tx){ BError err; BSize nb = tx.nbytes(); char* data = tx.data(); BSize n = 0; BSize nd = 0; BSocketAddressINET nadd; nadd.set(INADDR_BROADCAST, 14000); // Send packet while(n < nb){ if(err = sendTo(nadd, &data[n], nb - n, nd, MSG_NOSIGNAL)) return err; n = n + nd; } return err; } /******************************************************************************* * Boap Server communications object ******************************************************************************* */ BoapServerConnection::BoapServerConnection(BoapServer& boapServer, int fd) : oboapServer(boapServer), osocket(fd){ omaxLength = 0; } BError BoapServerConnection::process(){ BError err; unsigned int nb = 0; char* data = 0; unsigned int n = 0; unsigned int nd = 0; BoapPacketHead head; // Recieve Packet // Read Head first orx.resize(sizeof(BoapPacketHead)); nb = sizeof(BoapPacketHead); data = orx.data(); n = 0; while(n < nb){ if(err = osocket.recv(&data[n], nb - n, nd, 0)) return err; n = n + nd; } // Read rest of packet orx.peekHead(head); // Validate packet if((head.type & 0xFFFFFF00) != BoapMagic) return err.set(1, "A non BOAP packet was received"); if(omaxLength && (head.length > omaxLength)) return err.set(1, "BOAP packet length was to large"); orx.resize(head.length); nb = head.length - sizeof(BoapPacketHead); data = &orx.data()[sizeof(BoapPacketHead)]; n = 0; while(n < nb){ if(err = osocket.recv(&data[n], nb - n, nd, 0)) return err; n = n + nd; } dprintf("Command received: Type(%x) Len(%d) Service(%d) Cmd(%d)\n", head.type, head.length, head.service, head.cmd); oboapServer.process(this, orx, otx); // Send packet nb = otx.nbytes(); data = otx.data(); n = 0; while(n < nb){ if(err = osocket.send(&data[n], nb - n, nd, MSG_NOSIGNAL)) return err; n = n + nd; } return err; } BSocket& BoapServerConnection::getSocket(){ return osocket; } void BoapServerConnection::setMaxLength(BUInt32 maxLength){ omaxLength = maxLength; } void* BoapServerConnection::function(){ BError err; while(1){ if(err = process()){ dprintf("BoapServerConnection::function: Error: %s\n", err.getString().retStr()); oboapServer.clientGone(this); return 0; } } } BoapServer::BoapServer() : onet(BSocket::STREAM), onetEvent(BSocket::DGRAM) { othreaded = 0; oisBoapns = 0; onet.setReuseAddress(1); oboapns = new Boapns::Boapns(); } BoapServer::~BoapServer(){ delete oboapns; } BError BoapServer::init(BString boapNsHost, int threaded, int isBoapns){ BError err; BIter i; BoapService s; BList addressList; BSocketAddressINET nadd; BString boapNsObject = "boapns"; BString name; othreaded = threaded; oisBoapns = isBoapns; ohostName = BSocketAddressINET::getHostName(); addressList = BSocketAddressINET::getIpAddressListAll(); if(oisBoapns) nadd.set("", "boapns", "tcp"); else nadd.set("", 0); // Create service socket if(err = onet.bind(nadd)) return err; // Create event socket if(err = onet.getAddress(nadd)) return err; if(err = onetEvent.bind(nadd)) return err; // BOAP event broadcast address onetEventAddress.set("", "boapns", "udp"); onetEventAddress.set(INADDR_BROADCAST, onetEventAddress.port()); onetEvent.setBroadCast(1); if(!oisBoapns){ if(boapNsHost != "") boapNsObject = BString("//") + boapNsHost + "/" + boapNsObject; if(err = oboapns->connectService(boapNsObject)){ err.set(err.getErrorNo(), BString("Connection to BoapNs Failed: ") + err.getString()); return err; } for(oservices.start(i), s = 0; !oservices.isEnd(i); oservices.next(i), s++){ // If a name is not given, get one from the Boapns if(oservices[i].oobject->name() == ""){ oboapns->getNewName(name); oservices[i].oobject->setName(name); } Boapns::BoapEntry e = Boapns::BoapEntry(oservices[i].oobject->name(), ohostName, addressList, nadd.port(), s); if(err = oboapns->addEntry(e)) break; } } return err; } void BoapServer::clientGone(BoapServerConnection* client){ oclientGoneEvent.sendEvent(client->getSocket().getFd()); } void* BoapServer::function(){ run(0); return 0; } BError BoapServer::run(int inThread){ BError err; BError e; BSocketAddressINET nadd; int fd; BIter i; BoapServerConnection* c; if(inThread){ start(); return err; } opoll.append(onet.getFd()); opoll.append(oclientGoneEvent.getFd()); opoll.append(onetEvent.getFd()); onet.listen(); while(1){ opoll.doPoll(fd); if(fd == onet.getFd()){ onet.accept(fd, nadd); dprintf("Accept Connection: Fd: %d Add: %s\n", fd, nadd.getString().retStr()); oclients.append(c = new BoapServerConnection(*this, fd)); if(othreaded){ c->start(); } else { opoll.append(fd); } } else if(fd == oclientGoneEvent.getFd()){ oclientGoneEvent.getEvent(fd); dprintf("Client Gone Event: Fd: %d\n", fd); if(othreaded){ for(oclients.start(i); !oclients.isEnd(i); oclients.next(i)){ if(oclients[i]->getSocket().getFd() == fd){ oclients[i]->waitForCompletion(); delete oclients[i]; oclients.del(i); close(fd); break; } } } } else if(fd == onetEvent.getFd()){ processEvent(fd); } else if(fd >= 0){ dprintf("Process Client: %d\n", fd); for(oclients.start(i); !oclients.isEnd(i); oclients.next(i)){ if(oclients[i]->getSocket().getFd() == fd){ if(e = oclients[i]->process()){ dprintf("ClientGone: %d Error: %s\n", fd, e.getString().retStr()); opoll.delFd(fd); delete oclients[i]; oclients.del(i); close(fd); } break; } } } } return err; } BError BoapServer::addObject(BoapServiceObject* object){ BError err; oservices.append(BoapServiceEntry(oservices.number(), object)); return err; } BError BoapServer::process(BoapServerConnection* conn, BoapPacket& rx, BoapPacket& tx){ BError err; BIter i; BoapPacketHead head; rx.popHead(head); for(oservices.start(i); !oservices.isEnd(i); oservices.next(i)){ if(oservices[i].oservice == head.service){ err = oservices[i].oobject->process(conn, rx, tx); break; } } return err; } BError BoapServer::processEvent(BoapPacket& rx){ BError err; BIter i; int s; dprintf("BoapServer::processEvent\n"); for(oservices.start(i), s = 0; !oservices.isEnd(i); oservices.next(i), s++){ err = oservices[i].oobject->processEvent(rx); } return err; } BError BoapServer::processEvent(int fd){ BError err; int nb = 0; BoapPacketHead head; BoapPacket rx; // Recieve Packet rx.resize(1500); if((nb = recv(fd, rx.data(), 1500, 0)) < 0) return BError(errno, strerror(errno)); rx.popHead(head); dprintf("Event received: Len(%d) Service(%d) Cmd(%d)\n", head.length, head.service, head.cmd); return processEvent(rx); } BError BoapServer::sendEvent(BoapPacket& tx){ BError err; int nb = 0; char* data = 0; int n = 0; uint32_t nsent; // Send packet nb = tx.nbytes(); data = tx.data(); n = 0; while(n < nb){ dprintf("BoapServer::sendEvent: Address: %s\n", onetEventAddress.getString().retStr()); err = onetEvent.sendTo(onetEventAddress, &data[n], nb - n, nsent); dprintf("BoapServer::sendEvent: result: %d:%s\n", err.getErrorNo(), err.getString().retStr()); if(err) return err; n = n + nsent; } return err; } BSocket& BoapServer::getSocket(){ return onet; } BSocket& BoapServer::getEventSocket(){ return onetEvent; } BString BoapServer::getHostName(){ return ohostName; } int BoapServer::getConnectionsNumber(){ return oclients.number(); } /******************************************************************************* * Base for all Server Objects ******************************************************************************* */ BoapFuncEntry::BoapFuncEntry(int cmd, BoapFunc func) : ocmd(cmd), ofunc(func){ } BoapServiceObject::BoapServiceObject(BoapServer& server, BString name) : oserver(server), oname(name){ oapiVersion = 0; oserver.addObject(this); ofuncList.append(BoapFuncEntry(0, &BoapServiceObject::doPing)); ofuncList.append(BoapFuncEntry(1, &BoapServiceObject::doConnectionPriority)); } BoapServiceObject::~BoapServiceObject(){ } BError BoapServiceObject::setName(BString name){ BError err; oname = name; return err; } BString BoapServiceObject::name(){ return oname; } BError BoapServiceObject::doPing(BoapServerConnection* conn, BoapPacket& rx, BoapPacket& tx){ BError err; BoapPacketHead rxhead; BoapPacketHead txhead; rx.popHead(rxhead); txhead.type = BoapMagic | BoapTypeRpcReply; txhead.service = rxhead.service; txhead.cmd = rxhead.cmd; tx.pushHead(txhead); tx.push(oapiVersion); tx.push(err); return err; } BError BoapServiceObject::doConnectionPriority(BoapServerConnection* conn, BoapPacket& rx, BoapPacket& tx){ BError err; BoapPacketHead rxhead; BoapPacketHead txhead; BUInt32 priority; BSocket::Priority pri; rx.popHead(rxhead); rx.pop(priority); switch(priority){ case BoapPriorityLow: pri = BSocket::PriorityLow; break; case BoapPriorityNormal: pri = BSocket::PriorityNormal; break; case BoapPriorityHigh: pri = BSocket::PriorityHigh; break; default: pri = BSocket::PriorityNormal; break; } conn->getSocket().setPriority(pri); txhead.type = BoapMagic | BoapTypeRpcReply; txhead.service = rxhead.service; txhead.cmd = rxhead.cmd; tx.pushHead(txhead); tx.push(err); return err; } BError BoapServiceObject::process(BoapServerConnection* conn, BoapPacket& rx, BoapPacket& tx){ BError err; BoapPacketHead head; BIter i; rx.popHead(head); for(ofuncList.start(i); !ofuncList.isEnd(i); ofuncList.next(i)){ if(ofuncList[i].ocmd == head.cmd){ err = (this->*ofuncList[i].ofunc)(conn, rx, tx); break; } } return err; } BError BoapServiceObject::sendEvent(BString name, Int32 arg){ BError err; BoapPacketHead txhead; BoapPacket tx; BString fullName = BString("//") + oserver.getHostName() + "/" + oname; txhead.type = BoapMagic | BoapTypeSignal; txhead.service = 0; txhead.cmd = 0; tx.pushHead(txhead); tx.push(fullName); tx.push(name); tx.push(arg); if(err = sendEvent(tx)) return err; return err; } BError BoapServiceObject::processEvent(BString objectName, BString signalName, Int32 arg){ BError err; dprintf("BoapServiceObject::processEvent(BString objectName, BString signalName, Int32 arg)\n"); return err; } BError BoapServiceObject::sendEvent(BoapPacket& tx){ return oserver.sendEvent(tx); } BError BoapServiceObject::processEvent(BoapPacket& rx){ BError err; BoapPacketHead rxhead; BError ret; BString objectName; BString signalName; Int32 arg; dprintf("BoapServiceObject::processEvent\n"); rx.popHead(rxhead); rx.pop(objectName); rx.pop(signalName); rx.pop(arg); ret = processEvent(objectName, signalName, arg); return err; }