Kafka Streams Topology Skill
Expert knowledge of Kafka Streams library for building stream processing topologies in Java/Kotlin.
What I Know
Core Abstractions
KStream (Event Stream - Unbounded, Append-Only):
-
Represents immutable event sequences
-
Each record is an independent event
-
Use for: Clickstreams, transactions, sensor readings
KTable (Changelog Stream - Latest State by Key):
-
Represents mutable state (compacted topic)
-
Updates override previous values (by key)
-
Use for: User profiles, product catalog, account balances
GlobalKTable (Replicated Table - Available on All Instances):
-
Full table replicated to every stream instance
-
No partitioning (broadcast)
-
Use for: Reference data (countries, products), lookups
Key Differences:
// KStream: Every event is independent KStream<Long, Click> clicks = builder.stream("clicks"); clicks.foreach((key, value) -> { System.out.println(value); // Prints every click event });
// KTable: Latest value wins (by key) KTable<Long, User> users = builder.table("users"); users.toStream().foreach((key, value) -> { System.out.println(value); // Prints only current user state });
// GlobalKTable: Replicated to all instances (no partitioning) GlobalKTable<Long, Product> products = builder.globalTable("products"); // Available for lookups on any instance (no repartitioning needed)
When to Use This Skill
Activate me when you need help with:
-
Topology design ("How to design Kafka Streams topology?")
-
KStream vs KTable ("When to use KStream vs KTable?")
-
Stream operations ("Filter and transform events")
-
Joins ("Join KStream with KTable")
-
Windowing ("Tumbling vs hopping vs session windows")
-
Exactly-once semantics ("Enable EOS")
-
Topology optimization ("Optimize stream processing")
Common Patterns
Pattern 1: Filter and Transform
Use Case: Clean and enrich events
StreamsBuilder builder = new StreamsBuilder();
// Input stream KStream<Long, ClickEvent> clicks = builder.stream("clicks");
// Filter out bot clicks KStream<Long, ClickEvent> humanClicks = clicks .filter((key, value) -> !value.isBot());
// Transform: Extract page from URL KStream<Long, String> pages = humanClicks .mapValues(click -> extractPage(click.getUrl()));
// Write to output topic pages.to("pages");
Pattern 2: Branch by Condition
Use Case: Route events to different paths
Map<String, KStream<Long, Order>> branches = orders .split(Named.as("order-")) .branch((key, order) -> order.getTotal() > 1000, Branched.as("high-value")) .branch((key, order) -> order.getTotal() > 100, Branched.as("medium-value")) .defaultBranch(Branched.as("low-value"));
// High-value orders → priority processing branches.get("order-high-value").to("priority-orders");
// Low-value orders → standard processing branches.get("order-low-value").to("standard-orders");
Pattern 3: Enrich Stream with Table (Stream-Table Join)
Use Case: Add user details to click events
// Users table (current state) KTable<Long, User> users = builder.table("users");
// Clicks stream KStream<Long, ClickEvent> clicks = builder.stream("clicks");
// Enrich clicks with user data (left join) KStream<Long, EnrichedClick> enriched = clicks.leftJoin( users, (click, user) -> new EnrichedClick( click.getPage(), user != null ? user.getName() : "unknown", user != null ? user.getEmail() : "unknown" ), Joined.with(Serdes.Long(), clickSerde, userSerde) );
enriched.to("enriched-clicks");
Pattern 4: Aggregate with Windowing
Use Case: Count clicks per user, per 5-minute window
KTable<Windowed<Long>, Long> clickCounts = clicks .groupByKey() .windowedBy(TimeWindows.of(Duration.ofMinutes(5))) .count(Materialized.as("click-counts-store"));
// Convert to stream for output clickCounts.toStream() .map((windowedKey, count) -> { Long userId = windowedKey.key(); Instant start = windowedKey.window().startTime(); Instant end = windowedKey.window().endTime(); return KeyValue.pair(userId, new WindowedCount(userId, start, end, count)); }) .to("click-counts");
Pattern 5: Stateful Processing with State Store
Use Case: Detect duplicate events within 10 minutes
// Define state store StoreBuilder<KeyValueStore<Long, Long>> storeBuilder = Stores.keyValueStoreBuilder( Stores.persistentKeyValueStore("dedup-store"), Serdes.Long(), Serdes.Long() );
builder.addStateStore(storeBuilder);
// Deduplicate events KStream<Long, Event> deduplicated = events.transformValues( () -> new ValueTransformerWithKey<Long, Event, Event>() { private KeyValueStore<Long, Long> store;
@Override
public void init(ProcessorContext context) {
this.store = context.getStateStore("dedup-store");
}
@Override
public Event transform(Long key, Event value) {
Long lastSeen = store.get(key);
long now = System.currentTimeMillis();
// Duplicate detected (within 10 minutes)
if (lastSeen != null && (now - lastSeen) < 600_000) {
return null; // Drop duplicate
}
// Not duplicate, store timestamp
store.put(key, now);
return value;
}
},
"dedup-store"
).filter((key, value) -> value != null); // Remove nulls
deduplicated.to("unique-events");
Join Types
- Stream-Stream Join (Inner)
Use Case: Correlate related events within time window
// Page views and clicks within 10 minutes KStream<Long, PageView> views = builder.stream("page-views"); KStream<Long, Click> clicks = builder.stream("clicks");
KStream<Long, ClickWithView> joined = clicks.join( views, (click, view) -> new ClickWithView(click, view), JoinWindows.of(Duration.ofMinutes(10)), StreamJoined.with(Serdes.Long(), clickSerde, viewSerde) );
- Stream-Table Join (Left)
Use Case: Enrich events with current state
// Add product details to order items KTable<Long, Product> products = builder.table("products"); KStream<Long, OrderItem> items = builder.stream("order-items");
KStream<Long, EnrichedOrderItem> enriched = items.leftJoin( products, (item, product) -> new EnrichedOrderItem( item, product != null ? product.getName() : "Unknown", product != null ? product.getPrice() : 0.0 ) );
- Table-Table Join (Inner)
Use Case: Combine two tables (latest state)
// Join users with their current shopping cart KTable<Long, User> users = builder.table("users"); KTable<Long, Cart> carts = builder.table("shopping-carts");
KTable<Long, UserWithCart> joined = users.join( carts, (user, cart) -> new UserWithCart(user.getName(), cart.getTotal()) );
- Stream-GlobalKTable Join
Use Case: Enrich with reference data (no repartitioning)
// Add country details to user registrations GlobalKTable<String, Country> countries = builder.globalTable("countries"); KStream<Long, UserRegistration> registrations = builder.stream("registrations");
KStream<Long, EnrichedRegistration> enriched = registrations.leftJoin( countries, (userId, registration) -> registration.getCountryCode(), // Key extractor (registration, country) -> new EnrichedRegistration( registration, country != null ? country.getName() : "Unknown" ) );
Windowing Strategies
Tumbling Windows (Non-Overlapping)
Use Case: Aggregate per fixed time period
// Count events every 5 minutes KTable<Windowed<Long>, Long> counts = events .groupByKey() .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5))) .count();
// Windows: [0:00-0:05), [0:05-0:10), [0:10-0:15)
Hopping Windows (Overlapping)
Use Case: Moving average or overlapping aggregates
// Count events in 10-minute windows, advancing every 5 minutes KTable<Windowed<Long>, Long> counts = events .groupByKey() .windowedBy(TimeWindows.ofSizeAndGrace( Duration.ofMinutes(10), Duration.ofMinutes(5) ).advanceBy(Duration.ofMinutes(5))) .count();
// Windows: [0:00-0:10), [0:05-0:15), [0:10-0:20)
Session Windows (Event-Based)
Use Case: User sessions with inactivity gap
// Session ends after 30 minutes of inactivity KTable<Windowed<Long>, Long> sessionCounts = events .groupByKey() .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(Duration.ofMinutes(30))) .count();
Sliding Windows (Continuous)
Use Case: Anomaly detection over sliding time window
// Detect >100 events in any 1-minute period KTable<Windowed<Long>, Long> slidingCounts = events .groupByKey() .windowedBy(SlidingWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(1))) .count();
Best Practices
- Partition Keys Correctly
✅ DO:
// Repartition by user_id before aggregation KStream<Long, Event> byUser = events .selectKey((key, value) -> value.getUserId());
// Now aggregation is efficient KTable<Long, Long> userCounts = byUser .groupByKey() .count();
❌ DON'T:
// WRONG: groupBy with different key (triggers repartitioning!) KTable<Long, Long> userCounts = events .groupBy((key, value) -> KeyValue.pair(value.getUserId(), value)) .count();
- Use Appropriate Serdes
✅ DO:
// Define custom serde for complex types Serde<User> userSerde = new JsonSerde<>(User.class);
KStream<Long, User> users = builder.stream( "users", Consumed.with(Serdes.Long(), userSerde) );
❌ DON'T:
// WRONG: No serde specified (uses default String serde!) KStream<Long, User> users = builder.stream("users");
- Enable Exactly-Once Semantics
✅ DO:
Properties props = new Properties(); props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2); // EOS v2 (recommended) props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100); // Commit frequently
- Use Materialized Stores for Queries
✅ DO:
// Named store for interactive queries KTable<Long, Long> counts = events .groupByKey() .count(Materialized.<Long, Long, KeyValueStore<Bytes, byte[]>>as("user-counts") .withKeySerde(Serdes.Long()) .withValueSerde(Serdes.Long()));
// Query store from REST API ReadOnlyKeyValueStore<Long, Long> store = streams.store(StoreQueryParameters.fromNameAndType( "user-counts", QueryableStoreTypes.keyValueStore() ));
Long count = store.get(userId);
Topology Optimization
- Combine Operations
GOOD (Single pass):
KStream<Long, String> result = events .filter((key, value) -> value.isValid()) .mapValues(value -> value.toUpperCase()) .filterNot((key, value) -> value.contains("test"));
BAD (Multiple intermediate topics):
KStream<Long, Event> valid = events.filter((key, value) -> value.isValid()); valid.to("valid-events"); // Unnecessary write
KStream<Long, Event> fromValid = builder.stream("valid-events"); KStream<Long, String> upper = fromValid.mapValues(v -> v.toUpperCase());
- Reuse KTables
GOOD (Shared table):
KTable<Long, User> users = builder.table("users");
KStream<Long, EnrichedClick> enrichedClicks = clicks.leftJoin(users, ...); KStream<Long, EnrichedOrder> enrichedOrders = orders.leftJoin(users, ...);
BAD (Duplicate tables):
KTable<Long, User> users1 = builder.table("users"); KTable<Long, User> users2 = builder.table("users"); // Duplicate!
Testing Topologies
Topology Test Driver
@Test public void testClickFilter() { // Setup topology StreamsBuilder builder = new StreamsBuilder(); KStream<Long, Click> clicks = builder.stream("clicks"); clicks.filter((key, value) -> !value.isBot()) .to("human-clicks");
Topology topology = builder.build();
// Create test driver
TopologyTestDriver testDriver = new TopologyTestDriver(topology);
// Input topic
TestInputTopic<Long, Click> inputTopic = testDriver.createInputTopic(
"clicks",
Serdes.Long().serializer(),
clickSerde.serializer()
);
// Output topic
TestOutputTopic<Long, Click> outputTopic = testDriver.createOutputTopic(
"human-clicks",
Serdes.Long().deserializer(),
clickSerde.deserializer()
);
// Send test data
inputTopic.pipeInput(1L, new Click(1L, "page1", false)); // Human
inputTopic.pipeInput(2L, new Click(2L, "page2", true)); // Bot
// Assert output
List<Click> output = outputTopic.readValuesToList();
assertEquals(1, output.size()); // Only human click
assertFalse(output.get(0).isBot());
testDriver.close();
}
Common Issues & Solutions
Issue 1: StreamsException - Not Co-Partitioned
Error: Topics not co-partitioned for join
Root Cause: Joined streams/tables have different partition counts
Solution: Repartition to match:
// Ensure same partition count KStream<Long, Event> repartitioned = events .through("events-repartitioned", Produced.with(Serdes.Long(), eventSerde) .withStreamPartitioner((topic, key, value, numPartitions) -> (int) (key % 12) // Match target partition count ) );
Issue 2: Out of Memory (Large State Store)
Error: Java heap space
Root Cause: State store too large, windowing not used
Solution: Add time-based cleanup:
// Use windowing to limit state size KTable<Windowed<Long>, Long> counts = events .groupByKey() .windowedBy(TimeWindows.ofSizeAndGrace( Duration.ofHours(24), // Window size Duration.ofHours(1) // Grace period )) .count();
Issue 3: High Lag, Slow Processing
Root Cause: Blocking operations, inefficient transformations
Solution: Use async processing:
// BAD: Blocking HTTP call events.mapValues(value -> { return httpClient.get(value.getUrl()); // BLOCKS! });
// GOOD: Async processing with state store events.transformValues(() -> new AsyncEnricher());
References
-
Kafka Streams Documentation: https://kafka.apache.org/documentation/streams/
-
Kafka Streams Tutorial: https://kafka.apache.org/documentation/streams/tutorial
-
Testing Guide: https://kafka.apache.org/documentation/streams/developer-guide/testing.html
Invoke me when you need topology design, joins, windowing, or exactly-once semantics expertise!