// INET Framework for OMNeT++/OMNEST
00001 // 00002 // Copyright (C) 2008 Irene Ruengeler 00003 // Copyright (C) 2009 Thomas Dreibholz 00004 // 00005 // This program is free software; you can redistribute it and/or 00006 // modify it under the terms of the GNU General Public License 00007 // as published by the Free Software Foundation; either version 2 00008 // of the License, or (at your option) any later version. 00009 // 00010 // This program is distributed in the hope that it will be useful, 00011 // but WITHOUT ANY WARRANTY; without even the implied warranty of 00012 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 00013 // GNU General Public License for more details. 00014 // 00015 // You should have received a copy of the GNU General Public License 00016 // along with this program; if not, see <http://www.gnu.org/licenses/>. 00017 // 00018 00019 00020 #include "SCTPServer.h" 00021 #include "SCTPSocket.h" 00022 #include "SCTPCommand_m.h" 00023 #include "SCTPMessage_m.h" 00024 #include <stdlib.h> 00025 #include <stdio.h> 00026 #include "SCTPAssociation.h" 00027 00028 #define MSGKIND_CONNECT 0 00029 #define MSGKIND_SEND 1 00030 #define MSGKIND_ 2 00031 00032 Define_Module(SCTPServer); 00033 00034 void SCTPServer::initialize() 00035 { 00036 char * token; 00037 cPar *delT; 00038 AddressVector addresses; 00039 socket = NULL; 00040 sctpEV3<<"initialize SCTP Server\n"; 00041 numSessions = packetsSent = packetsRcvd = bytesSent = notifications = 0; 00042 WATCH(numSessions); 00043 WATCH(packetsSent); 00044 WATCH(packetsRcvd); 00045 WATCH(bytesSent); 00046 WATCH(numRequestsToSend); 00047 // parameters 00048 finishEndsSimulation = (bool)par("finishEndsSimulation"); 00049 const char* address = par("address"); 00050 token = strtok((char*)address,","); 00051 while (token != NULL) 00052 { 00053 addresses.push_back(IPvXAddress(token)); 00054 token = strtok(NULL, ","); 00055 } 00056 int32 port = par("port"); 00057 echoFactor = par("echoFactor"); 00058 delay = par("echoDelay"); 00059 delayFirstRead 
= par("delayFirstRead"); 00060 delT = &par("readingInterval"); 00061 if (delT->isNumeric() && (double)*delT==0) 00062 readInt=false; 00063 else 00064 readInt=true; 00065 int32 messagesToPush = par("messagesToPush"); 00066 inboundStreams = par("inboundStreams"); 00067 outboundStreams = par("outboundStreams"); 00068 ordered = (bool)par("ordered"); 00069 queueSize = par("queueSize"); 00070 lastStream = 0; 00071 //abort = NULL; 00072 //abortSent = false; 00073 timeoutMsg = new cMessage("SrvAppTimer"); 00074 delayTimer = new cMessage("delayTimer"); 00075 delayTimer->setContextPointer(this); 00076 delayFirstReadTimer = new cMessage("delayFirstReadTimer"); 00077 firstData = true; 00078 SCTPSocket *socket = new SCTPSocket(); 00079 socket->setOutputGate(gate("sctpOut")); 00080 socket->setInboundStreams(inboundStreams); 00081 socket->setOutboundStreams(outboundStreams); 00082 if (strcmp(address,"")==0) 00083 socket->bind(port); 00084 else 00085 { 00086 socket->bindx(addresses, port); 00087 } 00088 socket->listen(true, par("numPacketsToSendPerClient"), messagesToPush); 00089 sctpEV3<<"SCTPServer::initialized listen port="<<port<<"\n"; 00090 schedule = false; 00091 shutdownReceived = false; 00092 } 00093 00094 void SCTPServer::sendOrSchedule(cPacket *msg) 00095 { 00096 if (delay==0) 00097 { 00098 send(msg, "sctpOut"); 00099 } 00100 else 00101 { 00102 scheduleAt(simulation.getSimTime()+delay, msg); 00103 } 00104 } 00105 00106 void SCTPServer::generateAndSend() 00107 { 00108 uint32 numBytes; 00109 00110 cPacket* cmsg = new cPacket("CMSG"); 00111 SCTPSimpleMessage* msg = new SCTPSimpleMessage("Server"); 00112 numBytes = (uint32)par("requestLength"); 00113 msg->setDataArraySize(numBytes); 00114 for (uint32 i=0; i<numBytes; i++) 00115 { 00116 msg->setData(i, 's'); 00117 } 00118 msg->setDataLen(numBytes); 00119 msg->setBitLength(numBytes * 8); 00120 cmsg->encapsulate(msg); 00121 SCTPSendCommand *cmd = new SCTPSendCommand("Send1"); 00122 cmd->setAssocId(assocId); 00123 if (ordered) 
00124 cmd->setSendUnordered(COMPLETE_MESG_ORDERED); 00125 else 00126 cmd->setSendUnordered(COMPLETE_MESG_UNORDERED); 00127 lastStream=(lastStream+1)%outboundStreams; 00128 cmd->setSid(lastStream); 00129 if (queueSize>0 && numRequestsToSend > 0 && count < queueSize*2) 00130 cmd->setLast(false); 00131 else 00132 cmd->setLast(true); 00133 cmsg->setKind(SCTP_C_SEND); 00134 cmsg->setControlInfo(cmd); 00135 packetsSent++; 00136 bytesSent+=msg->getBitLength()/8; 00137 sendOrSchedule(cmsg); 00138 } 00139 00140 cPacket* SCTPServer::makeReceiveRequest(cPacket* msg) 00141 { 00142 SCTPCommand *ind = check_and_cast<SCTPCommand *>(msg->removeControlInfo()); 00143 cPacket* cmsg = new cPacket("ReceiveRequest"); 00144 SCTPSendCommand *cmd = new SCTPSendCommand("Send2"); 00145 cmd->setAssocId(ind->getAssocId()); 00146 cmd->setSid(ind->getSid()); 00147 cmd->setNumMsgs(ind->getNumMsgs()); 00148 cmsg->setKind(SCTP_C_RECEIVE); 00149 cmsg->setControlInfo(cmd); 00150 delete ind; 00151 return cmsg; 00152 } 00153 00154 cPacket* SCTPServer::makeDefaultReceive() 00155 { 00156 cPacket* cmsg = new cPacket("DefaultReceive"); 00157 SCTPSendCommand *cmd = new SCTPSendCommand("Send3"); 00158 cmd->setAssocId(assocId); 00159 cmd->setSid(0); 00160 cmd->setNumMsgs(1); 00161 cmsg->setKind(SCTP_C_RECEIVE); 00162 cmsg->setControlInfo(cmd); 00163 return cmsg; 00164 } 00165 00166 cPacket* SCTPServer::makeAbortNotification(SCTPCommand* msg) 00167 { 00168 SCTPCommand *ind = check_and_cast<SCTPCommand *>(msg); 00169 cPacket* cmsg = new cPacket("AbortNotification"); 00170 SCTPSendCommand *cmd = new SCTPSendCommand("Send4"); 00171 assocId = ind->getAssocId(); 00172 cmd->setAssocId(assocId); 00173 cmd->setSid(ind->getSid()); 00174 cmd->setNumMsgs(ind->getNumMsgs()); 00175 cmsg->setControlInfo(cmd); 00176 delete ind; 00177 //delete msg; 00178 cmsg->setKind(SCTP_C_ABORT); 00179 return cmsg; 00180 } 00181 00182 void SCTPServer::handleMessage(cMessage *msg) 00183 { 00184 int32 id; 00185 cPacket* cmsg; 00186 00187 if 
(msg->isSelfMessage()) 00188 { 00189 00190 handleTimer(msg); 00191 } 00192 else 00193 { 00194 switch (msg->getKind()) 00195 { 00196 case SCTP_I_PEER_CLOSED: 00197 case SCTP_I_ABORT: 00198 { 00199 SCTPCommand *command = dynamic_cast<SCTPCommand *>(msg->removeControlInfo()); 00200 assocId = command->getAssocId(); 00201 serverAssocStatMap[assocId].peerClosed = true; 00202 if ((long) par("numPacketsToReceivePerClient")==0) 00203 { 00204 if (serverAssocStatMap[assocId].abortSent==false) 00205 { 00206 sendOrSchedule(makeAbortNotification(command->dup())); 00207 serverAssocStatMap[assocId].abortSent = true; 00208 } 00209 } 00210 else 00211 { 00212 if (serverAssocStatMap[assocId].rcvdPackets==(unsigned long) par("numPacketsToReceivePerClient") && 00213 serverAssocStatMap[assocId].abortSent==false) 00214 { 00215 sendOrSchedule(makeAbortNotification(command->dup())); 00216 serverAssocStatMap[assocId].abortSent = true; 00217 } 00218 } 00219 if (delayTimer->isScheduled()) 00220 cancelEvent(delayTimer); 00221 if (delayFirstReadTimer->isScheduled()) 00222 cancelEvent(delayFirstReadTimer); 00223 delete command; 00224 delete msg; 00225 break; 00226 } 00227 case SCTP_I_ESTABLISHED: 00228 { 00229 count=0; 00230 SCTPConnectInfo *connectInfo = dynamic_cast<SCTPConnectInfo *>(msg->removeControlInfo()); 00231 numSessions++; 00232 assocId = connectInfo->getAssocId(); 00233 inboundStreams = connectInfo->getInboundStreams(); 00234 outboundStreams = connectInfo->getOutboundStreams(); 00235 serverAssocStatMap[assocId].rcvdPackets= (long) par("numPacketsToReceivePerClient"); 00236 serverAssocStatMap[assocId].sentPackets= (long) par("numPacketsToSendPerClient"); 00237 serverAssocStatMap[assocId].rcvdBytes=0; 00238 serverAssocStatMap[assocId].start=0; 00239 serverAssocStatMap[assocId].stop=0; 00240 serverAssocStatMap[assocId].lifeTime=0; 00241 serverAssocStatMap[assocId].abortSent=false; 00242 serverAssocStatMap[assocId].peerClosed = false; 00243 char text[30]; 00244 sprintf(text, "App: 
Received Bytes of assoc %d",assocId); 00245 bytesPerAssoc[assocId] = new cOutVector(text); 00246 sprintf(text, "App: EndToEndDelay of assoc %d",assocId); 00247 endToEndDelay[assocId] = new cOutVector(text); 00248 00249 delete connectInfo; 00250 delete msg; 00251 if ((long) par("numPacketsToSendPerClient") > 0) 00252 { 00253 ServerAssocStatMap::iterator i = serverAssocStatMap.find(assocId); 00254 numRequestsToSend = i->second.sentPackets; 00255 if ((simtime_t)par("thinkTime") > 0) 00256 { 00257 generateAndSend(); 00258 timeoutMsg->setKind(SCTP_C_SEND); 00259 scheduleAt(simulation.getSimTime()+(simtime_t)par("thinkTime"), timeoutMsg); 00260 numRequestsToSend--; 00261 i->second.sentPackets = numRequestsToSend; 00262 } 00263 else 00264 { 00265 if (queueSize==0) 00266 { 00267 while (numRequestsToSend > 0) 00268 { 00269 generateAndSend(); 00270 numRequestsToSend--; 00271 i->second.sentPackets = numRequestsToSend; 00272 } 00273 } 00274 else if (queueSize>0) 00275 { 00276 while (numRequestsToSend > 0 && count++ < queueSize*2) 00277 { 00278 generateAndSend(); 00279 numRequestsToSend--; 00280 i->second.sentPackets = numRequestsToSend; 00281 } 00282 00283 cPacket* cmsg = new cPacket("Queue"); 00284 SCTPInfo* qinfo = new SCTPInfo("Info1"); 00285 qinfo->setText(queueSize); 00286 cmsg->setKind(SCTP_C_QUEUE_MSGS_LIMIT); 00287 qinfo->setAssocId(id); 00288 cmsg->setControlInfo(qinfo); 00289 sendOrSchedule(cmsg); 00290 } 00291 ServerAssocStatMap::iterator j=serverAssocStatMap.find(assocId); 00292 if (j->second.rcvdPackets == 0 && (simtime_t)par("waitToClose")>0) 00293 { 00294 char as[5]; 00295 sprintf(as, "%d",assocId); 00296 cPacket* abortMsg = new cPacket(as); 00297 abortMsg->setKind(SCTP_I_ABORT); 00298 scheduleAt(simulation.getSimTime()+(simtime_t)par("waitToClose"), abortMsg); 00299 } 00300 else 00301 { 00302 sctpEV3<<"no more packets to send, call shutdown for assoc "<<assocId<<"\n"; 00303 cPacket* cmsg = new cPacket("ShutdownRequest"); 00304 SCTPCommand* cmd = new 
SCTPCommand("Send5"); 00305 cmsg->setKind(SCTP_C_SHUTDOWN); 00306 cmd->setAssocId(assocId); 00307 cmsg->setControlInfo(cmd); 00308 sendOrSchedule(cmsg); 00309 } 00310 } 00311 } 00312 break; 00313 } 00314 case SCTP_I_DATA_NOTIFICATION: 00315 { 00316 notifications++; 00317 00318 00319 if (schedule==false) 00320 { 00321 if (delayFirstRead>0 && !delayFirstReadTimer->isScheduled()) 00322 { 00323 00324 cmsg=makeReceiveRequest(PK(msg)); 00325 scheduleAt(simulation.getSimTime()+delayFirstRead, cmsg); 00326 scheduleAt(simulation.getSimTime()+delayFirstRead, delayFirstReadTimer); 00327 } 00328 else if (readInt && firstData) 00329 { 00330 firstData=false; 00331 cmsg=makeReceiveRequest(PK(msg)); 00332 scheduleAt(simulation.getSimTime()+(simtime_t)par("readingInterval"), delayTimer); 00333 sendOrSchedule(cmsg); 00334 } 00335 else if (delayFirstRead==0 && readInt==false) 00336 { 00337 cmsg=makeReceiveRequest(PK(msg)); 00338 sendOrSchedule(cmsg); 00339 } 00340 00341 } 00342 else 00343 { 00344 sctpEV3<<simulation.getSimTime()<<" makeReceiveRequest\n"; 00345 cmsg=makeReceiveRequest(PK(msg)); 00346 sendOrSchedule(cmsg); 00347 } 00348 delete msg; 00349 break; 00350 } 00351 case SCTP_I_DATA: 00352 { 00353 notifications--; 00354 packetsRcvd++; 00355 sctpEV3<<simulation.getSimTime()<<" server: data arrived. 
"<<packetsRcvd<<" Packets received now\n"; 00356 SCTPRcvCommand *ind = check_and_cast<SCTPRcvCommand *>(msg->removeControlInfo()); 00357 id = ind->getAssocId(); 00358 ServerAssocStatMap::iterator j=serverAssocStatMap.find(id); 00359 BytesPerAssoc::iterator k=bytesPerAssoc.find(id); 00360 if (j->second.rcvdBytes == 0) 00361 j->second.start = simulation.getSimTime(); 00362 00363 j->second.rcvdBytes+= PK(msg)->getByteLength(); 00364 k->second->record(j->second.rcvdBytes); 00365 00366 if (echoFactor==0) 00367 { 00368 if ((uint32)par("numPacketsToReceivePerClient")>0) 00369 { 00370 j->second.rcvdPackets--; 00371 SCTPSimpleMessage *smsg=check_and_cast<SCTPSimpleMessage*>(msg); 00372 EndToEndDelay::iterator m=endToEndDelay.find(id); 00373 m->second->record(simulation.getSimTime()-smsg->getCreationTime()); 00374 sctpEV3<<"server: Data received. Left packets to receive="<<j->second.rcvdPackets<<"\n"; 00375 00376 if (j->second.rcvdPackets == 0) 00377 { 00378 if (serverAssocStatMap[assocId].peerClosed==true && serverAssocStatMap[assocId].abortSent==false) 00379 { 00380 sendOrSchedule(makeAbortNotification(ind)); 00381 serverAssocStatMap[assocId].abortSent = true; 00382 j->second.stop = simulation.getSimTime(); 00383 j->second.lifeTime = j->second.stop - j->second.start; 00384 break; 00385 } 00386 else 00387 { 00388 cPacket* cmsg = new cPacket("Request"); 00389 SCTPInfo* qinfo = new SCTPInfo("Info2"); 00390 cmsg->setKind(SCTP_C_NO_OUTSTANDING); 00391 qinfo->setAssocId(id); 00392 cmsg->setControlInfo(qinfo); 00393 sendOrSchedule(cmsg); 00394 j->second.stop = simulation.getSimTime(); 00395 j->second.lifeTime = j->second.stop - j->second.start; 00396 } 00397 } 00398 } 00399 delete msg; 00400 } 00401 else 00402 { 00403 SCTPSendCommand *cmd = new SCTPSendCommand("Send6"); 00404 cmd->setAssocId(id); 00405 SCTPSimpleMessage *smsg=check_and_cast<SCTPSimpleMessage*>(msg->dup()); 00406 EndToEndDelay::iterator n=endToEndDelay.find(id); 00407 
n->second->record(simulation.getSimTime()-smsg->getCreationTime()); 00408 cPacket* cmsg = new cPacket("SVData"); 00409 bytesSent+=smsg->getBitLength()/8; 00410 cmd->setSendUnordered(cmd->getSendUnordered()); 00411 lastStream=(lastStream+1)%outboundStreams; 00412 cmd->setSid(lastStream); 00413 cmd->setLast(true); 00414 cmsg->encapsulate(smsg); 00415 cmsg->setKind(SCTP_C_SEND); 00416 cmsg->setControlInfo(cmd); 00417 packetsSent++; 00418 delete msg; 00419 sendOrSchedule(cmsg); 00420 } 00421 delete ind; 00422 break; 00423 } 00424 case SCTP_I_SHUTDOWN_RECEIVED: 00425 { 00426 SCTPCommand *command = check_and_cast<SCTPCommand *>(msg->removeControlInfo()); 00427 id = command->getAssocId(); 00428 sctpEV3<<"server: SCTP_I_SHUTDOWN_RECEIVED for assoc "<<id<<"\n"; 00429 ServerAssocStatMap::iterator i=serverAssocStatMap.find(id); 00430 if (i->second.sentPackets == 0 || (long) par("numPacketsToSendPerClient")==0) 00431 { 00432 cPacket* cmsg = new cPacket("Request"); 00433 SCTPInfo* qinfo = new SCTPInfo("Info3"); 00434 cmsg->setKind(SCTP_C_NO_OUTSTANDING); 00435 qinfo->setAssocId(id); 00436 cmsg->setControlInfo(qinfo); 00437 sendOrSchedule(cmsg); 00438 i->second.stop = simulation.getSimTime(); 00439 i->second.lifeTime = i->second.stop - i->second.start; 00440 } 00441 delete command; 00442 shutdownReceived = true; 00443 delete msg; 00444 break; 00445 } 00446 case SCTP_I_CLOSED: 00447 if (delayTimer->isScheduled()) 00448 cancelEvent(delayTimer); 00449 if (finishEndsSimulation) { 00450 endSimulation(); 00451 } 00452 delete msg; 00453 break; 00454 default: delete msg; 00455 } 00456 } 00457 } 00458 00459 void SCTPServer::handleTimer(cMessage *msg) 00460 { 00461 cPacket* cmsg; 00462 SCTPCommand* cmd; 00463 int32 id; 00464 double tempInterval; 00465 00466 if (msg==delayTimer) 00467 { 00468 ServerAssocStatMap::iterator i=serverAssocStatMap.find(assocId); 00469 sctpEV3<<simulation.getSimTime()<<" delayTimer expired\n"; 00470 sendOrSchedule(makeDefaultReceive()); 00471 
scheduleAt(simulation.getSimTime()+(double)par("readingInterval"), delayTimer); 00472 return; 00473 } 00474 else if (msg==delayFirstReadTimer) 00475 { 00476 delayFirstRead = 0; 00477 00478 if (readInt && !delayTimer->isScheduled()) 00479 { 00480 tempInterval = (double)par("readingInterval"); 00481 scheduleAt(simulation.getSimTime()+(simtime_t)tempInterval, delayTimer); 00482 scheduleAt(simulation.getSimTime()+(simtime_t)tempInterval, makeDefaultReceive()); 00483 } 00484 return; 00485 } 00486 00487 switch (msg->getKind()) 00488 { 00489 case SCTP_C_SEND: 00490 if (numRequestsToSend>0) 00491 { 00492 generateAndSend(); 00493 if ((simtime_t)par("thinkTime") > 0) 00494 scheduleAt(simulation.getSimTime()+(simtime_t)par("thinkTime"), timeoutMsg); 00495 numRequestsToSend--; 00496 } 00497 break; 00498 case SCTP_I_ABORT: 00499 00500 cmsg = new cPacket("CLOSE", SCTP_C_CLOSE); 00501 cmd = new SCTPCommand("Send6"); 00502 id = atoi(msg->getName()); 00503 cmd->setAssocId(id); 00504 cmsg->setControlInfo(cmd); 00505 sendOrSchedule(cmsg); 00506 break; 00507 case SCTP_C_RECEIVE: 00508 sctpEV3<<simulation.getSimTime()<<" SCTPServer:SCTP_C_RECEIVE\n"; 00509 if (readInt || delayFirstRead > 0) 00510 schedule = false; 00511 else 00512 schedule = true; 00513 sendOrSchedule(PK(msg)); 00514 break; 00515 default: 00516 00517 sctpEV3<<"MsgKind ="<<msg->getKind()<<" unknown\n"; 00518 00519 break; 00520 } 00521 } 00522 00523 void SCTPServer::finish() 00524 { 00525 delete timeoutMsg; 00526 if (delayTimer->isScheduled()) 00527 cancelEvent(delayTimer); 00528 delete delayTimer; 00529 delete delayFirstReadTimer; 00530 00531 ev << getFullPath() << ": opened " << numSessions << " sessions\n"; 00532 ev << getFullPath() << ": sent " << bytesSent << " bytes in " << packetsSent << " packets\n"; 00533 for (ServerAssocStatMap::iterator l=serverAssocStatMap.begin(); l!=serverAssocStatMap.end(); l++) 00534 { 00535 ev << getFullPath() << " Assoc: "<<l->first<<"\n"; 00536 ev << "\tstart time: "<<l->second.start 
<<"\n"; 00537 ev << "\tstop time: "<<l->second.stop <<"\n"; 00538 ev << "\tlife time: "<<l->second.lifeTime <<"\n"; 00539 ev << "\treceived bytes:" << l->second.rcvdBytes << "\n"; 00540 ev << "\tthroughput: "<<(l->second.rcvdBytes / l->second.lifeTime.dbl())*8 <<" bit/sec\n"; 00541 recordScalar("bytes rcvd", l->second.rcvdBytes); 00542 recordScalar("throughput", (l->second.rcvdBytes / l->second.lifeTime.dbl())*8); 00543 00544 } 00545 ev << getFullPath() << "Over all " << packetsRcvd << " packets received\n "; 00546 ev << getFullPath() << "Over all " << notifications << " notifications received\n "; 00547 00548 BytesPerAssoc::iterator j; 00549 while ((j = bytesPerAssoc.begin())!= bytesPerAssoc.end()) 00550 { 00551 delete j->second; 00552 bytesPerAssoc.erase(j); 00553 } 00554 EndToEndDelay::iterator k; 00555 while ((k = endToEndDelay.begin())!= endToEndDelay.end()) 00556 { 00557 delete k->second; 00558 endToEndDelay.erase(k); 00559 } 00560 serverAssocStatMap.clear(); 00561 sctpEV3<<"Server finished\n"; 00562 } 00563 00564 SCTPServer::~SCTPServer() 00565 { 00566 delete socket; 00567 }