00001
00007 #include <fcntl.h>
00008 #include <unistd.h>
00009 #include <errno.h>
00010 #include <string.h>
00011 #include <stdlib.h>
00012 #include <misc/debug.h>
00013 #include <misc/errno.h>
00014 #include <core/tx.h>
00015 #include <core/config.h>
00016 #include <core/koa.h>
00017 #include <core/buffer.h>
00018 #include <core/txdesc.h>
00019 #include <core/stats.h>
00020 #include <libc/syscalls.h>
00021 #include <xcalls/xcalls.h>
00022 #include <sys/types.h>
00023 #include <sys/socket.h>
00024
00025 typedef struct x_sendmsg_commit_args_s x_sendmsg_commit_args_t;
00026
00027 struct x_sendmsg_commit_args_s {
00028 int fd;
00029 int flags;
00030 struct msghdr msg;
00031 };
00032
00033 static
00034 void
00035 x_sendmsg_commit(void *args, int *result)
00036 {
00037 x_sendmsg_commit_args_t *args_commit = (x_sendmsg_commit_args_t *) args;
00038 int local_result = 0;
00039
00040 if (txc_libc_sendmsg(args_commit->fd, &args_commit->msg,
00041 args_commit->flags) < 0)
00042 {
00043 local_result = errno;
00044 }
00045 if (result) {
00046 *result = local_result;
00047 }
00048 }
00049
00050
00067 ssize_t
00068 XCALL_DEF(x_sendmsg)(int fd, const struct msghdr *msg, int flags, int *result)
00069 {
00070 txc_tx_t *txd;
00071 txc_koamgr_t *koamgr = txc_g_koamgr;
00072 txc_koa_t *koa;
00073 txc_sentinel_t *sentinel;
00074 txc_result_t xret;
00075 int ret;
00076 x_sendmsg_commit_args_t *args_commit;
00077 int local_result;
00078
00079
00080 txd = txc_tx_get_txd();
00081
00082 switch(txc_tx_get_xactstate(txd)) {
00083 case TXC_XACTSTATE_TRANSACTIONAL_IRREVOCABLE:
00084 case TXC_XACTSTATE_NONTRANSACTIONAL:
00085 if ((ret = txc_libc_sendmsg(fd, msg, flags)) < 0) {
00086 local_result = errno;
00087 } else {
00088 local_result = 0;
00089 }
00090 goto done;
00091
00092 case TXC_XACTSTATE_TRANSACTIONAL_RETRYABLE:
00093 txc_koa_lock_fd(koamgr, fd);
00094 xret = txc_koa_lookup_fd2koa(koamgr, fd, &koa);
00095 if (xret == TXC_R_FAILURE) {
00096
00097
00098
00099
00100 txc_koa_unlock_fd(koamgr, fd);
00101 local_result = EBADF;
00102 ret = -1;
00103 goto done;
00104 }
00105 sentinel = txc_koa_get_sentinel(koa);
00106 xret = txc_sentinel_tryacquire(txd, sentinel,
00107 TXC_SENTINEL_ACQUIREONRETRY);
00108 txc_koa_unlock_fd(koamgr, fd);
00109 if (xret == TXC_R_BUSYSENTINEL) {
00110 txc_tx_abort_transaction(txd, TXC_ABORTREASON_BUSYSENTINEL);
00111 TXC_INTERNALERROR("Never gets here. Transaction abort failed.\n");
00112 }
00113
00114
00115 if ((args_commit = (x_sendmsg_commit_args_t *)
00116 txc_buffer_linear_malloc(txd->buffer_linear,
00117 sizeof(x_sendmsg_commit_args_t)))
00118 == NULL)
00119 {
00120 local_result = ENOMEM;
00121 ret = -1;
00122 goto done;
00123 }
00124 memcpy(&args_commit->msg, msg, sizeof(struct msghdr));
00125 args_commit->fd = fd;
00126 args_commit->flags = flags;
00127 if ((args_commit->msg.msg_iov = (struct iovec *) txc_buffer_linear_malloc(txd->buffer_linear,
00128 msg->msg_iovlen))
00129 == NULL)
00130 {
00131 local_result = ENOMEM;
00132 ret = -1;
00133 goto error_handler;
00134 }
00135 if ((args_commit->msg.msg_control = (void *) txc_buffer_linear_malloc(txd->buffer_linear,
00136 msg->msg_controllen))
00137 == NULL)
00138 {
00139 local_result = ENOMEM;
00140 ret = -1;
00141 goto error_handler2;
00142 }
00143
00144 memcpy (args_commit->msg.msg_iov, msg->msg_iov, msg->msg_iovlen);
00145 memcpy (args_commit->msg.msg_control, msg->msg_control, msg->msg_controllen);
00146 txc_tx_register_commit_action(txd, x_sendmsg_commit,
00147 (void *) args_commit, result,
00148 TXC_TX_REGULAR_COMMIT_ACTION_ORDER);
00149 local_result = 0;
00150 txc_stats_txstat_increment(txd, XCALL, x_sendmsg, 1);
00151 goto done;
00152 default:
00153 TXC_INTERNALERROR("Unknown transaction state\n");
00154 }
00155
00156 error_handler2:
00157 txc_buffer_linear_free(txd->buffer_linear,
00158 msg->msg_iovlen);
00159 error_handler:
00160 txc_buffer_linear_free(txd->buffer_linear,
00161 sizeof(x_sendmsg_commit_args_t));
00162 done:
00163 if (result) {
00164 *result = local_result;
00165 }
00166 return ret;
00167 }