sthread.h

Go to the documentation of this file.
00001 /* -*- mode:C++; c-basic-offset:4 -*-
00002      Shore-MT -- Multi-threaded port of the SHORE storage manager
00003    
00004                        Copyright (c) 2007-2009
00005       Data Intensive Applications and Systems Labaratory (DIAS)
00006                Ecole Polytechnique Federale de Lausanne
00007    
00008                          All Rights Reserved.
00009    
00010    Permission to use, copy, modify and distribute this software and
00011    its documentation is hereby granted, provided that both the
00012    copyright notice and this permission notice appear in all copies of
00013    the software, derivative works or modified versions, and any
00014    portions thereof, and that both notices appear in supporting
00015    documentation.
00016    
00017    This code is distributed in the hope that it will be useful, but
00018    WITHOUT ANY WARRANTY; without even the implied warranty of
00019    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. THE AUTHORS
00020    DISCLAIM ANY LIABILITY OF ANY KIND FOR ANY DAMAGES WHATSOEVER
00021    RESULTING FROM THE USE OF THIS SOFTWARE.
00022 */
00023 
00024 // -*- mode:c++; c-basic-offset:4 -*-
00025 /*<std-header orig-src='shore' incl-file-exclusion='STHREAD_H'>
00026 
00027  $Id: sthread.h,v 1.209 2012/01/02 17:02:22 nhall Exp $
00028 
00029 SHORE -- Scalable Heterogeneous Object REpository
00030 
00031 Copyright (c) 1994-99 Computer Sciences Department, University of
00032                       Wisconsin -- Madison
00033 All Rights Reserved.
00034 
00035 Permission to use, copy, modify and distribute this software and its
00036 documentation is hereby granted, provided that both the copyright
00037 notice and this permission notice appear in all copies of the
00038 software, derivative works or modified versions, and any portions
00039 thereof, and that both notices appear in supporting documentation.
00040 
00041 THE AUTHORS AND THE COMPUTER SCIENCES DEPARTMENT OF THE UNIVERSITY
00042 OF WISCONSIN - MADISON ALLOW FREE USE OF THIS SOFTWARE IN ITS
00043 "AS IS" CONDITION, AND THEY DISCLAIM ANY LIABILITY OF ANY KIND
00044 FOR ANY DAMAGES WHATSOEVER RESULTING FROM THE USE OF THIS SOFTWARE.
00045 
00046 This software was developed with support by the Advanced Research
00047 Project Agency, ARPA order number 018 (formerly 8230), monitored by
00048 the U.S. Army Research Laboratory under contract DAAB07-91-C-Q518.
00049 Further funding for this work was provided by DARPA through
00050 Rome Research Laboratory Contract No. F30602-97-2-0247.
00051 
00052 */
00053 
00054 /*  -- do not edit anything above this line --   </std-header>*/
00055 
00056 /*
00057  * The SHORE threads layer has some historical roots in the
00058  * the NewThreads implementation wrapped up as c++ objects.
00059  *
00060  * With release 6.0 of the SHORE Storage Manager, the NewThreads
00061  * functionality was substantially obviated.  Some bits and pieces
00062  * of the SHORE threads layer remains in the synchronization variables
00063  * in the sthread_t API.
00064  *
00065  * To the extent that any NewThreads code remains here, 
00066  * the following copyright applies: 
00067  *
00068  *   NewThreads is Copyright 1992, 1993, 1994, 1995, 1996, 1997 by:
00069  *
00070  *    Josef Burger    <bolo@cs.wisc.edu>
00071  *    Dylan McNamee   <dylan@cse.ogi.edu>
00072  *    Ed Felten       <felten@cs.princeton.edu>
00073  *
00074  *   All Rights Reserved.
00075  *
00076  *   NewThreads may be freely used as long as credit is given
00077  *   to the above authors and the above copyright is maintained.
00078  */
00079 
00080 /**\file sthread.h
00081  *\ingroup MACROS
00082  *
00083  * This file contains the Shore Threads API.
00084  */
00085 
00086 #ifndef STHREAD_H
00087 #define STHREAD_H
00088 
00089 #include "w_defines.h"
00090 #include "w_rc.h"
00091 #include "atomic_templates.h"
00092 #include "w_strstream.h"
00093 #include "stime.h"
00094 #include "gethrtime.h"
00095 #include <vtable.h>
00096 #include <w_list.h>
00097 
00098 // this #include reflects the fact that sthreads is now just a pthreads wrapper
00099 #include <w_pthread.h>
00100 #include <sthread_stats.h>
00101 
00102 class sthread_t;
00103 class smthread_t;
00104 
00105 
00106 #ifdef __GNUC__
00107 #pragma interface
00108 #endif
00109 
00110 #ifndef SDISK_H
00111 #include <sdisk.h>
00112 #endif
00113 
00114 class vtable_row_t;
00115 class vtable_t;
00116 
00117 struct sthread_core_t;
00118 
00119 extern "C" void         dumpthreads(); // for calling from debugger
00120 
00121 
00122 /**\brief Base class for sthreads.  See \ref timeout_in_ms, \ref timeout_t
00123  */
00124 class sthread_base_t : public w_base_t {
00125 public:
00126 /**\cond skip */
00127     typedef unsigned int w_thread_id_t; // TODO REMOVE
00128     typedef w_thread_id_t id_t;
00129 /**\endcond skip */
00130 
00131     /* XXX this is really something for the SM, not the threads package;
00132        only WAIT_IMMEDIATE should ever make it to the threads package. */
00133 
00134     /**\enum timeout_t
00135      * \brief Special values for timeout_in_ms.
00136      *
00137      * \details sthreads package recognizes 2 WAIT_* values:
00138      * == WAIT_IMMEDIATE
00139      * and != WAIT_IMMEDIATE.
00140      *
00141      * If it's not WAIT_IMMEDIATE, it's assumed to be
00142      * a positive integer (milliseconds) used for the
00143      * select timeout.
00144      * WAIT_IMMEDIATE: no wait
00145      * WAIT_FOREVER:   may block indefinitely
00146      * The user of the thread (e.g., sm) had better
00147      * convert timeout that are negative values (WAIT_* below)
00148      * to something >= 0 before calling block().
00149      *
00150      * All other WAIT_* values other than WAIT_IMMEDIATE
00151      * are handled by sm layer:
00152      * WAIT_SPECIFIED_BY_THREAD: pick up a timeout_in_ms from the smthread.
00153      * WAIT_SPECIFIED_BY_XCT: pick up a timeout_in_ms from the transaction.
00154      * Anything else: not legitimate.
00155      * 
00156      * \sa timeout_in_ms
00157      */
00158     enum timeout_t {
00159     WAIT_IMMEDIATE     = 0, 
00160     WAIT_FOREVER     = -1,
00161     WAIT_SPECIFIED_BY_THREAD     = -4, // used by lock manager
00162     WAIT_SPECIFIED_BY_XCT = -5, // used by lock manager
00163     WAIT_NOT_USED = -6 // indicates last negative number used by sthreads
00164     };
00165     /* XXX int would also work, sized type not necessary */
00166     /**\typedef int4_t timeout_in_ms;
00167      * \brief Timeout in milliseconds if > 0
00168      * \details
00169      * sthread_t blocking methods take a timeout in milliseconds.
00170      * If the value is < 0, then it's expected to be a member of the
00171      * enumeration type timeout_t.
00172      *
00173      * \sa timeout_t
00174      */
00175     typedef int4_t timeout_in_ms;
00176 
00177 /**\cond skip */
00178     static const w_error_t::info_t     error_info[];
00179     static void  init_errorcodes();
00180 
00181 #include "st_error_enum_gen.h"
00182 
00183     enum {
00184     stOS = fcOS,
00185     stINTERNAL = fcINTERNAL,
00186     stNOTIMPLEMENTED = fcNOTIMPLEMENTED 
00187     };
00188 
00189     /* import sdisk base */
00190     typedef sdisk_base_t::fileoff_t    fileoff_t;
00191     typedef sdisk_base_t::filestat_t   filestat_t;
00192     typedef sdisk_base_t::iovec_t      iovec_t;
00193 
00194 
00195     /* XXX magic number */
00196     enum { iovec_max = 8 };
00197 
00198     enum {
00199     OPEN_RDWR = sdisk_base_t::OPEN_RDWR,
00200     OPEN_RDONLY = sdisk_base_t::OPEN_RDONLY,
00201     OPEN_WRONLY = sdisk_base_t::OPEN_WRONLY,
00202 
00203     OPEN_SYNC = sdisk_base_t::OPEN_SYNC,
00204     OPEN_TRUNC = sdisk_base_t::OPEN_TRUNC,
00205     OPEN_CREATE = sdisk_base_t::OPEN_CREATE,
00206     OPEN_EXCL = sdisk_base_t::OPEN_EXCL,
00207     OPEN_APPEND = sdisk_base_t::OPEN_APPEND,
00208     OPEN_RAW = sdisk_base_t::OPEN_RAW
00209     };
00210     enum {
00211     SEEK_AT_SET = sdisk_base_t::SEEK_AT_SET,
00212     SEEK_AT_CUR = sdisk_base_t::SEEK_AT_CUR,
00213     SEEK_AT_END = sdisk_base_t::SEEK_AT_END
00214     };
00215 /**\endcond skip */
00216 };
00217 
00218 /**\cond skip */
00219 class sthread_name_t {
00220 public:
00221     enum { NAME_ARRAY = 64 };
00222 
00223     char        _name[NAME_ARRAY];
00224 
00225     sthread_name_t();
00226     ~sthread_name_t();
00227 
00228     void rename(const char *n1, const char *n2=0, const char *n3=0);
00229 };
00230 
00231 class sthread_named_base_t: public sthread_base_t
00232 {
00233 public:
00234     NORET            sthread_named_base_t(
00235     const char*            n1 = 0,
00236     const char*            n2 = 0,
00237     const char*            n3 = 0);
00238     NORET            ~sthread_named_base_t();
00239     
00240     void            rename(
00241     const char*            n1,
00242     const char*            n2 = 0,
00243     const char*            n3 = 0);
00244 
00245     const char*            name() const;
00246     void                   unname();
00247 
00248 private:
00249     sthread_name_t        _name;
00250 };
00251 
00252 inline NORET
00253 sthread_named_base_t::sthread_named_base_t(
00254     const char*        n1,
00255     const char*        n2,
00256     const char*        n3)
00257 {
00258     rename(n1, n2, n3);
00259 
00260 }
00261 
00262 inline const char*
00263 sthread_named_base_t::name() const
00264 {
00265     return _name._name;
00266 }
00267 
00268 class sthread_main_t;
00269 
00270 /**\endcond skip */
00271 
00272 /**\brief A callback class for traversing the list of all sthreads.
00273  * \details
00274  * Use with for_each_thread. Somewhat costly because it's thread-safe.
00275  */
00276 class ThreadFunc
00277 {
00278     public:
00279     virtual void operator()(const sthread_t& thread) = 0;
00280     virtual NORET ~ThreadFunc() {}
00281 };
00282 
00283 
00284 class sthread_init_t;
00285 class sthread_main_t;
00286 
00287 // these macros allow us to notify the SunStudio race detector about lock acquires/releases
00288 
00289 #include "os_interface.h"
00290 
00291 /**\brief A test-and-test-and-set spinlock. 
00292  *
00293  * This lock is good for short, uncontended critical sections. 
00294  * If contention is high, use an mcs_lock. 
00295  * Long critical sections should use pthread_mutex_t.
00296  *
00297  * Tradeoffs are:
00298  *  - test-and-test-and-set locks: low-overhead but not scalable
00299  *  - queue-based locks: higher overhead but scalable
00300  *  - pthread mutexes : very high overhead and blocks, but frees up 
00301  *  cpu for other threads when number of cpus is fewer than number of threads
00302  *
00303  *  \sa REFSYNC
00304  */
00305 struct tatas_lock {
00306     /**\cond skip */
00307     enum { NOBODY=0 };
00308     typedef union  {
00309         pthread_t         handle;
00310 #undef CASFUNC 
00311 #if SIZEOF_PTHREAD_T==4
00312 #define CASFUNC atomic_cas_32
00313         unsigned int       bits;
00314 #elif SIZEOF_PTHREAD_T==8
00315 # define CASFUNC atomic_cas_64
00316         uint64_t           bits;
00317 #elif SIZEOF_PTHREAD_T==0
00318 #error  Configuration could not determine size of pthread_t. Fix configure.ac.
00319 #else 
00320 #error  Configuration determined size of pthread_t is unexpected. Fix sthread.h.
00321 #endif
00322     } holder_type_t;
00323     volatile holder_type_t _holder;
00324     /**\endcond skip */
00325 
00326     tatas_lock() { _holder.bits=NOBODY; }
00327 
00328 private:
00329     // CC mangles this as __1cKtatas_lockEspin6M_v_
00330     /// spin until lock is free
00331     void spin() { while(*&(_holder.handle)) ; }
00332 
00333 public:
00334     /// Try to acquire the lock immediately.
00335     bool try_lock() 
00336     {
00337         holder_type_t tid = { pthread_self() };
00338         bool success = false;
00339         unsigned int old_holder = 
00340                         CASFUNC(&_holder.bits, NOBODY, tid.bits);
00341         if(old_holder == NOBODY) {
00342             membar_enter();
00343             success = true;
00344         }
00345         
00346         return success;
00347     }
00348 
00349     /// Acquire the lock, spinning as long as necessary. 
00350     void acquire() {
00351         w_assert1(!is_mine());
00352         holder_type_t tid = { pthread_self() };
00353         do {
00354             spin();
00355         }
00356         while(CASFUNC(&_holder.bits, NOBODY, tid.bits));
00357         membar_enter();
00358         w_assert1(is_mine());
00359     }
00360 
00361     /// Release the lock
00362     void release() {
00363         membar_exit();
00364         w_assert1(is_mine()); // moved after the membar
00365         _holder.bits= NOBODY;
00366 #if W_DEBUG_LEVEL > 0
00367         {
00368             membar_enter(); // needed for the assert?
00369             w_assert1(!is_mine());
00370         }
00371 #endif
00372     }
00373 
00374     /// True if this thread is the lock holder
00375     bool is_mine() const { return 
00376         pthread_equal(_holder.handle, pthread_self()) ? true : false; }
00377 #undef CASFUNC 
00378 };
00379 
00380 /**\brief Wrapper for pthread mutexes, with a queue-based lock API.
00381  *
00382  * When the storage manager is configured with the default,
00383  * --enable-pthread-mutex, this lock uses a Pthreads mutex for the lock.
00384  * In this case, it is not a true queue-based lock, since
00385  * release doesn't inform the next node in the queue, and in fact the
00386  * nodes aren't kept in a queue.
00387  * It just gives pthread mutexes the same API as the other
00388  * queue-based locks so that we use the same idioms for
00389  * critical sections based on different kinds of locks.
00390  * By configuring with pthreads mutexes implementing this class, the
00391  * server can spawn any number of threads, regardless of the number
00392  * of hardware contexts available; threads will block as necessary.
00393  *
00394  * When the storage manager is configured with 
00395  * --disable-pthread-mutex, this lock uses an MCS (\ref MCS1) queue-based
00396  * lock for the lock.
00397  * In this case, it is a true queue-based lock.
00398  * By configuring with MCS locks implementing this class, if the
00399  * server spawn many more threads than hardware contexts, time can be wasted
00400  * spinning; threads will not block until the operating system (or underlying 
00401  * thread scheduler) determines to block the thread.
00402  *
00403  * The idiom for using these locks is
00404  * that the qnode is on a threads's stack, so the qnode
00405  * implicitly identifies the owning thread.
00406  *
00407  * This allows us to add an is_mine() capability that otherwise
00408  * the pthread mutexen don't have.
00409  *
00410  * Finally, using this class ensures that the pthread_mutex_init/destroy
00411  * is done (in the --enable-pthread-mutex case).
00412  *
00413  *  See also: \ref REFSYNC
00414  *
00415  */
00416 struct w_pthread_lock_t 
00417 {
00418     /**\cond skip */
00419     struct ext_qnode {
00420         w_pthread_lock_t* _held;
00421     };
00422 #define PTHREAD_EXT_QNODE_INITIALIZER { NULL }
00423 #define PTHREAD_EXT_QNODE_INITIALIZE(x) (x)._held =  NULL
00424 
00425     typedef ext_qnode volatile* ext_qnode_ptr;
00426     /**\endcond skip */
00427 
00428 private:
00429     pthread_mutex_t     _mutex; // w_pthread_lock_t blocks on this
00430     /// Holder is this struct if acquire is successful.
00431     w_pthread_lock_t *  _holder;
00432 
00433 public:
00434     w_pthread_lock_t() :_holder(0) { pthread_mutex_init(&_mutex, 0); }
00435 
00436     ~w_pthread_lock_t() { w_assert1(!_holder); pthread_mutex_destroy(&_mutex);}
00437     
00438     /// Returns true if success.
00439     bool attempt(ext_qnode* me) {
00440         if(attempt( *me)) {
00441             me->_held = this;
00442             _holder = this;
00443             return true;
00444         }
00445         return false;
00446     }
00447 
00448 private:
00449     /// Returns true if success. Helper for attempt(ext_qnode *).
00450     bool attempt(ext_qnode & me) {
00451         w_assert1(!is_mine(&me));
00452         w_assert0( me._held == 0 );  // had better not 
00453         // be using this qnode for another lock!
00454         return pthread_mutex_trylock(&_mutex) == 0;
00455     }
00456 
00457 public:
00458     /// Acquire the lock and set the qnode to refer to this lock.
00459     void* acquire(ext_qnode* me) {
00460         w_assert1(!is_mine(me));
00461         w_assert1( me->_held == 0 );  // had better not 
00462         // be using this qnode for another lock!
00463         pthread_mutex_lock(&_mutex);
00464         me->_held = this;
00465         _holder = this;
00466 #if W_DEBUG_LEVEL > 0
00467         {
00468             membar_enter(); // needed for the assert
00469             w_assert1(is_mine(me)); // TODO: change to assert2
00470         }
00471 #endif
00472         return 0;
00473     }
00474 
00475     /// Release the lock and clear the qnode.
00476     void release(ext_qnode &me) { release(&me); }
00477 
00478     /// Release the lock and clear the qnode.
00479     void release(ext_qnode_ptr me) { 
00480         // assert is_mine:
00481         w_assert1( _holder == me->_held ); 
00482         w_assert1(me->_held == this); 
00483          me->_held = 0; 
00484         _holder = 0;
00485         pthread_mutex_unlock(&_mutex); 
00486 #if W_DEBUG_LEVEL > 10
00487         // This is racy since the containing structure could
00488         // have been freed by the time we do this check.  Thus,
00489         // we'll remove it.
00490         {
00491             membar_enter(); // needed for the assertions?
00492             w_pthread_lock_t *h =  _holder;
00493             w_pthread_lock_t *m =  me->_held;
00494             w_assert1( (h==NULL && m==NULL)
00495                 || (h  != m) );
00496         }
00497 #endif
00498     }
00499 
00500     /**\brief Return true if this thread holds the lock.
00501      *
00502      * This method doesn't actually check for this pthread
00503      * holding the lock, but it checks that the qnode reference
00504      * is to this lock.  
00505      * The idiom for using these locks is
00506      * that the qnode is on a threads's stack, so the qnode
00507      * implicitly identifies the owning thread.
00508      */
00509     
00510     bool is_mine(ext_qnode* me) const { 
00511        if( me->_held == this ) {
00512            // only valid if is_mine 
00513           w_assert1( _holder == me->_held ); 
00514           return true;
00515        }
00516        return false;
00517     }
00518 };
00519 
00520 /**\def USE_PTHREAD_MUTEX
00521  * \brief If defined and value is 1, use pthread-based mutex for queue_based_lock_t
00522  *
00523  * \details
00524  * The Shore-MT release contained alternatives for scalable locks in
00525  * certain places in the storage manager; it was released with
00526  * these locks replaced by pthreads-based mutexes.
00527  *
00528  * You can disable the use of pthreads-based mutexes and use the
00529  * mcs-based locks by configuring with --disable-pthread-mutex.
00530  */
00531 
00532 /**\defgroup SYNCPRIM Synchronization Primitives
00533  *\ingroup UNUSED 
00534  *
00535  * sthread/sthread.h: As distributed, a queue-based lock 
00536  * is a w_pthread_lock_t,
00537  * which is a wrapper around a pthread lock to give it a queue-based-lock API.
00538  * True queue-based locks are not used, nor are time-published
00539  * locks.
00540  * Code for these implementations is included for future 
00541  * experimentation, along with typedefs that should allow
00542  * easy substitution, as they all should have the same API.
00543  *
00544  * We don't offer the spin implementations at the moment.
00545  */
00546 /*
00547  * These typedefs are included to allow substitution at some  point.
00548  * Where there is a preference, the code should use the appropriate typedef.
00549  */
00550 
00551 typedef w_pthread_lock_t queue_based_block_lock_t; // blocking impl always ok
00552 #define QUEUE_BLOCK_EXT_QNODE_INITIALIZER PTHREAD_EXT_QNODE_INITIALIZER
00553 // non-static initialize:
00554 #define QUEUE_BLOCK_EXT_QNODE_INITIALIZE(x) x._held = NULL
00555 
00556 #if defined(USE_PTHREAD_MUTEX) && USE_PTHREAD_MUTEX==1
00557 typedef w_pthread_lock_t queue_based_spin_lock_t; // spin impl preferred
00558 typedef w_pthread_lock_t queue_based_lock_t; // might want to use spin impl
00559 #define QUEUE_SPIN_EXT_QNODE_INITIALIZER PTHREAD_EXT_QNODE_INITIALIZER
00560 #define QUEUE_EXT_QNODE_INITIALIZER      PTHREAD_EXT_QNODE_INITIALIZER
00561 // non-static initialize:
00562 #define QUEUE_EXT_QNODE_INITIALIZE(x) x._held = NULL;
00563 #else
00564 #include <mcs_lock.h>
00565 typedef mcs_lock queue_based_spin_lock_t; // spin preferred
00566 typedef mcs_lock queue_based_lock_t;
00567 #define QUEUE_SPIN_EXT_QNODE_INITIALIZER MCS_EXT_QNODE_INITIALIZER
00568 #define QUEUE_EXT_QNODE_INITIALIZER      MCS_EXT_QNODE_INITIALIZER
00569 // non-static initialize:
00570 #define QUEUE_EXT_QNODE_INITIALIZE(x) MCS_EXT_QNODE_INITIALIZE(x)
00571 #endif
00572 
00573 #ifndef SRWLOCK_H
00574 #include <srwlock.h>
00575 #endif
00576 
00577 /**\brief A multiple-reader/single-writer lock based on pthreads (blocking)
00578  *
00579  * Use this to protect data structures that get hammered by
00580  *  reads and where updates are very rare.
00581  * It is used in the storage manager by the histograms (histo.cpp), 
00582  * and in place of some mutexen, where strict exclusion isn't required.
00583  *
00584  * This lock is used in the storage manager by the checkpoint thread
00585  * (the only acquire-writer) and other threads to be sure they don't
00586  * do certain nasty things when a checkpoint is going on.
00587  *
00588  * The idiom for using these locks is
00589  * that the qnode is on a threads's stack, so the qnode
00590  * implicitly identifies the owning thread.
00591  *
00592  *  See also: \ref REFSYNC
00593  *
00594  */
00595 struct occ_rwlock {
00596     occ_rwlock();
00597     ~occ_rwlock();
00598     /// The normal way to acquire a read lock.
00599     void acquire_read();
00600     /// The normal way to release a read lock.
00601     void release_read();
00602     /// The normal way to acquire a write lock.
00603     void acquire_write();
00604     /// The normal way to release a write lock.
00605     void release_write();
00606 
00607     /**\cond skip */
00608     /// Exposed for critical_section<>. Do not use directly.
00609     struct occ_rlock {
00610         occ_rwlock* _lock;
00611         void acquire() { _lock->acquire_read(); }
00612         void release() { _lock->release_read(); }
00613     };
00614     /// Exposed for critical_section<>. Do not use directly.
00615     struct occ_wlock {
00616         occ_rwlock* _lock;
00617         void acquire() { _lock->acquire_write(); }
00618         void release() { _lock->release_write(); }
00619     };
00620 
00621     /// Exposed for the latch manager.. Do not use directly.
00622     occ_rlock *read_lock() { return &_read_lock; }
00623     /// Exposed for the latch manager.. Do not use directly.
00624     occ_wlock *write_lock() { return &_write_lock; }
00625     /**\endcond skip */
00626 private:
00627     enum { WRITER=1, READER=2 };
00628     unsigned int volatile _active_count;
00629     occ_rlock _read_lock;
00630     occ_wlock _write_lock;
00631 
00632     pthread_mutex_t _read_write_mutex; // paired w/ _read_cond, _write_cond
00633     pthread_cond_t _read_cond; // paired w/ _read_write_mutex
00634     pthread_cond_t _write_cond; // paired w/ _read_write_mutex
00635 };
00636 
00637 typedef w_list_t<sthread_t, queue_based_lock_t>        sthread_list_t;
00638 
00639 
00640 /**\brief Thread class for all threads that use the Shore Storage Manager.
00641  *  
00642  *  All threads that perform \b any work on behalf of the storage
00643  *  manager or call any storage manager API \b must be an sthread_t or
00644  *  a class derived from sthread_t.
00645  *
00646  *  Storage manager threads use block/unblock methods provided by
00647  *  sthread, and use thread-local storage (data attributes of
00648  *  sthread_t).
00649  *
00650  *  This class also provides an os-independent API for file-system
00651  *  calls (open, read, write, close, etc.) used by the storage manager.
00652  *
00653  *  This class is a fairly thin layer over pthreads.  Client threads
00654  *  may use pthread synchronization primitives. 
00655  */
00656 class sthread_t : public sthread_named_base_t  
00657 {
00658     friend class sthread_init_t;
00659     friend class sthread_main_t;
00660     /* For access to block() and unblock() */
00661     friend class latch_t;
00662     /* For access to I/O stats */
00663 
00664 
00665 public:
00666     static void  initialize_sthreads_package();
00667 
00668     enum status_t {
00669         t_defunct,    // thread has terminated
00670         t_virgin,    // thread hasn't started yet    
00671         t_ready,    // thread is ready to run
00672         t_running,    // when me() is this thread 
00673         t_blocked,      // thread is blocked on something
00674         t_boot        // system boot
00675     };
00676     static const char *status_strings[];
00677 
00678     enum priority_t {
00679         t_time_critical = 1,
00680         t_regular    = 0,
00681         max_priority    = t_time_critical,
00682         min_priority    = t_regular
00683     };
00684     static const char *priority_strings[];
00685 
00686     /* Default stack size for a thread */
00687     enum { default_stack = 64*1024 };
00688 
00689     /*
00690      *  Class member variables
00691      */
00692     void*             user;    // user can use this 
00693     const id_t        id;
00694 
00695     // max_os_file_size is used by the sm and set in
00696     // static initialization of sthreads (sthread_init_t in sthread.cpp)
00697     static w_base_t::int8_t     max_os_file_size;
00698 
00699 private:
00700 
00701     // ASSUMES WE ALREADY LOCKED self->_wait_lock
00702     static w_rc_t::errcode_t        _block(
00703                             timeout_in_ms          timeout = WAIT_FOREVER,
00704                             const char* const      caller = 0,
00705                             const void *           id = 0);
00706 
00707     static w_rc_t::errcode_t        _block(
00708                             pthread_mutex_t        *lock, 
00709                             timeout_in_ms          timeout = WAIT_FOREVER,
00710                             sthread_list_t*        list = 0,
00711                             const char* const      caller = 0,
00712                             const void *           id = 0);
00713 
00714     w_rc_t               _unblock(w_rc_t::errcode_t e);
00715 
00716 public:
00717     static void          timeout_to_timespec(timeout_in_ms timeout, 
00718                                              struct timespec &when);
00719     w_rc_t               unblock(w_rc_t::errcode_t e);
00720     static w_rc_t        block(
00721                             pthread_mutex_t        &lock,
00722                             timeout_in_ms          timeout = WAIT_FOREVER,
00723                             sthread_list_t*        list = 0,
00724                             const char* const      caller = 0,
00725                             const void *           id = 0);
00726     static w_rc_t::errcode_t       block(int4_t  timeout = WAIT_FOREVER);
00727 
00728     virtual void        _dump(ostream &) const; // to be over-ridden
00729 
00730     // these traverse all threads
00731     static void       dumpall(const char *, ostream &);
00732     static void       dumpall(ostream &);
00733 
00734     static void       dump_io(ostream &);
00735     static void       dump_event(ostream &);
00736 
00737     static void       dump_stats(ostream &);
00738     static void       reset_stats();
00739 
00740     /// Collect a row of a virtual table. One row per thread.
00741     /// Subclasses override this.
00742     virtual void      vtable_collect(vtable_row_t &); // to be over-ridden
00743     /// Stuff the attribute names in this row.
00744     static  void      vtable_collect_names(vtable_row_t &); // to be over-ridden
00745 
00746     /// Collect an entire table, one row per thread that the sthreads package
00747     /// knows about. If attr_names_too is true, the first row will be
00748     /// attribute names.
00749     static int        collect(vtable_t&v, bool attr_names_too=true); 
00750                         // in vtable_sthread.cpp
00751 
00752     static void      find_stack(void *address);
00753     static void      for_each_thread(ThreadFunc& f);
00754 
00755     /* request stack overflow check, die on error. */
00756     static void      check_all_stacks(const char *file = "",
00757                              int line = 0);
00758     bool             isStackOK(const char *file = "", int line = 0) const;
00759 
00760     /* Recursion, etc stack depth estimator */
00761     bool             isStackFrameOK(size_t size = 0);
00762 
00763     w_rc_t           set_priority(priority_t priority);
00764     priority_t       priority() const;
00765     status_t         status() const;
00766 
00767 private:
00768 
00769 // WITHOUT_MMAP is controlled by configure
00770 #ifdef WITHOUT_MMAP
00771     static w_rc_t     set_bufsize_memalign(size_t size, 
00772                         char *&buf_start /* in/out*/, long system_page_size);
00773 #endif
00774 #ifdef HAVE_HUGETLBFS
00775 public:
00776     // Must be called if we are configured with  hugetlbfs
00777     static w_rc_t     set_hugetlbfs_path(const char *path);
00778 private:
00779     static w_rc_t     set_bufsize_huge(size_t size, 
00780                         char *&buf_start /* in/out*/, long system_page_size);
00781 #endif
00782     static w_rc_t     set_bufsize_normal(size_t size, 
00783                         char *&buf_start /* in/out*/, long system_page_size);
00784     static void       align_bufsize(size_t size, long system_page_size,
00785                                                 long max_page_size);
00786     static long       get_max_page_size(long system_page_size);
00787     static void       align_for_sm(size_t requested_size);
00788 
00789 public:
00790     static int          do_unmap(); 
00791     /*
00792      *  Concurrent I/O ops
00793      */
00794     static char*        set_bufsize(size_t size);
00795     static w_rc_t       set_bufsize(size_t size, char *&buf_start /* in/out*/,
00796                                     bool use_normal_if_huge_fails=false);
00797 
00798     static w_rc_t        open(
00799                             const char*            path,
00800                             int                flags,
00801                             int                mode,
00802                             int&                fd);
00803     static w_rc_t        close(int fd);
00804     static w_rc_t        read(
00805                             int                 fd,
00806                             void*                 buf,
00807                             int                 n);
00808     static w_rc_t        write(
00809                             int                 fd, 
00810                             const void*             buf, 
00811                             int                 n);
00812     static w_rc_t        readv(
00813                             int                 fd, 
00814                             const iovec_t*             iov,
00815                             size_t                iovcnt);
00816     static w_rc_t        writev(
00817                             int                 fd,
00818                             const iovec_t*                iov,
00819                             size_t                 iovcnt);
00820 
00821     static w_rc_t        pread(int fd, void *buf, int n, fileoff_t pos);
00822     static w_rc_t        pwrite(int fd, const void *buf, int n,
00823                            fileoff_t pos);
00824     static w_rc_t        lseek(
00825                             int                fd,
00826                             fileoff_t            offset,
00827                             int                whence,
00828                             fileoff_t&            ret);
00829     /* returns an error if the seek doesn't match its destination */
00830     static w_rc_t        lseek(
00831                             int                fd,
00832                             fileoff_t                offset,
00833                             int                whence);
00834     static w_rc_t        fsync(int fd);
00835     static w_rc_t        ftruncate(int fd, fileoff_t sz);
00836     static w_rc_t        fstat(int fd, filestat_t &sb);
00837     static w_rc_t        fisraw(int fd, bool &raw);
00838 
00839 
00840     /*
00841      *  Misc
00842      */
00843 private:
00844     // NOTE: this returns a REFERENCE to a pointer
00845     /* #\fn static sthread_t*& sthread_t::me_lval()
00846      ** \brief Returns a (writable) reference to the a 
00847      * pointer to the running sthread_t.
00848      * \ingroup TLS
00849      */
00850     inline static sthread_t*& me_lval() {
00851         /**\var sthread_t* _me;
00852          * \brief A pointer to the running sthread_t.
00853          * \ingroup TLS
00854          */
00855         static __thread sthread_t* _TLSme(NULL);
00856         return _TLSme;
00857     }
00858 public:
00859     // NOTE: this returns a POINTER
00860     static sthread_t*    me() { return me_lval(); }
00861                          // for debugging:
00862     pthread_t            myself(); // pthread_t associated with this 
00863     static int           rand(); // returns an int in [0, 2**31)
00864     static double        drand(); // returns a double in [0.0, 1)
00865     static int           randn(int max); // returns an int in [0, max)
00866 
00867     /* XXX  sleep, fork, and wait exit overlap the unix version. */
00868 
00869     // sleep for timeout milliseconds
00870     void                 sleep(timeout_in_ms timeout = WAIT_IMMEDIATE,
00871                          const char *reason = 0);
00872     void                 wakeup();
00873 
00874     // wait for a thread to finish running
00875     w_rc_t            join(timeout_in_ms timeout = WAIT_FOREVER);
00876 
00877     // start a thread
00878     w_rc_t            fork();
00879 
00880     // give up the processor
00881     static void        yield();
00882     ostream            &print(ostream &) const;
00883 
00884     // anyone can wait and delete a thread
00885     virtual            ~sthread_t();
00886 
00887     // function to do runtime up-cast to smthread_t
00888     // return 0 if the sthread is not derrived from sm_thread_t.
00889     // should be removed when RTTI is supported
00890     virtual smthread_t*        dynamic_cast_to_smthread();
00891     virtual const smthread_t*  dynamic_cast_to_const_smthread() const;
00892 
00893     w_rc_t::errcode_t    error_code() const { return _rce;}
00894 
00895 protected:
00896     sthread_t(
00897           priority_t    priority = t_regular,
00898           const char    *name = 0,
00899           unsigned        stack_size = default_stack);
00900 
00901     virtual void        before_run() { }
00902     virtual void        run() = 0;
00903     virtual void        after_run() { }
00904 
00905 private:
00906 
00907     /* start offset of sthread FDs, to differentiate from system FDs */
00908     enum { fd_base = 4000 };
00909     void *                      _start_frame;
00910     void *                      _danger;
00911     size_t                      _stack_size;
00912 
00913     pthread_mutex_t             _wait_lock; // paired with _wait_cond, also
00914                                 // protects _link
00915     pthread_cond_t              _wait_cond; // posted when thread should unblock
00916 
00917     pthread_mutex_t*            _start_terminate_lock; // _start_cond, _terminate_cond, _forked
00918     pthread_cond_t *            _start_cond; // paired w/ _start_terminate_lock
00919 
00920     volatile bool               _sleeping;
00921     volatile bool               _forked;
00922     bool                        _terminated; // protects against double calls
00923                                 // to sthread_core_exit
00924     volatile bool               _unblock_flag; // used internally by _block()
00925 
00926     fill4                       _dummy4valgrind;
00927     
00928     sthread_core_t *            _core;        // registers, stack, etc
00929     volatile status_t           _status;    // thread status
00930     priority_t                  _priority;     // thread priority
00931     w_rc_t::errcode_t           _rce;        // used in block/unblock
00932 
00933     w_link_t                    _link;        // protected by _wait_lock
00934 
00935     w_link_t                    _class_link;    // used in _class_list,
00936                                  // protected by _class_list_lock
00937     static sthread_list_t*      _class_list;
00938     static queue_based_lock_t   _class_list_lock; // for protecting _class_list
00939 
00940 
00941     /* XXX alignment probs in derived thread classes.  Sigh */
00942     // fill4                       _ex_fill;
00943 
00944     /* I/O subsystem */
00945     static    sdisk_t        **_disks;
00946     static    unsigned       open_max;
00947     static    unsigned       open_count;
00948 
00949     /* in-thread startup and shutdown */ 
00950     static void            __start(void *arg_thread);
00951     void                   _start();
00952 
00953 
00954     /* system initialization and shutdown */
00955     static w_rc_t        cold_startup();
00956     static w_rc_t        shutdown();
00957     static stime_t        boot_time;
00958     static sthread_t*    _main_thread; 
00959     static uint4_t        _next_id;    // unique id generator
00960 
00961 private:
00962     static int           _disk_buffer_disalignment;
00963     static size_t        _disk_buffer_size;
00964     static char *        _disk_buffer;
00965 public:
00966     // export so smthread can read it and so latch/srwlock can write it:
00967     sthread_stats        SthreadStats;
00968 };
00969 
00970 extern ostream &operator<<(ostream &o, const sthread_t &t);
00971 
00972 void print_timeout(ostream& o, const sthread_base_t::timeout_in_ms timeout);
00973 
00974 
00975 /**\cond skip */
00976 /**\brief The main thread. 
00977 *
00978 * Called from sthread_t::cold_startup(), which is
00979 * called from sthread_init_t::do_init(), which is 
00980 * called from sthread_t::initialize_sthreads_package(), which is called 
00981 * when the storage manager sets up options, among other places.
00982 */
00983 class sthread_main_t : public sthread_t  {
00984     friend class sthread_t;
00985     
00986 protected:
00987     NORET            sthread_main_t();
00988     virtual void        run();
00989 };
00990 /**\endcond skip */
00991 
00992 
00993 /**\cond skip */
00994 
00995 #define MUTEX_ACQUIRE(mutex)    W_COERCE((mutex).acquire());
00996 #define MUTEX_RELEASE(mutex)    (mutex).release();
00997 #define MUTEX_IS_MINE(mutex)    (mutex).is_mine()
00998 
00999 // critical_section.h contains the macros needed for the following
01000 // SPECIALIZE_CS
01001 #include "critical_section.h"
01002 
01003 // tatas_lock doesn't have is_mine, but I changed its release()
01004 // to Release and through compiling saw everywhere that uses release,
01005 // and fixed those places
01006 SPECIALIZE_CS(tatas_lock, int _dummy, (_dummy=0), 
01007     _mutex->acquire(), _mutex->release());
01008 
01009 // queue_based_lock_t asserts is_mine() in release()
01010 SPECIALIZE_CS(w_pthread_lock_t, w_pthread_lock_t::ext_qnode _me, (_me._held=0), 
01011     _mutex->acquire(&_me), _mutex->release(&_me));
01012 #if !defined(USE_PTHREAD_MUTEX) || USE_PTHREAD_MUTEX==0
01013 SPECIALIZE_CS(mcs_lock, mcs_lock::ext_qnode _me, (_me._held=0), 
01014     _mutex->acquire(&_me), _mutex->release(&_me));
01015 #endif
01016 
01017 SPECIALIZE_CS(occ_rwlock::occ_rlock, int _dummy, (_dummy=0), 
01018     _mutex->acquire(), _mutex->release());
01019 
01020 SPECIALIZE_CS(occ_rwlock::occ_wlock, int _dummy, (_dummy=0), 
01021     _mutex->acquire(), _mutex->release());
01022 
01023 inline sthread_t::priority_t
01024 sthread_t::priority() const
01025 {
01026     return _priority;
01027 }
01028 
01029 inline sthread_t::status_t
01030 sthread_t::status() const
01031 {
01032     return _status;
01033 }
01034 
01035 #include <w_strstream.h>
01036 // Need string.h to get strerror_r 
01037 #include <string.h>
01038 /**\endcond skip */
01039 
01040 
01041 /*<std-footer incl-file-exclusion='STHREAD_H'>  -- do not edit anything below this line -- */
01042 
01043 #endif          /*</std-footer>*/

Generated on Mon Jan 2 15:13:57 2012 for Shore Storage Manager by  doxygen 1.4.7