00001
00010 #include <sys/time.h>
00011 #include <errno.h>
00012 #include <pthread.h>
00013 #include <stdio.h>
00014 #include <stdlib.h>
00015 #include <setjmp.h>
00016 #include <assert.h>
00017 #include <sched.h>
00018 #include <misc/debug.h>
00019 #include <misc/pool.h>
00020 #include <misc/malloc.h>
00021 #include <misc/mutex.h>
00022 #include <core/config.h>
00023 #include <core/sentinel.h>
00024 #include <core/buffer.h>
00025 #include <core/koa.h>
00026 #include <core/fm.h>
00027 #include <core/tx.h>
00028 #include <core/txdesc.h>
00029
00030
/*
 * Forward declarations of the generic undo/commit runners defined further
 * down in this file. They are declared here, ahead of the <tm/...> includes
 * below; presumably those headers reference them -- confirm.
 */
00031 static void tx_generic_undo_action(txc_tx_t *);
00032 static void tx_generic_commit_action(txc_tx_t *);
00033
00034
/*
 * Thread-local pointer to a transaction descriptor; this is the value
 * handed out by txc_tx_get_txd(). Nothing in this file assigns it.
 */
00035 __thread txc_tx_t *txc_l_txd;
00036
00037
00038
00039 struct txc_tx_action_list_entry_s {
00040 txc_tx_function_t function;
00041 void *args;
00042 int *error_result;
00043 int order;
00044 };
00045
00046
00047 struct txc_tx_action_list_s {
00048 txc_tx_action_list_entry_t *entries;
00049 unsigned int num_entries;
00050 unsigned int size;
00051 };
00052
00053
00054 struct txc_txmgr_s {
00055 txc_mutex_t mutex;
00056 unsigned int alloc_txd_num;
00057 txc_tx_t *alloc_txd_list_head;
00058 txc_tx_t *alloc_txd_list_tail;
00059 txc_pool_t *pool_txd;
00060 txc_buffermgr_t *buffermgr;
00061 txc_statsmgr_t *statsmgr;
00062 };
00063
00064
00065
00066 #include <tm/itm.h>
00067 #include <tm/logtm.h>
00068
/*
 * Global transaction manager pointer. Nothing in this file assigns it;
 * presumably it is set by the library's initialisation code -- confirm.
 */
00069 txc_txmgr_t *txc_g_txmgr;
00070
00071 static inline
00072 txc_result_t
00073 allocate_action_list_entries(txc_tx_action_list_t *la, int extend)
00074 {
00075 if (extend) {
00076
00077 la->size *= 2;
00078 if ((la->entries =
00079 (txc_tx_action_list_entry_t *) REALLOC (la->entries,
00080 la->size * sizeof(txc_tx_action_list_entry_t)))
00081 == NULL)
00082 {
00083 return TXC_R_NOMEMORY;
00084 }
00085 } else {
00086
00087 if ((la->entries =
00088 (txc_tx_action_list_entry_t *) MALLOC (la->size * sizeof(txc_tx_action_list_entry_t)))
00089 == NULL)
00090 {
00091 return TXC_R_NOMEMORY;
00092 }
00093 }
00094 return TXC_R_SUCCESS;
00095 }
00096
00097
00098 static inline
00099 txc_result_t
00100 deallocate_action_list_entries(txc_tx_action_list_t *la)
00101 {
00102 FREE(la->entries);
00103 return TXC_R_SUCCESS;
00104 }
00105
00106
00107 txc_result_t
00108 txc_txmgr_create(txc_txmgr_t **txmgrp,
00109 txc_buffermgr_t *buffermgr,
00110 txc_sentinelmgr_t *sentinelmgr,
00111 txc_statsmgr_t *statsmgr)
00112 {
00113 txc_result_t result;
00114 txc_pool_object_t *pool_object;
00115 txc_tx_t *txd;
00116
00117 *txmgrp = (txc_txmgr_t *) MALLOC(sizeof(txc_txmgr_t));
00118 if (*txmgrp == NULL) {
00119 return TXC_R_NOMEMORY;
00120 }
00121 if ((result = txc_pool_create(&((*txmgrp)->pool_txd),
00122 sizeof(txc_tx_t),
00123 TXC_MAX_NUM_THREADS,
00124 NULL)) != TXC_R_SUCCESS)
00125 {
00126 FREE(*txmgrp);
00127 return result;
00128 }
00129
00130
00131
00132
00133
00134 for (pool_object = txc_pool_object_first((*txmgrp)->pool_txd,
00135 TXC_POOL_OBJECT_ALLOCATED &
00136 TXC_POOL_OBJECT_FREE);
00137 pool_object != NULL;
00138 pool_object = txc_pool_object_next(pool_object))
00139 {
00140 txd = (txc_tx_t *) txc_pool_object_of(pool_object);
00141 txd->manager = *txmgrp;
00142 txc_sentinel_list_create(sentinelmgr, &(txd->sentinel_list));
00143 txc_sentinel_list_create(sentinelmgr, &(txd->sentinel_list_preacquire));
00144 txd->commit_action_list = (txc_tx_commit_action_list_t *) MALLOC(sizeof(txc_tx_commit_action_list_t));
00145 txd->undo_action_list = (txc_tx_undo_action_list_t *) MALLOC(sizeof(txc_tx_undo_action_list_t));
00146 txd->commit_action_list->size = TXC_ACTION_LIST_SIZE;
00147 txd->undo_action_list->size = TXC_ACTION_LIST_SIZE;
00148 allocate_action_list_entries(txd->commit_action_list, 0);
00149 allocate_action_list_entries(txd->undo_action_list, 0);
00150 txc_buffer_linear_create(buffermgr, &(txd->buffer_linear));
00151 }
00152
00153 (*txmgrp)->alloc_txd_num = 0;
00154 (*txmgrp)->alloc_txd_list_head = (*txmgrp)->alloc_txd_list_tail = NULL;
00155 (*txmgrp)->buffermgr = buffermgr;
00156 (*txmgrp)->statsmgr = statsmgr;
00157 TXC_MUTEX_INIT(&(*txmgrp)->mutex, NULL);
00158
00159 return TXC_R_SUCCESS;
00160 }
00161
00162
00163 txc_result_t
00164 txc_txmgr_destroy(txc_txmgr_t **txmgrp)
00165 {
00166 txc_tx_t *txd;
00167 txc_pool_object_t *pool_object;
00168
00169 for (pool_object = txc_pool_object_first((*txmgrp)->pool_txd,
00170 TXC_POOL_OBJECT_ALLOCATED &
00171 TXC_POOL_OBJECT_FREE);
00172 pool_object != NULL;
00173 pool_object = txc_pool_object_next(pool_object))
00174 {
00175 txd = (txc_tx_t *) txc_pool_object_of(pool_object);
00176 txd->manager = NULL;
00177 deallocate_action_list_entries(txd->commit_action_list);
00178 deallocate_action_list_entries(txd->undo_action_list);
00179 txc_sentinel_list_destroy(&(txd->sentinel_list));
00180 txc_sentinel_list_destroy(&(txd->sentinel_list_preacquire));
00181 txc_buffer_linear_destroy(&(txd->buffer_linear));
00182 }
00183
00184 txc_pool_destroy(&(*txmgrp)->pool_txd);
00185 FREE(*txmgrp);
00186 *txmgrp = NULL;
00187
00188 return TXC_R_SUCCESS;
00189 }
00190
00191
00192 static inline
00193 txc_result_t
00194 txc_tx_init(txc_tx_t *txd)
00195 {
00196 txd->sentinelmgr_commit_action_registered = 0;
00197 txd->sentinelmgr_undo_action_registered = 0;
00198 txd->statsmgr_commit_action_registered = 0;
00199 txd->statsmgr_undo_action_registered = 0;
00200 txd->generic_undo_action_registered = 0;
00201 txd->generic_commit_action_registered = 0;
00202 txd->abort_reason = TXC_ABORTREASON_TMCONFLICT;
00203 txd->commit_action_list->num_entries = 0;
00204 txd->undo_action_list->num_entries = 0;
00205 txc_buffer_linear_init(txd->buffer_linear);
00206
00207 return TXC_R_SUCCESS;
00208 }
00209
00210
00211 txc_result_t
00212 txc_tx_create(txc_txmgr_t *txmgr, txc_tx_t **txdp)
00213 {
00214 txc_tx_t *txd;
00215 txc_result_t result;
00216
00217 if ((result = txc_pool_object_alloc(txmgr->pool_txd,(void **) &txd, 1))
00218 != TXC_R_SUCCESS)
00219 {
00220 TXC_INTERNALERROR("Could not create TX object\n");
00221 return result;
00222 }
00223
00224 TXC_MUTEX_LOCK(&(txmgr->mutex));
00225 if (txmgr->alloc_txd_list_head == NULL) {
00226 TXC_ASSERT(txmgr->alloc_txd_list_tail == NULL);
00227 txmgr->alloc_txd_list_head = txd;
00228 txmgr->alloc_txd_list_tail = txd;
00229 txd->next = NULL;
00230 txd->prev = NULL;
00231 } else {
00232 txmgr->alloc_txd_list_tail->next = txd;
00233 txd->prev = txmgr->alloc_txd_list_tail;
00234 txmgr->alloc_txd_list_tail = txd;
00235 }
00236 txmgr->alloc_txd_num++;
00237 TXC_MUTEX_UNLOCK(&(txmgr->mutex));
00238
00239 txd->tid_pthread = pthread_self();
00240 txd->forced_retries = 0;
00241 txc_tx_init(txd);
00242 txc_sentinel_list_init(txd->sentinel_list);
00243 txc_sentinel_list_init(txd->sentinel_list_preacquire);
00244
00245
00246 tmsystem_tx_init(txd);
00247
00248 txc_fm_init(txd);
00249
00250 #ifdef _TXC_STATS_BUILD
00251 if (txc_runtime_settings.statistics == TXC_BOOL_TRUE) {
00252 txc_stats_threadstat_create(txd->manager->statsmgr, &(txd->threadstat), txd);
00253 txc_stats_txstat_create(&(txd->txstat));
00254 }
00255 #endif
00256
00257 *txdp = txd;
00258
00259 return TXC_R_SUCCESS;
00260 }
00261
00262
00263 txc_result_t
00264 txc_tx_destroy(txc_tx_t **txdp)
00265 {
00266 txc_tx_t *txd;
00267 txc_txmgr_t *txmgr;
00268
00269 txd = *txdp;
00270 txmgr = txd->manager;
00271
00272 TXC_MUTEX_LOCK(&(txmgr->mutex));
00273 TXC_ASSERT(txmgr->alloc_txd_list_head != NULL);
00274 TXC_ASSERT(txmgr->alloc_txd_list_tail != NULL);
00275
00276 if (txmgr->alloc_txd_list_head == txd) {
00277 txmgr->alloc_txd_list_head = txd->next;
00278 }
00279 if (txmgr->alloc_txd_list_tail == txd) {
00280 txmgr->alloc_txd_list_tail = txd->prev;
00281 }
00282 if (txd->prev) {
00283 txd->prev->next = txd->next;
00284 }
00285 if (txd->next) {
00286 txd->next->prev = txd->prev;
00287 }
00288
00289 txmgr->alloc_txd_num--;
00290 TXC_MUTEX_UNLOCK(&(txmgr->mutex));
00291
00292 txc_pool_object_free(txmgr->pool_txd, (void **) (&txd), 1);
00293 *txdp = NULL;
00294
00295 return TXC_R_SUCCESS;
00296 }
00297
00298
00299 void
00300 txc_tx_abort_transaction(txc_tx_t *txd, txc_tx_abortreason_t abort_reason)
00301 {
00302 TXC_DEBUG_PRINT(TXC_DEBUG_TX, "ABORT\n");
00303 tmsystem_abort_transaction(txd, abort_reason);
00304 }
00305
00306
00307 static inline txc_result_t
00308 register_action(txc_tx_action_list_t *la,
00309 txc_tx_function_t function, void *args, int *error_result, int order)
00310 {
00311 txc_result_t ret;
00312
00313 if (la->num_entries == la->size) {
00314 if ((ret = allocate_action_list_entries(la, 1)) != TXC_R_SUCCESS) {
00315 return ret;
00316 }
00317 }
00318
00319 la->entries[la->num_entries].function = function;
00320 la->entries[la->num_entries].args = args;
00321 la->entries[la->num_entries].error_result = error_result;
00322 la->entries[la->num_entries].order = order;
00323 la->num_entries++;
00324
00325 return TXC_R_SUCCESS;
00326
00327 }
00328
00329
00330 txc_result_t
00331 txc_tx_register_commit_action(txc_tx_t *txd,
00332 txc_tx_commit_function_t function,
00333 void *args, int *error_result, int order)
00334 {
00335 txc_result_t ret;
00336 txc_tx_commit_action_list_t *la;
00337
00338 la = txd->commit_action_list;
00339 if ((ret = register_action(la, function, args, error_result, order))
00340 != TXC_R_SUCCESS)
00341 {
00342 return ret;
00343 }
00344 tmsystem_register_generic_commit_action(txd);
00345
00346 return TXC_R_SUCCESS;
00347 }
00348
00349
00350 txc_result_t
00351 txc_tx_register_undo_action(txc_tx_t *txd,
00352 txc_tx_undo_function_t function,
00353 void *args, int *error_result, int order)
00354 {
00355 txc_result_t ret;
00356 txc_tx_undo_action_list_t *la;
00357
00358 la = txd->undo_action_list;
00359 if ((ret = register_action(la, function, args, error_result, order))
00360 != TXC_R_SUCCESS)
00361 {
00362 return ret;
00363 }
00364
00365 tmsystem_register_generic_undo_action(txd);
00366
00367 return TXC_R_SUCCESS;
00368 }
00369
00370
00371 static
00372 void
00373 tx_generic_undo_action(txc_tx_t *txd)
00374 {
00375 txc_tx_undo_action_list_entry_t *entry;
00376 int i;
00377 int order;
00378 int num_actions_executed;
00379 int temp_error_result;
00380 int first_error_result = 0;
00381
00382 for (order = 0, num_actions_executed = 0;
00383 num_actions_executed != txd->undo_action_list->num_entries;
00384 order++)
00385 {
00386 for (i = txd->undo_action_list->num_entries-1; i >= 0; i--) {
00387 entry = &txd->undo_action_list->entries[i];
00388 if (entry->order == order) {
00389
00390
00391
00392
00393
00394
00395
00396
00397 temp_error_result = 0;
00398 entry->function(entry->args, &temp_error_result);
00399 if (entry->error_result) {
00400 *(entry->error_result) = temp_error_result;
00401 }
00402 if (first_error_result == 0) {
00403 first_error_result = temp_error_result;
00404 }
00405 num_actions_executed++;
00406 }
00407 }
00408 }
00409 txc_tx_init(txd);
00410 txc_fm_handle_undo_failure(txd, first_error_result);
00411 }
00412
00413
00414 static
00415 void
00416 tx_generic_commit_action(txc_tx_t *txd)
00417 {
00418 txc_tx_commit_action_list_entry_t *entry;
00419 int i;
00420 int order;
00421 int num_actions_executed;
00422 int temp_error_result;
00423 int first_error_result = 0;
00424
00425 for (order = 0, num_actions_executed = 0;
00426 num_actions_executed != txd->commit_action_list->num_entries;
00427 order++)
00428 {
00429 for (i = 0; i < txd->commit_action_list->num_entries; i++) {
00430 entry = &txd->commit_action_list->entries[i];
00431 if (entry->order == order) {
00432
00433
00434
00435
00436
00437
00438
00439
00440 temp_error_result = 0;
00441 entry->function(entry->args, &temp_error_result);
00442 if (entry->error_result) {
00443 *(entry->error_result) = temp_error_result;
00444 }
00445 if (first_error_result == 0) {
00446 first_error_result = temp_error_result;
00447 }
00448 num_actions_executed++;
00449 }
00450 }
00451 }
00452 txc_tx_init(txd);
00453 txd->forced_retries = 0;
00454 txc_fm_handle_commit_failure(txd, first_error_result);
00455 }
00456
00457
00458 txc_tx_t *
00459 txc_tx_get_txd()
00460 {
00461 return txc_l_txd;
00462 }
00463
00464
00465 unsigned int
00466 txc_tx_get_tid(txc_tx_t *txd)
00467 {
00468 if (txd != NULL) {
00469 return txd->tid;
00470 } else {
00471 return 0;
00472 }
00473 }
00474
00475
00476 unsigned int
00477 txc_tx_get_tid_pthread(txc_tx_t *txd)
00478 {
00479 if (txd != NULL) {
00480 return txd->tid_pthread;
00481 } else {
00482 return 0;
00483 }
00484 }
00485
00486
00487 txc_tx_xactstate_t
00488 txc_tx_get_xactstate(txc_tx_t *txd)
00489 {
00490 if (txd != NULL) {
00491 return tmsystem_get_xactstate(txd);
00492 } else {
00493 return TXC_XACTSTATE_NONTRANSACTIONAL;
00494 }
00495 }
00496
00497
00498 unsigned int
00499 txc_tx_get_forced_retries(txc_tx_t *txd)
00500 {
00501 return txd->forced_retries;
00502 }
00503
00504
00505 static
00506 void
00507 tx_pre_outerbegin(txc_tx_t * txd, const char *srcloc_str, txc_tx_srcloc_t *srcloc)
00508 {
00509 txc_tx_init(txd);
00510 txc_fm_init(txd);
00511 #ifdef _TXC_STATS_BUILD
00512 if (txc_runtime_settings.statistics == TXC_BOOL_TRUE) {
00513 txc_stats_txstat_init(txd->txstat, srcloc_str, srcloc);
00514 }
00515 #endif
00516 txd->forced_retries = 0;
00517 }
00518
00519
00520 static
00521 void
00522 tx_pre_innerbegin(txc_tx_t * txd, const char *srcloc_str, txc_tx_srcloc_t *srcloc)
00523 {
00524
00525 }
00526
00527
00528 void
00529 txc_tx_pre_begin(txc_tx_t * txd, const char *srcloc_str, txc_tx_srcloc_t *srcloc)
00530 {
00531 if (txc_tx_get_xactstate(txd) == TXC_XACTSTATE_NONTRANSACTIONAL) {
00532 tx_pre_outerbegin(txd, srcloc_str, srcloc);
00533 } else {
00534 tx_pre_innerbegin(txd, srcloc_str, srcloc);
00535 }
00536 }
00537
00538
00539 void
00540 txc_tx_post_begin(txc_tx_t * txd)
00541 {
00542 txc_sentinel_transaction_postbegin(txd);
00543 #ifdef _TXC_STATS_BUILD
00544 if (txc_runtime_settings.statistics == TXC_BOOL_TRUE) {
00545 txc_stats_transaction_postbegin(txd);
00546 }
00547 #endif
00548 txc_fm_transaction_postbegin(txd);
00549 }
00550
00551
00552
00553
00554
00555
00556
00557
00558 void
00559 txc_txmgr_print(txc_txmgr_t *txmgr)
00560 {
00561 txc_tx_t *txd;
00562
00563 TXC_MUTEX_LOCK(&(txmgr->mutex));
00564 fprintf(TXC_DEBUG_OUT, "TRANSACTION MANAGER: %p\n", txmgr);
00565 fprintf(TXC_DEBUG_OUT, "\talloc_txd_list_head = %p\n", txmgr->alloc_txd_list_head);
00566 fprintf(TXC_DEBUG_OUT, "\talloc_txd_list_tail = %p\n", txmgr->alloc_txd_list_tail);
00567 fprintf(TXC_DEBUG_OUT, "========================================\n");
00568
00569 for (txd = txmgr->alloc_txd_list_head;
00570 txd != NULL;
00571 txd = txd->next)
00572 {
00573 fprintf(TXC_DEBUG_OUT, "Descriptor: %p\n", txd);
00574 fprintf(TXC_DEBUG_OUT, "\ttid: %u\n", txd->tid);
00575 fprintf(TXC_DEBUG_OUT, "\ttid_pthread: 0x%x\n", txd->tid_pthread);
00576 fprintf(TXC_DEBUG_OUT, "\tnext: %p\n", txd->next);
00577 fprintf(TXC_DEBUG_OUT, "\tprev: %p\n", txd->prev);
00578 }
00579
00580 fprintf(TXC_DEBUG_OUT, "\n");
00581 TXC_MUTEX_UNLOCK(&(txmgr->mutex));
00582 }
00583
00584
00585 txc_result_t
00586 txc_tx_exists(txc_txmgr_t *txmgr, txc_tx_t *txd)
00587 {
00588 txc_tx_t *iter_txd;
00589 txc_result_t result;
00590
00591 TXC_MUTEX_LOCK(&(txmgr->mutex));
00592 for (iter_txd = txmgr->alloc_txd_list_head;
00593 iter_txd != NULL;
00594 iter_txd = iter_txd->next)
00595 {
00596 if (iter_txd == txd) {
00597 result = TXC_R_SUCCESS;
00598 goto done;
00599 }
00600 }
00601
00602 result = TXC_R_NOTEXISTS;
00603 done:
00604 TXC_MUTEX_UNLOCK(&(txmgr->mutex));
00605 return result;
00606 }