Learning Kafka Streams in a Week (or less)

Happy New Year!!!

Last year I had an end-of-the-year resolution rather than a new-year resolution. I wanted to make sure that I did not end the 2017 Christmas holiday break in an utter lack of productivity. Firing up a cold engine takes effort, you know!

So, I signed up to review a brand new book on stream processing from Manning Publications: Kafka Streams in Action. It turned out to be just the right thing to keep the cylinders firing through the 11-day break.

This is the second book I have reviewed for Manning and provided feedback on before print. (The first one was Functional Programming in C++.) It's safe to say I like reviewing technical books. And why not? You get to learn something and you give something tangible back. A blog post like this is a bonus. And best of all, there's a deadline, a short one: three weeks in this case. In reality, I ended up getting just about a week to review 9 chapters.

TL;DR

If you use Kafka in your org already and anticipate developing a number of new applications that need to produce results in real-time, I would certainly recommend looking at Kafka Streams.

Kafka Streams in Action is the easiest way out there to learn Kafka Streams. Some background in Kafka, Stream Processing, and a little bit of functional programming background (comparable to Java 8 Streams API) will really accelerate learning—in a week or less!

Speed-Learning Kafka Streams

Part 1 of the book is about getting started with Kafka Streams. It's introductory, and you might want to read the first chapter if you are new to the stream processing world. Kafka knowledge is a must-have, so there's a second chapter on that.

From chapter 3 onwards, things get interesting. There's a ZMart example that evolves as you progress through the chapters. There are stateless operators like filter, filterNot, selectKey, branch, map, etc. They do what one might expect, with a small catch. The selectKey and map operators are special: both allow modifying the key arbitrarily. In essence, they repartition the data logically. Let me repeat, LOGICALLY.

The consequences of modifying the key can't be overstated. If an aggregation or join operator is used subsequently, during that step Kafka Streams physically materializes the KStream as a Kafka topic. Basically, it creates a new topic, creates a producer, produces all the data to the new topic (from the previously logically repartitioned KStream), creates a consumer, and consumes the newly created topic. That's some heavy lifting.

Physical materialization of the KStream after logical repartitioning is essential because, if you have two independent instances of the same Kafka Streams application running, each instance is looking at only half of the data (remember, consumers distribute partitions among themselves). The keys are not mutually exclusive, as the two instances could be maintaining their own views of the same keys. Changing the key on only half of the data is OK as long as there's no per-key aggregation or any join. As soon as you insert one of those, physical repartitioning occurs by sending data to a keyed topic and consuming it right back in. This time, however, data that belongs to the same key ends up getting sent to the same partition (from both independent instances), and the internal consumers read the data back with mutually exclusive keys because consumers own partitions exclusively. Now, any subsequent per-key computations are mutually exclusive.
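
To make that concrete, here's a minimal sketch of my own (not from the book) in the book-era DSL. The topic name and the Purchase/getCustomerId types are hypothetical:

KStreamBuilder builder = new KStreamBuilder();
KStream<String, Purchase> purchases = builder.stream("purchases-topic");

// selectKey only marks the stream for repartitioning—nothing moves yet
KStream<String, Purchase> byCustomer =
    purchases.selectKey((key, purchase) -> purchase.getCustomerId());

// the per-key aggregation is what forces the physical repartitioning:
// an internal "-repartition" topic is created, written to, and read back
KTable<String, Long> purchaseCounts =
    byCustomer.groupByKey().count("purchase-counts");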

This brings me to a slight diversion about what I call the partitionable global data space (it has nothing to do with the book).

Kafka Streams—A DSL for manipulating Partitionable Global Data Space

WTH is a global data space? It's a term that has seen some use in the Industrial IoT space, popularized by the Data Distribution Service (DDS) community. A vendor-neutral explanation can be found on the Object Management Group website.

Quoting OMG,

… To the application, the global data space looks like native memory accessed via an API… the global data space is a virtual concept that is really only a collection of local [in-memory] stores.

OK, that's exactly how Kafka Streams makes it appear (as long as you know the name of the topic and have a serde for it). You know, map and filter are just local operators. But as soon as a join or per-key aggregation is used, repartitioning of the global data space (for that topic) happens so that the subsequent operators and state stores (RocksDB) operate on keys with strong locality (in a way, keys get pinned to a machine). There's actually a place for the global data space; it's stored physically in Kafka.

Scalability

Partitionable has another very important consequence: scalability. In this day and age of big data, no single machine has enough resources to deal with the global data space in its entirety. So, divide and conquer, baby! That's Kafka with group-managed consumers…

Scaling an application is nearly trivial, as you just have to launch a new instance and the workload gets distributed automatically and dynamically. Aggregation and join operators partition the global key space with mutual exclusion for efficient local key-value lookups. Note, however, that any existing locally stored key-value mappings might have to be copied to the new instance. So it may not be trivial after all. 😉

Oh, btw, the KStream.through operator will also materialize your stream, if you feel like it. This operator enables very flexible component boundaries. All operators in a Kafka Streams application need not have the same # of instances (cardinality). For instance, with some simple application-specific configuration and custom code, one could have the first half of the flow running with N instances and the second half with 2N instances, as long as the two components are separated by a materialized Kafka topic, as in the sketch below.
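
A rough sketch of what that might look like (book-era API; the topic names, the Order type, and the enrich/ship helpers are mine):

// half one: N instances run this part of the flow
KStream<String, Order> enriched =
    builder.stream("orders")
           .mapValues(order -> enrich(order));

// materialize the intermediate stream to a real topic...
KStream<String, Order> handoff = enriched.through("orders-enriched");

// ...half two: a separate deployment with 2N instances could instead
// start its own topology from "orders-enriched"
handoff.foreach((key, order) -> ship(order));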

This is exactly the point made in the paper: Reactive Stream Processing for Data-centric Publish/Subscribe

“a stream of intermediate results can either be distributed over a DDS [also Kafka] topic for remote processing or can be used for local processing by chaining stream operators. The details of whether the “downstream” processing happens locally or remotely can be abstracted away using the Dependency Injection pattern. As a consequence, component boundaries become more agile and the decision of data distribution need not be taken at design time but can be deferred until deployment.”

If you are interested in learning why Kafka is a great choice for large-scale data movement for the Industrial IoT, you might be interested in my previous blogpost Kafka in Industrial IoT.

But I digress.

Chapter 3

It talks about the to, print, and foreach operators. These are terminal operators, i.e., you cannot chain operators downstream from them. It's not clear why not. There's a general-purpose KStream.peek operator that's not terminal, and it looks like one could write a non-terminal print using peek. Maybe I'm missing something, and the book did not help here.
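
For example, a minimal non-terminal "print" via peek might look like this (my sketch, not the book's; the Purchase type and getAmount are hypothetical):

KStream<String, Purchase> purchases = builder.stream("purchases-topic");

purchases.peek((key, value) -> System.out.println(key + " => " + value)) // not terminal
         .filter((key, value) -> value.getAmount() > 100.0)              // chaining continues
         .to("large-purchases");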

You also realize that the application DAG is “traversed” in depth-first fashion. That’s a strong indication that the internal implementation of KStreams is push-based—later confirmed in chapter 6, the processor API.

Finally, I was left wondering whether it would be possible to process different groups of keys in different threads, i.e., partition the keys across multiple threads for parallelization. It sounds like the currently recommended way to partition processing of the key-space is to launch more consumers (KStreamBuilder.stream). It may be worth adding an operator with a custom thread pool (ExecutorService) in the flow and distributing the local key-space among the threads.

Chapter 4

The book now gets into stateful transformations. For that, KStream provides transformValues (instead of mapValues). transformValues accepts a ValueTransformerSupplier, which is a SAM interface for creating ValueTransformer objects. However, it's not clear whether such a factory needs to provide a different instance each time it's called. Also, I'm still wondering under what circumstances it might be called multiple times.
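
Here's roughly how that looks (a sketch with hypothetical Purchase/PurchaseWithCount types, written against a recent API where ValueTransformer has no punctuate method; older versions also require a deprecated punctuate override). My reading is that the supplier should hand back a fresh ValueTransformer on each call, since each stream task/thread gets its own instance, but verify against the docs:

ValueTransformerSupplier<Purchase, PurchaseWithCount> supplier =
    () -> new ValueTransformer<Purchase, PurchaseWithCount>() {
        private long seenSoFar;                       // per-task state

        @Override
        public void init(ProcessorContext context) { seenSoFar = 0; }

        @Override
        public PurchaseWithCount transform(Purchase purchase) {
            return new PurchaseWithCount(purchase, ++seenSoFar);
        }

        @Override
        public void close() { }
    };

KStream<String, PurchaseWithCount> withCounts = purchases.transformValues(supplier);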

Speaking of state stores, there's actually a statestore-supplier-factory-creator. You got that right. There is such a thing. For instance, you can configure the KStreamBuilder with a state store supplier.

StateStoreSupplier stateStoreSupplier = 
  Stores.create("somename").withStringKeys().withIntegerValues().inMemory().build();
kStreamBuilder.addStateStore(stateStoreSupplier);

So what would you call Stores.create? It’s a statestore-supplier-factory-creator. Told you so!

I noted that branching (KStream.branch) and rekeying (repartitioning) are orthogonal. A non-partitioned stream, if branched, stays non-partitioned; similarly, a partitioned stream, if branched, stays partitioned.

For KStream-KStream joins, the streams should be co-partitioned. I.e., both streams (underlying Kafka topics) must have the same number of partitions and the producers should use the same partitioner. Otherwise, the co-location of keys is lost and joins are going to be incomplete/inaccurate.

Chapter 5

This is the heart of Kafka Streams… the fusion of streams and tables… KStream + KTable. How would you interpret a stream as a table and a table as a stream? It's pretty straightforward. Quoting the book,

“if we consider our stream of events as a log, we can consider this stream of updates [implicitly grouped by key] as a change-log… The distinction between the two is in a log you want to read all records, but in change log, you only want the latest record for each key.”

KTable is an abstraction of a change-log stream from a primary-keyed table. A KTable emits updates. It's typically a subset of the events in a KStream (if there are updates to the same key). When a KTable saves the state of the processor (a.k.a. committing), it forces a cache flush and sends the latest, deduplicated records downstream. A KTable is filtering with a twist: updates that are not the latest are not seen.

Logically,

KTable = KStream + cache + commit interval

The smaller the cache, the faster the updates; the shorter the commit interval, the faster the updates. So much so that a KTable with a zero-size cache or a zero commit interval is exactly the same as the KStream itself. The config names are cache.max.bytes.buffering and commit.interval.ms.
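
For instance, tweaking those two knobs (the values here are arbitrary, just for illustration):

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "ktable-demo");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// shrink the cache and the commit interval, and the KTable behaves
// more and more like the underlying KStream of updates
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024); // 10 MB
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 30_000);                  // 30 seconds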

There are two ways to create a KTable.

  1. Directly from a keyed topic
  2. Apply groupBy + reduce operators to a KStream

Here's one of the best examples of the power of the Kafka Streams DSL, from Chapter 5 of the book.

KTable<String, ShareVolume> shareVolume = 
  kStreamBuilder.stream(EARLIEST, stringSerde, stockTransactionSerde, STOCK_TOPIC)
                .mapValues(st -> ShareVolume.newBuilder(st).build())
                .groupBy((k, v) -> v.getSymbol(), stringSerde, shareVolumeSerde)
                .reduce(ShareVolume::reduce, "stock-transaction-reductions");

A note for those familiar with RxJava: Observable.groupBy in Rx returns an Observable<GroupedObservable<K,V>>, a stream of keyed streams. This nesting often requires flatMap so that the inner GroupedObservables are "subscribed" and activated. That's different from how KStream does it. In KStream, groupBy returns a KGroupedStream<K,V> (a flat structure), which is an intermediate stream only to be converted to a KTable using some sort of per-key reduction/aggregation.

Windowed Computations

Joins are windowed computations because the streams are basically infinite. Joins are based on a coincidence window: if two events have the same key and they coincide, you can join them. You decide what coincidence means by specifying a JoinWindows instance. JoinWindows.of(2000) creates a window that spans 2 seconds before and 2 seconds after every event in the first stream. There's JoinWindows.before if you want the window to end at the event, and JoinWindows.after if you want the window to begin at the event. When you want to join three or more KStreams (or KTables), you will need to chain joins, map the records to a new key, and repeat the repartitioning process.
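
A minimal sketch of a windowed KStream-KStream join, using the book-era JoinWindows.of(millis) form (topics and the Purchase/Coupon/PurchaseWithCoupon types are hypothetical):

KStream<String, Purchase> purchases = builder.stream("purchases");
KStream<String, Coupon> coupons = builder.stream("coupons");

// both topics must be co-partitioned: same partition count, same partitioner
KStream<String, PurchaseWithCoupon> joined =
    purchases.join(coupons,
                   (purchase, coupon) -> new PurchaseWithCoupon(purchase, coupon),
                   JoinWindows.of(2000));   // events must coincide within ±2 seconds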

Using window-based operators turns KStream<K,V> and KTable<K,V> into KStream<Windowed<K>,V> and KTable<Windowed<K>,V>. It is straightforward and elegant at the same time. Separating key and value types clearly helps. If a windowed stream gets a typed representation, then why use just a flag for a to-be-repartitioned stream? It could also be well-typed, as in KPartitionedStream<K,V> or something like that. I mean, what if KStream.through returned a KPartitionedStream, KPartitionedStream.map returned a KStream (non-partitioned), and so on? Having a properly partitioned stream is important for application correctness, and types are about enforcing correctness. Just a thought.

There are three types of windows:

  1. Session Window
  2. Tumbling Window
  3. Sliding/Hopping Window

Sliding windows perform a new calculation after waiting for an interval smaller than the duration of the entire window. TimeWindows.advanceBy converts a tumbling window into a sliding/hopping window.
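
For example (the sizes are arbitrary, and the exact DSL methods vary a bit across versions):

// a 60-second tumbling window...
TimeWindows tumbling = TimeWindows.of(60_000);

// ...becomes a hopping/sliding window that advances every 10 seconds
TimeWindows hopping = TimeWindows.of(60_000).advanceBy(10_000);

// e.g., counts per key per window
KTable<Windowed<String>, Long> countsPerWindow =
    purchases.groupByKey().windowedBy(hopping).count();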

A GlobalKTable allows non-key joins because all of its data is available locally on every instance. Also, the event stream need not be partitioned by the key of the (globally available) lookup data.

Chapter 6

This is my favorite chapter because it makes me the owner of a successful brewery. Beer and hops, what's not to like about that?

This chapter is about the "processor" API, which is lower-level than the Streams DSL described before. In the KStream DSL, KTables have to rely on a combination of committing and cache flushing to forward records. In the lower-level API, there's no such restriction: forwarding events can be totally custom, dependent on the data, or anything else for that matter. Cool.

Punctuator is a scheduler; that's just plain English, as it turns out. Local stores are RocksDB. It's not clear why local stores need compaction; RocksDB should hide that complexity anyway, right? With the in-memory and LRU-based stores, infrequently used keys and values might eventually be removed. So, to avoid that, use a persistent store.
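
Here's a bare-bones sketch of a custom processor that forwards on its own schedule rather than on commit/cache flush. It uses the 1.0-era schedule/Punctuator form; the BeerSale type, getAmount, and the "beer-sales-store" store name are made up, and the store would have to be wired into the topology separately:

public class BeerSalesProcessor extends AbstractProcessor<String, BeerSale> {

    private KeyValueStore<String, Double> totals;

    @Override
    public void init(ProcessorContext context) {
        super.init(context);
        totals = (KeyValueStore<String, Double>) context.getStateStore("beer-sales-store");

        // the Punctuator is the "scheduler": forward accumulated totals every 15 seconds
        context.schedule(15_000, PunctuationType.STREAM_TIME, timestamp -> {
            KeyValueIterator<String, Double> it = totals.all();
            while (it.hasNext()) {
                KeyValue<String, Double> entry = it.next();
                context.forward(entry.key, entry.value);
            }
            it.close();
        });
    }

    @Override
    public void process(String hopVariety, BeerSale sale) {
        Double runningTotal = totals.get(hopVariety);
        totals.put(hopVariety, (runningTotal == null ? 0.0 : runningTotal) + sale.getAmount());
        // nothing is forwarded here—forwarding is entirely up to the punctuator
    }
}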

Is KStream a Monad?

The beloved flatMap gets mentioned in Chapter 6 for the first time, and to my utter dismay it's not about flattening a nested KStream. There does not appear to be a way to create a nested KStream, so there's no question of flattening one. So what is flatMap doing?

KStream.flatMap is about flattening an Iterable. It accepts a mapper that returns an Iterable<KeyValue<K,V>>, and the resulting KStream flattens that Iterable. In that sense, the KStream API tries to be practical rather than purist. I suppose there's probably some hocus-pocus way to convert a KStream<K,V> into an Iterable, but it does not appear to be built-in. So, no, KStream is not truly monadic, but it can still flatten.
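
i.e., something like this sketch (the Order/LineItem types and getters are hypothetical):

KStream<String, Order> orders = builder.stream("orders");

// one input record fans out into zero or more output records;
// the returned Iterable is flattened into the resulting KStream
KStream<String, LineItem> lineItems =
    orders.flatMap((orderId, order) -> {
        List<KeyValue<String, LineItem>> out = new ArrayList<>();
        for (LineItem item : order.getLineItems()) {
            out.add(KeyValue.pair(item.getSku(), item));
        }
        return out;
    });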

Chapter 7

This chapter talks about monitoring Kafka Streams applications: how to monitor consumer lag, and producer and consumer interceptors. For example, the consumer has onConsume and onCommit callbacks, and the producer has onSend and onAcknowledgement callbacks. Kafka Streams applications have StateListener and StateRestoreListener with callbacks such as onRestoreStart, onBatchComplete, and onRestoreEnd. They are expected to be stateless.

Monitoring of state restoration from a change-log is built into Kafka Streams. The next chapter is about testing Kafka Streams applications. Kafka Streams allows testing the entire topology without Kafka running, in just a unit test. That's cool.

Chapter 9

One of the coolest things in Kafka Streams is that it gives read-only access to the streaming application's state store. It's possible to expose it over REST or any other RPC mechanism of your choice. No RPC mechanism is provided, however. It's just a plain K-V lookup.

When exposing the internal state store to the world, care should be exercised to allow queries only when the Kafka Streams application is in the running state. In any other state, such as shutting down or error, access to the local state store should be disallowed. This can be achieved by registering a StateListener.
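
Something along these lines (a sketch; the REST layer is whatever you choose, and the flag-based gating is just one way to do it):

KafkaStreams streams = new KafkaStreams(builder, props);

// flag flipped by the state listener; the REST/RPC handler checks it
AtomicBoolean queryable = new AtomicBoolean(false);

streams.setStateListener((newState, oldState) ->
    queryable.set(newState == KafkaStreams.State.RUNNING));

streams.start();

// inside the REST/RPC handler:
// if (!queryable.get()) return 503;  // state store not safe to query right now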

Alright, this blog post ended up being a whirlwind tour of Kafka Streams and the book Kafka Streams in Action. Any inaccuracies are mine. I hope someone out there finds this blog post useful, or that it at least inspires someone to explore Kafka Streams.


Tuning Truly Global Kafka Pipelines

“Running Kafka pipelines is like flying a jet engine…”—someone commented during the Tuning Kafka Pipelines talk at the Silicon Valley Code Camp, 2017—“… You have to look at the ratios and not rpms like in a car engine….”

I could not agree more. Setting up Kafka is just the beginning. Monitoring and tuning Kafka pipelines for reliability, performance, latency, uptime, and maintenance efficiency pose the true challenge in running Kafka in production.

In this talk, I share my experience and some valuable lessons from tuning the performance of a truly global production Kafka pipeline at LinkedIn.

Furthermore, this was one of the best audiences I've seen at SVCC in years: more than 20 questions, very engaged and knowledgeable. Check it out!

1:46 Here's what a truly global Kafka data pipeline looks like

5:30
Q: Is it not efficient to get data from the follower replica?
A: Theoretically, it is possible to read data from a follower replica as long as the follower is in sync with the leader. However, Kafka does not support that (just yet).

6:15
Q: So the replication is just to ensure resiliency in case broker #2 fails?
A: That’s right. If one of the brokers fails, the consumers can resume consumption from the follower replica.

8:56
Q: Why Kafka pipelines?
A: Kafka provides a very large, reliable, durable buffer between various stages in the pipeline. There are many other reasons too.

11:00
Q: What is the smallest Kafka pipeline?
A: One producer, one Kafka broker, one consumer

12:24 Running Kafka is not a trivial thing. Kafka needs a ZooKeeper cluster.

12:38
Q: When is the right time to introduce Kafka?
A: Right now! Kafka should be multi-tenant. Multiple apps should be able to run through the same pipeline.

13:35 In a single mirror-maker process there are multiple consumers but only one producer.

14:18 No single point of failure. Kafka achieves that by replicating data. Kafka pipelines achieve it by replicating data over multiple datacenters.

15:13 LinkedIn’s crossbar topology of Kafka pipelines. Active-active replication. Supports traffic shifts.

16:27 Durability interacts with latency and throughput. Highest durability with acks=all

18:11 If you want to increase the throughput of the entire Kafka cluster, add more brokers and hosts. If you want to increase the throughput of a specific application, increase the # of partitions. Pipeline throughput depends on a number of things, including colocation. "Remote consume" achieves the highest pipeline throughput.

20:16 Producer performance tuning. Effect of batching, linger.ms, compression type, acks, max in flight requests, etc.
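
For reference, the knobs mentioned in the talk map to these producer configs (the values below are arbitrary starting points, not recommendations):

Properties producerProps = new Properties();
producerProps.put(ProducerConfig.BATCH_SIZE_CONFIG, 256 * 1024);        // larger batches
producerProps.put(ProducerConfig.LINGER_MS_CONFIG, 10);                 // wait a bit to fill batches
producerProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");       // fewer bytes on the wire
producerProps.put(ProducerConfig.ACKS_CONFIG, "all");                   // durability vs. latency trade-off
producerProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);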

23:30 Consumer performance tuning. More partitions more parallelization, fetch message max bytes.

25:13 Broker performance tuning. Performance depends on the degree of replication, the number of replication threads, and batching. Inter-broker replication over SSL has overhead.

26:55
Q: Do the number of partitions affect producer performance?
A: I totally screwed this one up. They do. Increasing partitions allows more parallelism and more throughput; the producer can send more batches in flight. What was I thinking?

29:17
Q: How many partitions should be used? How do you determine the # of partitions?
A: There's a formula. The answer depends on qps (messages per second), average message size, and retention time. These three factors multiplied give you the total on-disk size of a single replica. However, the size of a replica partition on a single machine should not exceed X GB (say 25 GB), because moving partitions (when needed) takes a very long time. Secondly, if a broker crashes and comes back up, it needs to rebuild the index from the stored log. The time to reboot is proportional to the size of the partition on disk.
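
For illustration (the numbers are made up): at 10,000 messages/sec with 1 KB average message size and 3 days of retention, a single replica holds roughly 10,000 × 1 KB × 259,200 s ≈ 2.6 TB; capping each partition at 25 GB then calls for on the order of 2,600 GB / 25 GB ≈ 100 or more partitions.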

30:00 Kafka pipeline latency is typically a few hundred milliseconds. However, the SLA (for customers) of a pipeline would be much longer (in minutes) because pipeline maintenance may involve some downtime during which the latency target can't be met.

32:53
Q: Do you talk about Zookeeper tuning?
A: High write throughput is a problem with ZooKeeper.

33:00 Performance tuning of truly global production Kafka pipelines. Expected global replication SLA was under 30 minutes for 100 GB data (single topic). Pipeline to Asia had really long tail (3 hrs). Kafka Mirror Maker is a CPU-bound process due to decompression.

38:00
Q: What's happening on either side of the ramp in the chart?
A: It’s a Hadoop push of 100 GB data in 12 minutes. Hence ramp up and ramp down.

39:00 Production pipeline machine setup: 100 GB data pushed in 12 minutes, 840 mappers, 4 large brokers, broker replication over SSL, ACK=-1, 4 KMM clusters, 10 KMM processes each.

40:15
Q: Are all mirror-makers on the source datacenter or not?
A: Only one pipeline (out of 4) has mirror-makers on the destination datacenter.

41:16 The textbook solution for mirror-maker performance tuning: Remote Consume, Local Produce. However, that was not practical.

43:09
Q: Why Under-replicated partitions in the source cluster?
A: Too many producers (Hadoop mappers in this case)

45:00 True production fire-fighting. A producer bug prevented us from sending 1 MB size batches. Mirror-makers used to bail out unpredictably due to a compression estimation bug (KAFKA-3995).

46:45 CPU utilization of 4 mirror-makers in a well-behaved global Kafka pipeline. Monitoring 100s of such metrics is the most important prerequisite in running a successful Kafka pipeline.

And here comes the best comment…

49:35 “… running Kafka pipelines is like flying with a Jet engine. Look at the ratios. They don’t maintain rpms like car …”  I agree!

50:45 The end!

This talk would not have been possible without continuous help from the Kafka SREs and the Venice team at LinkedIn. See the Kafka and Venice articles on the LinkedIn Engineering Blog.

Kafka in Industrial IoT

Sensors and smart data analysis algorithms are key to any Internet of Things (IoT) system. But they are not everything. The conduit that moves vast corpora of data from sensors to data analysis systems is equally (if not more) important; it's what makes the whole wheel spin. I.e., data movement is the lifeblood of IoT systems.

Industrial IoT

Industrial IoT, the big brother of consumer IoT, is about collecting and analyzing data from machines, buildings, factories, power grids, hospitals, and such. Like any other IoT system, IIoT systems need technologies to transport data reliably, securely, efficiently, and scalably (if there is such a word). There is a plethora of technologies out there that claim to be the right choice for data movement: MQTT, CoAP, OPC, OPC-UA, DDS, AllJoyn, and IoTivity, to name just a few.

If you step back from this alphabet soup and try to distill what these technologies are trying to achieve, common architectural patterns begin to emerge. These patterns have been well documented previously. Specifically, the Industrial Internet Consortium (IIC) has published a reference architecture that describes three architectural patterns:

  • Three-tier architecture pattern
  • Gateway-Mediated Edge Connectivity and Management architecture pattern
  • Layered Databus pattern

I’ll pick the Layered Databus pattern for further discussion here because it’s a more specific instance of the three-tier architecture pattern, I’m more familiar with it, and it strongly resembles the architecture of the applications that use Kafka.

The Layered Databus Pattern

(Figure: the three-layer databus architecture)

What’s a databus anyway? And why does it rhyme so closely with a database?

The reference architecture defines it very generally as

“… a logical connected space that implements a set of common schema and communicates using those set of schema between endpoints.”

Then come the layers. The purpose of the layers is

“[to] federate these systems into a system-of-systems [to] enable complex, internet-scale, potentially-cloud-based, control, monitoring and analytic applications.”

That makes sense. The scope of the "databus" widens as you move up the hierarchy. Each databus layer has its own data model (a fancy word for a collection of schemas/types). The raw data is filtered/reduced/transformed/merged at the boundary where the gateways are. The higher layers are probably not piping through all the data generated at the lower layers, especially in cases such as wind turbines that generate 1 terabyte of data per week per turbine.

OMG Data Distribution Service (DDS) is a standard API for designing and implementing Industrial IoT systems using this architectural style. It's a good way to implement a databus. DDS brings many unique aspects (e.g., a declarative QoS model, <100µs latency, interoperability) into the mix that are way beyond the scope of this article. It's perhaps sufficient to say that I saw the evolution of the Layered Databus pattern in the IIC reference architecture very closely during my 5+ year tenure at Real-Time Innovations, Inc.

So far so good…

What’s in it for Kafka, anyway?

I submit that one of the best technologies to implement the higher layers of “databus” is Kafka.

Kafkaesque IIoT Architecture

Kafka is, by definition, a "databus". Kafka is a scalable, distributed, reliable, highly available, persistent, broker-based, publish-subscribe data integration platform for connecting disparate systems together. The publishing and consuming systems are decoupled in time (they don't have to be up at the same time), space (they are located in different places), and consumption rate (consumers poll at their own pace). Kafka is schema-agnostic, as it does not care about the encoder/decoder technology used to transmit data. Having said that, you are much better off adopting a well-supported serde technology such as Avro or Protobuf and managing the collection and evolution of schemas using some sort of schema-registry service.

Data-Centric Kafka: Log-Compaction

According to the IIC reference architecture, a data-centric pub-sub model is "central to the databus". A data-centric pub-sub model focuses on

  1. Defining unique instances of data sources
  2. Managing the lifecycle of the instances
  3. Propagating modifications to the instance attributes as instance updates (not just messages).

It’s conceptually similar to a database table where each row has a primary key that determines the “instance”. The main difference is that the column values of rows (independent instances) change frequently and the middleware communicates them as first-class instance updates. The instance lifecycle events (e.g., creation of an instance, deletion of an instance) are first-class and can be observed. Note that data-centric pub-sub in no way requires a database.

Kafka has a built-in abstraction for the data-centric pub-sub communication model: log-compacted topics. Nominally, a Kafka topic has a finite retention time, and key-value pairs are deleted when that time expires. In a log-compacted topic, on the other hand, only a single value survives for a given key: the latest one. Nominal retention isn't applicable anymore; only a tombstone deletes the key from the partition. Consumers can recognize the creation of a new key and the deletion of an existing key via the consumer API. As a result, Kafka is fully equipped for the data-centric communication model.
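
For example, creating a log-compacted topic with the admin client might look like this (a sketch; the topic name, partition count, and replication factor are made up):

Properties adminProps = new Properties();
adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

try (AdminClient admin = AdminClient.create(adminProps)) {
    NewTopic sensorState = new NewTopic("sensor-latest-state", 12, (short) 3)
        .configs(Collections.singletonMap(
            TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT));
    admin.createTopics(Collections.singletonList(sensorState));  // block on the result and handle errors as needed
}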

Any consumer that has been subscribed to the log-compacted topic from the time the topic was created is able to see all updates to the keys produced into the topic. This is because there's a finite delay before Kafka selects keys for compaction. A caught-up consumer observes all the changes to all the keys.

A consumer that subscribes long after the creation of the log-compacted topic observes only the final value of keys that have been compacted. Any such consumer, given enough time, catches up to the head of the topic and from that point on observes all updates to the subsequently produced keys.

This feature enables many use-cases.

  • The first one is database replication. Large internet companies like LinkedIn have moved away from the native replication technology of, say, Oracle or MySQL and replaced it with incremental data-capture pipelines using Kafka.
  • The second use-case is backing up local key-value stores of stream-processing jobs. The processing state of Samza/Kafka-Streams jobs is periodically check-pointed locally and to Kafka log-compacted topics as a backup.

Data-Centric Kafka All the Way

There are two key aspects that make Kafka a great choice for the layered databus architectural pattern:

  1. Free (as in Freedom) Keys
  2. KafkaStreams for Dataflow-oriented Application Implementation

Free (as in Freedom) Keys

Kafka's notion of a record (a.k.a. message) is a key-value pair, where the key and the value are orthogonal to each other. The key, even if it's just a subset of the attributes in the value (e.g., a userId, a sensorId, etc.), is separate from the value. The key could contain attributes outside the value altogether. The key structure (type) is in no way statically bound to the value structure (type). The key attributes can grow/shrink independently of the value should the need arise.

This flexibility allows application designers to publish a stream of values to different topics, each with a potentially different key type. This could also be done in a serial pipeline where an application reads a log-compacted topic with type (k1, v1) and republishes it with type (k2, v1). I.e., Kafka accommodates logical repartitioning of the dataspace to suit the target application's semantics. (Note: logical repartitioning is not to be confused with physical repartitioning by increasing/decreasing the number of partitions. The former is about domain-level semantics, while the latter is about managing parallelism and scalability.)
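
A sketch of that republishing step with Kafka Streams (the topic names and the Reading/getSiteId type are hypothetical):

// read (deviceId -> Reading) and republish the same values keyed by siteId instead
builder.<String, Reading>stream("readings-by-device")
       .selectKey((deviceId, reading) -> reading.getSiteId())
       .to("readings-by-site");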

KafkaStreams for Dataflow-oriented Application Implementation

The data-centric principles should not be limited to just the communication substrate. If the consuming application is structured around the notion of logically parallel, stateful transformations of data in a pipeline (i.e., a flow) and the infrastructure handles state management around the core flow abstraction, there is much to gain from such an application design. First, the application can scale naturally with the increasing number of simultaneously active keys (e.g., growing number of sensors). It can also scale easily with the increasing data volume and update velocity (e.g., growing resolution/accuracy of the sensors). Scaling for both dimensions is achieved by adding processing nodes and/or threads because the dataflow design admits concurrency very easily. Managing growing scale requires effective strategies for distributed data partitioning and multi-threading on each processing node.

Not coincidentally, dataflow-oriented stream processing libraries have become increasingly popular due to their improved level of abstraction, ease of use, expressiveness, and support for flexible scale-out strategies. Dataflow-oriented design and implementation have been shown to be effective in easily distributing the workload across multiple processing nodes/cores. There are many options, including Apache Beam, Apache Samza, Apache Spark, Apache Flink, Apache NiFi, and, for Kafka, Kafka Streams.

Check out how Kafka Streams relates to Global Data Space.

The most touted middleware for implementing the layered databus pattern, DDS, supports dataflow-oriented thinking at the middleware level but the API makes no attempt to structure the applications around the idea of instance lifecycle. Having said that, Rx4DDS is a research effort that attempts to bridge the gap and enforce data-centric design all the way to the application layer.

Once again, Kafka comes to the rescue with Kafka Streams, which includes high-level composable abstractions such as KStream, KTable, and GlobalKTable. By composable, I mean it's an API DSL with a fluent interface. KStream is just a record stream. KTable and GlobalKTable are more interesting, as they provide a table abstraction where only the last update to a key-value pair is visible. That's ideal for consuming log-compacted topics. Application-level logic is implemented as transformations (map), sequencing (flatMap), aggregations (reduce), grouping (groupBy), and joins (products of multiple tables). Application state management (snapshots, failure recovery, at-least-once processing) is delegated to the infrastructure.
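
For instance, consuming a log-compacted topic as a table and enriching a live event stream against it might look like this (a sketch; the topic names and the SensorMetadata/SensorReading/EnrichedReading types are hypothetical):

KTable<String, SensorMetadata> metadata = builder.table("sensor-metadata");   // log-compacted topic
KStream<String, SensorReading> readings = builder.stream("sensor-readings");

// stream-table join: each reading is enriched with the latest metadata for its key
KStream<String, EnrichedReading> enriched =
    readings.join(metadata, (reading, meta) -> new EnrichedReading(reading, meta));

enriched.to("enriched-readings");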

These capabilities are by no means unique to Kafka Streams. The same ideas surface in a number of the previously mentioned stream processing frameworks. The point, therefore, is that Kafka together with a stream processing framework provides an excellent way to implement the layered databus pattern.

Kafka Pipelines for Federation (aka Layers)

It does not take a rocket scientist to realize that the “Layered Databus” is just a geographically distributed Kafka pipeline (or pipelines). Yeah, that’s everyday business for Kafka.

A common purpose of deploying Kafka pipelines in internet companies is tracking. User activity events captured at front-end applications and on personal devices are hauled to offline systems (e.g., Hadoop) in near real time. The tracking pipelines commonly involve "aggregate" clusters that collect data from all regional clusters, which are deployed worldwide. Kafka mirror-makers are used to replicate data from one cluster to another over long-latency links. It's essentially a federation of Kafka clusters for data propagation in primarily one direction. The other direction is pretty similar.

Here’s how the same architecture can be applied to an abstract IIoT scenario.

(Figure: a Kafka pipeline for an IIoT deployment)

This picture shows how Kafka can be used to aggregate data from remote industrial sites.

A gateway publishes data from machines to a local Kafka cluster, where it may be retained for a few days (configurable). The mirror-maker clusters in the datacenters securely replicate the source data into the aggregate clusters. Large volumes of data can be replicated efficiently using Kafka's built-in compression. Enterprise apps consume data from the aggregate clusters.

A pipeline in the reverse direction could be used for control data. Its mirror-maker cluster is separate due to the direction of flow and the different latency requirements. The control pipeline in the replica datacenter is passive.

Interestingly, a similar architecture is used by British Gas in production.

Kafka offers a number of advantages in this case.

  • Aggregate clusters support seamless failover in the face of regional unavailability.
  • Kafka and mirror-makers support no-loss data pipelines.
  • There's no need for a separate service for durability, as Kafka itself is a reliable, persistent commit log.
  • With growing scale (more topics, more keys, larger values, faster values, etc), more instances of Kafka brokers and mirror-maker can be easily added to meet the throughput requirements.
  • Identity mirroring maintains the original instance lifecycle across the pipeline. Identity mirroring means that the source and destination clusters have the same # of partitions, and that keys that belong to partition p in the source cluster also belong to partition p in the destination cluster. The order of records is guaranteed.
  • Whitelisting and blacklisting of topics in mirror-maker allows only a subset of data to pass through.
  • Replication is secured using SSL.
  • Kafka is designed for operability and multi-tenancy. Each Kafka cluster can be monitored as Kafka publishes numerous metrics (as JMX Mbeans) for visibility, troubleshooting, and alerting during infrastructure problems.

Summary

Kafka (the infrastructure) provides a powerful way to implement the layered databus architecture (a.k.a. system-of-systems) for Industrial IoT. Kafka Streams (the API) provides first-class abstractions for dataflow-oriented stream processing that lend themselves well to the notion of data-centric application design. Together, they allow system architects and other key stakeholders to position their assets (apps and infrastructure) for future growth in unique sensors, data volume/velocity, and analysis dimensions, while maintaining a clean dataflow architecture.