00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016 #include <stdlib.h>
00017 #include <iostream.h>
00018 #include <stdio.h>
00019 #include <pthread.h>
00020
00021 #if __SVR4 // Solaris
00022 #include <synch.h>
00023 typedef sema_t sem_t;
00024 #define sem_init(a,b,c) sema_init(a,b,c,NULL)
00025 #define sem_wait(a) sema_wait(a)
00026 #define sem_post(a) sema_post(a)
00027 #else
00028 #include <semaphore.h>
00029 #endif
00030
00031
00032 #include <timer.h>
00033
00034 typedef struct {
00035 int iterations;
00036 int self;
00037 } parameters;
00038
00039 typedef int bufObjType;
00040
00041 bufObjType *** bufObjs;
00042 sem_t bufferSemaphore[64];
00043 sem_t consumed[64];
00044
00045 int nthreads;
00046 int niterations = 16800;
00047 int bufferSize;
00048
00049 void * consumer (void * arg)
00050 {
00051 parameters * parms = (parameters *) arg;
00052
00053 for (int i = 0; i < niterations; i++) {
00054 sem_wait (&bufferSemaphore[parms->self]);
00055 for (int j = 0; j < bufferSize; j++) {
00056 delete bufObjs[parms->self][j];
00057 }
00058 sem_post (&consumed[parms->self]);
00059 }
00060 return NULL;
00061 }
00062
00063
00064 void * producer (void * arg)
00065 {
00066 for (int i = 0; i < niterations; i++) {
00067 for (int j = 0; j < nthreads; j++) {
00068 for (int k = 0; k < bufferSize; k++) {
00069 bufObjs[j][k] = new bufObjType;
00070 *bufObjs[j][k] = 12;
00071 }
00072 sem_post (&bufferSemaphore[j]);
00073 }
00074 for (int j = 0; j < nthreads; j++) {
00075 sem_wait (&consumed[j]);
00076 }
00077 }
00078 return NULL;
00079 }
00080
00081
00082 int main (int argc, char * argv[])
00083 {
00084 if (argc > 3) {
00085 nthreads = atoi(argv[1]);
00086 niterations = atoi(argv[2]);
00087 bufferSize = atoi(argv[3]);
00088 } else {
00089 cerr << "Usage: " << argv[0] << " nthreads niterations bufferSize" << endl;
00090 exit (1);
00091 }
00092
00093 if (nthreads > 64) {
00094 printf ("Error: The number of threads must be no more than 64.\n");
00095 exit(1);
00096 }
00097
00098 for (int i = 0; i < nthreads; i++) {
00099 sem_init (&bufferSemaphore[i], 0, 0);
00100 sem_init (&consumed[i], 0, 0);
00101 }
00102
00103 pthread_t consume[nthreads];
00104 pthread_t produce;
00105
00106 pthread_attr_t consume_attr[nthreads];
00107 pthread_attr_t produce_attr;
00108
00109 bufObjs = new bufObjType ** [nthreads];
00110
00111 for (int i = 0; i < nthreads; i++) {
00112 bufObjs[i] = new bufObjType * [bufferSize];
00113 pthread_attr_init (&consume_attr[i]);
00114 pthread_attr_setscope (&consume_attr[i], PTHREAD_SCOPE_SYSTEM);
00115 }
00116
00117 pthread_attr_init (&produce_attr);
00118 pthread_attr_setscope (&produce_attr, PTHREAD_SCOPE_SYSTEM);
00119
00120 Timer t;
00121 t.start();
00122
00123 for (int i = 0; i < nthreads; i++) {
00124 parameters * p = new parameters;
00125 p->self = i;
00126 pthread_create (&consume[i], &consume_attr[i], consumer, (void *) p);
00127 }
00128
00129 pthread_create (&produce, &produce_attr, producer, NULL);
00130
00131 void * result;
00132
00133 for (int i = 0; i < nthreads; i++) {
00134 pthread_join (consume[i], &result);
00135 }
00136 pthread_join (produce, &result);
00137
00138 t.stop();
00139 cout << "Time elapsed: " << (double) t << " seconds." << endl;
00140 }