src/core/tx.c

Go to the documentation of this file.
00001 
00010 #include <sys/time.h>
00011 #include <errno.h>
00012 #include <pthread.h>
00013 #include <stdio.h>
00014 #include <stdlib.h>
00015 #include <setjmp.h>
00016 #include <assert.h>
00017 #include <sched.h>
00018 #include <misc/debug.h>
00019 #include <misc/pool.h>
00020 #include <misc/malloc.h>
00021 #include <misc/mutex.h>
00022 #include <core/config.h>
00023 #include <core/sentinel.h>
00024 #include <core/buffer.h>
00025 #include <core/koa.h>
00026 #include <core/fm.h>
00027 #include <core/tx.h>
00028 #include <core/txdesc.h>
00029 
00030 
00031 static void tx_generic_undo_action(txc_tx_t *);
00032 static void tx_generic_commit_action(txc_tx_t *);
00033 
00034 
00035 __thread txc_tx_t *txc_l_txd;     /* Thread specific data stored in 
00036                                    * thread local storage (TLS) */
00037 
00038 
00039 struct txc_tx_action_list_entry_s {
00040         txc_tx_function_t function;
00041         void *args;
00042         int *error_result;
00043         int order;
00044 };
00045 
00046 
00047 struct txc_tx_action_list_s {
00048         txc_tx_action_list_entry_t *entries;      /* Array of entries */
00049         unsigned int               num_entries;   /* Number of entries */
00050         unsigned int               size;          /* Size of array */
00051 };
00052 
00053                                                           
00054 struct txc_txmgr_s {
00055         txc_mutex_t     mutex;
00056         unsigned int    alloc_txd_num;
00057         txc_tx_t        *alloc_txd_list_head;
00058         txc_tx_t        *alloc_txd_list_tail;
00059         txc_pool_t      *pool_txd;
00060         txc_buffermgr_t *buffermgr;
00061         txc_statsmgr_t  *statsmgr;
00062 };
00063 
00064 
00065 /* Include TM system specific functions */
00066 #include <tm/itm.h>
00067 #include <tm/logtm.h>
00068 
00069 txc_txmgr_t *txc_g_txmgr;
00070 
00071 static inline 
00072 txc_result_t
00073 allocate_action_list_entries(txc_tx_action_list_t *la, int extend)
00074 {
00075         if (extend) {
00076                 /* Extend action list */
00077                 la->size *= 2;
00078                 if ((la->entries = 
00079                      (txc_tx_action_list_entry_t *) REALLOC (la->entries, 
00080                                                              la->size * sizeof(txc_tx_action_list_entry_t)))
00081                     == NULL)
00082                 {
00083                         return TXC_R_NOMEMORY;
00084                 }
00085         } else {
00086                 /* Allocate action list */
00087                 if ((la->entries = 
00088                      (txc_tx_action_list_entry_t *) MALLOC (la->size * sizeof(txc_tx_action_list_entry_t)))
00089                     == NULL)
00090                 {
00091                         return TXC_R_NOMEMORY;
00092                 }       
00093         }
00094         return TXC_R_SUCCESS;
00095 }
00096 
00097 
00098 static inline 
00099 txc_result_t
00100 deallocate_action_list_entries(txc_tx_action_list_t *la)
00101 {
00102         FREE(la->entries);
00103         return TXC_R_SUCCESS;
00104 }
00105 
00106 
00107 txc_result_t
00108 txc_txmgr_create(txc_txmgr_t **txmgrp, 
00109                  txc_buffermgr_t *buffermgr, 
00110                  txc_sentinelmgr_t *sentinelmgr,
00111                  txc_statsmgr_t *statsmgr)
00112 {
00113         txc_result_t      result;
00114         txc_pool_object_t *pool_object;
00115         txc_tx_t          *txd;
00116         
00117         *txmgrp = (txc_txmgr_t *) MALLOC(sizeof(txc_txmgr_t));
00118         if (*txmgrp == NULL) {
00119                 return TXC_R_NOMEMORY;
00120         }
00121         if ((result = txc_pool_create(&((*txmgrp)->pool_txd), 
00122                                       sizeof(txc_tx_t),
00123                                       TXC_MAX_NUM_THREADS, 
00124                                       NULL)) != TXC_R_SUCCESS) 
00125         {
00126                 FREE(*txmgrp);
00127                 return result;
00128         }
00129 
00130         /* 
00131          * Allocating memory for logs and buffers should be done only once here.
00132          * Then descriptors can be recycled by just re-initializing them.
00133          */
00134         for (pool_object = txc_pool_object_first((*txmgrp)->pool_txd, 
00135                                                  TXC_POOL_OBJECT_ALLOCATED & 
00136                                                  TXC_POOL_OBJECT_FREE);
00137              pool_object != NULL;
00138                  pool_object = txc_pool_object_next(pool_object))
00139         {        
00140                 txd = (txc_tx_t *) txc_pool_object_of(pool_object);
00141                 txd->manager = *txmgrp;
00142                 txc_sentinel_list_create(sentinelmgr, &(txd->sentinel_list));
00143                 txc_sentinel_list_create(sentinelmgr, &(txd->sentinel_list_preacquire));
00144                 txd->commit_action_list = (txc_tx_commit_action_list_t *) MALLOC(sizeof(txc_tx_commit_action_list_t));
00145                 txd->undo_action_list = (txc_tx_undo_action_list_t *) MALLOC(sizeof(txc_tx_undo_action_list_t));
00146                 txd->commit_action_list->size = TXC_ACTION_LIST_SIZE;  
00147                 txd->undo_action_list->size = TXC_ACTION_LIST_SIZE; 
00148                 allocate_action_list_entries(txd->commit_action_list, 0);
00149                 allocate_action_list_entries(txd->undo_action_list, 0);
00150                 txc_buffer_linear_create(buffermgr, &(txd->buffer_linear));
00151         }
00152 
00153         (*txmgrp)->alloc_txd_num = 0;
00154         (*txmgrp)->alloc_txd_list_head = (*txmgrp)->alloc_txd_list_tail = NULL;
00155         (*txmgrp)->buffermgr = buffermgr;
00156         (*txmgrp)->statsmgr = statsmgr;
00157         TXC_MUTEX_INIT(&(*txmgrp)->mutex, NULL);
00158 
00159         return TXC_R_SUCCESS;
00160 }
00161 
00162 
00163 txc_result_t
00164 txc_txmgr_destroy(txc_txmgr_t **txmgrp)
00165 {
00166         txc_tx_t          *txd;
00167         txc_pool_object_t *pool_object;
00168 
00169         for (pool_object = txc_pool_object_first((*txmgrp)->pool_txd, 
00170                                                  TXC_POOL_OBJECT_ALLOCATED & 
00171                                                  TXC_POOL_OBJECT_FREE);
00172              pool_object != NULL;
00173                  pool_object = txc_pool_object_next(pool_object))
00174         {        
00175                 txd = (txc_tx_t *) txc_pool_object_of(pool_object);
00176                 txd->manager = NULL;
00177                 deallocate_action_list_entries(txd->commit_action_list);
00178                 deallocate_action_list_entries(txd->undo_action_list);
00179                 txc_sentinel_list_destroy(&(txd->sentinel_list));
00180                 txc_sentinel_list_destroy(&(txd->sentinel_list_preacquire));
00181                 txc_buffer_linear_destroy(&(txd->buffer_linear));
00182         }
00183 
00184         txc_pool_destroy(&(*txmgrp)->pool_txd);
00185         FREE(*txmgrp);
00186         *txmgrp = NULL;
00187 
00188         return TXC_R_SUCCESS;
00189 }
00190 
00191 
00192 static inline
00193 txc_result_t
00194 txc_tx_init(txc_tx_t *txd)
00195 {
00196         txd->sentinelmgr_commit_action_registered = 0;
00197         txd->sentinelmgr_undo_action_registered = 0;
00198         txd->statsmgr_commit_action_registered = 0;
00199         txd->statsmgr_undo_action_registered = 0;
00200         txd->generic_undo_action_registered = 0;
00201         txd->generic_commit_action_registered = 0;
00202         txd->abort_reason = TXC_ABORTREASON_TMCONFLICT;
00203         txd->commit_action_list->num_entries = 0;
00204         txd->undo_action_list->num_entries = 0;
00205         txc_buffer_linear_init(txd->buffer_linear);
00206 
00207         return TXC_R_SUCCESS;
00208 }
00209 
00210 
00211 txc_result_t
00212 txc_tx_create(txc_txmgr_t *txmgr, txc_tx_t **txdp)
00213 {
00214         txc_tx_t     *txd;
00215         txc_result_t result;
00216 
00217         if ((result = txc_pool_object_alloc(txmgr->pool_txd,(void **) &txd, 1)) 
00218             != TXC_R_SUCCESS) 
00219         {
00220                 TXC_INTERNALERROR("Could not create TX object\n");
00221                 return result;
00222         }
00223 
00224         TXC_MUTEX_LOCK(&(txmgr->mutex));
00225         if (txmgr->alloc_txd_list_head == NULL) {
00226                 TXC_ASSERT(txmgr->alloc_txd_list_tail == NULL);
00227                 txmgr->alloc_txd_list_head = txd;
00228                 txmgr->alloc_txd_list_tail = txd;
00229                 txd->next = NULL;
00230                 txd->prev = NULL;
00231         } else {
00232                 txmgr->alloc_txd_list_tail->next = txd;
00233                 txd->prev = txmgr->alloc_txd_list_tail;
00234                 txmgr->alloc_txd_list_tail = txd;
00235         }
00236         txmgr->alloc_txd_num++;
00237         TXC_MUTEX_UNLOCK(&(txmgr->mutex));
00238 
00239         txd->tid_pthread = pthread_self();
00240         txd->forced_retries = 0;
00241         txc_tx_init(txd);
00242         txc_sentinel_list_init(txd->sentinel_list);
00243         txc_sentinel_list_init(txd->sentinel_list_preacquire);
00244 
00245         /* TM system specific initialization */
00246         tmsystem_tx_init(txd);
00247         
00248         txc_fm_init(txd);
00249 
00250 #ifdef _TXC_STATS_BUILD 
00251         if (txc_runtime_settings.statistics == TXC_BOOL_TRUE) {
00252                 txc_stats_threadstat_create(txd->manager->statsmgr, &(txd->threadstat), txd);
00253                 txc_stats_txstat_create(&(txd->txstat));
00254         }       
00255 #endif          
00256 
00257         *txdp = txd;
00258 
00259         return TXC_R_SUCCESS;
00260 }
00261 
00262 
00263 txc_result_t
00264 txc_tx_destroy(txc_tx_t **txdp)
00265 {
00266         txc_tx_t    *txd;
00267         txc_txmgr_t *txmgr;
00268 
00269         txd = *txdp;
00270         txmgr = txd->manager;
00271 
00272         TXC_MUTEX_LOCK(&(txmgr->mutex));
00273         TXC_ASSERT(txmgr->alloc_txd_list_head != NULL); 
00274         TXC_ASSERT(txmgr->alloc_txd_list_tail != NULL); 
00275 
00276         if (txmgr->alloc_txd_list_head == txd) {
00277                 txmgr->alloc_txd_list_head = txd->next;
00278         }
00279         if (txmgr->alloc_txd_list_tail == txd)  {
00280                 txmgr->alloc_txd_list_tail = txd->prev;
00281         }
00282         if (txd->prev) {
00283                 txd->prev->next = txd->next;    
00284         }
00285         if (txd->next) {
00286                 txd->next->prev = txd->prev;
00287         }       
00288 
00289         txmgr->alloc_txd_num--;
00290         TXC_MUTEX_UNLOCK(&(txmgr->mutex));
00291 
00292         txc_pool_object_free(txmgr->pool_txd, (void **) (&txd), 1);
00293         *txdp = NULL;
00294 
00295         return TXC_R_SUCCESS;
00296 }
00297 
00298 
00299 void                                                                          
00300 txc_tx_abort_transaction(txc_tx_t *txd, txc_tx_abortreason_t abort_reason)
00301 {
00302         TXC_DEBUG_PRINT(TXC_DEBUG_TX, "ABORT\n");
00303         tmsystem_abort_transaction(txd, abort_reason);
00304 }
00305 
00306 
00307 static inline txc_result_t
00308 register_action(txc_tx_action_list_t *la, 
00309                 txc_tx_function_t function, void *args, int *error_result, int order) 
00310 {
00311         txc_result_t ret;
00312 
00313         if (la->num_entries == la->size) {
00314                 if ((ret = allocate_action_list_entries(la, 1)) != TXC_R_SUCCESS) {
00315                         return ret;
00316                 }       
00317         }
00318 
00319         la->entries[la->num_entries].function = function;
00320         la->entries[la->num_entries].args = args;
00321         la->entries[la->num_entries].error_result = error_result;
00322         la->entries[la->num_entries].order = order;
00323         la->num_entries++; 
00324 
00325         return TXC_R_SUCCESS;
00326 
00327 }
00328 
00329 
00330 txc_result_t
00331 txc_tx_register_commit_action(txc_tx_t *txd, 
00332                               txc_tx_commit_function_t function, 
00333                               void *args, int *error_result, int order) 
00334 {
00335         txc_result_t                ret;
00336         txc_tx_commit_action_list_t *la;
00337 
00338         la = txd->commit_action_list;
00339         if ((ret = register_action(la, function, args, error_result, order)) 
00340             != TXC_R_SUCCESS) 
00341         {
00342                 return ret;
00343         }
00344         tmsystem_register_generic_commit_action(txd);
00345 
00346         return TXC_R_SUCCESS;
00347 }
00348 
00349 
00350 txc_result_t
00351 txc_tx_register_undo_action(txc_tx_t *txd, 
00352                             txc_tx_undo_function_t function, 
00353                             void *args, int *error_result, int order) 
00354 {
00355         txc_result_t              ret;
00356         txc_tx_undo_action_list_t *la;
00357 
00358         la = txd->undo_action_list;
00359         if ((ret = register_action(la, function, args, error_result, order)) 
00360             != TXC_R_SUCCESS) 
00361         {
00362                 return ret;
00363         }
00364 
00365         tmsystem_register_generic_undo_action(txd);
00366 
00367         return TXC_R_SUCCESS;
00368 }
00369 
00370 
00371 static
00372 void
00373 tx_generic_undo_action(txc_tx_t *txd)
00374 {
00375         txc_tx_undo_action_list_entry_t *entry;
00376         int                             i;
00377         int                             order;
00378         int                             num_actions_executed;
00379         int                             temp_error_result;
00380         int                             first_error_result = 0;
00381 
00382         for (order = 0, num_actions_executed = 0; 
00383              num_actions_executed != txd->undo_action_list->num_entries; 
00384              order++) 
00385         {
00386                 for (i = txd->undo_action_list->num_entries-1; i >= 0; i--) {
00387                         entry = &txd->undo_action_list->entries[i];
00388                         if (entry->order == order) {
00389                                 /* 
00390                                  * It is possible entry->error_result be NULL because the caller
00391                                  * might have not passed any result variable. Thus, if we simply
00392                                  * pass entry->error_result we might not get informed about any 
00393                                  * error happpened. Temporarily save any error of the action in 
00394                                  * temp_error_result to be able to correctly take any actions 
00395                                  * needed in case of failure (e.g. informing the failure manager
00396                                  */
00397                                 temp_error_result = 0; 
00398                                 entry->function(entry->args, &temp_error_result);
00399                                 if (entry->error_result) {
00400                                         *(entry->error_result) = temp_error_result;
00401                                 }
00402                                 if (first_error_result == 0) {
00403                                         first_error_result = temp_error_result;
00404                                 }
00405                                 num_actions_executed++;
00406                         }       
00407                 }
00408         }
00409         txc_tx_init(txd);
00410         txc_fm_handle_undo_failure(txd, first_error_result);
00411 }
00412 
00413 
00414 static
00415 void
00416 tx_generic_commit_action(txc_tx_t *txd)
00417 {
00418         txc_tx_commit_action_list_entry_t *entry;
00419         int                               i;
00420         int                               order;
00421         int                               num_actions_executed;
00422         int                               temp_error_result;
00423         int                               first_error_result = 0;
00424 
00425         for (order = 0, num_actions_executed = 0; 
00426              num_actions_executed != txd->commit_action_list->num_entries; 
00427              order++) 
00428         {
00429                 for (i = 0; i < txd->commit_action_list->num_entries; i++) {
00430                         entry = &txd->commit_action_list->entries[i];
00431                         if (entry->order == order) {
00432                                 /* 
00433                                  * It is possible entry->error_result be NULL because the caller
00434                                  * might have not passed any result variable. Thus, if we simply
00435                                  * pass entry->error_result we might not get informed about any 
00436                                  * error happpened. Temporarily save any error of the action in 
00437                                  * temp_error_result to be able to correctly take any actions 
00438                                  * needed in case of failure (e.g. informing the failure manager
00439                                  */
00440                                 temp_error_result = 0; 
00441                                 entry->function(entry->args, &temp_error_result);
00442                                 if (entry->error_result) {
00443                                         *(entry->error_result) = temp_error_result;
00444                                 }
00445                                 if (first_error_result == 0) {
00446                                         first_error_result = temp_error_result;
00447                                 }
00448                                 num_actions_executed++;
00449                         }       
00450                 }
00451         }       
00452         txc_tx_init(txd);
00453         txd->forced_retries = 0;
00454         txc_fm_handle_commit_failure(txd, first_error_result);
00455 }
00456 
00457 
00458 txc_tx_t *
00459 txc_tx_get_txd()
00460 {
00461         return txc_l_txd;
00462 }
00463 
00464 
00465 unsigned int
00466 txc_tx_get_tid(txc_tx_t *txd)
00467 {
00468         if (txd != NULL) {
00469                 return txd->tid;
00470         } else {        
00471                 return 0;
00472         }       
00473 }
00474 
00475 
00476 unsigned int
00477 txc_tx_get_tid_pthread(txc_tx_t *txd)
00478 {
00479         if (txd != NULL) {
00480                 return txd->tid_pthread;
00481         } else {        
00482                 return 0;
00483         }       
00484 }
00485 
00486 
00487 txc_tx_xactstate_t
00488 txc_tx_get_xactstate(txc_tx_t *txd) 
00489 {
00490         if (txd != NULL) {
00491                 return tmsystem_get_xactstate(txd);
00492         } else {        
00493                 return TXC_XACTSTATE_NONTRANSACTIONAL;
00494         }       
00495 }   
00496 
00497 
00498 unsigned int
00499 txc_tx_get_forced_retries(txc_tx_t *txd)
00500 {
00501         return txd->forced_retries;
00502 }
00503 
00504 
00505 static
00506 void
00507 tx_pre_outerbegin(txc_tx_t * txd, const char *srcloc_str, txc_tx_srcloc_t *srcloc)
00508 {
00509         txc_tx_init(txd);
00510         txc_fm_init(txd);
00511 #ifdef _TXC_STATS_BUILD 
00512         if (txc_runtime_settings.statistics == TXC_BOOL_TRUE) {
00513                 txc_stats_txstat_init(txd->txstat, srcloc_str, srcloc);
00514         }       
00515 #endif  
00516         txd->forced_retries = 0;
00517 }
00518 
00519 
00520 static
00521 void
00522 tx_pre_innerbegin(txc_tx_t * txd, const char *srcloc_str, txc_tx_srcloc_t *srcloc)
00523 {
00524         /* do nothing */
00525 }
00526 
00527 
00528 void
00529 txc_tx_pre_begin(txc_tx_t * txd, const char *srcloc_str, txc_tx_srcloc_t *srcloc)
00530 {
00531         if (txc_tx_get_xactstate(txd) == TXC_XACTSTATE_NONTRANSACTIONAL) {
00532                 tx_pre_outerbegin(txd, srcloc_str, srcloc);
00533         } else {
00534                 tx_pre_innerbegin(txd, srcloc_str, srcloc);
00535         }
00536 }
00537 
00538 
00539 void
00540 txc_tx_post_begin(txc_tx_t * txd)
00541 {
00542         txc_sentinel_transaction_postbegin(txd);
00543 #ifdef _TXC_STATS_BUILD 
00544         if (txc_runtime_settings.statistics == TXC_BOOL_TRUE) {
00545                 txc_stats_transaction_postbegin(txd);
00546         }       
00547 #endif  
00548         txc_fm_transaction_postbegin(txd);
00549 }
00550 
00551 
00552 /*
00553  **************************************************************************
00554  ***                     DEBUGGING SUPPORT ROUTINES                     ***
00555  **************************************************************************
00556  */
00557 
00558 void 
00559 txc_txmgr_print(txc_txmgr_t *txmgr)
00560 {
00561         txc_tx_t *txd;
00562 
00563         TXC_MUTEX_LOCK(&(txmgr->mutex));
00564         fprintf(TXC_DEBUG_OUT, "TRANSACTION MANAGER: %p\n", txmgr); 
00565         fprintf(TXC_DEBUG_OUT, "\talloc_txd_list_head =  %p\n", txmgr->alloc_txd_list_head); 
00566         fprintf(TXC_DEBUG_OUT, "\talloc_txd_list_tail =  %p\n", txmgr->alloc_txd_list_tail); 
00567         fprintf(TXC_DEBUG_OUT, "========================================\n"); 
00568 
00569         for (txd = txmgr->alloc_txd_list_head; 
00570              txd != NULL; 
00571              txd = txd->next)
00572         {
00573                 fprintf(TXC_DEBUG_OUT, "Descriptor: %p\n", txd); 
00574                 fprintf(TXC_DEBUG_OUT, "\ttid: %u\n", txd->tid);
00575                 fprintf(TXC_DEBUG_OUT, "\ttid_pthread: 0x%x\n", txd->tid_pthread);
00576                 fprintf(TXC_DEBUG_OUT, "\tnext: %p\n", txd->next);
00577                 fprintf(TXC_DEBUG_OUT, "\tprev: %p\n", txd->prev);
00578         }
00579 
00580         fprintf(TXC_DEBUG_OUT, "\n");
00581         TXC_MUTEX_UNLOCK(&(txmgr->mutex));
00582 }
00583 
00584 
00585 txc_result_t 
00586 txc_tx_exists(txc_txmgr_t *txmgr, txc_tx_t *txd)
00587 {
00588         txc_tx_t     *iter_txd;
00589         txc_result_t result;
00590 
00591         TXC_MUTEX_LOCK(&(txmgr->mutex));
00592         for (iter_txd = txmgr->alloc_txd_list_head; 
00593              iter_txd != NULL; 
00594              iter_txd = iter_txd->next)
00595         {
00596                 if (iter_txd == txd) {
00597                         result = TXC_R_SUCCESS;
00598                         goto done;
00599                 }
00600         }
00601 
00602         result = TXC_R_NOTEXISTS;
00603 done:
00604         TXC_MUTEX_UNLOCK(&(txmgr->mutex));
00605         return result;
00606 }

Generated on Wed Dec 9 20:32:39 2009 for xCalls by  doxygen 1.4.7