INET Framework for OMNeT++/OMNEST
WeightedFairQueue.cc
Go to the documentation of this file.
00001 
00002 #include <omnetpp.h>
00003 #include "WeightedFairQueue.h"
00004 
00005 
00006 Define_Module(WeightedFairQueue);
00007 
00008 WeightedFairQueue::WeightedFairQueue()
00009 {
00010     numQueues = 256;
00011     holdCapacity = 1024;
00012     cdt = 256;
00013     currentHold = 0;
00014     queues = NULL;
00015 }
00016 
00017 WeightedFairQueue::WeightedFairQueue(int c_hold, int c_flows, int c_cdt)
00018 {
00019     numQueues = c_flows;
00020     holdCapacity = c_hold;
00021     cdt = c_cdt;
00022     currentHold = 0;
00023     queues = NULL;
00024 }
00025 
00026 WeightedFairQueue::~WeightedFairQueue()
00027 {
00028     for (int i=0; i<numQueues; i++)
00029         delete queues[i];
00030     delete [] queues;
00031 }
00032 
00033 
00034 void WeightedFairQueue::initialize()
00035 {
00036     PassiveQueueBase::initialize();
00037 
00038     outGate = gate("out");
00039 
00040     // configuration
00041 
00042     
00043     queues = new WFQ::QueueRecord *[numQueues];
00044     for (int i=0; i<numQueues; i++)
00045     {
00046         queues[i] = new WFQ::QueueRecord();
00047     }
00048 
00049     
00050 }
00051 
00052 bool WeightedFairQueue::enqueue(cMessage *msg)
00053 {
00054     WFQ::Flow packetInfo;
00055     
00056 
00057     if(currentHold >= holdCapacity)
00058     {
00059         delete msg;
00060         return true;
00061     }
00062     
00063     cMessage *copy = msg->dup();
00064 
00065     if (dynamic_cast<IPDatagram *>(copy))
00066     {
00067         // IPv4 QoS: map DSCP to queue number
00068         IPDatagram *datagram = (IPDatagram *)copy;
00069         
00070         packetInfo.srcAddress = datagram->getSrcAddress();
00071         packetInfo.destAddress = datagram->getDestAddress();
00072         packetInfo.transportProtocol = datagram->getTransportProtocol();
00073         packetInfo.diffServCodePoint = (unsigned char) datagram->getDiffServCodePoint()/32;
00074         if(packetInfo.transportProtocol == IP_PROT_TCP)
00075         {
00076             TCPSegment *tcpSegm = dynamic_cast<TCPSegment *> (datagram->decapsulate());
00077             packetInfo.srcPort = tcpSegm->getSrcPort();
00078             packetInfo.destPort = tcpSegm->getDestPort();
00079             delete tcpSegm;
00080         }
00081         else if(packetInfo.transportProtocol == IP_PROT_UDP)
00082         {
00083             UDPPacket *udpDatag  = dynamic_cast<UDPPacket *> (datagram->decapsulate());
00084             packetInfo.srcPort = udpDatag->getSourcePort();
00085             packetInfo.destPort = udpDatag->getDestinationPort();
00086             delete udpDatag;
00087         }
00088         else
00089         {
00090             packetInfo.srcPort = 0;
00091             packetInfo.destPort = 0;
00092         }
00093         
00094         
00095     }
00096     else
00097     {
00098         packetInfo.srcAddress = IPAddress::UNSPECIFIED_ADDRESS;
00099         packetInfo.destAddress = IPAddress::UNSPECIFIED_ADDRESS;
00100         packetInfo.transportProtocol = 0;
00101         packetInfo.diffServCodePoint = 0;
00102         packetInfo.srcPort = 0;
00103         packetInfo.destPort = 0;
00104     }
00105     
00106     delete copy;
00107     
00108     if(!putIntoQueue(packetInfo, msg))
00109     {
00110         delete msg;
00111         return true;
00112     }
00113 
00114     return false;
00115 }
00116 
00117 cMessage *WeightedFairQueue::dequeue()
00118 {
00119     if(currentHold <= 0)
00120     {
00121         return NULL;
00122     }
00123     
00124     
00125     int index = 0;
00126     while (queues[index]->isQueueEmpty())
00127       ++index;
00128     
00129     int sn = queues[index]->getLowestSn();
00130     
00131     for (int i=0; i<numQueues; i++)
00132     {
00133         if(!queues[i]->isQueueEmpty() && queues[i]->getLowestSn() < sn )
00134         {
00135             sn = queues[i]->getLowestSn();
00136             index = i;
00137         }
00138     }
00139     
00140     
00141     --currentHold;
00142     cMessage *msg = queues[index]->dequeue();
00143     
00144     
00145     std::cout << "---- Dequeuing ---- " << std::endl;
00146     std::cout << "Queue ID: " << index << " Queue Length: " << queues[index]->getQueueLength() << " SN: " << sn << std::endl;
00147     std::cout << "Hold Queue Length: " << currentHold << std::endl << std::endl;
00148     
00149     if(currentHold > 0)
00150     {
00151       index = 0;
00152       while (queues[index]->isQueueEmpty())
00153         ++index;
00154       sn = queues[index]->getLowestSn();
00155     
00156       for (int i=0; i<numQueues; i++)
00157       {
00158         if(!queues[i]->isQueueEmpty() && queues[i]->getLowestSn() < sn )
00159         {
00160             sn = queues[i]->getLowestSn();
00161         }
00162       }
00163     
00164       for (int i=0; i<numQueues; i++)
00165       {
00166         queues[i]->substractSnFromAll(sn);
00167 
00168       }
00169     }
00170     
00171     
00172     return msg;
00173 }
00174 
00175 void WeightedFairQueue::sendOut(cMessage *msg)
00176 {
00177     send(msg, outGate);
00178 }
00179 
00180 bool WeightedFairQueue::putIntoQueue(const WFQ::Flow& packetInfo, cMessage * msg)
00181 {
00182     int index = findQueueIndex(packetInfo);
00183     if(index >= 0 && queues[index]->getQueueLength() < cdt)
00184     {
00185         if(queues[index]->isQueueEmpty())
00186         {
00187             queues[index]->setQueueID(packetInfo);
00188             queues[index]->setLastSn(0);
00189         }
00190 
00191         cPacket *pkt = dynamic_cast<cPacket *> (msg->dup());
00192         int sn = queues[index]->getLastSn() + ((32384 / (packetInfo.diffServCodePoint + 1)) * pkt->getByteLength());
00193         WFQ::SNPacket newSNPacket(msg, sn);
00194         
00195         
00196         delete pkt;
00197 
00198         queues[index]->enqueue(newSNPacket);
00199         queues[index]->setLastSn(sn);
00200         ++currentHold;
00201         
00202         std::cout << "---- Enqueuing ---- " << std::endl;
00203         std::cout << "Flow = sIP:" << packetInfo.srcAddress.str();
00204         std::cout << " dIP:" << packetInfo.destAddress.str();
00205         std::cout << " Pre:" << packetInfo.diffServCodePoint;
00206         std::cout << " Pro:" << packetInfo.transportProtocol;
00207         std::cout << " sP:" << packetInfo.srcPort;
00208         std::cout << " dP:" << packetInfo.destPort << std::endl;
00209         std::cout << "Queue ID: " << index << " Queue Length: " << queues[index]->getQueueLength() << " SN: " << sn << std::endl << std::endl;
00210         
00211         return true;
00212     }
00213 
00214     return false;
00215 }
00216 
00217 int WeightedFairQueue::findQueueIndex(const WFQ::Flow& packetInfo)
00218 {
00219     int empty = -1;
00220     
00221     for (int i=0; i<numQueues; i++)
00222     {
00223         if(empty == -1)
00224         {
00225             if(queues[i]->isQueueEmpty())
00226                 empty = i;
00227         }
00228         
00229         if(queues[i]->isFromThisFlow(packetInfo))
00230             return i;
00231     }
00232     
00233     return empty;
00234 
00235 }
00236 
00237 
00238 bool WFQ::Flow::operator==(const Flow& flow) const
00239 {
00240     if(srcAddress != flow.srcAddress)
00241         return false;
00242     if(destAddress != flow.destAddress)
00243         return false;
00244     if(transportProtocol != flow.transportProtocol)
00245         return false;
00246     if(diffServCodePoint != flow.diffServCodePoint)
00247         return false;
00248     if(srcPort != flow.srcPort)
00249         return false;
00250     if(destPort != flow.destPort)
00251         return false;
00252     
00253     return true;
00254 }
00255 
00256 
00257 WFQ::QueueRecord::~QueueRecord()
00258 {
00259     if(!isQueueEmpty())
00260     {
00261         for (std::vector<SNPacket>::iterator it = snPackets.begin(); it!=snPackets.end(); ++it)
00262         {
00263             delete it->getMsg();
00264         }
00265     }    
00266 }
00267 
00268 bool WFQ::QueueRecord::isQueueEmpty()
00269 {
00270     if(getQueueLength() > 0)
00271         return false;
00272 
00273     return true;
00274 }
00275 
00276 
00277 
00278 bool WFQ::QueueRecord::isFromThisFlow(const Flow& flow)
00279 {
00280     if(!isQueueEmpty() && queueID == flow)
00281         return true;
00282 
00283     return false;
00284 }
00285 
00286 
00287 void WFQ::QueueRecord::setQueueID(const Flow& packetInfo)
00288 {
00289     queueID.srcAddress = packetInfo.srcAddress;
00290     queueID.destAddress = packetInfo.destAddress;
00291     queueID.transportProtocol = packetInfo.transportProtocol;
00292     queueID.diffServCodePoint = packetInfo.diffServCodePoint;
00293     queueID.srcPort = packetInfo.srcPort;
00294     queueID.destPort = packetInfo.destPort;
00295 }
00296 
00297 void WFQ::QueueRecord::enqueue(SNPacket& newSNPacket)
00298 {
00299     setLastSn(newSNPacket.getSn());
00300     snPackets.push_back(newSNPacket);
00301 }
00302 
00303 void WFQ::QueueRecord::substractSnFromAll(int subNum)
00304 {
00305     if(!isQueueEmpty())
00306     {
00307         for (std::vector<SNPacket>::iterator it = snPackets.begin(); it!=snPackets.end(); ++it)
00308         {
00309             it->substractSn(subNum);
00310         }
00311         
00312         lastSn -= subNum;
00313     }
00314 }
00315 
00316 cMessage *WFQ::QueueRecord::dequeue()
00317 {
00318     cMessage *msg = (snPackets.front()).getMsg();
00319     snPackets.erase(snPackets.begin());
00320     
00321     return msg;
00322 }
00323