src/xcalls/x_read_pipe.c

Go to the documentation of this file.
00001 
00007 #include <fcntl.h>
00008 #include <unistd.h>
00009 #include <errno.h>
00010 #include <string.h>
00011 #include <stdlib.h>
00012 #include <misc/debug.h>
00013 #include <misc/errno.h>
00014 #include <misc/generic_types.h>
00015 #include <core/tx.h>
00016 #include <core/config.h>
00017 #include <core/koa.h>
00018 #include <core/buffer.h>
00019 #include <core/stats.h>
00020 #include <libc/syscalls.h>
00021 #include <core/txdesc.h>
00022 #include <xcalls/xcalls.h>
00023 
00024 
00025 typedef struct x_read_pipe_commit_undo_args_s x_read_pipe_commit_undo_args_t;
00026 
00027 struct x_read_pipe_commit_undo_args_s {
00028         txc_koa_t             *koa;
00029         txc_buffer_circular_t *buffer;
00030 };
00031 
00032 
00033 static
00034 void 
00035 x_read_pipe_undo(void *args, int *result) 
00036 {
00037         x_read_pipe_commit_undo_args_t *args_undo = (x_read_pipe_commit_undo_args_t *) args;
00038         int                            local_result = 0;
00039 
00040         args_undo->buffer->state = TXC_BUFFER_STATE_NON_SPECULATIVE;
00041         if (result) {
00042                 *result = local_result;
00043         }
00044 }
00045 
00046 
00047 static 
00048 void
00049 x_read_pipe_commit(void *args, int *result)
00050 {
00051         x_read_pipe_commit_undo_args_t *args_commit = (x_read_pipe_commit_undo_args_t *) args;
00052         int                            local_result = 0;
00053         txc_buffer_circular_t          *buffer;
00054 
00055         buffer = args_commit->buffer;
00056 
00057         if (buffer->speculative_primary_tail != buffer->speculative_primary_head) {
00058                 buffer->primary_head = buffer->speculative_primary_head; 
00059                 buffer->primary_tail = buffer->speculative_primary_tail; 
00060         } else if (buffer->speculative_secondary_tail != buffer->speculative_secondary_head) {
00061                 buffer->primary_head = buffer->speculative_secondary_head; 
00062                 buffer->primary_tail = buffer->speculative_secondary_tail; 
00063                 buffer->secondary_head = buffer->secondary_tail = 0;
00064         } else {
00065                 buffer->primary_head = buffer->primary_tail = 0;
00066                 buffer->secondary_head = buffer->secondary_tail = 0;
00067         }
00068         buffer->state = TXC_BUFFER_STATE_NON_SPECULATIVE;
00069         if (result) {
00070                 *result = local_result;
00071         }
00072 }
00073 
00074 
00075 static 
00076 ssize_t 
00077 __txc_read_pipe(txc_tx_t *txd, txc_bool_t speculative_read,
00078                 int fd, void *buf, size_t nbyte, int *result) 
00079 {
00080         txc_koamgr_t                   *koamgr = txc_g_koamgr;
00081         txc_koa_t                      *koa;
00082         txc_sentinel_t                 *sentinel;
00083         txc_result_t                   xret;
00084         int                            ret;
00085         x_read_pipe_commit_undo_args_t *args_commit_undo;
00086         int                            local_result = 0;
00087         int                            store_in_primary_buffer;
00088         int                            store_in_secondary_buffer;
00089         int                            hdr_base_index;
00090         int                            needed_bytes;
00091         txc_buffer_circular_t          *buffer;
00092 
00093         
00094         /* 
00095          * FIXME: We don't handle the case where we need to consume more data 
00096          * than available in the buffer. In such a case you need to request
00097          * the additional data.
00098          */
00099 
00100         txc_koa_lock_fd(koamgr, fd);
00101         xret = txc_koa_lookup_fd2koa(koamgr, fd, &koa);
00102         if (xret == TXC_R_FAILURE) {
00103                 /* 
00104                  * The KOA mapped to the file descriptor has gone. Report
00105                  * this error as invalid file descriptor.
00106                  */
00107                 txc_koa_unlock_fd(koamgr, fd);
00108                 local_result = EBADF;
00109                 ret = -1;
00110                 goto done;
00111         }
00112         buffer = (txc_buffer_circular_t *) txc_koa_get_buffer(koa);
00113 
00114         if (!speculative_read) {
00115                 txc_koa_unlock_fd(koamgr, fd);
00116                 if ((buffer->primary_tail != buffer->primary_head) ||
00117                     (buffer->secondary_tail != buffer->secondary_head)) {
00118                         /* There are buffered data available */
00119                         if (buffer->primary_tail != buffer->primary_head) {
00120                                 /* Consume data from the primary buffer */
00121                                 hdr_base_index = buffer->primary_head;
00122                                 buffer->primary_head += nbyte;
00123                         } else {
00124                                 /* Consume data from the secondary buffer and make the secondary 
00125                                  * buffer primary 
00126                                  */
00127                                 hdr_base_index = buffer->secondary_head;
00128                                 buffer->secondary_head += nbyte;
00129                                 buffer->primary_tail = buffer->secondary_tail; 
00130                                 buffer->primary_head = buffer->secondary_head; 
00131                                 buffer->secondary_head = buffer->secondary_tail = 0;
00132                         }
00133                         /* Now fall through to do the copy */
00134                 } else { 
00135                         ret = txc_libc_read(fd, buf, nbyte);
00136                         local_result = errno;
00137                         goto done;
00138                 }
00139         } else {
00140                 /* READ is speculative */               
00141                 sentinel = txc_koa_get_sentinel(koa);
00142                 xret = txc_sentinel_tryacquire(txd, sentinel, 
00143                                                                            TXC_SENTINEL_ACQUIREONRETRY);
00144                 txc_koa_unlock_fd(koamgr, fd);
00145                 if (xret == TXC_R_BUSYSENTINEL) {
00146                         txc_tx_abort_transaction(txd, TXC_ABORTREASON_BUSYSENTINEL);
00147                         TXC_INTERNALERROR("Never gets here. Transaction abort failed.\n");
00148                 }
00149 
00150                 /* Got sentinel. Continue with the rest of the stuff. */
00151                 if (buffer->state == TXC_BUFFER_STATE_NON_SPECULATIVE) {
00152                         if ((args_commit_undo = (x_read_pipe_commit_undo_args_t *)
00153                                                                          txc_buffer_linear_malloc(txd->buffer_linear, 
00154                                                                                                                           sizeof(x_read_pipe_commit_undo_args_t)))
00155                                  == NULL)
00156                         {
00157                                 local_result = ENOMEM;
00158                                 ret = -1;
00159                                 goto done;
00160                         }
00161                         buffer->speculative_primary_head = buffer->primary_head;
00162                         buffer->speculative_primary_tail = buffer->primary_tail;
00163                         buffer->speculative_secondary_head = buffer->secondary_head;
00164                         buffer->speculative_secondary_tail = buffer->secondary_tail;
00165                         buffer->state = TXC_BUFFER_STATE_SPECULATIVE;
00166 
00167                         args_commit_undo->koa = koa;
00168                         args_commit_undo->buffer = buffer;
00169 
00170                         txc_tx_register_commit_action(txd, x_read_pipe_commit, 
00171                                                                                   (void *) args_commit_undo, result,
00172                                                                                   TXC_TX_REGULAR_COMMIT_ACTION_ORDER);
00173                         txc_tx_register_undo_action(txd, x_read_pipe_undo, 
00174                                                                                 (void *) args_commit_undo, result,
00175                                                                                 TXC_TX_REGULAR_UNDO_ACTION_ORDER);
00176                 }
00177 
00178                 if (buffer->speculative_primary_tail != buffer->speculative_primary_head ||
00179                                 buffer->speculative_secondary_tail != buffer->speculative_secondary_head) {
00180                         /* There are buffered data available to consume*/
00181                         if (buffer->speculative_primary_tail != buffer->speculative_primary_head) {
00182                                 /* Consume data from the primary buffer */
00183                                 hdr_base_index = buffer->speculative_primary_head;
00184                                 buffer->speculative_primary_head += nbyte;
00185                         } else {
00186                                 /* Consume data from the secondary buffer */
00187                                 hdr_base_index = buffer->speculative_secondary_head;
00188                                 buffer->speculative_secondary_head += nbyte;
00189                         }
00190                         ret = nbyte;
00191                 } else { 
00192                         /* No buffered data available 
00193                          * 1) Find space in the buffer
00194                          * 3) Issue a read call to the kernel to bring the data in the buffer
00195                          * 4) Copy the buffered data back to the application buffer
00196                          */
00197                         needed_bytes = nbyte; 
00198                         store_in_primary_buffer = store_in_secondary_buffer = 0;
00199                         if (buffer->primary_tail < (buffer->size_max - needed_bytes)) {
00200                                 /* There is space in the primary part of the buffer */
00201                                 hdr_base_index = buffer->primary_tail;
00202                                 store_in_primary_buffer = 1;
00203                         }       else if (buffer->secondary_tail < 
00204                                                                                                         (buffer->primary_head + 1 - needed_bytes)) {
00205                                 /* There is space in the secondary part of the buffer */
00206                                 hdr_base_index = buffer->secondary_tail;
00207                                 store_in_secondary_buffer = 1;
00208                         } else {
00209                                 /* There is not space in the buffer */
00210                                 local_result = ENOMEM;
00211                                 ret = -1;
00212                                 goto done;
00213                         }
00214                         ret = txc_libc_read(fd, &(buffer->buf[hdr_base_index]), nbyte);
00215                         if (ret > 0) {
00216                                 if (store_in_primary_buffer) {
00217                                         buffer->primary_tail += ret;
00218                                 } else if (store_in_secondary_buffer) {
00219                                         buffer->secondary_tail += ret;
00220                                 } else {
00221                                         TXC_INTERNALERROR("Sanity check -- shouldn't come here.\n");
00222                                 }
00223                         }
00224                         txc_stats_txstat_increment(txd, XCALL, x_read_pipe, 1);
00225                 }
00226         }       
00227         /* Copy data from the buffer back to the application buffer */
00228         memcpy(buf, &buffer->buf[hdr_base_index], nbyte); 
00229         local_result = 0;
00230 done:
00231         if (result) {
00232                 *result = local_result;
00233         }
00234         return ret;
00235 }
00236 
00237 
00256 ssize_t 
00257 XCALL_DEF(x_read_pipe)(int fd, void *buf, size_t nbyte, int *result)
00258 {
00259         txc_tx_t           *txd;
00260         int                ret;
00261 
00262         txd = txc_tx_get_txd();
00263 
00264         switch(txc_tx_get_xactstate(txd)) {
00265                 case TXC_XACTSTATE_TRANSACTIONAL_IRREVOCABLE:
00266                 case TXC_XACTSTATE_NONTRANSACTIONAL:
00267                         ret = __txc_read_pipe(txd, TXC_BOOL_FALSE, fd, buf, nbyte, result);
00268                         break;
00269                 case TXC_XACTSTATE_TRANSACTIONAL_RETRYABLE:
00270                         ret = __txc_read_pipe(txd, TXC_BOOL_TRUE, fd, buf, nbyte, result);
00271                         break;
00272                 default:
00273                         TXC_INTERNALERROR("Unknown transaction state\n");
00274         }
00275         return ret;
00276 }

Generated on Wed Dec 9 20:32:39 2009 for xCalls by  doxygen 1.4.7