00001
00007 #include <fcntl.h>
00008 #include <unistd.h>
00009 #include <errno.h>
00010 #include <string.h>
00011 #include <stdlib.h>
00012 #include <misc/debug.h>
00013 #include <misc/errno.h>
00014 #include <misc/generic_types.h>
00015 #include <core/tx.h>
00016 #include <core/config.h>
00017 #include <core/koa.h>
00018 #include <core/buffer.h>
00019 #include <core/stats.h>
00020 #include <libc/syscalls.h>
00021 #include <core/txdesc.h>
00022 #include <xcalls/xcalls.h>
00023
00024
00025 typedef struct x_read_pipe_commit_undo_args_s x_read_pipe_commit_undo_args_t;
00026
00027 struct x_read_pipe_commit_undo_args_s {
00028 txc_koa_t *koa;
00029 txc_buffer_circular_t *buffer;
00030 };
00031
00032
00033 static
00034 void
00035 x_read_pipe_undo(void *args, int *result)
00036 {
00037 x_read_pipe_commit_undo_args_t *args_undo = (x_read_pipe_commit_undo_args_t *) args;
00038 int local_result = 0;
00039
00040 args_undo->buffer->state = TXC_BUFFER_STATE_NON_SPECULATIVE;
00041 if (result) {
00042 *result = local_result;
00043 }
00044 }
00045
00046
00047 static
00048 void
00049 x_read_pipe_commit(void *args, int *result)
00050 {
00051 x_read_pipe_commit_undo_args_t *args_commit = (x_read_pipe_commit_undo_args_t *) args;
00052 int local_result = 0;
00053 txc_buffer_circular_t *buffer;
00054
00055 buffer = args_commit->buffer;
00056
00057 if (buffer->speculative_primary_tail != buffer->speculative_primary_head) {
00058 buffer->primary_head = buffer->speculative_primary_head;
00059 buffer->primary_tail = buffer->speculative_primary_tail;
00060 } else if (buffer->speculative_secondary_tail != buffer->speculative_secondary_head) {
00061 buffer->primary_head = buffer->speculative_secondary_head;
00062 buffer->primary_tail = buffer->speculative_secondary_tail;
00063 buffer->secondary_head = buffer->secondary_tail = 0;
00064 } else {
00065 buffer->primary_head = buffer->primary_tail = 0;
00066 buffer->secondary_head = buffer->secondary_tail = 0;
00067 }
00068 buffer->state = TXC_BUFFER_STATE_NON_SPECULATIVE;
00069 if (result) {
00070 *result = local_result;
00071 }
00072 }
00073
00074
00075 static
00076 ssize_t
00077 __txc_read_pipe(txc_tx_t *txd, txc_bool_t speculative_read,
00078 int fd, void *buf, size_t nbyte, int *result)
00079 {
00080 txc_koamgr_t *koamgr = txc_g_koamgr;
00081 txc_koa_t *koa;
00082 txc_sentinel_t *sentinel;
00083 txc_result_t xret;
00084 int ret;
00085 x_read_pipe_commit_undo_args_t *args_commit_undo;
00086 int local_result = 0;
00087 int store_in_primary_buffer;
00088 int store_in_secondary_buffer;
00089 int hdr_base_index;
00090 int needed_bytes;
00091 txc_buffer_circular_t *buffer;
00092
00093
00094
00095
00096
00097
00098
00099
00100 txc_koa_lock_fd(koamgr, fd);
00101 xret = txc_koa_lookup_fd2koa(koamgr, fd, &koa);
00102 if (xret == TXC_R_FAILURE) {
00103
00104
00105
00106
00107 txc_koa_unlock_fd(koamgr, fd);
00108 local_result = EBADF;
00109 ret = -1;
00110 goto done;
00111 }
00112 buffer = (txc_buffer_circular_t *) txc_koa_get_buffer(koa);
00113
00114 if (!speculative_read) {
00115 txc_koa_unlock_fd(koamgr, fd);
00116 if ((buffer->primary_tail != buffer->primary_head) ||
00117 (buffer->secondary_tail != buffer->secondary_head)) {
00118
00119 if (buffer->primary_tail != buffer->primary_head) {
00120
00121 hdr_base_index = buffer->primary_head;
00122 buffer->primary_head += nbyte;
00123 } else {
00124
00125
00126
00127 hdr_base_index = buffer->secondary_head;
00128 buffer->secondary_head += nbyte;
00129 buffer->primary_tail = buffer->secondary_tail;
00130 buffer->primary_head = buffer->secondary_head;
00131 buffer->secondary_head = buffer->secondary_tail = 0;
00132 }
00133
00134 } else {
00135 ret = txc_libc_read(fd, buf, nbyte);
00136 local_result = errno;
00137 goto done;
00138 }
00139 } else {
00140
00141 sentinel = txc_koa_get_sentinel(koa);
00142 xret = txc_sentinel_tryacquire(txd, sentinel,
00143 TXC_SENTINEL_ACQUIREONRETRY);
00144 txc_koa_unlock_fd(koamgr, fd);
00145 if (xret == TXC_R_BUSYSENTINEL) {
00146 txc_tx_abort_transaction(txd, TXC_ABORTREASON_BUSYSENTINEL);
00147 TXC_INTERNALERROR("Never gets here. Transaction abort failed.\n");
00148 }
00149
00150
00151 if (buffer->state == TXC_BUFFER_STATE_NON_SPECULATIVE) {
00152 if ((args_commit_undo = (x_read_pipe_commit_undo_args_t *)
00153 txc_buffer_linear_malloc(txd->buffer_linear,
00154 sizeof(x_read_pipe_commit_undo_args_t)))
00155 == NULL)
00156 {
00157 local_result = ENOMEM;
00158 ret = -1;
00159 goto done;
00160 }
00161 buffer->speculative_primary_head = buffer->primary_head;
00162 buffer->speculative_primary_tail = buffer->primary_tail;
00163 buffer->speculative_secondary_head = buffer->secondary_head;
00164 buffer->speculative_secondary_tail = buffer->secondary_tail;
00165 buffer->state = TXC_BUFFER_STATE_SPECULATIVE;
00166
00167 args_commit_undo->koa = koa;
00168 args_commit_undo->buffer = buffer;
00169
00170 txc_tx_register_commit_action(txd, x_read_pipe_commit,
00171 (void *) args_commit_undo, result,
00172 TXC_TX_REGULAR_COMMIT_ACTION_ORDER);
00173 txc_tx_register_undo_action(txd, x_read_pipe_undo,
00174 (void *) args_commit_undo, result,
00175 TXC_TX_REGULAR_UNDO_ACTION_ORDER);
00176 }
00177
00178 if (buffer->speculative_primary_tail != buffer->speculative_primary_head ||
00179 buffer->speculative_secondary_tail != buffer->speculative_secondary_head) {
00180
00181 if (buffer->speculative_primary_tail != buffer->speculative_primary_head) {
00182
00183 hdr_base_index = buffer->speculative_primary_head;
00184 buffer->speculative_primary_head += nbyte;
00185 } else {
00186
00187 hdr_base_index = buffer->speculative_secondary_head;
00188 buffer->speculative_secondary_head += nbyte;
00189 }
00190 ret = nbyte;
00191 } else {
00192
00193
00194
00195
00196
00197 needed_bytes = nbyte;
00198 store_in_primary_buffer = store_in_secondary_buffer = 0;
00199 if (buffer->primary_tail < (buffer->size_max - needed_bytes)) {
00200
00201 hdr_base_index = buffer->primary_tail;
00202 store_in_primary_buffer = 1;
00203 } else if (buffer->secondary_tail <
00204 (buffer->primary_head + 1 - needed_bytes)) {
00205
00206 hdr_base_index = buffer->secondary_tail;
00207 store_in_secondary_buffer = 1;
00208 } else {
00209
00210 local_result = ENOMEM;
00211 ret = -1;
00212 goto done;
00213 }
00214 ret = txc_libc_read(fd, &(buffer->buf[hdr_base_index]), nbyte);
00215 if (ret > 0) {
00216 if (store_in_primary_buffer) {
00217 buffer->primary_tail += ret;
00218 } else if (store_in_secondary_buffer) {
00219 buffer->secondary_tail += ret;
00220 } else {
00221 TXC_INTERNALERROR("Sanity check -- shouldn't come here.\n");
00222 }
00223 }
00224 txc_stats_txstat_increment(txd, XCALL, x_read_pipe, 1);
00225 }
00226 }
00227
00228 memcpy(buf, &buffer->buf[hdr_base_index], nbyte);
00229 local_result = 0;
00230 done:
00231 if (result) {
00232 *result = local_result;
00233 }
00234 return ret;
00235 }
00236
00237
00256 ssize_t
00257 XCALL_DEF(x_read_pipe)(int fd, void *buf, size_t nbyte, int *result)
00258 {
00259 txc_tx_t *txd;
00260 int ret;
00261
00262 txd = txc_tx_get_txd();
00263
00264 switch(txc_tx_get_xactstate(txd)) {
00265 case TXC_XACTSTATE_TRANSACTIONAL_IRREVOCABLE:
00266 case TXC_XACTSTATE_NONTRANSACTIONAL:
00267 ret = __txc_read_pipe(txd, TXC_BOOL_FALSE, fd, buf, nbyte, result);
00268 break;
00269 case TXC_XACTSTATE_TRANSACTIONAL_RETRYABLE:
00270 ret = __txc_read_pipe(txd, TXC_BOOL_TRUE, fd, buf, nbyte, result);
00271 break;
00272 default:
00273 TXC_INTERNALERROR("Unknown transaction state\n");
00274 }
00275 return ret;
00276 }