next up previous contents
Up: Writing Value-Added Servers with Previous: Running the Example

 

Appendix: Program Sources

 

Code Shared by Client and Server

 

Makefile: Makefile.template

# --------------------------------------------------------------- #
# -- Copyright (c) 1994, 1995 Computer Sciences Department,    -- #
# -- University of Wisconsin-Madison, subject to the terms     -- #
# -- and conditions given in the file COPYRIGHT.  All Rights   -- #
# -- Reserved.                                                 -- #
# --------------------------------------------------------------- #

# $Header: /p/shore/shore_cvs/src/examples/vas/grid/Makefile.template,v 1.17 1996/07/30 19:40:21 kupsch Exp $

#
# Makefile for vas/grid program
#

# See "NOTE" below re: running on Solaris.
#     also see the note re: linking with DEBUG library

# Modify the following line to indicated where gcc is installed
GCC =       gcc

# Modify the following line as appropriate to point to the place where
# Shore is installed
INSTALL_DIR = /p/shore/installed/NEW/debug
INCLUDE     = -I$(INSTALL_DIR)/include
LIBSM       = $(INSTALL_DIR)/lib/libsm.a

# These are set to use the modified RPC shipped with Shore
RPCGEN = $(INSTALL_DIR)/bin/rpcgen
LIBCOMMON   = $(INSTALL_DIR)/lib/librpclib.a $(INSTALL_DIR)/lib/libshorecommon.a

SERVER_SRCS =	grid.C rpc_thread.C server.C command_server.C server_stubs.C
CLIENT_SRCS =	client.C command_client.C
COMMON_SRCS =	options.C command.C grid_basics.C

EXECS = server client

SERVER_OBJS =	$(SERVER_SRCS:.C=.o)
CLIENT_OBJS =	$(CLIENT_SRCS:.C=.o)
COMMON_OBJS =	$(COMMON_SRCS:.C=.o)
OBJS =  $(SERVER_OBJS) $(CLIENT_OBJS) $(COMMON_OBJS)

CXX =		$(GCC) -x c++
CC  = 		$(GCC)
LD =		$(GCC)

LDFLAGS =   -lg++ -lstdc++

# NOTE: if running on a Solaris machine, add this:
LDFLAGS       += -lnsl -lsocket

#
# C/C++ Flags.
#
# Note: if you are linking with a debugging version of the Storage
#       manager libraries, be sure to define DEBUG with -DDEBUG
#
CCFLAGS =	-g -O $(INCLUDE) -Wall
CXXFLAGS =	$(CCFLAGS) -fno-implicit-templates

all:	$(EXECS)

.SUFFIXES: .C .o

.C.o:
	$(CXX) $(CXXFLAGS) -c $<

.c.o:
	$(CC) $(CCFLAGS) -c $<

 
# NB: THIS IS IMPORTANT: we include "ShoreConfig.h" because
# we *NEED* the system-dependent definition of a jmp_buf
# in order to see that the thread data structures that
# that we build here are consistent with those in the library.
# If we don't get the right #defines for the configuration, we
# run the risk of building .o files here with the wrong idea
# about the size of sthread_t (the root of the class hierarchy
# for our threads).

RPC_FILES = msg.h msg_clnt.c msg_svc.c msg_xdr.c
$(RPC_FILES): msg.x
	cp $(INSTALL_DIR)/include/ShoreConfig.h .
	$(RPCGEN) msg.x
	rm -f msg_svc.c
	$(RPCGEN) -m -o msg_svc.c msg.x


server: $(SERVER_OBJS) $(COMMON_OBJS) msg_xdr.o msg_svc.o $(LIBSM) $(LIBCOMMON)
	$(LD) -o $@ msg_svc.o $(SERVER_OBJS) $(COMMON_OBJS) msg_xdr.o \
		    $(LIBSM) $(LIBCOMMON) -lm $(LDFLAGS)

client: $(CLIENT_OBJS) $(COMMON_OBJS) $(LIBCOMMON) msg_clnt.o msg_xdr.o
	$(LD) -o $@ $(CLIENT_OBJS) $(COMMON_OBJS) msg_clnt.o msg_xdr.o \
		    $(LIBCOMMON) $(LDFLAGS)

clean:
	$(RM) $(EXECS) $(OBJS) $(RPC_FILES) a.out core tags 

#dependencies
$(SERVER_OBJS) $(CLIENT_OBJS) $(COMMON_OBJS): msg.h grid_basics.h grid.h
msg.h: grid_basics.h
msg_svc.o msg_clnt.o msg_xdr.o: msg.h grid_basics.h
client.o: command.h command_client.h
rpc_thread.o: rpc_thread.h command.h command_server.h
server.o: rpc_thread.h
command.o: command.h
command_client.o: command.h command_client.h
command_server.o server_stubs.o: command.h command_server.h rpc_thread.h
 

RPC Declarations: msg.x

/* --------------------------------------------------------------- */
/* -- Copyright (c) 1994, 1995 Computer Sciences Department,    -- */
/* -- University of Wisconsin-Madison, subject to the terms     -- */
/* -- and conditions given in the file COPYRIGHT.  All Rights   -- */
/* -- Reserved.                                                 -- */
/* --------------------------------------------------------------- */

%#ifndef GRID_BASICS_H
#include "grid_basics.h"
%#endif /* !GRID_BASICS_H */

/* const MAXNAMELEN = MAX_NAME_LEN;*/
typedef char name_type_t[MAX_NAME_LEN];

typedef char error_msg_t[MAX_ERR_MSG_LEN];

struct error_reply {
    error_msg_t 		error_msg;
};

struct print_grid_reply {
    grid_display_t  	display;
    error_msg_t		error_msg;
};

struct add_item_arg {
    name_type_t		name;
    int			x;
    int			y;
};

struct remove_item_arg {
    name_type_t		name;
};

struct location_arg {
    name_type_t		name;
};

struct location_reply {
    int			x;
    int			y;
    error_msg_t		error_msg;
};

struct spatial_arg {
    int			x_low;
    int			y_low;
    int			x_hi;
    int			y_hi;
};

struct spatial_reply {
    spatial_result_t	result;
    error_msg_t		error_msg;
};

program GRID {
    version GRIDVERS {
	void	ping_rpc(void) 					= 0;
	error_reply	commit_transaction_rpc(void) 		= 101;
	error_reply	abort_transaction_rpc(void) 		= 102;
	error_reply	clear_grid_rpc(void) 			= 103;
	print_grid_reply print_grid_rpc(void) 			= 104;
	error_reply	add_item_rpc(add_item_arg)		= 105;
	error_reply	remove_item_rpc(remove_item_arg)	= 106;
	error_reply	move_item_rpc(add_item_arg)		= 107;
	location_reply  location_of_rpc(location_arg)		= 108;
	spatial_reply	spatial_rpc(spatial_arg)		= 109;
    } = 1;
} = 0x20000100;


#ifdef RPC_HDR

%#define MSG_H


%#ifdef RPC_SVC
%#ifdef __cplusplus

%/*
% * Maximum size of all replys
% * Used to create a sufficiently large a reply buffer in a thread.
% */
% const size_t thread_reply_buf_size = MAX(sizeof(error_reply),
%		          MAX(sizeof(print_grid_reply),
%			  MAX(sizeof(location_reply),
%			  sizeof(spatial_reply))));

%/* server dispatch function */
%extern "C" void grid_1(struct svc_req*, register SVCXPRT*);

%/* Server side of RPCs */
%extern "C" void* 		ping_rpc_1(void*, svc_req*);
%extern "C" error_reply*	commit_transaction_rpc_1(void*, svc_req*);
%extern "C" error_reply*	abort_transaction_rpc_1(void*, svc_req*);
%extern "C" error_reply*	clear_grid_rpc_1(void*, svc_req*);
%extern "C" print_grid_reply*	print_grid_rpc_1(void*, svc_req*);
%extern "C" error_reply*	add_item_rpc_1(add_item_arg*, svc_req*);
%extern "C" error_reply*	remove_item_rpc_1(remove_item_arg*, svc_req*);
%extern "C" error_reply*	move_item_rpc_1(add_item_arg*, svc_req*);
%extern "C" location_reply*	location_of_rpc_1(location_arg*, svc_req*);
%extern "C" spatial_reply*	spatial_rpc_1(spatial_arg*, svc_req*);
%#endif /*__cplusplus*/
%#endif /*RPC_SVC*/

%#ifdef RPC_CLNT
%#ifdef __cplusplus
%extern "C" void* ping_rpc_1(void*, CLIENT*);
%extern "C" error_reply*	commit_transaction_rpc_1(void*, CLIENT*);
%extern "C" error_reply*	abort_transaction_rpc_1(void*, CLIENT*);
%extern "C" error_reply*	clear_grid_rpc_1(void*, CLIENT*);
%extern "C" print_grid_reply*	print_grid_rpc_1(void*, CLIENT*);
%extern "C" error_reply*	add_item_rpc_1(add_item_arg*, CLIENT*);
%extern "C" error_reply*	remove_item_rpc_1(remove_item_arg*, CLIENT*);
%extern "C" error_reply*	move_item_rpc_1(add_item_arg*, CLIENT*);
%extern "C" location_reply*	location_of_rpc_1(location_arg*, CLIENT*);
%extern "C" spatial_reply*	spatial_rpc_1(spatial_arg*, CLIENT*);
%#endif /*__cplusplus*/
%#endif /*RPC_CLNT*/

#endif /*RPC_HDR*/
 

grid_basics.h

/* --------------------------------------------------------------- */
/* -- Copyright (c) 1994, 1995 Computer Sciences Department,    -- */
/* -- University of Wisconsin-Madison, subject to the terms     -- */
/* -- and conditions given in the file COPYRIGHT.  All Rights   -- */
/* -- Reserved.                                                 -- */
/* --------------------------------------------------------------- */

#ifndef GRID_BASICS_H
#define GRID_BASICS_H

#include "ShoreConfig.h"

/* 
 * This defines a bunch of commonly used constants for the grid
 * program.  
 *
 * Note: #define is since this file must be run through rpcgen.
 */

/*
 * Maximun length of an item name
 */
#define MAX_NAME_LEN 21

/*
 * Max error message length
 */
#define MAX_ERR_MSG_LEN 80

/*
 * Maximum size of grid
 */
#define MAX_GRID_X  40
#define MAX_GRID_Y  15


/*
 * Maximum # of items returns from a spatial query
 */
#define MAX_SPATIAL_RESULT 10

/*
 * Items on the grid
 * Note: #ifdef __cplusplus is to avoid sending c++ code
 *       through rpcgen (since it can't handle it)
 */
struct item_t {
#ifdef __cplusplus
                item_t();
                item_t(const char* _name, int _x, int _y);
    void        init(const char* _name, int _x, int _y);
#endif
    /* location on grid */
    int         x;
    int  	y;
    /* name of the item */
    char        name[MAX_NAME_LEN]; 
};

/*
 */

/*
 * Query Result Structures
 */

typedef char grid_display_row_t[MAX_GRID_X];

struct grid_display_t {
    grid_display_row_t  rows[MAX_GRID_Y];
};

/* results for spatial queries */
struct spatial_result_t {
    int         found_cnt;      /* number of items found	*/
    item_t      items[MAX_SPATIAL_RESULT]; /* some of the items found */
};



/* sunos 4.1.3 does not declare these */
#if defined(SUNOS41) && defined(__cplusplus)
extern "C" {
    void bzero(char*, int);
    int	socket(int, int, int);
    int	bind(int, const void *, int);
}
#endif

#endif /* GRID_BASIC_H */
 

grid_basics.C

/* --------------------------------------------------------------- */
/* -- Copyright (c) 1994, 1995 Computer Sciences Department,    -- */
/* -- University of Wisconsin-Madison, subject to the terms     -- */
/* -- and conditions given in the file COPYRIGHT.  All Rights   -- */
/* -- Reserved.                                                 -- */
/* --------------------------------------------------------------- */

#define GRID_BASICS_C

#include "ShoreConfig.h"
#include "string.h"
#include "grid_basics.h"

item_t::item_t()
    : x(0), y(0)
{
    memset(name, 0, MAX_NAME_LEN);
}

item_t::item_t(const char* _name, int _x, int _y)
    : x(_x), y(_y)
{
    strncpy(name, _name, MAX_NAME_LEN);
    name[MAX_NAME_LEN] = 0;  // make sure string ends in zero
}
 

command.h

/* --------------------------------------------------------------- */
/* -- Copyright (c) 1994, 1995 Computer Sciences Department,    -- */
/* -- University of Wisconsin-Madison, subject to the terms     -- */
/* -- and conditions given in the file COPYRIGHT.  All Rights   -- */
/* -- Reserved.                                                 -- */
/* --------------------------------------------------------------- */

#ifndef COMMAND_H
#define COMMAND_H

typedef char*	cmd_err_t;

/*
 * Command processing class
 */
class command_base_t {
public:

			command_base_t() {};
    virtual		~command_base_t() {};

    // Commands that get converted to an RPC 
    // All return an error message string that is NULL if success.
    virtual cmd_err_t	commit_transaction() = 0;
    virtual cmd_err_t	abort_transaction() = 0;
    virtual cmd_err_t	clear_grid() = 0;
    virtual cmd_err_t	print_grid(grid_display_t& rows) = 0;
    virtual cmd_err_t	add_item(const char* name, int x, int y) = 0;
    virtual cmd_err_t	remove_item(const char* name) = 0;
    virtual cmd_err_t	move_item(const char* name, int x, int y) = 0;
    virtual cmd_err_t	location_of(const char* name, int& x, int& y) = 0;
    virtual cmd_err_t	spatial_query(const nbox_t& box, spatial_result_t& result) = 0;

    // Command Parsing
    // Print errors to stderr
    // Sets "quit" to true if quit command was found
    void		parse_command(char* line, bool& quit);
};


#endif /* COMMAND_H */
 

command.C

/* --------------------------------------------------------------- */
/* -- Copyright (c) 1994, 1995 Computer Sciences Department,    -- */
/* -- University of Wisconsin-Madison, subject to the terms     -- */
/* -- and conditions given in the file COPYRIGHT.  All Rights   -- */
/* -- Reserved.                                                 -- */
/* --------------------------------------------------------------- */

/*
 * This file implements the main() code for the grid client program
 */

#include "ShoreConfig.h"
#include <stream.h>
#include <string.h>
#include <strstream.h>
#include <ctype.h>
#include <rpc/rpc.h>
// include stuff needed for SM applications (clients)
// use this rather than sm_vas.h since it's small and all that
// is necessary for this file
#include "grid_basics.h"
#include "sm_app.h"
#include "msg.h"
#include "grid.h"
#include "command.h"

enum command_token_t {
    commit_cmd,
    abort_cmd,
    clear_cmd,
    print_cmd,
    add_cmd,
    remove_cmd,
    move_cmd,
    locate_cmd,
    spatial_cmd,
    quit_cmd,
    help_cmd
};

struct command_description_t {
    command_token_t	token;
    int		  	param_cnt;	// number of parameters
    char* 		name;		// string name of command
    char* 		parameters;	// parameter list
    char* 		description;	// command description
};

static command_description_t descriptions[] = {
    {commit_cmd, 0, "commit", "", 	"commit transaction and start another one"},
    {abort_cmd,  0, "abort",  "", 	"abort transaction and start another one"},
    {clear_cmd,  0, "clear",  "", 	"clear grid"},
    {print_cmd,  0, "print",  "", 	"print grid"},
    {add_cmd,    3, "add",    "name x y", "add new item <name> at <x,y>"},
    {remove_cmd, 1, "remove", "name", 	"remove item <name>"},
    {move_cmd, 3, "move", "name x y", 	"move item <name> to location <x,y>"},
    {locate_cmd, 1, "locate", "name", 	"print location of item <name>"},
    {spatial_cmd, 4, "spatial", "x_lo y_lo x_hi y_hi", 	"print count of items in rectangle and list first few items"},
    {quit_cmd,   0, "quit",   "", 	"quit and exit program (aborts current transaction)"},
    {help_cmd,   0, "help",   "", 	"prints this message"}
};

// number of commands
static command_cnt = sizeof(descriptions)/sizeof(command_description_t);

static void
print_commands() 
{
    cerr << "Valid commands are: \n"<< endl;
    const command_description_t* cmd;
    for (cmd = descriptions; cmd != descriptions+command_cnt; cmd++) {
	cerr << "    " << cmd->name << " " << cmd->parameters << endl;
	cerr << "        " << cmd->description << endl;
    }
    cerr << "\n    Comments begin with a '#' and continue until the end of the line." << endl;
}

static void
print_usage(const command_description_t* cmd) 
{
    cerr << "Usage: "<< cmd->name << " " << cmd->parameters << endl;
}

void
command_base_t::parse_command(char* line, bool& quit)
{
    istrstream	s(line);

    const	max_params = 5;
    char*	params[max_params];
    int		param_cnt = 0;
    int		i;

    // find all parameters in the line (parameters begin
    // with non-white space) and end each parameter with \0
    bool in_param = false;	// not current in a parameter
    for (i = 0; line[i] != '\0'; i++) {
	if (in_param) {
	    if (isspace(line[i])) {
		// end of parameter
		line[i] = '\0';
		in_param = false;
	    }
	} else {
	    if (line[i] == '#') {
		// rest of line is comment
		break;
	    }

	    if (!isspace(line[i])) {
		// beginning of parameter
		if (param_cnt == max_params) {
		    cerr << "Error: too many parameters." << endl;
		    return;
		}
		params[param_cnt] = line+i;
		param_cnt++;
		in_param = true;
	    }
	}
    }

    if (param_cnt == 0) {
	// blank line
	return;
    }

    // Search for command in command list
    command_description_t* cmd;
    for (cmd = descriptions; cmd != descriptions+command_cnt; cmd++) {
	// command is recognized with just first 2 characters
	if (strncmp(params[0], cmd->name, 2) == 0) {
	    break;
	}
    }
    if (cmd == descriptions+command_cnt) {
	// command not found
	cerr << "Error: unkown command " << params[0] << endl;
	print_commands();
    } else if (cmd->param_cnt != param_cnt-1) {
	// wrong number of parameters
	cerr << "Error: wrong number of parameters for " << cmd->name << endl;
	print_usage(cmd);
    } else {

	// call proper RPC for the command

	int 	x;
	int 	y;
	char* 	name;
	quit = false;

	cmd_err_t err = 0;
	switch(cmd->token) {
	case commit_cmd:
	    err = commit_transaction();
	    if (!err) {
		cout << "transaction is committed -- new one started" << endl;
	    }
	    break;
	case abort_cmd:
	    err = abort_transaction();
	    if (!err) {
		cout << "transaction is rolled back -- new one started" << endl;
	    }
	    break;
	case clear_cmd:
	    err = clear_grid();
	    if (!err) {
		cout << "grid has been cleared" << endl;
	    }
	    break;
	case print_cmd:
	    grid_display_t display;
	    err = print_grid(display);
	    if (err) break;

	    // print header line
	    cout << "\n    ";
	    for (int col = 0; col < MAX_GRID_X; col++) {
		if (col%10 == 0) {
		    cout << '.';
		} else {
		    cout << col%10;
		}
	    } 
	    cout << endl;
	    // print rows
	    for (int row = 0; row < MAX_GRID_Y; row++) {
		cout << form("%.3i", row) << " " ;
		for (int col = 0; col < MAX_GRID_X; col++) {
		    cout << display.rows[row][col];
		}
		cout << endl;
	    }
	    break;
	case add_cmd:
	    name = params[1];
	    x = strtol(params[2], 0, 0);
	    y = strtol(params[3], 0, 0);
	    if (x < 0 || x >= MAX_GRID_X) {
		cerr << "Error: x parameter must be >=0 and < " << MAX_GRID_X << endl;
		break;
            }
	    if (y < 0 || y >= MAX_GRID_Y) {
		cerr << "Error: y parameter must be >=0 and < " << MAX_GRID_Y << endl;
		break;
            }
   	    err = add_item(name, x, y);
	    if (!err) {
		cout << "new item " << name << " has been added" << endl;
	    }
	    break;
	case remove_cmd:
	    name = params[1];
   	    err = remove_item(name);
	    if (!err) {
		cout << "item " << name << " has been removed" << endl;
	    }
	    break;
	case move_cmd:
	    name = params[1];
	    x = strtol(params[2], 0, 0);
	    y = strtol(params[3], 0, 0);
	    if (x < 0 || x >= MAX_GRID_X) {
		cerr << "Error: x parameter must be >=0 and < " << MAX_GRID_X << endl;
		break;
            }
	    if (y < 0 || y >= MAX_GRID_Y) {
		cerr << "Error: y parameter must be >=0 and < " << MAX_GRID_Y << endl;
		break;
            }
   	    err = move_item(name, x, y);
	    if (!err) {
		cout << "item " << name << " has been moved" << endl;
	    }
	    break;
	case locate_cmd:
	    name = params[1];
   	    err = location_of(name, x, y);
	    if (!err) {
		cout << "item " << name << " is located at: "
		     << x << "," << y << endl;
	    }
	    break;
	case spatial_cmd: {
	    // generate nbox from last 4 parameters
	    const coord_cnt = 4;
	    int coord[coord_cnt];
	    int i;
	    for (i = 0; i < coord_cnt; i++ ) {
		coord[i] = strtol(params[i+1], 0, 0);
	    }
	    {
		nbox_t box(2, coord);
		spatial_result_t result;
		err = spatial_query(box, result);
		if (!err) {
		    cout << "In box [" << coord[0] << "," << coord[1]
			 << " " << coord[2] << "," << coord[3] << "]"
			 << " there are " << result.found_cnt << " items."
			 << endl;
		    if (result.found_cnt > 0) {
			int print_cnt = MIN(MAX_SPATIAL_RESULT, result.found_cnt);
			cout << "The first " << print_cnt 
			     << " items found are:" << endl;
			item_t* it; // item iterator
			for (i = 0, it = result.items;
			     i < print_cnt; i++, it++) {
			    cout << it->name << " " << it->x << "," << it->y << endl;
			}
		    }
		}
	    }
	    }
	    break;
      	case quit_cmd:
	    quit = true;
	    break;
      	case help_cmd:
	    print_commands();
	    break;
	default:
	    cerr << "Internal Error at: " << __FILE__ << ":" << __LINE__ << endl;
	    exit(1);
	}
	if (err) {
	    //cerr << "Error: " << err << endl;
	    cerr << err << endl;
	    cerr << "Error: " << cmd->name << " command failed." << endl;
	}
    }
}
 

Configuration Options: options.C

/* --------------------------------------------------------------- */
/* -- Copyright (c) 1994, 1995 Computer Sciences Department,    -- */
/* -- University of Wisconsin-Madison, subject to the terms     -- */
/* -- and conditions given in the file COPYRIGHT.  All Rights   -- */
/* -- Reserved.                                                 -- */
/* --------------------------------------------------------------- */

/*
 * This file implements configuration option processing for
 * both the client and the server.
 */

#include <stream.h>
#include <string.h>

// since this file only deals with the SSM option package,
// rather than including sm_vas.h, just include what's needed for
// options:
#include "w.h"
#include "option.h"

/*
 * init_config_options intialized configuration options for
 * both the client and server programs in the Grid example.
 *
 * The options parameter is the option group holding all the options.
 * It is assumed that all SSM options have been added if called
 * by the server.
 *
 * The prog_type parameter is should be either "client" or "server".
 *
 * The argc and argv parameters should be argc and argv from main().
 * Recognized options will be located in argv and removed.  argc
 * is changed to reflect the removal.
 *
 */

w_rc_t
init_config_options(option_group_t& options,
		    const char* prog_type,
		    int& argc, char** argv)
{

    w_rc_t rc;	// return code

    // set prog_name to the file name of the program without the path
    char* prog_name = strrchr(argv[0], '/');
    if (prog_name == NULL) {
	prog_name = argv[0];
    } else {
	prog_name += 1; /* skip the '/' */
	if (prog_name[0] == '\0')  {
		prog_name = argv[0];
	}
    }
 
    W_DO(options.add_class_level("grid"));	// for all grid examples
    W_DO(options.add_class_level(prog_type));	// server or client
    W_DO(options.add_class_level(prog_name));	// program name

    // read the .examplerc file to set options
    {
	ostrstream      err_stream;
	const char* opt_file = "./exampleconfig"; 	// option config file
	option_file_scan_t opt_scan(opt_file, &options);

	// scan the file and override any current option settings
	// options names must be spelled correctly
	rc = opt_scan.scan(true /*override*/, err_stream, true);
	if (rc) {
	    char* errmsg = err_stream.str();
	    cerr << "Error in reading option file: " << opt_file << endl;
	    cerr << "\t" << errmsg << endl;
	    if (errmsg) delete errmsg;
	    return rc;
	}
    }

    // parce argv for options
    if (!rc) {
        // parse command line
        ostrstream      err_stream;
        rc = options.parse_command_line(argv, argc, 2, &err_stream);
        err_stream << ends;
        char* errmsg = err_stream.str();
        if (rc) {
            cerr << "Error on Command line " << endl;
            cerr << "\t" << w_error_t::error_string(rc.err_num()) << endl;
            cerr << "\t" << errmsg << endl;
	    return rc;
        }
        if (errmsg) delete errmsg;
    }
 
    // check required options
    {
	ostrstream      err_stream;
	rc = options.check_required(&err_stream);
        if (rc) {
	    char* errmsg = err_stream.str();
            cerr << "These required options are not set:" << endl;
            cerr << errmsg << endl;
	    if (errmsg) delete errmsg;
	    return rc;
        }
    } 

    return RCOK;
}

 

Server Code

 

Main: server.C

/* --------------------------------------------------------------- */
/* -- Copyright (c) 1994, 1995 Computer Sciences Department,    -- */
/* -- University of Wisconsin-Madison, subject to the terms     -- */
/* -- and conditions given in the file COPYRIGHT.  All Rights   -- */
/* -- Reserved.                                                 -- */
/* --------------------------------------------------------------- */

/*
 * This file implements the main() code for the grid server program
 */

#include "ShoreConfig.h"
#include <stream.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <rpc/rpc.h>
#ifdef SOLARIS2
#include <rpc/svc_soc.h>
#endif

// This include brings in all header files needed for writing a VAs 
#include "sm_vas.h"

#include "grid_basics.h"
#define RPC_SVC
#include "msg.h"
#include "grid.h"
#include "command.h"
#include "command_server.h"
#include "rpc_thread.h"

ss_m* ssm = 0;

// shorten error code type name
typedef w_rc_t rc_t;

// this is implemented in options.C
w_rc_t init_config_options(option_group_t& options,
			const char* prog_type,
			int& argc, char** argv);

// pointer to RPC service this server provides
SVCXPRT* svcxprt = 0;


/*
 * This function either formats a new device and creates a
 * volume on it, or mounts an already existing device and
 * returns the ID of the volume on it.
 */
rc_t
setup_device_and_volume(const char* device_name, bool init_device,
			smksize_t quota, lvid_t& lvid)
{
    devid_t	devid;
    u_int	vol_cnt;
    rc_t 	rc;

    if (init_device) {
	cout << "Formatting and mounting device: " << device_name 
	     << " with a " << quota << "KB quota ..." << endl;
	W_DO(ssm->format_dev(device_name, quota, true));

	// mount the new device
	W_DO(ssm->mount_dev(device_name, vol_cnt, devid));

	// generate a volume ID for the new volume we are about to
	// create on the device
	W_DO(ssm->generate_new_lvid(lvid));

	// create the new volume 
	cout << "Creating a new volume on the device" << endl;
	cout << "    with a " << quota << "KB quota ..." << endl;
	W_DO(ssm->create_vol(device_name, lvid, quota));

	// create the logical ID index on the volume, reserving no IDs
	W_DO(ssm->add_logical_id_index(lvid, 0, 0));

    } else {
	cout << "Using already existing device: " << device_name << endl;
	// mount already existing device
	rc = ssm->mount_dev(device_name, vol_cnt, devid);
	if (rc) {
	    cerr << "Error: could not mount device: " << device_name << endl;
	    cerr << "   Did you forget to run the server with -i the first time?" << endl;
	    return rc;
	}
	
	// find ID of the volume on the device
	lvid_t* lvid_list;
	u_int   lvid_cnt;
	W_DO(ssm->list_volumes(device_name, lvid_list, lvid_cnt));
	if (lvid_cnt == 0) {
	    cerr << "Grid program error, device has no volumes" << endl;
	    ::exit(1);
	}
	lvid = lvid_list[0];
	delete [] lvid_list;
    }
    return RCOK;
}


/*
 * This function starts the RPC service by allocating a connection
 * socket (and binding it to conn_port) and calling RPC initialization
 * functions.
 */
rc_t
start_tcp_rpc(int conn_port, int& conn_sock)
{
    struct sockaddr_in addr;

    cerr << "allocating a tcp socket for listening for connections ..." << endl;
#ifdef SOLARIS2
    conn_sock = t_open("/dev/tcp", O_RDWR, 0);
#else
    conn_sock = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
#endif
    if (conn_sock < 0) {
        perror("socket");
        return RC(fcOS);	// indicate an OS error occurred
    }

    cerr << "binding to port " << conn_port << endl; 
    addr.sin_family = AF_INET;
    addr.sin_addr.s_addr = INADDR_ANY;
    addr.sin_port = htons(conn_port);
#ifdef SOLARIS2
    // TODO: what is the tli counterpart of SO_REUSEADDR?
    struct t_bind tb_args;
    tb_args.addr.maxlen = tb_args.addr.len = sizeof addr;
    tb_args.addr.buf = (char *)&addr;
    tb_args.qlen = 5;       // Arbitrary value
    if (t_bind(conn_sock, &tb_args, 0) < 0) {
	// TODO: what is the tli counterpart of SO_REUSEADDR?
	// TODO: deal with Address-in-use error
	perror("t_bind");
	return 0;
    }
#else
    if (bind(conn_sock, (struct sockaddr *)&addr, sizeof (addr)) < 0) {
	perror("bind");
	return RC(fcOS);
    }
#endif

    cerr << "creating tcp service" << endl;
    int buf_size = 0; // use default send/receive buffer size
    errno = 0;

    svcxprt =  svctcp_create(conn_sock, buf_size, buf_size);

    if(svcxprt == 0) {
	cerr << "Error: svctcp_create returned NULL" << endl;
	if (errno) return RC(fcOS);
	else       return RC(fcINTERNAL);
    }

    cerr << "registering rpc service" << endl;
    // pass 0 for the protocol parameter so that the portmapper is not
    // used.
    if (!svc_register(svcxprt, GRID, GRIDVERS, grid_1, 0/*protocol*/)) {
	// registration failed
	cerr << "Error: rpc registration failed" << endl;
	::exit(1);
	// Note: if we were registering with the portmapper
	//       we could call svc_unregister and then
	//	 try again.
    }

    return RCOK;
}


/*
 * This function ends the RPC service by calling RPC shutdown functions.
 */
rc_t
stop_tcp_rpc()
{
    assert(svcxprt);
    cerr << "unregister rpc service" << endl;
    svc_unregister(GRID, GRIDVERS);
    cerr << "destroy rpc service" << endl;
    svc_destroy(svcxprt);
    svcxprt = 0;
    return RCOK;
}

void
usage(option_group_t& options)
{
    cerr << "Usage: server [-i] [options]" << endl;
    cerr << "       -i will re-initialize the device/volume for the DB" << endl;
    cerr << "Valid options are: " << endl;
    options.print_usage(TRUE, cerr);
}

class startup_smthread_t : public smthread_t
{
private:
	option_t* opt_connect_port,
		* opt_device_name,
	        * opt_device_quota;
	bool	init_device;

public:
        startup_smthread_t(option_t *, option_t *, option_t *, bool);
        ~startup_smthread_t() {}
        void run();
};

startup_smthread_t::startup_smthread_t(
	option_t * _opt_connect_port,
	option_t * _opt_device_name,
	option_t * _opt_device_quota,
	bool       _init_device
)
: 
    opt_connect_port(_opt_connect_port),
    opt_device_name(_opt_device_name),
    opt_device_quota(_opt_device_quota),
    init_device(_init_device),
    smthread_t(t_regular)
{
}


int
main(int argc, char* argv[])
{
    option_t* opt_connect_port = 0;
    option_t* opt_device_name = 0;
    option_t* opt_device_quota = 0;

    // pointers to options we will create for the grid server program

    cout << "processing configuration options ..." << endl;
    const option_level_cnt = 3; 
    option_group_t options(option_level_cnt);

    W_COERCE(options.add_option("connect_port", "1024 < integer < 65535",
			 "1234", "port for connecting to grid server",
			 false, option_t::set_value_long,
			 opt_connect_port));

    W_COERCE(options.add_option("device_name", "device/file name",
			 NULL, "device containg volume to use for grid program",
			 true, option_t::set_value_charstr,
			 opt_device_name));

    W_COERCE(options.add_option("device_quota", "# > 1000",
			 "2000", "quota for device containing grid volume",
			 false, option_t::set_value_long,
			 opt_device_quota));

    // have the SSM add its options to the group
    W_COERCE(ss_m::setup_options(&options));

    if (init_config_options(options, "server", argc, argv)) {
	usage(options);
	::exit(1);
    }


    // process command line: looking for the "-i" flag
    bool init_device = false;
    if (argc > 2) {
	usage(options);
	::exit(1);
    } else if (argc == 2) {
	if (strcmp(argv[1], "-i") == 0) {
	    cout << "Do you really want to initialize the Grid database? ";
	    char answer;
	    cin >> answer;
	    if (answer == 'y' || answer == 'Y') {
		init_device = true;
	    } else {
		cerr << "Please try again without the -i option" << endl;
		::exit(0);
	    }
	} else {
	    usage(options);
	    ::exit(1);
	}
    }

    startup_smthread_t *doit = new startup_smthread_t( opt_connect_port,
	opt_device_name, opt_device_quota, init_device);

    if(!doit) {
	W_FATAL(fcOUTOFMEMORY);
    }
    W_COERCE(doit->fork());
    W_COERCE(doit->wait());
    delete doit;
}

void
startup_smthread_t::run()
{
    rc_t rc;
    cout << "Starting SSM and performing recovery ..." << endl;
    ssm = new ss_m();
    if (!ssm) {
	cerr << "Error: Out of memory for ss_m" << endl;
	::exit(1);
    }

    lvid_t lvid;  // ID of volume for storing grid
    smksize_t quota = strtol(opt_device_quota->value(), 0, 0);
    rc = setup_device_and_volume(opt_device_name->value(), init_device, quota, lvid);
    if (rc) {
	cerr << "could not setup device/volume due to: " << endl;
	cerr << rc << endl;
	delete ssm;
	rc = RCOK;   // force deletion of w_error_t info hanging off rc
	             // otherwise a leak for w_error_t will be reported
	::exit(1);
    }

    // tell the command server what volume to use for the grid
    command_server_t::lvid = lvid;

    // start the RPC service listening on the connection port
    // specified by the connect_port option
    int connect_port = strtol(opt_connect_port->value(), 0, 0);
    cerr << "starting up, listening on port " << connect_port <<endl;
    int connect_socket;
    W_COERCE(start_tcp_rpc(connect_port, connect_socket));

    listener_t* listen_thread = new listener_t(connect_socket);
    W_COERCE(listen_thread->fork());

    // start thread to process commands on stdin
    cout << "main starting stdin thread" << endl;
    stdin_thread_t* stdin_thread = new stdin_thread_t;

    W_COERCE(stdin_thread->fork());

    // wait for the stdin thread to finish
    W_COERCE(stdin_thread->wait());
    cout << "Stdin thread is done" << endl;

    // shutdown the RPC listener thread and wait for it to end
    listen_thread->shutdown();
    W_COERCE(listen_thread->wait());
   
    delete listen_thread;
    delete stdin_thread;

    W_COERCE(stop_tcp_rpc());

    cout << "\nShutting down SSM ..." << endl;
    delete ssm;

    cout << "Finished!" << endl;
}
 

command_server.h

/* --------------------------------------------------------------- */
/* -- Copyright (c) 1994, 1995 Computer Sciences Department,    -- */
/* -- University of Wisconsin-Madison, subject to the terms     -- */
/* -- and conditions given in the file COPYRIGHT.  All Rights   -- */
/* -- Reserved.                                                 -- */
/* --------------------------------------------------------------- */

#ifndef COMMAND_SERVER_H
#define COMMAND_SERVER_H

/*
 * Server command processing class
 */
class command_server_t : public command_base_t {
public:
			command_server_t();
			~command_server_t();
   
    // RPC methods
    virtual cmd_err_t	commit_transaction();
    virtual cmd_err_t	abort_transaction();
    virtual cmd_err_t	clear_grid();
    virtual cmd_err_t	print_grid(grid_display_t& rows);
    virtual cmd_err_t	add_item(const char* name, int x, int y);
    virtual cmd_err_t	remove_item(const char* name);
    virtual cmd_err_t	move_item(const char* name, int x, int y);
    virtual cmd_err_t	location_of(const char* name, int& x, int& y);
    virtual cmd_err_t   spatial_query(const nbox_t& box, spatial_result_t& result);

    static lvid_t	lvid;		// volume containing grid
private:
    static const char*	grid_name;	// root name of grid 

    rc_t		init();

    grid_t		grid;		// grid to serve requests for

    // These are used to generate error replies
    ostrstream		err_strstream;	
    char		_err_space[MAX_ERR_MSG_LEN];
};

#endif /* COMMAND_SERVER_H */
 

command_server.C

/* --------------------------------------------------------------- */
/* -- Copyright (c) 1994, 1995 Computer Sciences Department,    -- */
/* -- University of Wisconsin-Madison, subject to the terms     -- */
/* -- and conditions given in the file COPYRIGHT.  All Rights   -- */
/* -- Reserved.                                                 -- */
/* --------------------------------------------------------------- */

/*
 * This file implements the server side of the RPCs
 *
 * It is organized as a mix of RPC stub and corresponding
 * command_server_t method for the stub.
 */

#include "ShoreConfig.h"
#include <stream.h>
#include <string.h>
#include <rpc/rpc.h>
// include stuff needed for SM applications (clients)
#include "sm_vas.h"
#include "grid_basics.h"
#define RPC_SVC  /* so rpc prototypes are included */
#include "msg.h"
#include "grid.h"
#include "command.h"
#include "command_server.h"
#include "rpc_thread.h"

lvid_t command_server_t::lvid = lvid_t::null;
const char* command_server_t::grid_name = "Grid_Name";

/*
 * This is an error handling macro that calls "method".
 * If "method" returns an error, a message is written to the
 * command_server_t err_strstream and the string message is returned.
 */
#define SSMDO(method)					\
{							\
    w_rc_t __e = method;				\
    if (__e) { 						\
	cerr << __e << endl;				\
	err_strstream.seekp(0, ios::beg);		\
	err_strstream << "Error from SSM: " 		\
		      << __e->error_string(__e.err_num()) << ends;	\
	return err_strstream.str();			\
    } 							\
}							

/********************************************************************
  Command_server_t Methods
 *******************************************************************/

command_server_t::command_server_t()
    : err_strstream(_err_space, sizeof(_err_space))
{
    W_COERCE(ss_m::begin_xct());

    rc_t rc = init();
    if (rc) {
	cerr << "Error: could not start command server due to:" << endl;
	cerr << "    " << rc << endl;
	W_COERCE(ss_m::abort_xct());
	exit(1);
    }

    W_COERCE(ss_m::commit_xct());  // commit the board initialization

    // start the first transaction for this client
    W_COERCE(ss_m::begin_xct());
}

command_server_t::~command_server_t()
{
    // connection is shutting down, so
    // abort the currently running transaction
    W_COERCE(ss_m::abort_xct());
}


/*
 * This functions initialized a command_server_t, including
 * creating the grid database structures if they do not yet exist.
 * 
 * First, it sees if there is a grid on the volume by
 * looking up the special string, Grid_Name, in
 * the volume root index.
 *
 * If no grid already exists, we create one.  And store info
 * about it in the root index.
 */
rc_t
command_server_t::init()
{
    serial_t root_iid;	// root index ID
    W_DO(ss_m::vol_root_index(lvid, root_iid));
    grid_t::grid_info_t info;
    smsize_t    info_len = sizeof(info);
    bool	found;
    W_DO(ss_m::find_assoc(lvid, root_iid,
			      vec_t(grid_name, strlen(grid_name)),
			      &info, info_len, found));
    if (found) {
	assert(info_len == sizeof(info));
	cout << "Using already existing grid" << endl;
    } else {
	cout << "Creating a new Grid" << endl;

	// create the item file
	W_DO(ss_m::create_file(lvid, info.item_file, t_regular));

	// create the btree index on item name
	// the "b*1000" indicates the key type is a variable
	// length byte string with maximum length of 1000
	W_DO(ss_m::create_index(lvid, ss_m::t_uni_btree, t_regular,
				"b*1000", 0, info.name_index));

	// create the R*tree index on item location
	W_DO(ss_m::create_md_index(lvid, ss_m::t_rtree, t_regular, info.spatial_index));

	// store the grid info in the root index
	W_DO(ss_m::create_assoc(lvid, root_iid,
				vec_t(grid_name, strlen(grid_name)),
 				vec_t(&info, sizeof(info))));
    }

    grid.init(lvid, info);    
    return RCOK;
}


/********************************************************************
  The following command_server_t methods correspond to RPCs
 *******************************************************************/

cmd_err_t
command_server_t::commit_transaction()
{
    SSMDO(ss_m::commit_xct());
    SSMDO(ss_m::begin_xct());
    return 0; /* success */
}

cmd_err_t
command_server_t::abort_transaction()
{
    SSMDO(ss_m::abort_xct());
    SSMDO(ss_m::begin_xct());
    return 0; /* success */
}

cmd_err_t
command_server_t::clear_grid()
{
    SSMDO(grid.clear());
    return 0; /* success */
}

cmd_err_t
command_server_t::print_grid(grid_display_t& rows)
{
    SSMDO(grid.generate_display(rows));
    return 0; /* success */
}

cmd_err_t
command_server_t::add_item(const char* name, int x, int y)
{
    SSMDO(grid.add_item(name, x, y));
    return 0; /* success */
}

cmd_err_t
command_server_t::remove_item(const char* name)
{
    bool found;
    SSMDO(grid.remove_item(name, found));
    if (!found) {
	err_strstream.seekp(0, ios::beg);
	err_strstream << "Error: item was not found" << endl;
	return err_strstream.str();
    }
    return 0; /* success */
}


cmd_err_t
command_server_t::move_item(const char* name, int x, int y)
{
    bool found;
    SSMDO(grid.move_item(name, x, y, found));
    if (!found) {
	err_strstream.seekp(0, ios::beg);
	err_strstream << "Error: item was not found" << endl;
	return err_strstream.str();
    }
    return 0; /* success */
}


cmd_err_t
command_server_t::location_of(const char* name, int& x, int& y)
{
    bool found;
    SSMDO(grid.location_of(name, x, y, found));
    if (!found) {
	err_strstream.seekp(0, ios::beg);
	err_strstream << "Error: item was not found" << endl;
	return err_strstream.str();
    }
    return 0; /* success */
}


cmd_err_t
command_server_t::spatial_query(const nbox_t& box, spatial_result_t& result)
{
    SSMDO(grid.spatial_query(box, result));
    return 0; /* success */
}
 

server_stubs.C

/* --------------------------------------------------------------- */
/* -- Copyright (c) 1994, 1995 Computer Sciences Department,    -- */
/* -- University of Wisconsin-Madison, subject to the terms     -- */
/* -- and conditions given in the file COPYRIGHT.  All Rights   -- */
/* -- Reserved.                                                 -- */
/* --------------------------------------------------------------- */

/*
 * This file implements the server side of the RPCs
 *
 * It is organized as a mix of RPC stub and corresponding
 * command_server_t method for the stub.
 */

#include "ShoreConfig.h"
/* 
 * NB: THIS IS IMPORTANT: we include "ShoreConfig.h" because
 * we *NEED* the system-dependent definition of a jmp_buf
 * in order to see that the thread data structures that
 * that we build here are consistent with those in the library.
 * If we don't get the right #defines for the configuration, we
 * run the risk of building .o files here with the wrong idea
 * about the size of sthread_t (the root of the class hierarchy
 * for our threads).
 */

#include <stream.h>
#include <string.h>
#include <rpc/rpc.h>
// include stuff needed for SM applications (clients)
#include "sm_vas.h"
#include "grid_basics.h"
#define RPC_SVC  /* so rpc prototypes are included */
#include "msg.h"
#include "grid.h"
#include "command.h"
#include "command_server.h"
#include "rpc_thread.h"

/********************************************************************
   RPC Stubs that call command_server_t methods for processing
 *******************************************************************/

// there is no command_server_t method for ping
void *
ping_rpc_1(void* , svc_req* )
{
    return (void *)client_t::me()->reply_buf;
}

error_reply *
commit_transaction_rpc_1(void* , svc_req* )
{
    client_t* Me = client_t::me();

    error_reply& reply = *(error_reply*)Me->reply_buf;

    cmd_err_t err = Me->command_server->commit_transaction();
    if (err) {
	strncpy(reply.error_msg, err, MAX_ERR_MSG_LEN);
	reply.error_msg[MAX_ERR_MSG_LEN-1] = '\0';
    } else {
	// no error message
	reply.error_msg[0] = '\0';
    }
    return &reply;
}

error_reply *
abort_transaction_rpc_1(void* , svc_req* )
{
    client_t* Me = client_t::me();

    error_reply& reply = *(error_reply*)Me->reply_buf;

    cmd_err_t err = Me->command_server->abort_transaction();
    if (err) {
	strncpy(reply.error_msg, err, MAX_ERR_MSG_LEN);
	reply.error_msg[MAX_ERR_MSG_LEN-1] = '\0';
    } else {
	// no error message
	reply.error_msg[0] = '\0';
    }
    return &reply;
}


error_reply *
clear_grid_rpc_1(void* , svc_req* )
{
    client_t* Me = client_t::me();
    error_reply& reply = *(error_reply*)Me->reply_buf;

    cmd_err_t err = Me->command_server->clear_grid();

    if (err) {
	strncpy(reply.error_msg, err, MAX_ERR_MSG_LEN);
	reply.error_msg[MAX_ERR_MSG_LEN-1] = '\0';
    } else {
	// no error message
	reply.error_msg[0] = '\0';
    }
    return &reply;
}


print_grid_reply *
print_grid_rpc_1(void* , svc_req* )
{
    client_t* Me = client_t::me();

    print_grid_reply& reply = *(print_grid_reply*)Me->reply_buf;

    cmd_err_t err = Me->command_server->print_grid(reply.display);
    if (err) {
	strncpy(reply.error_msg, err, MAX_ERR_MSG_LEN);
	reply.error_msg[MAX_ERR_MSG_LEN-1] = '\0';
    } else {
	// no error message
	reply.error_msg[0] = '\0';
    }
    return &reply;
}


error_reply *
add_item_rpc_1(add_item_arg* argp, svc_req* )
{
    client_t* Me = client_t::me();

    error_reply& reply = *(error_reply*)Me->reply_buf;

    cmd_err_t err = Me->command_server->add_item(argp->name, argp->x, argp->y);
    if (err) {
	strncpy(reply.error_msg, err, MAX_ERR_MSG_LEN);
	reply.error_msg[MAX_ERR_MSG_LEN-1] = '\0';
    } else {
	// no error message
	reply.error_msg[0] = '\0';
    }
    return &reply;
}


error_reply *
remove_item_rpc_1(remove_item_arg* argp, svc_req* )
{
    client_t* Me = client_t::me();

    error_reply& reply = *(error_reply*)Me->reply_buf;

    cmd_err_t err = Me->command_server->remove_item(argp->name);
    if (err) {
	strncpy(reply.error_msg, err, MAX_ERR_MSG_LEN);
	reply.error_msg[MAX_ERR_MSG_LEN-1] = '\0';
    } else {
	// no error message
	reply.error_msg[0] = '\0';
    }
    return &reply;
}


error_reply *
move_item_rpc_1(add_item_arg* argp, svc_req* )
{
    client_t* Me = client_t::me();

    error_reply& reply = *(error_reply*)Me->reply_buf;

    cmd_err_t err = Me->command_server->move_item(argp->name, argp->x, argp->y);
    if (err) {
	strncpy(reply.error_msg, err, MAX_ERR_MSG_LEN);
	reply.error_msg[MAX_ERR_MSG_LEN-1] = '\0';
    } else {
	// no error message
	reply.error_msg[0] = '\0';
    }
    return &reply;
}


location_reply *
location_of_rpc_1(location_arg* argp, svc_req* )
{
    client_t* Me = client_t::me();

    location_reply& reply = *(location_reply*)Me->reply_buf;

    cmd_err_t err = Me->command_server->location_of(argp->name, reply.x, reply.y);
    if (err) {
	strncpy(reply.error_msg, err, MAX_ERR_MSG_LEN);
	reply.error_msg[MAX_ERR_MSG_LEN-1] = '\0';
    } else {
	// no error message
	reply.error_msg[0] = '\0';
    }
    return &reply;
}


spatial_reply *
spatial_rpc_1(spatial_arg* argp, svc_req* )
{
    client_t* Me = client_t::me();

    spatial_reply& reply = *(spatial_reply*)Me->reply_buf;

    // the spatial_arg is really an array of integers in the format
    // for an nbox_t
    nbox_t box(2, (int*)(&argp->x_low));

    cmd_err_t err = Me->command_server->spatial_query(box, reply.result);
    if (err) {
	strncpy(reply.error_msg, err, MAX_ERR_MSG_LEN);
	reply.error_msg[MAX_ERR_MSG_LEN-1] = '\0';
    } else {
	// no error message
	reply.error_msg[0] = '\0';
    }
    return &reply;
}
 

grid.h

/* --------------------------------------------------------------- */
/* -- Copyright (c) 1994, 1995 Computer Sciences Department,    -- */
/* -- University of Wisconsin-Madison, subject to the terms     -- */
/* -- and conditions given in the file COPYRIGHT.  All Rights   -- */
/* -- Reserved.                                                 -- */
/* --------------------------------------------------------------- */

#ifndef GRID_H
#define GRID_H

/*
 * Class managing grid and items on the grid 
 */
class grid_t {
public:
    struct grid_info_t {
	serial_t item_file;	// ID of file of item records
	serial_t name_index;	// B+tree mapping names to item records
	serial_t spatial_index;	// R*tree mapping points to items
    };

    		grid_t();

    		~grid_t();
		// Access an already created grid
    void	init(const lvid_t& lvid, const grid_info_t& info);
    rc_t	add_item(const char* name, int x, int y);
    rc_t	remove_item(const char* name, bool& found);
    rc_t	move_item(const char* name, int x, int y, bool& found);
    rc_t	location_of(const char* name, int& x, int& y, bool& found);
    rc_t	generate_display(grid_display_t& display);
    rc_t	spatial_query(const nbox_t& box, spatial_result_t& result);
    rc_t	clear();	// remove all items from grid

    bool	is_initialized() const {return lvid != lvid_t::null;}
private:
    lvid_t	lvid;		// ID of volume containing grid;
    grid_info_t info;

    // get the ID of the item "name", return serial_t::null if
    // not found.
    rc_t	get_id(const char* name, serial_t& id);

};

#endif /* GRID_H */
 

grid.C

/* --------------------------------------------------------------- */
/* -- Copyright (c) 1994, 1995 Computer Sciences Department,    -- */
/* -- University of Wisconsin-Madison, subject to the terms     -- */
/* -- and conditions given in the file COPYRIGHT.  All Rights   -- */
/* -- Reserved.                                                 -- */
/* --------------------------------------------------------------- */

#define GRID_C

#include "ShoreConfig.h"
#include <assert.h>
#include "sm_vas.h"
#include "grid_basics.h"
#include "grid.h"

// storage manger to use for all operations
extern ss_m* ssm;


/*
 * Prepare to create a new grid
 */
grid_t::grid_t()
{
}

grid_t::~grid_t()
{
    // make sure we can't reuse this space
    lvid = lvid_t::null;
}

/*
 * Access an already created grid
 */
void
grid_t::init(const lvid_t& _lvid, const grid_info_t& _info)
{
    lvid = _lvid;
    info = _info;
}

rc_t
grid_t::add_item(const char* name, int x, int y)
{
    assert(is_initialized());
    assert(strlen(name) <= MAX_NAME_LEN);
    rc_t rc;

    // create a savepoint, so that if any operation fails
    // we can roll back to this point;
    sm_save_point_t save_point;
    W_DO(ss_m::save_work(save_point));

    // create record for item
    item_t item(name, x, y);
    serial_t item_id;
    do {
	// create the item record
	//
	// note: the use of anonymous vectors since none of the data
	//       to store is scattered in memory
	// note: the ugly parameter comments are used because of a gcc bug
	rc = ssm->create_rec(lvid, info.item_file,	
			 vec_t(),	/* empty record header	*/
			 sizeof(item),	/* length hint		*/
			 vec_t(&item, sizeof(item)), /* body	*/
			 item_id);	/* new rec id		*/
	if (rc) break;

	// add item to index on name
	rc = ssm->create_assoc(lvid, info.name_index,		
			vec_t(item.name, strlen(item.name)), /* key */
			vec_t(&item_id, sizeof(item_id)));   /* element */

	if (rc) break;

	// 
	// add item to Rtree (spatial) index
	// 
	// coord holds the coordinates of the "rectangle" (actually a 
	// point) containing the item
	//
	int coord[4];
	coord[0] = coord[2] = x;
	coord[1] = coord[3] = y;
	nbox_t box(2, coord);
	rc = ssm->create_md_assoc(lvid, info.spatial_index, box,
			vec_t(&item_id, sizeof(item_id)));  // element

    } while (0);

    if (rc) {
	// an error occurred, so rollback to the savepoint
	W_DO(ss_m::rollback_work(save_point));
	return rc;
    }
    return RCOK;
}

rc_t
grid_t::remove_item(const char* name, bool& found)
{
    assert(is_initialized());

    serial_t	id;			// ID of item to remove
    W_DO(get_id(name, id));

    if (id == serial_t::null) {
	found = false;
	return RCOK;
    }

    // create a savepoint, so that if any operation fails
    // we can roll back to this point;
    sm_save_point_t save_point;
    W_DO(ss_m::save_work(save_point));

    rc_t rc;
    do {

	// remove item from index on name
	rc = ssm->destroy_assoc(lvid, info.name_index,
				vec_t(name, strlen(name)),
				vec_t(&id, sizeof(id)));
	if (rc) break;

	//
	// remove from spatial index (first must pin to find coordinates)
	//
	int coord[4];
	{
	    pin_i handle;
	    rc = handle.pin(lvid, id, 0);
	    if (rc) break;
	    const item_t* item = (const item_t*) handle.body();
	    coord[0] = coord[2] = item->x;
	    coord[1] = coord[3] = item->y;
	}
	nbox_t box(2, coord);
	rc = ssm->destroy_md_assoc(lvid, info.spatial_index, box,
				vec_t(&id, sizeof(id)));	/* element */
	if (rc) break;

	// destroy the item
	rc = ssm->destroy_rec(lvid, id);
	if (rc) break;

    } while (0);

    if (rc) {
	// an error occurred, so rollback to the savepoint
	W_DO(ss_m::rollback_work(save_point));
	return rc;
    }

    found = true;
    return RCOK;
}

rc_t
grid_t::move_item(const char* name, int x, int y, bool& found)
{
    assert(is_initialized());

    serial_t	id;			// ID of item to move
    W_DO(get_id(name, id));

    if (id == serial_t::null) {
	found = false;
	return RCOK;
    }

    // pin the item record
    pin_i handle;
    W_DO(handle.pin(lvid, id, 0));
    const item_t& item = *(const item_t*)handle.body();
    assert(strcmp(item.name, name) == 0);

    // 
    // Now we remove the item from the R*tree, update the
    // x,y coordinates, and re-insert it
    //
    // In general, the only way to "update" and index entry
    // is to remove it and re-insert the changed entry.
    //

    // We create a savepoint, so that if any operation fails
    // we can roll back to this point;
    sm_save_point_t save_point;
    W_DO(ss_m::save_work(save_point));

    rc_t rc;
    do {

	//
	// remove from spatial index (first must pin to find coordinates)
	//
	int coord[4];
	coord[0] = coord[2] = item.x;
	coord[1] = coord[3] = item.y;
	nbox_t box(2, coord);
	rc = ssm->destroy_md_assoc(lvid, info.spatial_index, box,
				vec_t(&id, sizeof(id)));	/* element */
	if (rc) break;
  
	//
  	// Now we need to update the item.  Note that it is illegal
	// to "update-in-place", so we create a new item and
	// use the update_rec method
	//

	// init a new item at the new location
	item_t new_item("", x, y);

	// update x,y in the pinned item 
	vec_t new_data(&new_item.x, sizeof(new_item.x) + sizeof(new_item.y));
	rc = handle.update_rec((smsize_t) offsetof(item_t, x), new_data);
	if (rc) break;

	//
	// add item back into the R*tree at new location
	//
	coord[0] = coord[2] = x;
	coord[1] = coord[3] = y;
	rc = ssm->create_md_assoc(lvid, info.spatial_index, nbox_t(2, coord),
			    vec_t(&id, sizeof(id)));	/* element */

	if (rc) break;

    } while (0);

    if (rc) {
	// an error occurred, so rollback to the savepoint
	W_DO(ss_m::rollback_work(save_point));
	return rc;
    }

    found = true;
    return RCOK;
}

rc_t
grid_t::location_of(const char* name, int& x, int& y, bool& found)
{
    assert(is_initialized());

    serial_t	id;			// ID of item to remove
    W_DO(get_id(name, id));

    if (id == serial_t::null) {
	found = false;
	return RCOK;
    }

    // pin the item record to get x,y
    pin_i handle;
    W_DO(handle.pin(lvid, id, 0));

    const item_t& item = *(const item_t*)handle.body();
    assert(strcmp(item.name, name) == 0);

    x = item.x;
    y = item.y;

    found = true;
    return RCOK;
}


rc_t
grid_t::generate_display(grid_display_t& display)
{
    for (int row = 0; row < MAX_GRID_Y; row++) {
	for (int col = 0; col < MAX_GRID_X; col++) {
	    display.rows[row][col] = '+';
	}
    }

    // add first character of each item item name the display
    scan_file_i	scan(lvid, info.item_file);
    pin_i*	handle;	// handle on current record
    bool	eof = false;
    const item_t* item;

    // scan item file and remove all items
    W_DO(scan.next(handle, 0, eof));
    while (!eof)  {
	item = (const item_t*)handle->body();
	display.rows[item->y][item->x] = item->name[0];
	W_DO(scan.next(handle, 0, eof));
    }

    return RCOK;
}


/*
 * This method removes all items and their associated index entries
 */
rc_t
grid_t::clear()
{
    // since we will eventually lock every item, we first obtain
    // an exclusive lock on the file and indexes so that finer
    // granularity locks are not obtained -- thus improving
    // the performance of this method
    W_DO(ss_m::lock(lvid, info.item_file, EX));
    W_DO(ss_m::lock(lvid, info.name_index, EX));
    W_DO(ss_m::lock(lvid, info.spatial_index, EX));

    scan_file_i	scan(lvid, info.item_file);
    pin_i*	handle;	// handle on current record
    bool	eof = false;
    const item_t*	item;
    rc_t	rc;

    // create a savepoint, so that if any operation fails
    // we can roll back to this point;
    sm_save_point_t save_point;
    W_DO(ss_m::save_work(save_point));

    // scan item file and remove all items
    rc = scan.next(handle, 0, eof);
    while (!eof && !rc) {
	item = (const item_t*)handle->body();
	const serial_t& id = handle->serial_no();

	// remove from name index (key==name, element==serial# of item) 
	rc = ss_m::destroy_assoc(lvid, info.name_index,
				vec_t(item->name, strlen(item->name)),
				vec_t(&id, sizeof(serial_t)));
	if (rc) break;
    
	// remove from spatial index
	int coord[4];
	coord[0] = coord[2] = item->x;
	coord[1] = coord[3] = item->y;
	nbox_t box(2, coord);
	rc = ssm->destroy_md_assoc(lvid, info.spatial_index, box,
				vec_t(&id, sizeof(serial_t)));
	if (rc) break;

	// remove item itself
	rc = ss_m::destroy_rec(lvid, id);
	if (rc) break;

	rc = scan.next(handle, 0, eof);
    }
    if (rc) {
	// an error occurred, so rollback to the savepoint
	W_DO(ss_m::rollback_work(save_point));
	return rc;
    }

    return RCOK;
}


rc_t
grid_t::spatial_query(const nbox_t& box, spatial_result_t& result)
{
    assert(is_initialized());

    result.found_cnt = 0;

    serial_t	id;
    smsize_t	id_len = sizeof(id);
    nbox_t	key;
    bool 	eof;
    pin_i 	handle;

    scan_rt_i scan(lvid, info.spatial_index, t_overlap, box);
    W_DO(scan.next(key, &id, id_len, eof));
    while (!eof) {
	assert(id_len == sizeof(id));
	if (result.found_cnt < MAX_SPATIAL_RESULT) {
	    // pin the item record to get x,y
	    W_DO(handle.pin(lvid, id, 0));

	    const item_t& item = *(const item_t*)handle.body();
	    strcpy(result.items[result.found_cnt].name, item.name);
	    result.items[result.found_cnt].x = item.x;
	    result.items[result.found_cnt].y = item.y;
	    assert(item.x == key.bound(0));
	    assert(item.y == key.bound(1));
	}

	result.found_cnt++;
	W_DO(scan.next(key, &id, id_len, eof));
    }
    return RCOK;
}



rc_t
grid_t::get_id(const char* name, serial_t& id)
{
    bool found;

    // find ID (serial#) of item
    // find_assoc will fill &id with the ID.  For safety, we
    // set id_len to sizeof(id) so that no bytes beyond id will
    // be written in case we accidentally put something to large in
    // the index.
    smsize_t	id_len = sizeof(id);  
    W_DO(ssm->find_assoc(lvid, info.name_index,
			vec_t(name, strlen(name)),
			&id, id_len, found));

    if (!found) {
	id = serial_t::null;
    }
    assert(id_len == sizeof(id)); 
    return RCOK;
}
 

rpc_thread.h

/* --------------------------------------------------------------- */
/* -- Copyright (c) 1994, 1995 Computer Sciences Department,    -- */
/* -- University of Wisconsin-Madison, subject to the terms     -- */
/* -- and conditions given in the file COPYRIGHT.  All Rights   -- */
/* -- Reserved.                                                 -- */
/* --------------------------------------------------------------- */

#include "ShoreConfig.h"

#ifndef RPC_THREAD_H
#define RPC_THREAD_H

class client_t;

/*
 * Listener_t: Thread that listens for RPC connections from clients
 * Cleaner_t: Thread that deletes defunct client threads
 */

class listener_t; // forward

class cleaner_t: public smthread_t {
private: 
		bool  		_quit;
		listener_t *	_listener;
public:
		cleaner_t(listener_t *);
		~cleaner_t() {}
    
	void	kick();
	void	run();
	void	destroy();
};

class listener_t: public smthread_t {
    friend class cleaner_t;
public:
    		listener_t(int fd /*socket*/);
    		~listener_t();
    void	shutdown();

    // called by client_t thread to indicate it is starting/done
    void	child_is_done(w_link_t& child_link);
private:

    void		run();

    int                 _fd;	// socket to listen on
    sfile_read_hdl_t*	_ready; // read handler for the socket

    smutex_t		_clients_mutex;	// syncronizes access to _clients
    w_list_t<client_t>	_clients;	// list of client_t
    scond_t		_clients_empty;	// condition variable
    cleaner_t		*_cleaner_thread;
};


/*
 * Thread that manages a client connection and processes RPCs
 */
class client_t: public smthread_t {
    friend class cleaner_t;
	
public:
    			client_t(int fd /*socket*/, listener_t* parent);
    			~client_t();
    void		run();

    // return the current running thread.
    static client_t* 	me() { return (client_t*) smthread_t::me(); }

    // put reply messages here
    char 		reply_buf[thread_reply_buf_size];

    // the command_server implements RPCs
    command_server_t*	command_server;

    // this function returns the offset of _link for so that
    // listener_t can create a list of client_t objects
    static size_t	link_offset() {return offsetof(client_t, _link);}
private:
    int                 _fd;	// socket for incoming RPCs
    sfile_read_hdl_t*	_ready;	// read handler for the socket
    listener_t*		_parent;// for notify parent thread when finished
    w_link_t		_link;	// for listener_t list of clients
};


/*
 * Thread that monitors stdin (or some file descriptor) for commands
 */
class stdin_thread_t: public smthread_t {
public:
    			stdin_thread_t();
    			~stdin_thread_t();
    void		run();

    // put reply messages here
    char 		reply_buf[thread_reply_buf_size];

private:
    sfile_read_hdl_t*	_ready;	// read handler for stdin
};

#endif /* RPC_THREAD_H */
 

rpc_thread.C

/* --------------------------------------------------------------- */
/* -- Copyright (c) 1994, 1995 Computer Sciences Department,    -- */
/* -- University of Wisconsin-Madison, subject to the terms     -- */
/* -- and conditions given in the file COPYRIGHT.  All Rights   -- */
/* -- Reserved.                                                 -- */
/* --------------------------------------------------------------- */

#define RPC_THREAD_C

#include "ShoreConfig.h"
#include <unistd.h>
#include <rpc/rpc.h>
#include "sm_vas.h"
#include "grid_basics.h"
#define RPC_SVC
#include "msg.h"
#include "grid.h"
#include "command.h"
#include "command_server.h"
#include "rpc_thread.h"

#ifdef __GNUG__
// This file uses the following templates.  For GCC we must
// explicitly instantiate them.
template class w_list_t<client_t>;
template class w_list_i<client_t>;
#endif

scond_t cleanup("cleanup.s");
smutex_t cleanup_mutex("cleanup.m");

listener_t::listener_t(int fd) :
    smthread_t(t_regular,	/* regular priority */
		 false, 	/* will run ASAP    */
		 false,		/* will not delete itself when done */
		 "listener"),	/* thread name */
    _fd(fd),
    _clients(client_t::link_offset()),
    _cleaner_thread(0)
{
    _cleaner_thread = new cleaner_t(this);
    if(!_cleaner_thread) {
	cerr << "cannot fork cleaner thread" <<endl;
	::exit(1);
    }
    W_COERCE(_cleaner_thread->fork());
}

listener_t::~listener_t()
{
    /* by the time we get here, client threads
    * should have been destroyed
    */
    W_COERCE(_clients_mutex.acquire(WAIT_FOREVER));
    assert(_clients.is_empty());
    _clients_mutex.release();

    cout << "listener exiting" << endl;
    _cleaner_thread->destroy();
    W_COERCE(_cleaner_thread->wait());
    delete _cleaner_thread;
    _cleaner_thread = 0;
}

void
listener_t::shutdown()
{
    // deactivate the file handler
    // prevents future connect requests from being accepted
    _ready->shutdown();
}

/*
 * The real work of the listener thread is done here.
 * This method loops waiting for connections and fork a thread
 * for each connection (keeping a list of the threads).
 * After shutdown() is called no more connections are accepted and
 * and the code waits for all clients to end
 */
void
listener_t::run()
{
    rc_t rc;

    cerr << "creating file handler for listener socket" << endl;
    _ready = new sfile_read_hdl_t(_fd);
    if(!_ready) {
	cerr << "Error: Out of Memory" << endl;
	::exit(1);
    }

    while (1) {
        fd_set fds;
        FD_ZERO(&fds);
        FD_SET(_fd, &fds);

	// wait for a connect request 
        rc = _ready->wait(WAIT_FOREVER);

        if (!rc) {
	    cerr << "listener detects connection" << endl;
            // have the rpc library do the accept.
            // it doesn't process any msgs.
            assert(FD_ISSET(_fd, &svc_fdset));
            svc_getreqset(&fds);  // get the connect request and
				  // call its handler

            if (! (FD_ISSET(_fd, &svc_fdset)))  {
		// The socket we're listening on was closed!!
		// This is an error case.
                cerr << "listener: RPC removed fd " 
                    << _fd << " from the set" << endl;
                break;
            }
        } else {
            // someone did a _ready->shutdown()
	    // server must be shutting down, so exit.
		
            break;
        }

	/*
 	 * At this point, RPC has accepted a new connection
	 * from a client.  We need to find out the socket being
	 * used to that we can fork a thread to service it.
	 */
        assert(FD_ISSET(_fd, &svc_fdset));
        int client_sock = -1;
        {
	    int i;
	    // loop over all file descriptors
	    const max_open_fds = sysconf(_SC_OPEN_MAX);
            for (i = 0; i < max_open_fds; i++) {
		/*
		 * If this file descriptor is serviced by RPC
		 * AND there is no active file descriptor handler
		 * for it, then this must be the new connection
		 */
                if (FD_ISSET(i, &svc_fdset) &&
		    !sfile_hdl_base_t::is_active(i)) {
                    client_sock = i;
                    break; // the for loop
                }
            }
        }
	// we must have found the connection
        assert(client_sock>0);

	/*
	 * Fork a thread to process requests from the new client
	 */
        {
            // fork a thread to get requests off the socket
	    client_t* c;
            c = new client_t(client_sock, this);
            if(!c ) {
                cerr << "Error: could not fork client thread." <<endl;
		::exit(1);
            }

	    W_COERCE(c->fork());

	    cerr << "Forked thread to handle client" << endl;

	    // put new thread on list of clients
	    W_COERCE(_clients_mutex.acquire(WAIT_FOREVER));
	    _clients.append(c);
	    _clients_mutex.release();
        }
    }

    _cleaner_thread->kick();

    /*
     * Wait for all client threads to end
     */
    cout << "listener waiting for all clients to end ..." << endl;
    // must get mutex on the list before checking it
    W_COERCE(_clients_mutex.acquire(WAIT_FOREVER));
    while(!_clients.is_empty()) {
	// this code waits free's the mutex protecting the list
	// and waits for _clients_empty to be signaled
	W_COERCE(_clients_empty.wait(_clients_mutex));
    }
    _clients_mutex.release();

    cout << "listener exiting" << endl;
    delete _ready;
}

/*
 * This is a "call-back" function called by children (client_t)
 * of listener.
 */
void
listener_t::child_is_done(w_link_t& 
#ifdef OLDWAY
    child
#endif
)
{
    // new way: just let child go to defunct state
    // and let cleaner_t remove the defunct threads and delete them
    _cleaner_thread->kick();

#ifdef OLDWAY
    // must get mutex on the list before changing it
    W_COERCE(_clients_mutex.acquire(WAIT_FOREVER));
    // remove the child from the list
    child.detach();
    if (_clients.is_empty()) {
	// tell the listener thread that there are no more children
	_clients_empty.signal();
    }
    _clients_mutex.release();
#endif
}


/********************************************************************
   Implementation of client_t:
	a thread class that manages a client connection and
	processes RPCs 
********************************************************************/


client_t::client_t(int fd, listener_t* parent)
    : smthread_t(t_regular,	/* regular priority */
		 false, 	/* will run ASAP    */
		 false,		/* will not delete itself when done */
		 "client"),	/* thread name */
      _fd(fd), _parent(parent)
{
}

client_t::~client_t()
{
}

void
client_t::run()
{
    cerr << "New client thread "
	<< smthread_t::me()->id
	<< " is running" << endl;

    _ready = new sfile_read_hdl_t(_fd);
    if (!_ready) {
	cerr << "Error: Out of Memory" << endl;
	::exit(1);
    }
    cerr << "client thread "
	<< smthread_t::me()->id
	<< " has read handler for fd "
	<< _fd << endl;

    // start up C++ side of RPCs processing
    command_server = new command_server_t;
    if (command_server == 0) {
	cerr << "Error: Out of memory" << endl;
	::exit(1);
    }
    cerr << "client thread "
	<< smthread_t::me()->id
	<< " has command server" << endl;

    fd_set fds;
    FD_ZERO(&fds);
    FD_SET(_fd, &fds);
    rc_t    rc;

    while(1) {
        rc = _ready->wait(WAIT_FOREVER);
        if (!rc) {

	    /**********************************************************
	     * The essence of RPC handling is in svc_getreqset, which
	     * calls the right RPC stub.  The RPC stub will then
	     * call the corresponding command_server_t method
	     **********************************************************/
            svc_getreqset(&fds);  // get the request and call its handler
            if (! (FD_ISSET(_fd, &svc_fdset)))  {
                    cerr << " client on fd " 
                    << _fd << " hung up" << endl;
                break;
            }
        } else {
            // someone called _ready->shutdown().
	    cerr << "exiting client thread for fd: " << _fd << endl;
            break;
        }
    }
    assert(sfile_read_hdl_t::is_active(_fd));
    delete _ready;
    cerr << "Thread "
	<< smthread_t::me()->id
	<< " deleted read file handler " <<endl;

    _ready = 0;
    assert(!sfile_read_hdl_t::is_active(_fd));

    cerr << "Thread "
	<< smthread_t::me()->id
	<< " deleted server " <<endl;

    delete command_server;
    command_server = 0;

    // tell the listener thread we are done
    _parent->child_is_done(_link);
}


/********************************************************************
   Implementation of stdin_thread_t:
	a thread class that manages input from the terminal
********************************************************************/

stdin_thread_t::stdin_thread_t() : 
    smthread_t(t_regular,	/* regular priority */
	       false, 		/* will run ASAP    */
	       false,		/* will not delete itself when done */
	       "stdin")		/* thread name */
{
}

stdin_thread_t::~stdin_thread_t()
{
}

void
stdin_thread_t::run()
{
    cerr << "Command thread is running" << endl;

    char 	line_buf[256];
    char* 	line;
    bool	quit = false;
    rc_t	rc;

    // start a command server
    command_server_t cmd_server;

    _ready = new sfile_read_hdl_t(0);	// handle stdin
    if (!_ready) {
	cerr << "Error: Out of Memory" << endl;
	::exit(1);
    }

    while (1) {
	cout << "Server> " ; cout.flush();
        rc = _ready->wait(WAIT_FOREVER);
        if(!rc) {
	    //cerr << "stdin ready" << endl;
	    line = fgets(line_buf, sizeof(line_buf)-1, stdin);
	    if (line == 0) {
		// end-of-file
		break;
	    }
	    cmd_server.parse_command(line_buf, quit);
	    if (quit) {
		// quit command was entered
		break; 
	    }
        } else {
            // someone called _ready->shutdown().
	    cerr << "exiting command thread " <<  endl;
            break;
        }
    }
    assert(sfile_read_hdl_t::is_active(0));
    delete _ready;
    _ready = 0;
    assert(!sfile_read_hdl_t::is_active(0));

    cout << "Shutting down command thread" << endl;
}


cleaner_t::cleaner_t(listener_t *_l)
 : smthread_t( smthread_t::t_regular, // priority
                false, // block_immediate
                false, // auto_delete
                "cleaner_t",     //thread name
                WAIT_FOREVER // don't block on locks
        )
{
    _quit = false;
    _listener = _l;
}

void
cleaner_t::kick()
{
    cleanup.broadcast();
}

void
cleaner_t::destroy()
{
    _quit = true;
    cleanup.broadcast();
}

void
cleaner_t::run()
{
    while(1) {
        if(_quit) {
            return;
        }

        // wait on condition
        w_rc_t e;
        W_COERCE(cleanup_mutex.acquire(WAIT_FOREVER));
        if(e=cleanup.wait(cleanup_mutex)) {
            cerr << e << endl; assert(0);
        }
        cleanup_mutex.release();

	W_COERCE(_listener->_clients_mutex.acquire(WAIT_FOREVER));
	{
	    // iterate over list of client_t
	    w_list_i<client_t>	i(_listener->_clients);	
	    client_t*		c;


	    while( (c=i.next()) ) {
		if(c->status() == t_defunct)  {
		    c->_link.detach();
		}
	    }
	}
	if (_listener->_clients.is_empty()) {
	    // tell the listener thread that there are no more children
	    _listener->_clients_empty.signal();
	}
	_listener->_clients_mutex.release();
    }
}

 

Client Code

 

Main: client.C

/* --------------------------------------------------------------- */
/* -- Copyright (c) 1994, 1995 Computer Sciences Department,    -- */
/* -- University of Wisconsin-Madison, subject to the terms     -- */
/* -- and conditions given in the file COPYRIGHT.  All Rights   -- */
/* -- Reserved.                                                 -- */
/* --------------------------------------------------------------- */

/*
 * This file implements the main() code for the grid client program
 */

#include "ShoreConfig.h"
#include <stream.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netdb.h>
#include <rpc/rpc.h>
#ifdef SOLARIS2
#include <rpc/clnt_soc.h>
#endif
#include <memory.h>

// The client program does not need all of the SSM stuff, so
// instead of sm_vas.h, only sm_app.h is included
#include "sm_app.h"

#include "grid_basics.h"
#define RPC_CLNT  /* so rpc prototypes are included */
#include "msg.h"
#include "grid.h"
#include "command.h"
#include "command_client.h"

// shorten error code type name
typedef w_rc_t rc_t;

// this is implemented in options.C
w_rc_t init_config_options(option_group_t& options,
                    	const char* prog_type,
                    	int& argc, char** argv);

// pointer to RPC service this client uses
CLIENT* client = 0;

/*
 * connect_to_server connects to the server on machine
 * "hostname" at port "port".  If connection succeeds, 
 * true is returned.  Otherwise a message is printed to
 * cerr and false is returned.
 */
bool
connect_to_server(const char* hostname, int port)
{
    /*
     * If the server was registered with the port mapper,
     * then this function would just call the RPC function
     * clnt_create().  Since the server is not, but is instead
     * listening on "port" then we need to use the RPC
     * function clnttcp_create().
     *
     * The RPC package shipped with Shore has a clnt_create_port
     * function that allows the port to be specified, eliminating
     * the need for most of the code below.
     */

    struct sockaddr_in	saddr;	// server is located at this address
    struct hostent* 	h; 	// server host information

    int 		sock;	// socket for connection


    h = gethostbyname(hostname);
    if (h == NULL) {
	cerr << "Error: machine: " << hostname << " is unkown" << endl;
	return false;
    }
    if (h->h_addrtype != AF_INET) {
	cerr << "Error: machine " << hostname << " does not have an internet address" << endl;
	return false;
    }

    // fill the socket address with host address information
    // see inet(4) for more information
    saddr.sin_family = h->h_addrtype;
    memset(saddr.sin_zero, 0, sizeof(saddr.sin_zero));
    memcpy( (char*)&saddr.sin_addr, h->h_addr, h->h_length);

    // set the port to connect to (using 0 would use the portmapper)
    saddr.sin_port = htons(port);

    // connect with the server
    cerr << "attempting server connection" << endl;
    sock = RPC_ANYSOCK; // connect using a new socket
    client = clnttcp_create(&saddr, GRID, GRIDVERS, &sock, 0, 0);
    if (client == 0) {
	cerr << "Error: clnttcp_create() could not connect to server" << endl;
	cerr << "       server may not be running" << endl;
	// print RCP error message
	clnt_pcreateerror(hostname);
	return false;
    }

    /*
     * Set timeout if rpc's do not return in 30 seconds.
     * Rpc's may block at the server while waiting for
     * locks, so this may need to be increased.
     *
     * Note RPC library shipped with Shore allows the
     * use of CLRMV_TIMEOUT to completely remove the timeout.
     */
    struct timeval tv;
    tv.tv_sec = 30;
    tv.tv_usec = 0;
    if (!clnt_control(client, CLSET_TIMEOUT, (char *) &tv)) {
	cerr << "Error: could not set client timeout" << endl;
	return false;
    }

    return true;
}

void
disconnect_from_server()
{
    if (client) clnt_destroy(client);
}

void
process_user_commands()
{
    command_client_t cmd_client(client);

    char        line_buf[256];
    char*       line;
    bool	quit = false;

    cout << "Client ready." << endl;
    while(!quit) {
	cout << "client> " << flush;
	line = fgets(line_buf, sizeof(line_buf)-1, stdin);
	if (line == 0) {
	    // end of file
	    break;
	}
	cmd_client.parse_command(line_buf, quit);
    }
}


void
usage(option_group_t& options)
{
    cerr << "Usage: client [options]" << endl;
    cerr << "Valid options are: " << endl;
    options.print_usage(TRUE, cerr);
}


int
main(int argc, char* argv[])
{
    cout << "processing configuration options ..." << endl;

    // pointers to options we will create for the grid server program
    option_t* opt_server_host = 0;
    option_t* opt_connect_port = 0;

    const option_level_cnt = 3; 
    option_group_t options(option_level_cnt);

    W_COERCE(options.add_option("connect_port", "1024 < integer < 65535",
		     "1234", "port for connecting to grid server",
		     false, option_t::set_value_long,
		     opt_connect_port));

    W_COERCE(options.add_option("server_host", "host address",
		     "localhost", "address of host running server",
		     false, option_t::set_value_charstr,
		     opt_server_host));

    if (init_config_options(options, "client", argc, argv)) {
	usage(options);
	exit(1);
    }

    // there should not be any other command line arguments
    if (argc > 1) {
	usage(options);
	exit(1);
    }

    int port = strtol(opt_connect_port->value(), 0, 0);
    cout << "trying to connect to server at port " << port<< endl;
    if (!connect_to_server(opt_server_host->value(), port)) {
	cerr << "Shutting down due to connection failure" << endl;
	exit(1);
    }

    process_user_commands();

    disconnect_from_server();

    cout << "Finished!" << endl;
    return 0;
}
 

command_client.h

/* --------------------------------------------------------------- */
/* -- Copyright (c) 1994, 1995 Computer Sciences Department,    -- */
/* -- University of Wisconsin-Madison, subject to the terms     -- */
/* -- and conditions given in the file COPYRIGHT.  All Rights   -- */
/* -- Reserved.                                                 -- */
/* --------------------------------------------------------------- */

#ifndef COMMAND_CLIENT_H
#define COMMAND_CLIENT_H

/*
 * Client command processing class
 */
class command_client_t : public command_base_t {
public:
			command_client_t(CLIENT*);
    
    virtual cmd_err_t	commit_transaction();
    virtual cmd_err_t	abort_transaction();
    virtual cmd_err_t	clear_grid();
    virtual cmd_err_t	print_grid(grid_display_t& rows);
    virtual cmd_err_t	add_item(const char* name, int x, int y);
    virtual cmd_err_t	remove_item(const char* name);
    virtual cmd_err_t	move_item(const char* name, int x, int y);
    virtual cmd_err_t	location_of(const char* name, int& x, int& y);
    virtual cmd_err_t   spatial_query(const nbox_t& box, spatial_result_t& result);

private:
    CLIENT*		cl;
};

#endif /* COMMAND_CLIENT_H */
 

command_client.C

/* --------------------------------------------------------------- */
/* -- Copyright (c) 1994, 1995 Computer Sciences Department,    -- */
/* -- University of Wisconsin-Madison, subject to the terms     -- */
/* -- and conditions given in the file COPYRIGHT.  All Rights   -- */
/* -- Reserved.                                                 -- */
/* --------------------------------------------------------------- */

/*
 * This file implements the main() code for the grid client program
 */

#include "ShoreConfig.h"
#include <stream.h>
#include <string.h>
#include <rpc/rpc.h>
// include stuff needed for SM applications (clients)
#include "sm_app.h"
#include "grid_basics.h"
#define RPC_CLNT  /* so rpc prototypes are included */
#include "msg.h"
#include "grid.h"
#include "command.h"
#include "command_client.h"

#define DO_RPC(rpc, reply) \
    reply = rpc;					\
    if (reply == 0) {					\
	return clnt_sperror(cl, "");			\
    } else if (reply->error_msg[0] != 0) {		\
	return reply->error_msg;			\
    }


command_client_t::command_client_t(CLIENT* client)
	: cl(client)
{
}

cmd_err_t
command_client_t::commit_transaction()
{
    error_reply* reply;
    DO_RPC(commit_transaction_rpc_1(0, cl), reply);
    return 0; /* success */
}

cmd_err_t
command_client_t::abort_transaction()
{
    error_reply* reply;
    DO_RPC(abort_transaction_rpc_1(0, cl), reply);
    return 0; /* success */
}

cmd_err_t
command_client_t::clear_grid()
{
    error_reply* reply;
    DO_RPC(clear_grid_rpc_1(0, cl), reply);
    return 0; /* success */
}

cmd_err_t
command_client_t::print_grid(grid_display_t& display)
{
    print_grid_reply* reply;
    DO_RPC(print_grid_rpc_1(0, cl), reply);
    display = reply->display;
    return 0; /* success */
}

cmd_err_t
command_client_t::add_item(const char* name, int x, int y)
{
    error_reply* reply;
    add_item_arg arg;

    strncpy(arg.name, name, sizeof(arg.name)-1);
    arg.x = x; 
    arg.y = y; 
    
    DO_RPC(add_item_rpc_1(&arg, cl), reply);
    return 0; /* success */
}

cmd_err_t
command_client_t::remove_item(const char* name)
{
    error_reply* reply;
    remove_item_arg arg;

    strncpy(arg.name, name, sizeof(arg.name)-1);
    
    DO_RPC(remove_item_rpc_1(&arg, cl), reply);
    return 0; /* success */
}


cmd_err_t
command_client_t::move_item(const char* name, int x, int y)
{
    error_reply* reply;
    add_item_arg arg;
    arg.x = x; 
    arg.y = y; 

    strncpy(arg.name, name, sizeof(arg.name)-1);
    
    DO_RPC(move_item_rpc_1(&arg, cl), reply);
    return 0; /* success */
}


cmd_err_t
command_client_t::location_of(const char* name, int& x, int& y)
{
    location_reply* reply;
    location_arg arg;

    strncpy(arg.name, name, sizeof(arg.name)-1);
    
    DO_RPC(location_of_rpc_1(&arg, cl), reply);
    x = reply->x;
    y = reply->y;
    return 0; /* success */
}


cmd_err_t
command_client_t::spatial_query(const nbox_t& box, spatial_result_t& result)
{
    spatial_reply* reply;
    spatial_arg arg;

    arg.x_low = box.bound(0);
    arg.y_low = box.bound(1);
    arg.x_hi = box.bound(0+box.dimension());
    arg.y_hi = box.bound(1+box.dimension());

    DO_RPC(spatial_rpc_1(&arg, cl), reply);
    result = reply->result;
    return 0; /* success */
}



Marvin Solomon
Fri Aug 2 13:40:14 CDT 1996