00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017 #ifndef _GLIBCXX_VECTOR
00018 #include <vector>
00019 #endif
00020
00021 #ifndef _GLIBCXX_STRING
00022 #include <string>
00023 #endif
00024
00025 #ifndef _GLIBCXX_IOSTREAM
00026 #include <iostream>
00027 #endif
00028
00029 #ifndef _GLIBCXX_SSTREAM
00030 #include <sstream>
00031 #endif
00032
00033 #ifndef _TIME_H
00034 #include <time.h>
00035 #endif
00036
00037 #ifndef _SYS_TIMEB_H
00038 #include <sys/timeb.h>
00039 #endif
00040
00041 #ifndef MESSAGECONTROLLER_H
00042 #include "MessageController.h"
00043 #endif
00044
00045 #ifndef DISPATCHER_H
00046 #include "Dispatcher.h"
00047 #endif
00048
00049 #ifndef FILEREADER_H
00050 #include "FileReader.h"
00051 #endif
00052
00053 #ifndef DBUSCONNECTION_H
00054 #include "DbusConnection.h"
00055 #endif
00056
00057
00058 #ifndef GLOBALS_H
00059 #include "Globals.h"
00060 #endif
00061
00062 #ifndef _GLIBCXX_STDEXCEPT
00063 #include <stdexcept>
00064 #endif
00065
00066 long time_millisec()
00067 {
00068 long tm = 0;
00069 struct timeb *t = new struct timeb;
00070 if ( ftime(t) == -1 )
00071 cout << "Error getting time." << endl;
00072 tm = 0;
00073 tm += ((long)t->time) * (long)1000;
00074 tm += t->millitm;
00075 delete t;
00076 return tm;
00077 }
00078
00079 void* msgcontrollerThread(void *args)
00080 {
00081 MessageController *myc = (MessageController*) args;
00082
00083 double lasttime = time_millisec();
00084
00085 bool done;
00086 double nextmessagetime = 0;
00087 double newtime;
00088 string temp;
00089 double messagetimestamp;
00090
00091 while( myc->msgcontrollerthread_run ) {
00092 done = false;
00093
00094
00095 newtime = time_millisec();
00096 myc->currenttime += ((newtime - lasttime) * myc->timeflowfactor);
00097 lasttime = newtime;
00098
00099 if ( pthread_mutex_trylock( &(myc->mutex) ) != 0 )
00100 pthread_cond_wait( &(myc->cond) , &(myc->mutex) );
00101
00102
00103 if( !myc->msgqueue->empty() && (nextmessagetime <= myc->currenttime/1000) ) {
00104
00105 temp = *(myc->msgqueue->back()->at(1));
00106 messagetimestamp = atof( temp.c_str() );
00107
00108 #if STRICT
00109
00110 assert( !(messagetimestamp == 0 && temp.at(0) != '0') );
00111 #endif
00112
00113 if ( messagetimestamp <= myc->currenttime/1000 ) {
00114
00115 myc->disp->proccessMessageFromMC( myc->msgqueue->back() );
00116 myc->msgqueue->pop_back();
00117
00118 } else {
00119 nextmessagetime = messagetimestamp;
00120 done = true;
00121 }
00122 } else
00123 done = true;
00124
00125
00126 pthread_mutex_unlock( &(myc->mutex) );
00127 pthread_cond_signal( &(myc->cond) );
00128
00129 if ( done )
00130 usleep(50);
00131 }
00132
00133 cout << "Messagecontroller thread exit." << endl;
00134 int status = 0;
00135 pthread_exit(&status);
00136 }
00137
00138 MessageController::MessageController(Dispatcher* dp) {
00139 comm = NULL;
00140 disp = dp;
00141 msgqueue = shared_ptr< list< shared_ptr< vector< shared_ptr<string> > > > >( new list< shared_ptr< vector< shared_ptr<string> > > >() );
00142
00143 pthread_mutex_init(&mutex, NULL);
00144 pthread_cond_init(&cond, NULL);
00145
00146 currenttime = 0;
00147 timeflowfactor = 0;
00148
00149 msgcontrollerthread_run = true;
00150 pthread_create(&msgcontrollerthread, NULL, msgcontrollerThread, (void*) this);
00151 }
00152
00153 MessageController::~MessageController() {
00154 msgcontrollerthread_run = false;
00155
00156 if ( comm != NULL )
00157 delete comm;
00158
00159 pthread_join( msgcontrollerthread, NULL);
00160 }
00161
00162 bool MessageController::processMessageFromComm(shared_ptr< vector< shared_ptr< string > > > messagebody) {
00163
00164
00165 string type = *(messagebody->at(0));
00166 string ts = *(messagebody->at(1));
00167
00168 double msg_timestamp = atof( ts.c_str() );
00169
00170 #if STRICT
00171
00172 assert( !(msg_timestamp == 0 && ts.at(0) != '0') );
00173 #endif
00174
00175
00176 if ( type == "BEGIN" ) {
00177 timeflowfactor = INITIAL_TIMESCALE;
00178 } else {
00179 if ( pthread_mutex_trylock( &mutex ) != 0 )
00180 pthread_cond_wait( &cond , &mutex );
00181
00182 if ( !msgqueue->empty() ) {
00183 bool done = false;
00184 list< shared_ptr< vector< shared_ptr< string> > > >::iterator iter;
00185 iter = msgqueue->begin();
00186 while( !(iter == msgqueue->end() || done) )
00187 {
00188 double queue_msg_timestamp = atof ( (*((*iter)->at(1))).c_str() );
00189
00190 if ( msg_timestamp >= queue_msg_timestamp ) {
00191 msgqueue->insert(iter, messagebody);
00192 done = true;
00193 }
00194 iter++;
00195 }
00196 if (!done) {
00197
00198 msgqueue->push_back(messagebody);
00199 }
00200 } else {
00201
00202 msgqueue->push_front(messagebody);
00203 }
00204
00205
00206 pthread_mutex_unlock(&mutex);
00207 pthread_cond_signal(&cond);
00208 }
00209
00210 return true;
00211 }
00212
00213 bool MessageController::sendMessageToComm(shared_ptr< vector< shared_ptr< string > > > messagebody) {
00214
00215 assert (comm != NULL);
00216
00217
00218 stringstream s;
00219 s << currenttime / 1000;
00220
00221 shared_ptr< string > timestamp( new string( s.str() ) );
00222
00223 try{
00224 messagebody->at(1) = timestamp;
00225 } catch ( out_of_range ) {
00226 cout << "Warning: (MessageController::sendMessageToComm) message malformed: Message too short. Aborting send." << endl;
00227 return false;
00228 }
00229
00230
00231 return comm->sendMessage(messagebody);
00232 }
00233
00234 bool MessageController::communicationMode( int cMode , string filename) {
00235
00236 comm = new FileReader( filename , this );;
00237 cmode = cMode;
00238
00239 return true;
00240 }
00241
00242 bool MessageController::communicationMode( int cMode , string thisname , string thatname ) {
00243
00244 comm = new DbusConnection( thisname , thatname , this );
00245 cmode = cMode;
00246
00247 return true;
00248 }
00249
00250 bool MessageController::setTimeFlow(double ntimeflowfactor) {
00251 timeflowfactor = ntimeflowfactor;
00252
00253 return true;
00254 }