Commit ecea150

enothereska authored and dguy committed
MINOR: Document how to create source streams and tables
Originally reviewed as part of apache#3490. Author: Eno Thereska <eno.thereska@gmail.com> Reviewers: Damian Guy <damian.guy@gmail.com> Closes apache#3701 from enothereska/minor-docs-create-source-streams
1 parent ee8e934 commit ecea150

File tree

1 file changed: +117, -12 lines changed

docs/streams/developer-guide.html

Lines changed: 117 additions & 12 deletions
@@ -450,21 +450,126 @@ <h5><a id="streams_kstream_ktable" href="#streams_kstream_ktable">KStream, KTabl

If these records were a KStream and the stream processing application were to sum the values, it would return <code>4</code>. If these records were a KTable or GlobalKTable, the return would be <code>3</code>, since the last record would be considered an update.

<h4><a id="streams_dsl_source" href="#streams_dsl_source">Creating Source Streams from Kafka</a></h4>

<p>
You can easily read data from Kafka topics into your application. The following operations are supported.
</p>
<table class="data-table" border="1">
<tbody><tr>
<th>Reading from Kafka</th>
<th>Description</th>
</tr>
<tr>
<td><b>Stream</b>: input topic(s) &rarr; <code>KStream</code></td>
<td>Creates a <code>KStream</code> from the specified Kafka input topic(s), interpreting the data as a record stream.
A <code>KStream</code> represents a partitioned record stream.
<p>
Slightly simplified, in the case of a KStream, the local KStream instance of every application instance will be populated
with data from only a <b>subset</b> of the partitions of the input topic. Collectively, i.e. across all application instances,
all the partitions of the input topic will be read and processed.
</p>
<pre class="brush: java;">
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;

StreamsBuilder builder = new StreamsBuilder();

KStream&lt;String, Long&gt; wordCounts = builder.stream(
    Serdes.String(), /* key serde */
    Serdes.Long(),   /* value serde */
    "word-counts-input-topic" /* input topic */);
</pre>
When to provide serdes explicitly:
<ul>
<li>If you do not specify serdes explicitly, the default serdes from the configuration are used.</li>
<li>You must specify serdes explicitly if the key and/or value types of the records in the Kafka input topic(s) do not
match the configured default serdes.</li>
</ul>
Several variants of <code>stream</code> exist, e.g. to specify a regex pattern for the input topics to read from, as sketched below.
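<p>
For example, you can subscribe to all topics whose names match a regex (a minimal sketch; it assumes the <code>stream</code> overload that accepts a <code>java.util.regex.Pattern</code> and falls back to the configured default serdes):
</p>
<pre class="brush: java;">
import java.util.regex.Pattern;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;

StreamsBuilder builder = new StreamsBuilder();

// Reads every topic whose name starts with "word-counts-".
// Key/value types must match the configured default serdes.
KStream&lt;String, Long&gt; allWordCounts =
    builder.stream(Pattern.compile("word-counts-.*"));
</pre>
</td>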
</tr>
<tr>
<td><b>Table</b>: input topic &rarr; <code>KTable</code></td>
<td>
Reads the specified Kafka input topic into a <code>KTable</code>. The topic is interpreted as a changelog stream,
where records with the same key are interpreted as UPSERT aka INSERT/UPDATE (when the record value is not <code>null</code>) or
as DELETE (when the value is <code>null</code>) for that key.
<p>
Slightly simplified, in the case of a KTable, the local KTable instance of every application instance will be populated
with data from only a subset of the partitions of the input topic. Collectively, i.e. across all application instances, all
the partitions of the input topic will be read and processed.
</p>
<p>
You may provide an optional name for the table (more precisely, for the internal state store that backs the table).
When a name is provided, the table can be queried using <a href="#streams_interactive_queries">interactive queries</a>.
When a name is not provided, the table is not queryable and an internal name is generated for the state store.
</p>
<pre class="brush: java;">
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KTable;

StreamsBuilder builder = new StreamsBuilder();

KTable&lt;String, Long&gt; wordCounts = builder.table(
    Serdes.String(), /* key serde */
    Serdes.Long(),   /* value serde */
    "word-counts-input-topic",      /* input topic */
    "word-counts-partitioned-store" /* table/store name */);
</pre>
When to provide serdes explicitly:
<ul>
<li>If you do not specify serdes explicitly, the default serdes from the configuration are used.</li>
<li>You must specify serdes explicitly if the key and/or value types of the records in the Kafka input topic do not
match the configured default serdes.</li>
</ul>
Several variants of <code>table</code> exist, e.g. to specify the <code>auto.offset.reset</code>
policy to be used when reading from the input topic. A named table can also be queried interactively, as sketched below.
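<p>
Because the table above is backed by a named state store, it can later be read through the interactive queries API (a minimal sketch; <code>streams</code> is assumed to be a running <code>KafkaStreams</code> instance built from this topology):
</p>
<pre class="brush: java;">
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

// Fetch a read-only view of the state store that backs the table,
// using the store name given to builder.table(...) above.
ReadOnlyKeyValueStore&lt;String, Long&gt; store =
    streams.store("word-counts-partitioned-store", QueryableStoreTypes.keyValueStore());

Long count = store.get("hello"); // point lookup by key
</pre>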
</td>
</tr>
<tr>
<td><b>Global Table</b>: input topic &rarr; <code>GlobalKTable</code></td>
<td>
Reads the specified Kafka input topic into a <code>GlobalKTable</code>. The topic is interpreted as a changelog stream, where records
with the same key are interpreted as UPSERT aka INSERT/UPDATE (when the record value is not <code>null</code>) or as DELETE (when the
value is <code>null</code>) for that key.
<p>
Slightly simplified, in the case of a GlobalKTable, the local GlobalKTable instance of every application instance will be
populated with data from all the partitions of the input topic. In other words, when using a global table, every application
instance will get its own, full copy of the topic's data.
</p>
<p>
You may provide an optional name for the table (more precisely, for the internal state store that backs the table).
When a name is provided, the table can be queried using <a href="#streams_interactive_queries">interactive queries</a>.
When a name is not provided, the table is not queryable and an internal name is generated for the state store.
</p>
<pre class="brush: java;">
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.GlobalKTable;

StreamsBuilder builder = new StreamsBuilder();

GlobalKTable&lt;String, Long&gt; wordCounts = builder.globalTable(
    Serdes.String(), /* key serde */
    Serdes.Long(),   /* value serde */
    "word-counts-input-topic", /* input topic */
    "word-counts-global-store" /* table/store name */);
</pre>
When to provide serdes explicitly:
<ul>
<li>If you do not specify serdes explicitly, the default serdes from the configuration are used.</li>
<li>You must specify serdes explicitly if the key and/or value types of the records in the Kafka input topic do not
match the configured default serdes.</li>
</ul>
Several variants of <code>globalTable</code> exist, e.g. to specify explicit serdes or to rely entirely on the configured defaults; a minimal variant is sketched below.
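<p>
For example, the simplest variant reads a topic using only the configured default serdes (a sketch; it assumes an overload of <code>globalTable</code> that takes just the topic name and generates an internal store name):
</p>
<pre class="brush: java;">
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.GlobalKTable;

StreamsBuilder builder = new StreamsBuilder();

// Key and value types must match the configured default serdes;
// the backing state store gets an internal name, so this table is
// not reachable via interactive queries.
GlobalKTable&lt;String, Long&gt; wordCounts =
    builder.globalTable("word-counts-input-topic");
</pre>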
</td>
</tr>
</tbody>
</table>
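<p>
The "default serdes from the configuration" referenced in the rows above are set on the application's <code>Properties</code>, along the lines of the following sketch (a minimal example; the <code>StreamsConfig</code> constants shown are the ones in recent Kafka versions):
</p>
<pre class="brush: java;">
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

// These defaults apply whenever stream(), table(), or globalTable()
// is called without explicit serdes.
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass().getName());
</pre>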
<h4><a id="streams_dsl_windowing" href="#streams_dsl_windowing">Windowing a stream</a></h4>
A stream processor may need to divide data records into time buckets, i.e. to <b>window</b> the stream by time. This is usually needed for join and aggregation operations, etc. Kafka Streams currently defines the following types of windows:
