|
INET Framework for OMNeT++/OMNEST
|
00001 // 00002 // Copyright 2004 Andras Varga 00003 // 00004 // This library is free software, you can redistribute it and/or modify 00005 // it under the terms of the GNU Lesser General Public License 00006 // as published by the Free Software Foundation; 00007 // either version 2 of the License, or any later version. 00008 // The library is distributed in the hope that it will be useful, 00009 // but WITHOUT ANY WARRANTY; without even the implied warranty of 00010 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. 00011 // See the GNU Lesser General Public License for more details. 00012 // 00013 00014 00015 #include "TCPSessionApp.h" 00016 #include "IPAddressResolver.h" 00017 00018 00019 Define_Module(TCPSessionApp); 00020 00021 00022 void TCPSessionApp::parseScript(const char *script) 00023 { 00024 const char *s = script; 00025 while (*s) 00026 { 00027 Command cmd; 00028 00029 // parse time 00030 while (isspace(*s)) s++; 00031 if (!*s || *s==';') break; 00032 const char *s0 = s; 00033 cmd.tSend = strtod(s,&const_cast<char *&>(s)); 00034 if (s==s0) 00035 throw cRuntimeError("syntax error in script: simulation time expected"); 00036 00037 // parse number of bytes 00038 while (isspace(*s)) s++; 00039 if (!isdigit(*s)) 00040 throw cRuntimeError("syntax error in script: number of bytes expected"); 00041 cmd.numBytes = atoi(s); 00042 while (isdigit(*s)) s++; 00043 00044 // add command 00045 commands.push_back(cmd); 00046 00047 // skip delimiter 00048 while (isspace(*s)) s++; 00049 if (!*s) break; 00050 if (*s!=';') 00051 throw cRuntimeError("syntax error in script: separator ';' missing"); 00052 s++; 00053 while (isspace(*s)) s++; 00054 } 00055 } 00056 00057 void TCPSessionApp::count(cMessage *msg) 00058 { 00059 if(msg->isPacket()) 00060 { 00061 if (msg->getKind()==TCP_I_DATA || msg->getKind()==TCP_I_URGENT_DATA) 00062 { 00063 packetsRcvd++; 00064 bytesRcvd+=PK(msg)->getByteLength(); 00065 } 00066 else 00067 { 00068 EV << "TCPSessionApp received unknown message (kind:" << msg->getKind() << ", name:" << msg->getName() << ")\n"; 00069 } 00070 } 00071 else 00072 { 00073 indicationsRcvd++; 00074 } 00075 } 00076 00077 void TCPSessionApp::waitUntil(simtime_t t) 00078 { 00079 if (simTime()>=t) 00080 return; 00081 00082 cMessage *timeoutMsg = new cMessage("timeout"); 00083 scheduleAt(t, timeoutMsg); 00084 cMessage *msg=NULL; 00085 while ((msg=receive())!=timeoutMsg) 00086 { 00087 count(msg); 00088 socket.processMessage(msg); 00089 } 00090 delete timeoutMsg; 00091 } 00092 00093 void TCPSessionApp::activity() 00094 { 00095 packetsRcvd = indicationsRcvd = 0; 00096 bytesRcvd = bytesSent = 0; 00097 WATCH(packetsRcvd); 00098 WATCH(bytesRcvd); 00099 WATCH(indicationsRcvd); 00100 00101 // parameters 00102 const char *address = par("address"); 00103 int port = par("port"); 00104 const char *connectAddress = par("connectAddress"); 00105 int connectPort = par("connectPort"); 00106 00107 bool active = par("active"); 00108 simtime_t tOpen = par("tOpen"); 00109 simtime_t tSend = par("tSend"); 00110 long sendBytes = par("sendBytes"); 00111 simtime_t tClose = par("tClose"); 00112 00113 const char *script = par("sendScript"); 00114 parseScript(script); 00115 if (sendBytes>0 && commands.size()>0) 00116 throw cRuntimeError("cannot use both sendScript and tSend+sendBytes"); 00117 00118 socket.setOutputGate(gate("tcpOut")); 00119 00120 // open 00121 waitUntil(tOpen); 00122 00123 socket.bind(*address ? IPvXAddress(address) : IPvXAddress(), port); 00124 00125 EV << "issuing OPEN command\n"; 00126 if (ev.isGUI()) getDisplayString().setTagArg("t",0, active?"connecting":"listening"); 00127 00128 if (active) 00129 socket.connect(IPAddressResolver().resolve(connectAddress), connectPort); 00130 else 00131 socket.listenOnce(); 00132 00133 // wait until connection gets established 00134 while (socket.getState()!=TCPSocket::CONNECTED) 00135 { 00136 socket.processMessage(receive()); 00137 if (socket.getState()==TCPSocket::SOCKERROR) 00138 return; 00139 } 00140 00141 EV << "connection established, starting sending\n"; 00142 if (ev.isGUI()) getDisplayString().setTagArg("t",0,"connected"); 00143 00144 // send 00145 if (sendBytes>0) 00146 { 00147 waitUntil(tSend); 00148 EV << "sending " << sendBytes << " bytes\n"; 00149 cPacket *msg = new cPacket("data1"); 00150 msg->setByteLength(sendBytes); 00151 bytesSent += sendBytes; 00152 socket.send(msg); 00153 } 00154 for (CommandVector::iterator i=commands.begin(); i!=commands.end(); ++i) 00155 { 00156 waitUntil(i->tSend); 00157 EV << "sending " << i->numBytes << " bytes\n"; 00158 cPacket *msg = new cPacket("data1"); 00159 msg->setByteLength(i->numBytes); 00160 bytesSent += i->numBytes; 00161 socket.send(msg); 00162 } 00163 00164 // close 00165 if (tClose>=0) 00166 { 00167 waitUntil(tClose); 00168 EV << "issuing CLOSE command\n"; 00169 if (ev.isGUI()) getDisplayString().setTagArg("t",0,"closing"); 00170 socket.close(); 00171 } 00172 00173 // wait until peer closes too and all data arrive 00174 for (;;) 00175 { 00176 cMessage *msg = receive(); 00177 count(msg); 00178 socket.processMessage(msg); 00179 } 00180 } 00181 00182 void TCPSessionApp::finish() 00183 { 00184 EV << getFullPath() << ": received " << bytesRcvd << " bytes in " << packetsRcvd << " packets\n"; 00185 recordScalar("bytesRcvd", bytesRcvd); 00186 recordScalar("bytesSent", bytesSent); 00187 }