/******************************************************************************* * Boap.cc Boap RPC protocol * T.Barnaby, BEAM Ltd, 8/5/03 * updated by D.Korchagin, CERN AB-BI-SW, 2007-08-31 * * History: * 4/2/05: Updated addressing scheme ******************************************************************************* */ #include #include #include #include #include #include #include #include #ifndef __Lynx__ #include #else static inline int bswap_32(int i){ return (i << 24) | ((i & 0xff00) << 8) | ((i >> 8) & 0xff00) | (i >> 24); } #endif // Boap NS #include #include #undef DEBUG #define DEBUG 0 #if DEBUG #include #endif #define APIVERSION_TEST 1 // Tests the API version during a connect #if DEBUG #define dprintf(fmt, a...) printf(fmt, ##a); #else #define dprintf(fmt, a...) #endif #if __BYTE_ORDER == __BIG_ENDIAN #define IS_BIG_ENDIAN 1 #endif const int boapPort = 12000; ///< The default BOAP connection port BoapPacket::BoapPacket(){ } BoapPacket::~BoapPacket(){ } void BoapPacket::updateHead(){ #if IS_BIG_ENDIAN ((BoapPacketHead*)odata)->length = bswap_32(opos); #else ((BoapPacketHead*)odata)->length = opos; #endif } BUInt32 BoapPacket::getCmd(){ #if IS_BIG_ENDIAN return bswap_32(((BoapPacketHead*)odata)->cmd); #else return ((BoapPacketHead*)odata)->cmd; #endif } int BoapPacket::pushHead(BoapPacketHead& head){ setSize(sizeof(head)); #if IS_BIG_ENDIAN bswap_copy(1, &head, odata, sizeof(head), "4"); #else memcpy(odata, &head, sizeof(head)); #endif opos = sizeof(head); return 0; } int BoapPacket::peekHead(BoapPacketHead& head){ #if IS_BIG_ENDIAN bswap_copy(1, odata, &head, sizeof(head), "4"); #else memcpy(&head, odata, sizeof(head)); #endif return 0; } int BoapPacket::popHead(BoapPacketHead& head){ #if IS_BIG_ENDIAN bswap_copy(1, odata, &head, sizeof(head), "4"); #else memcpy(&head, odata, sizeof(head)); #endif opos = sizeof(head); return 0; } /******************************************************************************* * Boap Client communications object ******************************************************************************* */ /******************************************************************************* * Base for all Client Classes ******************************************************************************* */ BoapClientObject::BoapClientObject(BString name): BSocket(STREAM), oname(name){ oconnected = 0; oservice = 0; oapiVersion = 0; opriority = BoapPriorityNormal; omaxLength = 0; otimeout = -1; oreconnect = 0; } BoapClientObject::~BoapClientObject(){ } BError BoapClientObject::connectService(BString name){ BError err; Boapns::BoapEntry entry; BString host = ""; BString thisHostName; BString boapnsName; BSocketAddressINET add; dprintf("BoapClientObject::connectService: %s\n", name.retStr()); // Check if already connected to same service if(!oreconnect && (name == oname) && oconnected){ dprintf("BoapClientObject::connectService: Return already connected: %s\n", name.retStr()); return err; } // Check if a re-connect if(oconnected){ oconnected = 0; close(); init(STREAM); } thisHostName = BSocketAddressINET::getHostName(); oname = name; // Parse name if(name.subString(0, 2) == "//"){ name.pullSeparators("/"); host = name.pullToken("/"); } if(name == "boapns"){ if(!(err = add.set(host, "boapns", "tcp")) || !(err = add.set(host, boapPort))){ if(! (err = connect(add))){ oservice = 0; oconnected = 1; } } } else { Boapns::Boapns boapns("boapns"); BIter i; // Lookup name boapnsName = BString("//") + host + "/boapns"; if(! (err = boapns.connectService(boapnsName))){ if(! (err = boapns.getEntry(name, entry))){ // First try and connect to the host name dprintf("BoapClientObject::connectService: Try To Connect to: %s:%d from: %s\n", entry.hostName.retStr(), entry.port, thisHostName.retStr()); if(! (err = add.set(entry.hostName, entry.port))){ dprintf("BoapClientObject::connectService: Connect to hostname: %s:%d\n", entry.hostName.retStr(), entry.port); if(! (err = connect(add))){ oservice = entry.service; oconnected = 1; dprintf("BoapClientObject::connectService: TCP/IP connected\n"); #if APIVERSION_TEST err = checkApiVersion(); #endif dprintf("BoapClientObject::connectService: ApiVersion checked: %s\n", err.getString().retStr()); } } if(!oconnected){ // Try and connect on one of the available network addresses for(entry.addressList.end(i); !entry.addressList.isEnd(i); entry.addressList.prev(i)){ if(! (err = add.set(entry.addressList[i], entry.port))){ dprintf("BoapClientObject::connectService: Connect to ipaddress: %s:%d on %s\n", entry.hostName.retStr(), entry.port, entry.addressList[i].retStr()); // Ignore localhost entry if not the same host if((entry.hostName != thisHostName) && (entry.addressList[i].compareWild("127.*"))) continue; if(! (err = connect(add))){ oservice = entry.service; oconnected = 1; #if APIVERSION_TEST err = checkApiVersion(); #endif break; } } } } } } else { err.set(err.getErrorNo(), BString("Connection to BoapNs (") + boapnsName + ") Failed: " + err.getString()); } } oreconnect = 0; dprintf("BoapClientObject::connectService: End: Status: %s\n", err.getString().retStr()); return err; } BError BoapClientObject::disconnectService(){ BError err; if(oconnected){ oconnected = 0; close(); init(STREAM); } return err; } BString BoapClientObject::getServiceName(){ return oname; } BError BoapClientObject::ping(BUInt32& apiVersion){ BError err; olock.lock(); err = pingLocked(apiVersion); olock.unlock(); return err; } BError BoapClientObject::pingLocked(BUInt32& apiVersion){ BError err; BError ret; BoapPacketHead txhead; BoapPacketHead rxhead; if(err = connectService(oname)){ return err; } txhead.cmd = 0; txhead.type = BoapMagic | BoapTypeRpc; txhead.service = oservice; otx.pushHead(txhead); if(err = performCall(otx, orx)){ return err; } orx.popHead(rxhead); orx.pop(ret); if((rxhead.type & 0xFF) == BoapTypeRpcReply){ orx.pop(apiVersion); } return ret; } BError BoapClientObject::setConnectionPriority(BoapPriority priority){ BError err; BoapPacketHead txhead; BoapPacketHead rxhead; BSocket::Priority pri; olock.lock(); if(err = connectService(oname)){ olock.unlock(); return err; } opriority = priority; switch(opriority){ case BoapPriorityLow: pri = BSocket::PriorityLow; break; case BoapPriorityNormal: pri = BSocket::PriorityNormal; break; case BoapPriorityHigh: pri = BSocket::PriorityHigh; break; default: pri = BSocket::PriorityNormal; break; } err = BSocket::setPriority(pri); if(!err){ txhead.cmd = 1; txhead.type = BoapMagic | BoapTypeRpc; txhead.service = oservice; otx.pushHead(txhead); otx.push(priority); if(err = performCall(otx, orx)){ return err; } orx.popHead(rxhead); orx.pop(err); } olock.unlock(); return err; } void BoapClientObject::setMaxLength(BUInt32 maxLength){ omaxLength = omaxLength; } void BoapClientObject::setTimeout(int timeout){ otimeout = timeout; } BError BoapClientObject::checkApiVersion(){ BError err; BUInt32 apiVersion; dprintf("BoapClientObject::checkApiVersion\n"); if(! (err = pingLocked(apiVersion))){ #if DEBUG printf("CheckVersions: Local: %d == Rem: %d\n", oapiVersion, apiVersion); #endif if(oapiVersion != apiVersion) err.set(ErrorMisc, BString("BOAP API versions incorrect: Local: ") + oapiVersion + " Remote: " + apiVersion); } dprintf("BoapClientObject::checkApiVersion: End\n"); return err; } BError BoapClientObject::performSend(BoapPacket& tx){ BError err; BSize nb = tx.size(); char* data = tx.data(); BSize n = 0; BSize nd = 0; // Send packet while(n < nb){ if(err = send(&data[n], nb - n, nd, MSG_NOSIGNAL)) return err; n = n + nd; } return err; } BError BoapClientObject::performRecv(BoapPacket& rx){ BError err; BSize nb = 0; char* data = 0; BSize n = 0; BSize nd = 0; BoapPacketHead head; // Recieve Packet // Read Head first rx.resize(sizeof(BoapPacketHead)); nb = sizeof(BoapPacketHead); data = rx.data(); n = 0; while(n < nb){ #ifdef ZAP if(err = recv(&data[n], nb - n, nd)){ return err; } #else if(err = recvWithTimeout(&data[n], nb - n, nd, otimeout)){ oreconnect = 1; return err; } #endif n = n + nd; } // Read rest of packet rx.popHead(head); // Validate packet if((head.type & 0xFFFFFF00) != BoapMagic) return err.set(ErrorMisc, "A non BOAP packet was received"); if(omaxLength && (head.length > omaxLength)) return err.set(ErrorMisc, "BOAP packet length was to large"); rx.resize(head.length); nb = head.length - sizeof(BoapPacketHead); data = &rx.data()[sizeof(BoapPacketHead)]; n = 0; while(n < nb){ #ifdef ZAP if(err = recv(&data[n], nb - n, nd)){ return err; } #else if(err = recvWithTimeout(&data[n], nb - n, nd, otimeout)){ oreconnect = 1; return err; } #endif n = n + nd; } return err; } BError BoapClientObject::performCall(BoapPacket& tx, BoapPacket& rx){ BError err; tx.updateHead(); while(1){ if(! (err = performSend(tx))){ err = performRecv(rx); } // Handle a disconnection by calling handleReconnect() if implemented if(err == -EPIPE){ BoapPacket ltx; // Save command ltx.setSize(tx.size()); ltx.writeData(0, tx.data(), tx.size()); olock.unlock(); // Call handler disconnectService(); oreconnect = 0; err = handleReconnect(err); // Resume command olock.lock(); tx.setSize(ltx.size()); tx.writeData(0, ltx.data(), ltx.size()); if(err) break; } else { break; } } if(!err && (tx.getCmd() != rx.getCmd())) err.set(ErrorMisc, BString("BOAP incorrect packet for command recieved: Expected: ") + tx.getCmd() + " Got: " + rx.getCmd()); return err; } BError BoapClientObject::handleReconnect(BError err){ return err; } BoapSignalObject::BoapSignalObject(): BSocket(DGRAM){ setBroadCast(1); } BError BoapSignalObject::performSend(BoapPacket& tx){ BError err; BSize nb = tx.size(); char* data = tx.data(); BSize n = 0; BSize nd = 0; BSocketAddressINET nadd; nadd.set(INADDR_BROADCAST, 14000); // Send packet while(n < nb){ if(err = sendTo(nadd, &data[n], nb - n, nd, MSG_NOSIGNAL)) return err; n = n + nd; } return err; } /******************************************************************************* * Boap Server communications object ******************************************************************************* */ BoapServerConnection::BoapServerConnection(BoapServer& boapServer, int fd) : oboapServer(boapServer), osocket(fd){ omaxLength = 0; } BoapServerConnection::~BoapServerConnection(){ } BError BoapServerConnection::init(){ BError err; return err; } BError BoapServerConnection::process(){ BError err; unsigned int nb = 0; char* data = 0; unsigned int n = 0; BSize nd = 0; BoapPacketHead head; // Recieve Packet // Read Head first orx.resize(sizeof(BoapPacketHead)); nb = sizeof(BoapPacketHead); data = orx.data(); n = 0; while(n < nb){ if(err = osocket.recv(&data[n], nb - n, nd, 0)) return err; n = n + nd; } // Read rest of packet orx.peekHead(head); #if DEBUG fprintf(stderr, "PacketHead: "); hd8(&head, sizeof(head)); #endif // Validate packet if((head.type & 0xFFFFFF00) != BoapMagic){ return err.set(ErrorMisc, "A non BOAP packet was received"); } if(omaxLength && (head.length > omaxLength)) return err.set(ErrorMisc, "BOAP packet length was to large"); orx.resize(head.length); nb = head.length - sizeof(BoapPacketHead); data = &orx.data()[sizeof(BoapPacketHead)]; n = 0; while(n < nb){ if(err = osocket.recv(&data[n], nb - n, nd, 0)) return err; n = n + nd; } dprintf("Command received: Type(%x) Len(%d) Service(%d) Cmd(%d)\n", head.type, head.length, head.service, head.cmd); if((head.cmd != 0) && (err = validate())){ BoapPacketHead txHead; dprintf("Command: return validation error\n"); txHead.type = BoapMagic | BoapTypeRpcError; txHead.service = head.service; txHead.cmd = head.cmd; otx.pushHead(txHead); otx.push(err); } else { oboapServer.process(this, orx, otx); } // Send packet dprintf("Command reply: Len(%d)\n", otx.size()); otx.updateHead(); nb = otx.size(); data = otx.data(); n = 0; while(n < nb){ dprintf("Command send: Len(%d)\n", nb - n); if(err = osocket.send(&data[n], nb - n, nd, MSG_NOSIGNAL)) return err; n = n + nd; } dprintf("Command reply: End\n"); oboapServer.onumOperations++; return err; } BSocket& BoapServerConnection::getSocket(){ return osocket; } void BoapServerConnection::setMaxLength(BUInt32 maxLength){ omaxLength = maxLength; } BError BoapServerConnection::getHead(BoapPacketHead& head){ BError err; orx.peekHead(head); return err; } BError BoapServerConnection::validate(){ BError err; return err; } void* BoapServerConnection::function(){ BError err; while(1){ if(err = process()){ dprintf("BoapServerConnection::function: Error: %s\n", err.getString().retStr()); oboapServer.clientGone(this); return 0; } } } BoapServer::BoapServer() : onet(BSocket::STREAM), onetEvent(BSocket::DGRAM) { othreaded = 0; oisBoapns = 0; onumOperations = 0; onet.setReuseAddress(1); oboapns = new Boapns::Boapns(); } BoapServer::~BoapServer(){ delete oboapns; } BError BoapServer::init(BString boapNsHost, int port, int threaded, int isBoapns){ BError err; BIter i; BoapService s; BList addressList; BSocketAddressINET nadd; BString boapNsObject = "boapns"; BString name; othreaded = threaded; oisBoapns = isBoapns; ohostName = BSocketAddressINET::getHostName(); addressList = BSocketAddressINET::getIpAddressListAll(); if(oisBoapns) nadd.set("", "boapns", "tcp"); else nadd.set("", port); // Create service socket if(err = onet.bind(nadd)) return err; // Create event socket if(err = onet.getAddress(nadd)) return err; if(err = onetEvent.bind(nadd)) return err; // BOAP event broadcast address onetEventAddress.set("", "boapns", "udp"); onetEventAddress.set(INADDR_BROADCAST, onetEventAddress.port()); onetEvent.setBroadCast(1); if(!oisBoapns){ if(boapNsHost != "") boapNsObject = BString("//") + boapNsHost + "/" + boapNsObject; if(err = oboapns->connectService(boapNsObject)){ err.set(err.getErrorNo(), BString("Connection to BoapNs Failed: ") + err.getString()); return err; } for(oservices.start(i), s = 0; !oservices.isEnd(i); oservices.next(i), s++){ // If a name is not given, get one from the Boapns if(oservices[i].oobject->name() == ""){ oboapns->getNewName(name); oservices[i].oobject->setName(name); } Boapns::BoapEntry e = Boapns::BoapEntry(oservices[i].oobject->name(), ohostName, addressList, nadd.port(), s); if(err = oboapns->addEntry(e)) break; } } return err; } void BoapServer::clientGone(BoapServerConnection* client){ oclientGoneEvent.sendEvent(client->getSocket().getFd()); } void* BoapServer::function(){ run(0); return 0; } BError BoapServer::run(int inThread){ BError err; BError e; BSocketAddressINET nadd; int fd; BIter i; BoapServerConnection* c; if(inThread){ start(); return err; } opoll.append(onet.getFd()); opoll.append(oclientGoneEvent.getFd()); opoll.append(onetEvent.getFd()); onet.listen(); while(1){ opoll.doPoll(fd); if(fd == onet.getFd()){ onet.accept(fd, nadd); dprintf("Accept Connection: Fd: %d Add: %s\n", fd, nadd.getString().retStr()); c = newConnection(fd, nadd); if(e = c->init()){ delete c; } oclients.append(c); if(othreaded){ c->start(); } else { opoll.append(fd); } } else if(fd == oclientGoneEvent.getFd()){ oclientGoneEvent.getEvent(fd); dprintf("Client Gone Event: Fd: %d\n", fd); if(othreaded){ for(oclients.start(i); !oclients.isEnd(i); oclients.next(i)){ if(oclients[i]->getSocket().getFd() == fd){ oclients[i]->waitForCompletion(); delete oclients[i]; oclients.del(i); break; } } } } else if(fd == onetEvent.getFd()){ processEvent(fd); } else if(fd >= 0){ dprintf("Process Client: %d\n", fd); for(oclients.start(i); !oclients.isEnd(i); oclients.next(i)){ if(oclients[i]->getSocket().getFd() == fd){ if(e = oclients[i]->process()){ dprintf("ClientGone: %d Error: %s\n", fd, e.getString().retStr()); opoll.delFd(fd); delete oclients[i]; oclients.del(i); } break; } } } } return err; } BError BoapServer::addObject(BoapServiceObject* object){ BError err; oservices.append(BoapServiceEntry(oservices.number(), object)); return err; } BError BoapServer::process(BoapServerConnection* conn, BoapPacket& rx, BoapPacket& tx){ BError err; BIter i; BoapPacketHead head; rx.popHead(head); for(oservices.start(i); !oservices.isEnd(i); oservices.next(i)){ if(oservices[i].oservice == head.service){ err = oservices[i].oobject->process(conn, rx, tx); break; } } return err; } BError BoapServer::processEvent(BoapPacket& rx){ BError err; BIter i; int s; dprintf("BoapServer::processEvent\n"); for(oservices.start(i), s = 0; !oservices.isEnd(i); oservices.next(i), s++){ err = oservices[i].oobject->processEvent(rx); } return err; } BError BoapServer::processEvent(int fd){ BError err; int nb = 0; BoapPacketHead head; BoapPacket rx; // Recieve Packet rx.resize(1500); if((nb = recv(fd, rx.data(), 1500, 0)) < 0) return BError(errno, strerror(errno)); rx.popHead(head); dprintf("Event received: Len(%d) Service(%d) Cmd(%d)\n", head.length, head.service, head.cmd); return processEvent(rx); } BError BoapServer::sendEvent(BoapPacket& tx){ BError err; int nb = 0; char* data = 0; int n = 0; BSize nsent; // Send packet nb = tx.size(); data = tx.data(); n = 0; while(n < nb){ dprintf("BoapServer::sendEvent: Address: %s\n", onetEventAddress.getString().retStr()); err = onetEvent.sendTo(onetEventAddress, &data[n], nb - n, nsent); dprintf("BoapServer::sendEvent: result: %d:%s\n", err.getErrorNo(), err.getString().retStr()); if(err) return err; n = n + nsent; } return err; } BSocket& BoapServer::getSocket(){ return onet; } BSocket& BoapServer::getEventSocket(){ return onetEvent; } BString BoapServer::getHostName(){ return ohostName; } int BoapServer::getConnectionsNumber(){ return oclients.number(); } BoapServerConnection* BoapServer::newConnection(int fd, BSocketAddressINET address){ return new BoapServerConnection(*this, fd); } /******************************************************************************* * Base for all Server Objects ******************************************************************************* */ BoapFuncEntry::BoapFuncEntry(int cmd, BoapFunc func) : ocmd(cmd), ofunc(func){ } BoapServiceObject::BoapServiceObject(BoapServer& server, BString name) : oserver(server), oname(name){ oapiVersion = 0; oserver.addObject(this); ofuncList.append(BoapFuncEntry(0, &BoapServiceObject::doPing)); ofuncList.append(BoapFuncEntry(1, &BoapServiceObject::doConnectionPriority)); } BoapServiceObject::~BoapServiceObject(){ } BError BoapServiceObject::setName(BString name){ BError err; oname = name; return err; } BString BoapServiceObject::name(){ return oname; } BError BoapServiceObject::doPing(BoapServerConnection* conn, BoapPacket& rx, BoapPacket& tx){ BError err; BoapPacketHead rxhead; BoapPacketHead txhead; rx.popHead(rxhead); txhead.type = BoapMagic | BoapTypeRpcReply; txhead.service = rxhead.service; txhead.cmd = rxhead.cmd; tx.pushHead(txhead); tx.push(err); tx.push(oapiVersion); return err; } BError BoapServiceObject::doConnectionPriority(BoapServerConnection* conn, BoapPacket& rx, BoapPacket& tx){ BError err; BoapPacketHead rxhead; BoapPacketHead txhead; BUInt32 priority; BSocket::Priority pri; rx.popHead(rxhead); rx.pop(priority); switch(priority){ case BoapPriorityLow: pri = BSocket::PriorityLow; break; case BoapPriorityNormal: pri = BSocket::PriorityNormal; break; case BoapPriorityHigh: pri = BSocket::PriorityHigh; break; default: pri = BSocket::PriorityNormal; break; } conn->getSocket().setPriority(pri); txhead.type = BoapMagic | BoapTypeRpcReply; txhead.service = rxhead.service; txhead.cmd = rxhead.cmd; tx.pushHead(txhead); tx.push(err); return err; } BError BoapServiceObject::process(BoapServerConnection* conn, BoapPacket& rx, BoapPacket& tx){ BError err; BoapPacketHead head; BIter i; rx.popHead(head); for(ofuncList.start(i); !ofuncList.isEnd(i); ofuncList.next(i)){ if(ofuncList[i].ocmd == head.cmd){ err = (this->*ofuncList[i].ofunc)(conn, rx, tx); break; } } return err; } BError BoapServiceObject::sendEvent(BString name, BInt32 arg){ BError err; BoapPacketHead txhead; BoapPacket tx; BString fullName = BString("//") + oserver.getHostName() + "/" + oname; txhead.type = BoapMagic | BoapTypeSignal; txhead.service = 0; txhead.cmd = 0; tx.pushHead(txhead); tx.push(fullName); tx.push(name); tx.push(arg); if(err = sendEvent(tx)) return err; return err; } BError BoapServiceObject::processEvent(BString objectName, BString signalName, BInt32 arg){ BError err; dprintf("BoapServiceObject::processEvent(BString objectName, BString signalName, BInt32 arg)\n"); return err; } BError BoapServiceObject::sendEvent(BoapPacket& tx){ return oserver.sendEvent(tx); } BError BoapServiceObject::processEvent(BoapPacket& rx){ BError err; BoapPacketHead rxhead; BError ret; BString objectName; BString signalName; BInt32 arg; dprintf("BoapServiceObject::processEvent\n"); rx.popHead(rxhead); rx.pop(objectName); rx.pop(signalName); rx.pop(arg); ret = processEvent(objectName, signalName, arg); return err; }