sort_stream.cpp

00001 /*<std-header orig-src='shore'>
00002 
00003  $Id: sort_stream.cpp,v 1.3 2010/09/21 14:26:28 nhall Exp $
00004 
00005 SHORE -- Scalable Heterogeneous Object REpository
00006 
00007 Copyright (c) 1994-99 Computer Sciences Department, University of
00008                       Wisconsin -- Madison
00009 All Rights Reserved.
00010 
00011 Permission to use, copy, modify and distribute this software and its
00012 documentation is hereby granted, provided that both the copyright
00013 notice and this permission notice appear in all copies of the
00014 software, derivative works or modified versions, and any portions
00015 thereof, and that both notices appear in supporting documentation.
00016 
00017 THE AUTHORS AND THE COMPUTER SCIENCES DEPARTMENT OF THE UNIVERSITY
00018 OF WISCONSIN - MADISON ALLOW FREE USE OF THIS SOFTWARE IN ITS
00019 "AS IS" CONDITION, AND THEY DISCLAIM ANY LIABILITY OF ANY KIND
00020 FOR ANY DAMAGES WHATSOEVER RESULTING FROM THE USE OF THIS SOFTWARE.
00021 
00022 This software was developed with support by the Advanced Research
00023 Project Agency, ARPA order number 018 (formerly 8230), monitored by
00024 the U.S. Army Research Laboratory under contract DAAB07-91-C-Q518.
00025 Further funding for this work was provided by DARPA through
00026 Rome Research Laboratory Contract No. F30602-97-2-0247.
00027 
00028 */
00029 
00030 #include "w_defines.h"
00031 
00032 /*  -- do not edit anything above this line --   </std-header>*/
00033 
00034 /**\anchor sort_stream_i_example */
00035 /*
00036  * This program is a test of sorting with sort_stream.
00037  * It builds on create_rec.cpp; derives its user thread from
00038  * the smthread_driver_t in create_rec.cpp.
00039  */
00040 
00041 #define SUBSTITUTE_MAIN
00042 #include "create_rec.cpp"
00043 
00044 class smthread_user_t : public smthread_driver_t
00045 {
00046     rc_t do_work();
00047     rc_t demo_sort_stream();
00048 
00049 public:
00050     smthread_user_t(int ac, char **av) 
00051                 : smthread_driver_t(ac, av) {}
00052 
00053     virtual ~smthread_user_t()  { }
00054 };
00055 
00056 rc_t 
00057 smthread_user_t::do_work()
00058 {
00059     if (_initialize_device) W_DO(do_init());
00060     else  W_DO(no_init());
00061 
00062     /* instead of printing the sm statistics, 
00063      * demo the use of sort_stream
00064      */
00065     W_DO(demo_sort_stream());
00066 
00067     return RCOK;
00068 }
00069 
00070 w_rc_t 
00071 smthread_user_t::demo_sort_stream()
00072 {
00073     // Recall that the records contain a header that indicates the
00074     // order in which the records were created.  Let's sort in
00075     // reverse order.
00076     w_base_t::int4_t hdrcontents;
00077 
00078     // Key descriptor
00079     using ssm_sort::key_info_t ;
00080 
00081     key_info_t     kinfo;
00082     kinfo.type = sortorder::kt_i4;
00083     kinfo.derived = false;
00084     kinfo.where = key_info_t::t_hdr;
00085     kinfo.offset = 0;
00086     kinfo.len = sizeof(hdrcontents);
00087     kinfo.est_reclen = _rec_size;
00088 
00089     // Behavioral options
00090     using ssm_sort::sort_parm_t ;
00091 
00092     sort_parm_t   behav;
00093     behav.run_size = 10; // pages
00094     behav.vol = _vid;
00095     behav.unique = false; // there should be no duplicates
00096     behav.ascending = false; // reverse the order
00097     behav.destructive = false; // don't blow away the original file --
00098     // immaterial for sort_stream_i
00099     behav.property = ss_m::t_temporary; // don't log the scratch files used
00100 
00101     sort_stream_i  stream(kinfo, behav, _rec_size);
00102     w_assert1(stream.is_sorted()==false);
00103 
00104     // Scan the file, inserting key,rid pairs into a sort stream
00105     W_DO(ssm->begin_xct());
00106 
00107     scan_file_i scan(_fid);
00108     pin_i*      cursor(NULL);
00109     bool        eof(false);
00110     int         i(0);
00111 
00112     do {
00113         w_rc_t rc = scan.next(cursor, 0, eof);
00114         if(rc.is_error()) {
00115             cerr << "Error getting next: " << rc << endl;
00116             _retval = rc.err_num();
00117             return rc;
00118         }
00119         if(eof) break;
00120 
00121         vec_t       header (cursor->hdr(), cursor->hdr_size());
00122         header.copy_to(&hdrcontents, sizeof(hdrcontents));
00123 
00124         rid_t       rid = cursor->rid();
00125         vec_t       elem (&rid, sizeof(rid));
00126 
00127         stream.put(header, elem);
00128         i++;
00129     } while (!eof);
00130     w_assert1(i == _num_rec);
00131     W_DO(ssm->commit_xct());
00132 
00133     // is not sorted until first get_next call
00134     w_assert1(stream.is_sorted()==false);
00135 
00136     // Iterate over the sort_stream_i 
00137     // and print records in reverse order.
00138     cout << "In reverse order:" << endl;
00139     W_DO(ssm->begin_xct());
00140     i--;
00141     eof = false;
00142     do {
00143         vec_t       header; 
00144         vec_t       elem; 
00145         W_DO(stream.get_next(header, elem, eof));
00146         if(eof) break;
00147 
00148         w_assert1(stream.is_sorted()==true);
00149         w_assert1(stream.is_empty()==false);
00150         header.copy_to(&hdrcontents, sizeof(hdrcontents));
00151 
00152         rid_t       rid; 
00153         elem.copy_to(&rid, sizeof(rid));
00154 
00155         w_assert1(i == hdrcontents);
00156 
00157         cout << "    i " << hdrcontents << " rid " << rid << endl;
00158         i--;
00159     } while (!eof);
00160     W_DO(ssm->commit_xct());
00161     w_assert1(stream.is_empty()==false);
00162     w_assert1(stream.is_sorted()==true);
00163     return RCOK;
00164 }
00165 
00166 int
00167 main(int argc, char* argv[])
00168 {
00169     argv0 = argv[0];
00170 
00171     smthread_user_t *smtu = new smthread_user_t(argc, argv);
00172     if (!smtu)
00173             W_FATAL(fcOUTOFMEMORY);
00174 
00175     w_rc_t e = smtu->fork();
00176     if(e.is_error()) {
00177         cerr << "error forking thread: " << e <<endl;
00178         return 1;
00179     }
00180     e = smtu->join();
00181     if(e.is_error()) {
00182         cerr << "error forking thread: " << e <<endl;
00183         return 1;
00184     }
00185 
00186     int        rv = smtu->return_value();
00187     delete smtu;
00188 
00189     return rv;
00190 }

Generated on Thu Dec 9 08:42:26 2010 for Shore Storage Manager by  doxygen 1.4.7