Learning Kafka Streams in a Week (or less)

Happy New Year!!!

Last year I had an end-of-the-year resolution rather than a new-year resolution. I wanted to make sure that I did not end the 2017 Christmas holiday break in an utter lack of productivity. Firing up a cold engine takes effort, you know!

So, I had signed up to review a brand new book on stream processing from Manning Publications: Kafka Streams in Action. It turned out to be just the right thing to keep the cylinders firing through the 11-day break.

This is the second book I have reviewed for Manning, providing my feedback before print. (The first one was Functional Programming in C++.) It's safe to say I like reviewing technical books. And why not? You get to learn something and you give something tangible back. A blog post like this is a bonus. And best of all, there's a deadline, a short one: three weeks in this case. In reality, I ended up getting just about a week to review 9 chapters.

TL;DR

If you use Kafka in your org already and anticipate developing a number of new applications that need to produce results in real-time, I would certainly recommend looking at Kafka Streams.

Kafka Streams in Action is the easiest way out there to learn Kafka Streams. Some background in Kafka and stream processing, and a little bit of functional programming (comparable to the Java 8 Streams API), will really accelerate learning—in a week or less!

Speed-Learning Kafka Streams

Part 1 of the book is about getting started with Kafka Streams. It's introductory, and you might want to read the first chapter if you are new to the stream-processing world. Kafka knowledge is a must-have, so the second chapter covers that.

From chapter 3 onwards things get interesting. There's a ZMart example that evolves as you progress through the chapters. There are stateless operators like filter, filterNot, selectKey, branch, map, etc. They do what one might expect, with a little catch. The selectKey and map operators are special: both allow modifying the key arbitrarily. In essence they repartition the data—logically. Let me repeat, LOGICALLY.
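As a rough sketch of how such a stateless chain reads (this is my own toy example, not the book's ZMart code; the Purchase type, topic name, and maskCreditCard helper are made up, and I'm assuming the 1.0-era KStreamBuilder API the book uses):

KStream<String, Purchase> purchases = kStreamBuilder.stream("purchases");   // hypothetical source topic
purchases.filter((key, purchase) -> purchase.getPrice() > 0)                // drop bad records
         .selectKey((key, purchase) -> purchase.getCustomerId())            // rekey: repartitions only logically
         .mapValues(purchase -> maskCreditCard(purchase));                  // touch the value, leave the key alone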

The consequences of modifying the key can't be overstated. If an aggregation or join operator is used subsequently, during that step Kafka Streams physically materializes the KStream as a Kafka topic. Basically, it creates a new topic, creates a producer, produces all the data to the new topic (from the previously logically repartitioned KStream), creates a consumer, and consumes the newly created topic. That's some heavy lifting.

Physical materialization of the KStream after logical repartitioning is essential because if you have two independent instances of the same Kafka Streams application running, each instance is looking at only half of the data (remember, consumers distribute partitions among themselves). The keys are not mutually exclusive, as the two instances could each be maintaining their own view of the keys. Changing the key on only half of the data is OK as long as there's no per-key aggregation and no join. As soon as you insert any of those, physical repartitioning occurs by sending data to a keyed topic and consuming it right back in. This time, however, data that belongs to the same key ends up in the same partition (even when it comes from the independent instances), and the internal consumers read the data back with mutually exclusive keys because consumers own partitions exclusively. Any subsequent per-key computations are then mutually exclusive.
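To make that concrete, here's a hedged continuation of the earlier sketch: the selectKey by itself is cheap, but the per-key count that follows is where Kafka Streams creates the internal repartition topic (the store name is mine):

KTable<String, Long> purchasesPerCustomer =
    purchases.selectKey((key, purchase) -> purchase.getCustomerId())  // logical rekey only
             .groupByKey()                                            // a per-key aggregation is coming up...
             .count("purchases-per-customer");                        // ...so a repartition topic is created here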

This brings me to a slight diversion about what I call a partitionable global data space (it has nothing to do with the book).

Kafka Streams—A DSL for manipulating Partitionable Global Data Space

WTH is a global data space? It's a term that has seen some use in the Industrial IoT space, popularized by the Data Distribution Service (DDS) community. A vendor-neutral explanation can be found at the Object Management Group website.

Quoting OMG,

… To the application, the global data space looks like native memory accessed via an API… the global data space is a virtual concept that is really only a collection of local [in-memory] stores.

OK, that’s exactly how Kafka Streams make it appear (as long as you know the name of the topic and have serde for it). You know, map, filter are just local operators. But as soon as a join or per-key aggregation are used, repartitioning of the global data space (for that topic) happens so that the subsequent operators and state stores (RocksDB) operate on keys with strong locality (in a way, keys get pinned to a machine). There’s actually a place for the global data space; It’s stored physically in Kafka.

Scalability

Partitionable has another very important consequence—scalability. In this day and age of big data, no machine has enough resources to deal with the global data space in its entirety. So, divide and conquer, baby! Kafka with group-managed consumers…

Scaling an application is nearly trivial: you just launch a new instance and the workload gets distributed automatically and dynamically. Aggregation and join operators partition the global key space with mutual exclusion for efficient local key-value lookups. Note, however, that any existing locally stored key-value mappings might have to be copied to the new instance. So it may not be entirely trivial. 😉

Oh, btw, the KStream.through operator will also materialize your stream, if you feel like it. This operator enables very flexible component boundaries. All operators in a Kafka Streams application need not have the same # of instances (cardinality). For instance, with some simple application-specific configuration and custom code, one could run the first half of the flow with N instances and the second half with 2N instances, as long as the two components are separated by a materialized Kafka topic.
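A hedged sketch of that split, with a made-up intermediate topic name:

// The "first half" writes to an intermediate topic and the "second half" reads it right back,
// so the two halves can run with different instance counts.
KStream<String, Purchase> secondHalf = purchases.through("masked-purchases");  // physical materialization
secondHalf.foreach((key, purchase) -> System.out.println(key + " -> " + purchase));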

This is exactly the point made in the paper: Reactive Stream Processing for Data-centric Publish/Subscribe

“a stream of intermediate results can either be distributed over a DDS [also Kafka] topic for remote processing or can be used for local processing by chaining stream operators. The details of whether the “downstream” processing happens locally or remotely can be abstracted away using the Dependency Injection pattern. As a consequence, component boundaries become more agile and the decision of data distribution need not be taken at design time but can be deferred until deployment.”

If you are interested in learning why Kafka is a great choice for large-scale data movement for the Industrial IoT, you might be interested in my previous blogpost Kafka in Industrial IoT.

But I digress.

Chapter 3

It talks about the to, print, and foreach operators. These are terminal operators, i.e., you cannot chain operators downstream from them. It's not clear why not. There's a general-purpose KStream.peek operator that's not terminal. It looks like one could write a non-terminal print using peek. Maybe I'm missing something, and the book did not help here.
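For instance, something along these lines should behave like a non-terminal print (a sketch; the sink topic is made up):

purchases.peek((key, purchase) -> System.out.println(key + " -> " + purchase))  // log and keep going
         .filter((key, purchase) -> purchase.getPrice() > 100)
         .to("large-purchases");                                                // terminal: nothing chains after this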

You also realize that the application DAG is “traversed” in depth-first fashion. That’s a strong indication that the internal implementation of KStreams is push-based—later confirmed in chapter 6, the processor API.

Finally, I was left wondering whether it would be possible to process different groups of keys in different threads, or to partition the keys across multiple threads for parallelization. It sounds like the currently recommended way to partition processing of the key space is to launch more consumers (KStreamBuilder.stream). It may be worth adding an operator with a custom thread pool (ExecutorService) in the flow and distributing the local key space among the threads.

Chapter 4

The book now gets into stateful transformations. For that, KStream provides transformValues (instead of mapValues). transformValues accepts a ValueTransformerSupplier, which is a SAM interface that creates ValueTransformer objects. However, it's not clear whether such a factory needs to provide a different instance each time it is called. Also, I'm still wondering under what circumstances it might be called multiple times.
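Here's a minimal sketch of how it is wired up; RunningTotalTransformer (a hypothetical ValueTransformer implementation), the result type, and the store name are my own, not the book's:

KStream<String, PurchaseWithTotal> withTotals =
    purchases.transformValues(() -> new RunningTotalTransformer(),  // the supplier is a SAM, so a lambda works
                              "running-totals");                    // state store(s) the transformer may access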

Speaking of state stores, there's actually a statestore-supplier-factory-creator. You got that right. There's such a thing. For instance, you can configure the KStreamBuilder with a state store supplier.

// Pre-1.0-style Stores API, as used in the book
StateStoreSupplier stateStoreSupplier =
  Stores.create("somename").withStringKeys().withIntegerValues().inMemory().build();
kStreamBuilder.addStateStore(stateStoreSupplier);  // the store is now available to stateful operators by name

So what would you call Stores.create? It’s a statestore-supplier-factory-creator. Told you so!

I noted that branching (KStream.branch) and rekeying (repartitioning) are orthogonal. A non-partitioned stream, if branched, stays a non-partitioned stream; similarly, a partitioned stream, if branched, stays partitioned.

For KStream-KStream joins, the streams must be co-partitioned, i.e., both streams (the underlying Kafka topics) must have the same number of partitions and the producers must use the same partitioner. Otherwise, the co-location of keys is lost and the joins will be incomplete/inaccurate.

Chapter 5

This is the heart of Kafka Streams… the fusion of streams and tables… KStream + KTable. How would you interpret a stream as a table and a table as a stream? It's pretty straightforward. Quoting the book:

“if we consider our stream of events as a log, we can consider this stream of updates [implicitly grouped by key] as a change-log… The distinction between the two is in a log you want to read all records, but in change log, you only want the latest record for each key.”

KTable is an abstraction of a change-log stream from a primary-keyed table. KTable emits updates. It's typically a subset of the events in a KStream (if there are updates to the same key). When a KTable saves the state of the processor (a.k.a. committing), it forces a cache flush and sends the latest updated, deduplicated records downstream. KTable is filtering with a twist—updates that are not the latest are not seen.

Logically,

KTable = KStream + cache + commit interval

The smaller the cache, the faster the updates; the shorter the commit interval, the faster the updates. So much so that a KTable with a zero-size cache or a zero commit interval is exactly the same as a KStream. The config names are cache.max.bytes.buffering and commit.interval.ms.
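A hedged configuration sketch (the numbers are arbitrary examples):

Properties props = new Properties();
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024);  // 10 MB record cache
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 30_000);                   // commit every 30 seconds
// Setting either one to 0 makes a KTable emit every single update, just like a KStream.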

There are two ways to create a KTable.

  1. Directly from a keyed topic (sketched right after this list)
  2. Apply groupBy + reduce operators to a KStream
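The first way is essentially a one-liner (a sketch using the pre-1.0 API; topic and store names are mine):

KTable<String, StockTickerData> stockTicker =
    kStreamBuilder.table("stock-ticker", "stock-ticker-store");  // change-log view of a keyed topic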

The second way is demonstrated by one of the best examples of the power of the Kafka Streams DSL, from chapter 5 of the book:

KTable<String, ShareVolume> shareVolume =
  kStreamBuilder.stream(EARLIEST, stringSerde, stockTransactionSerde, STOCK_TOPIC)
                .mapValues(st -> ShareVolume.newBuilder(st).build())
                .groupBy((k, v) -> v.getSymbol(), stringSerde, shareVolumeSerde)
                .reduce(ShareVolume::reduce, "stock-transaction-reductions");

A note for those familiar with RxJava: Observable.groupBy in Rx returns an Observable<GroupedObservable<K,V>>—a stream of keyed streams. This nesting often requires flatMap so that the inner GroupedObservables are "subscribed" and activated. That's different from how KStream does it. In KStream, groupBy returns a KGroupedStream<K,V> (a flat structure), which is an intermediate stream only to be converted to a KTable using some sort of per-key reduction/aggregation.

Windowed Computations

Joins are windowed computations because the streams are basically infinite. Joins are based on a coincidence window: if two events have the same key and they coincide, you can join them. You decide what coincidence means by specifying a JoinWindows instance. JoinWindows.of(2000) creates a window that spans 2 seconds before and 2 seconds after every event in the first stream. There's JoinWindows.before if you want the window to end at the event and JoinWindows.after if you want the window to begin at the event. When you want to join three or more KStreams (or KTables), you will need to chain joins, map the records with a new key, and repeat the repartitioning process.
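A hedged sketch of a windowed KStream-KStream join (the second stream, the CorrelatedEvent type, and the joiner are my own; both streams are assumed to be co-partitioned):

KStream<String, Click> clicks = kStreamBuilder.stream("clicks");               // hypothetical second stream
KStream<String, CorrelatedEvent> joined =
    purchases.join(clicks,
                   (purchase, click) -> new CorrelatedEvent(purchase, click),  // ValueJoiner
                   JoinWindows.of(2000));  // same key, occurring within 2 seconds of each other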

Using window-based operators turns a KStream<K,V> and a KTable<K,V> into a KStream<Windowed<K>,V> and a KTable<Windowed<K>,V>. It is straightforward and elegant at the same time. Separating key and value types clearly helps. If a windowed stream gets a typed representation, then why use just a flag for a to-be-repartitioned stream? It could also be well-typed, as in KPartitionedStream<K,V> or something like that. I mean, what if KStream.through returned a KPartitionedStream and KPartitionedStream.map returned a KStream (non-partitioned), etc.? Having a properly partitioned stream is important for application correctness, and types are about enforcing correctness. Just a thought.

There are three types of windows.

  1. Session Window
  2. Tumbling Window
  3. Sliding/Hopping Window

Sliding windows perform a new calculation after waiting for an interval smaller than the duration of the entire window. TimeWindows.advanceBy converts a tumbling window to a sliding window.
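A small sketch, reusing the hypothetical purchases stream from earlier (window sizes and the store name are arbitrary; again the pre-2.0 overloads):

TimeWindows tumbling = TimeWindows.of(60_000);                    // non-overlapping 1-minute windows
TimeWindows hopping  = TimeWindows.of(60_000).advanceBy(10_000);  // same 1-minute window, recomputed every 10 seconds

KTable<Windowed<String>, Long> countsPerMinute =
    purchases.groupByKey().count(hopping, "purchase-counts");     // note the Windowed<String> key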

GlobalKTable allows non-key joins because all of its data is available locally on every instance. Also, the event stream need not be partitioned by the key of the (globally available) lookup data.
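A hedged sketch of such a non-key join (the customers topic and the Customer/EnrichedPurchase types are mine):

GlobalKTable<String, Customer> customers =
    kStreamBuilder.globalTable("customers", "customers-store");              // fully replicated on every instance
KStream<String, EnrichedPurchase> enriched =
    purchases.join(customers,
                   (purchaseKey, purchase) -> purchase.getCustomerId(),      // derive the lookup key per record
                   (purchase, customer) -> new EnrichedPurchase(purchase, customer));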

Chapter 6

This is my favorite chapter because it makes me an owner of a successful brewery. Beer and hops, what's not to like about that?

This chapter is about the "processor" API that's lower-level than the Streams DSL described before. In the KStream DSL, KTables have to rely on a combination of committing and cache flushing to forward records. In the lower-level API, there's no such restriction. Forwarding events can be totally custom, dependent on the data, or anything else for that matter. Cool.
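As a sketch of what that freedom looks like (assuming the 1.0-era processor API; the class and names are my own, not the book's brewery example):

public class BatchForwardingProcessor extends AbstractProcessor<String, Purchase> {

    @Override
    public void init(ProcessorContext context) {
        super.init(context);
        // Every 15 seconds of stream time, decide for ourselves what goes downstream.
        context().schedule(15_000, PunctuationType.STREAM_TIME,
                           timestamp -> context().forward("summary", String.valueOf(timestamp)));
    }

    @Override
    public void process(String key, Purchase purchase) {
        // Stash, aggregate, or forward immediately; forwarding is entirely under our control here.
    }
}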

Punctuator is a scheduler. That's just plain English, as it turns out. Local stores are RocksDB. It's not clear why local stores need compaction; RocksDB should hide that complexity anyway, right? With the in-memory and LRU-based stores, infrequently used keys and values might eventually be removed. So, to avoid that, use a persistent store.

Is KStream a Monad?

The beloved flatMap gets mentioned in chapter 6 for the first time, and to my utter dismay it's not about flattening a nested KStream. There does not appear to be a way to create a nested KStream, so there's no question of flattening one. So what is flatMap doing?

KStream.flatMap is about flattening an Iterable. It accepts a mapper that returns an Iterable<KeyValue<K,V>>, and the resulting KStream flattens that Iterable. In that sense, the KStream API tries to be practical rather than purist. I suppose there's probably some hocus-pocus way to convert a KStream<K,V> into an Iterable, but it does not appear to be built in. So, no, KStream is not truly monadic, but it can still flatten.
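A quick sketch of that flattening (the lines stream and the word-splitting are made up):

KStream<String, String> lines = kStreamBuilder.stream("text-lines");          // hypothetical topic
KStream<String, String> words =
    lines.flatMap((key, line) ->
        Arrays.stream(line.split(" "))
              .map(word -> KeyValue.pair(word, word))
              .collect(Collectors.toList()));   // one input record fans out into many output records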

Chapter 7

This chapter talks about monitoring Kafka Streams applications: how to monitor consumer lag, and producer and consumer interceptors. For example, the consumer has onConsume and onCommit callbacks, and the producer has onSend and onAcknowledgement callbacks. Kafka Streams applications have StateListener and StateRestoreListener with callbacks such as onRestoreStart, onBatchComplete, and onRestoreEnd. They are expected to be stateless.

Monitoring of state restoration from a change-log is built into Kafka Streams. The next chapter is about testing Kafka Streams applications. Kafka Streams allows testing the entire topology without Kafka running, in just a unit test. That's cool.

Chapter 9

One of the coolest things in Kafka Streams is that it gives read-only access to the streaming application's state store. It's possible to expose it over REST or any other RPC mechanism of your choice; no RPC mechanism is provided, however. It's just a plain key-value lookup.

When exposing the internal state store to the world, care should be exercised to allow queries only when the Kafka Streams application is in the RUNNING state. Any other state, such as shutting down, error, etc., should disallow access to the local state store. This can be achieved by registering a StateListener.
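A hedged sketch of both pieces, reusing the hypothetical "purchase-counts" store from earlier:

KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, props);
kafkaStreams.setStateListener((newState, oldState) ->
    System.out.println("State change: " + oldState + " -> " + newState));     // gate queries on this if you like
kafkaStreams.start();

if (kafkaStreams.state() == KafkaStreams.State.RUNNING) {
    ReadOnlyKeyValueStore<String, Long> store =
        kafkaStreams.store("purchase-counts", QueryableStoreTypes.keyValueStore());
    Long count = store.get("some-customer-id");   // plain key-value lookup, ready to sit behind REST
}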

Alright, this blog post ended up being a whirlwind tour of Kafka Streams and the book Kafka Streams in Action. Any inaccuracies are mine. I hope someone out there finds this blog post useful, or that it at least inspires someone to explore Kafka Streams.