src/xcalls/x_recvmsg.c

Go to the documentation of this file.
00001 
00007 #include <fcntl.h>
00008 #include <unistd.h>
00009 #include <errno.h>
00010 #include <string.h>
00011 #include <stdlib.h>
00012 #include <sys/socket.h>
00013 #include <sys/types.h>
00014 #include <misc/debug.h>
00015 #include <misc/errno.h>
00016 #include <misc/generic_types.h>
00017 #include <core/tx.h>
00018 #include <core/config.h>
00019 #include <core/koa.h>
00020 #include <core/buffer.h>
00021 #include <libc/syscalls.h>
00022 #include <core/txdesc.h>
00023 #include <core/stats.h>
00024 #include <xcalls/xcalls.h>
00025 #include <netinet/in.h>
00026 
00027 #define TXC_DGRAM_MAX_SIZE      1024*64
00028 #define TXC_CONTROLMSG_MAX_SIZE 1024*2
00029 
00030 typedef struct txc_socket_msghdr_s txc_socket_msghdr_t;
00031 
00032 struct txc_socket_msghdr_s {
00033         struct sockaddr_in msg_name;
00034         socklen_t          msg_namelen;
00035         char               *msg_iov_base;
00036         size_t             msg_iov_len;         /* length in bytes */
00037         void               *msg_control;
00038         socklen_t          msg_controllen;
00039         int                msg_flags;
00040 };
00041 
00042 
00043 
00044 typedef struct x_recvmsg_commit_undo_args_s x_recvmsg_commit_undo_args_t;
00045 
00046 struct x_recvmsg_commit_undo_args_s {
00047         txc_koa_t             *koa;
00048         txc_buffer_circular_t *buffer;
00049 };
00050 
00051 
00052 static
00053 void 
00054 x_recvmsg_undo(void *args, int *result) 
00055 {
00056         x_recvmsg_commit_undo_args_t *args_undo = (x_recvmsg_commit_undo_args_t *) args;
00057         int                          local_result = 0;
00058 
00059         args_undo->buffer->state = TXC_BUFFER_STATE_NON_SPECULATIVE;
00060         if (result) {
00061                 *result = local_result;
00062         }
00063 }
00064 
00065 
00066 static 
00067 void
00068 x_recvmsg_commit(void *args, int *result)
00069 {
00070         x_recvmsg_commit_undo_args_t *args_commit = (x_recvmsg_commit_undo_args_t *) args;
00071         int                          local_result = 0;
00072         txc_buffer_circular_t        *buffer;
00073 
00074         buffer = args_commit->buffer;
00075 
00076         if (buffer->speculative_primary_tail != buffer->speculative_primary_head) {
00077                 buffer->primary_head = buffer->speculative_primary_head; 
00078                 buffer->primary_tail = buffer->speculative_primary_tail; 
00079         } else if (buffer->speculative_secondary_tail != buffer->speculative_secondary_head) {
00080                 buffer->primary_head = buffer->speculative_secondary_head; 
00081                 buffer->primary_tail = buffer->speculative_secondary_tail; 
00082                 buffer->secondary_head = buffer->secondary_tail = 0;
00083         } else {
00084                 buffer->primary_head = buffer->primary_tail = 0;
00085                 buffer->secondary_head = buffer->secondary_tail = 0;
00086         }
00087         buffer->state = TXC_BUFFER_STATE_NON_SPECULATIVE;
00088         if (result) {
00089                 *result = local_result;
00090         }
00091 }
00092 
00093 
00094 static 
00095 ssize_t 
00096 __txc_recvmsg(txc_tx_t *txd, txc_bool_t speculative_read,
00097               int fd, struct msghdr *msg, int flags, int *result)
00098 {
00099         txc_koamgr_t                   *koamgr = txc_g_koamgr;
00100         txc_koa_t                      *koa;
00101         txc_sentinel_t                 *sentinel;
00102         txc_result_t                   xret;
00103         int                            ret;
00104         x_recvmsg_commit_undo_args_t  *args_commit_undo;
00105         int                            local_result = 0;
00106         int                            store_in_primary_buffer;
00107         int                            store_in_secondary_buffer;
00108         int                            hdr_base_index;
00109         int                            needed_bytes;
00110         txc_buffer_circular_t          *buffer;
00111         struct iovec                   temp_iovec;
00112         struct msghdr                  temp_msghdr;
00113         txc_socket_msghdr_t            *temp_buffer_msghdr;
00114         ssize_t                        len;
00115         int                            s;
00116         int                            i;
00117 
00118         
00119         /* 
00120          * FIXME: We don't handle the case where we need to consume more data 
00121          * than available in the buffer. In such a case you need to request
00122          * the additional data.
00123          */
00124 
00125         txc_koa_lock_fd(koamgr, fd);
00126         xret = txc_koa_lookup_fd2koa(koamgr, fd, &koa);
00127         if (xret == TXC_R_FAILURE) {
00128                 /* 
00129                  * The KOA mapped to the file descriptor has gone. Report
00130                  * this error as invalid file descriptor.
00131                  */
00132                 txc_koa_unlock_fd(koamgr, fd);
00133                 local_result = EBADF;
00134                 ret = -1;
00135                 goto done;
00136         }
00137         buffer = (txc_buffer_circular_t *) txc_koa_get_buffer(koa);
00138 
00139         if (!speculative_read) {
00140                 txc_koa_unlock_fd(koamgr, fd);
00141                 if ((buffer->primary_tail != buffer->primary_head) ||
00142                     (buffer->secondary_tail != buffer->secondary_head)) {
00143                         /* There are buffered data available */
00144                         if (buffer->primary_tail != buffer->primary_head) {
00145                                 /* Consume data from the primary buffer */
00146                                 hdr_base_index = buffer->primary_head;
00147                                 temp_buffer_msghdr = (txc_socket_msghdr_t *) &(buffer->buf[hdr_base_index]);
00148                                 len = sizeof(txc_socket_msghdr_t) +
00149                                       temp_buffer_msghdr->msg_iov_len + 
00150                                       temp_buffer_msghdr->msg_controllen;
00151                                 buffer->primary_head += len;
00152                         } else {
00153                                 /* Consume data from the secondary buffer and make the secondary 
00154                                  * buffer primary 
00155                                  */
00156                                 hdr_base_index = buffer->secondary_head;
00157                                 temp_buffer_msghdr = (txc_socket_msghdr_t *) &(buffer->buf[hdr_base_index]);
00158                                 len = sizeof(txc_socket_msghdr_t) +
00159                                       temp_buffer_msghdr->msg_iov_len + 
00160                                       temp_buffer_msghdr->msg_controllen;
00161                                 buffer->secondary_head += len;
00162                                 buffer->primary_tail = buffer->secondary_tail; 
00163                                 buffer->primary_head = buffer->secondary_head; 
00164                                 buffer->secondary_head = buffer->secondary_tail = 0;
00165                         }
00166                         /* Now fall through to do the copy */
00167                 } else { 
00168                         ret = txc_libc_recvmsg(fd, msg, flags);
00169                         local_result = errno;
00170                         goto done;
00171                 }
00172         } else {
00173                 /* READ is speculative */               
00174                 sentinel = txc_koa_get_sentinel(koa);
00175                 xret = txc_sentinel_tryacquire(txd, sentinel, 
00176                                                                            TXC_SENTINEL_ACQUIREONRETRY);
00177                 txc_koa_unlock_fd(koamgr, fd);
00178                 if (xret == TXC_R_BUSYSENTINEL) {
00179                         txc_tx_abort_transaction(txd, TXC_ABORTREASON_BUSYSENTINEL);
00180                         TXC_INTERNALERROR("Never gets here. Transaction abort failed.\n");
00181                 }
00182 
00183                 /* Got sentinel. Continue with the rest of the stuff. */
00184                 if (buffer->state == TXC_BUFFER_STATE_NON_SPECULATIVE) {
00185                         if ((args_commit_undo = (x_recvmsg_commit_undo_args_t *)
00186                                                                          txc_buffer_linear_malloc(txd->buffer_linear, 
00187                                                                                                                           sizeof(x_recvmsg_commit_undo_args_t)))
00188                                  == NULL)
00189                         {
00190                                 local_result = ENOMEM;
00191                                 ret = -1;
00192                                 goto done;
00193                         }
00194                         buffer->speculative_primary_head = buffer->primary_head;
00195                         buffer->speculative_primary_tail = buffer->primary_tail;
00196                         buffer->speculative_secondary_head = buffer->secondary_head;
00197                         buffer->speculative_secondary_tail = buffer->secondary_tail;
00198                         buffer->state = TXC_BUFFER_STATE_SPECULATIVE;
00199 
00200                         args_commit_undo->koa = koa;
00201                         args_commit_undo->buffer = buffer;
00202 
00203                         txc_tx_register_commit_action(txd, x_recvmsg_commit, 
00204                                                                                   (void *) args_commit_undo, result,
00205                                                                                   TXC_TX_REGULAR_COMMIT_ACTION_ORDER);
00206                         txc_tx_register_undo_action(txd, x_recvmsg_undo, 
00207                                                                                 (void *) args_commit_undo, result,
00208                                                                                 TXC_TX_REGULAR_UNDO_ACTION_ORDER);
00209                 }
00210 
00211                 if (buffer->speculative_primary_tail != buffer->speculative_primary_head ||
00212                                 buffer->speculative_secondary_tail != buffer->speculative_secondary_head) {
00213                         /* There are buffered data available to consume*/
00214                         if (buffer->speculative_primary_tail != buffer->speculative_primary_head) {
00215                                 /* Consume data from the primary buffer */
00216                                 hdr_base_index = buffer->speculative_primary_head;
00217                                 temp_buffer_msghdr = (txc_socket_msghdr_t *) &(buffer->buf[hdr_base_index]);
00218                                 len = sizeof(txc_socket_msghdr_t) + 
00219                                       temp_buffer_msghdr->msg_iov_len + 
00220                                       temp_buffer_msghdr->msg_controllen;
00221                                 buffer->speculative_primary_head += len;
00222                         } else {
00223                                 /* Consume data from the secondary buffer */
00224                                 hdr_base_index = buffer->speculative_secondary_head;
00225                                 temp_buffer_msghdr = (txc_socket_msghdr_t *) &(buffer->buf[hdr_base_index]);
00226                                 buffer->speculative_secondary_head += len;
00227                         }
00228                 } else { 
00229                         /* No buffered data available 
00230                          * 1) Find space in the buffer
00231                          * 3) Issue a read call to the kernel to bring the data in the buffer
00232                          * 4) Copy the buffered data back to the application buffer
00233                          */
00234                         needed_bytes = sizeof(txc_socket_msghdr_t) + 
00235                                        TXC_DGRAM_MAX_SIZE + TXC_CONTROLMSG_MAX_SIZE;
00236                         store_in_primary_buffer = store_in_secondary_buffer = 0;
00237                         if (buffer->primary_tail < (buffer->size_max - needed_bytes)) {
00238                                 /* There is space in the primary part of the buffer */
00239                                 hdr_base_index = buffer->primary_tail;
00240                                 store_in_primary_buffer = 1;
00241                         } else if (buffer->secondary_tail < 
00242                                    (buffer->primary_head + 1 - needed_bytes)) 
00243                         {
00244                                 /* There is space in the secondary part of the buffer */
00245                                 hdr_base_index = buffer->secondary_tail;
00246                                 store_in_secondary_buffer = 1;
00247                         } else {
00248                                 /* There is not space in the buffer */
00249                                 local_result = ENOMEM;
00250                                 ret = -1;
00251                                 goto done;
00252                         }
00253                         temp_buffer_msghdr = (txc_socket_msghdr_t *) &(buffer->buf[hdr_base_index]);
00254                         temp_buffer_msghdr->msg_iov_base =      (void *) (temp_buffer_msghdr + 
00255                                                                       sizeof(txc_socket_msghdr_t));
00256                         temp_buffer_msghdr->msg_control = (void *) (temp_buffer_msghdr + 
00257                                                                     sizeof(txc_socket_msghdr_t) + 
00258                                                                     TXC_DGRAM_MAX_SIZE);
00259                         temp_iovec.iov_base = temp_buffer_msghdr->msg_iov_base; 
00260                         temp_iovec.iov_len = TXC_DGRAM_MAX_SIZE;
00261                         temp_msghdr.msg_name = (void *) &(temp_buffer_msghdr->msg_name);
00262                         temp_msghdr.msg_namelen = sizeof(struct sockaddr_in);
00263                         temp_msghdr.msg_iov = &temp_iovec;
00264                         temp_msghdr.msg_iovlen = 1;
00265                         temp_msghdr.msg_control = temp_buffer_msghdr->msg_control;
00266                         temp_msghdr.msg_controllen = TXC_CONTROLMSG_MAX_SIZE;
00267                         ret = txc_libc_recvmsg(fd, &temp_msghdr, flags);
00268                         if (ret > 0) {
00269                                 temp_buffer_msghdr->msg_iov_len = ret;
00270                                 temp_buffer_msghdr->msg_namelen = temp_msghdr.msg_namelen;
00271                                 temp_buffer_msghdr->msg_controllen = temp_msghdr.msg_controllen;
00272                                 temp_buffer_msghdr->msg_flags = temp_msghdr.msg_flags;
00273                                 if (store_in_primary_buffer) {
00274                                         buffer->primary_tail += sizeof(txc_socket_msghdr_t) +
00275                                                                 temp_buffer_msghdr->msg_iov_len +
00276                                                                 temp_buffer_msghdr->msg_controllen;     
00277                                 } else if (store_in_secondary_buffer) {
00278                                         buffer->secondary_tail += sizeof(txc_socket_msghdr_t) +
00279                                                                   temp_buffer_msghdr->msg_iov_len +
00280                                                                   temp_buffer_msghdr->msg_controllen;   
00281                                 } else {
00282                                         TXC_INTERNALERROR("Sanity check -- shouldn't come here.\n");
00283                                 }
00284                                 /* shift control message to the end of the msg_data */
00285                                 if (temp_buffer_msghdr->msg_controllen > 0) {
00286                                         memcpy((char *) temp_buffer_msghdr->msg_iov_base + temp_buffer_msghdr->msg_iov_len, 
00287                                                temp_buffer_msghdr->msg_control,
00288                                                temp_buffer_msghdr->msg_controllen);
00289                                 }
00290                         }
00291                 }
00292                 txc_stats_txstat_increment(txd, XCALL, x_recvmsg, 1);
00293         }
00294 
00295         /* Copy data from the buffer back to the application buffer */
00296         memcpy(msg->msg_name, 
00297                (void *)&(temp_buffer_msghdr->msg_name), 
00298                sizeof(struct sockaddr_in));
00299         msg->msg_namelen = temp_buffer_msghdr->msg_namelen;
00300         for (s=0, i=0; i<msg->msg_iovlen; i++) {
00301                 memcpy(msg->msg_iov[i].iov_base, 
00302                        &temp_buffer_msghdr->msg_iov_base[s],
00303                        msg->msg_iov[i].iov_len);
00304                 s += msg->msg_iov[i].iov_len;
00305         }
00306         msg->msg_flags = temp_buffer_msghdr->msg_flags;
00307         msg->msg_controllen = temp_buffer_msghdr->msg_controllen;
00308         memcpy(msg->msg_control, temp_buffer_msghdr->msg_control, 
00309                temp_buffer_msghdr->msg_controllen);
00310         ret = s;
00311         local_result = 0;
00312 done:
00313         if (result) {
00314                 *result = local_result;
00315         }
00316         return ret;
00317 }
00318 
00319 
00320 
00340 ssize_t 
00341 XCALL_DEF(x_recvmsg)(int s, struct msghdr *msg, int flags, int *result)
00342 {
00343         txc_tx_t *txd;
00344         int      ret;
00345 
00346         txd = txc_tx_get_txd();
00347 
00348         switch(txc_tx_get_xactstate(txd)) {
00349                 case TXC_XACTSTATE_TRANSACTIONAL_IRREVOCABLE:
00350                 case TXC_XACTSTATE_NONTRANSACTIONAL:
00351                         ret = __txc_recvmsg(txd, TXC_BOOL_FALSE, s, msg, flags, result);
00352                         break;
00353                 case TXC_XACTSTATE_TRANSACTIONAL_RETRYABLE:
00354                         ret = __txc_recvmsg(txd, TXC_BOOL_TRUE, s, msg, flags, result);
00355                         break;
00356                 default:
00357                         TXC_INTERNALERROR("Unknown transaction state\n");
00358         }
00359         return ret;
00360 }

Generated on Wed Dec 9 20:32:39 2009 for xCalls by  doxygen 1.4.7