00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030 #include "w_defines.h"
00031
00032
00033
00034
00035
00036
00037
00038
00039
00040
00041 #define SUBSTITUTE_MAIN
00042 #include "create_rec.cpp"
00043
00044 class smthread_user_t : public smthread_driver_t
00045 {
00046 rc_t do_work();
00047 rc_t demo_sort_stream();
00048
00049 public:
00050 smthread_user_t(int ac, char **av)
00051 : smthread_driver_t(ac, av) {}
00052
00053 virtual ~smthread_user_t() { }
00054 };
00055
00056 rc_t
00057 smthread_user_t::do_work()
00058 {
00059 if (_initialize_device) W_DO(do_init());
00060 else W_DO(no_init());
00061
00062
00063
00064
00065 W_DO(demo_sort_stream());
00066
00067 return RCOK;
00068 }
00069
00070 w_rc_t
00071 smthread_user_t::demo_sort_stream()
00072 {
00073
00074
00075
00076 w_base_t::int4_t hdrcontents;
00077
00078
00079 using ssm_sort::key_info_t ;
00080
00081 key_info_t kinfo;
00082 kinfo.type = sortorder::kt_i4;
00083 kinfo.derived = false;
00084 kinfo.where = key_info_t::t_hdr;
00085 kinfo.offset = 0;
00086 kinfo.len = sizeof(hdrcontents);
00087 kinfo.est_reclen = _rec_size;
00088
00089
00090 using ssm_sort::sort_parm_t ;
00091
00092 sort_parm_t behav;
00093 behav.run_size = 10;
00094 behav.vol = _vid;
00095 behav.unique = false;
00096 behav.ascending = false;
00097 behav.destructive = false;
00098
00099 behav.property = ss_m::t_temporary;
00100
00101 sort_stream_i stream(kinfo, behav, _rec_size);
00102 w_assert1(stream.is_sorted()==false);
00103
00104
00105 W_DO(ssm->begin_xct());
00106
00107 scan_file_i scan(_fid);
00108 pin_i* cursor(NULL);
00109 bool eof(false);
00110 int i(0);
00111
00112 do {
00113 w_rc_t rc = scan.next(cursor, 0, eof);
00114 if(rc.is_error()) {
00115 cerr << "Error getting next: " << rc << endl;
00116 _retval = rc.err_num();
00117 return rc;
00118 }
00119 if(eof) break;
00120
00121 vec_t header (cursor->hdr(), cursor->hdr_size());
00122 header.copy_to(&hdrcontents, sizeof(hdrcontents));
00123
00124 rid_t rid = cursor->rid();
00125 vec_t elem (&rid, sizeof(rid));
00126
00127 stream.put(header, elem);
00128 i++;
00129 } while (!eof);
00130 w_assert1(i == _num_rec);
00131 W_DO(ssm->commit_xct());
00132
00133
00134 w_assert1(stream.is_sorted()==false);
00135
00136
00137
00138 cout << "In reverse order:" << endl;
00139 W_DO(ssm->begin_xct());
00140 i--;
00141 eof = false;
00142 do {
00143 vec_t header;
00144 vec_t elem;
00145 W_DO(stream.get_next(header, elem, eof));
00146 if(eof) break;
00147
00148 w_assert1(stream.is_sorted()==true);
00149 w_assert1(stream.is_empty()==false);
00150 header.copy_to(&hdrcontents, sizeof(hdrcontents));
00151
00152 rid_t rid;
00153 elem.copy_to(&rid, sizeof(rid));
00154
00155 w_assert1(i == hdrcontents);
00156
00157 cout << " i " << hdrcontents << " rid " << rid << endl;
00158 i--;
00159 } while (!eof);
00160 W_DO(ssm->commit_xct());
00161 w_assert1(stream.is_empty()==false);
00162 w_assert1(stream.is_sorted()==true);
00163 return RCOK;
00164 }
00165
00166 int
00167 main(int argc, char* argv[])
00168 {
00169 argv0 = argv[0];
00170
00171 smthread_user_t *smtu = new smthread_user_t(argc, argv);
00172 if (!smtu)
00173 W_FATAL(fcOUTOFMEMORY);
00174
00175 w_rc_t e = smtu->fork();
00176 if(e.is_error()) {
00177 cerr << "error forking thread: " << e <<endl;
00178 return 1;
00179 }
00180 e = smtu->join();
00181 if(e.is_error()) {
00182 cerr << "error forking thread: " << e <<endl;
00183 return 1;
00184 }
00185
00186 int rv = smtu->return_value();
00187 delete smtu;
00188
00189 return rv;
00190 }