00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039
00040
00041
00042
00043
00044
00045
00046
00047
00048
00049
00050
00051
00052
00053
00054
00055
00056
00057
00058
00059
00060
00061
00062
00063
00064
00065
00066
00067
00068
00069
00070
00071
00072
00073
00074
00075
00076
00077
00078
00079
00080
00081
00082
00083
00084
00085
00086 #ifndef STHREAD_H
00087 #define STHREAD_H
00088
00089 #include "w_defines.h"
00090 #include "w_rc.h"
00091 #include "atomic_templates.h"
00092 #include "w_strstream.h"
00093 #include "stime.h"
00094 #include "gethrtime.h"
00095 #include <vtable.h>
00096 #include <w_list.h>
00097
00098
00099 #include <w_pthread.h>
00100 #include <sthread_stats.h>
00101
00102 class sthread_t;
00103 class smthread_t;
00104
00105
00106 #ifdef __GNUC__
00107 #pragma interface
00108 #endif
00109
00110 #ifndef SDISK_H
00111 #include <sdisk.h>
00112 #endif
00113
00114 class vtable_row_t;
00115 class vtable_t;
00116
00117 struct sthread_core_t;
00118
00119 extern "C" void dumpthreads();
00120
00121
00122
00123
// Common base of the sthread classes. It supplies the thread-id type, the
// timeout sentinels, the st* error-code aliases, and local aliases for the
// sdisk_base_t file types and open/seek flags.
00124 class sthread_base_t : public w_base_t {
00125 public:
00126
// Numeric thread identifier; id_t is a shorter alias for the same type.
00127 typedef unsigned int w_thread_id_t;
00128 typedef w_thread_id_t id_t;
00129
00130
00131
00132
00133
00134
00135
00136
00137
00138
00139
00140
00141
00142
00143
00144
00145
00146
00147
00148
00149
00150
00151
00152
00153
00154
00155
00156
00157
// Special timeout values. WAIT_IMMEDIATE is 0 and every other sentinel is
// negative, so none of them can collide with a positive duration.
// NOTE(review): the exact meaning of the *_SPECIFIED_BY_* and WAIT_NOT_USED
// values is defined by their consumers, which are not visible here.
00158 enum timeout_t {
00159 WAIT_IMMEDIATE = 0,
00160 WAIT_FOREVER = -1,
00161 WAIT_SPECIFIED_BY_THREAD = -4,
00162 WAIT_SPECIFIED_BY_XCT = -5,
00163 WAIT_NOT_USED = -6
00164 };
00165
00166
00167
00168
00169
00170
00171
00172
00173
00174
// Integer type used for timeout arguments; per its name the unit is
// milliseconds (the timeout_t sentinels above are passed through it too).
00175 typedef int4_t timeout_in_ms;
00176
00177
// Error-code table and its registration hook; both are defined out of line.
00178 static const w_error_t::info_t error_info[];
00179 static void init_errorcodes();
00180
// Textually included inside the class body, so whatever this header declares
// becomes a member of sthread_base_t.
// NOTE(review): presumably the generated st* error-code enum - confirm.
00181 #include "st_error_enum_gen.h"
00182
// st-prefixed aliases for the generic fc* error codes.
00183 enum {
00184 stOS = fcOS,
00185 stINTERNAL = fcINTERNAL,
00186 stNOTIMPLEMENTED = fcNOTIMPLEMENTED
00187 };
00188
00189
// Re-export the sdisk_base_t file types under this class's scope.
00190 typedef sdisk_base_t::fileoff_t fileoff_t;
00191 typedef sdisk_base_t::filestat_t filestat_t;
00192 typedef sdisk_base_t::iovec_t iovec_t;
00193
00194
00195
// NOTE(review): presumably the maximum number of iovec_t entries accepted by
// the vectored I/O calls - confirm against readv()/writev().
00196 enum { iovec_max = 8 };
00197
// Open flags, each defined as the sdisk_base_t flag of the same name.
00198 enum {
00199 OPEN_RDWR = sdisk_base_t::OPEN_RDWR,
00200 OPEN_RDONLY = sdisk_base_t::OPEN_RDONLY,
00201 OPEN_WRONLY = sdisk_base_t::OPEN_WRONLY,
00202
00203 OPEN_SYNC = sdisk_base_t::OPEN_SYNC,
00204 OPEN_TRUNC = sdisk_base_t::OPEN_TRUNC,
00205 OPEN_CREATE = sdisk_base_t::OPEN_CREATE,
00206 OPEN_EXCL = sdisk_base_t::OPEN_EXCL,
00207 OPEN_APPEND = sdisk_base_t::OPEN_APPEND,
00208 OPEN_RAW = sdisk_base_t::OPEN_RAW
00209 };
// Seek origins, each defined as the sdisk_base_t constant of the same name.
00210 enum {
00211 SEEK_AT_SET = sdisk_base_t::SEEK_AT_SET,
00212 SEEK_AT_CUR = sdisk_base_t::SEEK_AT_CUR,
00213 SEEK_AT_END = sdisk_base_t::SEEK_AT_END
00214 };
00215
00216 };
00217
00218
// Fixed-capacity storage for a name: a NAME_ARRAY-byte character buffer plus
// rename(), which accepts up to three string pieces.
// NOTE(review): how the pieces are combined and truncated is decided by the
// out-of-line definition of rename() - confirm there.
00219 class sthread_name_t {
00220 public:
// Size in bytes of the name buffer.
00221 enum { NAME_ARRAY = 64 };
00222
// Public so that sthread_named_base_t::name() can return it directly.
00223 char _name[NAME_ARRAY];
00224
00225 sthread_name_t();
00226 ~sthread_name_t();
00227
// n2 and n3 are optional and default to null.
00228 void rename(const char *n1, const char *n2=0, const char *n3=0);
00229 };
00230
// sthread_base_t plus a name. The name is held in a private sthread_name_t
// and exposed read-only through name().
00231 class sthread_named_base_t: public sthread_base_t
00232 {
00233 public:
// All three name pieces are optional; the inline definition forwards them to
// rename().
00234 NORET sthread_named_base_t(
00235 const char* n1 = 0,
00236 const char* n2 = 0,
00237 const char* n3 = 0);
00238 NORET ~sthread_named_base_t();
00239
// Replace the current name; n2 and n3 default to null.
00240 void rename(
00241 const char* n1,
00242 const char* n2 = 0,
00243 const char* n3 = 0);
00244
// Returns a pointer to the internal name buffer (see the inline definition).
00245 const char* name() const;
// Defined out of line. NOTE(review): presumably clears the name - confirm.
00246 void unname();
00247
00248 private:
00249 sthread_name_t _name;
00250 };
00251
// Constructor: hands the three name pieces, unchanged (null included), to
// rename().
00252 inline NORET
00253 sthread_named_base_t::sthread_named_base_t(
00254 const char* n1,
00255 const char* n2,
00256 const char* n3)
00257 {
00258 rename(n1, n2, n3);
00259
00260 }
00261
// Returns a pointer to the name buffer embedded in this object. The pointer
// refers to storage owned by the object, so it is only usable while the
// object exists.
00262 inline const char*
00263 sthread_named_base_t::name() const
00264 {
00265 return _name._name;
00266 }
00267
00268 class sthread_main_t;
00269
00270
00271
00272
00273
00274
00275
// Abstract callback invoked with a const reference to a thread. It is the
// parameter type of sthread_t::for_each_thread().
00276 class ThreadFunc
00277 {
00278 public:
// Implemented by subclasses; receives the thread being processed.
00279 virtual void operator()(const sthread_t& thread) = 0;
// Virtual so that implementations can be destroyed through a ThreadFunc*.
00280 virtual NORET ~ThreadFunc() {}
00281 };
00282
00283
00284 class sthread_init_t;
00285 class sthread_main_t;
00286
00287
00288
00289 #include "os_interface.h"
00290
00291
00292
00293
00294
00295
00296
00297
00298
00299
00300
00301
00302
00303
00304
// Test-and-test-and-set spin lock keyed on the owner's pthread_t.
// _holder is NOBODY (0) while the lock is free and otherwise holds the
// owning thread's handle, which is what lets is_mine() answer ownership
// queries.
// NOTE(review): this relies on pthread_t being a 4- or 8-byte scalar whose
// all-zero bit pattern never names a live thread - confirm per platform.
00305 struct tatas_lock {
00306
00307 enum { NOBODY=0 };
// View the holder either as a pthread_t or as a same-sized unsigned integer
// so it can be passed to the matching compare-and-swap primitive. CASFUNC is
// selected here by SIZEOF_PTHREAD_T and undefined again at the end of the
// struct.
00308 typedef union {
00309 pthread_t handle;
00310 #undef CASFUNC
00311 #if SIZEOF_PTHREAD_T==4
00312 #define CASFUNC atomic_cas_32
00313 unsigned int bits;
00314 #elif SIZEOF_PTHREAD_T==8
00315 # define CASFUNC atomic_cas_64
00316 uint64_t bits;
00317 #elif SIZEOF_PTHREAD_T==0
00318 #error Configuration could not determine size of pthread_t. Fix configure.ac.
00319 #else
00320 #error Configuration determined size of pthread_t is unexpected. Fix sthread.h.
00321 #endif
00322 } holder_type_t;
00323 volatile holder_type_t _holder;
00324
00325
// A new lock starts out free.
00326 tatas_lock() { _holder.bits=NOBODY; }
00327
00328 private:
00329
00330
// The "test" half: busy-wait with plain reads until the holder reads as zero,
// so the compare-and-swap is only attempted when the lock looks free.
00331 void spin() { while(*&(_holder.handle)) ; }
00332
00333 public:
00334
// Single attempt to take the lock without spinning.
// Returns true if the calling thread now owns the lock, false if another
// holder was installed at the time of the compare-and-swap.
00335 bool try_lock()
00336 {
00337 holder_type_t tid = { pthread_self() };
00338 bool success = false;
// Compare the previous holder at the full width returned by CASFUNC.
// Narrowing it to unsigned int first would discard the upper 32 bits of an
// 8-byte holder, and a lock owned by a handle whose low word is zero would
// then be reported as acquired. acquire() below also tests the full value.
00339 const bool was_free =
00340 (CASFUNC(&_holder.bits, NOBODY, tid.bits) == NOBODY);
00341 if(was_free) {
00342 membar_enter();
00343 success = true;
00344 }
00345
00346 return success;
00347 }
00348
00349
// Spin until the lock is obtained. The lock is not recursive: the caller
// must not already own it (checked by w_assert1).
00350 void acquire() {
00351 w_assert1(!is_mine());
00352 holder_type_t tid = { pthread_self() };
00353 do {
00354 spin();
00355 }
// CASFUNC yields the previous holder; non-zero means another thread got in
// between spin() and the swap, so go back to spinning.
00356 while(CASFUNC(&_holder.bits, NOBODY, tid.bits));
00357 membar_enter();
00358 w_assert1(is_mine());
00359 }
00360
00361
// Release the lock. The caller must be the current owner (checked by
// w_assert1). membar_exit() is issued before the holder is cleared.
00362 void release() {
00363 membar_exit();
00364 w_assert1(is_mine());
00365 _holder.bits= NOBODY;
00366 #if W_DEBUG_LEVEL > 0
00367 {
00368 membar_enter();
00369 w_assert1(!is_mine());
00370 }
00371 #endif
00372 }
00373
00374
// True when the recorded holder equals the calling thread's handle.
00375 bool is_mine() const { return
00376 pthread_equal(_holder.handle, pthread_self()) ? true : false; }
00377 #undef CASFUNC
00378 };
00379
00380
00381
00382
00383
00384
00385
00386
00387
00388
00389
00390
00391
00392
00393
00394
00395
00396
00397
00398
00399
00400
00401
00402
00403
00404
00405
00406
00407
00408
00409
00410
00411
00412
00413
00414
00415
// Lock built on a pthread mutex that exposes a qnode-style interface: each
// caller passes an ext_qnode, and the lock records itself in that node while
// the caller holds it.
// NOTE(review): the return values of pthread_mutex_init/lock/unlock/destroy
// are ignored throughout, and copying is not disabled although the struct
// embeds a pthread_mutex_t.
00416 struct w_pthread_lock_t
00417 {
00418
// Per-caller node. _held points at the lock the node currently holds, or is
// null when it holds none.
00419 struct ext_qnode {
00420 w_pthread_lock_t* _held;
00421 };
// Static and run-time ways of putting an ext_qnode into the "holds nothing"
// state.
00422 #define PTHREAD_EXT_QNODE_INITIALIZER { NULL }
00423 #define PTHREAD_EXT_QNODE_INITIALIZE(x) (x)._held = NULL
00424
00425 typedef ext_qnode volatile* ext_qnode_ptr;
00426
00427
00428 private:
00429 pthread_mutex_t _mutex;
00430
// Either null (free) or `this` (held); it never identifies the holder.
00431 w_pthread_lock_t * _holder;
00432
00433 public:
// The mutex is created with default attributes.
00434 w_pthread_lock_t() :_holder(0) { pthread_mutex_init(&_mutex, 0); }
00435
// Destroying a lock that is still marked held trips the assertion.
00436 ~w_pthread_lock_t() { w_assert1(!_holder); pthread_mutex_destroy(&_mutex);}
00437
00438
// Non-blocking acquire. Returns true and marks both the node and the lock
// as held if the mutex was obtained; returns false otherwise.
00439 bool attempt(ext_qnode* me) {
00440 if(attempt( *me)) {
00441 me->_held = this;
00442 _holder = this;
00443 return true;
00444 }
00445 return false;
00446 }
00447
00448 private:
00449
// Helper for the public attempt(): checks that the node holds nothing, then
// returns whether pthread_mutex_trylock() succeeded. It does not update the
// bookkeeping; the public overload does.
00450 bool attempt(ext_qnode & me) {
00451 w_assert1(!is_mine(&me));
00452 w_assert0( me._held == 0 );
00453
00454 return pthread_mutex_trylock(&_mutex) == 0;
00455 }
00456
00457 public:
00458
// Blocking acquire. The node must not already hold a lock. Always returns 0.
00459 void* acquire(ext_qnode* me) {
00460 w_assert1(!is_mine(me));
00461 w_assert1( me->_held == 0 );
00462
00463 pthread_mutex_lock(&_mutex);
00464 me->_held = this;
00465 _holder = this;
00466 #if W_DEBUG_LEVEL > 0
00467 {
00468 membar_enter();
00469 w_assert1(is_mine(me));
00470 }
00471 #endif
00472 return 0;
00473 }
00474
00475
// Convenience overload taking the node by reference.
00476 void release(ext_qnode &me) { release(&me); }
00477
00478
// Release the lock held through `me`. The bookkeeping is cleared before the
// mutex is unlocked, i.e. while this thread still owns it.
00479 void release(ext_qnode_ptr me) {
00480
00481 w_assert1( _holder == me->_held );
00482 w_assert1(me->_held == this);
00483 me->_held = 0;
00484 _holder = 0;
00485 pthread_mutex_unlock(&_mutex);
00486 #if W_DEBUG_LEVEL > 10
00487
00488
00489
// This check runs after the unlock, so _holder may already have been set by
// another acquirer; the assertion accepts both "both null" and "differ".
00490 {
00491 membar_enter();
00492 w_pthread_lock_t *h = _holder;
00493 w_pthread_lock_t *m = me->_held;
00494 w_assert1( (h==NULL && m==NULL)
00495 || (h != m) );
00496 }
00497 #endif
00498 }
00499
00500
00501
00502
00503
00504
00505
00506
00507
00508
00509
// True when the node `me` records this lock as held. Ownership is decided by
// the node, not by the calling thread's identity.
00510 bool is_mine(ext_qnode* me) const {
00511 if( me->_held == this ) {
00512
00513 w_assert1( _holder == me->_held );
00514 return true;
00515 }
00516 return false;
00517 }
00518 };
00519
00520
00521
00522
00523
00524
00525
00526
00527
00528
00529
00530
00531
00532
00533
00534
00535
00536
00537
00538
00539
00540
00541
00542
00543
00544
00545
00546
00547
00548
00549
00550
// Lock-type selection. The blocking queue lock is always the pthread-based
// lock. The spin and general queue locks are the pthread-based lock when
// USE_PTHREAD_MUTEX is defined as 1 and mcs_lock otherwise. Each choice
// comes with a matching static initializer and run-time initialize macro for
// its ext_qnode.
00551 typedef w_pthread_lock_t queue_based_block_lock_t;
00552 #define QUEUE_BLOCK_EXT_QNODE_INITIALIZER PTHREAD_EXT_QNODE_INITIALIZER
00553
// The argument is parenthesized (as in PTHREAD_EXT_QNODE_INITIALIZE) so that
// an expression such as *p selects the member of the whole expression.
00554 #define QUEUE_BLOCK_EXT_QNODE_INITIALIZE(x) (x)._held = NULL
00555
00556 #if defined(USE_PTHREAD_MUTEX) && USE_PTHREAD_MUTEX==1
00557 typedef w_pthread_lock_t queue_based_spin_lock_t;
00558 typedef w_pthread_lock_t queue_based_lock_t;
00559 #define QUEUE_SPIN_EXT_QNODE_INITIALIZER PTHREAD_EXT_QNODE_INITIALIZER
00560 #define QUEUE_EXT_QNODE_INITIALIZER PTHREAD_EXT_QNODE_INITIALIZER
00561
// Argument parenthesized for the same reason as above.
// NOTE(review): the trailing ';' is kept because existing call sites may
// rely on it; it makes this variant unsafe as the sole statement of an
// if/else branch.
00562 #define QUEUE_EXT_QNODE_INITIALIZE(x) (x)._held = NULL;
00563 #else
00564 #include <mcs_lock.h>
00565 typedef mcs_lock queue_based_spin_lock_t;
00566 typedef mcs_lock queue_based_lock_t;
00567 #define QUEUE_SPIN_EXT_QNODE_INITIALIZER MCS_EXT_QNODE_INITIALIZER
00568 #define QUEUE_EXT_QNODE_INITIALIZER MCS_EXT_QNODE_INITIALIZER
00569
00570 #define QUEUE_EXT_QNODE_INITIALIZE(x) MCS_EXT_QNODE_INITIALIZE(x)
00571 #endif
00572
00573 #ifndef SRWLOCK_H
00574 #include <srwlock.h>
00575 #endif
00576
00577
00578
00579
00580
00581
00582
00583
00584
00585
00586
00587
00588
00589
00590
00591
00592
00593
00594
// Reader-writer lock whose operations are all defined out of line. Besides
// the four acquire/release calls it exposes two adapter objects, so the read
// side and the write side can each be used through a plain
// acquire()/release() interface.
00595 struct occ_rwlock {
00596 occ_rwlock();
00597 ~occ_rwlock();
00598
00599 void acquire_read();
00600
00601 void release_read();
00602
00603 void acquire_write();
00604
00605 void release_write();
00606
00607
00608
// Adapter: acquire()/release() forward to the read side of *_lock.
00609 struct occ_rlock {
00610 occ_rwlock* _lock;
00611 void acquire() { _lock->acquire_read(); }
00612 void release() { _lock->release_read(); }
00613 };
00614
// Adapter: acquire()/release() forward to the write side of *_lock.
00615 struct occ_wlock {
00616 occ_rwlock* _lock;
00617 void acquire() { _lock->acquire_write(); }
00618 void release() { _lock->release_write(); }
00619 };
00620
00621
// Return pointers to the adapters embedded in this object.
// NOTE(review): their _lock members are presumably pointed back at this
// object by the constructor - confirm in the implementation.
00622 occ_rlock *read_lock() { return &_read_lock; }
00623
00624 occ_wlock *write_lock() { return &_write_lock; }
00625
00626 private:
// NOTE(review): presumably _active_count carries a writer flag in bit 0
// (WRITER) and counts readers in steps of READER - confirm.
00627 enum { WRITER=1, READER=2 };
00628 unsigned int volatile _active_count;
00629 occ_rlock _read_lock;
00630 occ_wlock _write_lock;
00631
00632 pthread_mutex_t _read_write_mutex;
00633 pthread_cond_t _read_cond;
00634 pthread_cond_t _write_cond;
00635 };
00636
// List of sthread_t objects, instantiated with queue_based_lock_t as the
// list's lock type.
00637 typedef w_list_t<sthread_t, queue_based_lock_t> sthread_list_t;
00638
00639
00640
00641
00642
00643
00644
00645
00646
00647
00648
00649
00650
00651
00652
00653
00654
00655
// Thread abstraction of the sthreads package. Subclasses implement run().
// The class also carries static wrappers for file I/O, the disk-buffer
// allocation entry points, per-thread statistics and the package-wide
// start-up/shut-down hooks.
00656 class sthread_t : public sthread_named_base_t
00657 {
00658 friend class sthread_init_t;
00659 friend class sthread_main_t;
00660
00661 friend class latch_t;
00662
00663
00664
00665 public:
00666 static void initialize_sthreads_package();
00667
// Thread life-cycle states, as reported by status().
00668 enum status_t {
00669 t_defunct,
00670 t_virgin,
00671 t_ready,
00672 t_running,
00673 t_blocked,
00674 t_boot
00675 };
// NOTE(review): presumably indexed by status_t - confirm in the definition.
00676 static const char *status_strings[];
00677
// Only two priority levels exist; max_priority and min_priority alias them.
00678 enum priority_t {
00679 t_time_critical = 1,
00680 t_regular = 0,
00681 max_priority = t_time_critical,
00682 min_priority = t_regular
00683 };
// NOTE(review): presumably indexed by priority_t - confirm in the definition.
00684 static const char *priority_strings[];
00685
00686
// Default for the constructor's stack_size parameter (64*1024).
00687 enum { default_stack = 64*1024 };
00688
00689
00690
00691
// Public, untyped pointer; this header does not say who owns or sets it.
00692 void* user;
// Identifier fixed at construction (const member).
00693 const id_t id;
00694
00695
00696
00697 static w_base_t::int8_t max_os_file_size;
00698
00699 private:
00700
00701
// Internal blocking primitives behind the public block() overloads; both
// default to waiting forever.
00702 static w_rc_t::errcode_t _block(
00703 timeout_in_ms timeout = WAIT_FOREVER,
00704 const char* const caller = 0,
00705 const void * id = 0);
00706
00707 static w_rc_t::errcode_t _block(
00708 pthread_mutex_t *lock,
00709 timeout_in_ms timeout = WAIT_FOREVER,
00710 sthread_list_t* list = 0,
00711 const char* const caller = 0,
00712 const void * id = 0);
00713
00714 w_rc_t _unblock(w_rc_t::errcode_t e);
00715
00716 public:
// Fills `when` from `timeout`.
// NOTE(review): presumably an absolute deadline suitable for
// pthread_cond_timedwait - confirm in the definition.
00717 static void timeout_to_timespec(timeout_in_ms timeout,
00718 struct timespec &when);
// Non-static: called on the thread to be unblocked, passing an error code.
00719 w_rc_t unblock(w_rc_t::errcode_t e);
// Static: block the calling thread. The first overload takes the caller's
// mutex by reference; the second takes only a timeout.
00720 static w_rc_t block(
00721 pthread_mutex_t &lock,
00722 timeout_in_ms timeout = WAIT_FOREVER,
00723 sthread_list_t* list = 0,
00724 const char* const caller = 0,
00725 const void * id = 0);
00726 static w_rc_t::errcode_t block(int4_t timeout = WAIT_FOREVER);
00727
// Diagnostic output helpers.
00728 virtual void _dump(ostream &) const;
00729
00730
00731 static void dumpall(const char *, ostream &);
00732 static void dumpall(ostream &);
00733
00734 static void dump_io(ostream &);
00735 static void dump_event(ostream &);
00736
00737 static void dump_stats(ostream &);
00738 static void reset_stats();
00739
00740
00741
// Virtual-table reporting hooks (see vtable.h).
00742 virtual void vtable_collect(vtable_row_t &);
00743
00744 static void vtable_collect_names(vtable_row_t &);
00745
00746
00747
00748
00749 static int collect(vtable_t&v, bool attr_names_too=true);
00750
00751
00752 static void find_stack(void *address);
// Takes a ThreadFunc callback.
// NOTE(review): presumably invoked once per known thread - confirm.
00753 static void for_each_thread(ThreadFunc& f);
00754
00755
// Stack sanity checks; file and line identify the call site.
00756 static void check_all_stacks(const char *file = "",
00757 int line = 0);
00758 bool isStackOK(const char *file = "", int line = 0) const;
00759
00760
00761 bool isStackFrameOK(size_t size = 0);
00762
00763 w_rc_t set_priority(priority_t priority);
00764 priority_t priority() const;
00765 status_t status() const;
00766
00767 private:
00768
00769
// Disk-buffer allocation back ends; which ones exist depends on
// WITHOUT_MMAP and HAVE_HUGETLBFS.
00770 #ifdef WITHOUT_MMAP
00771 static w_rc_t set_bufsize_memalign(size_t size,
00772 char *&buf_start , long system_page_size);
00773 #endif
00774 #ifdef HAVE_HUGETLBFS
00775 public:
00776
00777 static w_rc_t set_hugetlbfs_path(const char *path);
00778 private:
00779 static w_rc_t set_bufsize_huge(size_t size,
00780 char *&buf_start , long system_page_size);
00781 #endif
00782 static w_rc_t set_bufsize_normal(size_t size,
00783 char *&buf_start , long system_page_size);
00784 static void align_bufsize(size_t size, long system_page_size,
00785 long max_page_size);
00786 static long get_max_page_size(long system_page_size);
00787 static void align_for_sm(size_t requested_size);
00788
00789 public:
00790 static int do_unmap();
00791
00792
00793
// Two public entry points: one returns the buffer start directly, the other
// reports it through buf_start and returns a w_rc_t.
00794 static char* set_bufsize(size_t size);
00795 static w_rc_t set_bufsize(size_t size, char *&buf_start ,
00796 bool use_normal_if_huge_fails=false);
00797
// File I/O wrappers. All are static, identify the file by an int descriptor
// and report the outcome as w_rc_t. The flags and whence arguments use the
// OPEN_* / SEEK_AT_* constants from sthread_base_t.
00798 static w_rc_t open(
00799 const char* path,
00800 int flags,
00801 int mode,
00802 int& fd);
00803 static w_rc_t close(int fd);
00804 static w_rc_t read(
00805 int fd,
00806 void* buf,
00807 int n);
00808 static w_rc_t write(
00809 int fd,
00810 const void* buf,
00811 int n);
00812 static w_rc_t readv(
00813 int fd,
00814 const iovec_t* iov,
00815 size_t iovcnt);
00816 static w_rc_t writev(
00817 int fd,
00818 const iovec_t* iov,
00819 size_t iovcnt);
00820
00821 static w_rc_t pread(int fd, void *buf, int n, fileoff_t pos);
00822 static w_rc_t pwrite(int fd, const void *buf, int n,
00823 fileoff_t pos);
// This overload reports the resulting offset through `ret`.
00824 static w_rc_t lseek(
00825 int fd,
00826 fileoff_t offset,
00827 int whence,
00828 fileoff_t& ret);
00829
00830 static w_rc_t lseek(
00831 int fd,
00832 fileoff_t offset,
00833 int whence);
00834 static w_rc_t fsync(int fd);
00835 static w_rc_t ftruncate(int fd, fileoff_t sz);
00836 static w_rc_t fstat(int fd, filestat_t &sb);
00837 static w_rc_t fisraw(int fd, bool &raw);
00838
00839
00840
00841
00842
00843 private:
00844
00845
00846
00847
00848
00849
// Returns a reference to a function-local, thread-local (__thread) pointer
// that starts out NULL; each OS thread therefore sees its own slot.
00850 inline static sthread_t*& me_lval() {
00851
00852
00853
00854
00855 static __thread sthread_t* _TLSme(NULL);
00856 return _TLSme;
00857 }
00858 public:
00859
// Current value of the calling thread's slot; NULL until something assigns
// it through me_lval().
00860 static sthread_t* me() { return me_lval(); }
00861
00862 pthread_t myself();
00863 static int rand();
00864 static double drand();
00865 static int randn(int max);
00866
00867
00868
00869
// NOTE(review): the default timeout is WAIT_IMMEDIATE (0), not WAIT_FOREVER.
00870 void sleep(timeout_in_ms timeout = WAIT_IMMEDIATE,
00871 const char *reason = 0);
00872 void wakeup();
00873
00874
00875 w_rc_t join(timeout_in_ms timeout = WAIT_FOREVER);
00876
00877
00878 w_rc_t fork();
00879
00880
00881 static void yield();
00882 ostream &print(ostream &) const;
00883
00884
00885 virtual ~sthread_t();
00886
00887
00888
00889
// Virtual hooks through which a subclass can expose itself as smthread_t.
00890 virtual smthread_t* dynamic_cast_to_smthread();
00891 virtual const smthread_t* dynamic_cast_to_const_smthread() const;
00892
00893 protected:
// Protected: threads are created through subclasses.
00894 sthread_t(
00895 priority_t priority = t_regular,
00896 const char *name = 0,
00897 unsigned stack_size = default_stack);
00898
// run() must be implemented by the subclass; before_run() and after_run()
// are optional hooks that do nothing by default.
00899 virtual void before_run() { }
00900 virtual void run() = 0;
00901 virtual void after_run() { }
00902
00903 private:
00904
00905
// NOTE(review): presumably the offset between internal disk-table indices
// and the descriptors handed to callers - confirm in open()/close().
00906 enum { fd_base = 4000 };
00907 void * _start_frame;
00908 void * _danger;
00909 size_t _stack_size;
00910
00911 pthread_mutex_t _wait_lock;
00912
00913 pthread_cond_t _wait_cond;
00914
00915 pthread_mutex_t* _start_terminate_lock;
00916 pthread_cond_t * _start_cond;
00917
00918 volatile bool _sleeping;
00919 volatile bool _forked;
00920 bool _terminated;
00921
00922 volatile bool _unblock_flag;
00923
// Padding member; per its name it exists to keep valgrind quiet.
00924 fill4 _dummy4valgrind;
00925
00926 sthread_core_t * _core;
00927 volatile status_t _status;
00928 priority_t _priority;
00929 w_rc_t::errcode_t _rce;
00930
// Link fields for membership in w_list_t lists.
00931 w_link_t _link;
00932
00933 w_link_t _class_link;
00934
00935 static sthread_list_t* _class_list;
00936 static queue_based_lock_t _class_list_lock;
00937
00938
00939
00940
00941
00942
// Bookkeeping for files opened through the static I/O wrappers.
00943 static sdisk_t **_disks;
00944 static unsigned open_max;
00945 static unsigned open_count;
00946
00947
// Static trampoline taking the thread as void*, plus the member it pairs with.
00948 static void __start(void *arg_thread);
00949 void _start();
00950
00951
00952
00953 static w_rc_t cold_startup();
00954 static w_rc_t shutdown();
00955 static stime_t boot_time;
00956 static sthread_t* _main_thread;
00957 static uint4_t _next_id;
00958
00959 private:
00960 static int _disk_buffer_disalignment;
00961 static size_t _disk_buffer_size;
00962 static char * _disk_buffer;
00963 public:
00964
// Per-thread statistics block (type from sthread_stats.h).
00965 sthread_stats SthreadStats;
00966 };
00967
// Stream insertion for a thread; defined out of line.
00968 extern ostream &operator<<(ostream &o, const sthread_t &t);
00969
// Writes a timeout value to `o`; defined out of line.
// NOTE(review): presumably prints the timeout_t sentinels symbolically - confirm.
00970 void print_timeout(ostream& o, const sthread_base_t::timeout_in_ms timeout);
00971
00972
00973
00974
00975
00976
00977
00978
00979
00980
// Concrete sthread_t subclass. Its constructor is protected and sthread_t is
// a friend, so only sthread_t (or a subclass) can create one.
// NOTE(review): presumably represents the process's initial thread - confirm.
00981 class sthread_main_t : public sthread_t {
00982 friend class sthread_t;
00983
00984 protected:
00985 NORET sthread_main_t();
// Implements the pure virtual sthread_t::run(); defined out of line.
00986 virtual void run();
00987 };
00988
00989
00990
00991
00992
// Convenience wrappers around a mutex object's acquire()/release()/is_mine().
// MUTEX_ACQUIRE and MUTEX_RELEASE expand to complete statements (their
// definitions end in ';'), while MUTEX_IS_MINE expands to an expression.
// MUTEX_ACQUIRE passes the result of acquire() to W_COERCE, so it only fits
// mutex types whose acquire() returns a value W_COERCE accepts.
00993 #define MUTEX_ACQUIRE(mutex) W_COERCE((mutex).acquire());
00994 #define MUTEX_RELEASE(mutex) (mutex).release();
00995 #define MUTEX_IS_MINE(mutex) (mutex).is_mine()
00996
00997
00998
00999 #include "critical_section.h"
01000
01001
01002
01003
// Critical-section specializations for the lock types declared in this
// header. Each SPECIALIZE_CS invocation passes, in order: the lock type, a
// data declaration, an initializer expression for that data, the acquire
// expression and the release expression.
// NOTE(review): SPECIALIZE_CS comes from critical_section.h, which is not
// visible here; the description of its arguments is inferred from these
// call sites - confirm there.

// tatas_lock needs no per-section state, so a dummy int is supplied.
01004 SPECIALIZE_CS(tatas_lock, int _dummy, (_dummy=0),
01005 _mutex->acquire(), _mutex->release());
01006
01007
// Qnode-based locks carry their ext_qnode in the critical section and pass
// its address to acquire()/release().
01008 SPECIALIZE_CS(w_pthread_lock_t, w_pthread_lock_t::ext_qnode _me, (_me._held=0),
01009 _mutex->acquire(&_me), _mutex->release(&_me));
// mcs_lock is only declared when USE_PTHREAD_MUTEX is not 1.
01010 #if !defined(USE_PTHREAD_MUTEX) || USE_PTHREAD_MUTEX==0
01011 SPECIALIZE_CS(mcs_lock, mcs_lock::ext_qnode _me, (_me._held=0),
01012 _mutex->acquire(&_me), _mutex->release(&_me));
01013 #endif
01014
// Read-side and write-side adapters of occ_rwlock.
01015 SPECIALIZE_CS(occ_rwlock::occ_rlock, int _dummy, (_dummy=0),
01016 _mutex->acquire(), _mutex->release());
01017
01018 SPECIALIZE_CS(occ_rwlock::occ_wlock, int _dummy, (_dummy=0),
01019 _mutex->acquire(), _mutex->release());
01020
// Returns the thread's stored priority (_priority).
01021 inline sthread_t::priority_t
01022 sthread_t::priority() const
01023 {
01024 return _priority;
01025 }
01026
// Returns the thread's current status. _status is volatile and is read
// without taking a lock, so the value is a snapshot.
01027 inline sthread_t::status_t
01028 sthread_t::status() const
01029 {
01030 return _status;
01031 }
01032
01033 #include <w_strstream.h>
01034
01035 #include <string.h>
01036
01037
01038
01039
01040
01041 #endif