The JavaPorts Application Programming Interface (API)
The following message passing operations are available on a Port object.
Any combination of these methods
can be used for anonymous message passing among the tasks of a distributed
application. In anonymous communication, the task and port to
which a message is sent are never named explicitly and may not even be known
to the developer of the sending task.
Every Port maintains a list for buffering incoming messages. A message key
identifies the port list element to be used for writing or reading a
message.
- public Object AsyncRead(int MsgKey) throws RemoteException;
Checks the port list element MsgKey
to determine whether a new message has arrived. If one has,
it returns a handle to the message object, which
the calling task can use to retrieve the contents of the
message. AsyncRead returns null immediately if the message has not arrived
yet or has already been read. AsyncRead never blocks the calling task.
- public void AsyncWrite(Object msg, int MsgKey) throws RemoteException;
Spawns a new thread to transfer the msg object
asynchronously using key MsgKey.
The message will be stored in the receiving task's port list
element MsgKey.
The calling task is not blocked.
- public Object SyncRead(int MsgKey) throws RemoteException;
Same as AsyncRead, but it blocks the calling task until a message
has arrived at the MsgKey port list element.
- public void SyncWrite(Object msg, int MsgKey) throws RemoteException;
Same as AsyncWrite, but the calling task blocks until a read method with
the same MsgKey
is posted by the receiving task. No new thread is spawned.
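The blocking behavior of the four operations can be illustrated with a small in-memory stand-in. This is only a sketch under stated assumptions: LocalPortSketch is a hypothetical class that lives in a single JVM, it is not the SCLASS Port class, it throws no RemoteException, and its SyncWrite waits until the message has actually been consumed, which approximates "a read with the same MsgKey is posted".

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical single-JVM stand-in for a port; NOT the SCLASS Port class. */
class LocalPortSketch {
    // One buffered message per key; a missing key means "not arrived or already read".
    private final Map<Integer, Object> slots = new HashMap<>();

    /** Non-blocking read: returns the message, or null if none is waiting. */
    public synchronized Object AsyncRead(int msgKey) {
        Object msg = slots.remove(msgKey);
        if (msg != null) notifyAll();          // wake a writer blocked in SyncWrite
        return msg;
    }

    /** Blocking read: waits until a message has arrived at msgKey. */
    public synchronized Object SyncRead(int msgKey) {
        while (!slots.containsKey(msgKey)) await();
        Object msg = slots.remove(msgKey);
        notifyAll();
        return msg;
    }

    /** Non-blocking write: a new thread performs the delivery (msg must be non-null). */
    public void AsyncWrite(Object msg, int msgKey) {
        new Thread(() -> deposit(msg, msgKey)).start();
    }

    /** Blocking write: returns only after a reader has consumed the message. */
    public synchronized void SyncWrite(Object msg, int msgKey) {
        slots.put(msgKey, msg);
        notifyAll();
        while (slots.get(msgKey) == msg) await();
    }

    private synchronized void deposit(Object msg, int msgKey) {
        slots.put(msgKey, msg);
        notifyAll();
    }

    // Must be called while holding this object's monitor.
    private void await() {
        try {
            wait();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
    }

    public static void main(String[] args) {
        LocalPortSketch port = new LocalPortSketch();
        System.out.println(port.AsyncRead(0));   // null: nothing has arrived yet
        port.AsyncWrite("strip 0", 0);
        System.out.println(port.SyncRead(0));    // blocks until delivery, then "strip 0"
        System.out.println(port.AsyncRead(0));   // null: already read
    }
}
```

The sketch keeps one slot per key to mirror the "port list element" wording above; how the real Port stores and transfers messages between machines is not shown here.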
Implementation of an algorithm
The automatically generated templates are edited to implement a parallel Mandelbrot set computation.
The Display task sends an image to be computed to the DynamicManager task. The DynamicManager distributes
the total work to all available worker tasks using a dynamic load balancing scheme. Each worker task
performs the same computation on a different portion of the image, then returns its results to the
DynamicManager, which sends these results back to the Display task to update the screen image.
The task architecture is depicted in the sample task graph.
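The idea behind dynamic load balancing can be sketched independently of JavaPorts. In the sketch below the image is cut into strips of rows, and each worker claims the next strip only after finishing its previous one, so faster workers end up with more strips. The class name DynamicBalanceSketch, the shared queue, and the use of threads in one JVM are all assumptions made for illustration; in the application described above the work is handed out through port messages instead.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicIntegerArray;

/** Hypothetical illustration of on-demand work distribution; not JavaPorts code. */
class DynamicBalanceSketch {

    /** Splits rows [0, height) into strips and returns the rows handled per worker. */
    static int[] run(int height, int stripRows, int workers) {
        ConcurrentLinkedQueue<int[]> pending = new ConcurrentLinkedQueue<>();
        for (int start = 0; start < height; start += stripRows) {
            pending.add(new int[] {start, Math.min(start + stripRows, height)});
        }

        AtomicIntegerArray rowsDone = new AtomicIntegerArray(workers);
        List<Thread> pool = new ArrayList<>();
        for (int w = 0; w < workers; w++) {
            final int id = w;
            Thread t = new Thread(() -> {
                int[] strip;
                // A worker asks for more work only when its last strip is finished.
                while ((strip = pending.poll()) != null) {
                    rowsDone.addAndGet(id, strip[1] - strip[0]); // placeholder for real work
                }
            });
            pool.add(t);
            t.start();
        }

        for (Thread t : pool) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while joining workers", e);
            }
        }

        int[] result = new int[workers];
        for (int w = 0; w < workers; w++) {
            result[w] = rowsDone.get(w);
        }
        return result;
    }
}
```

Calling DynamicBalanceSketch.run(100, 8, 3) returns three per-worker row counts that always sum to 100; how the rows are split among the workers varies from run to run.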
The developers of cooperating tasks need to share only the structure of the messages to be exchanged
and the port(s) their own task should use for communications. As long as this information is
available, the development of each task can proceed independently.
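To make "the structure of the messages" concrete, here is a guess at what such a shared message type could look like, inferred only from how the worker programs in this section construct and inspect messages. Everything in it is an assumption: the class name MessageSketch, the field types, the name of the last field, the WORK constant, and the numeric values of the status constants. It is not the Mandelbrot.Message class itself.

```java
import java.io.Serializable;

/** Hypothetical message layout shared by cooperating tasks; not Mandelbrot.Message. */
class MessageSketch implements Serializable {
    private static final long serialVersionUID = 1L;

    public static final int WORK = 0;   // assumed value
    public static final int EXIT = -1;  // assumed value

    public int status = WORK;           // EXIT tells a worker to terminate
    public double x1, x2, y1, y2;       // region of the complex plane (assumed type)
    public int[][] iterations;          // per-pixel iteration counts
    public int maxiter;                 // iteration budget per pixel
    public long time;                   // elapsed compute time
    public String source;               // label of the task that produced the result

    public MessageSketch(double x1, double x2, double y1, double y2,
                         int[][] iterations, int maxiter, long time, String source) {
        this.x1 = x1;
        this.x2 = x2;
        this.y1 = y1;
        this.y2 = y2;
        this.iterations = iterations;
        this.maxiter = maxiter;
        this.time = time;
        this.source = source;
    }

    /** Convenience factory for the termination message. */
    public static MessageSketch exit() {
        MessageSketch m = new MessageSketch(0, 0, 0, 0, null, 0, 0L, "");
        m.status = EXIT;
        return m;
    }
}
```

The type is made Serializable on the assumption that messages cross JVM boundaries; two developers who agree on such a class and on their port numbers have everything they need to write their tasks separately.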
Completed Java Program for Tasks T3 and T4 of the task graph
package Mandelbrot;

import SCLASS.*;

public class MandelbrotWorker extends Thread {
    private Port[] port_;
    private String AppName_;
    private String TaskVarName_;

    // User application variables
    int readkey = 0;
    int writekey = 0;
    int iterations[][];
    long time = 0;

    public MandelbrotWorker(String AppName, String TaskVarName) {
        super();
        AppName_ = AppName;
        TaskVarName_ = TaskVarName;
    }

    public synchronized void run () {
        try {
            // register ports
            PortManager portmanager = new PortManager();
            port_ = portmanager.configure(AppName_, TaskVarName_);

            // read first message
            Message data = (Message)port_[0].SyncRead(readkey++);

            // loop until EXIT message received
            while (data.status != Message.EXIT) {
                // compute Mandelbrot set (code not shown)
                Message result = new Message(data.x1, data.x2, data.y1,
                    data.y2, iterations, data.maxiter, time, TaskVarName_+" (Java)");

                // return result
                port_[0].AsyncWrite(result, writekey++);

                // read next message
                data = (Message)port_[0].SyncRead(readkey++);
            }

            // distributed termination
            portmanager.release();
        } catch(Throwable e) {
            e.printStackTrace();
            System.exit(-1);
        }
    }

    public static void main (String args[]) {
        MandelbrotWorker MandelbrotWorkerThread =
            new MandelbrotWorker(args[0], args[1]);
        MandelbrotWorkerThread.start();
        try {
            MandelbrotWorkerThread.join();
        } catch(Throwable e) {
            e.printStackTrace();
            System.exit(-1);
        }
        System.exit(0);
    }
}
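The worker listing above omits the computation itself ("code not shown"). One common way to fill such a step is the standard escape-time iteration, sketched below. This is not the original project code: the class name MandelbrotSketch, the method signature, and the width and height parameters are assumptions, since the listing does not show how the pixel grid size is obtained.

```java
/** Hypothetical escape-time kernel; the original computation is not shown in the listing. */
class MandelbrotSketch {

    /** Iteration counts for a width x height grid over [x1, x2] x [y1, y2]. */
    static int[][] compute(double x1, double x2, double y1, double y2,
                           int width, int height, int maxiter) {
        int[][] iterations = new int[height][width];
        for (int row = 0; row < height; row++) {
            // Imaginary part of c for this row (guard against a one-row grid).
            double ci = y1 + (y2 - y1) * row / Math.max(1, height - 1);
            for (int col = 0; col < width; col++) {
                // Real part of c for this column (guard against a one-column grid).
                double cr = x1 + (x2 - x1) * col / Math.max(1, width - 1);
                double zr = 0.0;
                double zi = 0.0;
                int n = 0;
                // Iterate z = z^2 + c until |z| > 2 or the iteration budget runs out.
                while (n < maxiter && zr * zr + zi * zi <= 4.0) {
                    double next = zr * zr - zi * zi + cr;
                    zi = 2.0 * zr * zi + ci;
                    zr = next;
                    n++;
                }
                iterations[row][col] = n;
            }
        }
        return iterations;
    }
}
```

A point that never escapes, such as c = 0, receives the full maxiter count, while a point far outside the set, such as c = 2 + 2i, escapes after a single iteration.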
Completed Matlab Program for Task T5 of the task graph
function MMWorker(AppName, TaskVarName)
import SCLASS.*;
import Mandelbrot.*;
portmanager = PortManager;
port = portmanager.configure(AppName, TaskVarName);
readkey = 0;
writekey = 0;
time = 0; % assumed placeholder for elapsed compute time, as in the Java worker
% read first message
data = port(1).SyncRead(readkey);
readkey = readkey + 1;
% loop until EXIT message received
while (data.status ~= Message.EXIT)
    % process data; real_min, real_max, imag_min, imag_max, maxiter, xrange and
    % yrange must be set from the received message first (that code is not shown)
    iterations = mSets([real_min real_max imag_min imag_max], maxiter, [xrange yrange]);
    % send result
    result = Message(data.x1, data.x2, data.y1, data.y2, iterations, data.maxiter, time, [TaskVarName ' (Matlab)']);
    port(1).SyncWrite(result, writekey);
    writekey = writekey + 1;
    % read next message
    data = port(1).SyncRead(readkey);
    readkey = readkey + 1;
end
portmanager.release;
quit;