Main Page | Namespace List | Class Hierarchy | Class List | File List | Class Members | File Members

src/MessageController.cpp

Go to the documentation of this file.
00001 /*
00002 
00003 Traplas visualisation.
00004 
00005 Copyright (C) 2006 Herbert de Vos & Willem Drost
00006 
00007 This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version.
00008 
00009 This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
00010 
00011 You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
00012 
00013 (For full Licence see ../GPL-Licence.txt)
00014 
00015 */
00016 
00017 #ifndef _GLIBCXX_VECTOR
00018 #include <vector>
00019 #endif
00020 
00021 #ifndef _GLIBCXX_STRING
00022 #include <string>
00023 #endif
00024 
00025 #ifndef _GLIBCXX_IOSTREAM
00026 #include <iostream>
00027 #endif
00028 
00029 #ifndef _GLIBCXX_SSTREAM
00030 #include <sstream>
00031 #endif
00032 
00033 #ifndef _TIME_H
00034 #include <time.h>
00035 #endif
00036 
00037 #ifndef _SYS_TIMEB_H
00038 #include <sys/timeb.h>
00039 #endif
00040 
00041 #ifndef MESSAGECONTROLLER_H
00042 #include "MessageController.h"
00043 #endif
00044 
00045 #ifndef DISPATCHER_H
00046 #include "Dispatcher.h"
00047 #endif
00048 
00049 #ifndef FILEREADER_H
00050 #include "FileReader.h"
00051 #endif
00052 
00053 #ifndef DBUSCONNECTION_H
00054 #include "DbusConnection.h"
00055 #endif
00056 
00057 
00058 #ifndef GLOBALS_H
00059 #include "Globals.h"
00060 #endif
00061 
00062 #ifndef _GLIBCXX_STDEXCEPT
00063 #include <stdexcept>
00064 #endif
00065 
/**
 * Returns the current wall-clock time in milliseconds since the Unix epoch.
 *
 * The time is read with clock_gettime(CLOCK_REALTIME) into a stack-allocated
 * timespec, so no heap allocation is needed and no uninitialised data is
 * ever read. If the clock cannot be read, "Error getting time." is printed
 * and the result falls back to time(), i.e. whole-second resolution.
 *
 * @return Milliseconds since the epoch. The return type is kept as long for
 *         the callers; on platforms where long is 32 bits wide the value
 *         does not fit and wraps.
 */
long time_millisec()
{
        struct timespec now;

        if ( clock_gettime(CLOCK_REALTIME, &now) == -1 ) {
                std::cout << "Error getting time." << std::endl;
                //      Degrade to second resolution instead of using garbage.
                return static_cast<long>( time(NULL) ) * 1000L;
        }

        //      Whole seconds scaled to milliseconds, plus the sub-second part.
        return static_cast<long>( now.tv_sec ) * 1000L + static_cast<long>( now.tv_nsec / 1000000L );
}
00078 
00079 void* msgcontrollerThread(void *args)
00080 {
00081         MessageController *myc = (MessageController*) args;
00082 
00083         double lasttime = time_millisec();
00084         
00085         bool done;
00086         double nextmessagetime = 0;
00087         double newtime;
00088         string temp;
00089         double messagetimestamp;
00090 
00091         while( myc->msgcontrollerthread_run ) {
00092                 done = false;
00093                 
00094                 //      Update time according to the timeflowfactor.
00095                 newtime = time_millisec();
00096                 myc->currenttime += ((newtime - lasttime) * myc->timeflowfactor);
00097                 lasttime = newtime;
00098                 
00099                 if ( pthread_mutex_trylock( &(myc->mutex) ) != 0 ) 
00100                         pthread_cond_wait( &(myc->cond) , &(myc->mutex) );
00101                                 
00102                 //      Check if there are messages to be sent.
00103                 if( !myc->msgqueue->empty() && (nextmessagetime <= myc->currenttime/1000) ) {
00104                         //      Decide if message from the back of the queue must be send. (back of the queue have the lowest timestamp!)
00105                         temp = *(myc->msgqueue->back()->at(1));
00106                         messagetimestamp = atof( temp.c_str() );
00107                         
00108                         #if STRICT
00109                         //Check if conversion has worked.
00110                         assert( !(messagetimestamp == 0 && temp.at(0) != '0') );
00111                         #endif
00112                         
00113                         if ( messagetimestamp <= myc->currenttime/1000 ) {
00114                                 
00115                                 myc->disp->proccessMessageFromMC( myc->msgqueue->back() );
00116                                 myc->msgqueue->pop_back();
00117 
00118                         } else {
00119                                 nextmessagetime = messagetimestamp;
00120                                 done = true;
00121                         }
00122                 } else
00123                         done = true;
00124 
00125                 //      Release mutex, signal.
00126                 pthread_mutex_unlock( &(myc->mutex) );
00127                 pthread_cond_signal( &(myc->cond) );
00128                 //      And sleep.
00129                 if ( done )
00130                         usleep(50);     // Nothing more to be done at this time, go sleep.
00131         }
00132 
00133         cout << "Messagecontroller thread exit." << endl;
00134         int status = 0;
00135         pthread_exit(&status);
00136 }
00137 
00138 MessageController::MessageController(Dispatcher* dp) {
00139         comm = NULL;
00140         disp = dp;
00141         msgqueue = shared_ptr< list< shared_ptr< vector< shared_ptr<string> > > > >( new list< shared_ptr< vector< shared_ptr<string> > > >() );
00142 
00143         pthread_mutex_init(&mutex, NULL);       
00144         pthread_cond_init(&cond, NULL);
00145 
00146         currenttime = 0;
00147         timeflowfactor = 0;
00148 
00149         msgcontrollerthread_run = true;
00150         pthread_create(&msgcontrollerthread, NULL, msgcontrollerThread, (void*) this);
00151 }
00152 
00153 MessageController::~MessageController() {
00154         msgcontrollerthread_run = false;        //      Signal the msgcontrollerthread to stop.
00155 
00156         if ( comm != NULL )
00157                 delete comm;
00158 
00159         pthread_join( msgcontrollerthread, NULL);
00160 }
00161 
00162 bool MessageController::processMessageFromComm(shared_ptr< vector< shared_ptr< string > > > messagebody) {
00163         //      Adds the message to the queue at the correct place so the queue remains ordered by timestamp. The first element of the queue is the element with the highest timestamp and the last is with the lowest timestamp. This is because it is impossible to insert while using a reverse_iterator.
00164 
00165         string type = *(messagebody->at(0));
00166         string ts = *(messagebody->at(1));
00167         
00168         double msg_timestamp = atof( ts.c_str() );
00169         
00170         #if STRICT
00171         //      Check if conversion has worked.
00172         assert( !(msg_timestamp == 0 && ts.at(0) != '0') );     
00173         #endif
00174         
00175         //      Check the type of the message and take appropriate action.
00176         if ( type == "BEGIN" ) {                                                //      BEGIN message
00177                 timeflowfactor = INITIAL_TIMESCALE;
00178         } else {                                                                                //      Put message in the queue.
00179                 if ( pthread_mutex_trylock( &mutex ) != 0 )
00180                         pthread_cond_wait( &cond , &mutex );
00181 
00182                 if ( !msgqueue->empty() ) {
00183                         bool done = false;
00184                         list< shared_ptr< vector< shared_ptr< string> > > >::iterator iter;
00185                         iter = msgqueue->begin();
00186                         while( !(iter == msgqueue->end() || done) )
00187                         {
00188                                 double queue_msg_timestamp = atof ( (*((*iter)->at(1))).c_str() );
00189         
00190                                 if ( msg_timestamp >= queue_msg_timestamp ) {
00191                                         msgqueue->insert(iter, messagebody);
00192                                         done = true;
00193                                 }
00194                                 iter++;
00195                         }
00196                         if (!done) {
00197                                 //      Message timestamp is the lowest thusfar.
00198                                 msgqueue->push_back(messagebody);
00199                         }
00200                 } else {
00201                         //      Queue is empty.
00202                         msgqueue->push_front(messagebody);
00203                 }
00204         
00205                 //      Release mutex and signal.
00206                 pthread_mutex_unlock(&mutex);
00207                 pthread_cond_signal(&cond);
00208         }
00209 
00210         return true;
00211 }
00212 
00213 bool MessageController::sendMessageToComm(shared_ptr< vector< shared_ptr< string > > > messagebody) {
00214         //      Verify that a communication-object exists.
00215         assert (comm != NULL);
00216         
00217         //      Add timestamp to the message.
00218         stringstream s;
00219         s << currenttime / 1000;
00220 
00221         shared_ptr< string > timestamp( new string( s.str() ) );
00222 
00223         try{
00224                 messagebody->at(1) = timestamp;
00225         } catch ( out_of_range ) {
00226                 cout << "Warning: (MessageController::sendMessageToComm) message malformed: Message too short. Aborting send." << endl;
00227                 return false;
00228         }
00229 
00230         //      Send the message.
00231         return comm->sendMessage(messagebody);
00232 }
00233 
00234 bool MessageController::communicationMode( int cMode , string filename) {
00235         // We are going to read from file.
00236         comm = new FileReader( filename , this );;
00237         cmode = cMode;
00238 
00239         return true;
00240 }
00241 
00242 bool MessageController::communicationMode( int cMode , string thisname , string thatname ) {
00243         // Using Dbus.
00244         comm = new DbusConnection( thisname , thatname , this );
00245         cmode = cMode;
00246 
00247         return true;
00248 }
00249 
00250 bool MessageController::setTimeFlow(double ntimeflowfactor) {
00251         timeflowfactor = ntimeflowfactor;
00252         
00253         return true;
00254 }

Generated on Mon Jun 19 10:22:04 2006 for TraplasVisualisation by  doxygen 1.4.4