Your task is to create a sequencer-based multicasting service, using a slight variant of the simple version of the Amoeba protocol (the first protocol described in www.cdk3.net/coordination), and a small client application to exercise it. You will use Java RMI and multicast sockets.
The main classes in your implementation will be:
Below you will find:
You need to write SequencerImpl.java and History.java from scratch.
Note that the scheme you produce will not precisely
implement the Amoeba protocol. In that protocol, messages are
sent unreliably to the sequencer in the first instance. In yours,
messages to be multicast are handed to the sequencer using RMI.
SequencerImpl should take as an argument the IP multicast address that the corresponding Groups are to use. To avoid collisions with numbers chosen by others, incorporate a random number into the multicast IP address that you use -- e.g. 234.day.month.rand, where day and month are chosen from a team member's birthday, and rand is a random number between 1 and 254.
Don't set your sockets' time-to-live (TTL) to more than
1. That way, your multicast packets will not be transmitted beyond
the local Mbone router.
This project is suitable for implementation by groups of two or three students. The following division of labour might be used:
Teams hand in all code and a written report of between four and six pages, explaining the design and its rationale.
Teams are required to demonstrate their implementation. The
demonstration should include as many as possible of the following
features: simple message sending, stress-testing, recovery from
simulated multicast datagram loss, heartbeat messages and history
truncation.
package sequencer;
import java.rmi.*;
import java.net.*;
import java.io.*;
public interface Sequencer extends Remote
{
// join -- request for "sender" to
join sequencer's multicasting service;
// returns an object specifying the multicast
address and the first sequence number to expect
public SequencerJoinInfo join(String sender)
throws RemoteException, SequencerException;
// send -- "sender" supplies the
msg to be sent, its identifier,
// and the sequence number of the last received
message
public void send(String sender, byte[] msg, long
msgID, long lastSequenceReceived)
throws RemoteException;
// leave -- tell sequencer that "sender"
will no longer need its services
public void leave(String sender)
throws RemoteException;
// getMissing -- ask sequencer for the message
whose sequence number is "sequence"
public byte[] getMissing(String sender, long sequence)
throws RemoteException,
SequencerException;
// heartbeat -- we have received messages
up to number "lastSequenceReceived"
public void heartbeat(String sender, long lastSequenceReceived)
throws RemoteException;
}
package sequencer;
import java.io.*;
import java.net.*;
public class SequencerJoinInfo implements Serializable
{
public InetAddress addr;
public long sequence;
public SequencerJoinInfo(InetAddress addr,
long sequence)
{
this.addr = addr;
this.sequence = sequence;
}
}
package sequencer;
import java.io.*;
public class SequencerException extends Exception implements
Serializable
{
public SequencerException(String s)
{
super(s);
}
}
package sequencer;
import java.net.*;
import java.util.*;
import java.io.*;
import java.rmi.*;
public class Group implements Runnable
{
public Group(String host, MsgHandler handler, String senderName) throws GroupException
{
// contact Sequencer on "host"
to join group,
// create MulticastSocket
and thread to listen on it,
// perform other initialisations
}
public void send(byte[] msg) throws GroupException
{
// send the given message
to all instances of Group using the same sequencer
}
public void leave()
{
// leave group
}
public void run()
{
// repeatedly: listen
to MulticastSocket created in constructor, and on receipt
// of a datagram call
"handle" on the instance
// of Group.MsgHandler
which was supplied to the constructor
}
public interface MsgHandler
{
public void handle(int
count, byte[] msg);
}
public class GroupException extends Exception
{
public GroupException(String
s)
{
super(s);
}
}
public class HeartBeater extends Thread
{
// This thread sends
heartbeat messages when required
}
}
ByteArrayOutputStream bstream =
new ByteArrayOutputStream(MAX_MSG_LENGTH);
DataOutputStream dstream = new DataOutputStream(bstream);
dstream.writeLong(aLong); // marshals a Long into the byte array
underlying bstream
.....
After marshalling, the data can be obtained from bstream for inclusion in a DatagramPacket:
byte[] theData = bstream.toByteArray();
(For unmarshalling there are corresponding classes ByteArrayInputStream
and DataInputStream.)