Commit d892e74

Improve README.md
* always set the right catalog
* fix typos
1 parent c88c2c6 commit d892e74

README.md

Lines changed: 52 additions & 34 deletions
@@ -6,24 +6,25 @@
 * Batch/Stream unification of queries in action
 * Different ways to join dynamic data
 * Creating Tables with DDL
+* Maintaining materialized views with continuous SQL queries in Kafka and MySQL
 
 * Scenario is an online store receiving orders.
-
 * The order system consists of seven tables (derived from the well-known TPC-H benchmark)
-  * orders: one row for each order
-  * lineitem: individual items of an order
-  * customer: customer data
-  * nation: denormalized nation data
-  * region: denormalized region data
-  * rates: exchange rates for currencies
+  * `PROD_ORDERS`: one row for each order
+  * `PROD_LINEITEM`: individual items of an order
+  * `PROD_CUSTOMER`: customer data
+  * `PROD_NATION`: denormalized nation data
+  * `PROD_REGION`: denormalized region data
+  * `PROD_RATES`: exchange rates for currencies
+  * `PROD_RATES_HISTORY`: history of all currency exchange rates
 
 Depending on their update characteristics (frequency, insert-only), tables are stored in different systems:
-* Kafka: orders, linetimes, rates
-* MySQL: customer, nation, region
+* Kafka: `PROD_ORDERS`, `PROD_LINEITEM`, `PROD_RATES_HISTORY`
+* MySQL: `PROD_CUSTOMER`, `PROD_NATION`, `PROD_REGION`, `PROD_RATES`
 
-# Prepare the Data
+# Get the Data
 
-Please download the TPCH demo data from Google Drive and extract the zip archive into the `./data` folder (as `./data/*.tbl`).
+Please download the demo data from Google Drive and extract the zip archive into the `./data` folder (as `./data/*.tbl`).
 
 https://drive.google.com/file/d/15LWUBGZenWaW3R_WNxqvcTQOuA_Kgtjv
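For readers who want to see how a Kafka-backed dynamic table is typically wired up, here is a minimal Flink SQL sketch. It is illustrative only: the columns, topic, broker address, and connector options are assumptions (written in the current connector syntax, which may differ from the demo's Flink version), not the repository's actual DDL.

```
-- Hypothetical sketch: declaring a Kafka-backed table like PROD_ORDERS.
-- Columns, topic, broker address, and options are illustrative assumptions.
CREATE TABLE prod_orders (
  o_orderkey  INTEGER,
  o_custkey   INTEGER,
  o_ordertime TIMESTAMP(3),
  -- event-time attribute with a bounded-out-of-orderness watermark
  WATERMARK FOR o_ordertime AS o_ordertime - INTERVAL '5' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'prod_orders',
  'properties.bootstrap.servers' = 'kafka:9092',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);
```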

@@ -70,7 +71,7 @@ quit;
 
 ## Grafana
 
-* Visualization tool
+* Dashboard tool: [http://localhost:3000](http://localhost:3000)
 
 ## Minio (S3-compatible Storage)
 
@@ -123,6 +124,10 @@ SELECT * FROM prod_nation;
 
 * Create a table backed by a file in S3.
 
+```
+USE CATALOG hive;
+```
+
 ```
 CREATE TABLE dev_orders (
   o_orderkey INTEGER,
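The hunk truncates the `CREATE TABLE` statement. As a rough idea of where it is heading, a file-backed table in the `hive` catalog can be declared with Flink's filesystem connector roughly as below; the S3 path, the CSV format, and the shortened column list are assumptions, the full schema lives in the README itself.

```
-- Hypothetical sketch of an S3/Minio-backed table; the path, format,
-- and shortened column list are assumptions, not the demo's exact DDL.
CREATE TABLE dev_orders (
  o_orderkey  INTEGER,
  o_custkey   INTEGER,
  o_ordertime TIMESTAMP(3)
) WITH (
  'connector' = 'filesystem',
  'path' = 's3://sql-demo/dev_orders',
  'format' = 'csv'
);
```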
@@ -150,12 +155,9 @@ CREATE TABLE dev_orders (
 INSERT INTO dev_orders SELECT * FROM default_catalog.default_database.prod_orders;
 ```
 
-* Go to Flink UI and show number of processed records.
-
+* Show running job in Flink Web UI: [http://localhost:8081](http://localhost:8081)
 * Manually cancel the job
-
 * Show file in Minio: [http://localhost:9000](http://localhost:9000)
-
 * Show data in SQL client
 
 ```
@@ -165,7 +167,7 @@ SELECT COUNT(*) AS rowCnt FROM dev_orders;
 
 ### Run a Query on the Snapshotted Data
 
-* Using the batch engine for better efficieny
+* Using the batch engine for better efficiency
 
 ```
 SET execution.type=batch;
@@ -185,7 +187,7 @@ GROUP BY
   CEIL(o_ordertime TO MINUTE);
 ```
 
-* Run the same in streaming
+* Run the same query as a continuous streaming query
 
 ```
 SET execution.type=streaming;
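Only fragments of the per-minute aggregation are visible in the diff. Reassembled around the `GROUP BY CEIL(o_ordertime TO MINUTE)` shown above, the query plausibly reads as follows; the select list and aliases are assumptions.

```
-- Orders per minute: the CEIL(... TO MINUTE) grouping is from the diff,
-- the COUNT and the aliases are illustrative assumptions.
SELECT
  CEIL(o_ordertime TO MINUTE) AS `minute`,
  COUNT(*) AS `orders`
FROM dev_orders
GROUP BY
  CEIL(o_ordertime TO MINUTE);
```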
@@ -197,13 +199,6 @@ SET execution.type=streaming;
 SET execution.result-mode=changelog;
 ```
 
-* Reset to previous settings
-
-```
-SET execution.result-mode=table;
-SET execution.type=batch;
-```
-
 * We can streamify the query a bit with a TUMBLE window
 
 ```
@@ -218,11 +213,17 @@ GROUP BY
   TUMBLE(o_ordertime, INTERVAL '1' MINUTE);
 ```
 
-* Query is still executed with the batch engine
+* We can also execute this query with the batch engine
+* Reset to previous settings
+
+```
+SET execution.result-mode=table;
+SET execution.type=batch;
+```
 
 ### Move query to streaming data
 
-* We can run the same query on the dynamic table
+* We can run the same query on the table backed by a Kafka topic
 
 ```
 SET execution.type=streaming;
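For reference, the TUMBLE variant mentioned above might look like this sketch; only the `GROUP BY TUMBLE(...)` clause appears in the diff, the rest is an assumption.

```
-- The same per-minute count as a TUMBLE group-window aggregation;
-- only the GROUP BY clause is taken from the diff.
SELECT
  TUMBLE_START(o_ordertime, INTERVAL '1' MINUTE) AS `minute`,
  COUNT(*) AS `orders`
FROM dev_orders
GROUP BY
  TUMBLE(o_ordertime, INTERVAL '1' MINUTE);
```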
@@ -254,6 +255,10 @@ GROUP BY
 SET execution.type=batch;
 ```
 
+```
+USE CATALOG hive;
+```
+
 * Show customers and their orders by region and priority for a specific day
 * The query joins the snapshot of the orders table
 
@@ -284,13 +289,17 @@ ORDER BY r_name, o_orderpriority;
 SET execution.type=streaming;
 ```
 
+```
+USE CATALOG default_catalog;
+```
+
 ```
 SELECT
   r_name AS `region`,
   o_orderpriority AS `priority`,
   COUNT(DISTINCT c_custkey) AS `number_of_customers`,
   COUNT(o_orderkey) AS `number_of_orders`
-FROM default_catalog.default_database.prod_orders
+FROM prod_orders
 JOIN prod_customer ON o_custkey = c_custkey
 JOIN prod_nation ON c_nationkey = n_nationkey
 JOIN prod_region ON n_regionkey = r_regionkey
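Pieced together from this hunk and the `GROUP BY r_name, o_orderpriority;` visible in the next hunk header, the complete query reads roughly as follows. The diff elides the filter for the "specific day", so it is omitted here as well.

```
-- Assembled from the fragments visible in the diff; the day filter
-- mentioned in the bullets is not shown there and is omitted here.
SELECT
  r_name AS `region`,
  o_orderpriority AS `priority`,
  COUNT(DISTINCT c_custkey) AS `number_of_customers`,
  COUNT(o_orderkey) AS `number_of_orders`
FROM prod_orders
JOIN prod_customer ON o_custkey = c_custkey
JOIN prod_nation ON c_nationkey = n_nationkey
JOIN prod_region ON n_regionkey = r_regionkey
GROUP BY r_name, o_orderpriority;
```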
@@ -309,7 +318,6 @@ GROUP BY r_name, o_orderpriority;
 
 * A common requirement is to join events of two (or more) dynamic tables that are related to each other in a temporal context, for example events that happened around the same time.
 * Flink SQL features special optimizations for such joins.
-
 * First switch to the default catalog (which contains all dynamic tables)
 
 ```
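An interval (time-windowed) join in Flink SQL bounds the time attributes of the two tables against each other. A minimal sketch, assuming `prod_orders` and `prod_lineitem` both carry event-time attributes and share the order key; the 5-minute bound is an arbitrary illustration, not the README's query.

```
-- Hypothetical interval join: each order matched with lineitems that
-- occur within 5 minutes of it; column names follow TPC-H conventions.
SELECT o_orderkey, l_linenumber, o_ordertime, l_ordertime
FROM prod_orders, prod_lineitem
WHERE o_orderkey = l_orderkey
  AND l_ordertime BETWEEN o_ordertime AND o_ordertime + INTERVAL '5' MINUTE;
```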
@@ -344,6 +352,10 @@ WHERE
 
 * Example query: enrich the `prod_lineitem` table with the current exchange rate from the `prod_rates` table to compute EUR-normalized amounts.
 
+```
+USE CATALOG default_catalog;
+```
+
 ```
 SELECT
   l_proctime AS `querytime`,
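One common way to express such an enrichment against a slowly changing MySQL table is a processing-time temporal (lookup) join. A sketch, assuming the column names from the fragments above and that `prod_rates` supports lookups; this is one possible formulation, not necessarily the one used in the README.

```
-- Hypothetical processing-time enrichment join; rs_rate/rs_symbol and
-- l_currency appear in the diff, the join form itself is an assumption.
SELECT
  l.l_proctime AS `querytime`,
  l.l_orderkey,
  l.l_extendedprice * r.rs_rate AS `price_eur`
FROM prod_lineitem AS l
JOIN prod_rates FOR SYSTEM_TIME AS OF l.l_proctime AS r
  ON r.rs_symbol = l.l_currency;
```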
@@ -373,10 +385,13 @@ UPDATE PROD_RATES SET RS_TIMESTAMP = '2020-04-01 01:00:00.000', RS_RATE = 1.234
 ### Enrichment Join against Temporal Table
 
 * Same use case as before.
-
 * Instead of looking up rates from MySQL, we ingest updates from another Kafka table.
 * The Kafka table is accessed via a TemporalTableFunction `prod_rates_temporal`, which looks up the most recent exchange rate for a currency.
 
+```
+USE CATALOG default_catalog;
+```
+
 ```
 SELECT
   l_ordertime AS `ordertime`,
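A temporal table function is applied with `LATERAL TABLE`. Based on the fragments in this hunk (the hunk header shows `WHERE rs_symbol = l_currency AND`, i.e. there is a further condition not visible here), the join plausibly has this shape; the select list is an assumption.

```
-- Hypothetical use of prod_rates_temporal: join each lineitem with the
-- exchange rate valid at its order time (the diff's second WHERE
-- condition is not visible and therefore omitted).
SELECT
  l_ordertime AS `ordertime`,
  l_extendedprice * rs_rate AS `price_eur`
FROM prod_lineitem,
  LATERAL TABLE (prod_rates_temporal(l_ordertime))
WHERE rs_symbol = l_currency;
```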
@@ -399,14 +414,18 @@ WHERE rs_symbol = l_currency AND
 ## Matching Patterns on Dynamic Tables
 
 * Matching patterns on new data with low latency is a common use case for stream processing
-* SQL:2016 introduced the MATCH RECOGNIZE clause to match patterns on tables
-* The combination of MATCH RECOGNIZE and dynamic tables is very powerful
-* Flink supports MATCH RECOGNIZE since several releases
+* SQL:2016 introduced the `MATCH_RECOGNIZE` clause to match patterns on tables
+* The combination of `MATCH_RECOGNIZE` and dynamic tables is very powerful
+* Flink has supported `MATCH_RECOGNIZE` for several releases
 
 * Find customers that have changed their delivery behavior.
 * Search for a pattern where the last x lineitems had regular shipping
 * But now the customer wants to pay on delivery (collect on delivery = CoD)
 
+```
+USE CATALOG default_catalog;
+```
+
 ```
 CREATE VIEW lineitem_with_customer AS
 SELECT * FROM (
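For readers new to the clause, a compact sketch of the pattern the bullets describe: a run of regular shipments followed by a collect-on-delivery item, per customer. The view name comes from the hunk; the columns, the fixed quantifier `{3}`, and the 'COLLECT COD' value are assumptions modeled on TPC-H (`l_shipinstruct`), and `l_ordertime` is assumed to be a time attribute.

```
-- Hypothetical MATCH_RECOGNIZE sketch: three regular shipments followed
-- by a collect-on-delivery lineitem; columns and values are TPC-H-style
-- assumptions, not the README's actual query.
SELECT *
FROM lineitem_with_customer
MATCH_RECOGNIZE (
  PARTITION BY c_custkey
  ORDER BY l_ordertime
  MEASURES
    LAST(REGULAR.l_ordertime) AS `last_regular`,
    COD.l_ordertime AS `first_cod`
  ONE ROW PER MATCH
  AFTER MATCH SKIP PAST LAST ROW
  PATTERN (REGULAR{3} COD)
  DEFINE
    REGULAR AS REGULAR.l_shipinstruct <> 'COLLECT COD',
    COD AS COD.l_shipinstruct = 'COLLECT COD'
);
```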
@@ -580,4 +599,3 @@ GROUP BY
 
 * Monitor the query result in Grafana: [http://localhost:3000](http://localhost:3000)
 * Go to the region stats dashboard and set the refresh rate to 1s
-
