Skip to content

Commit 387d061

Browse files
authored
Merge pull request aol#217 from aol/clean-load
configure datacleaner and dataloader conditionally
2 parents 2bcca85 + e1eb55a commit 387d061

File tree

17 files changed

+385
-9
lines changed

17 files changed

+385
-9
lines changed

micro-async-data-loader/readme.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,14 @@ This plugin supports asyncrhonously reading data from a store such as S3 or Couc
1717

1818
This plugin must be used in conjunction with an implementation of the interfaces in micro-manifest-comparator (either micro-s3 or micro-couchbase should be on the classpath). The first steps to using the AsyncDataLoader should be to configure access to your data store as per the appropriate plugin (configure access keys for S3, servers / user / password for Couchbase).
1919

20+
To configure a manifest comparator for Couchbase please set this property
21+
22+
couchbase.manifest.comparison.key=<key>
23+
24+
To configure a manifest comparator for S3 please set this property
25+
26+
s3.manifest.comparator.key=<key>
27+
2028
### Additional properties are
2129

2230
asyc.data.schedular.cron.loader=0 * * * * *
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
package com.aol.micro.server.async.data.loader;
2+
3+
public interface ConditionallyLoad {
4+
5+
public boolean shouldLoad();
6+
}

micro-async-data-loader/src/main/java/com/aol/micro/server/async/data/loader/ConfigureScheduling.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import java.util.List;
44
import java.util.concurrent.Executors;
5+
import java.util.function.BinaryOperator;
56

67
import org.springframework.beans.factory.annotation.Autowired;
78
import org.springframework.beans.factory.annotation.Value;
@@ -31,6 +32,8 @@ public class ConfigureScheduling {
3132

3233
@Autowired(required = false)
3334
private List<ManifestComparator> defaultComparators = ListX.empty();
35+
@Autowired(required = false)
36+
private List<ConditionallyLoad> predicates = ListX.empty();
3437

3538
private ListX<DataLoader> dataLoaders() {
3639
SetX<ManifestComparator> comparatorSet = SetX.fromIterable(dataLoaders)
@@ -46,9 +49,14 @@ private ListX<DataLoader> dataLoaders() {
4649

4750
@Bean
4851
public LoaderSchedular asyncDataLoader() {
52+
ConditionallyLoad cc = () -> true;
53+
BinaryOperator<ConditionallyLoad> accumulator = (cc1, cc2) -> () -> cc1.shouldLoad() && cc2.shouldLoad();
54+
4955
LoaderSchedular schedular = new LoaderSchedular(
5056
dataLoaders(),
51-
Executors.newScheduledThreadPool(schedularThreads), bus);
57+
Executors.newScheduledThreadPool(schedularThreads), bus,
58+
predicates.stream()
59+
.reduce(cc, accumulator));
5260
schedular.schedule();
5361
return schedular;
5462
}

micro-async-data-loader/src/main/java/com/aol/micro/server/async/data/loader/LoaderSchedular.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ public class LoaderSchedular {
1515
private ListX<DataLoader> loader;
1616
private final ScheduledExecutorService executor;
1717
private final EventBus bus;
18+
private final ConditionallyLoad condition;
1819

1920
public void schedule() {
2021

@@ -32,7 +33,9 @@ public void schedule() {
3233
}
3334

3435
private ReactiveSeq<SystemData<String, String>> create(DataLoader dl) {
35-
return ReactiveSeq.generate(() -> dl.scheduleAndLog())
36+
return ReactiveSeq.generate(() -> 1)
37+
.filter(in -> condition.shouldLoad())
38+
.map(in -> dl.scheduleAndLog())
3639
.peek(sd -> bus.post(sd));
3740
}
3841
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
package app.loader.scheduled.off.com.aol.micro.server;
2+
3+
import javax.ws.rs.GET;
4+
import javax.ws.rs.Path;
5+
import javax.ws.rs.Produces;
6+
7+
import org.springframework.beans.factory.annotation.Autowired;
8+
9+
import com.aol.cyclops.control.Maybe;
10+
import com.aol.cyclops.data.collections.extensions.persistent.PStackX;
11+
import com.aol.micro.server.async.data.loader.DataLoader;
12+
import com.aol.micro.server.auto.discovery.Rest;
13+
import com.aol.micro.server.distributed.DistributedMap;
14+
import com.aol.micro.server.events.SystemData;
15+
import com.google.common.eventbus.EventBus;
16+
import com.google.common.eventbus.Subscribe;
17+
18+
@Path("/couchbase")
19+
@Rest
20+
public class CouchbaseResource {
21+
22+
private final DistributedMap client;
23+
private volatile PStackX<SystemData> dataLoads = PStackX.empty();
24+
25+
@Autowired
26+
public CouchbaseResource(DistributedMap client, EventBus bus) {
27+
this.client = client;
28+
bus.register(this);
29+
}
30+
31+
@Subscribe
32+
public synchronized void events(SystemData event) {
33+
dataLoads = dataLoads.plus(event);
34+
35+
}
36+
37+
@GET
38+
@Path("/loading-events")
39+
@Produces("application/json")
40+
public synchronized PStackX<SystemData> loadingEvents() {
41+
return dataLoads;
42+
}
43+
44+
@GET
45+
@Path("/maybe")
46+
@Produces("application/json")
47+
public Maybe<String> maybe() {
48+
return Maybe.just("hello-world");
49+
}
50+
51+
@GET
52+
@Path("/get")
53+
public String bucket() {
54+
return client.get("hello")
55+
.toString();
56+
}
57+
58+
@GET
59+
@Path("/put")
60+
public String put() {
61+
client.put("hello", "world");
62+
return "added";
63+
}
64+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
package app.loader.scheduled.off.com.aol.micro.server;
2+
3+
import static org.hamcrest.CoreMatchers.containsString;
4+
import static org.junit.Assert.assertThat;
5+
import static org.junit.Assert.assertTrue;
6+
7+
import java.util.List;
8+
import java.util.concurrent.ExecutionException;
9+
10+
import org.couchbase.mock.CouchbaseMock;
11+
import org.junit.After;
12+
import org.junit.Before;
13+
import org.junit.Test;
14+
15+
import com.aol.micro.server.MicroserverApp;
16+
import com.aol.micro.server.config.Microserver;
17+
import com.aol.micro.server.module.ConfigurableModule;
18+
import com.aol.micro.server.rest.jackson.JacksonUtil;
19+
import com.aol.micro.server.testing.RestAgent;
20+
21+
@Microserver(properties = { "couchbaseServers", "http://localhost:8091/pools", "couchbasePassword", "",
22+
"couchbaseBucket", "beer-sample", "asyc.data.schedular.cron.loader", "* * * * * ?",
23+
"asyc.data.schedular.cron.cleaner", "* * * * * ?" })
24+
public class CouchbaseRunnerTest {
25+
26+
RestAgent rest = new RestAgent();
27+
28+
MicroserverApp server;
29+
30+
@Before
31+
public void startServer() {
32+
try {
33+
// couchbase already running?
34+
rest.get("http://localhost:8091/pools");
35+
} catch (Exception e) {
36+
// start mock couchbase
37+
CouchbaseMock.main(new String[] { "-S" });
38+
}
39+
server = new MicroserverApp(
40+
ConfigurableModule.builder()
41+
.context("simple-app")
42+
.build());
43+
44+
server.start();
45+
46+
}
47+
48+
@After
49+
public void stopServer() {
50+
server.stop();
51+
}
52+
53+
@Test
54+
public void runAppAndBasicTest() throws InterruptedException, ExecutionException {
55+
rest.get("http://localhost:8080/simple-app/couchbase/put");
56+
assertThat(rest.get("http://localhost:8080/simple-app/couchbase/get"), containsString("world"));
57+
58+
Thread.sleep(2000);
59+
String json = rest.getJson("http://localhost:8080/simple-app/couchbase/loading-events");
60+
List list = JacksonUtil.convertFromJson(json, List.class);
61+
System.out.println(list);
62+
assertTrue(list.size() == 0);
63+
64+
}
65+
66+
}
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
package app.loader.scheduled.off.com.aol.micro.server;
2+
3+
import org.springframework.stereotype.Component;
4+
5+
import com.aol.micro.server.async.data.loader.ConditionallyLoad;
6+
7+
@Component
8+
public class TurnOff implements ConditionallyLoad {
9+
10+
@Override
11+
public boolean shouldLoad() {
12+
return false;
13+
}
14+
15+
}

micro-async-data-writer/readme.md

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ This plugin supports asyncrhonously writing data to a store such as S3 or Couchb
1616

1717
This plugin must be used in conjunction with an implementation of the interfaces in micro-manifest-comparator (either micro-s3 or micro-couchbase should be on the classpath). The first steps to using the AsyncDataWriter should be to configure access to your data store as per the appropriate plugin (configure access keys for S3, servers / user / password for Couchbase).
1818

19+
1920
### Additional properties are
2021

2122
asyc.data.writer.threads=no. of threads for asynchronous writing
@@ -87,7 +88,7 @@ New ManifestComparators targeting a different data key can be created from preco
8788
return existing.withKey("new-data-key");
8889
}
8990

90-
91+
```
9192

9293
# Data Cleaner Plugin Features
9394

@@ -97,6 +98,24 @@ asyc.data.schedular.cron.cleaner=0 0 * * * ?
9798

9899
When the cleaner runs it triggers an event with processing and error stats published to a Guava Event Bus.
99100

101+
## Conditionally turning the cleaner off
102+
103+
The DataCleaner can be truned off conditionally by having a Spring Bean implement ConditionallyClean. shouldClean is called everytime the DataCleaner is scheduled to determine if it is run.
104+
105+
```java
106+
107+
@Component
108+
public class TurnOff implements ConditionallyClean {
109+
110+
@Override
111+
public boolean shouldClean() {
112+
113+
return false;
114+
}
115+
116+
}
117+
118+
```
100119

101120
## Getting The Microserver Async Data Writer Plugin
102121

micro-async-data-writer/src/main/java/com/aol/micro/server/async/data/cleaner/CleanerSchedular.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,13 @@ public class CleanerSchedular {
1414
private final ListX<DataCleaner> cleaner;
1515
private final ScheduledExecutorService executor;
1616
private final EventBus bus;
17+
private final ConditionallyClean condition;
1718

1819
public void schedule() {
1920
cleaner.forEach(cl -> {
20-
ReactiveSeq.generate(() -> cl.scheduleAndLog())
21+
ReactiveSeq.generate(() -> 1)
22+
.filter(in -> condition.shouldClean())
23+
.map(i -> cl.scheduleAndLog())
2124
.peek(sd -> bus.post(sd))
2225
.schedule(cl.getCron(), executor);
2326
});
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
package com.aol.micro.server.async.data.cleaner;
2+
3+
public interface ConditionallyClean {
4+
5+
public boolean shouldClean();
6+
}

micro-async-data-writer/src/main/java/com/aol/micro/server/async/data/cleaner/ConfigureScheduling.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import java.util.List;
44
import java.util.concurrent.Executors;
5+
import java.util.function.BinaryOperator;
56

67
import org.springframework.beans.factory.annotation.Autowired;
78
import org.springframework.beans.factory.annotation.Value;
@@ -23,6 +24,8 @@ public class ConfigureScheduling {
2324

2425
@Autowired(required = false)
2526
private List<DataCleaner> dataCleaners = ListX.empty();
27+
@Autowired(required = false)
28+
private List<ConditionallyClean> predicates = ListX.empty();
2629

2730
@Autowired
2831
private EventBus bus;
@@ -42,9 +45,13 @@ private ListX<DataCleaner> dataCleaners() {
4245

4346
@Bean
4447
public CleanerSchedular asyncDataCleaner() {
48+
ConditionallyClean cc = () -> true;
49+
BinaryOperator<ConditionallyClean> accumulator = (cc1, cc2) -> () -> cc1.shouldClean() && cc2.shouldClean();
4550
CleanerSchedular schedular = new CleanerSchedular(
4651
dataCleaners(),
47-
Executors.newScheduledThreadPool(schedularThreads), bus);
52+
Executors.newScheduledThreadPool(schedularThreads), bus,
53+
predicates.stream()
54+
.reduce(cc, accumulator));
4855
schedular.schedule();
4956
return schedular;
5057
}
Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package com.aol.micro.server.async.data.cleaner;
1+
package com.aol.micro.server.async.data.writer;
22

33
import java.util.List;
44
import java.util.concurrent.Executor;
@@ -12,8 +12,6 @@
1212
import org.springframework.context.annotation.Configuration;
1313

1414
import com.aol.cyclops.data.collections.extensions.standard.ListX;
15-
import com.aol.micro.server.async.data.writer.AsyncDataWriter;
16-
import com.aol.micro.server.async.data.writer.MultiDataWriter;
1715
import com.aol.micro.server.manifest.ManifestComparator;
1816
import com.google.common.eventbus.EventBus;
1917

micro-async-data-writer/src/main/java/com/aol/micro/server/async/data/writer/plugin/AsyncDataWriterPlugin.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,8 @@
44

55
import com.aol.cyclops.data.collections.extensions.standard.SetX;
66
import com.aol.micro.server.Plugin;
7-
import com.aol.micro.server.async.data.cleaner.ConfigureDataWriter;
87
import com.aol.micro.server.async.data.cleaner.ConfigureScheduling;
8+
import com.aol.micro.server.async.data.writer.ConfigureDataWriter;
99

1010
public class AsyncDataWriterPlugin implements Plugin {
1111

0 commit comments

Comments
 (0)