Our sample Grid tool is a standalone program called rxdispatch, which for this example has been built and installed for Linux platforms. Note: Some processing steps will be represented by overview comments instead of detailed blocks of code. The following is a portion of its hypothetical “man page”: Namerxdispatch. Dispatch a command for remote execution.
Synopsisrxdispatch submit [-d directory][-i filename] [-o filename] command rxdispatch manage [-k | -p|-r] jobid rxdispatch status [-m filename] jobid
Descriptionrxdispatch distributes computation tasks over a local network. It works in tandem with the daemon program rxmonitor, which oversees the set of machines in the local network used to run these tasks. It has three subcommands:
Options
SampleA sample Grid plug-in written around rxdispatch is shown below:
class RXDispatchGridPlugin
extends GridPlugin {
// Job status: 'rxdispatch status' return values:
private static final int RX_FAILED = 0;
private static final int RX_PAUSED = 1;
private static final int RX_RUNNING = 2;
private static final int RX_DONE = 3;
private static final String[] jobStatusName = {
ResMgr.getMessage(CLASS,0,"Job failed"),
ResMgr.getMessage(CLASS,0,"Job is paused"),
ResMgr.getMessage(CLASS,0,"Job is running"),
ResMgr.getMessage(CLASS,0,"Job is done")
};
private String jobId = null;
private int jobStatus = RX_FAIL;
// COMMAND TO POLL FOR THE JOB STATUS.
// WILL BE COMPLETED WHEN THE JOB STARTS.
private String[] statusCmd = new String[5] {
"rxdispatch", "status", "-m", null, null
};
public int submitJobAndWait(
String[] cmd,
String[] inFiles,
String[] outFiles,
File localWorkingDir,
Log log)
throws RtGridException {
if (jobId != null) {
throw new RtGridException(
"Job "+jobId+" is already started");
}
// BUILD THE JOB SUBMIT COMMAND:
int nSubmitCmd = 3 + cmd.length;
if (inFiles != null) {
nSubmitCmd += inFiles.length;
}
if (outFiles != null) {
nSubmitCmd += outFiles.length;
}
String[] submitCmd = new String[nSubmitCmd];
submitCmd[0] = "rtdispatch";
submitCmd[1] = "submit";
submitCmd[2] = "-d";
submitCmd[3] = localWorkingDir.getCanonicalPath();
// STRIP THE LOCAL WORKING DIRECTORY NAME
// OFF OF ALL INPUT AND OUTPUT FILE NAMES
// AND ADD ALL NECESSARY -i AND -o OPTIONS
// TO THE JOB SUBMIT COMMAND:
. . .
// COPY ALL ENTRIES OF cmd TO THE REMAINING
// ENTRIES OF THE JOB SUBMIT COMMAND:
. . .
// SUBMIT THE JOB AND MONITOR IT:
// [REMEMBER: JOB STARTS PAUSED OR FAILED]
jobId = runCmd(submitCmd);
// CREATE A UNIQUE TEMP FILE FOR THE ERROR
// TEXT RETURNED BY THE STATUS COMMAND:
File errorFile = new File(localWorkingdir,jobId+".txt");
statusCmd[3] = errorFile. getCanonicalPath();
statusCmd[4] = jobId;
int waitRet = 1;
jobStatus = getStatus();
if (jobStatus == RX_PAUSED) {
resumeJob();
waitRet = waitForJob();
}
// JOB IS DONE - CHECK FOR ERRORS:
if (waitRet != 0) {
if (errorFile.exists()) {
// OPEN THE ERROR TEXT FILE AND
// WRITE THE ERRORS TO THE LOG:
}
}
return waitRet;
}
public int waitForJob()
throws RtGridException {
if (jobId != null) {
// REPEATEDLY POLL FOR THE JOB STATUS:
while ((jobStatus=getStatus()) == RX_RUNNING) {
// THIS BLOCK OF CODE MUST BE HERE
// SO THE GRID PLUGIN CAN RESPOND
// WHEN THE COMPONENT TIMES OUT:
try {
Thread.sleep(15000); // 15 seconds
}
catch (InterruptedException ie) {
throw new RtGridException(
"Job "+jobID+" interrupted");
}
}
// ADHERE TO THE DOCUMENTED PROTOCOL:
if (jobStatus != RX_DONE) {
return 1;
}
else {
return 0;
}
}
else {
throw new RtGridException("No job was started");
}
}
private int getStatus()
throws RtGridException {
String strStatus = runRXDispatch(statusCmd);
return Integer.parseInt(strStatus);
}
public void terminateJob()
throws RtGridException {
if (jobStatus == RX_RUNNING) {
manageJob("-k");
jobStatus = RX_FAILED;
}
}
public void pauseJob()
throws RtGridException {
if (jobStatus == RX_RUNNING) {
manageJob("-p");
jobStatus = RX_PAUSED;
}
}
public void resumeJob()
throws RtGridException {
if (jobStatus == RX_PAUSED) {
manageJob("-r");
jobStatus = RX_RUNNING;
}
}
private void manageJob(String flag)
throws RtGridException {
if (jobId != null) {
String[] manageCmd = new String[4] {
"rxdispatch", "manage", flag, jobId};
runRXDispatch(manageCmd);
}
else {
throw new RtGridException("No job was started");
}
}
private String runCmd(String[] cmd)
throws RtGridException {
// THIS METHOD RUNS THE GIVEN COMMAND
// USING A STANDARD ESI UTILITY, RETURNING
// THE TEXT WRITTEN BY THE COMMAND TO
// STANDARD OUTPUT. IF TEXT IS WRITTEN TO
// STANDARD ERROR AN EXCEPTION IS THROWN.
}
public int getJobStatus()
throws RtGridException {
if (jobId != null) {
// ADHERE TO THE DOCUMENTED PROTOCOL:
if (jobStatus != RX_DONE) {
return 1;
}
else {
return 0;
}
}
else {
throw new RtGridException("No job was started");
}
}
public String getJobStatusAsString()
throws RtGridException {
if (jobId != null) {
return jobStatusName[jobStatus];
}
else {
throw new RtGridException("No job was started");
}
}
public String getJobIDAsString()
throws RtGridException {
if (jobId != null) {
return jobId;
}
else {
throw new RtGridException("No job was started");
}
}
public GridOptionsPanel getOptionsPanel ()
throws RtGridException {
return new RXDispatchGridOptionsPanel(this);
}
}
| |||||||