|
INET Framework for OMNeT++/OMNEST
|
00001 00002 #include <omnetpp.h> 00003 #include "WeightedFairQueue.h" 00004 00005 00006 Define_Module(WeightedFairQueue); 00007 00008 WeightedFairQueue::WeightedFairQueue() 00009 { 00010 numQueues = 256; 00011 holdCapacity = 1024; 00012 cdt = 256; 00013 currentHold = 0; 00014 queues = NULL; 00015 } 00016 00017 WeightedFairQueue::WeightedFairQueue(int c_hold, int c_flows, int c_cdt) 00018 { 00019 numQueues = c_flows; 00020 holdCapacity = c_hold; 00021 cdt = c_cdt; 00022 currentHold = 0; 00023 queues = NULL; 00024 } 00025 00026 WeightedFairQueue::~WeightedFairQueue() 00027 { 00028 for (int i=0; i<numQueues; i++) 00029 delete queues[i]; 00030 delete [] queues; 00031 } 00032 00033 00034 void WeightedFairQueue::initialize() 00035 { 00036 PassiveQueueBase::initialize(); 00037 00038 outGate = gate("out"); 00039 00040 // configuration 00041 00042 00043 queues = new WFQ::QueueRecord *[numQueues]; 00044 for (int i=0; i<numQueues; i++) 00045 { 00046 queues[i] = new WFQ::QueueRecord(); 00047 } 00048 00049 00050 } 00051 00052 bool WeightedFairQueue::enqueue(cMessage *msg) 00053 { 00054 WFQ::Flow packetInfo; 00055 00056 00057 if(currentHold >= holdCapacity) 00058 { 00059 delete msg; 00060 return true; 00061 } 00062 00063 cMessage *copy = msg->dup(); 00064 00065 if (dynamic_cast<IPDatagram *>(copy)) 00066 { 00067 // IPv4 QoS: map DSCP to queue number 00068 IPDatagram *datagram = (IPDatagram *)copy; 00069 00070 packetInfo.srcAddress = datagram->getSrcAddress(); 00071 packetInfo.destAddress = datagram->getDestAddress(); 00072 packetInfo.transportProtocol = datagram->getTransportProtocol(); 00073 packetInfo.diffServCodePoint = (unsigned char) datagram->getDiffServCodePoint()/32; 00074 if(packetInfo.transportProtocol == IP_PROT_TCP) 00075 { 00076 TCPSegment *tcpSegm = dynamic_cast<TCPSegment *> (datagram->decapsulate()); 00077 packetInfo.srcPort = tcpSegm->getSrcPort(); 00078 packetInfo.destPort = tcpSegm->getDestPort(); 00079 delete tcpSegm; 00080 } 00081 else if(packetInfo.transportProtocol == IP_PROT_UDP) 00082 { 00083 UDPPacket *udpDatag = dynamic_cast<UDPPacket *> (datagram->decapsulate()); 00084 packetInfo.srcPort = udpDatag->getSourcePort(); 00085 packetInfo.destPort = udpDatag->getDestinationPort(); 00086 delete udpDatag; 00087 } 00088 else 00089 { 00090 packetInfo.srcPort = 0; 00091 packetInfo.destPort = 0; 00092 } 00093 00094 00095 } 00096 else 00097 { 00098 packetInfo.srcAddress = IPAddress::UNSPECIFIED_ADDRESS; 00099 packetInfo.destAddress = IPAddress::UNSPECIFIED_ADDRESS; 00100 packetInfo.transportProtocol = 0; 00101 packetInfo.diffServCodePoint = 0; 00102 packetInfo.srcPort = 0; 00103 packetInfo.destPort = 0; 00104 } 00105 00106 delete copy; 00107 00108 if(!putIntoQueue(packetInfo, msg)) 00109 { 00110 delete msg; 00111 return true; 00112 } 00113 00114 return false; 00115 } 00116 00117 cMessage *WeightedFairQueue::dequeue() 00118 { 00119 if(currentHold <= 0) 00120 { 00121 return NULL; 00122 } 00123 00124 00125 int index = 0; 00126 while (queues[index]->isQueueEmpty()) 00127 ++index; 00128 00129 int sn = queues[index]->getLowestSn(); 00130 00131 for (int i=0; i<numQueues; i++) 00132 { 00133 if(!queues[i]->isQueueEmpty() && queues[i]->getLowestSn() < sn ) 00134 { 00135 sn = queues[i]->getLowestSn(); 00136 index = i; 00137 } 00138 } 00139 00140 00141 --currentHold; 00142 cMessage *msg = queues[index]->dequeue(); 00143 00144 00145 std::cout << "---- Dequeuing ---- " << std::endl; 00146 std::cout << "Queue ID: " << index << " Queue Length: " << queues[index]->getQueueLength() << " SN: " << sn << std::endl; 00147 std::cout << "Hold Queue Length: " << currentHold << std::endl << std::endl; 00148 00149 if(currentHold > 0) 00150 { 00151 index = 0; 00152 while (queues[index]->isQueueEmpty()) 00153 ++index; 00154 sn = queues[index]->getLowestSn(); 00155 00156 for (int i=0; i<numQueues; i++) 00157 { 00158 if(!queues[i]->isQueueEmpty() && queues[i]->getLowestSn() < sn ) 00159 { 00160 sn = queues[i]->getLowestSn(); 00161 } 00162 } 00163 00164 for (int i=0; i<numQueues; i++) 00165 { 00166 queues[i]->substractSnFromAll(sn); 00167 00168 } 00169 } 00170 00171 00172 return msg; 00173 } 00174 00175 void WeightedFairQueue::sendOut(cMessage *msg) 00176 { 00177 send(msg, outGate); 00178 } 00179 00180 bool WeightedFairQueue::putIntoQueue(const WFQ::Flow& packetInfo, cMessage * msg) 00181 { 00182 int index = findQueueIndex(packetInfo); 00183 if(index >= 0 && queues[index]->getQueueLength() < cdt) 00184 { 00185 if(queues[index]->isQueueEmpty()) 00186 { 00187 queues[index]->setQueueID(packetInfo); 00188 queues[index]->setLastSn(0); 00189 } 00190 00191 cPacket *pkt = dynamic_cast<cPacket *> (msg->dup()); 00192 int sn = queues[index]->getLastSn() + ((32384 / (packetInfo.diffServCodePoint + 1)) * pkt->getByteLength()); 00193 WFQ::SNPacket newSNPacket(msg, sn); 00194 00195 00196 delete pkt; 00197 00198 queues[index]->enqueue(newSNPacket); 00199 queues[index]->setLastSn(sn); 00200 ++currentHold; 00201 00202 std::cout << "---- Enqueuing ---- " << std::endl; 00203 std::cout << "Flow = sIP:" << packetInfo.srcAddress.str(); 00204 std::cout << " dIP:" << packetInfo.destAddress.str(); 00205 std::cout << " Pre:" << packetInfo.diffServCodePoint; 00206 std::cout << " Pro:" << packetInfo.transportProtocol; 00207 std::cout << " sP:" << packetInfo.srcPort; 00208 std::cout << " dP:" << packetInfo.destPort << std::endl; 00209 std::cout << "Queue ID: " << index << " Queue Length: " << queues[index]->getQueueLength() << " SN: " << sn << std::endl << std::endl; 00210 00211 return true; 00212 } 00213 00214 return false; 00215 } 00216 00217 int WeightedFairQueue::findQueueIndex(const WFQ::Flow& packetInfo) 00218 { 00219 int empty = -1; 00220 00221 for (int i=0; i<numQueues; i++) 00222 { 00223 if(empty == -1) 00224 { 00225 if(queues[i]->isQueueEmpty()) 00226 empty = i; 00227 } 00228 00229 if(queues[i]->isFromThisFlow(packetInfo)) 00230 return i; 00231 } 00232 00233 return empty; 00234 00235 } 00236 00237 00238 bool WFQ::Flow::operator==(const Flow& flow) const 00239 { 00240 if(srcAddress != flow.srcAddress) 00241 return false; 00242 if(destAddress != flow.destAddress) 00243 return false; 00244 if(transportProtocol != flow.transportProtocol) 00245 return false; 00246 if(diffServCodePoint != flow.diffServCodePoint) 00247 return false; 00248 if(srcPort != flow.srcPort) 00249 return false; 00250 if(destPort != flow.destPort) 00251 return false; 00252 00253 return true; 00254 } 00255 00256 00257 WFQ::QueueRecord::~QueueRecord() 00258 { 00259 if(!isQueueEmpty()) 00260 { 00261 for (std::vector<SNPacket>::iterator it = snPackets.begin(); it!=snPackets.end(); ++it) 00262 { 00263 delete it->getMsg(); 00264 } 00265 } 00266 } 00267 00268 bool WFQ::QueueRecord::isQueueEmpty() 00269 { 00270 if(getQueueLength() > 0) 00271 return false; 00272 00273 return true; 00274 } 00275 00276 00277 00278 bool WFQ::QueueRecord::isFromThisFlow(const Flow& flow) 00279 { 00280 if(!isQueueEmpty() && queueID == flow) 00281 return true; 00282 00283 return false; 00284 } 00285 00286 00287 void WFQ::QueueRecord::setQueueID(const Flow& packetInfo) 00288 { 00289 queueID.srcAddress = packetInfo.srcAddress; 00290 queueID.destAddress = packetInfo.destAddress; 00291 queueID.transportProtocol = packetInfo.transportProtocol; 00292 queueID.diffServCodePoint = packetInfo.diffServCodePoint; 00293 queueID.srcPort = packetInfo.srcPort; 00294 queueID.destPort = packetInfo.destPort; 00295 } 00296 00297 void WFQ::QueueRecord::enqueue(SNPacket& newSNPacket) 00298 { 00299 setLastSn(newSNPacket.getSn()); 00300 snPackets.push_back(newSNPacket); 00301 } 00302 00303 void WFQ::QueueRecord::substractSnFromAll(int subNum) 00304 { 00305 if(!isQueueEmpty()) 00306 { 00307 for (std::vector<SNPacket>::iterator it = snPackets.begin(); it!=snPackets.end(); ++it) 00308 { 00309 it->substractSn(subNum); 00310 } 00311 00312 lastSn -= subNum; 00313 } 00314 } 00315 00316 cMessage *WFQ::QueueRecord::dequeue() 00317 { 00318 cMessage *msg = (snPackets.front()).getMsg(); 00319 snPackets.erase(snPackets.begin()); 00320 00321 return msg; 00322 } 00323