Skip to content

Commit a824fe3

Browse files
committed
Add AsyncUtils
** Probably not for Spring 4 M1 **
1 parent e556307 commit a824fe3

File tree

2 files changed

+455
-0
lines changed

2 files changed

+455
-0
lines changed
Lines changed: 335 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,335 @@
1+
/*
2+
* Copyright 2002-2013 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.core.task;
18+
19+
import java.lang.reflect.Method;
20+
import java.util.concurrent.Callable;
21+
import java.util.concurrent.ExecutionException;
22+
import java.util.concurrent.ExecutorService;
23+
import java.util.concurrent.Executors;
24+
import java.util.concurrent.Future;
25+
import java.util.concurrent.TimeUnit;
26+
import java.util.concurrent.TimeoutException;
27+
28+
import org.springframework.cglib.proxy.Enhancer;
29+
import org.springframework.cglib.proxy.MethodInterceptor;
30+
import org.springframework.cglib.proxy.MethodProxy;
31+
import org.springframework.core.GenericTypeResolver;
32+
import org.springframework.core.task.support.ExecutorServiceAdapter;
33+
import org.springframework.util.Assert;
34+
35+
/**
36+
* Miscellaneous utilities for dealing with operations.
37+
*
38+
* @author Phillip Webb
39+
* @since 4.0
40+
*/
41+
public abstract class AsyncUtils {
42+
43+
/**
44+
* Submit a method call via an existing object for asynchronous execution. A proxy
45+
* of the specified source object is returned where calls to methods on that object
46+
* result in submission of a new executor task. Any non-void methods will return a
47+
* {@link #proxyFutureResult(Future) proxy} for the future result.
48+
*
49+
* <p>This method cab be particularly useful to speed up initialization code,
50+
* for example:
51+
*
52+
* <pre>{@code
53+
* PersistenceProvider provider = AsyncUtils.submitVia(executorService, persistenceProvider);
54+
* // This is usually a slow operation
55+
* managerFactory = provider.createContainerEntityManagerFactory();
56+
* // a reference to managerFacory can be held indefinitely.
57+
* // If no methods are called the thread is not blocked.
58+
* }</code>
59+
*
60+
* <p>This method can only be used with non-final non-private classes and methods.
61+
* @param executor the executor used to run the task (may be {@code null} to execute
62+
* in the current thread)
63+
* @param source the source object
64+
* @return a proxy of the source object that can be used to submit tasks
65+
* @see #submitVia(AsyncTaskExecutor, Object, Class)
66+
* @see #proxyFutureResult(Future)
67+
*/
68+
public static <T> T submitVia(AsyncTaskExecutor executor, T source) {
69+
return submitVia(asExecutorService(executor), source);
70+
}
71+
72+
/**
73+
* Submit a method call via an existing object for asynchronous execution. See
74+
* {@link #submitVia(AsyncTaskExecutor, Object)} for details.
75+
* @param executor the executor used to run the task (may be {@code null} to execute
76+
* in the current thread)
77+
* @param source the source object
78+
* @return a proxy of the source object that can be used to submit tasks
79+
*/
80+
@SuppressWarnings("unchecked")
81+
public static <T> T submitVia(ExecutorService executor, T source) {
82+
return submitVia(executor, source, (Class<T>) source.getClass());
83+
}
84+
85+
/**
86+
* Submit a method call via an existing object for asynchronous execution. See
87+
* {@link #submitVia(AsyncTaskExecutor, Object)} for details.
88+
* @param executor the executor used to run the task (may be {@code null} to execute
89+
* in the current thread)
90+
* @param source the source object
91+
* @param resultType The type of proxy to result
92+
* @return a proxy of the source object that can be used to submit tasks
93+
*/
94+
public static <T> T submitVia(AsyncTaskExecutor executor, T source, final Class<T> resultType) {
95+
return submitVia(new ExecutorServiceAdapter(executor), source, resultType);
96+
}
97+
98+
/**
99+
* Submit a method call via an existing object for asynchronous execution. See
100+
* {@link #submitVia(AsyncTaskExecutor, Object)} for details.
101+
* @param executor the executor used to run the task (may be {@code null} to execute
102+
* in the current thread)
103+
* @param source the source object
104+
* @param resultType The type of proxy to result
105+
* @return a proxy of the source object that can be used to submit tasks
106+
*/
107+
@SuppressWarnings("unchecked")
108+
public static <T> T submitVia(final ExecutorService executor, T source, final Class<T> resultType) {
109+
Assert.notNull(source, "Source must not be null");
110+
Assert.notNull(resultType, "ResultType must not be null");
111+
112+
Enhancer enhancer = new Enhancer();
113+
enhancer.setSuperclass(resultType);
114+
enhancer.setCallback(new MethodInterceptor() {
115+
public Object intercept(final Object obj, Method method, final Object[] args,
116+
final MethodProxy proxy) throws Throwable {
117+
Class returnType = method.getReturnType();
118+
if(executor == null) {
119+
return proxy.invokeSuper(obj, args);
120+
}
121+
Future<?> future = executor.submit(new Callable<Object>() {
122+
public Object call() throws Exception {
123+
try {
124+
return proxy.invokeSuper(obj, args);
125+
}
126+
catch (Throwable ex) {
127+
if(ex instanceof Exception) {
128+
throw (Exception) ex;
129+
}
130+
throw new RuntimeException(ex);
131+
}
132+
}
133+
});
134+
return proxyFutureResult(future, returnType);
135+
}
136+
});
137+
return (T) enhancer.create();
138+
}
139+
140+
/**
141+
* Submit the specified task to the given executor, or if the executor is {@code null}
142+
* run the task immediately.
143+
* @param executor the executor (may be {@code null}
144+
* @param task the task to execute
145+
* @return the future result
146+
*/
147+
public static <T> Future<T> submit(AsyncTaskExecutor executor, Callable<T> task) {
148+
return submit(asExecutorService(executor), task);
149+
}
150+
151+
/**
152+
* Submit the specified task to the given executor, or if the executor is {@code null}
153+
* run the task immediately.
154+
* @param executor the executor (may be {@code null}
155+
* @param task the task to execute
156+
* @return the future result
157+
*/
158+
public static <T> Future<T> submit(ExecutorService executor, Callable<T> task) {
159+
Assert.notNull(task, "Task must not be null");
160+
return (executor == null ? new CompletedFuture<T>(task) : executor.submit(task));
161+
}
162+
163+
/**
164+
* Submit the specified task to the given executor, or if the executor is {@code null}
165+
* run the task immediately.
166+
* @param executor the executor (may be {@code null}
167+
* @param task the task to execute
168+
* @return the future result
169+
*/
170+
public static Future<?> submit(AsyncTaskExecutor executor, Runnable task) {
171+
return submit(asExecutorService(executor), task);
172+
}
173+
174+
/**
175+
* Submit the specified task to the given executor, or if the executor is {@code null}
176+
* run the task immediately.
177+
* @param executor the executor (may be {@code null}
178+
* @param task the task to execute
179+
* @return the future result
180+
*/
181+
public static Future<?> submit(ExecutorService executor, Runnable task) {
182+
Assert.notNull(task, "Task must not be null");
183+
return (executor == null ? new CompletedFuture<Object>(Executors.callable(task)) : executor.submit(task));
184+
}
185+
186+
/**
187+
* Submit the specified task to the given executor, or if the executor is {@code null}
188+
* run the task immediately.
189+
* @param executor the executor (may be {@code null}
190+
* @param task the task to execute
191+
* @param result the result
192+
* @return the future result
193+
*/
194+
public static <T> Future<T> submit(AsyncTaskExecutor executor, Runnable task, T result) {
195+
return submit(asExecutorService(executor), task, result);
196+
}
197+
198+
/**
199+
* Submit the specified task to the given executor, or if the executor is {@code null}
200+
* run the task immediately.
201+
* @param executor the executor (may be {@code null}
202+
* @param task the task to execute
203+
* @param result the result
204+
* @return the future result
205+
*/
206+
public static <T> Future<T> submit(ExecutorService executor, Runnable task, T result) {
207+
Assert.notNull(task, "Task must not be null");
208+
return (executor == null ? new CompletedFuture<T>(Executors.callable(task, result)) : executor.submit(task, result));
209+
}
210+
211+
/**
212+
* Return a proxy for some Future result. Calls on the proxy will cause the
213+
* {@link Future#get()} method to be called.
214+
* @param future the future
215+
* @return a proxy of the future type {@code <T>}
216+
*/
217+
@SuppressWarnings("unchecked")
218+
public static <T> T proxyFutureResult(Future<T> future) {
219+
Assert.notNull(future, "Future must not be null");
220+
Class<?> type = GenericTypeResolver.resolveTypeArgument(future.getClass(),
221+
Future.class);
222+
Assert.notNull(type, "Unable to resolve generic type for Future");
223+
return proxyFutureResult(future, (Class<T>) type);
224+
}
225+
226+
/**
227+
* Return a proxy for some Future result. Calls on the proxy will cause the
228+
* {@link Future#get()} method to be called.
229+
* @param future the future
230+
* @param the result type
231+
* @return a proxy of the future type {@code <T>}
232+
*/
233+
@SuppressWarnings("unchecked")
234+
public static <T> T proxyFutureResult(final Future<T> future, Class<? super T> resultType) {
235+
Assert.notNull(future, "Future must not be null");
236+
Assert.notNull(resultType, "ResultType must not be null");
237+
238+
// No need to proxy an already completed future
239+
if (future.isDone()) {
240+
return get(future);
241+
}
242+
243+
Enhancer enhancer = new Enhancer();
244+
enhancer.setSuperclass(resultType);
245+
enhancer.setCallback(new MethodInterceptor() {
246+
public Object intercept(Object obj, Method method, Object[] args,
247+
MethodProxy proxy) throws Throwable {
248+
return method.invoke(get(future), args);
249+
}
250+
});
251+
252+
try {
253+
return (T) enhancer.create();
254+
}
255+
catch (IllegalArgumentException ex) {
256+
throw new IllegalArgumentException("Unable to proxy future result", ex);
257+
}
258+
}
259+
260+
/**
261+
* Calls {@link Future#get()}, wrapping any checked exceptions in
262+
* {@link RuntimeException}.
263+
* @param future the future
264+
* @return the result of {@link Future#get()}
265+
*/
266+
public static <T> T get(Future<T> future) {
267+
Assert.notNull(future, "Future must not be null");
268+
try {
269+
return future.get();
270+
}
271+
catch (InterruptedException ex) {
272+
Thread.currentThread().interrupt();
273+
throw new RuntimeException(ex);
274+
}
275+
catch (ExecutionException ex) {
276+
throw new RuntimeException(ex);
277+
}
278+
}
279+
280+
/**
281+
* Adapt a {@link AsyncTaskExecutor} to a {@link ExecutorService}.
282+
* @param executor the {@link AsyncTaskExecutor} (may be {@code null})
283+
* @return an {@link ExecutorService} or {@code null} of the source executor was {@code null}
284+
*/
285+
private static ExecutorService asExecutorService(AsyncTaskExecutor executor) {
286+
return (executor == null ? null : new ExecutorServiceAdapter(executor));
287+
}
288+
289+
290+
/**
291+
* Simple {@link Future} implementation that resolves a task immediately.
292+
* @param <T> The type
293+
*/
294+
private static class CompletedFuture<T> implements Future<T> {
295+
296+
private T result;
297+
298+
private ExecutionException exception;
299+
300+
301+
public CompletedFuture(Callable<T> task) {
302+
try {
303+
this.result = task.call();
304+
}
305+
catch (Exception ex) {
306+
this.exception = new ExecutionException(ex);
307+
}
308+
}
309+
310+
311+
public boolean cancel(boolean mayInterruptIfRunning) {
312+
return true;
313+
}
314+
315+
public boolean isCancelled() {
316+
return false;
317+
}
318+
319+
public boolean isDone() {
320+
return true;
321+
}
322+
323+
public T get(long timeout, TimeUnit unit) throws InterruptedException,
324+
ExecutionException, TimeoutException {
325+
return get();
326+
}
327+
328+
public T get() throws InterruptedException, ExecutionException {
329+
if(this.exception != null) {
330+
throw this.exception;
331+
}
332+
return result;
333+
}
334+
}
335+
}

0 commit comments

Comments
 (0)