// INET Framework for OMNeT++/OMNEST
00001 // 00002 // Copyright (C) 2008 Irene Ruengeler 00003 // 00004 // This program is free software; you can redistribute it and/or 00005 // modify it under the terms of the GNU General Public License 00006 // as published by the Free Software Foundation; either version 2 00007 // of the License, or (at your option) any later version. 00008 // 00009 // This program is distributed in the hope that it will be useful, 00010 // but WITHOUT ANY WARRANTY; without even the implied warranty of 00011 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 00012 // GNU General Public License for more details. 00013 // 00014 // You should have received a copy of the GNU General Public License 00015 // along with this program; if not, see <http://www.gnu.org/licenses/>. 00016 // 00017 00018 00019 #include "SCTPPeer.h" 00020 #include "SCTPSocket.h" 00021 #include "SCTPCommand_m.h" 00022 #include "SCTPMessage_m.h" 00023 #include <stdlib.h> 00024 #include <stdio.h> 00025 #include "SCTPAssociation.h" 00026 #include "IPAddressResolver.h" 00027 00028 #define MSGKIND_CONNECT 0 00029 #define MSGKIND_SEND 1 00030 #define MSGKIND_ABORT 2 00031 #define MSGKIND_PRIMARY 3 00032 #define MSGKIND_STOP 5 00033 00034 Define_Module(SCTPPeer); 00035 00036 void SCTPPeer::initialize() 00037 { 00038 char * token; 00039 AddressVector addresses; 00040 00041 numSessions = packetsSent = packetsRcvd = bytesSent = notifications = 0; 00042 WATCH(numSessions); 00043 WATCH(packetsSent); 00044 WATCH(packetsRcvd); 00045 WATCH(bytesSent); 00046 WATCH(numRequestsToSend); 00047 // parameters 00048 const char* address = par("address"); 00049 00050 token = strtok((char*)address,","); 00051 while (token != NULL) 00052 { 00053 addresses.push_back(IPvXAddress(token)); 00054 token = strtok(NULL, ","); 00055 } 00056 int32 port = par("port"); 00057 echoFactor = par("echoFactor"); 00058 delay = par("echoDelay"); 00059 outboundStreams = par("outboundStreams"); 00060 ordered = (bool)par("ordered"); 00061 queueSize 
= par("queueSize"); 00062 lastStream = 0; 00063 timeoutMsg = new cMessage("SrvAppTimer"); 00064 SCTPSocket* socket = new SCTPSocket(); 00065 socket->setOutputGate(gate("sctpOut")); 00066 socket->setOutboundStreams(outboundStreams); 00067 if (strcmp(address,"")==0) 00068 { 00069 socket->bind(port); 00070 clientSocket.bind(port); 00071 } 00072 else 00073 { 00074 socket->bindx(addresses, port); 00075 clientSocket.bindx(addresses, port); 00076 } 00077 socket->listen(true, par("numPacketsToSendPerClient")); 00078 sctpEV3<<"SCTPPeer::initialized listen port="<<port<<"\n"; 00079 clientSocket.setCallbackObject(this); 00080 clientSocket.setOutputGate(gate("sctpOut")); 00081 00082 if ((simtime_t)par("startTime")>0) 00083 { 00084 connectTimer = new cMessage("ConnectTimer"); 00085 connectTimer->setKind(MSGKIND_CONNECT); 00086 scheduleAt((simtime_t)par("startTime"), connectTimer); 00087 } 00088 schedule = false; 00089 shutdownReceived = false; 00090 sendAllowed = true; 00091 } 00092 00093 void SCTPPeer::sendOrSchedule(cPacket *msg) 00094 { 00095 if (delay==0) 00096 { 00097 send(msg, "sctpOut"); 00098 } 00099 else 00100 { 00101 scheduleAt(simulation.getSimTime()+delay, msg); 00102 } 00103 } 00104 00105 void SCTPPeer::generateAndSend(SCTPConnectInfo *connectInfo) 00106 { 00107 uint32 numBytes; 00108 cPacket* cmsg = new cPacket("CMSG"); 00109 SCTPSimpleMessage* msg=new SCTPSimpleMessage("Server"); 00110 numBytes=(long)par("requestLength"); 00111 msg->setDataArraySize(numBytes); 00112 for (uint32 i=0; i<numBytes; i++) 00113 { 00114 msg->setData(i, 's'); 00115 } 00116 msg->setDataLen(numBytes); 00117 msg->setByteLength(numBytes); 00118 cmsg->encapsulate(msg); 00119 SCTPSendCommand *cmd = new SCTPSendCommand(); 00120 cmd->setAssocId(serverAssocId); 00121 if (ordered) 00122 cmd->setSendUnordered(COMPLETE_MESG_ORDERED); 00123 else 00124 cmd->setSendUnordered(COMPLETE_MESG_UNORDERED); 00125 lastStream=(lastStream+1)%outboundStreams; 00126 cmd->setSid(lastStream); 00127 
cmd->setLast(true); 00128 cmsg->setKind(SCTP_C_SEND); 00129 cmsg->setControlInfo(cmd); 00130 packetsSent++; 00131 bytesSent+=msg->getBitLength()/8; 00132 sendOrSchedule(cmsg); 00133 } 00134 00135 void SCTPPeer::connect() 00136 { 00137 const char *connectAddress = par("connectAddress"); 00138 int32 connectPort = par("connectPort"); 00139 uint32 outStreams = par("outboundStreams"); 00140 clientSocket.setOutboundStreams(outStreams); 00141 00142 sctpEV3 << "issuing OPEN command\n"; 00143 sctpEV3<<"Assoc "<<clientSocket.getConnectionId()<<"::connect to address "<<connectAddress<<", port "<<connectPort<<"\n"; 00144 numSessions++; 00145 clientSocket.connect(IPAddressResolver().resolve(connectAddress, 1), connectPort, (uint32)par("numRequestsPerSession")); 00146 00147 } 00148 00149 void SCTPPeer::handleMessage(cMessage *msg) 00150 { 00151 int32 id; 00152 00153 if (msg->isSelfMessage()) 00154 { 00155 00156 handleTimer(msg); 00157 } 00158 switch (msg->getKind()) 00159 { 00160 case SCTP_I_PEER_CLOSED: 00161 case SCTP_I_ABORT: 00162 { 00163 SCTPCommand *ind = check_and_cast<SCTPCommand *>(msg->getControlInfo()->dup()); 00164 cPacket* cmsg = new cPacket("Notification"); 00165 SCTPSendCommand *cmd = new SCTPSendCommand(); 00166 id = ind->getAssocId(); 00167 cmd->setAssocId(id); 00168 cmd->setSid(ind->getSid()); 00169 cmd->setNumMsgs(ind->getNumMsgs()); 00170 cmsg->setControlInfo(cmd); 00171 delete ind; 00172 delete msg; 00173 cmsg->setKind(SCTP_C_ABORT); 00174 sendOrSchedule(cmsg); 00175 break; 00176 } 00177 case SCTP_I_ESTABLISHED: 00178 { 00179 if (clientSocket.getState()==SCTPSocket::CONNECTING) 00180 clientSocket.processMessage(PK(msg)); 00181 else 00182 { 00183 int32 count=0; 00184 SCTPConnectInfo *connectInfo = dynamic_cast<SCTPConnectInfo *>(msg->removeControlInfo()); 00185 numSessions++; 00186 serverAssocId = connectInfo->getAssocId(); 00187 id = serverAssocId; 00188 outboundStreams = connectInfo->getOutboundStreams(); 00189 rcvdPacketsPerAssoc[serverAssocId]= (long) 
par("numPacketsToReceivePerClient"); 00190 sentPacketsPerAssoc[serverAssocId]= (long) par("numPacketsToSendPerClient"); 00191 char text[30]; 00192 sprintf(text, "App: Received Bytes of assoc %d",serverAssocId); 00193 bytesPerAssoc[serverAssocId] = new cOutVector(text); 00194 rcvdBytesPerAssoc[serverAssocId]= 0; 00195 sprintf(text, "App: EndToEndDelay of assoc %d",serverAssocId); 00196 endToEndDelay[serverAssocId] = new cOutVector(text); 00197 sprintf(text, "Hist: EndToEndDelay of assoc %d",serverAssocId); 00198 histEndToEndDelay[serverAssocId] = new cDoubleHistogram(text); 00199 00200 //delete connectInfo; 00201 delete msg; 00202 if ((long) par("numPacketsToSendPerClient") > 0) 00203 { 00204 SentPacketsPerAssoc::iterator i=sentPacketsPerAssoc.find(serverAssocId); 00205 numRequestsToSend = i->second; 00206 if ((simtime_t)par("thinkTime") > 0) 00207 { 00208 generateAndSend(connectInfo); 00209 timeoutMsg->setKind(SCTP_C_SEND); 00210 scheduleAt(simulation.getSimTime()+(simtime_t)par("thinkTime"), timeoutMsg); 00211 numRequestsToSend--; 00212 i->second = numRequestsToSend; 00213 } 00214 else 00215 { 00216 if (queueSize==0) 00217 { 00218 while (numRequestsToSend > 0) 00219 { 00220 generateAndSend(connectInfo); 00221 numRequestsToSend--; 00222 i->second = numRequestsToSend; 00223 } 00224 } 00225 else if (queueSize>0) 00226 { 00227 while (numRequestsToSend > 0 && count++ < queueSize*2) 00228 { 00229 generateAndSend(connectInfo); 00230 numRequestsToSend--; 00231 i->second = numRequestsToSend; 00232 } 00233 00234 cPacket* cmsg = new cPacket("Queue"); 00235 SCTPInfo* qinfo = new SCTPInfo(); 00236 qinfo->setText(queueSize); 00237 cmsg->setKind(SCTP_C_QUEUE_MSGS_LIMIT); 00238 qinfo->setAssocId(id); 00239 cmsg->setControlInfo(qinfo); 00240 sendOrSchedule(cmsg); 00241 } 00242 00243 sctpEV3<<"!!!!!!!!!!!!!!!All data sent from Server !!!!!!!!!!\n"; 00244 00245 RcvdPacketsPerAssoc::iterator j=rcvdPacketsPerAssoc.find(serverAssocId); 00246 if (j->second == 0 && 
(simtime_t)par("waitToClose")>0) 00247 { 00248 char as[5]; 00249 sprintf(as, "%d",serverAssocId); 00250 cMessage* abortMsg = new cMessage(as); 00251 abortMsg->setKind(SCTP_I_ABORT); 00252 scheduleAt(simulation.getSimTime()+(simtime_t)par("waitToClose"), abortMsg); 00253 } 00254 else 00255 { 00256 sctpEV3<<"no more packets to send, call shutdown for assoc "<<serverAssocId<<"\n"; 00257 cPacket* cmsg = new cPacket("ShutdownRequest"); 00258 SCTPCommand* cmd = new SCTPCommand(); 00259 cmsg->setKind(SCTP_C_SHUTDOWN); 00260 cmd->setAssocId(serverAssocId); 00261 cmsg->setControlInfo(cmd); 00262 sendOrSchedule(cmsg); 00263 } 00264 } 00265 } 00266 } 00267 break; 00268 } 00269 case SCTP_I_DATA_NOTIFICATION: 00270 { 00271 notifications++; 00272 SCTPCommand *ind = check_and_cast<SCTPCommand *>(msg->removeControlInfo()); 00273 cPacket* cmsg = new cPacket("Notification"); 00274 SCTPSendCommand *cmd = new SCTPSendCommand(); 00275 id = ind->getAssocId(); 00276 cmd->setAssocId(id); 00277 cmd->setSid(ind->getSid()); 00278 cmd->setNumMsgs(ind->getNumMsgs()); 00279 cmsg->setKind(SCTP_C_RECEIVE); 00280 cmsg->setControlInfo(cmd); 00281 delete ind; 00282 delete msg; 00283 if (!cmsg->isScheduled() && schedule==false) 00284 { 00285 scheduleAt(simulation.getSimTime()+(simtime_t)par("delayFirstRead"), cmsg); 00286 } 00287 else if (schedule==true) 00288 sendOrSchedule(cmsg); 00289 break; 00290 } 00291 case SCTP_I_DATA: 00292 { 00293 SCTPCommand *ind = check_and_cast<SCTPCommand *>(msg->getControlInfo()); 00294 id = ind->getAssocId(); 00295 RcvdBytesPerAssoc::iterator j=rcvdBytesPerAssoc.find(id); 00296 if (j==rcvdBytesPerAssoc.end() && (clientSocket.getState()==SCTPSocket::CONNECTED)) 00297 clientSocket.processMessage(PK(msg)); 00298 else 00299 { 00300 j->second+= PK(msg)->getByteLength(); 00301 BytesPerAssoc::iterator k=bytesPerAssoc.find(id); 00302 k->second->record(j->second); 00303 packetsRcvd++; 00304 if (echoFactor==0) 00305 { 00306 if ((long)par("numPacketsToReceivePerClient")>0) 00307 
{ 00308 RcvdPacketsPerAssoc::iterator i=rcvdPacketsPerAssoc.find(id); 00309 i->second--; 00310 SCTPSimpleMessage *smsg=check_and_cast<SCTPSimpleMessage*>(msg); 00311 EndToEndDelay::iterator j=endToEndDelay.find(id); 00312 j->second->record(simulation.getSimTime()-smsg->getCreationTime()); 00313 HistEndToEndDelay::iterator k=histEndToEndDelay.find(id); 00314 k->second->collect(simulation.getSimTime()-smsg->getCreationTime()); 00315 if (i->second == 0) 00316 { 00317 cPacket* cmsg = new cPacket("Request"); 00318 SCTPInfo* qinfo = new SCTPInfo(); 00319 cmsg->setKind(SCTP_C_NO_OUTSTANDING); 00320 qinfo->setAssocId(id); 00321 cmsg->setControlInfo(qinfo); 00322 sendOrSchedule(cmsg); 00323 } 00324 } 00325 delete msg; 00326 } 00327 else 00328 { 00329 SCTPSendCommand *cmd = new SCTPSendCommand(); 00330 cmd->setAssocId(id); 00331 00332 SCTPSimpleMessage *smsg=check_and_cast<SCTPSimpleMessage*>(msg->dup()); 00333 EndToEndDelay::iterator j=endToEndDelay.find(id); 00334 j->second->record(simulation.getSimTime()-smsg->getCreationTime()); 00335 HistEndToEndDelay::iterator k=histEndToEndDelay.find(id); 00336 k->second->collect(simulation.getSimTime()-smsg->getCreationTime()); 00337 cPacket* cmsg = new cPacket("SVData"); 00338 bytesSent+=smsg->getByteLength(); 00339 cmd->setSendUnordered(cmd->getSendUnordered()); 00340 lastStream=(lastStream+1)%outboundStreams; 00341 cmd->setSid(lastStream); 00342 cmd->setLast(true); 00343 cmsg->encapsulate(smsg); 00344 cmsg->setKind(SCTP_C_SEND); 00345 cmsg->setControlInfo(cmd); 00346 packetsSent++; 00347 delete msg; 00348 sendOrSchedule(cmsg); 00349 } 00350 } 00351 00352 break; 00353 } 00354 case SCTP_I_SHUTDOWN_RECEIVED: 00355 { 00356 SCTPCommand *command = check_and_cast<SCTPCommand *>(msg->removeControlInfo()); 00357 id = command->getAssocId(); 00358 sctpEV3<<"server: SCTP_I_SHUTDOWN_RECEIVED for assoc "<<id<<"\n"; 00359 RcvdPacketsPerAssoc::iterator i=rcvdPacketsPerAssoc.find(id); 00360 if (i==rcvdPacketsPerAssoc.end()&& 
(clientSocket.getState()==SCTPSocket::CONNECTED)) 00361 clientSocket.processMessage(PK(msg)); 00362 else 00363 { 00364 if (i->second == 0) 00365 { 00366 cPacket* cmsg = new cPacket("Request"); 00367 SCTPInfo* qinfo = new SCTPInfo(); 00368 cmsg->setKind(SCTP_C_NO_OUTSTANDING); 00369 qinfo->setAssocId(id); 00370 cmsg->setControlInfo(qinfo); 00371 sendOrSchedule(cmsg); 00372 } 00373 delete command; 00374 shutdownReceived = true; 00375 } 00376 delete msg; 00377 } 00378 case SCTP_I_CLOSED: delete msg; 00379 break; 00380 } 00381 00382 if (ev.isGUI()) 00383 { 00384 char buf[32]; 00385 RcvdBytesPerAssoc::iterator l=rcvdBytesPerAssoc.find(id); 00386 sprintf(buf, "rcvd: %ld bytes\nsent: %ld bytes", l->second, bytesSent); 00387 getDisplayString().setTagArg("t",0,buf); 00388 } 00389 } 00390 00391 void SCTPPeer::handleTimer(cMessage *msg) 00392 { 00393 cPacket* cmsg; 00394 SCTPCommand* cmd; 00395 int32 id; 00396 00397 00398 sctpEV3<<"SCTPPeer::handleTimer\n"; 00399 00400 SCTPConnectInfo *connectInfo = dynamic_cast<SCTPConnectInfo *>(msg->getControlInfo()); 00401 switch (msg->getKind()) 00402 { 00403 case MSGKIND_CONNECT: 00404 sctpEV3 << "starting session call connect\n"; 00405 connect(); 00406 break; 00407 case SCTP_C_SEND: 00408 00409 if (numRequestsToSend>0) 00410 { 00411 generateAndSend(connectInfo); 00412 if ((simtime_t)par("thinkTime") > 0) 00413 scheduleAt(simulation.getSimTime()+(simtime_t)par("thinkTime"), timeoutMsg); 00414 numRequestsToSend--; 00415 } 00416 break; 00417 case SCTP_I_ABORT: 00418 00419 cmsg = new cPacket("CLOSE", SCTP_C_CLOSE); 00420 cmd = new SCTPCommand(); 00421 id = atoi(msg->getName()); 00422 cmd->setAssocId(id); 00423 cmsg->setControlInfo(cmd); 00424 sendOrSchedule(cmsg); 00425 break; 00426 case SCTP_C_RECEIVE: 00427 schedule = true; 00428 sendOrSchedule(PK(msg)); 00429 break; 00430 default: 00431 00432 break; 00433 } 00434 } 00435 00436 void SCTPPeer::socketDataNotificationArrived(int32 connId, void *ptr, cPacket *msg) 00437 { 00438 SCTPCommand 
*ind = check_and_cast<SCTPCommand *>(msg->removeControlInfo()); 00439 cPacket* cmsg = new cPacket("CMSG"); 00440 SCTPSendCommand *cmd = new SCTPSendCommand(); 00441 cmd->setAssocId(ind->getAssocId()); 00442 cmd->setSid(ind->getSid()); 00443 cmd->setNumMsgs(ind->getNumMsgs()); 00444 cmsg->setKind(SCTP_C_RECEIVE); 00445 cmsg->setControlInfo(cmd); 00446 delete ind; 00447 clientSocket.sendNotification(cmsg); 00448 } 00449 00450 00451 void SCTPPeer::socketPeerClosed(int32, void *) 00452 { 00453 // close the connection (if not already closed) 00454 if (clientSocket.getState()==SCTPSocket::PEER_CLOSED) 00455 { 00456 ev << "remote SCTP closed, closing here as well\n"; 00457 setStatusString("closing"); 00458 clientSocket.close(); 00459 } 00460 } 00461 00462 void SCTPPeer::socketClosed(int32, void *) 00463 { 00464 // *redefine* to start another session etc. 00465 ev << "connection closed\n"; 00466 setStatusString("closed"); 00467 } 00468 00469 void SCTPPeer::socketFailure(int32, void *, int32 code) 00470 { 00471 // subclasses may override this function, and add code try to reconnect after a delay. 
00472 ev << "connection broken\n"; 00473 setStatusString("broken"); 00474 // reconnect after a delay 00475 timeMsg->setKind(MSGKIND_CONNECT); 00476 scheduleAt(simulation.getSimTime()+(simtime_t)par("reconnectInterval"), timeMsg); 00477 } 00478 00479 void SCTPPeer::socketStatusArrived(int32 assocId, void *yourPtr, SCTPStatusInfo *status) 00480 { 00481 struct pathStatus ps; 00482 SCTPPathStatus::iterator i=sctpPathStatus.find(status->getPathId()); 00483 if (i!=sctpPathStatus.end()) 00484 { 00485 ps = i->second; 00486 ps.active=status->getActive(); 00487 } 00488 else 00489 { 00490 ps.active = status->getActive(); 00491 ps.primaryPath = false; 00492 sctpPathStatus[ps.pid]=ps; 00493 } 00494 } 00495 00496 void SCTPPeer::setStatusString(const char *s) 00497 { 00498 if (ev.isGUI()) getDisplayString().setTagArg("t", 0, s); 00499 } 00500 00501 void SCTPPeer::sendRequest(bool last) 00502 { 00503 sctpEV3 << "sending request, " << numRequestsToSend-1 << " more to go\n"; 00504 long numBytes = par("requestLength"); 00505 if (numBytes < 1) 00506 numBytes=1; 00507 00508 sctpEV3 << "SCTPClient: sending " << numBytes << " data bytes\n"; 00509 00510 cPacket* cmsg = new cPacket("AppData"); 00511 SCTPSimpleMessage* msg=new SCTPSimpleMessage("data"); 00512 00513 msg->setDataArraySize(numBytes); 00514 for (int32 i=0; i<numBytes; i++) 00515 { 00516 msg->setData(i, 'a'); 00517 } 00518 msg->setDataLen(numBytes); 00519 msg->setBitLength(numBytes * 8); 00520 msg->setCreationTime(simulation.getSimTime()); 00521 cmsg->encapsulate(msg); 00522 if (ordered) 00523 cmsg->setKind(SCTP_C_SEND_ORDERED); 00524 else 00525 cmsg->setKind(SCTP_C_SEND_UNORDERED); 00526 // send SCTPMessage with SCTPSimpleMessage enclosed 00527 clientSocket.send(cmsg, last); 00528 bytesSent+=numBytes; 00529 } 00530 00531 00532 void SCTPPeer::socketEstablished(int32, void *) 00533 { 00534 int32 count = 0; 00535 // *redefine* to perform or schedule first sending 00536 ev<<"SCTPClient: connected\n"; 00537 
setStatusString("connected"); 00538 // determine number of requests in this session 00539 numRequestsToSend = (long) par("numRequestsPerSession"); 00540 numPacketsToReceive = (long) par("numPacketsToReceive"); 00541 if (numRequestsToSend<1) 00542 numRequestsToSend = 0; 00543 // perform first request (next one will be sent when reply arrives) 00544 if (numRequestsToSend>0) 00545 { 00546 if ((simtime_t)par("thinkTime") > 0) 00547 { 00548 if (sendAllowed) 00549 { 00550 sendRequest(); 00551 numRequestsToSend--; 00552 } 00553 timeMsg->setKind(MSGKIND_SEND); 00554 scheduleAt(simulation.getSimTime()+(simtime_t)par("thinkTime"), timeMsg); 00555 00556 } 00557 else 00558 { 00559 if (queueSize>0) 00560 { 00561 while (numRequestsToSend > 0 && count++ < queueSize*2 && sendAllowed) 00562 { 00563 if (count == queueSize*2) 00564 sendRequest(); 00565 else 00566 sendRequest(false); 00567 numRequestsToSend--; 00568 } 00569 if (numRequestsToSend>0 && sendAllowed) 00570 sendQueueRequest(); 00571 } 00572 else 00573 { 00574 while (numRequestsToSend > 0 && sendAllowed) 00575 { 00576 sendRequest(); 00577 numRequestsToSend--; 00578 } 00579 } 00580 00581 if (numPacketsToReceive == 0 && (simtime_t)par("waitToClose")>0) 00582 { 00583 timeMsg->setKind(MSGKIND_ABORT); 00584 scheduleAt(simulation.getSimTime()+(simtime_t)par("waitToClose"), timeMsg); 00585 } 00586 if (numRequestsToSend == 0 && (simtime_t)par("waitToClose")==0) 00587 { 00588 sctpEV3<<"socketEstablished:no more packets to send, call shutdown\n"; 00589 clientSocket.shutdown(); 00590 } 00591 } 00592 } 00593 } 00594 00595 void SCTPPeer::sendQueueRequest() 00596 { 00597 cPacket* cmsg = new cPacket("Queue"); 00598 SCTPInfo* qinfo = new SCTPInfo(); 00599 qinfo->setText(queueSize); 00600 cmsg->setKind(SCTP_C_QUEUE_MSGS_LIMIT); 00601 qinfo->setAssocId(clientSocket.getConnectionId()); 00602 cmsg->setControlInfo(qinfo); 00603 clientSocket.sendRequest(cmsg); 00604 00605 } 00606 00607 00608 void SCTPPeer::sendRequestArrived() 00609 { 00610 
int32 count = 0; 00611 00612 sctpEV3<<"sendRequestArrived numRequestsToSend="<<numRequestsToSend<<"\n"; 00613 while (numRequestsToSend > 0 && count++ < queueSize && sendAllowed) 00614 { 00615 numRequestsToSend--; 00616 if (count == queueSize || numRequestsToSend==0) 00617 sendRequest(); 00618 else 00619 sendRequest(false); 00620 00621 if (numRequestsToSend == 0) 00622 { 00623 sctpEV3<<"no more packets to send, call shutdown\n"; 00624 clientSocket.shutdown(); 00625 } 00626 } 00627 00628 00629 } 00630 00631 void SCTPPeer::socketDataArrived(int32, void *, cPacket *msg, bool) 00632 { 00633 // *redefine* to perform or schedule next sending 00634 packetsRcvd++; 00635 00636 sctpEV3<<"Client received packet Nr "<<packetsRcvd<<" from SCTP\n"; 00637 00638 SCTPCommand* ind = check_and_cast<SCTPCommand*>(msg->getControlInfo()); 00639 00640 bytesRcvd+=msg->getByteLength(); 00641 00642 if (echoFactor > 0) 00643 { 00644 SCTPSimpleMessage *smsg=check_and_cast<SCTPSimpleMessage*>(msg->dup()); 00645 cPacket* cmsg = new cPacket("SVData"); 00646 echoedBytesSent+=smsg->getBitLength()/8; 00647 cmsg->encapsulate(smsg); 00648 if (ind->getSendUnordered()) 00649 cmsg->setKind(SCTP_C_SEND_UNORDERED); 00650 else 00651 cmsg->setKind(SCTP_C_SEND_ORDERED); 00652 packetsSent++; 00653 delete msg; 00654 clientSocket.send(cmsg,1); 00655 } 00656 if ((long)par("numPacketsToReceive")>0) 00657 { 00658 numPacketsToReceive--; 00659 if (numPacketsToReceive == 0) 00660 { 00661 setStatusString("closing"); 00662 clientSocket.close(); 00663 } 00664 } 00665 } 00666 00667 00668 00669 void SCTPPeer::shutdownReceivedArrived(int32 connId) 00670 { 00671 if (numRequestsToSend==0) 00672 { 00673 cPacket* cmsg = new cPacket("Request"); 00674 SCTPInfo* qinfo = new SCTPInfo(); 00675 cmsg->setKind(SCTP_C_NO_OUTSTANDING); 00676 qinfo->setAssocId(connId); 00677 cmsg->setControlInfo(qinfo); 00678 clientSocket.sendNotification(cmsg); 00679 } 00680 } 00681 00682 00683 00684 void SCTPPeer::sendqueueFullArrived(int32 assocId) 
00685 { 00686 sendAllowed = false; 00687 } 00688 00689 00690 void SCTPPeer::finish() 00691 { 00692 delete timeoutMsg; 00693 delete connectTimer; 00694 ev << getFullPath() << ": opened " << numSessions << " sessions\n"; 00695 ev << getFullPath() << ": sent " << bytesSent << " bytes in " << packetsSent << " packets\n"; 00696 for (RcvdBytesPerAssoc::iterator l=rcvdBytesPerAssoc.begin(); l!=rcvdBytesPerAssoc.end(); l++) 00697 { 00698 ev << getFullPath() << ": received " << l->second << " bytes in assoc " << l->first<< "\n"; 00699 } 00700 ev << getFullPath() << "Over all " << packetsRcvd << " packets received\n "; 00701 ev << getFullPath() << "Over all " << notifications << " notifications received\n "; 00702 for (BytesPerAssoc::iterator j = bytesPerAssoc.begin(); j!= bytesPerAssoc.end(); j++) 00703 { 00704 delete j->second; 00705 bytesPerAssoc.erase(j); 00706 } 00707 for (EndToEndDelay::iterator k = endToEndDelay.begin(); k!= endToEndDelay.end(); k++) 00708 { 00709 delete k->second; 00710 endToEndDelay.erase(k); 00711 } 00712 for (HistEndToEndDelay::iterator l = histEndToEndDelay.begin(); l!= histEndToEndDelay.end(); l++) 00713 { 00714 delete l->second; 00715 histEndToEndDelay.erase(l); 00716 } 00717 rcvdPacketsPerAssoc.clear(); 00718 sentPacketsPerAssoc.clear(); 00719 rcvdBytesPerAssoc.clear(); 00720 } 00721