Skip to content

Commit 255e06a

Browse files
Mitja Gomboclukas-krecan
authored andcommitted
Provider for Cassandra project added. Lock provider and storage accessor implementation
1 parent 8095629 commit 255e06a

File tree

7 files changed

+430
-0
lines changed

7 files changed

+430
-0
lines changed

pom.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
<module>providers/redis/shedlock-provider-redis-jedis</module>
4444
<module>providers/redis/shedlock-provider-redis-spring</module>
4545
<module>providers/dynamodb/shedlock-provider-dynamodb</module>
46+
<module>providers/cassandra/shedlock-provider-cassandra</module>
4647
</modules>
4748

4849
<properties>
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
3+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
4+
<parent>
5+
<artifactId>shedlock-parent</artifactId>
6+
<groupId>net.javacrumbs.shedlock</groupId>
7+
<version>4.1.1-SNAPSHOT</version>
8+
<relativePath>../../../pom.xml</relativePath>
9+
</parent>
10+
<modelVersion>4.0.0</modelVersion>
11+
12+
<artifactId>shedlock-provider-cassandra</artifactId>
13+
<version>4.1.1-SNAPSHOT</version>
14+
15+
<properties>
16+
<java-driver.version>4.3.1</java-driver.version>
17+
</properties>
18+
19+
<dependencies>
20+
<dependency>
21+
<groupId>net.javacrumbs.shedlock</groupId>
22+
<artifactId>shedlock-core</artifactId>
23+
<version>${project.version}</version>
24+
</dependency>
25+
26+
<dependency>
27+
<groupId>com.datastax.oss</groupId>
28+
<artifactId>java-driver-core</artifactId>
29+
<version>${java-driver.version}</version>
30+
</dependency>
31+
32+
<dependency>
33+
<groupId>com.datastax.oss</groupId>
34+
<artifactId>java-driver-query-builder</artifactId>
35+
<version>${java-driver.version}</version>
36+
</dependency>
37+
38+
<dependency>
39+
<groupId>net.javacrumbs.shedlock</groupId>
40+
<artifactId>shedlock-test-support</artifactId>
41+
<version>${project.version}</version>
42+
<scope>test</scope>
43+
</dependency>
44+
45+
<dependency>
46+
<groupId>ch.qos.logback</groupId>
47+
<artifactId>logback-classic</artifactId>
48+
<scope>test</scope>
49+
</dependency>
50+
</dependencies>
51+
52+
<build>
53+
<plugins>
54+
<plugin>
55+
<groupId>org.apache.maven.plugins</groupId>
56+
<artifactId>maven-jar-plugin</artifactId>
57+
<configuration>
58+
<archive>
59+
<manifestEntries>
60+
<Automatic-Module-Name>
61+
net.javacrumbs.shedlock.provider.cassandra
62+
</Automatic-Module-Name>
63+
</manifestEntries>
64+
</archive>
65+
</configuration>
66+
</plugin>
67+
</plugins>
68+
</build>
69+
70+
</project>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
package net.javacrumbs.shedlock.provider.cassandra;
2+
3+
import com.datastax.oss.driver.api.core.CqlSession;
4+
import net.javacrumbs.shedlock.support.StorageBasedLockProvider;
5+
import org.jetbrains.annotations.NotNull;
6+
7+
/**
8+
* Cassandra Lock Provider needs a keyspace and uses a lock table
9+
* <br>
10+
* Example creating keyspace and table
11+
* <pre>
12+
* CREATE KEYSPACE shedlock with replication={'class':'SimpleStrategy', 'replication_factor':1} and durable_writes=true;
13+
* CREATE TABLE shedlock.lock (name text PRIMARY KEY, lockUntil timestamp, lockedAt timestamp, lockedBy text);
14+
* </pre>
15+
*/
16+
public class CassandraLockProvider extends StorageBasedLockProvider {
17+
18+
static final String DEFAULT_CONCACT_POINT = "localhost";
19+
static final int DEFAULT_PORT = 9042;
20+
static final String DEFAULT_DATACENTER = "datacenter1";
21+
static final String DEFAULT_KEYSPACE = "shedlock";
22+
static final String DEFAULT_TABLE = "lock";
23+
24+
/**
25+
* Default configuration contact point is localhost at port 9042 at datacenter1 using keyspace shedlock anb table lock
26+
*/
27+
public CassandraLockProvider() {
28+
super(new CassandraStorageAccessor(DEFAULT_CONCACT_POINT, DEFAULT_PORT, DEFAULT_DATACENTER, DEFAULT_KEYSPACE, DEFAULT_TABLE));
29+
}
30+
31+
public CassandraLockProvider(@NotNull String contactPoint, @NotNull int port, @NotNull String datacenter, @NotNull String keyspace, @NotNull String table) {
32+
super(new CassandraStorageAccessor(contactPoint, port, datacenter, keyspace, table));
33+
}
34+
35+
/**
36+
* Using default table lock
37+
*
38+
* @param cqlSession
39+
*/
40+
public CassandraLockProvider(@NotNull CqlSession cqlSession) {
41+
super(new CassandraStorageAccessor(cqlSession, DEFAULT_TABLE));
42+
}
43+
44+
public CassandraLockProvider(@NotNull CqlSession cqlSession, @NotNull String table) {
45+
super(new CassandraStorageAccessor(cqlSession, table));
46+
}
47+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,167 @@
1+
package net.javacrumbs.shedlock.provider.cassandra;
2+
3+
import com.datastax.oss.driver.api.core.CqlSession;
4+
import com.datastax.oss.driver.api.core.cql.ResultSet;
5+
import com.datastax.oss.driver.api.core.cql.Row;
6+
import com.datastax.oss.driver.api.core.cql.SimpleStatement;
7+
import com.datastax.oss.driver.api.querybuilder.QueryBuilder;
8+
import net.javacrumbs.shedlock.core.LockConfiguration;
9+
import net.javacrumbs.shedlock.support.AbstractStorageAccessor;
10+
import net.javacrumbs.shedlock.support.Utils;
11+
import org.jetbrains.annotations.NotNull;
12+
13+
import java.net.InetSocketAddress;
14+
import java.time.Instant;
15+
import java.util.Optional;
16+
17+
import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.literal;
18+
19+
public class CassandraStorageAccessor extends AbstractStorageAccessor {
20+
21+
private static final String LOCK_NAME = "name";
22+
private static final String LOCK_UNTIL = "lockUntil";
23+
private static final String LOCKED_AT = "lockedAt";
24+
private static final String LOCKED_BY = "lockedBy";
25+
26+
private final String hostname;
27+
private final String table;
28+
private final CqlSession cqlSession;
29+
30+
public CassandraStorageAccessor(@NotNull String contactPoint, @NotNull int port, @NotNull String datacenter, @NotNull String keyspace, @NotNull String table) {
31+
this.hostname = Utils.getHostname();
32+
this.table = table;
33+
cqlSession = CqlSession.builder()
34+
.addContactPoint(new InetSocketAddress(contactPoint, port))
35+
.withLocalDatacenter(datacenter)
36+
.withKeyspace(keyspace)
37+
.build();
38+
}
39+
40+
public CassandraStorageAccessor(@NotNull CqlSession cqlSession, @NotNull String table) {
41+
this.hostname = Utils.getHostname();
42+
this.table = table;
43+
this.cqlSession = cqlSession;
44+
}
45+
46+
@Override
47+
public boolean insertRecord(@NotNull LockConfiguration lockConfiguration) {
48+
if (find(lockConfiguration.getName()).isPresent()) {
49+
return false;
50+
}
51+
52+
return insert(lockConfiguration.getName(), lockConfiguration.getLockAtMostUntil());
53+
}
54+
55+
@Override
56+
public boolean updateRecord(@NotNull LockConfiguration lockConfiguration) {
57+
Optional<Lock> lock = find(lockConfiguration.getName());
58+
if (!lock.isPresent() || lock.get().getLockUntil().isAfter(Instant.now())) {
59+
return false;
60+
}
61+
62+
return update(lockConfiguration.getName(), lockConfiguration.getLockAtMostUntil());
63+
}
64+
65+
@Override
66+
public void unlock(@NotNull LockConfiguration lockConfiguration) {
67+
updateUntil(lockConfiguration.getName(), lockConfiguration.getUnlockTime());
68+
}
69+
70+
@Override
71+
public boolean extend(@NotNull LockConfiguration lockConfiguration) {
72+
Optional<Lock> lock = find(lockConfiguration.getName());
73+
if (!lock.isPresent() || lock.get().getLockUntil().isBefore(Instant.now()) || !lock.get().getLockedBy().equals(hostname)) {
74+
logger.trace("extend false");
75+
return false;
76+
}
77+
78+
return updateUntil(lockConfiguration.getName(), lockConfiguration.getLockAtMostUntil());
79+
}
80+
81+
/**
82+
* Find existing row by primary key lock.name
83+
*
84+
* @param name lock name
85+
* @return optional lock row or empty
86+
*/
87+
Optional<Lock> find(String name) {
88+
SimpleStatement selectStatement = QueryBuilder.selectFrom(table)
89+
.column(LOCK_NAME)
90+
.column(LOCK_UNTIL)
91+
.column(LOCKED_AT)
92+
.column(LOCKED_BY)
93+
.whereColumn(LOCK_NAME).isEqualTo(literal(name))
94+
.build();
95+
96+
ResultSet resultSet = cqlSession.execute(selectStatement);
97+
Row row = resultSet.one();
98+
return row != null ?
99+
Optional.of(new Lock(row.getString(LOCK_NAME), row.getInstant(LOCK_UNTIL), row.getInstant(LOCKED_AT), row.getString(LOCKED_BY))) :
100+
Optional.empty();
101+
}
102+
103+
/**
104+
* Insert new lock row
105+
*
106+
* @param name lock name
107+
* @param until new until instant value
108+
*/
109+
boolean insert(String name, Instant until) {
110+
SimpleStatement insertStatement = QueryBuilder.insertInto(table)
111+
.value(LOCK_NAME, literal(name))
112+
.value(LOCK_UNTIL, literal(until))
113+
.value(LOCKED_AT, literal(Instant.now()))
114+
.value(LOCKED_BY, literal(hostname))
115+
.ifNotExists()
116+
.build();
117+
118+
ResultSet resultSet = cqlSession.execute(insertStatement);
119+
if (resultSet == null) {
120+
return false;
121+
}
122+
return resultSet.wasApplied();
123+
}
124+
125+
/**
126+
* Update existing lock row
127+
*
128+
* @param name lock name
129+
* @param until new until instant value
130+
*/
131+
boolean update(String name, Instant until) {
132+
SimpleStatement updateStatement = QueryBuilder.update(table)
133+
.setColumn(LOCK_UNTIL, literal(until))
134+
.setColumn(LOCKED_AT, literal(Instant.now()))
135+
.setColumn(LOCKED_BY, literal(hostname))
136+
.whereColumn(LOCK_NAME).isEqualTo(literal(name))
137+
.ifColumn(LOCK_UNTIL).isLessThan(literal(Instant.now()))
138+
.build();
139+
140+
ResultSet resultSet = cqlSession.execute(updateStatement);
141+
if (resultSet == null) {
142+
return false;
143+
}
144+
return resultSet.wasApplied();
145+
}
146+
147+
/**
148+
* Updates lock.until field where lockConfiguration.name
149+
*
150+
* @param name lock name
151+
* @param until new until instant value
152+
*/
153+
boolean updateUntil(String name, Instant until) {
154+
SimpleStatement updateStatement = QueryBuilder.update(table)
155+
.setColumn(LOCK_UNTIL, literal(until))
156+
.whereColumn(LOCK_NAME).isEqualTo(literal(name))
157+
.ifColumn(LOCK_UNTIL).isGreaterThanOrEqualTo(literal(Instant.now()))
158+
.ifColumn(LOCKED_BY).isEqualTo(literal(hostname))
159+
.build();
160+
161+
ResultSet resultSet = cqlSession.execute(updateStatement);
162+
if (resultSet == null) {
163+
return false;
164+
}
165+
return resultSet.wasApplied();
166+
}
167+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
package net.javacrumbs.shedlock.provider.cassandra;
2+
3+
import java.time.Instant;
4+
5+
public class Lock {
6+
7+
private String name;
8+
private Instant lockUntil;
9+
private Instant lockedAt;
10+
private String lockedBy;
11+
12+
public Lock(String name, Instant lockUntil, Instant lockedAt, String lockedBy) {
13+
this.name = name;
14+
this.lockUntil = lockUntil;
15+
this.lockedAt = lockedAt;
16+
this.lockedBy = lockedBy;
17+
}
18+
19+
public String getName() {
20+
return name;
21+
}
22+
23+
public void setName(String name) {
24+
this.name = name;
25+
}
26+
27+
public Instant getLockUntil() {
28+
return lockUntil;
29+
}
30+
31+
public void setLockUntil(Instant lockUntil) {
32+
this.lockUntil = lockUntil;
33+
}
34+
35+
public Instant getLockedAt() {
36+
return lockedAt;
37+
}
38+
39+
public void setLockedAt(Instant lockedAt) {
40+
this.lockedAt = lockedAt;
41+
}
42+
43+
public String getLockedBy() {
44+
return lockedBy;
45+
}
46+
47+
public void setLockedBy(String lockedBy) {
48+
this.lockedBy = lockedBy;
49+
}
50+
}

0 commit comments

Comments
 (0)