Skip to content

Commit 4b2847d

Browse files
committed
Add ChannelInterceptor to spring-messaging module
Issue: SPR-10866
1 parent 467a6b9 commit 4b2847d

12 files changed

+844
-61
lines changed

spring-messaging/src/main/java/org/springframework/messaging/simp/SimpMessageHeaderAccessor.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,6 @@ protected SimpMessageHeaderAccessor(SimpMessageType messageType, Map<String, Lis
6868
*/
6969
protected SimpMessageHeaderAccessor(Message<?> message) {
7070
super(message);
71-
Assert.notNull(message, "message is required");
7271
}
7372

7473

spring-messaging/src/main/java/org/springframework/messaging/support/NativeMessageHeaderAccessor.java

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,10 @@ public Map<String, List<String>> toNativeHeaderMap() {
107107
return result;
108108
}
109109

110-
protected List<String> getNativeHeader(String headerName) {
110+
/**
111+
* Return all values for the specified native header or {@code null}.
112+
*/
113+
public List<String> getNativeHeader(String headerName) {
111114
if (this.nativeHeaders.containsKey(headerName)) {
112115
return this.nativeHeaders.get(headerName);
113116
}
@@ -117,23 +120,28 @@ else if (this.originalNativeHeaders != null) {
117120
return null;
118121
}
119122

123+
/**
124+
* Return the first value for the specified native header of {@code null}.
125+
*/
120126
public String getFirstNativeHeader(String headerName) {
121127
List<String> values = getNativeHeader(headerName);
122128
return CollectionUtils.isEmpty(values) ? null : values.get(0);
123129
}
124130

125131
/**
126-
* Set the value for the given header name. If the provided value is {@code null} the
127-
* header will be removed.
132+
* Set the specified native header value.
128133
*/
129-
protected void putNativeHeader(String name, List<String> value) {
134+
public void setNativeHeader(String name, String value) {
130135
if (!ObjectUtils.nullSafeEquals(value, getHeader(name))) {
131-
this.nativeHeaders.put(name, value);
136+
this.nativeHeaders.set(name, value);
132137
}
133138
}
134139

135-
public void setNativeHeader(String name, String value) {
136-
this.nativeHeaders.set(name, value);
140+
/**
141+
* Add the specified native header value.
142+
*/
143+
public void addNativeHeader(String name, String value) {
144+
this.nativeHeaders.add(name, value);
137145
}
138146

139147
}
Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
/*
2+
* Copyright 2002-2013 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.messaging.support.channel;
18+
19+
import java.util.List;
20+
21+
import org.apache.commons.logging.Log;
22+
import org.apache.commons.logging.LogFactory;
23+
import org.springframework.beans.factory.BeanNameAware;
24+
import org.springframework.messaging.Message;
25+
import org.springframework.messaging.MessageChannel;
26+
import org.springframework.messaging.MessageDeliveryException;
27+
import org.springframework.messaging.MessagingException;
28+
import org.springframework.util.Assert;
29+
import org.springframework.util.ObjectUtils;
30+
31+
32+
/**
33+
* Abstract base class for {@link MessageChannel} implementations.
34+
*
35+
* @author Rossen Stoyanchev
36+
* @since 4.0
37+
*/
38+
public abstract class AbstractMessageChannel implements MessageChannel, BeanNameAware {
39+
40+
protected Log logger = LogFactory.getLog(getClass());
41+
42+
private String beanName;
43+
44+
private final ChannelInterceptorChain interceptorChain = new ChannelInterceptorChain();
45+
46+
47+
public AbstractMessageChannel() {
48+
this.beanName = getClass().getSimpleName() + "@" + ObjectUtils.getIdentityHexString(this);
49+
}
50+
51+
/**
52+
* {@inheritDoc}
53+
* <p>Used primarily for logging purposes.
54+
*/
55+
@Override
56+
public void setBeanName(String name) {
57+
this.beanName = name;
58+
}
59+
60+
/**
61+
* @return the name for this channel.
62+
*/
63+
public String getBeanName() {
64+
return this.beanName;
65+
}
66+
67+
/**
68+
* Set the list of channel interceptors. This will clear any existing interceptors.
69+
*/
70+
public void setInterceptors(List<ChannelInterceptor> interceptors) {
71+
this.interceptorChain.set(interceptors);
72+
}
73+
74+
/**
75+
* Add a channel interceptor to the end of the list.
76+
*/
77+
public void addInterceptor(ChannelInterceptor interceptor) {
78+
this.interceptorChain.add(interceptor);
79+
}
80+
81+
/**
82+
* Return a read-only list of the configured interceptors.
83+
*/
84+
public List<ChannelInterceptor> getInterceptors() {
85+
return this.interceptorChain.getInterceptors();
86+
}
87+
88+
/**
89+
* Exposes the interceptor list for subclasses.
90+
*/
91+
protected ChannelInterceptorChain getInterceptorChain() {
92+
return this.interceptorChain;
93+
}
94+
95+
96+
@Override
97+
public final boolean send(Message<?> message) {
98+
return send(message, INDEFINITE_TIMEOUT);
99+
}
100+
101+
@Override
102+
public final boolean send(Message<?> message, long timeout) {
103+
104+
Assert.notNull(message, "Message must not be null");
105+
if (logger.isTraceEnabled()) {
106+
logger.trace("[" + this.beanName + "] sending message " + message);
107+
}
108+
109+
message = this.interceptorChain.preSend(message, this);
110+
if (message == null) {
111+
return false;
112+
}
113+
114+
try {
115+
boolean sent = sendInternal(message, timeout);
116+
this.interceptorChain.postSend(message, this, sent);
117+
return sent;
118+
}
119+
catch (Exception e) {
120+
if (e instanceof MessagingException) {
121+
throw (MessagingException) e;
122+
}
123+
throw new MessageDeliveryException(message,
124+
"Failed to send message to channel '" + this.getBeanName() + "'", e);
125+
}
126+
}
127+
128+
protected abstract boolean sendInternal(Message<?> message, long timeout);
129+
130+
}

spring-messaging/src/main/java/org/springframework/messaging/support/channel/AbstractSubscribableChannel.java

Lines changed: 4 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,8 @@
1616

1717
package org.springframework.messaging.support.channel;
1818

19-
import org.apache.commons.logging.Log;
20-
import org.apache.commons.logging.LogFactory;
21-
import org.springframework.beans.factory.BeanNameAware;
22-
import org.springframework.messaging.Message;
2319
import org.springframework.messaging.MessageHandler;
2420
import org.springframework.messaging.SubscribableChannel;
25-
import org.springframework.util.Assert;
26-
import org.springframework.util.ObjectUtils;
2721

2822

2923
/**
@@ -32,57 +26,17 @@
3226
* @author Rossen Stoyanchev
3327
* @since 4.0
3428
*/
35-
public abstract class AbstractSubscribableChannel implements SubscribableChannel, BeanNameAware {
29+
public abstract class AbstractSubscribableChannel extends AbstractMessageChannel implements SubscribableChannel {
3630

37-
protected Log logger = LogFactory.getLog(getClass());
38-
39-
private String beanName;
40-
41-
42-
public AbstractSubscribableChannel() {
43-
this.beanName = getClass().getSimpleName() + "@" + ObjectUtils.getIdentityHexString(this);
44-
}
45-
46-
/**
47-
* {@inheritDoc}
48-
* <p>Used primarily for logging purposes.
49-
*/
50-
@Override
51-
public void setBeanName(String name) {
52-
this.beanName = name;
53-
}
54-
55-
/**
56-
* @return the name for this channel.
57-
*/
58-
public String getBeanName() {
59-
return this.beanName;
60-
}
61-
62-
@Override
63-
public final boolean send(Message<?> message) {
64-
return send(message, INDEFINITE_TIMEOUT);
65-
}
66-
67-
@Override
68-
public final boolean send(Message<?> message, long timeout) {
69-
Assert.notNull(message, "Message must not be null");
70-
if (logger.isTraceEnabled()) {
71-
logger.trace("[" + this.beanName + "] sending message " + message);
72-
}
73-
return sendInternal(message, timeout);
74-
}
75-
76-
protected abstract boolean sendInternal(Message<?> message, long timeout);
7731

7832
@Override
7933
public final boolean subscribe(MessageHandler handler) {
8034
if (hasSubscription(handler)) {
81-
logger.warn("[" + this.beanName + "] handler already subscribed " + handler);
35+
logger.warn("[" + getBeanName() + "] handler already subscribed " + handler);
8236
return false;
8337
}
8438
if (logger.isDebugEnabled()) {
85-
logger.debug("[" + this.beanName + "] subscribing " + handler);
39+
logger.debug("[" + getBeanName() + "] subscribing " + handler);
8640
}
8741
return subscribeInternal(handler);
8842
}
@@ -94,7 +48,7 @@ public final boolean subscribe(MessageHandler handler) {
9448
@Override
9549
public final boolean unsubscribe(MessageHandler handler) {
9650
if (logger.isDebugEnabled()) {
97-
logger.debug("[" + this.beanName + "] unsubscribing " + handler);
51+
logger.debug("[" + getBeanName() + "] unsubscribing " + handler);
9852
}
9953
return unsubscribeInternal(handler);
10054
}
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
/*
2+
* Copyright 2002-2010 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.messaging.support.channel;
18+
19+
import org.springframework.messaging.Message;
20+
import org.springframework.messaging.MessageChannel;
21+
22+
/**
23+
* Interface for interceptors that are able to view and/or modify the
24+
* {@link Message Messages} being sent-to and/or received-from a
25+
* {@link MessageChannel}.
26+
*
27+
* @author Mark Fisher
28+
* @since 4.0
29+
*/
30+
public interface ChannelInterceptor {
31+
32+
/**
33+
* Invoked before the Message is actually sent to the channel.
34+
* This allows for modification of the Message if necessary.
35+
* If this method returns <code>null</code>, then the actual
36+
* send invocation will not occur.
37+
*/
38+
Message<?> preSend(Message<?> message, MessageChannel channel);
39+
40+
/**
41+
* Invoked immediately after the send invocation. The boolean
42+
* value argument represents the return value of that invocation.
43+
*/
44+
void postSend(Message<?> message, MessageChannel channel, boolean sent);
45+
46+
/**
47+
* Invoked as soon as receive is called and before a Message is
48+
* actually retrieved. If the return value is 'false', then no
49+
* Message will be retrieved. This only applies to PollableChannels.
50+
*/
51+
boolean preReceive(MessageChannel channel);
52+
53+
/**
54+
* Invoked immediately after a Message has been retrieved but before
55+
* it is returned to the caller. The Message may be modified if
56+
* necessary. This only applies to PollableChannels.
57+
*/
58+
Message<?> postReceive(Message<?> message, MessageChannel channel);
59+
60+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/*
2+
* Copyright 2002-2013 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.messaging.support.channel;
18+
19+
import org.springframework.messaging.Message;
20+
import org.springframework.messaging.MessageChannel;
21+
22+
/**
23+
* A {@link ChannelInterceptor} with empty method implementations as a convenience.
24+
*
25+
* @author Mark Fisher
26+
* @since 4.0
27+
*/
28+
public class ChannelInterceptorAdapter implements ChannelInterceptor {
29+
30+
31+
public Message<?> preSend(Message<?> message, MessageChannel channel) {
32+
return message;
33+
}
34+
35+
public void postSend(Message<?> message, MessageChannel channel, boolean sent) {
36+
}
37+
38+
public boolean preReceive(MessageChannel channel) {
39+
return true;
40+
}
41+
42+
public Message<?> postReceive(Message<?> message, MessageChannel channel) {
43+
return message;
44+
}
45+
46+
}

0 commit comments

Comments
 (0)