/*******************************************************************************
* Boap.cc Boap RPC protocol
* T.Barnaby, BEAM Ltd, 8/5/03
* updated by D.Korchagin, CERN AB-BI-SW, 2007-08-31
*
* History:
* 4/2/05: Updated addressing scheme
*******************************************************************************
*/
#include <stdlib.h>
#include <stdio.h>
#include <errno.h>
#include <unistd.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <Boap.h>
#ifndef __Lynx__
#include <byteswap.h>
#else
static inline int bswap_32(int i){
return (i << 24) | ((i & 0xff00) << 8) | ((i >> 8) & 0xff00) | (i >> 24);
}
#endif
// Boap NS
#include <BoapnsD.h>
#include <BoapnsC.h>
#define DEBUG 0
#define APIVERSION_TEST 1 // Tests the API version during a connect
#if DEBUG
#define dprintf(fmt, a...) printf(fmt, ##a);
#else
#define dprintf(fmt, a...)
#endif
#if __BYTE_ORDER == __BIG_ENDIAN
#define IS_BIG_ENDIAN 1
#endif
const int boapPort = 12000; ///< The default BOAP connection port
const int roundSize = 256;
static inline void swap8(char* d, char* s){
*d = *s;
}
static inline void swap16(char* d, char* s){
d[1] = s[0];
d[0] = s[1];
}
static inline void swap32(char* d, char* s){
d[3] = s[0];
d[2] = s[1];
d[1] = s[2];
d[0] = s[3];
}
static inline void swap64(char* d, char* s){
d[7] = s[0];
d[6] = s[1];
d[5] = s[2];
d[4] = s[3];
d[3] = s[4];
d[2] = s[5];
d[1] = s[6];
d[0] = s[7];
}
BoapPacket::BoapPacket(){
opos = 0;
osize = roundSize;
odata = (char*)malloc(osize);
}
BoapPacket::~BoapPacket(){
delete odata;
}
int BoapPacket::resize(int size){
if(size > osize){
// printf("BoapPacket::resize: from %d to %d\n", osize, size);
osize = roundSize * ((size + roundSize - 1) / roundSize);
odata = (char*)realloc(odata, osize);
}
return 0;
}
BError BoapPacket::setData(void* data, int nbytes){
BError err;
resize(nbytes);
memcpy(odata, data, nbytes);
return err;
}
int BoapPacket::nbytes(){
#if IS_BIG_ENDIAN
return bswap_32(((BoapPacketHead*)odata)->length);
#else
return ((BoapPacketHead*)odata)->length;
#endif
}
char* BoapPacket::data(){
return odata;
}
void BoapPacket::updateLen(){
#if IS_BIG_ENDIAN
((BoapPacketHead*)odata)->length = bswap_32(opos);
#else
((BoapPacketHead*)odata)->length = opos;
#endif
}
UInt32 BoapPacket::getCmd(){
#if IS_BIG_ENDIAN
return bswap_32(((BoapPacketHead*)odata)->cmd);
#else
return ((BoapPacketHead*)odata)->cmd;
#endif
}
inline void BoapPacket::copyWithSwap(void* dst, const void* src, UInt32 nBytes, char* swapType){
#if IS_BIG_ENDIAN
char* d = (char*)dst;
char* s = (char*)src;
// printf("BoapPacket::copyWithSwap: BIG_ENDIAN: swapType: %s\n", swapType);
if(swapType[1] == '\0'){
switch(swapType[0]){
case '8':
while(nBytes){
swap64(d, s);
d += 8;
s += 8;
nBytes -= 8;
}
break;
case '4':
while(nBytes){
swap32(d, s);
d += 4;
s += 4;
nBytes -= 4;
}
break;
case '2':
{
while(nBytes){
swap16(d, s);
d += 2;
s += 2;
nBytes -= 2;
}
break;
}
case '1':
default:
memcpy(dst, src, nBytes);
break;
}
}
else {
char* t = swapType;
char* d = (char*)dst;
char* s = (char*)src;
int l = 0;
while(nBytes){
switch(*t){
case '8': swap64(d, s); l = 8; break;
case '4': swap32(d, s); l = 4; break;
case '2': swap16(d, s); l = 2; break;
case '1': swap8(d, s); l = 1; break;
default: swap8(d, s); l = 1; break;
}
d += l;
s += l;
nBytes -= l;
t++;
if(*t == '\0')
t = swapType;
}
}
#else
memcpy(dst, src, nBytes);
#endif
}
int BoapPacket::pushHead(BoapPacketHead& head){
opos = 0;
resize(opos + sizeof(head));
copyWithSwap(&odata[opos], &head, sizeof(head), "4");
opos += sizeof(head);
updateLen();
return 0;
}
int BoapPacket::push(Int8 v){
resize(opos + sizeof(v));
copyWithSwap(&odata[opos], &v, sizeof(v), "1");
opos += sizeof(v);
updateLen();
return 0;
}
int BoapPacket::push(UInt8 v){
resize(opos + sizeof(v));
copyWithSwap(&odata[opos], &v, sizeof(v), "1");
opos += sizeof(v);
updateLen();
return 0;
}
int BoapPacket::push(Int16 v){
resize(opos + sizeof(v));
copyWithSwap(&odata[opos], &v, sizeof(v), "2");
opos += sizeof(v);
updateLen();
return 0;
}
int BoapPacket::push(UInt16 v){
resize(opos + sizeof(v));
copyWithSwap(&odata[opos], &v, sizeof(v), "2");
opos += sizeof(v);
updateLen();
return 0;
}
int BoapPacket::push(Int32 v){
resize(opos + sizeof(v));
copyWithSwap(&odata[opos], &v, sizeof(v), "4");
opos += sizeof(v);
updateLen();
return 0;
}
int BoapPacket::push(UInt32 v){
resize(opos + sizeof(v));
copyWithSwap(&odata[opos], &v, sizeof(v), "4");
opos += sizeof(v);
updateLen();
return 0;
}
int BoapPacket::push(Int64 v){
resize(opos + sizeof(v));
copyWithSwap(&odata[opos], &v, sizeof(v), "8");
opos += sizeof(v);
updateLen();
return 0;
}
int BoapPacket::push(UInt64 v){
resize(opos + sizeof(v));
copyWithSwap(&odata[opos], &v, sizeof(v), "8");
opos += sizeof(v);
updateLen();
return 0;
}
int BoapPacket::push(const BString& v){
Int32 l = v.len() + 1;
resize(opos + sizeof(l) + l);
copyWithSwap(&odata[opos], &l, sizeof(l), "4");
opos += sizeof(l);
copyWithSwap(&odata[opos], v.retStr(), l, "1");
opos += l;
updateLen();
return 0;
}
int BoapPacket::push(Double v){
resize(opos + sizeof(v));
copyWithSwap(&odata[opos], &v, sizeof(v), "8");
opos += sizeof(v);
updateLen();
return 0;
}
int BoapPacket::push(const BError& v){
Int32 n = v.getErrorNo();
BString s = v.getString();
push(n);
push(s);
return 0;
}
int BoapPacket::push(UInt32 nBytes, const void* data, char* swapType){
resize(opos + nBytes);
copyWithSwap(&odata[opos], data, nBytes, swapType);
opos += nBytes;
updateLen();
return 0;
}
int BoapPacket::peekHead(BoapPacketHead& head){
opos = 0;
copyWithSwap(&head, &odata[opos], sizeof(head), "4");
return 0;
}
int BoapPacket::popHead(BoapPacketHead& head){
opos = 0;
copyWithSwap(&head, &odata[opos], sizeof(head), "4");
opos += sizeof(head);
return 0;
}
int BoapPacket::pop(Int8& v){
copyWithSwap(&v, &odata[opos], sizeof(v), "1");
opos += sizeof(v);
return 0;
}
int BoapPacket::pop(UInt8& v){
copyWithSwap(&v, &odata[opos], sizeof(v), "1");
opos += sizeof(v);
return 0;
}
int BoapPacket::pop(Int16& v){
copyWithSwap(&v, &odata[opos], sizeof(v), "2");
opos += sizeof(v);
return 0;
}
int BoapPacket::pop(UInt16& v){
copyWithSwap(&v, &odata[opos], sizeof(v), "2");
opos += sizeof(v);
return 0;
}
int BoapPacket::pop(Int32& v){
copyWithSwap(&v, &odata[opos], sizeof(v), "4");
opos += sizeof(v);
return 0;
}
int BoapPacket::pop(UInt32& v){
copyWithSwap(&v, &odata[opos], sizeof(v), "4");
opos += sizeof(v);
return 0;
}
int BoapPacket::pop(Int64& v){
copyWithSwap(&v, &odata[opos], sizeof(v), "8");
opos += sizeof(v);
return 0;
}
int BoapPacket::pop(UInt64& v){
copyWithSwap(&v, &odata[opos], sizeof(v), "8");
opos += sizeof(v);
return 0;
}
int BoapPacket::pop(BString& v){
Int32 l;
copyWithSwap(&l, &odata[opos], sizeof(l), "4");
opos += sizeof(l);
v = (char*)&odata[opos];
opos += l;
return 0;
}
int BoapPacket::pop(Double& v){
copyWithSwap(&v, &odata[opos], sizeof(v), "8");
opos += sizeof(v);
return 0;
}
int BoapPacket::pop(BError& v){
Int32 n;
BString s;
pop(n);
pop(s);
v.set(n, s);
return 0;
}
int BoapPacket::pop(UInt32 nBytes, void* data, char* swapType){
copyWithSwap(data, &odata[opos], nBytes, swapType);
opos += nBytes;
return 0;
}
/*******************************************************************************
* Boap Client communications object
*******************************************************************************
*/
/*******************************************************************************
* Base for all Client Classes
*******************************************************************************
*/
BoapClientObject::BoapClientObject(BString name): BSocket(STREAM), oname(name){
oconnected = 0;
oservice = 0;
oapiVersion = 0;
opriority = BoapPriorityNormal;
omaxLength = 0;
otimeout = -1;
oreconnect = 0;
}
BError BoapClientObject::connectService(BString name){
BError err;
Boapns::BoapEntry entry;
BString host = "localhost";
BString boapnsName;
BSocketAddressINET add;
// Check if already connected to same service
if(!oreconnect && (name == oname) && oconnected){
return err;
}
// Check if a re-connect
if(oconnected){
oconnected = 0;
close();
init(STREAM);
}
oname = name;
// Parse name
if(name.subString(0, 2) == "//"){
name.pullSeparators("/");
host = name.pullToken("/");
}
if(name == "boapns"){
if(!(err = add.set(host, "boapns", "tcp")) || !(err = add.set(host, boapPort))){
if(! (err = connect(add))){
oservice = 0;
oconnected = 1;
}
}
}
else {
Boapns::Boapns boapns("boapns");
BIter i;
// Lookup name
boapnsName = BString("//") + host + "/boapns";
if(! (err = boapns.connectService(boapnsName))){
if(! (err = boapns.getEntry(name, entry))){
// First try and connect to the host name
dprintf("BoapClientObject::connectService: Try To Connect to: %s:%d\n", entry.hostName.retStr(), entry.port);
if(! (err = add.set(entry.hostName, entry.port))){
dprintf("BoapClientObject::connectService: Connect to hostname: %s:%d\n", entry.hostName.retStr(), entry.port);
if(! (err = connect(add))){
oservice = entry.service;
oconnected = 1;
#if APIVERSION_TEST
err = checkApiVersion();
#endif
}
}
if(!oconnected){
// Try and connect on one of the available network addresses
for(entry.addressList.end(i); !entry.addressList.isEnd(i); entry.addressList.prev(i)){
if(! (err = add.set(entry.addressList[i], entry.port))){
dprintf("BoapClientObject::connectService: Connect to ipaddress: %s:%d on %s\n", entry.hostName.retStr(), entry.port, entry.addressList[i].retStr());
if(! (err = connect(add))){
oservice = entry.service;
oconnected = 1;
#if APIVERSION_TEST
err = checkApiVersion();
#endif
break;
}
}
}
}
}
}
else {
err.set(err.getErrorNo(), BString("Connection to BoapNs (") + boapnsName + ") Failed: " + err.getString());
}
}
oreconnect = 0;
dprintf("BoapClientObject::connectService: Status: %s\n", err.getString().retStr());
return err;
}
BError BoapClientObject::disconnectService(){
BError err;
if(oconnected){
oconnected = 0;
close();
init(STREAM);
}
return err;
}
BString BoapClientObject::getServiceName(){
return oname;
}
BError BoapClientObject::ping(BUInt32& apiVersion){
BError err;
olock.lock();
err = pingLocked(apiVersion);
olock.unlock();
return err;
}
BError BoapClientObject::pingLocked(BUInt32& apiVersion){
BError err;
BError ret;
BoapPacketHead txhead;
BoapPacketHead rxhead;
if(err = connectService(oname)){
return err;
}
txhead.cmd = 0;
txhead.type = BoapMagic | BoapTypeRpc;
txhead.service = oservice;
otx.pushHead(txhead);
if(err = performCall(otx, orx)){
return err;
}
orx.popHead(rxhead);
orx.pop(apiVersion);
orx.pop(ret);
return ret;
}
BError BoapClientObject::setConnectionPriority(BoapPriority priority){
BError err;
BoapPacketHead txhead;
BoapPacketHead rxhead;
BSocket::Priority pri;
olock.lock();
if(err = connectService(oname)){
olock.unlock();
return err;
}
opriority = priority;
switch(opriority){
case BoapPriorityLow: pri = BSocket::PriorityLow; break;
case BoapPriorityNormal: pri = BSocket::PriorityNormal; break;
case BoapPriorityHigh: pri = BSocket::PriorityHigh; break;
default: pri = BSocket::PriorityNormal; break;
}
err = BSocket::setPriority(pri);
if(!err){
txhead.cmd = 1;
txhead.type = BoapMagic | BoapTypeRpc;
txhead.service = oservice;
otx.pushHead(txhead);
otx.push(priority);
if(err = performCall(otx, orx)){
return err;
}
orx.popHead(rxhead);
orx.pop(err);
}
olock.unlock();
return err;
}
void BoapClientObject::setMaxLength(BUInt32 maxLength){
omaxLength = omaxLength;
}
void BoapClientObject::setTimeout(int timeout){
otimeout = timeout;
}
BError BoapClientObject::checkApiVersion(){
BError err;
BUInt32 apiVersion;
if(! (err = pingLocked(apiVersion))){
#if DEBUG
printf("CheckVersions: Local: %d == Rem: %d\n", oapiVersion, apiVersion);
#endif
if(oapiVersion != apiVersion)
err.set(1, BString("BOAP API versions incorrect: Local: ") + oapiVersion + " Remote: " + apiVersion);
}
return err;
}
BError BoapClientObject::performSend(BoapPacket& tx){
BError err;
BSize nb = tx.nbytes();
char* data = tx.data();
BSize n = 0;
BSize nd = 0;
// Send packet
while(n < nb){
if(err = send(&data[n], nb - n, nd, MSG_NOSIGNAL))
return err;
n = n + nd;
}
return err;
}
BError BoapClientObject::performRecv(BoapPacket& rx){
BError err;
BSize nb = 0;
char* data = 0;
BSize n = 0;
BSize nd = 0;
BoapPacketHead head;
// Recieve Packet
// Read Head first
rx.resize(sizeof(BoapPacketHead));
nb = sizeof(BoapPacketHead);
data = rx.data();
n = 0;
while(n < nb){
#ifdef ZAP
if(err = recv(&data[n], nb - n, nd)){
return err;
}
#else
if(err = recvWithTimeout(&data[n], nb - n, nd, otimeout)){
oreconnect = 1;
return err;
}
#endif
n = n + nd;
}
// Read rest of packet
rx.popHead(head);
// Validate packet
if((head.type & 0xFFFFFF00) != BoapMagic)
return err.set(1, "A non BOAP packet was received");
if(omaxLength && (head.length > omaxLength))
return err.set(1, "BOAP packet length was to large");
rx.resize(head.length);
nb = head.length - sizeof(BoapPacketHead);
data = &rx.data()[sizeof(BoapPacketHead)];
n = 0;
while(n < nb){
#ifdef ZAP
if(err = recv(&data[n], nb - n, nd)){
return err;
}
#else
if(err = recvWithTimeout(&data[n], nb - n, nd, otimeout)){
oreconnect = 1;
return err;
}
#endif
n = n + nd;
}
return err;
}
BError BoapClientObject::performCall(BoapPacket& tx, BoapPacket& rx){
BError err;
if(! (err = performSend(tx))){
err = performRecv(rx);
}
if(!err && (tx.getCmd() != rx.getCmd()))
err.set(1, BString("BOAP incorrect packet for command recieved: Expected: ") + tx.getCmd() + " Got: " + rx.getCmd());
return err;
}
BoapSignalObject::BoapSignalObject(): BSocket(DGRAM){
setBroadCast(1);
}
BError BoapSignalObject::performSend(BoapPacket& tx){
BError err;
BSize nb = tx.nbytes();
char* data = tx.data();
BSize n = 0;
BSize nd = 0;
BSocketAddressINET nadd;
nadd.set(INADDR_BROADCAST, 14000);
// Send packet
while(n < nb){
if(err = sendTo(nadd, &data[n], nb - n, nd, MSG_NOSIGNAL))
return err;
n = n + nd;
}
return err;
}
/*******************************************************************************
* Boap Server communications object
*******************************************************************************
*/
BoapServerConnection::BoapServerConnection(BoapServer& boapServer, int fd)
: oboapServer(boapServer), osocket(fd){
omaxLength = 0;
}
BError BoapServerConnection::process(){
BError err;
unsigned int nb = 0;
char* data = 0;
unsigned int n = 0;
unsigned int nd = 0;
BoapPacketHead head;
// Recieve Packet
// Read Head first
orx.resize(sizeof(BoapPacketHead));
nb = sizeof(BoapPacketHead);
data = orx.data();
n = 0;
while(n < nb){
if(err = osocket.recv(&data[n], nb - n, nd, 0))
return err;
n = n + nd;
}
// Read rest of packet
orx.peekHead(head);
// Validate packet
if((head.type & 0xFFFFFF00) != BoapMagic)
return err.set(1, "A non BOAP packet was received");
if(omaxLength && (head.length > omaxLength))
return err.set(1, "BOAP packet length was to large");
orx.resize(head.length);
nb = head.length - sizeof(BoapPacketHead);
data = &orx.data()[sizeof(BoapPacketHead)];
n = 0;
while(n < nb){
if(err = osocket.recv(&data[n], nb - n, nd, 0))
return err;
n = n + nd;
}
dprintf("Command received: Type(%x) Len(%d) Service(%d) Cmd(%d)\n", head.type, head.length, head.service, head.cmd);
oboapServer.process(this, orx, otx);
// Send packet
nb = otx.nbytes();
data = otx.data();
n = 0;
while(n < nb){
if(err = osocket.send(&data[n], nb - n, nd, MSG_NOSIGNAL))
return err;
n = n + nd;
}
return err;
}
BSocket& BoapServerConnection::getSocket(){
return osocket;
}
void BoapServerConnection::setMaxLength(BUInt32 maxLength){
omaxLength = maxLength;
}
void* BoapServerConnection::function(){
BError err;
while(1){
if(err = process()){
dprintf("BoapServerConnection::function: Error: %s\n", err.getString().retStr());
oboapServer.clientGone(this);
return 0;
}
}
}
BoapServer::BoapServer() : onet(BSocket::STREAM), onetEvent(BSocket::DGRAM) {
othreaded = 0;
oisBoapns = 0;
onet.setReuseAddress(1);
oboapns = new Boapns::Boapns();
}
BoapServer::~BoapServer(){
delete oboapns;
}
BError BoapServer::init(BString boapNsHost, int threaded, int isBoapns){
BError err;
BIter i;
BoapService s;
BList<BString> addressList;
BSocketAddressINET nadd;
BString boapNsObject = "boapns";
BString name;
othreaded = threaded;
oisBoapns = isBoapns;
ohostName = BSocketAddressINET::getHostName();
addressList = BSocketAddressINET::getIpAddressListAll();
if(oisBoapns)
nadd.set("", "boapns", "tcp");
else
nadd.set("", 0);
// Create service socket
if(err = onet.bind(nadd))
return err;
// Create event socket
if(err = onet.getAddress(nadd))
return err;
if(err = onetEvent.bind(nadd))
return err;
// BOAP event broadcast address
onetEventAddress.set("", "boapns", "udp");
onetEventAddress.set(INADDR_BROADCAST, onetEventAddress.port());
onetEvent.setBroadCast(1);
if(!oisBoapns){
if(boapNsHost != "")
boapNsObject = BString("//") + boapNsHost + "/" + boapNsObject;
if(err = oboapns->connectService(boapNsObject)){
err.set(err.getErrorNo(), BString("Connection to BoapNs Failed: ") + err.getString());
return err;
}
for(oservices.start(i), s = 0; !oservices.isEnd(i); oservices.next(i), s++){
// If a name is not given, get one from the Boapns
if(oservices[i].oobject->name() == ""){
oboapns->getNewName(name);
oservices[i].oobject->setName(name);
}
Boapns::BoapEntry e = Boapns::BoapEntry(oservices[i].oobject->name(), ohostName, addressList, nadd.port(), s);
if(err = oboapns->addEntry(e))
break;
}
}
return err;
}
void BoapServer::clientGone(BoapServerConnection* client){
oclientGoneEvent.sendEvent(client->getSocket().getFd());
}
void* BoapServer::function(){
run(0);
return 0;
}
BError BoapServer::run(int inThread){
BError err;
BError e;
BSocketAddressINET nadd;
int fd;
BIter i;
BoapServerConnection* c;
if(inThread){
start();
return err;
}
opoll.append(onet.getFd());
opoll.append(oclientGoneEvent.getFd());
opoll.append(onetEvent.getFd());
onet.listen();
while(1){
opoll.doPoll(fd);
if(fd == onet.getFd()){
onet.accept(fd, nadd);
dprintf("Accept Connection: Fd: %d Add: %s\n", fd, nadd.getString().retStr());
oclients.append(c = new BoapServerConnection(*this, fd));
if(othreaded){
c->start();
}
else {
opoll.append(fd);
}
}
else if(fd == oclientGoneEvent.getFd()){
oclientGoneEvent.getEvent(fd);
dprintf("Client Gone Event: Fd: %d\n", fd);
if(othreaded){
for(oclients.start(i); !oclients.isEnd(i); oclients.next(i)){
if(oclients[i]->getSocket().getFd() == fd){
oclients[i]->waitForCompletion();
delete oclients[i];
oclients.del(i);
close(fd);
break;
}
}
}
}
else if(fd == onetEvent.getFd()){
processEvent(fd);
}
else if(fd >= 0){
dprintf("Process Client: %d\n", fd);
for(oclients.start(i); !oclients.isEnd(i); oclients.next(i)){
if(oclients[i]->getSocket().getFd() == fd){
if(e = oclients[i]->process()){
dprintf("ClientGone: %d Error: %s\n", fd, e.getString().retStr());
opoll.delFd(fd);
delete oclients[i];
oclients.del(i);
close(fd);
}
break;
}
}
}
}
return err;
}
BError BoapServer::addObject(BoapServiceObject* object){
BError err;
oservices.append(BoapServiceEntry(oservices.number(), object));
return err;
}
BError BoapServer::process(BoapServerConnection* conn, BoapPacket& rx, BoapPacket& tx){
BError err;
BIter i;
BoapPacketHead head;
rx.popHead(head);
for(oservices.start(i); !oservices.isEnd(i); oservices.next(i)){
if(oservices[i].oservice == head.service){
err = oservices[i].oobject->process(conn, rx, tx);
break;
}
}
return err;
}
BError BoapServer::processEvent(BoapPacket& rx){
BError err;
BIter i;
int s;
dprintf("BoapServer::processEvent\n");
for(oservices.start(i), s = 0; !oservices.isEnd(i); oservices.next(i), s++){
err = oservices[i].oobject->processEvent(rx);
}
return err;
}
BError BoapServer::processEvent(int fd){
BError err;
int nb = 0;
BoapPacketHead head;
BoapPacket rx;
// Recieve Packet
rx.resize(1500);
if((nb = recv(fd, rx.data(), 1500, 0)) < 0)
return BError(errno, strerror(errno));
rx.popHead(head);
dprintf("Event received: Len(%d) Service(%d) Cmd(%d)\n", head.length, head.service, head.cmd);
return processEvent(rx);
}
BError BoapServer::sendEvent(BoapPacket& tx){
BError err;
int nb = 0;
char* data = 0;
int n = 0;
uint32_t nsent;
// Send packet
nb = tx.nbytes();
data = tx.data();
n = 0;
while(n < nb){
dprintf("BoapServer::sendEvent: Address: %s\n", onetEventAddress.getString().retStr());
err = onetEvent.sendTo(onetEventAddress, &data[n], nb - n, nsent);
dprintf("BoapServer::sendEvent: result: %d:%s\n", err.getErrorNo(), err.getString().retStr());
if(err)
return err;
n = n + nsent;
}
return err;
}
BSocket& BoapServer::getSocket(){
return onet;
}
BSocket& BoapServer::getEventSocket(){
return onetEvent;
}
BString BoapServer::getHostName(){
return ohostName;
}
int BoapServer::getConnectionsNumber(){
return oclients.number();
}
/*******************************************************************************
* Base for all Server Objects
*******************************************************************************
*/
BoapFuncEntry::BoapFuncEntry(int cmd, BoapFunc func) :
ocmd(cmd), ofunc(func){
}
BoapServiceObject::BoapServiceObject(BoapServer& server, BString name) : oserver(server), oname(name){
oapiVersion = 0;
oserver.addObject(this);
ofuncList.append(BoapFuncEntry(0, &BoapServiceObject::doPing));
ofuncList.append(BoapFuncEntry(1, &BoapServiceObject::doConnectionPriority));
}
BoapServiceObject::~BoapServiceObject(){
}
BError BoapServiceObject::setName(BString name){
BError err;
oname = name;
return err;
}
BString BoapServiceObject::name(){
return oname;
}
BError BoapServiceObject::doPing(BoapServerConnection* conn, BoapPacket& rx, BoapPacket& tx){
BError err;
BoapPacketHead rxhead;
BoapPacketHead txhead;
rx.popHead(rxhead);
txhead.type = BoapMagic | BoapTypeRpcReply;
txhead.service = rxhead.service;
txhead.cmd = rxhead.cmd;
tx.pushHead(txhead);
tx.push(oapiVersion);
tx.push(err);
return err;
}
BError BoapServiceObject::doConnectionPriority(BoapServerConnection* conn, BoapPacket& rx, BoapPacket& tx){
BError err;
BoapPacketHead rxhead;
BoapPacketHead txhead;
BUInt32 priority;
BSocket::Priority pri;
rx.popHead(rxhead);
rx.pop(priority);
switch(priority){
case BoapPriorityLow: pri = BSocket::PriorityLow; break;
case BoapPriorityNormal: pri = BSocket::PriorityNormal; break;
case BoapPriorityHigh: pri = BSocket::PriorityHigh; break;
default: pri = BSocket::PriorityNormal; break;
}
conn->getSocket().setPriority(pri);
txhead.type = BoapMagic | BoapTypeRpcReply;
txhead.service = rxhead.service;
txhead.cmd = rxhead.cmd;
tx.pushHead(txhead);
tx.push(err);
return err;
}
BError BoapServiceObject::process(BoapServerConnection* conn, BoapPacket& rx, BoapPacket& tx){
BError err;
BoapPacketHead head;
BIter i;
rx.popHead(head);
for(ofuncList.start(i); !ofuncList.isEnd(i); ofuncList.next(i)){
if(ofuncList[i].ocmd == head.cmd){
err = (this->*ofuncList[i].ofunc)(conn, rx, tx);
break;
}
}
return err;
}
BError BoapServiceObject::sendEvent(BString name, Int32 arg){
BError err;
BoapPacketHead txhead;
BoapPacket tx;
BString fullName = BString("//") + oserver.getHostName() + "/" + oname;
txhead.type = BoapMagic | BoapTypeSignal;
txhead.service = 0;
txhead.cmd = 0;
tx.pushHead(txhead);
tx.push(fullName);
tx.push(name);
tx.push(arg);
if(err = sendEvent(tx))
return err;
return err;
}
BError BoapServiceObject::processEvent(BString objectName, BString signalName, Int32 arg){
BError err;
dprintf("BoapServiceObject::processEvent(BString objectName, BString signalName, Int32 arg)\n");
return err;
}
BError BoapServiceObject::sendEvent(BoapPacket& tx){
return oserver.sendEvent(tx);
}
BError BoapServiceObject::processEvent(BoapPacket& rx){
BError err;
BoapPacketHead rxhead;
BError ret;
BString objectName;
BString signalName;
Int32 arg;
dprintf("BoapServiceObject::processEvent\n");
rx.popHead(rxhead);
rx.pop(objectName);
rx.pop(signalName);
rx.pop(arg);
ret = processEvent(objectName, signalName, arg);
return err;
}