/*******************************************************************************
* Boap.cc Boap RPC protocol
* T.Barnaby, BEAM Ltd, 8/5/03
*
* History:
* 4/2/05: Updated addressing scheme
*******************************************************************************
*/
#include <stdlib.h>
#include <stdio.h>
#include <errno.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <Boap.h>
// Boap NS
#include <BoapnsD.h>
#include <BoapnsC.h>
// Debug tracing. Set DEBUG to 1 to route dprintf() to printf(); with DEBUG 0
// every dprintf() call expands to nothing.
// The enabled form is wrapped in do { } while(0) and has no trailing
// semicolon, so "dprintf(...);" is exactly one statement and remains safe
// inside an unbraced if/else.
#define DEBUG 0
#if DEBUG
#define dprintf(fmt, a...) do { printf(fmt, ##a); } while(0)
#else
#define dprintf(fmt, a...)
#endif
// Allocation granularity, in bytes, of the BoapPacket buffer: the initial
// buffer is one unit and resize() rounds requests up to a multiple of it.
const int roundSize = 256;
// Creates an empty packet: the cursor starts at offset 0 and the buffer is a
// single roundSize unit obtained from malloc().
// NOTE(review): the malloc() result is not checked.
BoapPacket::BoapPacket(){
    osize = roundSize;
    opos = 0;
    odata = static_cast<char*>(malloc(osize));
}
// Releases the packet buffer. The buffer is obtained with malloc()/realloc(),
// so it has to be released with free(); releasing it with delete is
// undefined behaviour.
BoapPacket::~BoapPacket(){
    free(odata);
}
// Ensures the buffer can hold at least `size` bytes. The buffer only ever
// grows; the new capacity is `size` rounded up to a multiple of roundSize.
// Always returns 0.
// NOTE(review): the realloc() result is not checked.
int BoapPacket::resize(int size){
    // A request that already fits leaves the buffer untouched.
    if(size <= osize)
        return 0;

    const int units = (size + roundSize - 1) / roundSize;
    osize = roundSize * units;
    odata = static_cast<char*>(realloc(odata, osize));
    return 0;
}
// Replaces the start of the buffer with `nbytes` bytes copied from `data`,
// growing the buffer first if needed. The cursor (opos) is left unchanged.
// Returns a default-constructed BError.
BError BoapPacket::setData(void* data, int nbytes){
    BError status;

    resize(nbytes);
    memcpy(odata, data, nbytes);
    return status;
}
int BoapPacket::nbytes(){
return ((BoapPacketHead*)odata)->length;
}
// Returns a pointer to the start of the packet buffer. resize() may
// reallocate the buffer, so the pointer should be re-fetched after a resize.
char* BoapPacket::data(){
    char* buffer = odata;
    return buffer;
}
void BoapPacket::updateLen(){
((BoapPacketHead*)odata)->length = opos;
}
// Starts a new packet: rewinds the cursor, copies `head` to the start of the
// buffer and then rewrites the stored length field with the cursor position
// (so the length value in `head` itself is overwritten). Always returns 0.
int BoapPacket::pushHead(BoapPacketHead& head){
    opos = 0;
    resize(sizeof(head));
    memcpy(odata, &head, sizeof(head));
    opos = sizeof(head);
    updateLen();
    return 0;
}
// Appends the raw bytes of an Int8 at the cursor, advances the cursor and
// refreshes the header length. Always returns 0.
int BoapPacket::push(Int8 v){
    resize(opos + sizeof(v));

    // Take the destination only after resize(): the buffer may have moved.
    char* slot = odata + opos;
    memcpy(slot, &v, sizeof(v));
    opos += sizeof(v);
    updateLen();
    return 0;
}
// Appends the raw bytes of a UInt8 at the cursor, advances the cursor and
// refreshes the header length. Always returns 0.
int BoapPacket::push(UInt8 v){
    resize(opos + sizeof(v));

    // Take the destination only after resize(): the buffer may have moved.
    char* slot = odata + opos;
    memcpy(slot, &v, sizeof(v));
    opos += sizeof(v);
    updateLen();
    return 0;
}
// Appends the raw (host byte order) bytes of an Int16 at the cursor, advances
// the cursor and refreshes the header length. Always returns 0.
int BoapPacket::push(Int16 v){
    resize(opos + sizeof(v));

    // Take the destination only after resize(): the buffer may have moved.
    char* slot = odata + opos;
    memcpy(slot, &v, sizeof(v));
    opos += sizeof(v);
    updateLen();
    return 0;
}
// Appends the raw (host byte order) bytes of a UInt16 at the cursor, advances
// the cursor and refreshes the header length. Always returns 0.
int BoapPacket::push(UInt16 v){
    resize(opos + sizeof(v));

    // Take the destination only after resize(): the buffer may have moved.
    char* slot = odata + opos;
    memcpy(slot, &v, sizeof(v));
    opos += sizeof(v);
    updateLen();
    return 0;
}
// Appends the raw (host byte order) bytes of an Int32 at the cursor, advances
// the cursor and refreshes the header length. Always returns 0.
int BoapPacket::push(Int32 v){
    resize(opos + sizeof(v));

    // Take the destination only after resize(): the buffer may have moved.
    char* slot = odata + opos;
    memcpy(slot, &v, sizeof(v));
    opos += sizeof(v);
    updateLen();
    return 0;
}
// Appends the raw (host byte order) bytes of a UInt32 at the cursor, advances
// the cursor and refreshes the header length. Always returns 0.
int BoapPacket::push(UInt32 v){
    resize(opos + sizeof(v));

    // Take the destination only after resize(): the buffer may have moved.
    char* slot = odata + opos;
    memcpy(slot, &v, sizeof(v));
    opos += sizeof(v);
    updateLen();
    return 0;
}
// Appends a string as an Int32 byte count followed by that many bytes taken
// from v.retStr(). The count is v.len() + 1 (presumably the extra byte is the
// terminating NUL). Always returns 0.
int BoapPacket::push(BString& v){
    Int32 nChars = v.len() + 1;

    // Reserve room for the count and the characters in one step.
    resize(opos + sizeof(nChars) + nChars);

    memcpy(odata + opos, &nChars, sizeof(nChars));
    opos += sizeof(nChars);

    memcpy(odata + opos, v.retStr(), nChars);
    opos += nChars;

    updateLen();
    return 0;
}
// Appends the raw in-memory bytes of a Double at the cursor, advances the
// cursor and refreshes the header length. Always returns 0.
int BoapPacket::push(Double v){
    resize(opos + sizeof(v));

    // Take the destination only after resize(): the buffer may have moved.
    char* slot = odata + opos;
    memcpy(slot, &v, sizeof(v));
    opos += sizeof(v);
    updateLen();
    return 0;
}
// Appends an error as its Int32 error number followed by its message string.
// Always returns 0.
int BoapPacket::push(BError& v){
    Int32 code = v.getErrorNo();
    BString text = v.getString();

    push(code);
    push(text);
    return 0;
}
// Raw byte-block push. Not implemented: nothing is appended to the packet,
// which matches the equally unimplemented pop(UInt32, void*).
// The function is declared to return int, so it has to return a value:
// flowing off the end of a non-void function is undefined behaviour.
int BoapPacket::push(UInt32 nBytes, const void* data){
    (void)nBytes;
    (void)data;
    return 0;
}
// Copies the packet header from the start of the buffer into `head` and
// leaves the cursor just past it, ready for the first argument. Always
// returns 0.
int BoapPacket::popHead(BoapPacketHead& head){
    memcpy(&head, odata, sizeof(head));
    opos = sizeof(head);
    return 0;
}
// Reads an Int8 from the cursor position into `v` and advances the cursor.
// No bounds check is made against the buffer size. Always returns 0.
int BoapPacket::pop(Int8& v){
    const char* src = odata + opos;
    memcpy(&v, src, sizeof(v));
    opos += sizeof(v);
    return 0;
}
// Reads a UInt8 from the cursor position into `v` and advances the cursor.
// No bounds check is made against the buffer size. Always returns 0.
int BoapPacket::pop(UInt8& v){
    const char* src = odata + opos;
    memcpy(&v, src, sizeof(v));
    opos += sizeof(v);
    return 0;
}
// Reads an Int16 (host byte order) from the cursor position into `v` and
// advances the cursor. No bounds check is made. Always returns 0.
int BoapPacket::pop(Int16& v){
    const char* src = odata + opos;
    memcpy(&v, src, sizeof(v));
    opos += sizeof(v);
    return 0;
}
// Reads a UInt16 (host byte order) from the cursor position into `v` and
// advances the cursor. No bounds check is made. Always returns 0.
int BoapPacket::pop(UInt16& v){
    const char* src = odata + opos;
    memcpy(&v, src, sizeof(v));
    opos += sizeof(v);
    return 0;
}
// Reads an Int32 (host byte order) from the cursor position into `v` and
// advances the cursor. No bounds check is made. Always returns 0.
int BoapPacket::pop(Int32& v){
    const char* src = odata + opos;
    memcpy(&v, src, sizeof(v));
    opos += sizeof(v);
    return 0;
}
// Reads a UInt32 (host byte order) from the cursor position into `v` and
// advances the cursor. No bounds check is made. Always returns 0.
int BoapPacket::pop(UInt32& v){
    const char* src = odata + opos;
    memcpy(&v, src, sizeof(v));
    opos += sizeof(v);
    return 0;
}
// Reads a string written by push(BString&): an Int32 byte count, then the
// characters. `v` is assigned from the character pointer (so it relies on a
// terminator being present in the data) and the cursor skips the counted
// bytes.
// NOTE(review): neither the count nor the terminator is validated.
// Always returns 0.
int BoapPacket::pop(BString& v){
    Int32 nChars;

    memcpy(&nChars, odata + opos, sizeof(nChars));
    opos += sizeof(nChars);

    char* text = odata + opos;
    v = text;
    opos += nChars;
    return 0;
}
// Reads the raw bytes of a Double from the cursor position into `v` and
// advances the cursor. No bounds check is made. Always returns 0.
int BoapPacket::pop(Double& v){
    const char* src = odata + opos;
    memcpy(&v, src, sizeof(v));
    opos += sizeof(v);
    return 0;
}
// Reads an error written by push(BError&): an Int32 error number followed by
// the message string, and stores both in `v`. Always returns 0.
int BoapPacket::pop(BError& v){
    Int32 code;
    BString text;

    pop(code);
    pop(text);
    v.set(code, text);
    return 0;
}
// Raw byte-block pop. Not implemented: no bytes are copied to `data` and the
// cursor does not move. Always returns 0.
int BoapPacket::pop(UInt32 nBytes, void* data){
    (void)nBytes;
    (void)data;
    return 0;
}
/*******************************************************************************
* Boap Client communications object
*******************************************************************************
*/
/*******************************************************************************
* Base for all Client Classes
*******************************************************************************
*/
// Creates a client bound to the named remote object over a stream socket.
// No connection is made here; it is established on the first send/receive.
BoapClientObject::BoapClientObject(BString name): BSocket(STREAM), oname(name){
    oservice = 0;
    oconnected = 0;
}
// Connects this client to the named object unless it is already connected.
//
// A name starting with "//" has its host part extracted with
// pullSeparators()/pullToken(); otherwise the host is "localhost".
// The name "boapns" connects straight to the "boapns" tcp service on that
// host with service number 0. Any other name is first looked up through the
// name service at "//<host>/boapns", and the returned address, port and
// service number are used for the connection.
//
// Returns the first error met, or a default BError on success or when
// already connected.
BError BoapClientObject::connectService(BString name){
    BError err;
    Boapns::Boapns boapns("boapns");
    Boapns::BoapEntry entry;
    BString host = "localhost";
    BString boapnsName;
    BSocketAddressINET add;

    // Already connected: nothing to do.
    if(oconnected)
        return err;

    oname = name;

    // Split an optional leading "//host/" off the name.
    if(name.subString(0, 2) == "//"){
        name.pullSeparators("/");
        host = name.pullToken("/");
    }

    // The name service itself lives at a well-known service entry.
    if(name == "boapns"){
        err = add.set(host, "boapns", "tcp");
        if(!err)
            err = connect(add);
        if(!err){
            oservice = 0;
            oconnected = 1;
        }
        return err;
    }

    // Everything else is resolved through the name service; each step runs
    // only if all the previous ones succeeded.
    boapnsName = BString("//") + host + "/boapns";
    err = boapns.connectService(boapnsName);
    if(!err)
        err = boapns.getEntry(name, entry);
    if(!err)
        err = add.set(entry.ipAddress, entry.port);
    if(!err)
        err = connect(add);
    if(!err){
        oservice = entry.service;
        oconnected = 1;
    }
    return err;
}
// Sends the whole packet `tx` to the remote object, connecting first if
// necessary. The socket send is repeated until the number of bytes given by
// the packet header has gone out. Returns the first error met.
BError BoapClientObject::performSend(BoapPacket& tx){
    BError err;
    BSize total = tx.nbytes();
    char* bytes = tx.data();
    BSize sent = 0;
    BSize chunk = 0;

    // Connect if required
    err = connectService(oname);
    if(err)
        return err;

    // Send packet, coping with partial writes
    while(sent < total){
        err = send(bytes + sent, total - sent, chunk, MSG_NOSIGNAL);
        if(err)
            return err;
        sent += chunk;
    }
    return err;
}
// Receives one complete packet into `rx`, connecting first if necessary.
//
// The fixed-size header is read first; its length field gives the total
// packet size, and the rest of the packet is then read behind the header.
// The length field comes from the network, so it is validated before use:
// a value smaller than the header (or one that does not fit the int taken by
// BoapPacket::resize()) would otherwise underflow the remaining-byte count
// and let recv() write past the end of the buffer.
//
// Returns the first socket error met, or error number 1 when the received
// length field is invalid.
BError BoapClientObject::performRecv(BoapPacket& rx){
    BError err;
    BSize nb = 0;
    char* data = 0;
    BSize n = 0;
    BSize nd = 0;
    BoapPacketHead head;
    int length = 0;

    // Connect if required
    if(err = connectService(oname))
        return err;

    // Receive packet
    // Read head first
    rx.resize(sizeof(BoapPacketHead));
    nb = sizeof(BoapPacketHead);
    data = rx.data();
    n = 0;
    while(n < nb){
        if(err = recv(&data[n], nb - n, nd))
            return err;
        n = n + nd;
    }

    // Validate the advertised length before trusting it
    rx.popHead(head);
    length = static_cast<int>(head.length);
    if(length < static_cast<int>(sizeof(BoapPacketHead))){
        BString msg = "Boap: invalid packet length";
        err.set(1, msg);
        return err;
    }

    // Read rest of packet
    rx.resize(length);
    nb = length - sizeof(BoapPacketHead);
    data = &rx.data()[sizeof(BoapPacketHead)];    // re-fetch: resize() may move the buffer
    n = 0;
    while(n < nb){
        if(err = recv(&data[n], nb - n, nd))
            return err;
        n = n + nd;
    }
    return err;
}
// Performs one RPC round trip: sends `tx` and, only if that succeeded,
// receives the reply into `rx`. Returns the error of whichever step failed.
BError BoapClientObject::performCall(BoapPacket& tx, BoapPacket& rx){
    BError err;

    err = performSend(tx);
    if(!err)
        err = performRecv(rx);
    return err;
}
// Creates a datagram socket with broadcasting enabled.
BoapSignalObject::BoapSignalObject()
    : BSocket(DGRAM)
{
    this->setBroadCast(1);
}
// Broadcasts the packet `tx` as datagram(s) to INADDR_BROADCAST, port 14000,
// repeating the send until the byte count from the packet header has gone
// out. Returns the first error met.
BError BoapSignalObject::performSend(BoapPacket& tx){
    BError err;
    BSize total = tx.nbytes();
    char* bytes = tx.data();
    BSize sent = 0;
    BSize chunk = 0;
    BSocketAddressINET dest;

    dest.set(INADDR_BROADCAST, 14000);

    // Send packet
    while(sent < total){
        err = sendTo(dest, bytes + sent, total - sent, chunk, MSG_NOSIGNAL);
        if(err)
            return err;
        sent += chunk;
    }
    return err;
}
/*******************************************************************************
* Boap Server communications object
*******************************************************************************
*/
// Creates the server's stream (service) and datagram (event) sockets and
// enables address reuse on the service socket. Nothing is bound until init().
BoapServer::BoapServer()
    : onet(BSocket::STREAM), onetEvent(BSocket::DGRAM)
{
    onet.setReuseAddress(1);
    oboapNs = 0;
}
// Binds the server sockets and, unless this server is itself the name
// service, registers every added object with the name service.
//
// boapNs non-zero: bind the service socket to the "boapns" tcp service.
// boapNs zero:     bind it to port 0, then add one name-service entry per
//                  registered object using the first local IP address (0 if
//                  none is reported), the bound port and a running service
//                  number starting at 0.
// The event socket is bound to the same address as the service socket and
// set up to broadcast to the "boapns" udp port.
//
// Returns the first bind/getAddress/addEntry error met.
BError BoapServer::init(int boapNs){
    BError err;
    Boapns::Boapns boapns("boapns");
    BIter it;
    BoapService serviceNo;
    BList<uint32_t> addresses;
    uint32_t ipAddress;
    BSocketAddressINET nadd;

    oboapNs = boapNs;
    ohostName = BSocketAddressINET::getHostName();

    addresses = BSocketAddressINET::getIpAddresses();
    ipAddress = 0;
    if(addresses.number())
        ipAddress = addresses[0];

    if(!boapNs)
        nadd.set("", 0);
    else
        nadd.set("", "boapns", "tcp");

    // Create service socket
    err = onet.bind(nadd);
    if(err)
        return err;

    // Create event socket on the address the service socket ended up with
    err = onet.getAddress(nadd);
    if(err)
        return err;
    err = onetEvent.bind(nadd);
    if(err)
        return err;

    // BOAP event broadcast address
    onetEventAddress.set("", "boapns", "udp");
    onetEventAddress.set(INADDR_BROADCAST, onetEventAddress.port());
    onetEvent.setBroadCast(1);

    // The name service does not register itself
    if(boapNs)
        return err;

    serviceNo = 0;
    for(oservices.start(it); !oservices.isEnd(it); oservices.next(it)){
        err = boapns.addEntry(Boapns::BoapEntry(oservices[it].oobject->name(), ipAddress, nadd.port(), serviceNo));
        if(err)
            break;
        serviceNo++;
    }
    return err;
}
// Server main loop; it never returns in normal operation.
//
// Polls the listening socket, the event socket and every accepted client
// descriptor:
//  - listening socket ready: accept the connection and poll the new fd;
//  - event socket ready:     read and dispatch one event datagram;
//  - client fd ready:        handle one request; on error the fd is removed
//                            from the poll set and closed.
//
// `fd` is reset before every poll so that a failed doPoll() cannot leave an
// indeterminate or stale descriptor to act on, and a descriptor is added to
// the poll set only when accept() actually produced one.
BError BoapServer::run(){
    BError err;
    BSocketAddressINET nadd;
    int fd;

    opoll.append(onet.getFd());
    opoll.append(onetEvent.getFd());
    onet.listen();

    while(1){
        fd = -1;
        opoll.doPoll(fd);
        if(fd < 0)
            continue;            // poll failed or reported nothing usable

        if(fd == onet.getFd()){
            int cfd = -1;

            onet.accept(cfd, nadd);
            dprintf("Accept Connection: Fd: %d Add: %s\n", cfd, nadd.getString().retStr());
            if(cfd >= 0)
                opoll.append(cfd);
        }
        else if(fd == onetEvent.getFd()){
            processEvent(fd);
        }
        else if(process(fd)){
            // Request failed or peer went away: drop the connection
            opoll.delFd(fd);
            close(fd);
        }
    }
    return err;
}
// Registers a service object with the server. Its service number is the
// number of objects registered before it. Returns a default BError.
BError BoapServer::addObject(BoapServiceObject* object){
    BError err;
    BoapServiceEntry entry(oservices.number(), object);

    oservices.append(entry);
    return err;
}
// Handles one request on the connected client descriptor `fd`: reads the
// packet into orx, dispatches it to the service object whose number matches
// the header, and sends the reply packet otx back.
//
// The header's length field comes from the network and is validated before
// it is used to size the buffer and the remaining read; a value smaller than
// the header, or one that does not fit the int taken by
// BoapPacket::resize(), is rejected.
// If no registered service matches, an error is returned instead of sending
// otx, which would otherwise hold a previous reply or uninitialised data.
//
// Returns a non-zero error (number 1) on socket failure, peer close, invalid
// length or unknown service; the caller then closes the connection.
BError BoapServer::process(int fd){
    BError err;
    int nb = 0;
    char* data = 0;
    int n = 0;
    int nd = 0;
    int length = 0;
    int found = 0;
    BoapPacketHead head;
    BIter i;

    // Receive packet
    // Read head first
    orx.resize(sizeof(BoapPacketHead));
    nb = sizeof(BoapPacketHead);
    data = orx.data();
    n = 0;
    while(n < nb){
        nd = recv(fd, &data[n], nb - n, 0);
        if(nd <= 0)
            return BError(1, strerror(errno));
        n = n + nd;
    }

    // Validate the advertised length before trusting it
    orx.popHead(head);
    length = static_cast<int>(head.length);
    if(length < static_cast<int>(sizeof(BoapPacketHead))){
        BString msg = "Boap: invalid packet length";
        err.set(1, msg);
        return err;
    }

    // Read rest of packet
    orx.resize(length);
    nb = length - sizeof(BoapPacketHead);
    data = &orx.data()[sizeof(BoapPacketHead)];    // re-fetch: resize() may move the buffer
    n = 0;
    while(n < nb){
        nd = recv(fd, &data[n], nb - n, 0);
        if(nd <= 0)
            return BError(1, strerror(errno));
        n = n + nd;
    }

    dprintf("Command received: Len(%d) Type(%d) Service(%d) Cmd(%d)\n", head.length, head.type, head.service, head.cmd);

    // Dispatch to the addressed service object
    for(oservices.start(i); !oservices.isEnd(i); oservices.next(i)){
        if(oservices[i].oservice == head.service){
            oservices[i].oobject->process(orx, otx);
            found = 1;
            break;
        }
    }
    if(!found){
        BString msg = "Boap: unknown service";
        err.set(1, msg);
        return err;
    }

    // Send packet
    nb = otx.nbytes();
    data = otx.data();
    n = 0;
    while(n < nb){
        nd = send(fd, &data[n], nb - n, MSG_NOSIGNAL);
        if(nd <= 0)
            return BError(1, strerror(errno));
        n = n + nd;
    }
    return err;
}
// Offers a received event packet to every registered service object in
// registration order. The objects' own return values are not collected;
// a default BError is returned.
BError BoapServer::processEvent(BoapPacket& rx){
    BError err;
    BIter i;

    dprintf("BoapServer::processEvent\n");
    for(oservices.start(i); !oservices.isEnd(i); oservices.next(i)){
        oservices[i].oobject->processEvent(rx);
    }
    return err;
}
// Reads one event datagram (up to 1500 bytes) from descriptor `fd` into a
// local packet and dispatches it to the registered service objects.
//
// The header is decoded from the packet that was just received (not from the
// server's request packet orx), and a datagram too short to hold a header is
// rejected rather than dispatched.
//
// Returns BError(errno, ...) if recv() fails, error number 1 for a truncated
// datagram, otherwise the result of processEvent(BoapPacket&).
BError BoapServer::processEvent(int fd){
    BError err;
    int nb = 0;
    BoapPacketHead head;
    BoapPacket rx;

    // Receive packet
    rx.resize(1500);
    if((nb = recv(fd, rx.data(), 1500, 0)) < 0)
        return BError(errno, strerror(errno));

    if(nb < static_cast<int>(sizeof(BoapPacketHead))){
        BString msg = "Boap: short event packet";
        err.set(1, msg);
        return err;
    }

    rx.popHead(head);
    dprintf("Event received: Len(%d) Service(%d) Cmd(%d)\n", head.length, head.service, head.cmd);
    return processEvent(rx);
}
// Sends the event packet `tx` from the event socket to the event broadcast
// address, repeating the send until the byte count from the packet header
// has gone out. Returns the first error met.
BError BoapServer::sendEvent(BoapPacket& tx){
    BError err;
    uint32_t nsent;
    const int total = tx.nbytes();
    char* const bytes = tx.data();

    // Send packet
    for(int done = 0; done < total; done += nsent){
        dprintf("BoapServer::sendEvent: Address: %s\n", onetEventAddress.getString().retStr());
        err = onetEvent.sendTo(onetEventAddress, bytes + done, total - done, nsent);
        dprintf("BoapServer::sendEvent: result: %d:%s\n", err.getErrorNo(), err.getString().retStr());
        if(err)
            return err;
    }
    return err;
}
// Gives access to the server's stream (service) socket.
BSocket& BoapServer::getSocket(){
    BSocket& serviceSocket = onet;
    return serviceSocket;
}
// Gives access to the server's datagram (event) socket.
BSocket& BoapServer::getEventSocket(){
    BSocket& eventSocket = onetEvent;
    return eventSocket;
}
// Returns the host name recorded by init().
BString BoapServer::getHostName(){
    BString hostName = ohostName;
    return hostName;
}
/*******************************************************************************
* Base for all Server Objects
*******************************************************************************
*/
// Pairs a command number with the member function that handles it.
BoapFuncEntry::BoapFuncEntry(int cmd, BoapFunc func)
    : ocmd(cmd), ofunc(func)
{
}
// Creates a named service object and registers it with `server`. The result
// of the registration call is not checked.
BoapServiceObject::BoapServiceObject(BoapServer& server, BString name)
    : oserver(server), oname(name)
{
    oserver.addObject(this);
}
// Nothing to release here. The object is not removed from the server's
// service list.
BoapServiceObject::~BoapServiceObject()
{
}
// Returns the name this service object was created with.
BString BoapServiceObject::name(){
    BString objectName = oname;
    return objectName;
}
// Dispatches a request packet: decodes the header of `rx` and calls the first
// entry in ofuncList whose command number matches, passing it `rx` and `tx`.
// Returns that handler's result, or a default BError (with `tx` untouched)
// when no entry matches.
BError BoapServiceObject::process(BoapPacket& rx, BoapPacket& tx){
    BError err;
    BoapPacketHead head;
    BIter it;

    rx.popHead(head);

    ofuncList.start(it);
    while(!ofuncList.isEnd(it)){
        if(ofuncList[it].ocmd == head.cmd){
            err = (this->*ofuncList[it].ofunc)(rx, tx);
            break;
        }
        ofuncList.next(it);
    }
    return err;
}
// Builds and broadcasts a signal packet. The header has type BoapTypeSignal
// with service and cmd set to 0; the payload is this object's full name
// ("//<server host>/<object name>"), the signal name and the integer
// argument. Returns the result of the send.
BError BoapServiceObject::sendEvent(BString name, Int32 arg){
    BError err;
    BoapPacketHead txhead;
    BoapPacket tx;
    BString fullName = BString("//") + oserver.getHostName() + "/" + oname;

    txhead.type = BoapTypeSignal;
    txhead.service = 0;
    txhead.cmd = 0;

    tx.pushHead(txhead);
    tx.push(fullName);
    tx.push(name);
    tx.push(arg);

    err = sendEvent(tx);
    return err;
}
// Default handler for a decoded signal: does nothing apart from optional
// debug tracing and returns a default BError.
BError BoapServiceObject::processEvent(BString objectName, BString signalName, Int32 arg){
    BError status;

    dprintf("BoapServiceObject::processEvent(BString objectName, BString signalName, Int32 arg)\n");
    return status;
}
// Forwards an already-built event packet to the owning server for sending.
BError BoapServiceObject::sendEvent(BoapPacket& tx){
    BError result = oserver.sendEvent(tx);
    return result;
}
// Decodes a signal packet (header, object name, signal name, Int32 argument)
// and passes the decoded values to
// processEvent(objectName, signalName, arg).
// NOTE(review): the handler's result is stored but not returned; this
// function always returns a default BError. Confirm that is intended.
BError BoapServiceObject::processEvent(BoapPacket& rx){
    BError err;
    BoapPacketHead rxhead;
    BError handlerResult;
    BString objectName;
    BString signalName;
    Int32 arg;

    dprintf("BoapServiceObject::processEvent\n");

    // Unpack in the same order as the sender packs
    rx.popHead(rxhead);
    rx.pop(objectName);
    rx.pop(signalName);
    rx.pop(arg);

    handlerResult = processEvent(objectName, signalName, arg);
    return err;
}