Skip to content

Commit 363d2c3

Browse files
committed
Work on iluwatar#74, improved documentation and minor changes
1 parent 7ac262b commit 363d2c3

File tree

11 files changed

+142
-115
lines changed

11 files changed

+142
-115
lines changed

reactor/src/main/java/com/iluwatar/reactor/app/App.java

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,8 @@
1010
import com.iluwatar.reactor.framework.ThreadPoolDispatcher;
1111

1212
/**
13-
* This application demonstrates Reactor pattern. It represents a Distributed Logging Service
14-
* where it can listen on multiple TCP or UDP sockets for incoming log requests.
13+
* This application demonstrates Reactor pattern. The example demonstrated is a Distributed Logging Service
14+
* where it listens on multiple TCP or UDP sockets for incoming log requests.
1515
*
1616
* <p>
1717
* <i>INTENT</i>
@@ -49,13 +49,10 @@ public class App {
4949

5050
/**
5151
* App entry.
52+
* @throws IOException
5253
*/
53-
public static void main(String[] args) {
54-
try {
55-
new App().start();
56-
} catch (IOException e) {
57-
e.printStackTrace();
58-
}
54+
public static void main(String[] args) throws IOException {
55+
new App().start();
5956
}
6057

6158
/**
@@ -70,12 +67,12 @@ public void start() throws IOException {
7067

7168
/*
7269
* This represents application specific business logic that dispatcher will call
73-
* on appropriate events. These events are read and write event in our example.
70+
* on appropriate events. These events are read events in our example.
7471
*/
7572
LoggingHandler loggingHandler = new LoggingHandler();
7673

7774
/*
78-
* Our application binds to multiple I/O channels and uses same logging handler to handle
75+
* Our application binds to multiple channels and uses same logging handler to handle
7976
* incoming log requests.
8077
*/
8178
reactor

reactor/src/main/java/com/iluwatar/reactor/app/AppClient.java

Lines changed: 89 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -9,24 +9,43 @@
99
import java.net.InetAddress;
1010
import java.net.InetSocketAddress;
1111
import java.net.Socket;
12-
import java.net.SocketException;
12+
import java.net.UnknownHostException;
1313
import java.util.concurrent.ExecutorService;
1414
import java.util.concurrent.Executors;
1515
import java.util.concurrent.TimeUnit;
1616

17+
/**
18+
* Represents the clients of Reactor pattern. Multiple clients are run concurrently and send logging
19+
* requests to Reactor.
20+
*
21+
* @author npathai
22+
*/
1723
public class AppClient {
18-
private ExecutorService service = Executors.newFixedThreadPool(3);
19-
20-
public static void main(String[] args) {
21-
new AppClient().start();
24+
private ExecutorService service = Executors.newFixedThreadPool(4);
25+
26+
/**
27+
* App client entry.
28+
* @throws IOException if any I/O error occurs.
29+
*/
30+
public static void main(String[] args) throws IOException {
31+
AppClient appClient = new AppClient();
32+
appClient.start();
2233
}
2334

24-
public void start() {
25-
service.execute(new LoggingClient("Client 1", 6666));
26-
service.execute(new LoggingClient("Client 2", 6667));
27-
service.execute(new UDPLoggingClient(6668));
35+
/**
36+
* Starts the logging clients.
37+
* @throws IOException if any I/O error occurs.
38+
*/
39+
public void start() throws IOException {
40+
service.execute(new TCPLoggingClient("Client 1", 6666));
41+
service.execute(new TCPLoggingClient("Client 2", 6667));
42+
service.execute(new UDPLoggingClient("Client 3", 6668));
43+
service.execute(new UDPLoggingClient("Client 4", 6668));
2844
}
29-
45+
46+
/**
47+
* Stops logging clients. This is a blocking call.
48+
*/
3049
public void stop() {
3150
service.shutdown();
3251
if (!service.isTerminated()) {
@@ -39,96 +58,106 @@ public void stop() {
3958
}
4059
}
4160

42-
/*
43-
* A logging client that sends logging requests to logging server
61+
private static void artificialDelayOf(long millis) {
62+
try {
63+
Thread.sleep(millis);
64+
} catch (InterruptedException e) {
65+
e.printStackTrace();
66+
}
67+
}
68+
69+
/**
70+
* A logging client that sends requests to Reactor on TCP socket.
4471
*/
45-
static class LoggingClient implements Runnable {
72+
static class TCPLoggingClient implements Runnable {
4673

4774
private int serverPort;
4875
private String clientName;
4976

50-
public LoggingClient(String clientName, int serverPort) {
77+
/**
78+
* Creates a new TCP logging client.
79+
*
80+
* @param clientName the name of the client to be sent in logging requests.
81+
* @param port the port on which client will send logging requests.
82+
*/
83+
public TCPLoggingClient(String clientName, int serverPort) {
5184
this.clientName = clientName;
5285
this.serverPort = serverPort;
5386
}
5487

5588
public void run() {
56-
Socket socket = null;
57-
try {
58-
socket = new Socket(InetAddress.getLocalHost(), serverPort);
89+
try (Socket socket = new Socket(InetAddress.getLocalHost(), serverPort)) {
5990
OutputStream outputStream = socket.getOutputStream();
6091
PrintWriter writer = new PrintWriter(outputStream);
61-
writeLogs(writer, socket.getInputStream());
92+
sendLogRequests(writer, socket.getInputStream());
6293
} catch (IOException e) {
6394
e.printStackTrace();
6495
throw new RuntimeException(e);
65-
} finally {
66-
if (socket != null) {
67-
try {
68-
socket.close();
69-
} catch (IOException e) {
70-
e.printStackTrace();
71-
}
72-
}
7396
}
7497
}
7598

76-
private void writeLogs(PrintWriter writer, InputStream inputStream) throws IOException {
99+
private void sendLogRequests(PrintWriter writer, InputStream inputStream) throws IOException {
77100
for (int i = 0; i < 4; i++) {
78101
writer.println(clientName + " - Log request: " + i);
79-
try {
80-
Thread.sleep(100);
81-
} catch (InterruptedException e) {
82-
e.printStackTrace();
83-
}
84102
writer.flush();
103+
85104
byte[] data = new byte[1024];
86105
int read = inputStream.read(data, 0, data.length);
87106
if (read == 0) {
88107
System.out.println("Read zero bytes");
89108
} else {
90109
System.out.println(new String(data, 0, read));
91110
}
111+
112+
artificialDelayOf(100);
92113
}
93114
}
115+
94116
}
95-
117+
118+
/**
119+
* A logging client that sends requests to Reactor on UDP socket.
120+
*/
96121
static class UDPLoggingClient implements Runnable {
97-
private int port;
122+
private String clientName;
123+
private InetSocketAddress remoteAddress;
98124

99-
public UDPLoggingClient(int port) {
100-
this.port = port;
125+
/**
126+
* Creates a new UDP logging client.
127+
*
128+
* @param clientName the name of the client to be sent in logging requests.
129+
* @param port the port on which client will send logging requests.
130+
* @throws UnknownHostException if localhost is unknown
131+
*/
132+
public UDPLoggingClient(String clientName, int port) throws UnknownHostException {
133+
this.clientName = clientName;
134+
this.remoteAddress = new InetSocketAddress(InetAddress.getLocalHost(), port);
101135
}
102-
136+
103137
@Override
104138
public void run() {
105-
DatagramSocket socket = null;
106-
try {
107-
socket = new DatagramSocket();
139+
try (DatagramSocket socket = new DatagramSocket()) {
108140
for (int i = 0; i < 4; i++) {
109-
String message = "UDP Client" + " - Log request: " + i;
110-
try {
111-
DatagramPacket packet = new DatagramPacket(message.getBytes(), message.getBytes().length, new InetSocketAddress(InetAddress.getLocalHost(), port));
112-
socket.send(packet);
113-
114-
byte[] data = new byte[1024];
115-
DatagramPacket reply = new DatagramPacket(data, data.length);
116-
socket.receive(reply);
117-
if (reply.getLength() == 0) {
118-
System.out.println("Read zero bytes");
119-
} else {
120-
System.out.println(new String(reply.getData(), 0, reply.getLength()));
121-
}
122-
} catch (IOException e) {
123-
e.printStackTrace();
141+
142+
String message = clientName + " - Log request: " + i;
143+
DatagramPacket request = new DatagramPacket(message.getBytes(),
144+
message.getBytes().length, remoteAddress);
145+
146+
socket.send(request);
147+
148+
byte[] data = new byte[1024];
149+
DatagramPacket reply = new DatagramPacket(data, data.length);
150+
socket.receive(reply);
151+
if (reply.getLength() == 0) {
152+
System.out.println("Read zero bytes");
153+
} else {
154+
System.out.println(new String(reply.getData(), 0, reply.getLength()));
124155
}
156+
157+
artificialDelayOf(100);
125158
}
126-
} catch (SocketException e1) {
159+
} catch (IOException e1) {
127160
e1.printStackTrace();
128-
} finally {
129-
if (socket != null) {
130-
socket.close();
131-
}
132161
}
133162
}
134163
}

reactor/src/main/java/com/iluwatar/reactor/app/LoggingHandler.java

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99

1010
/**
1111
* Logging server application logic. It logs the incoming requests on standard console and returns
12-
* a canned acknowledgement back to the remote peer.
12+
* a canned acknowledgement back to the remote peer.
1313
*
1414
* @author npathai
1515
*/
@@ -23,17 +23,15 @@ public class LoggingHandler implements ChannelHandler {
2323
@Override
2424
public void handleChannelRead(AbstractNioChannel channel, Object readObject, SelectionKey key) {
2525
/*
26-
* As this channel is attached to both TCP and UDP channels we need to check whether
26+
* As this handler is attached with both TCP and UDP channels we need to check whether
2727
* the data received is a ByteBuffer (from TCP channel) or a DatagramPacket (from UDP channel).
2828
*/
2929
if (readObject instanceof ByteBuffer) {
30-
byte[] data = ((ByteBuffer)readObject).array();
31-
doLogging(data);
32-
sendReply(channel, data, key);
30+
doLogging(((ByteBuffer)readObject));
31+
sendReply(channel, key);
3332
} else if (readObject instanceof DatagramPacket) {
3433
DatagramPacket datagram = (DatagramPacket)readObject;
35-
byte[] data = datagram.getData().array();
36-
doLogging(data);
34+
doLogging(datagram.getData());
3735
sendReply(channel, datagram, key);
3836
} else {
3937
throw new IllegalStateException("Unknown data received");
@@ -50,13 +48,13 @@ private void sendReply(AbstractNioChannel channel, DatagramPacket incomingPacket
5048
channel.write(replyPacket, key);
5149
}
5250

53-
private void sendReply(AbstractNioChannel channel, byte[] data, SelectionKey key) {
51+
private void sendReply(AbstractNioChannel channel, SelectionKey key) {
5452
ByteBuffer buffer = ByteBuffer.wrap(ACK);
5553
channel.write(buffer, key);
5654
}
5755

58-
private void doLogging(byte[] data) {
56+
private void doLogging(ByteBuffer data) {
5957
// assuming UTF-8 :(
60-
System.out.println(new String(data));
58+
System.out.println(new String(data.array(), 0, data.limit()));
6159
}
6260
}

reactor/src/main/java/com/iluwatar/reactor/framework/AbstractNioChannel.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -55,23 +55,25 @@ public SelectableChannel getChannel() {
5555
}
5656

5757
/**
58-
* The operation in which the channel is interested, this operation is be provided to {@link Selector}.
58+
* The operation in which the channel is interested, this operation is provided to {@link Selector}.
5959
*
6060
* @return interested operation.
6161
* @see SelectionKey
6262
*/
6363
public abstract int getInterestedOps();
6464

6565
/**
66-
* Requests the channel to bind.
66+
* Binds the channel on provided port.
6767
*
6868
* @throws IOException if any I/O error occurs.
6969
*/
7070
public abstract void bind() throws IOException;
7171

7272
/**
73-
* Reads the data using the key and returns the read data.
74-
* @param key the key which is readable.
73+
* Reads the data using the key and returns the read data. The underlying channel should be fetched using
74+
* {@link SelectionKey#channel()}.
75+
*
76+
* @param key the key on which read event occurred.
7577
* @return data read.
7678
* @throws IOException if any I/O error occurs.
7779
*/
@@ -106,7 +108,7 @@ void flush(SelectionKey key) throws IOException {
106108
/**
107109
* Writes the data to the channel.
108110
*
109-
* @param pendingWrite data which was queued for writing in batch mode.
111+
* @param pendingWrite the data to be written on channel.
110112
* @param key the key which is writable.
111113
* @throws IOException if any I/O error occurs.
112114
*/

reactor/src/main/java/com/iluwatar/reactor/framework/ChannelHandler.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,19 +7,19 @@
77
* to it by the {@link Dispatcher}. This is where the application logic resides.
88
*
99
* <p>
10-
* A {@link ChannelHandler} is associated with one or many {@link AbstractNioChannel}s, and whenever
10+
* A {@link ChannelHandler} can be associated with one or many {@link AbstractNioChannel}s, and whenever
1111
* an event occurs on any of the associated channels, the handler is notified of the event.
1212
*
1313
* @author npathai
1414
*/
1515
public interface ChannelHandler {
1616

1717
/**
18-
* Called when the {@code channel} has received some data from remote peer.
18+
* Called when the {@code channel} receives some data from remote peer.
1919
*
20-
* @param channel the channel from which the data is received.
20+
* @param channel the channel from which the data was received.
2121
* @param readObject the data read.
22-
* @param key the key from which the data is received.
22+
* @param key the key on which read event occurred.
2323
*/
2424
void handleChannelRead(AbstractNioChannel channel, Object readObject, SelectionKey key);
2525
}

reactor/src/main/java/com/iluwatar/reactor/framework/Dispatcher.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
/**
66
* Represents the event dispatching strategy. When {@link NioReactor} senses any event on the
77
* registered {@link AbstractNioChannel}s then it de-multiplexes the event type, read or write
8-
* or connect, and then calls the {@link Dispatcher} to dispatch the event. This decouples the I/O
8+
* or connect, and then calls the {@link Dispatcher} to dispatch the read events. This decouples the I/O
99
* processing from application specific processing.
1010
* <br/>
1111
* Dispatcher should call the {@link ChannelHandler} associated with the channel on which event occurred.
@@ -24,6 +24,9 @@ public interface Dispatcher {
2424
* This hook method is called when read event occurs on particular channel. The data read
2525
* is provided in <code>readObject</code>. The implementation should dispatch this read event
2626
* to the associated {@link ChannelHandler} of <code>channel</code>.
27+
*
28+
* <p>
29+
* The type of <code>readObject</code> depends on the channel on which data was received.
2730
*
2831
* @param channel on which read event occurred
2932
* @param readObject object read by channel
@@ -32,7 +35,7 @@ public interface Dispatcher {
3235
void onChannelReadEvent(AbstractNioChannel channel, Object readObject, SelectionKey key);
3336

3437
/**
35-
* Stops the dispatching events and cleans up any acquired resources such as threads.
38+
* Stops dispatching events and cleans up any acquired resources such as threads.
3639
*/
3740
void stop();
3841
}

0 commit comments

Comments
 (0)