Skip to content

Commit 5c7fe5e

Browse files
Added sample code - RSI introduction (oracle-samples#220)
1 parent 44728c2 commit 5c7fe5e

File tree

11 files changed

+562
-0
lines changed

11 files changed

+562
-0
lines changed

java/rsi-introduction/README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
## Java - Oracle Developers on Medium.com
2+
[Getting Started with the Java library for Reactive Streams Ingestion (RSI)](https://medium.com/oracledevs/getting-started-with-the-java-library-for-reactive-streams-ingestion-rsi-afbc808e6e24)

java/rsi-introduction/pom.xml

Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
3+
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
4+
<modelVersion>4.0.0</modelVersion>
5+
6+
<groupId>com.oracle.dev.jdbc</groupId>
7+
<artifactId>rsi-introduction</artifactId>
8+
<version>1.0-SNAPSHOT</version>
9+
10+
<name>rsi-introduction</name>
11+
<description>Intro to Java Library for Reactive Streams Ingestion</description>
12+
<url>https://github.com/juarezjuniorgithub/rsi-introduction</url>
13+
14+
<properties>
15+
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
16+
<maven.compiler.source>1.10</maven.compiler.source>
17+
<maven.compiler.target>1.10</maven.compiler.target>
18+
</properties>
19+
20+
<dependencies>
21+
<dependency>
22+
<groupId>junit</groupId>
23+
<artifactId>junit</artifactId>
24+
<version>3.8.1</version>
25+
</dependency>
26+
27+
<!-- Oracle JDBC JARs -->
28+
<dependency>
29+
<groupId>com.oracle.database.jdbc</groupId>
30+
<artifactId>rsi</artifactId>
31+
<version>21.6.0.0.1</version>
32+
</dependency>
33+
34+
<dependency>
35+
<groupId>com.oracle.database.jdbc</groupId>
36+
<artifactId>ojdbc11</artifactId>
37+
<version>21.6.0.0.1</version>
38+
</dependency>
39+
40+
<dependency>
41+
<groupId>com.oracle.database.jdbc</groupId>
42+
<artifactId>ucp11</artifactId>
43+
<version>21.6.0.0.1</version>
44+
</dependency>
45+
46+
<dependency>
47+
<groupId>com.oracle.ojdbc</groupId>
48+
<artifactId>ons</artifactId>
49+
<version>19.3.0.0</version>
50+
</dependency>
51+
52+
<dependency>
53+
<groupId>com.oracle.database.jdbc</groupId>
54+
<artifactId>oraclepki</artifactId>
55+
<version>19.11.0.0</version>
56+
</dependency>
57+
58+
<dependency>
59+
<groupId>com.oracle.database.security</groupId>
60+
<artifactId>osdt_core</artifactId>
61+
<version>21.6.0.0.1</version>
62+
</dependency>
63+
<dependency>
64+
<groupId>com.oracle.database.security</groupId>
65+
<artifactId>osdt_cert</artifactId>
66+
<version>21.6.0.0.1</version>
67+
</dependency>
68+
69+
70+
</dependencies>
71+
72+
<build>
73+
<pluginManagement>
74+
<!-- lock down plugins versions to avoid using Maven defaults (may be moved to parent pom) -->
75+
<plugins>
76+
<plugin>
77+
<artifactId>maven-clean-plugin</artifactId>
78+
<version>3.1.0</version>
79+
</plugin>
80+
<plugin>
81+
<artifactId>maven-site-plugin</artifactId>
82+
<version>3.7.1</version>
83+
</plugin>
84+
<plugin>
85+
<artifactId>maven-project-info-reports-plugin</artifactId>
86+
<version>3.0.0</version>
87+
</plugin>
88+
<!-- see http://maven.apache.org/ref/current/maven-core/default-bindings.html#Plugin_bindings_for_jar_packaging -->
89+
<plugin>
90+
<artifactId>maven-resources-plugin</artifactId>
91+
<version>3.0.2</version>
92+
</plugin>
93+
<plugin>
94+
<artifactId>maven-compiler-plugin</artifactId>
95+
<version>3.8.0</version>
96+
</plugin>
97+
<plugin>
98+
<artifactId>maven-surefire-plugin</artifactId>
99+
<version>2.22.1</version>
100+
</plugin>
101+
<plugin>
102+
<artifactId>maven-jar-plugin</artifactId>
103+
<version>3.0.2</version>
104+
</plugin>
105+
<plugin>
106+
<artifactId>maven-install-plugin</artifactId>
107+
<version>2.5.2</version>
108+
</plugin>
109+
<plugin>
110+
<artifactId>maven-deploy-plugin</artifactId>
111+
<version>2.8.2</version>
112+
</plugin>
113+
</plugins>
114+
</pluginManagement>
115+
</build>
116+
117+
<reporting>
118+
<plugins>
119+
<plugin>
120+
<artifactId>maven-project-info-reports-plugin</artifactId>
121+
</plugin>
122+
</plugins>
123+
</reporting>
124+
</project>
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/*
2+
Copyright (c) 2021, 2022, Oracle and/or its affiliates.
3+
4+
This software is dual-licensed to you under the Universal Permissive License
5+
(UPL) 1.0 as shown at https://oss.oracle.com/licenses/upl or Apache License
6+
2.0 as shown at http://www.apache.org/licenses/LICENSE-2.0. You may choose
7+
either license.
8+
9+
Licensed under the Apache License, Version 2.0 (the "License");
10+
you may not use this file except in compliance with the License.
11+
You may obtain a copy of the License at
12+
13+
https://www.apache.org/licenses/LICENSE-2.0
14+
15+
Unless required by applicable law or agreed to in writing, software
16+
distributed under the License is distributed on an "AS IS" BASIS,
17+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18+
See the License for the specific language governing permissions and
19+
limitations under the License.
20+
*/
21+
22+
package com.oracle.jdbc.dev.rsi;
23+
24+
import oracle.rsi.StreamEntity;
25+
import oracle.rsi.StreamField;
26+
27+
@StreamEntity(tableName = "customers")
28+
public class Customer {
29+
30+
public Customer(long id, String name, String region) {
31+
super();
32+
this.id = id;
33+
this.name = name;
34+
this.region = region;
35+
}
36+
37+
@StreamField
38+
public long id;
39+
40+
@StreamField
41+
public String name;
42+
43+
@StreamField(columnName = "region")
44+
public String region;
45+
46+
String someRandomField;
47+
48+
}
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
/*
2+
Copyright (c) 2021, 2022, Oracle and/or its affiliates.
3+
4+
This software is dual-licensed to you under the Universal Permissive License
5+
(UPL) 1.0 as shown at https://oss.oracle.com/licenses/upl or Apache License
6+
2.0 as shown at http://www.apache.org/licenses/LICENSE-2.0. You may choose
7+
either license.
8+
9+
Licensed under the Apache License, Version 2.0 (the "License");
10+
you may not use this file except in compliance with the License.
11+
You may obtain a copy of the License at
12+
13+
https://www.apache.org/licenses/LICENSE-2.0
14+
15+
Unless required by applicable law or agreed to in writing, software
16+
distributed under the License is distributed on an "AS IS" BASIS,
17+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18+
See the License for the specific language governing permissions and
19+
limitations under the License.
20+
*/
21+
22+
package com.oracle.jdbc.dev.rsi;
23+
24+
import java.io.IOException;
25+
import java.nio.file.Files;
26+
import java.nio.file.Path;
27+
import java.util.Properties;
28+
29+
/**
30+
* <p>
31+
* Configuration for connecting code samples to an Oracle Database instance.
32+
* </p>
33+
*/
34+
public class DatabaseConfig {
35+
36+
private static final Properties CONFIG = new Properties();
37+
38+
static {
39+
try {
40+
var fileStream = Files.newInputStream(
41+
Path.of("C:\\java-projects\\rsi-introduction\\src\\main\\resources\\config.properties"));
42+
CONFIG.load(fileStream);
43+
} catch (IOException e) {
44+
e.printStackTrace();
45+
}
46+
}
47+
48+
/** Host name where an Oracle Database instance is running */
49+
static final String HOST = CONFIG.getProperty("HOST");
50+
51+
/** Port number where an Oracle Database instance is listening */
52+
static final int PORT = Integer.parseInt(CONFIG.getProperty("PORT"));
53+
54+
/** Service name of an Oracle Database */
55+
static final String SERVICE_NAME = CONFIG.getProperty("DATABASE");
56+
57+
/** User name that connects to an Oracle Database */
58+
static final String USER = CONFIG.getProperty("USER");
59+
60+
/** Password of the user that connects to an Oracle Database */
61+
static final String PASSWORD = CONFIG.getProperty("PASSWORD");
62+
63+
/** Database schema */
64+
static final String SCHEMA = CONFIG.getProperty("SCHEMA");
65+
66+
/** The file system path of a wallet directory */
67+
static final String WALLET_LOCATION = CONFIG.getProperty("WALLET_LOCATION");
68+
69+
/** Colon for URL composition */
70+
static final String COLON = ":";
71+
72+
/** JDBC EZConnect URL format */
73+
static final String JDBC_EZ_CONNECT_FORMAT = "jdbc:oracle:thin:@";
74+
75+
/** Helper method to get the JDBC URL */
76+
static final String getJdbcConnectionUrl() {
77+
StringBuilder url = new StringBuilder(JDBC_EZ_CONNECT_FORMAT).append(DatabaseConfig.HOST).append(COLON)
78+
.append(PORT).append(COLON).append(SERVICE_NAME);
79+
return url.toString();
80+
}
81+
82+
}
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
/*
2+
Copyright (c) 2021, 2022, Oracle and/or its affiliates.
3+
4+
This software is dual-licensed to you under the Universal Permissive License
5+
(UPL) 1.0 as shown at https://oss.oracle.com/licenses/upl or Apache License
6+
2.0 as shown at http://www.apache.org/licenses/LICENSE-2.0. You may choose
7+
either license.
8+
9+
Licensed under the Apache License, Version 2.0 (the "License");
10+
you may not use this file except in compliance with the License.
11+
You may obtain a copy of the License at
12+
13+
https://www.apache.org/licenses/LICENSE-2.0
14+
15+
Unless required by applicable law or agreed to in writing, software
16+
distributed under the License is distributed on an "AS IS" BASIS,
17+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18+
See the License for the specific language governing permissions and
19+
limitations under the License.
20+
*/
21+
22+
package com.oracle.jdbc.dev.rsi;
23+
24+
import java.sql.SQLException;
25+
import java.time.Duration;
26+
import java.util.concurrent.ExecutorService;
27+
import java.util.concurrent.Executors;
28+
29+
import oracle.rsi.ReactiveStreamsIngestion;
30+
31+
public class SimpleFlowPublisher {
32+
33+
public static void main(String[] args) throws SQLException {
34+
35+
ExecutorService workerThreadPool = Executors.newFixedThreadPool(2);
36+
37+
// Reference for JDBC URL formats at
38+
// https://docs.oracle.com/en/database/oracle/oracle-database/21/jajdb/
39+
ReactiveStreamsIngestion rsi = ReactiveStreamsIngestion.builder().url(DatabaseConfig.getJdbcConnectionUrl())
40+
.username(DatabaseConfig.USER).password(DatabaseConfig.PASSWORD).schema(DatabaseConfig.SCHEMA)
41+
.executor(workerThreadPool).bufferRows(10).bufferInterval(Duration.ofSeconds(20)).entity(Customer.class)
42+
.build();
43+
44+
SimpleObjectPublisher<Object[]> publisher = new SimpleObjectPublisher<Object[]>();
45+
publisher.subscribe(rsi.subscriber());
46+
47+
SimpleObjectPublisher<Object[]> anotherPublisher = new SimpleObjectPublisher<Object[]>();
48+
anotherPublisher.subscribe(rsi.subscriber());
49+
50+
publisher.accept(new Object[] { 7, "John Doe", "North" });
51+
publisher.accept(new Object[] { 8, "Jane Doe", "North" });
52+
publisher.accept(new Object[] { 9, "John Smith", "South" });
53+
54+
anotherPublisher.accept(new Object[] { 10, "John Doe", "North" });
55+
anotherPublisher.accept(new Object[] { 11, "Jane Doe", "North" });
56+
anotherPublisher.accept(new Object[] { 12, "John Smith", "South" });
57+
58+
rsi.close();
59+
60+
workerThreadPool.shutdown();
61+
62+
}
63+
64+
}
65+
66+
67+
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/*
2+
Copyright (c) 2021, 2022, Oracle and/or its affiliates.
3+
4+
This software is dual-licensed to you under the Universal Permissive License
5+
(UPL) 1.0 as shown at https://oss.oracle.com/licenses/upl or Apache License
6+
2.0 as shown at http://www.apache.org/licenses/LICENSE-2.0. You may choose
7+
either license.
8+
9+
Licensed under the Apache License, Version 2.0 (the "License");
10+
you may not use this file except in compliance with the License.
11+
You may obtain a copy of the License at
12+
13+
https://www.apache.org/licenses/LICENSE-2.0
14+
15+
Unless required by applicable law or agreed to in writing, software
16+
distributed under the License is distributed on an "AS IS" BASIS,
17+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18+
See the License for the specific language governing permissions and
19+
limitations under the License.
20+
*/
21+
22+
package com.oracle.jdbc.dev.rsi;
23+
24+
import java.util.concurrent.Flow.Publisher;
25+
import java.util.concurrent.Flow.Subscriber;
26+
import java.util.concurrent.Flow.Subscription;
27+
import java.util.function.Consumer;
28+
29+
public class SimpleObjectPublisher<T> implements Publisher<T>, Consumer<T> {
30+
31+
Subscriber<? super T> subscriber;
32+
33+
Subscription subscription = new SimpleObjectSubscription();
34+
35+
//Data streaming starts
36+
37+
@Override
38+
public void subscribe(Subscriber<? super T> subscriber) {
39+
this.subscriber = subscriber;
40+
this.subscriber.onSubscribe(subscription);
41+
}
42+
43+
@Override
44+
public void accept(T t) {
45+
subscriber.onNext(t);
46+
}
47+
48+
}

0 commit comments

Comments
 (0)