Skip to content

Latest commit

 

History

History
75 lines (63 loc) · 3.86 KB

File metadata and controls

75 lines (63 loc) · 3.86 KB

Exercise 13: Join KStream KTable

Before

Re-run exercise 5. Re-run exercise 12.

or

  • create a TemperatureKey and Temperature and Tuple classes
  • create relative serdes
  • create a temperature topic
  • create a configuration topic co-partitioned with temperature
bin/kafka-topics --bootstrap-server localhost:9092 --create \
                      --partitions 2 \
                      --replication-factor 2 \
                      --topic configuration

Using the kafka stream builder we create a KTable from the configuration topic

We simply join the temperature stream and the Ktable to create an enriched stream

The topology shows how the join relies on an external state-storage.

topology

Topologies:
   Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000000 (topics: [temperature])
      --> KSTREAM-LEFTJOIN-0000000004
    Processor: KSTREAM-LEFTJOIN-0000000004 (stores: [configurations-STATE-STORE-0000000001])
      --> KSTREAM-PRINTER-0000000005
      <-- KSTREAM-SOURCE-0000000000
    Source: KSTREAM-SOURCE-0000000002 (topics: [configurations])
      --> KTABLE-SOURCE-0000000003
    Processor: KSTREAM-PRINTER-0000000005 (stores: [])
      --> none
      <-- KSTREAM-LEFTJOIN-0000000004
    Processor: KTABLE-SOURCE-0000000003 (stores: [configurations-STATE-STORE-0000000001])
      --> none
      <-- KSTREAM-SOURCE-0000000002

Question: What would happen if we update a configuration?

{"location":"room0"}	{"prefVal":42,"timestamp":1584318440065,"author":"Riccardo"}
{"location":"room1"}	{"prefVal":21,"timestamp":1584318445800,"author":"John Doe"}
{"location":"room2"}	{"prefVal":33,"timestamp":1584318450806,"author":"Jane Doe"}
{"location":"room3"}	{"prefVal":24,"timestamp":1584318455811,"author":"Marvin"}
{"location":"room4"}	{"prefVal":29,"timestamp":1584318460818,"author":"Thor"}
{"location":"room1"}	{"prefVal":42,"timestamp":1584318741707,"author":"Riccardo"}
{"location":"room4"}	{"prefVal":100,"timestamp":1584318747114,"author":"Thor"}
{"location":"room2"}	{"prefVal":42,"timestamp":1584318827423,"author":"Riccardo"}
{"location":"room3"}	{"prefVal":100,"timestamp":1584318832841,"author":"Thor"}
{"location":"room2"}	{"prefVal":42,"timestamp":1584318868259,"author":"Riccardo"}
{"location":"room3"}	{"prefVal":100,"timestamp":1584318873674,"author":"Thor"}
[KSTREAM-FILTER-0000000005]: TemperatureKey 'location' room3, RichTemperature{value=(10000,28), configuration=(1584318455811,24,Marvin)}
[KSTREAM-FILTER-0000000005]: TemperatureKey 'location' room4, RichTemperature{value=(12000,33), configuration=(1584318460818,29,Thor)}
[KSTREAM-FILTER-0000000005]: TemperatureKey 'location' room2, RichTemperature{value=(16000,34), configuration=(1584318450806,33,Jane Doe)}
[KSTREAM-FILTER-0000000005]: TemperatureKey 'location' room1, RichTemperature{value=(20000,29), configuration=(1584318445800,21,John Doe)}
[KSTREAM-FILTER-0000000005]: TemperatureKey 'location' room4, RichTemperature{value=(26000,39), configuration=(1584318460818,29,Thor)}
[KSTREAM-FILTER-0000000005]: TemperatureKey 'location' room3, RichTemperature{value=(28000,33), configuration=(1584318455811,24,Marvin)}
[KSTREAM-FILTER-0000000005]: TemperatureKey 'location' room3, RichTemperature{value=(36000,35), configuration=(1584318455811,24,Marvin)}
[KSTREAM-FILTER-0000000005]: TemperatureKey 'location' room1, RichTemperature{value=(56000,25), configuration=(1584318445800,21,John Doe)}
[KSTREAM-FILTER-0000000005]: TemperatureKey 'location' room3, RichTemperature{value=(58000,29), configuration=(1584318455811,24,Marvin)}
[KSTREAM-FILTER-0000000005]: TemperatureKey 'location' room3, RichTemperature{value=(66000,36), configuration=(1584318455811,24,Marvin)}
[KSTREAM-FILTER-0000000005]: TemperatureKey 'location' room3, RichTemperature{value=(68000,34), configuration=(1584318455811,24,Marvin)}