Skip to content

Commit 50a4d82

Browse files
committed
[add] add some examples of concurrency,especially BlockingQueue
1 parent c017048 commit 50a4d82

File tree

7 files changed

+655
-0
lines changed

7 files changed

+655
-0
lines changed
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
package com.brianway.learning.java.concurrent;
2+
3+
import java.io.InputStream;
4+
import java.net.ServerSocket;
5+
import java.net.Socket;
6+
import java.util.concurrent.ExecutorService;
7+
import java.util.concurrent.Executors;
8+
import java.util.concurrent.TimeUnit;
9+
10+
/**
11+
* Interrupting a blocked task by closing the underlying resource.
12+
* {RunByHand}
13+
*/
14+
public class CloseResource {
15+
public static void main(String[] args) throws Exception {
16+
ExecutorService exec = Executors.newCachedThreadPool();
17+
ServerSocket server = new ServerSocket(8080);
18+
InputStream socketInput =
19+
new Socket("localhost", 8080).getInputStream();
20+
exec.execute(new IOBlocked(socketInput));
21+
exec.execute(new IOBlocked(System.in));
22+
TimeUnit.MILLISECONDS.sleep(100);
23+
System.out.println("Shutting down all threads");
24+
exec.shutdownNow();
25+
TimeUnit.SECONDS.sleep(1);
26+
System.out.println("Closing " + socketInput.getClass().getName());
27+
socketInput.close(); // Releases blocked thread
28+
TimeUnit.SECONDS.sleep(1);
29+
System.out.println("Closing " + System.in.getClass().getName());
30+
System.in.close(); // Releases blocked thread
31+
}
32+
}
33+
34+
/* Output: (85% match)
35+
Waiting for read():
36+
Waiting for read():
37+
Shutting down all threads
38+
Closing java.net.SocketInputStream
39+
Interrupted from blocked I/O
40+
Exiting IOBlocked.run()
41+
Closing java.io.BufferedInputStream
42+
Exiting IOBlocked.run()
43+
*///:~
Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
package com.brianway.learning.java.concurrent;
2+
3+
import java.io.IOException;
4+
import java.io.InputStream;
5+
import java.util.concurrent.ExecutorService;
6+
import java.util.concurrent.Executors;
7+
import java.util.concurrent.Future;
8+
import java.util.concurrent.TimeUnit;
9+
10+
/**
11+
* Interrupting a blocked thread.
12+
*
13+
*
14+
*/
15+
public class Interrupting {
16+
private static ExecutorService exec =
17+
Executors.newCachedThreadPool();
18+
19+
static void test(Runnable r) throws InterruptedException {
20+
Future<?> f = exec.submit(r);
21+
TimeUnit.MILLISECONDS.sleep(100);
22+
System.out.println("Interrupting " + r.getClass().getSimpleName());
23+
f.cancel(true); // Interrupts if running
24+
System.out.println("Interrupt sent to " + r.getClass().getSimpleName());
25+
}
26+
27+
public static void main(String[] args) throws Exception {
28+
test(new SleepBlocked());
29+
test(new IOBlocked(System.in));
30+
test(new SynchronizedBlocked());
31+
TimeUnit.SECONDS.sleep(3);
32+
System.out.println("Aborting with System.exit(0)");
33+
System.exit(0); // ... since last 2 interrupts failed
34+
}
35+
}
36+
37+
/**
38+
* 可中断的阻塞示例
39+
*/
40+
class SleepBlocked implements Runnable {
41+
public void run() {
42+
try {
43+
TimeUnit.SECONDS.sleep(100);
44+
} catch (InterruptedException e) {
45+
System.out.println("InterruptedException");
46+
}
47+
System.out.println("Exiting SleepBlocked.run()");
48+
}
49+
}
50+
51+
class IOBlocked implements Runnable {
52+
private InputStream in;
53+
54+
public IOBlocked(InputStream is) {
55+
in = is;
56+
}
57+
58+
public void run() {
59+
try {
60+
System.out.println("Waiting for read():");
61+
in.read();
62+
} catch (IOException e) {
63+
if (Thread.currentThread().isInterrupted()) {
64+
System.out.println("Interrupted from blocked I/O");
65+
} else {
66+
throw new RuntimeException(e);
67+
}
68+
}
69+
System.out.println("Exiting IOBlocked.run()");
70+
}
71+
}
72+
73+
class SynchronizedBlocked implements Runnable {
74+
public synchronized void f() {
75+
while (true) // Never releases lock
76+
Thread.yield();
77+
}
78+
79+
public SynchronizedBlocked() {
80+
new Thread() {
81+
public void run() {
82+
f(); // Lock acquired by this thread
83+
}
84+
}.start();
85+
}
86+
87+
public void run() {
88+
System.out.println("Trying to call f()");
89+
f();
90+
System.out.println("Exiting SynchronizedBlocked.run()");
91+
}
92+
}
93+
94+
/* Output: (95% match)
95+
Interrupting SleepBlocked
96+
Interrupt sent to SleepBlocked
97+
InterruptedException
98+
Exiting SleepBlocked.run()
99+
Waiting for read():
100+
Interrupting IOBlocked
101+
Interrupt sent to IOBlocked
102+
Trying to call f()
103+
Interrupting SynchronizedBlocked
104+
Interrupt sent to SynchronizedBlocked
105+
Aborting with System.exit(0)
106+
*///:~
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
package com.brianway.learning.java.concurrent;
2+
3+
import java.util.concurrent.TimeUnit;
4+
5+
/**
6+
* General idiom for interrupting a task.
7+
* {Args: 1100}
8+
*/
9+
public class InterruptingIdiom {
10+
public static void main(String[] args) throws Exception {
11+
if (args.length != 1) {
12+
System.out.println("usage: java InterruptingIdiom delay-in-mS");
13+
System.exit(1);
14+
}
15+
Thread t = new Thread(new Blocked3());
16+
t.start();
17+
TimeUnit.MILLISECONDS.sleep(new Integer(args[0]));
18+
t.interrupt();
19+
}
20+
}
21+
22+
class NeedsCleanup {
23+
private final int id;
24+
25+
public NeedsCleanup(int ident) {
26+
id = ident;
27+
System.out.println("NeedsCleanup " + id);
28+
}
29+
30+
public void cleanup() {
31+
System.out.println("Cleaning up " + id);
32+
}
33+
}
34+
35+
class Blocked3 implements Runnable {
36+
private volatile double d = 0.0;
37+
38+
public void run() {
39+
try {
40+
while (!Thread.interrupted()) {
41+
// point1
42+
NeedsCleanup n1 = new NeedsCleanup(1);
43+
// Start try-finally immediately after definition
44+
// of n1, to guarantee proper cleanup of n1:
45+
try {
46+
System.out.println("Sleeping");
47+
TimeUnit.SECONDS.sleep(1);
48+
// point2
49+
NeedsCleanup n2 = new NeedsCleanup(2);
50+
// Guarantee proper cleanup of n2:
51+
try {
52+
System.out.println("Calculating");
53+
// A time-consuming, non-blocking operation:
54+
for (int i = 1; i < 2500000; i++)
55+
d = d + (Math.PI + Math.E) / d;
56+
System.out.println("Finished time-consuming operation");
57+
} finally {
58+
n2.cleanup();
59+
}
60+
} finally {
61+
n1.cleanup();
62+
}
63+
}
64+
System.out.println("Exiting via while() test");
65+
} catch (InterruptedException e) {
66+
System.out.println("Exiting via InterruptedException");
67+
}
68+
}
69+
}
70+
71+
/* Output: (Sample)
72+
NeedsCleanup 1
73+
Sleeping
74+
NeedsCleanup 2
75+
Calculating
76+
Finished time-consuming operation
77+
Cleaning up 2
78+
Cleaning up 1
79+
NeedsCleanup 1
80+
Sleeping
81+
Cleaning up 1
82+
Exiting via InterruptedException
83+
*///:~
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
package com.brianway.learning.java.concurrent;
2+
3+
import java.util.ArrayList;
4+
import java.util.List;
5+
import java.util.Random;
6+
import java.util.concurrent.ExecutorService;
7+
import java.util.concurrent.Executors;
8+
import java.util.concurrent.TimeUnit;
9+
10+
class Count {
11+
private int count = 0;
12+
private Random rand = new Random(47);
13+
14+
// Remove the synchronized keyword to see counting fail:
15+
public synchronized int increment() {
16+
int temp = count;
17+
if (rand.nextBoolean()) // Yield half the time
18+
{
19+
Thread.yield();
20+
}
21+
return (count = ++temp);
22+
}
23+
24+
public synchronized int value() {
25+
return count;
26+
}
27+
}
28+
29+
class Entrance implements Runnable {
30+
private static Count count = new Count();
31+
private static List<Entrance> entrances =
32+
new ArrayList<Entrance>();
33+
private int number = 0;
34+
// Doesn't need synchronization to read:
35+
private final int id;
36+
private static volatile boolean canceled = false;
37+
38+
// Atomic operation on a volatile field:
39+
public static void cancel() {
40+
canceled = true;
41+
}
42+
43+
public Entrance(int id) {
44+
this.id = id;
45+
// Keep this task in a list. Also prevents
46+
// garbage collection of dead tasks:
47+
entrances.add(this);
48+
}
49+
50+
public void run() {
51+
while (!canceled) {
52+
synchronized (this) {
53+
++number;
54+
}
55+
System.out.println(this + " Total: " + count.increment());
56+
try {
57+
TimeUnit.MILLISECONDS.sleep(100);
58+
} catch (InterruptedException e) {
59+
System.out.println("sleep interrupted");
60+
}
61+
}
62+
System.out.println("Stopping " + this);
63+
}
64+
65+
public synchronized int getValue() {
66+
return number;
67+
}
68+
69+
public String toString() {
70+
return "Entrance " + id + ": " + getValue();
71+
}
72+
73+
public static int getTotalCount() {
74+
return count.value();
75+
}
76+
77+
public static int sumEntrances() {
78+
int sum = 0;
79+
for (Entrance entrance : entrances)
80+
sum += entrance.getValue();
81+
return sum;
82+
}
83+
}
84+
85+
public class OrnamentalGarden {
86+
public static void main(String[] args) throws Exception {
87+
ExecutorService exec = Executors.newCachedThreadPool();
88+
for (int i = 0; i < 5; i++)
89+
exec.execute(new Entrance(i));
90+
// Run for a while, then stop and collect the data:
91+
TimeUnit.SECONDS.sleep(3);
92+
Entrance.cancel();
93+
exec.shutdown();
94+
if (!exec.awaitTermination(250, TimeUnit.MILLISECONDS)) {
95+
System.out.println("Some tasks were not terminated!");
96+
}
97+
System.out.println("Total: " + Entrance.getTotalCount());
98+
System.out.println("Sum of Entrances: " + Entrance.sumEntrances());
99+
}
100+
}
101+

0 commit comments

Comments
 (0)