RSS Git Download  Clone
Raw View History
Blames found: 1 Mode: text/x-c++src Binary: false

/******************************************************************************* * Boap.cc Boap RPC protocol * T.Barnaby, BEAM Ltd, 8/5/03 * * History: * 4/2/05: Updated addressing scheme ******************************************************************************* */ #include <stdlib.h> #include <stdio.h> #include <errno.h> #include <sys/socket.h> #include <netinet/in.h> #include <Boap.h> // Boap NS #include <BoapnsD.h> #include <BoapnsC.h> #define DEBUG 0 #if DEBUG #define dprintf(fmt, a...) printf(fmt, ##a); #else #define dprintf(fmt, a...) #endif const int roundSize = 256; BoapPacket::BoapPacket(){ opos = 0; osize = roundSize; odata = (char*)malloc(osize); } BoapPacket::~BoapPacket(){ delete odata; } int BoapPacket::resize(int size){ if(size > osize){ osize = roundSize * ((size + roundSize - 1) / roundSize); odata = (char*)realloc(odata, osize); } return 0; } BError BoapPacket::setData(void* data, int nbytes){ BError err; resize(nbytes); memcpy(odata, data, nbytes); return err; } int BoapPacket::nbytes(){ return ((BoapPacketHead*)odata)->length; } char* BoapPacket::data(){ return odata; } void BoapPacket::updateLen(){ ((BoapPacketHead*)odata)->length = opos; } int BoapPacket::pushHead(BoapPacketHead& head){ opos = 0; resize(opos + sizeof(head)); memcpy(&odata[opos], &head, sizeof(head)); opos += sizeof(head); updateLen(); return 0; } int BoapPacket::push(Int8 v){ resize(opos + sizeof(v)); memcpy(&odata[opos], &v, sizeof(v)); opos += sizeof(v); updateLen(); return 0; } int BoapPacket::push(UInt8 v){ resize(opos + sizeof(v)); memcpy(&odata[opos], &v, sizeof(v)); opos += sizeof(v); updateLen(); return 0; } int BoapPacket::push(Int16 v){ resize(opos + sizeof(v)); memcpy(&odata[opos], &v, sizeof(v)); opos += sizeof(v); updateLen(); return 0; } int BoapPacket::push(UInt16 v){ resize(opos + sizeof(v)); memcpy(&odata[opos], &v, sizeof(v)); opos += sizeof(v); updateLen(); return 0; } int BoapPacket::push(Int32 v){ resize(opos + sizeof(v)); memcpy(&odata[opos], &v, sizeof(v)); opos += sizeof(v); updateLen(); return 0; } int BoapPacket::push(UInt32 v){ resize(opos + sizeof(v)); memcpy(&odata[opos], &v, sizeof(v)); opos += sizeof(v); updateLen(); return 0; } int BoapPacket::push(BString& v){ Int32 l = v.len() + 1; resize(opos + sizeof(l) + l); memcpy(&odata[opos], &l, sizeof(l)); opos += sizeof(l); memcpy(&odata[opos], v.retStr(), l); opos += l; updateLen(); return 0; } int BoapPacket::push(Double v){ resize(opos + sizeof(v)); memcpy(&odata[opos], &v, sizeof(v)); opos += sizeof(v); updateLen(); return 0; } int BoapPacket::push(BError& v){ Int32 n = v.getErrorNo(); BString s = v.getString(); push(n); push(s); return 0; } int BoapPacket::push(UInt32 nBytes, const void* data){ } int BoapPacket::popHead(BoapPacketHead& head){ opos = 0; memcpy(&head, &odata[opos], sizeof(head)); opos += sizeof(head); return 0; } int BoapPacket::pop(Int8& v){ memcpy(&v, &odata[opos], sizeof(v)); opos += sizeof(v); return 0; } int BoapPacket::pop(UInt8& v){ memcpy(&v, &odata[opos], sizeof(v)); opos += sizeof(v); return 0; } int BoapPacket::pop(Int16& v){ memcpy(&v, &odata[opos], sizeof(v)); opos += sizeof(v); return 0; } int BoapPacket::pop(UInt16& v){ memcpy(&v, &odata[opos], sizeof(v)); opos += sizeof(v); return 0; } int BoapPacket::pop(Int32& v){ memcpy(&v, &odata[opos], sizeof(v)); opos += sizeof(v); return 0; } int BoapPacket::pop(UInt32& v){ memcpy(&v, &odata[opos], sizeof(v)); opos += sizeof(v); return 0; } int BoapPacket::pop(BString& v){ Int32 l; memcpy(&l, &odata[opos], sizeof(l)); opos += sizeof(l); v = (char*)&odata[opos]; opos += l; return 0; } int BoapPacket::pop(Double& v){ memcpy(&v, &odata[opos], sizeof(v)); opos += sizeof(v); return 0; } int BoapPacket::pop(BError& v){ Int32 n; BString s; pop(n); pop(s); v.set(n, s); return 0; } int BoapPacket::pop(UInt32 nBytes, void* data){ return 0; } /******************************************************************************* * Boap Client communications object ******************************************************************************* */ /******************************************************************************* * Base for all Client Classes ******************************************************************************* */ BoapClientObject::BoapClientObject(BString name): BSocket(STREAM), oname(name){ oconnected = 0; oservice = 0; } BError BoapClientObject::connectService(BString name){ BError err; Boapns::Boapns boapns("boapns"); Boapns::BoapEntry entry; BString host = "localhost"; BString boapnsName; BSocketAddressINET add; if(oconnected) return err; oname = name; // Parse name if(name.subString(0, 2) == "//"){ name.pullSeparators("/"); host = name.pullToken("/"); } if(name == "boapns"){ if(! (err = add.set(host, "boapns", "tcp"))){ if(! (err = connect(add))){ oservice = 0; oconnected = 1; } } } else { // Lookup name boapnsName = BString("//") + host + "/boapns"; if(! (err = boapns.connectService(boapnsName))){ if(! (err = boapns.getEntry(name, entry))){ if(! (err = add.set(entry.ipAddress, entry.port))){ if(! (err = connect(add))){ oservice = entry.service; oconnected = 1; } } } } } return err; } BError BoapClientObject::performSend(BoapPacket& tx){ BError err; BSize nb = tx.nbytes(); char* data = tx.data(); BSize n = 0; BSize nd = 0; // Connect if required if(err = connectService(oname)) return err; // Send packet while(n < nb){ if(err = send(&data[n], nb - n, nd, MSG_NOSIGNAL)) return err; n = n + nd; } return err; } BError BoapClientObject::performRecv(BoapPacket& rx){ BError err; BSize nb = 0; char* data = 0; BSize n = 0; BSize nd = 0; BoapPacketHead head; // Connect if required if(err = connectService(oname)) return err; // Recieve Packet // Read Head first rx.resize(sizeof(BoapPacketHead)); nb = sizeof(BoapPacketHead); data = rx.data(); n = 0; while(n < nb){ if(err = recv(&data[n], nb - n, nd)) return err; n = n + nd; } // Read rest of packet rx.popHead(head); rx.resize(head.length); nb = head.length - sizeof(BoapPacketHead); data = &rx.data()[sizeof(BoapPacketHead)]; n = 0; while(n < nb){ if(err = recv(&data[n], nb - n, nd)) return err; n = n + nd; } return err; } BError BoapClientObject::performCall(BoapPacket& tx, BoapPacket& rx){ BError err; if(! (err = performSend(tx))){ err = performRecv(rx); } return err; } BoapSignalObject::BoapSignalObject(): BSocket(DGRAM){ setBroadCast(1); } BError BoapSignalObject::performSend(BoapPacket& tx){ BError err; BSize nb = tx.nbytes(); char* data = tx.data(); BSize n = 0; BSize nd = 0; BSocketAddressINET nadd; nadd.set(INADDR_BROADCAST, 14000); // Send packet while(n < nb){ if(err = sendTo(nadd, &data[n], nb - n, nd, MSG_NOSIGNAL)) return err; n = n + nd; } return err; } /******************************************************************************* * Boap Server communications object ******************************************************************************* */ BoapServer::BoapServer() : onet(BSocket::STREAM), onetEvent(BSocket::DGRAM) { oboapNs = 0; onet.setReuseAddress(1); } BError BoapServer::init(int boapNs){ BError err; Boapns::Boapns boapns("boapns"); BIter i; BoapService s; BList<uint32_t> l; uint32_t ipAddress; BSocketAddressINET nadd; oboapNs = boapNs; ohostName = BSocketAddressINET::getHostName(); l = BSocketAddressINET::getIpAddresses(); if(l.number()) ipAddress = l[0]; else ipAddress = 0; if(boapNs) nadd.set("", "boapns", "tcp"); else nadd.set("", 0); // Create service socket if(err = onet.bind(nadd)) return err; // Create event socket if(err = onet.getAddress(nadd)) return err; if(err = onetEvent.bind(nadd)) return err; // BOAP event broadcast address onetEventAddress.set("", "boapns", "udp"); onetEventAddress.set(INADDR_BROADCAST, onetEventAddress.port()); onetEvent.setBroadCast(1); if(!boapNs){ for(oservices.start(i), s = 0; !oservices.isEnd(i); oservices.next(i), s++){ if(err = boapns.addEntry(Boapns::BoapEntry(oservices[i].oobject->name(), ipAddress, nadd.port(), s))) break; } } return err; } BError BoapServer::run(){ BError err; BSocketAddressINET nadd; int fd; opoll.append(onet.getFd()); opoll.append(onetEvent.getFd()); onet.listen(); while(1){ opoll.doPoll(fd); if(fd == onet.getFd()){ onet.accept(fd, nadd); dprintf("Accept Connection: Fd: %d Add: %s\n", fd, nadd.getString().retStr()); opoll.append(fd); } else if(fd == onetEvent.getFd()){ processEvent(fd); } else if(fd >= 0){ if(process(fd)){ opoll.delFd(fd); close(fd); } } } return err; } BError BoapServer::addObject(BoapServiceObject* object){ BError err; oservices.append(BoapServiceEntry(oservices.number(), object)); return err; } BError BoapServer::process(int fd){ BError err; int nb = 0; char* data = 0; int n = 0; int nd = 0; BoapPacketHead head; BIter i; // Recieve Packet // Read Head first orx.resize(sizeof(BoapPacketHead)); nb = sizeof(BoapPacketHead); data = orx.data(); n = 0; while(n < nb){ nd = recv(fd, &data[n], nb - n, 0); if(nd <= 0) return BError(1, strerror(errno)); n = n + nd; } // Read rest of packet orx.popHead(head); orx.resize(head.length); nb = head.length - sizeof(BoapPacketHead); data = &orx.data()[sizeof(BoapPacketHead)]; n = 0; while(n < nb){ nd = recv(fd, &data[n], nb - n, 0); if(nd <= 0) return BError(1, strerror(errno)); n = n + nd; } dprintf("Command received: Len(%d) Type(%d) Service(%d) Cmd(%d)\n", head.length, head.type, head.service, head.cmd); for(oservices.start(i); !oservices.isEnd(i); oservices.next(i)){ if(oservices[i].oservice == head.service){ oservices[i].oobject->process(orx, otx); break; } } // Send packet nb = otx.nbytes(); data = otx.data(); n = 0; while(n < nb){ nd = send(fd, &data[n], nb - n, MSG_NOSIGNAL); if(nd <= 0) return BError(1, strerror(errno)); n = n + nd; } return err; } BError BoapServer::processEvent(BoapPacket& rx){ BError err; BIter i; int s; dprintf("BoapServer::processEvent\n"); for(oservices.start(i), s = 0; !oservices.isEnd(i); oservices.next(i), s++){ oservices[i].oobject->processEvent(rx); } return err; } BError BoapServer::processEvent(int fd){ BError err; int nb = 0; BoapPacketHead head; BoapPacket rx; // Recieve Packet rx.resize(1500); if((nb = recv(fd, rx.data(), 1500, 0)) < 0) return BError(errno, strerror(errno)); orx.popHead(head); dprintf("Event received: Len(%d) Service(%d) Cmd(%d)\n", head.length, head.service, head.cmd); return processEvent(rx); } BError BoapServer::sendEvent(BoapPacket& tx){ BError err; int nb = 0; char* data = 0; int n = 0; uint32_t nsent; // Send packet nb = tx.nbytes(); data = tx.data(); n = 0; while(n < nb){ dprintf("BoapServer::sendEvent: Address: %s\n", onetEventAddress.getString().retStr()); err = onetEvent.sendTo(onetEventAddress, &data[n], nb - n, nsent); dprintf("BoapServer::sendEvent: result: %d:%s\n", err.getErrorNo(), err.getString().retStr()); if(err) return err; n = n + nsent; } return err; } BSocket& BoapServer::getSocket(){ return onet; } BSocket& BoapServer::getEventSocket(){ return onetEvent; } BString BoapServer::getHostName(){ return ohostName; } /******************************************************************************* * Base for all Server Objects ******************************************************************************* */ BoapFuncEntry::BoapFuncEntry(int cmd, BoapFunc func) : ocmd(cmd), ofunc(func){ } BoapServiceObject::BoapServiceObject(BoapServer& server, BString name) : oserver(server), oname(name){ oserver.addObject(this); } BoapServiceObject::~BoapServiceObject(){ } BString BoapServiceObject::name(){ return oname; } BError BoapServiceObject::process(BoapPacket& rx, BoapPacket& tx){ BError err; BoapPacketHead head; BIter i; rx.popHead(head); for(ofuncList.start(i); !ofuncList.isEnd(i); ofuncList.next(i)){ if(ofuncList[i].ocmd == head.cmd){ err = (this->*ofuncList[i].ofunc)(rx, tx); break; } } return err; } BError BoapServiceObject::sendEvent(BString name, Int32 arg){ BError err; BoapPacketHead txhead; BoapPacket tx; BString fullName = BString("//") + oserver.getHostName() + "/" + oname; txhead.type = BoapTypeSignal; txhead.service = 0; txhead.cmd = 0; tx.pushHead(txhead); tx.push(fullName); tx.push(name); tx.push(arg); if(err = sendEvent(tx)) return err; return err; } BError BoapServiceObject::processEvent(BString objectName, BString signalName, Int32 arg){ BError err; dprintf("BoapServiceObject::processEvent(BString objectName, BString signalName, Int32 arg)\n"); return err; } BError BoapServiceObject::sendEvent(BoapPacket& tx){ return oserver.sendEvent(tx); } BError BoapServiceObject::processEvent(BoapPacket& rx){ BError err; BoapPacketHead rxhead; BError ret; BString objectName; BString signalName; Int32 arg; dprintf("BoapServiceObject::processEvent\n"); rx.popHead(rxhead); rx.pop(objectName); rx.pop(signalName); rx.pop(arg); ret = processEvent(objectName, signalName, arg); return err; }