|
INET Framework for OMNeT++/OMNEST
|
00001 // 00002 // Copyright (C) 2008 Irene Ruengeler 00003 // Copyright (C) 2009 Thomas Dreibholz 00004 // 00005 // This program is free software; you can redistribute it and/or 00006 // modify it under the terms of the GNU General Public License 00007 // as published by the Free Software Foundation; either version 2 00008 // of the License, or (at your option) any later version. 00009 // 00010 // This program is distributed in the hope that it will be useful, 00011 // but WITHOUT ANY WARRANTY; without even the implied warranty of 00012 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 00013 // GNU General Public License for more details. 00014 // 00015 // You should have received a copy of the GNU General Public License 00016 // along with this program; if not, see <http://www.gnu.org/licenses/>. 00017 // 00018 00019 00020 #include "IPAddressResolver.h" 00021 #include "SCTPAssociation.h" 00022 #include "SCTPClient.h" 00023 00024 #define MSGKIND_CONNECT 0 00025 #define MSGKIND_SEND 1 00026 #define MSGKIND_ABORT 2 00027 #define MSGKIND_PRIMARY 3 00028 #define MSGKIND_STOP 5 00029 00030 00031 Define_Module(SCTPClient); 00032 00033 void SCTPClient::initialize() 00034 { 00035 const char * address; 00036 char* token; 00037 AddressVector addresses; 00038 sctpEV3<<"initialize SCTP Client\n"; 00039 numSessions = numBroken = packetsSent = packetsRcvd = bytesSent = echoedBytesSent = bytesRcvd = 0; 00040 WATCH(numSessions); 00041 WATCH(numBroken); 00042 WATCH(packetsSent); 00043 WATCH(packetsRcvd); 00044 WATCH(bytesSent); 00045 WATCH(bytesRcvd); 00046 // parameters 00047 address=par("address"); 00048 00049 token = strtok((char*)address,","); 00050 while (token != NULL) 00051 { 00052 addresses.push_back(IPvXAddress(token)); 00053 token = strtok(NULL, ","); 00054 } 00055 int32 port = par("port"); 00056 echoFactor = par("echoFactor"); 00057 if (!echoFactor) echoFactor = false; 00058 ordered = (bool)par("ordered"); 00059 finishEndsSimulation = (bool)par("finishEndsSimulation"); 00060 if (strcmp(address,"")==0) 00061 { 00062 socket.bind(port); 00063 } 00064 else 00065 { 00066 socket.bindx(addresses, port); 00067 } 00068 00069 socket.setCallbackObject(this); 00070 socket.setOutputGate(gate("sctpOut")); 00071 setStatusString("waiting"); 00072 00073 timeMsg = new cMessage("CliAppTimer"); 00074 numRequestsToSend = 0; 00075 numPacketsToReceive = 0; 00076 queueSize = par("queueSize"); 00077 WATCH(numRequestsToSend); 00078 recordScalar("ums", (uint32) par("requestLength")); 00079 timeMsg->setKind(MSGKIND_CONNECT); 00080 scheduleAt((simtime_t)par("startTime"), timeMsg); 00081 sendAllowed = true; 00082 bufferSize = 0; 00083 if ((simtime_t)par("stopTime")!=0) 00084 { 00085 stopTimer = new cMessage("StopTimer"); 00086 stopTimer->setKind(MSGKIND_STOP); 00087 scheduleAt((simtime_t)par("stopTime"), stopTimer); 00088 timer = true; 00089 } 00090 else 00091 { 00092 timer = false; 00093 stopTimer = NULL; 00094 } 00095 if ((simtime_t)par("primaryTime")!=0) 00096 { 00097 primaryChangeTimer = new cMessage("PrimaryTime"); 00098 primaryChangeTimer->setKind(MSGKIND_PRIMARY); 00099 scheduleAt((simtime_t)par("primaryTime"), primaryChangeTimer); 00100 } 00101 else 00102 { 00103 primaryChangeTimer = NULL; 00104 } 00105 } 00106 00107 void SCTPClient::handleMessage(cMessage *msg) 00108 { 00109 if (msg->isSelfMessage()) 00110 handleTimer(msg); 00111 else 00112 { 00113 socket.processMessage(PK(msg)); 00114 } 00115 } 00116 00117 void SCTPClient::connect() 00118 { 00119 const char *connectAddress = par("connectAddress"); 00120 int32 connectPort = par("connectPort"); 00121 inStreams = par("inboundStreams"); 00122 outStreams = par("outboundStreams"); 00123 socket.setInboundStreams(inStreams); 00124 socket.setOutboundStreams(outStreams); 00125 ev << "issuing OPEN command\n"; 00126 setStatusString("connecting"); 00127 ev<<"connect to address "<<connectAddress<<"\n"; 00128 socket.connect(IPAddressResolver().resolve(connectAddress, 1), connectPort, (uint32)par("numRequestsPerSession")); 00129 numSessions++; 00130 } 00131 00132 void SCTPClient::close() 00133 { 00134 setStatusString("closing"); 00135 socket.close(); 00136 } 00137 00138 00139 void SCTPClient::setStatusString(const char *s) 00140 { 00141 if (ev.isGUI()) getDisplayString().setTagArg("t", 0, s); 00142 } 00143 00144 void SCTPClient::socketEstablished(int32, void *, uint64 buffer ) 00145 { 00146 int32 count = 0; 00147 ev<<"SCTPClient: connected\n"; 00148 setStatusString("connected"); 00149 bufferSize = buffer; 00150 // determine number of requests in this session 00151 numRequestsToSend = (long) par("numRequestsPerSession"); 00152 numPacketsToReceive = (long) par("numPacketsToReceive"); 00153 if (numRequestsToSend<1) 00154 numRequestsToSend = 0; 00155 sctpEV3<<"SCTPClient:numRequestsToSend="<<numRequestsToSend<<"\n"; 00156 // perform first request (next one will be sent when reply arrives) 00157 if ((numRequestsToSend>0 && !timer) || timer) 00158 { 00159 if ((simtime_t)par("thinkTime") > 0) 00160 { 00161 if (sendAllowed) 00162 { 00163 sendRequest(); 00164 if (!timer) 00165 numRequestsToSend--; 00166 } 00167 timeMsg->setKind(MSGKIND_SEND); 00168 scheduleAt(simulation.getSimTime()+(simtime_t)par("thinkTime"), timeMsg); 00169 } 00170 else 00171 { 00172 if (queueSize>0) 00173 { 00174 while (((!timer && numRequestsToSend > 0) || timer) && count++ < queueSize*2 && sendAllowed) 00175 { 00176 if (count == queueSize*2) 00177 sendRequest(); 00178 else 00179 sendRequest(false); 00180 if (!timer) 00181 { 00182 if (--numRequestsToSend == 0) 00183 sendAllowed = false; 00184 } 00185 } 00186 if (((!timer && numRequestsToSend>0) || timer) && sendAllowed) 00187 sendQueueRequest(); 00188 } 00189 else 00190 { 00191 while ((((!timer && numRequestsToSend>0) || timer) && sendAllowed && bufferSize>0) || 00192 (((!timer && numRequestsToSend>0) || timer) && sendAllowed && buffer==0)) 00193 { 00194 if (!timer && numRequestsToSend==1) 00195 sendRequest(true); 00196 else 00197 sendRequest(false); 00198 if (!timer && (--numRequestsToSend == 0)) 00199 sendAllowed = false; 00200 } 00201 } 00202 } 00203 if ((!timer && numPacketsToReceive == 0) && (simtime_t)par("waitToClose")>0) 00204 { 00205 timeMsg->setKind(MSGKIND_ABORT); 00206 scheduleAt(simulation.getSimTime()+(simtime_t)par("waitToClose"), timeMsg); 00207 } 00208 if ((!timer && numRequestsToSend == 0) && (simtime_t)par("waitToClose")==0) 00209 { 00210 sctpEV3<<"socketEstablished:no more packets to send, call shutdown\n"; 00211 socket.shutdown(); 00212 if (timeMsg->isScheduled()) 00213 cancelEvent(timeMsg); 00214 if (finishEndsSimulation) { 00215 endSimulation(); 00216 } 00217 } 00218 } 00219 } 00220 00221 void SCTPClient::sendQueueRequest() 00222 { 00223 cPacket* cmsg = new cPacket("Queue"); 00224 SCTPInfo* qinfo = new SCTPInfo(); 00225 qinfo->setText(queueSize); 00226 cmsg->setKind(SCTP_C_QUEUE_MSGS_LIMIT); 00227 qinfo->setAssocId(socket.getConnectionId()); 00228 cmsg->setControlInfo(qinfo); 00229 sctpEV3 << "Sending queue request ..." << endl; 00230 socket.sendRequest(cmsg); 00231 } 00232 00233 void SCTPClient::sendRequestArrived() 00234 { 00235 int32 count = 0; 00236 00237 sctpEV3<<"sendRequestArrived numRequestsToSend="<<numRequestsToSend<<"\n"; 00238 while (((!timer && numRequestsToSend > 0) || timer) && count++ < queueSize && sendAllowed) 00239 { 00240 if (count == queueSize) 00241 sendRequest(); 00242 else 00243 sendRequest(false); 00244 00245 if (!timer) 00246 numRequestsToSend--; 00247 if ((!timer && numRequestsToSend == 0)) 00248 { 00249 sctpEV3<<"no more packets to send, call shutdown\n"; 00250 socket.shutdown(); 00251 if (timeMsg->isScheduled()) 00252 cancelEvent(timeMsg); 00253 if (finishEndsSimulation) { 00254 endSimulation(); 00255 } 00256 } 00257 } 00258 } 00259 00260 void SCTPClient::socketDataArrived(int32, void *, cPacket *msg, bool) 00261 { 00262 packetsRcvd++; 00263 sctpEV3<<"Client received packet Nr "<<packetsRcvd<<" from SCTP\n"; 00264 SCTPCommand* ind = check_and_cast<SCTPCommand*>(msg->removeControlInfo()); 00265 bytesRcvd+=msg->getByteLength(); 00266 if (echoFactor > 0) 00267 { 00268 SCTPSimpleMessage *smsg=check_and_cast<SCTPSimpleMessage*>(msg->dup()); 00269 cPacket* cmsg = new cPacket("SVData"); 00270 echoedBytesSent+=smsg->getBitLength()/8; 00271 cmsg->encapsulate(smsg); 00272 if (ind->getSendUnordered()) 00273 cmsg->setKind(SCTP_C_SEND_UNORDERED); 00274 else 00275 cmsg->setKind(SCTP_C_SEND_ORDERED); 00276 packetsSent++; 00277 delete msg; 00278 socket.send(cmsg, 1); 00279 } 00280 if ((long)par("numPacketsToReceive")>0) 00281 { 00282 numPacketsToReceive--; 00283 if (numPacketsToReceive == 0) 00284 { 00285 close(); 00286 } 00287 } 00288 delete ind; 00289 } 00290 00291 00292 void SCTPClient::sendRequest(bool last) 00293 { 00294 uint32 i, sendBytes; 00295 00296 sendBytes = par("requestLength"); 00297 00298 00299 if (sendBytes < 1) 00300 sendBytes=1; 00301 cPacket* cmsg = new cPacket("AppData"); 00302 SCTPSimpleMessage* msg=new SCTPSimpleMessage("data"); 00303 00304 msg->setDataArraySize(sendBytes); 00305 for (i=0; i<sendBytes; i++) 00306 { 00307 msg->setData(i, 'a'); 00308 } 00309 msg->setDataLen(sendBytes); 00310 msg->setByteLength(sendBytes); 00311 msg->setCreationTime(simulation.getSimTime()); 00312 cmsg->encapsulate(msg); 00313 if (ordered) 00314 cmsg->setKind(SCTP_C_SEND_ORDERED); 00315 else 00316 cmsg->setKind(SCTP_C_SEND_UNORDERED); 00317 // send SCTPMessage with SCTPSimpleMessage enclosed 00318 sctpEV3 << "Sending request ..." << endl; 00319 bufferSize -= sendBytes; 00320 if (bufferSize < 0) 00321 last = true; 00322 socket.send(cmsg, last); 00323 bytesSent+=sendBytes; 00324 } 00325 00326 void SCTPClient::handleTimer(cMessage *msg) 00327 { 00328 00329 switch (msg->getKind()) 00330 { 00331 case MSGKIND_CONNECT: 00332 ev << "starting session call connect\n"; 00333 connect(); 00334 break; 00335 case MSGKIND_SEND: 00336 00337 if (((!timer && numRequestsToSend>0) || timer)) 00338 { 00339 if (sendAllowed) 00340 { 00341 sendRequest(); 00342 if (!timer) 00343 numRequestsToSend--; 00344 } 00345 if ((simtime_t)par("thinkTime") > 0) 00346 scheduleAt(simulation.getSimTime()+(simtime_t)par("thinkTime"), timeMsg); 00347 if ((!timer && numRequestsToSend == 0) && (simtime_t)par("waitToClose")==0) 00348 { 00349 socket.shutdown(); 00350 if (timeMsg->isScheduled()) 00351 cancelEvent(timeMsg); 00352 if (finishEndsSimulation) { 00353 endSimulation(); 00354 } 00355 } 00356 } 00357 else if ((!timer && numRequestsToSend == 0) && (simtime_t)par("waitToClose")==0) 00358 { 00359 socket.shutdown(); 00360 if (timeMsg->isScheduled()) 00361 cancelEvent(timeMsg); 00362 if (finishEndsSimulation) { 00363 endSimulation(); 00364 } 00365 } 00366 break; 00367 case MSGKIND_ABORT: 00368 close(); 00369 break; 00370 case MSGKIND_PRIMARY: 00371 setPrimaryPath((const char*)par("newPrimary")); 00372 break; 00373 case MSGKIND_STOP: 00374 numRequestsToSend=0; 00375 sendAllowed = false; 00376 socket.abort(); 00377 socket.close(); 00378 if (timeMsg->isScheduled()) 00379 cancelEvent(timeMsg); 00380 socket.close(); 00381 if (finishEndsSimulation) { 00382 endSimulation(); 00383 } 00384 break; 00385 default: 00386 ev<<"MsgKind ="<<msg->getKind()<<" unknown\n"; 00387 break; 00388 } 00389 } 00390 00391 00392 void SCTPClient::socketDataNotificationArrived(int32 connId, void *ptr, cPacket *msg) 00393 { 00394 SCTPCommand *ind = check_and_cast<SCTPCommand *>(msg->removeControlInfo()); 00395 cPacket* cmsg = new cPacket("CMSG-DataArr"); 00396 SCTPSendCommand *cmd = new SCTPSendCommand(); 00397 cmd->setAssocId(ind->getAssocId()); 00398 cmd->setSid(ind->getSid()); 00399 cmd->setNumMsgs(ind->getNumMsgs()); 00400 cmsg->setKind(SCTP_C_RECEIVE); 00401 cmsg->setControlInfo(cmd); 00402 delete ind; 00403 socket.sendNotification(cmsg); 00404 } 00405 00406 void SCTPClient::shutdownReceivedArrived(int32 connId) 00407 { 00408 if (numRequestsToSend==0) 00409 { 00410 cPacket* cmsg = new cPacket("Request"); 00411 SCTPInfo* qinfo = new SCTPInfo(); 00412 cmsg->setKind(SCTP_C_NO_OUTSTANDING); 00413 qinfo->setAssocId(connId); 00414 cmsg->setControlInfo(qinfo); 00415 socket.sendNotification(cmsg); 00416 } 00417 } 00418 00419 void SCTPClient::socketPeerClosed(int32, void *) 00420 { 00421 // close the connection (if not already closed) 00422 if (socket.getState()==SCTPSocket::PEER_CLOSED) 00423 { 00424 ev << "remote SCTP closed, closing here as well\n"; 00425 close(); 00426 } 00427 } 00428 00429 void SCTPClient::socketClosed(int32, void *) 00430 { 00431 // *redefine* to start another session etc. 00432 ev << "connection closed\n"; 00433 setStatusString("closed"); 00434 if (primaryChangeTimer) 00435 { 00436 cancelEvent(primaryChangeTimer); 00437 delete primaryChangeTimer; 00438 primaryChangeTimer = NULL; 00439 } 00440 } 00441 00442 void SCTPClient::socketFailure(int32, void *, int32 code) 00443 { 00444 // subclasses may override this function, and add code try to reconnect after a delay. 00445 ev << "connection broken\n"; 00446 setStatusString("broken"); 00447 numBroken++; 00448 // reconnect after a delay 00449 timeMsg->setKind(MSGKIND_CONNECT); 00450 scheduleAt(simulation.getSimTime()+(simtime_t)par("reconnectInterval"), timeMsg); 00451 } 00452 00453 void SCTPClient::socketStatusArrived(int32 assocId, void *yourPtr, SCTPStatusInfo *status) 00454 { 00455 struct pathStatus ps; 00456 SCTPPathStatus::iterator i=sctpPathStatus.find(status->getPathId()); 00457 if (i!=sctpPathStatus.end()) 00458 { 00459 ps = i->second; 00460 ps.active=status->getActive(); 00461 } 00462 else 00463 { 00464 ps.active = status->getActive(); 00465 ps.pid = status->getPathId(); 00466 ps.primaryPath = false; 00467 sctpPathStatus[ps.pid]=ps; 00468 } 00469 } 00470 00471 void SCTPClient::setPrimaryPath (const char* str) 00472 { 00473 00474 cPacket* cmsg = new cPacket("CMSG-SetPrimary"); 00475 SCTPPathInfo *pinfo = new SCTPPathInfo(); 00476 if (strcmp(str,"")!=0) 00477 { 00478 pinfo->setRemoteAddress(IPvXAddress(str)); 00479 } 00480 else 00481 { 00482 str = (const char*)par("newPrimary"); 00483 if (strcmp(str, "")!=0) 00484 pinfo->setRemoteAddress(IPvXAddress(str)); 00485 else 00486 { 00487 str = (const char*)par("connectAddress"); 00488 pinfo->setRemoteAddress(IPvXAddress(str)); 00489 } 00490 } 00491 00492 pinfo->setAssocId(socket.getConnectionId()); 00493 cmsg->setKind(SCTP_C_PRIMARY); 00494 cmsg->setControlInfo(pinfo); 00495 socket.sendNotification(cmsg); 00496 } 00497 00498 00499 00500 00501 void SCTPClient::sendqueueFullArrived(int32 assocId) 00502 { 00503 sendAllowed = false; 00504 } 00505 00506 void SCTPClient::sendqueueAbatedArrived(int32 assocId, uint64 buffer) 00507 { 00508 bufferSize = buffer; 00509 sendAllowed = true; 00510 while ((((!timer && numRequestsToSend>0) || timer) && sendAllowed && bufferSize>0) || 00511 (((!timer && numRequestsToSend>0) || timer) && sendAllowed && buffer==0)) 00512 { 00513 if (!timer && numRequestsToSend==1) 00514 sendRequest(true); 00515 else 00516 sendRequest(false); 00517 if (!timer && (--numRequestsToSend == 0)) 00518 sendAllowed = false; 00519 } 00520 if ((!timer && numRequestsToSend == 0) && (simtime_t)par("waitToClose")==0) 00521 { 00522 sctpEV3<<"socketEstablished:no more packets to send, call shutdown\n"; 00523 socket.shutdown(); 00524 if (timeMsg->isScheduled()) 00525 cancelEvent(timeMsg); 00526 if (finishEndsSimulation) { 00527 endSimulation(); 00528 } 00529 } 00530 } 00531 00532 void SCTPClient::addressAddedArrived(int32 assocId, IPvXAddress remoteAddr) 00533 { 00534 } 00535 00536 void SCTPClient::finish() 00537 { 00538 if (timeMsg->isScheduled()) 00539 cancelEvent(timeMsg); 00540 delete timeMsg; 00541 if (stopTimer) 00542 { 00543 cancelEvent(stopTimer); 00544 delete stopTimer; 00545 } 00546 if (primaryChangeTimer) 00547 { 00548 cancelEvent(primaryChangeTimer); 00549 delete primaryChangeTimer; 00550 primaryChangeTimer = NULL; 00551 } 00552 ev << getFullPath() << ": opened " << numSessions << " sessions\n"; 00553 ev << getFullPath() << ": sent " << bytesSent << " bytes in " << packetsSent << " packets\n"; 00554 ev << getFullPath() << ": received " << bytesRcvd << " bytes in " << packetsRcvd << " packets\n"; 00555 sctpEV3<<"Client finished\n"; 00556 } 00557