00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
/*
 * Debug tracing macros.
 *
 * PRINTF, PRINTF1, PRINTF2 and PRINTF3 take a printf-style format string
 * followed by 0, 1, 2 or 3 arguments.
 *
 *   - NDEBUG defined:  every macro expands to nothing.
 *   - otherwise:       output goes through printf, or (when the "#if 1"
 *                      below is changed to "#if 0") through a Win32
 *                      message box titled "rrconsume".
 *
 * The message-box variants are wrapped in do { ... } while (0) so that
 * "if (c) PRINTF1 (...); else ..." parses as a single statement.
 */
#ifndef NDEBUG

#if 1
#define PRINTF(x) printf(x)
#define PRINTF1(x,y) printf(x,y)
#define PRINTF2(x,y,z) printf(x,y,z)
#define PRINTF3(x,y,z,a) printf(x,y,z,a)
#else
#include <windows.h>
/* wsprintf is documented to produce at most 1024 characters; leave room
   for the terminator so the buffer cannot be overrun. */
#define PRINTF_MSG_SIZE 1025
#define PRINTF(x) \
  do { char szMsg[PRINTF_MSG_SIZE]; wsprintf(szMsg, x); \
       MessageBox(NULL, szMsg, "rrconsume", MB_OK); } while (0)
#define PRINTF1(x,y) \
  do { char szMsg[PRINTF_MSG_SIZE]; wsprintf(szMsg, x, y); \
       MessageBox(NULL, szMsg, "rrconsume", MB_OK); } while (0)
#define PRINTF2(x,y,z) \
  do { char szMsg[PRINTF_MSG_SIZE]; wsprintf(szMsg, x, y, z); \
       MessageBox(NULL, szMsg, "rrconsume", MB_OK); } while (0)
#define PRINTF3(x,y,z,a) \
  do { char szMsg[PRINTF_MSG_SIZE]; wsprintf(szMsg, x, y, z, a); \
       MessageBox(NULL, szMsg, "rrconsume", MB_OK); } while (0)
#endif

#else
#define PRINTF(x)
#define PRINTF1(x,y)
#define PRINTF2(x,y,z)
#define PRINTF3(x,y,z,a)
#endif
00037
/*
 * Platform selection.
 *
 * The Win32 code path is enabled only when the compiler itself reports a
 * Windows target (_WIN32). On such targets WIN32 is defined if the build
 * did not already do so, and the linux / unix macros are removed as
 * before. On every other target the macros are left untouched so the
 * Solaris or generic pthread code paths below can be selected.
 */
#if defined(_WIN32)
#ifndef WIN32
#define WIN32 1
#endif
#undef linux
#undef unix
#endif
00041
00042 #include <stdlib.h>
00043 #include <iostream.h>
00044 #include <stdio.h>
00045
#if defined(__SVR4) // Solaris
/*
 * Solaris: express the POSIX semaphore calls used in this file in terms
 * of the native sema_* API from <synch.h>.
 */
#define THREADMODEL 1
#include <pthread.h>
#include <synch.h>
typedef sema_t sem_t;
/*
 * POSIX:   sem_init  (sem, pshared, value)
 * Solaris: sema_init (sem, count,   type, arg)
 * The initial value goes into 'count' and the pshared flag selects the
 * semaphore type; forwarding the arguments positionally would swap them.
 */
#define sem_init(a,b,c) \
  sema_init((a), (c), ((b) ? USYNC_PROCESS : USYNC_THREAD), NULL)
#define sem_wait(a) sema_wait(a)
#define sem_post(a) sema_post(a)
#endif
00055
00056
#if defined(WIN32) // Windows
/*
 * Windows: minimal stand-ins for the POSIX semaphore and pthread calls
 * used in this file, built on Win32 semaphores and threads.
 * Each call that produces a value yields 0 on success and -1 on failure.
 */
#define THREADMODEL 1
#include <windows.h>
#include <process.h>

/* A semaphore is a Win32 semaphore handle (maximum count 255). */
typedef HANDLE sem_t;
/* 'b' (pshared) has no Win32 equivalent and is ignored; 'c' is the
   initial count. */
#define sem_init(a,b,c) \
  ((*(a) = CreateSemaphore(NULL, (c), 255, NULL)) != NULL ? 0 : -1)
#define sem_wait(a) \
  (WaitForSingleObject(*(a), INFINITE) == WAIT_OBJECT_0 ? 0 : -1)
#define sem_post(a) \
  (ReleaseSemaphore(*(a), 1, NULL) ? 0 : -1)

typedef unsigned long pthread_attr_t;
typedef HANDLE pthread_t;
typedef DWORD WINAPI ThreadProc(
  LPVOID lpParameter
  );

/* Thread attributes are not used on Windows; these expand to nothing and
   do not evaluate their arguments. */
#define pthread_attr_init(pa)
#define pthread_attr_setscope(pa,s)

/* NOTE(review): fn is cast from void * (*)(void *) to a WINAPI thread
   procedure; confirm the calling conventions are compatible on the
   target before relying on this. */
#define pthread_create(pt,pa,fn,arg) \
  ((*(pt) = CreateThread(NULL,0,(ThreadProc *) fn,arg,0,NULL)) != NULL ? 0 : -1)

/* Wait for the thread to finish, then release its handle so repeated
   create/join cycles do not leak handles. 'res' is not written. */
#define pthread_join(pt,res) \
  (((WaitForSingleObject((pt), INFINITE) == WAIT_OBJECT_0) && CloseHandle(pt)) ? 0 : -1)

extern "C" void * wsbrk (long);
#endif
00077
00078
00079 #ifndef THREADMODEL
00080 #include <pthread.h>
00081 #include <semaphore.h>
00082 #endif
00083
00084
// Argument record passed (by pointer) to each producer / consumer thread.
class parameters {
public:
  // Value given as the first constructor argument (0 by default).
  int iterations;
  // Index of the thread this record belongs to (-1 by default).
  int self;

  // Default construction: iterations = 0, self = -1.
  parameters ()
    : iterations (0), self (-1)
  {
  }

  // Construct with an explicit iteration value and thread index.
  parameters (int it, int tid)
    : iterations (it), self (tid)
  {
  }
};
00100
00101 typedef int bufObjType;
00102
00103 const int MAX_THREADS = 64;
00104
00105 bufObjType *** bufObjs;
00106 sem_t startConsumer[MAX_THREADS];
00107 sem_t startProducer[MAX_THREADS];
00108
00109 int nthreads = 2;
00110 int niterations = 1;
00111 int bufferSize = 1;
00112
00113 void * consumer (void * arg)
00114 {
00115 parameters * p = (parameters *) arg;
00116
00117
00118
00119 sem_wait (&startConsumer[p->self]);
00120 for (int j = 0; j < bufferSize; j++) {
00121 delete bufObjs[p->self][j];
00122 }
00123 PRINTF1 ("consumer %d done.\n", p->self);
00124 sem_post (&startProducer[p->self]);
00125 return NULL;
00126 }
00127
00128
00129 void * producer (void * arg)
00130 {
00131 parameters * p = (parameters *) arg;
00132
00133
00134
00135 sem_wait (&startProducer[p->self]);
00136 for (int k = 0; k < bufferSize; k++) {
00137 bufObjs[p->self][k] = new bufObjType;
00138 }
00139 PRINTF1 ("producer %d done.\n", p->self);
00140 sem_post (&startConsumer[(p->self + 1) % nthreads]);
00141 return NULL;
00142 }
00143
00144 extern "C" void malloc_stats (void);
00145
00146
00147 int main (int argc, char * argv[])
00148 {
00149 if (argc > 3) {
00150 nthreads = atoi(argv[1]);
00151 niterations = atoi(argv[2]);
00152 bufferSize = atoi(argv[3]);
00153 } else {
00154 cerr << "Usage: " << argv[0] << " nthreads niterations bufferSize" << endl;
00155 exit (1);
00156 }
00157
00158 if (nthreads > MAX_THREADS) {
00159 printf ("Error: The number of threads must be no more than %d.\n", MAX_THREADS);
00160 exit(1);
00161 }
00162
00163
00164
00165 for (int i = 0; i < nthreads; i++) {
00166 sem_init (&startConsumer[i], 0, 0);
00167 sem_init (&startProducer[i], 0, 0);
00168 }
00169
00170 pthread_t consume[nthreads];
00171 pthread_t produce[nthreads];
00172 parameters parms[nthreads];
00173
00174 pthread_attr_t consume_attr[nthreads];
00175 pthread_attr_t produce_attr[nthreads];
00176
00177 bufObjs = new bufObjType ** [nthreads];
00178 for (int bo = 0; bo < nthreads; bo++) {
00179 bufObjs[bo] = new bufObjType * [bufferSize];
00180 }
00181
00182
00183
00184
00185
00186 for (int i = 0; i < nthreads; i++) {
00187 pthread_attr_init (&consume_attr[i]);
00188 pthread_attr_setscope (&consume_attr[i], PTHREAD_SCOPE_SYSTEM);
00189 pthread_attr_init (&produce_attr[i]);
00190 pthread_attr_setscope (&produce_attr[i], PTHREAD_SCOPE_SYSTEM);
00191 }
00192
00193 malloc_stats();
00194
00195 #if defined(SBRK)
00196 int initialBreak = (int) SBRK(0);
00197 cout << "Current break value = " << initialBreak << endl;
00198 #endif
00199
00200
00201
00202 for (int it = 0; it < niterations; it++) {
00203 for (int i = 0; i < nthreads; i++) {
00204 parms[i] = parameters (it, i);
00205 pthread_create (&produce[i], &produce_attr[i], producer, (void *) &parms[i]);
00206 pthread_create (&consume[i], &consume_attr[i], consumer, (void *) &parms[i]);
00207 }
00208
00209
00210 sem_post (&startProducer[0]);
00211
00212
00213 void * result;
00214 for (int i = 0; i < nthreads; i++) {
00215 pthread_join (produce[i], &result);
00216 pthread_join (consume[i], &result);
00217 }
00218 }
00219
00220 malloc_stats();
00221
00222 #if defined(SBRK)
00223 cout << "Memory consumed (via sbrk) = " << ((int) SBRK(0)) - initialBreak << endl;
00224 #endif
00225 }