Skip to content

Commit ea80d7f

Browse files
committed
Add ConcurrentInmemoryMessageService
1 parent 2ccefef commit ea80d7f

File tree

3 files changed

+137
-39
lines changed

3 files changed

+137
-39
lines changed
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
package ie.emeraldjava.inmemory.message;
2+
3+
import ie.emeraldjava.inmemory.consumer.MessageConsumer;
4+
import ie.emeraldjava.inmemory.producer.MessageProducer;
5+
import lombok.extern.slf4j.Slf4j;
6+
7+
@Slf4j
8+
abstract class AbstractInmemoryMessageService implements MessageService {
9+
10+
MessageProcessor requestProcessor;
11+
MessageProcessor responseProcessor;
12+
MessageProcessor producerControlProcessor;
13+
protected MessageProcessor consumerControlProcessor;
14+
15+
public void registerProcessorForTopic(MessageProcessor processor, Topic topic) {
16+
log.info("register[{},{},{}]",topic,processor.hashCode(),processor.getClass().getName());
17+
if(topic.equals(Topic.REQUEST)) {
18+
this.requestProcessor=processor;
19+
} else if(topic.equals(Topic.RESPONSE)) {
20+
this.responseProcessor = processor;
21+
} else if(topic.equals(Topic.CONTROL)) {
22+
if(processor instanceof MessageProducer) {
23+
this.producerControlProcessor = processor;
24+
log.info("register:consumerControlProcessor[{}]",this.producerControlProcessor.hashCode());
25+
} else if (processor instanceof MessageConsumer){
26+
this.consumerControlProcessor = processor;
27+
log.info("register:producerControlProcessor[{}]",this.consumerControlProcessor.hashCode());
28+
}
29+
}
30+
}
31+
32+
public abstract void publishMessage(Topic topic, Object message);
33+
34+
MessageProcessor getMessageProcessor(Topic topic,Object message) {
35+
MessageProcessor p = null;
36+
if(topic.equals(Topic.REQUEST)) {
37+
p = this.requestProcessor;
38+
} else if(topic.equals(Topic.RESPONSE)) {
39+
p = this.responseProcessor;
40+
} else if(topic.equals(Topic.CONTROL)) {
41+
if(message instanceof String) {
42+
p = this.consumerControlProcessor;
43+
} else {
44+
p = this.producerControlProcessor;
45+
}
46+
}
47+
return p;
48+
}
49+
}
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
package ie.emeraldjava.inmemory.message;
2+
3+
import lombok.AllArgsConstructor;
4+
import lombok.Getter;
5+
import lombok.Setter;
6+
import lombok.extern.slf4j.Slf4j;
7+
import org.springframework.beans.factory.annotation.Value;
8+
import org.springframework.stereotype.Component;
9+
10+
import javax.annotation.PostConstruct;
11+
import java.util.concurrent.Executor;
12+
import java.util.concurrent.Executors;
13+
14+
@Slf4j
15+
@Component
16+
public class ConcurrentInmemoryMessageService extends AbstractInmemoryMessageService implements MessageService {
17+
18+
@Value("${threadPoolSize:4")
19+
private int threadPoolSize = 4;
20+
21+
private Executor executor = Executors.newFixedThreadPool(threadPoolSize);
22+
23+
@PostConstruct
24+
public void setup() {
25+
log.info("ConcurrentInmemoryMessageService");
26+
}
27+
28+
@Override
29+
public void publishMessage(Topic topic, Object message) {
30+
executor.execute(new MyRunnable(topic,message));
31+
}
32+
33+
@Getter
34+
@Setter
35+
@AllArgsConstructor
36+
class MyRunnable implements Runnable {
37+
38+
Topic topic;
39+
Object message;
40+
41+
public void run() {
42+
log.info("publishMessage[{},{},{},{}]",topic,message.hashCode(),message,(message instanceof String));
43+
MessageProcessor p = getMessageProcessor(topic,message);
44+
log.info("-->processor[{}]",p.hashCode());
45+
p.processMessage(topic, message);
46+
log.info("<--processor[{}]",p.hashCode());
47+
}
48+
}
49+
50+
51+
}
Lines changed: 37 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -1,36 +1,34 @@
11
package ie.emeraldjava.inmemory.message;
22

3-
import ie.emeraldjava.inmemory.consumer.MessageConsumer;
4-
import ie.emeraldjava.inmemory.producer.MessageProducer;
53
import lombok.extern.slf4j.Slf4j;
64
import org.springframework.stereotype.Component;
75

86
@Slf4j
97
@Component
10-
public class InmemoryMessageService implements MessageService {
8+
public class InmemoryMessageService extends AbstractInmemoryMessageService implements MessageService {
119

12-
private MessageProcessor requestProcessor;
13-
private MessageProcessor responseProcessor;
14-
private MessageProcessor producerControlProcessor;
15-
private MessageProcessor consumerControlProcessor;
10+
// private MessageProcessor requestProcessor;
11+
// private MessageProcessor responseProcessor;
12+
// private MessageProcessor producerControlProcessor;
13+
// private MessageProcessor consumerControlProcessor;
1614

17-
@Override
18-
public void registerProcessorForTopic(MessageProcessor processor, Topic topic) {
19-
log.info("register[{},{},{}]",topic,processor.hashCode(),processor.getClass().getName());
20-
if(topic.equals(Topic.REQUEST)) {
21-
this.requestProcessor=processor;
22-
} else if(topic.equals(Topic.RESPONSE)) {
23-
this.responseProcessor = processor;
24-
} else if(topic.equals(Topic.CONTROL)) {
25-
if(processor instanceof MessageProducer) {
26-
this.producerControlProcessor = processor;
27-
log.info("register:consumerControlProcessor[{}]",this.producerControlProcessor.hashCode());
28-
} else if (processor instanceof MessageConsumer){
29-
this.consumerControlProcessor = processor;
30-
log.info("register:producerControlProcessor[{}]",this.consumerControlProcessor.hashCode());
31-
}
32-
}
33-
}
15+
// @Override
16+
// public void registerProcessorForTopic(MessageProcessor processor, Topic topic) {
17+
// log.info("register[{},{},{}]",topic,processor.hashCode(),processor.getClass().getName());
18+
// if(topic.equals(Topic.REQUEST)) {
19+
// this.requestProcessor=processor;
20+
// } else if(topic.equals(Topic.RESPONSE)) {
21+
// this.responseProcessor = processor;
22+
// } else if(topic.equals(Topic.CONTROL)) {
23+
// if(processor instanceof MessageProducer) {
24+
// this.producerControlProcessor = processor;
25+
// log.info("register:consumerControlProcessor[{}]",this.producerControlProcessor.hashCode());
26+
// } else if (processor instanceof MessageConsumer){
27+
// this.consumerControlProcessor = processor;
28+
// log.info("register:producerControlProcessor[{}]",this.consumerControlProcessor.hashCode());
29+
// }
30+
// }
31+
// }
3432

3533
@Override
3634
public void publishMessage(Topic topic, Object message) {
@@ -41,19 +39,19 @@ public void publishMessage(Topic topic, Object message) {
4139
log.info("<--processor[{}]",p.hashCode());
4240
}
4341

44-
private MessageProcessor getMessageProcessor(Topic topic,Object message) {
45-
MessageProcessor p = null;
46-
if(topic.equals(Topic.REQUEST)) {
47-
p = this.requestProcessor;
48-
} else if(topic.equals(Topic.RESPONSE)) {
49-
p = this.responseProcessor;
50-
} else if(topic.equals(Topic.CONTROL)) {
51-
if(message instanceof String) {
52-
p = this.consumerControlProcessor;
53-
} else {
54-
p = this.producerControlProcessor;
55-
}
56-
}
57-
return p;
58-
}
42+
// private MessageProcessor getMessageProcessor(Topic topic,Object message) {
43+
// MessageProcessor p = null;
44+
// if(topic.equals(Topic.REQUEST)) {
45+
// p = this.requestProcessor;
46+
// } else if(topic.equals(Topic.RESPONSE)) {
47+
// p = this.responseProcessor;
48+
// } else if(topic.equals(Topic.CONTROL)) {
49+
// if(message instanceof String) {
50+
// p = this.consumerControlProcessor;
51+
// } else {
52+
// p = this.producerControlProcessor;
53+
// }
54+
// }
55+
// return p;
56+
// }
5957
}

0 commit comments

Comments
 (0)