00001
00007 #include <fcntl.h>
00008 #include <unistd.h>
00009 #include <errno.h>
00010 #include <string.h>
00011 #include <stdlib.h>
00012 #include <misc/debug.h>
00013 #include <misc/errno.h>
00014 #include <core/tx.h>
00015 #include <core/config.h>
00016 #include <core/koa.h>
00017 #include <core/buffer.h>
00018 #include <core/txdesc.h>
00019 #include <core/stats.h>
00020 #include <libc/syscalls.h>
00021 #include <xcalls/xcalls.h>
00022
00023
typedef struct x_write_commit_args_s x_write_commit_args_t;

/*
 * Arguments captured for a deferred pipe write.
 *
 * x_write_pipe fills one of these in and registers it together with
 * x_write_pipe_commit, which later hands the three fields to txc_libc_write.
 */
struct x_write_commit_args_s {
	int     fd;     /* descriptor the deferred write targets */
	size_t  nbyte;  /* number of bytes to write from buf; size_t so that the
	                 * caller's size_t count is stored without truncation */
	void   *buf;    /* data to be written */
};
00031
00032 static
00033 void
00034 x_write_pipe_commit(void *args, int *result)
00035 {
00036 x_write_commit_args_t *args_commit = (x_write_commit_args_t *) args;
00037 int local_result = 0;
00038
00039 if (txc_libc_write(args_commit->fd, args_commit->buf,
00040 args_commit->nbyte) < 0)
00041 {
00042 local_result = errno;
00043 }
00044 if (result) {
00045 *result = local_result;
00046 }
00047 }
00048
00049
00064 ssize_t
00065 XCALL_DEF(x_write_pipe)(int fd, const void *buf, size_t nbyte, int *result)
00066 {
00067 txc_tx_t *txd;
00068 txc_koamgr_t *koamgr = txc_g_koamgr;
00069 txc_koa_t *koa;
00070 txc_sentinel_t *sentinel;
00071 txc_result_t xret;
00072 int ret;
00073 void *deferred_data;
00074 x_write_commit_args_t *args_write_commit;
00075 int local_result;
00076
00077
00078 txd = txc_tx_get_txd();
00079
00080 switch(txc_tx_get_xactstate(txd)) {
00081 case TXC_XACTSTATE_TRANSACTIONAL_IRREVOCABLE:
00082 case TXC_XACTSTATE_NONTRANSACTIONAL:
00083 if ((ret = txc_libc_write(fd, buf, nbyte)) < 0) {
00084 local_result = errno;
00085 } else {
00086 local_result = 0;
00087 }
00088 goto done;
00089
00090 case TXC_XACTSTATE_TRANSACTIONAL_RETRYABLE:
00091 txc_koa_lock_fd(koamgr, fd);
00092 xret = txc_koa_lookup_fd2koa(koamgr, fd, &koa);
00093 if (xret == TXC_R_FAILURE) {
00094
00095
00096
00097
00098 txc_koa_unlock_fd(koamgr, fd);
00099 local_result = EBADF;
00100 ret = -1;
00101 goto done;
00102 }
00103 sentinel = txc_koa_get_sentinel(koa);
00104 xret = txc_sentinel_tryacquire(txd, sentinel,
00105 TXC_SENTINEL_ACQUIREONRETRY);
00106 txc_koa_unlock_fd(koamgr, fd);
00107 if (xret == TXC_R_BUSYSENTINEL) {
00108 txc_tx_abort_transaction(txd, TXC_ABORTREASON_BUSYSENTINEL);
00109 TXC_INTERNALERROR("Never gets here. Transaction abort failed.\n");
00110 }
00111
00112
00113 if ((args_write_commit = (x_write_commit_args_t *)
00114 txc_buffer_linear_malloc(txd->buffer_linear,
00115 sizeof(x_write_commit_args_t)))
00116 == NULL)
00117 {
00118 local_result = ENOMEM;
00119 ret = -1;
00120 goto done;
00121 }
00122 if ((deferred_data = (void *) txc_buffer_linear_malloc(txd->buffer_linear,
00123 sizeof(char) * nbyte))
00124 == NULL)
00125 {
00126 local_result = ENOMEM;
00127 ret = -1;
00128 goto error_handler;
00129 }
00130 memcpy (deferred_data, buf, nbyte);
00131 args_write_commit->nbyte = nbyte;
00132 args_write_commit->buf = deferred_data;
00133 args_write_commit->fd = fd;
00134 txc_tx_register_commit_action(txd, x_write_pipe_commit,
00135 (void *) args_write_commit, result,
00136 TXC_TX_REGULAR_COMMIT_ACTION_ORDER);
00137 local_result = 0;
00138 ret = args_write_commit->nbyte;
00139 txc_stats_txstat_increment(txd, XCALL, x_write_pipe, 1);
00140 goto done;
00141 default:
00142 TXC_INTERNALERROR("Unknown transaction state\n");
00143 }
00144
00145 error_handler:
00146 txc_buffer_linear_free(txd->buffer_linear,
00147 sizeof(x_write_commit_args_t));
00148 done:
00149 if (result) {
00150 *result = local_result;
00151 }
00152 return ret;
00153 }