00001
00007 #include <fcntl.h>
00008 #include <unistd.h>
00009 #include <errno.h>
00010 #include <string.h>
00011 #include <stdlib.h>
00012 #include <sys/socket.h>
00013 #include <sys/types.h>
00014 #include <misc/debug.h>
00015 #include <misc/errno.h>
00016 #include <misc/generic_types.h>
00017 #include <core/tx.h>
00018 #include <core/config.h>
00019 #include <core/koa.h>
00020 #include <core/buffer.h>
00021 #include <libc/syscalls.h>
00022 #include <core/txdesc.h>
00023 #include <core/stats.h>
00024 #include <xcalls/xcalls.h>
00025 #include <netinet/in.h>
00026
/*
 * Sizes, in bytes, of the payload and control-data areas reserved for one
 * buffered message. The expansions are parenthesized so the macros behave
 * as single values inside larger expressions (e.g. next to '/' or '%').
 */
#define TXC_DGRAM_MAX_SIZE       (1024 * 64)
#define TXC_CONTROLMSG_MAX_SIZE  (1024 * 2)
00029
/*
 * Header stored in the circular buffer in front of every buffered message.
 * It mirrors the fields of a struct msghdr, flattened to a single payload
 * area and a single control area.
 */
typedef struct txc_socket_msghdr_s {
    struct sockaddr_in  msg_name;        /* source address of the message */
    socklen_t           msg_namelen;     /* address length reported by the receive */
    char               *msg_iov_base;    /* start of the payload bytes */
    size_t              msg_iov_len;     /* number of payload bytes */
    void               *msg_control;     /* start of the control (ancillary) data */
    socklen_t           msg_controllen;  /* number of control bytes */
    int                 msg_flags;       /* flags reported by the receive */
} txc_socket_msghdr_t;
00041
00042
00043
00044 typedef struct x_recvmsg_commit_undo_args_s x_recvmsg_commit_undo_args_t;
00045
00046 struct x_recvmsg_commit_undo_args_s {
00047 txc_koa_t *koa;
00048 txc_buffer_circular_t *buffer;
00049 };
00050
00051
00052 static
00053 void
00054 x_recvmsg_undo(void *args, int *result)
00055 {
00056 x_recvmsg_commit_undo_args_t *args_undo = (x_recvmsg_commit_undo_args_t *) args;
00057 int local_result = 0;
00058
00059 args_undo->buffer->state = TXC_BUFFER_STATE_NON_SPECULATIVE;
00060 if (result) {
00061 *result = local_result;
00062 }
00063 }
00064
00065
00066 static
00067 void
00068 x_recvmsg_commit(void *args, int *result)
00069 {
00070 x_recvmsg_commit_undo_args_t *args_commit = (x_recvmsg_commit_undo_args_t *) args;
00071 int local_result = 0;
00072 txc_buffer_circular_t *buffer;
00073
00074 buffer = args_commit->buffer;
00075
00076 if (buffer->speculative_primary_tail != buffer->speculative_primary_head) {
00077 buffer->primary_head = buffer->speculative_primary_head;
00078 buffer->primary_tail = buffer->speculative_primary_tail;
00079 } else if (buffer->speculative_secondary_tail != buffer->speculative_secondary_head) {
00080 buffer->primary_head = buffer->speculative_secondary_head;
00081 buffer->primary_tail = buffer->speculative_secondary_tail;
00082 buffer->secondary_head = buffer->secondary_tail = 0;
00083 } else {
00084 buffer->primary_head = buffer->primary_tail = 0;
00085 buffer->secondary_head = buffer->secondary_tail = 0;
00086 }
00087 buffer->state = TXC_BUFFER_STATE_NON_SPECULATIVE;
00088 if (result) {
00089 *result = local_result;
00090 }
00091 }
00092
00093
00094 static
00095 ssize_t
00096 __txc_recvmsg(txc_tx_t *txd, txc_bool_t speculative_read,
00097 int fd, struct msghdr *msg, int flags, int *result)
00098 {
00099 txc_koamgr_t *koamgr = txc_g_koamgr;
00100 txc_koa_t *koa;
00101 txc_sentinel_t *sentinel;
00102 txc_result_t xret;
00103 int ret;
00104 x_recvmsg_commit_undo_args_t *args_commit_undo;
00105 int local_result = 0;
00106 int store_in_primary_buffer;
00107 int store_in_secondary_buffer;
00108 int hdr_base_index;
00109 int needed_bytes;
00110 txc_buffer_circular_t *buffer;
00111 struct iovec temp_iovec;
00112 struct msghdr temp_msghdr;
00113 txc_socket_msghdr_t *temp_buffer_msghdr;
00114 ssize_t len;
00115 int s;
00116 int i;
00117
00118
00119
00120
00121
00122
00123
00124
00125 txc_koa_lock_fd(koamgr, fd);
00126 xret = txc_koa_lookup_fd2koa(koamgr, fd, &koa);
00127 if (xret == TXC_R_FAILURE) {
00128
00129
00130
00131
00132 txc_koa_unlock_fd(koamgr, fd);
00133 local_result = EBADF;
00134 ret = -1;
00135 goto done;
00136 }
00137 buffer = (txc_buffer_circular_t *) txc_koa_get_buffer(koa);
00138
00139 if (!speculative_read) {
00140 txc_koa_unlock_fd(koamgr, fd);
00141 if ((buffer->primary_tail != buffer->primary_head) ||
00142 (buffer->secondary_tail != buffer->secondary_head)) {
00143
00144 if (buffer->primary_tail != buffer->primary_head) {
00145
00146 hdr_base_index = buffer->primary_head;
00147 temp_buffer_msghdr = (txc_socket_msghdr_t *) &(buffer->buf[hdr_base_index]);
00148 len = sizeof(txc_socket_msghdr_t) +
00149 temp_buffer_msghdr->msg_iov_len +
00150 temp_buffer_msghdr->msg_controllen;
00151 buffer->primary_head += len;
00152 } else {
00153
00154
00155
00156 hdr_base_index = buffer->secondary_head;
00157 temp_buffer_msghdr = (txc_socket_msghdr_t *) &(buffer->buf[hdr_base_index]);
00158 len = sizeof(txc_socket_msghdr_t) +
00159 temp_buffer_msghdr->msg_iov_len +
00160 temp_buffer_msghdr->msg_controllen;
00161 buffer->secondary_head += len;
00162 buffer->primary_tail = buffer->secondary_tail;
00163 buffer->primary_head = buffer->secondary_head;
00164 buffer->secondary_head = buffer->secondary_tail = 0;
00165 }
00166
00167 } else {
00168 ret = txc_libc_recvmsg(fd, msg, flags);
00169 local_result = errno;
00170 goto done;
00171 }
00172 } else {
00173
00174 sentinel = txc_koa_get_sentinel(koa);
00175 xret = txc_sentinel_tryacquire(txd, sentinel,
00176 TXC_SENTINEL_ACQUIREONRETRY);
00177 txc_koa_unlock_fd(koamgr, fd);
00178 if (xret == TXC_R_BUSYSENTINEL) {
00179 txc_tx_abort_transaction(txd, TXC_ABORTREASON_BUSYSENTINEL);
00180 TXC_INTERNALERROR("Never gets here. Transaction abort failed.\n");
00181 }
00182
00183
00184 if (buffer->state == TXC_BUFFER_STATE_NON_SPECULATIVE) {
00185 if ((args_commit_undo = (x_recvmsg_commit_undo_args_t *)
00186 txc_buffer_linear_malloc(txd->buffer_linear,
00187 sizeof(x_recvmsg_commit_undo_args_t)))
00188 == NULL)
00189 {
00190 local_result = ENOMEM;
00191 ret = -1;
00192 goto done;
00193 }
00194 buffer->speculative_primary_head = buffer->primary_head;
00195 buffer->speculative_primary_tail = buffer->primary_tail;
00196 buffer->speculative_secondary_head = buffer->secondary_head;
00197 buffer->speculative_secondary_tail = buffer->secondary_tail;
00198 buffer->state = TXC_BUFFER_STATE_SPECULATIVE;
00199
00200 args_commit_undo->koa = koa;
00201 args_commit_undo->buffer = buffer;
00202
00203 txc_tx_register_commit_action(txd, x_recvmsg_commit,
00204 (void *) args_commit_undo, result,
00205 TXC_TX_REGULAR_COMMIT_ACTION_ORDER);
00206 txc_tx_register_undo_action(txd, x_recvmsg_undo,
00207 (void *) args_commit_undo, result,
00208 TXC_TX_REGULAR_UNDO_ACTION_ORDER);
00209 }
00210
00211 if (buffer->speculative_primary_tail != buffer->speculative_primary_head ||
00212 buffer->speculative_secondary_tail != buffer->speculative_secondary_head) {
00213
00214 if (buffer->speculative_primary_tail != buffer->speculative_primary_head) {
00215
00216 hdr_base_index = buffer->speculative_primary_head;
00217 temp_buffer_msghdr = (txc_socket_msghdr_t *) &(buffer->buf[hdr_base_index]);
00218 len = sizeof(txc_socket_msghdr_t) +
00219 temp_buffer_msghdr->msg_iov_len +
00220 temp_buffer_msghdr->msg_controllen;
00221 buffer->speculative_primary_head += len;
00222 } else {
00223
00224 hdr_base_index = buffer->speculative_secondary_head;
00225 temp_buffer_msghdr = (txc_socket_msghdr_t *) &(buffer->buf[hdr_base_index]);
00226 buffer->speculative_secondary_head += len;
00227 }
00228 } else {
00229
00230
00231
00232
00233
00234 needed_bytes = sizeof(txc_socket_msghdr_t) +
00235 TXC_DGRAM_MAX_SIZE + TXC_CONTROLMSG_MAX_SIZE;
00236 store_in_primary_buffer = store_in_secondary_buffer = 0;
00237 if (buffer->primary_tail < (buffer->size_max - needed_bytes)) {
00238
00239 hdr_base_index = buffer->primary_tail;
00240 store_in_primary_buffer = 1;
00241 } else if (buffer->secondary_tail <
00242 (buffer->primary_head + 1 - needed_bytes))
00243 {
00244
00245 hdr_base_index = buffer->secondary_tail;
00246 store_in_secondary_buffer = 1;
00247 } else {
00248
00249 local_result = ENOMEM;
00250 ret = -1;
00251 goto done;
00252 }
00253 temp_buffer_msghdr = (txc_socket_msghdr_t *) &(buffer->buf[hdr_base_index]);
00254 temp_buffer_msghdr->msg_iov_base = (void *) (temp_buffer_msghdr +
00255 sizeof(txc_socket_msghdr_t));
00256 temp_buffer_msghdr->msg_control = (void *) (temp_buffer_msghdr +
00257 sizeof(txc_socket_msghdr_t) +
00258 TXC_DGRAM_MAX_SIZE);
00259 temp_iovec.iov_base = temp_buffer_msghdr->msg_iov_base;
00260 temp_iovec.iov_len = TXC_DGRAM_MAX_SIZE;
00261 temp_msghdr.msg_name = (void *) &(temp_buffer_msghdr->msg_name);
00262 temp_msghdr.msg_namelen = sizeof(struct sockaddr_in);
00263 temp_msghdr.msg_iov = &temp_iovec;
00264 temp_msghdr.msg_iovlen = 1;
00265 temp_msghdr.msg_control = temp_buffer_msghdr->msg_control;
00266 temp_msghdr.msg_controllen = TXC_CONTROLMSG_MAX_SIZE;
00267 ret = txc_libc_recvmsg(fd, &temp_msghdr, flags);
00268 if (ret > 0) {
00269 temp_buffer_msghdr->msg_iov_len = ret;
00270 temp_buffer_msghdr->msg_namelen = temp_msghdr.msg_namelen;
00271 temp_buffer_msghdr->msg_controllen = temp_msghdr.msg_controllen;
00272 temp_buffer_msghdr->msg_flags = temp_msghdr.msg_flags;
00273 if (store_in_primary_buffer) {
00274 buffer->primary_tail += sizeof(txc_socket_msghdr_t) +
00275 temp_buffer_msghdr->msg_iov_len +
00276 temp_buffer_msghdr->msg_controllen;
00277 } else if (store_in_secondary_buffer) {
00278 buffer->secondary_tail += sizeof(txc_socket_msghdr_t) +
00279 temp_buffer_msghdr->msg_iov_len +
00280 temp_buffer_msghdr->msg_controllen;
00281 } else {
00282 TXC_INTERNALERROR("Sanity check -- shouldn't come here.\n");
00283 }
00284
00285 if (temp_buffer_msghdr->msg_controllen > 0) {
00286 memcpy((char *) temp_buffer_msghdr->msg_iov_base + temp_buffer_msghdr->msg_iov_len,
00287 temp_buffer_msghdr->msg_control,
00288 temp_buffer_msghdr->msg_controllen);
00289 }
00290 }
00291 }
00292 txc_stats_txstat_increment(txd, XCALL, x_recvmsg, 1);
00293 }
00294
00295
00296 memcpy(msg->msg_name,
00297 (void *)&(temp_buffer_msghdr->msg_name),
00298 sizeof(struct sockaddr_in));
00299 msg->msg_namelen = temp_buffer_msghdr->msg_namelen;
00300 for (s=0, i=0; i<msg->msg_iovlen; i++) {
00301 memcpy(msg->msg_iov[i].iov_base,
00302 &temp_buffer_msghdr->msg_iov_base[s],
00303 msg->msg_iov[i].iov_len);
00304 s += msg->msg_iov[i].iov_len;
00305 }
00306 msg->msg_flags = temp_buffer_msghdr->msg_flags;
00307 msg->msg_controllen = temp_buffer_msghdr->msg_controllen;
00308 memcpy(msg->msg_control, temp_buffer_msghdr->msg_control,
00309 temp_buffer_msghdr->msg_controllen);
00310 ret = s;
00311 local_result = 0;
00312 done:
00313 if (result) {
00314 *result = local_result;
00315 }
00316 return ret;
00317 }
00318
00319
00320
00340 ssize_t
00341 XCALL_DEF(x_recvmsg)(int s, struct msghdr *msg, int flags, int *result)
00342 {
00343 txc_tx_t *txd;
00344 int ret;
00345
00346 txd = txc_tx_get_txd();
00347
00348 switch(txc_tx_get_xactstate(txd)) {
00349 case TXC_XACTSTATE_TRANSACTIONAL_IRREVOCABLE:
00350 case TXC_XACTSTATE_NONTRANSACTIONAL:
00351 ret = __txc_recvmsg(txd, TXC_BOOL_FALSE, s, msg, flags, result);
00352 break;
00353 case TXC_XACTSTATE_TRANSACTIONAL_RETRYABLE:
00354 ret = __txc_recvmsg(txd, TXC_BOOL_TRUE, s, msg, flags, result);
00355 break;
00356 default:
00357 TXC_INTERNALERROR("Unknown transaction state\n");
00358 }
00359 return ret;
00360 }