Rust-In-Action Book Review

Manning Publications gave me yet another fantastic opportunity to learn a new technology and contribute back to the technical community at the same time. I am privileged to have helped in the making of Manning’s Rust In Action, written by Tim McNamara.

Rust In Action

Rust In Action is for programmers who aspire to be system programmers without the background of C and C++. This book is not necessarily an in-depth review of every nook and cranny of Rust programming language (although there are quite a few details). It’s geared more towards programmers who want to learn system programming through a modern language.

This book could have been named “System Programming Using Rust” and it would not be a misnomer. That said, Rust In Action is also a fine name. After all, you learn Rust by applying it to system programming.

Who is this book for

This book is for folks new to Rust AND systems programming. The readers learn to do various systems programming tasks using Rust. A number of operating system aspects are covered in this book including memory, stack, heap, pointers, virtual memory, file I/O, signals and interrupts, clocks, network programming, and more. In that sense it’s a very good starting point for CS students learning system programming fundamentals in a modern language. 

From the Rust language point of view, significant material is covered, including program structure, lifetimes, borrow and ownership semantics, pointers, references, functions, closures, smart pointers, generics, traits and so on. A number of examples show usage of standard and third-party libraries including argument parsing, file I/O, reference counting, common traits, and so on. Each chapter includes good commentary about Rust tooling including Cargo commands, debug/release program compilation, package management, creating crates, taking third-party crate dependencies etc.

Having said that, it feels a little light on the deeper aspects of core Rust and systems programming. Being a language guy, I was hoping to dive into Rust, the language, as much as possible. I’m generally ready for clever programs and knock-your-socks-off kinds of abstractions (to learn from, but not necessarily to use in production code ;)). You won’t find them in this book. That’s not the focus here. Secondly, a proficient system-level programmer will mostly learn a new syntax and libraries for doing what they already know, albeit with more safety.

Let’s look at it chapter by chapter. The source code for each chapter is available on GitHub.

Chapter 1: Introduction

A number of things stand out in chapter 1. The key selling points of Rust are safety, control, productivity, and fearless concurrency. Rust has a standard build system, package manager, and documentation generator called Cargo. There are library and app templates for Cargo, which help get a project off the ground very fast. It also provides an ability for library writers to tweak the compiler’s behavior with plugins. Data in Rust is immutable by default, which aids safety and concurrency. Finally, errors have standard numbers in rustc, the Rust compiler. For example, see error E0308. Neat.

error[E0308]: mismatched types
  --> hello_world.rs:3:6
   |
 3 |   if a = 10 {
   |      ^^^^^^
   |   
   |
   |      expected bool, found ()
   |      help: try comparing for equality: `a == 10`
   |
   = note: expected type `bool`
              found type `()`

 error: aborting due to previous error

 For more information about this error, try `rustc --explain E0308`.

The content in the first chapter is a little sparse. Too much text is about the community and the community’s goals. I would have liked to see something that blew my mind, perhaps something from generics, concurrency, safety, expressiveness, etc. The example code including UTF-8 German and Japanese letters is interesting but not enough for my taste. The book barely mentions the paradigms supported by the language. I think the book mentions that Rust is not object-oriented. A paradigm-level comparison with other languages would be nice to have.

Chapter 2: Language Foundations

Chapter 2 in the book is where you really start to get a feel for the language. There are examples of the standard library, the regex library, pattern matching, I/O, error handling, command-line argument parsing, reading from stdin, constrained generics, etc.

Cargo makes life really easy. A good standard package manager and build system is a must for a modern language. All details of linking/libs are hidden and still the programmers are productive in Rust.

Some highlights from chapter 2

  1. Signed and unsigned integers. i8, i16, i32, i64, u8, u16, u32, u64, isize, usize (native CPU width)
  2. String and str are two different types. Compare that to C++ char *, const char *, std::string, and more.
  3. Size of the array matters to the type system. For example, [u8; 3] is a different type than [u8; 4].
  4. Slices of arrays closely resemble C++ spans.
  5. Rust has Scala-like pattern matching called match.
  6. Command-line arguments are not passed as arguments to main().
  7. Regex pattern matching, argument parsing, and file I/O (e.g., open file, read line) all return Result<T, E>, an either type, which at a minimum must be unwrapped. Unwrapping an error result is a panic by default (like an exception).
  8. Regex.find(&line): pass-by-reference is the caller’s responsibility. This is the first time you encounter Rust’s automatic move semantics.
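A few of the highlights above can be tried out in a handful of lines. The following is a minimal sketch, not code from the book; the helper names parse_byte and sum3 are made up for illustration.

```rust
use std::num::ParseIntError;

// Hypothetical helper: parse a decimal string into a u8.
// parse() reports failure through Result instead of throwing.
fn parse_byte(text: &str) -> Result<u8, ParseIntError> {
    text.trim().parse::<u8>()
}

// The array length is part of the type: this accepts [u8; 3] only.
fn sum3(values: [u8; 3]) -> u32 {
    values.iter().map(|&v| u32::from(v)).sum()
}

fn main() {
    let owned: String = String::from("42"); // owned, growable, heap-allocated
    let borrowed: &str = &owned;            // borrowed view into the same bytes

    // Handle both outcomes explicitly instead of calling unwrap().
    match parse_byte(borrowed) {
        Ok(n) => println!("parsed {}", n),
        Err(e) => println!("not a byte: {}", e),
    }

    // Calling unwrap() on this result would panic, because 300 does not fit in a u8.
    println!("300 is an error: {}", parse_byte("300").is_err());

    println!("sum = {}", sum3([1, 2, 3]));
    // sum3([1, 2, 3, 4]); // would not compile: expected [u8; 3], found [u8; 4]
}
```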

Chapter 3: Compound Data Types

The third chapter talks about compound data types including structs, enumerations, and Rust’s Result type. Structs combined with Traits appear a lot like objects. A thing that stands out about Rust structs is separation of struct definition and application of a trait (an interface) to a struct. I.e., It seems possible to attach multiple traits to a struct type after the fact. This is a really nice feature. Interfaces in C++, Java are intrusive—they must be BOTH known AND attached at the time of definition of the struct/class. Rust looks different. Here’s an example from the book.

#[derive(Debug)]
struct File {
  name: String,
  data: Vec<u8>,
}
trait Read {
  fn read(self: &Self, save_to: &mut Vec<u8>) -> Result<usize, String>;
}
impl Read for File {
  fn read(self: &File, save_to: &mut Vec<u8>) -> Result<usize, String> {
    Ok(0)
  }
}

First, Read is defined after File. Second, the capability of Read (i.e., the read function) is “attached” to File after File is defined. Third, self is a non-mutable reference to File (&File). I wonder, though, whether it works across translation units if different translation units apply different sets of traits or different implementations of the same trait. Basically, how does Rust ensure the equivalent of C++’s ODR (one definition rule)?
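To convince myself that traits really are non-intrusive, here is a minimal sketch that attaches two traits to a struct after the fact. The traits Describe and SizeHint are hypothetical names, not from the book or the standard library.

```rust
// A plain struct with no knowledge of any trait.
#[derive(Debug)]
struct File {
    name: String,
}

// Two hypothetical traits, declared after File.
trait Describe {
    fn describe(&self) -> String;
}

trait SizeHint {
    fn size_hint(&self) -> usize;
}

// Each capability is attached in its own impl block.
impl Describe for File {
    fn describe(&self) -> String {
        format!("file '{}'", self.name)
    }
}

impl SizeHint for File {
    fn size_hint(&self) -> usize {
        self.name.len()
    }
}

// A locally defined trait can even be implemented for a built-in type.
impl Describe for u8 {
    fn describe(&self) -> String {
        format!("byte {}", self)
    }
}

fn main() {
    let f = File { name: String::from("f1.txt") };
    println!("{} (name is {} bytes long)", f.describe(), f.size_hint());
    println!("{}", 7u8.describe());
}
```

As far as I can tell, the ODR question is handled by Rust’s coherence rules: a given trait can have at most one implementation for a given type, and that implementation has to live in the crate that defines either the trait or the type.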

This chapter lacks sufficient discussion of references; the full discussion of & comes in chapter 4. So it’s unclear why match would use *self while struct members are accessed as self.name and self.state.

Constructors in Rust are named constructors. For example, File::new(…). In this case, new is a convention. I wondered if they can be overloaded, so I tried. Overloading did not compile. Rust has an unsafe keyword for marking blocks. Inside such a block you get the same level of safety that C offers at all times. There’s a const keyword for values that may never change.
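Since overloading is out, the usual way around it seems to be a second constructor with a different name. A minimal sketch follows; the name new_with_data is picked for illustration and the File struct here is a simplified stand-in.

```rust
#[derive(Debug, PartialEq)]
struct File {
    name: String,
    data: Vec<u8>,
}

impl File {
    // `new` is only a naming convention, not a keyword.
    fn new(name: &str) -> File {
        File { name: String::from(name), data: Vec::new() }
    }

    // A second constructor needs its own name because
    // two functions called `new` in the same impl do not compile.
    fn new_with_data(name: &str, data: &[u8]) -> File {
        File { name: String::from(name), data: data.to_vec() }
    }
}

fn main() {
    let empty = File::new("f1.txt");
    let full = File::new_with_data("f2.txt", &[1, 2, 3]);
    println!("{:?}", empty);
    println!("{:?}", full);
}
```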

A few thoughts popped up while reading this chapter.

Chapter 4: Lifetimes, Ownership, and Borrowing

This is the chapter I’m primarily interested in. I’ve heard a lot about linear types, i.e., move semantics by default. In Rust, compound data types have move semantics by default. That is, simple expressions like assignments are in fact moves. Passing objects to and from functions is a move by default. A move occurs when a compound data type does not implement the Copy trait. Because moves are the default, one of the main things the compiler does is track use-after-move errors and fail compilation. That’s kinda neat.

The types that implement the Copy trait are basically the same as trivially_copyable types in C++. That is, a simple byte-by-byte copy is sufficient to create another value. Types with a Clone trait implementation are like types with a user-defined copy constructor. Primitive types have copy semantics by default.

This is the simplest example of move semantics checked by the Rust compiler.

OK

#[derive(Debug, Copy, Clone)]
struct CubeSat { id: u64 }
fn main () {
  let sat_a = CubeSat { id: 0 };

  let mut sat_b = sat_a;
  sat_b.id = 1;
  println!("a: {:?}, b: {:?}", sat_a, sat_b); // 0, 1
}

Compiler Error

#[derive(Debug)]
struct CubeSat { id: u64 }
fn main () {
  let sat_a = CubeSat { id: 0 };

  let mut sat_b = sat_a;
  sat_b.id = 1;
  // Can’t use sat_a below because it has
  // been moved to sat_b. Compiler error!
  println!("a: {:?}, b: {:?}", sat_a, sat_b);
}

The best part of course is the clarity of error diagnostics. Just take a look yourself.

error[E0382]: use of moved value: `sat_a`
  --> ch4-check-sats-clone-and-copy-traits.rs:25:32
   |
23 |   let mut sat_b = sat_a;
   |       --------- value moved here
24 |   sat_b.id = 1;
25 |   println!("a: {:?}, b: {:?}", sat_a, sat_b);
   |                                ^^^^^ value used here after move
   |
   = note: move occurs because `sat_a` has type `CubeSat`, which does not implement the `Copy` trait

error: aborting due to previous error

The compiler is making abundantly clear why it does not like your program. Hats off to the Rust compiler writers!

Attempting to overwrite a value that’s still available from elsewhere in the program will cause the compiler to refuse to compile your program. There is a distinction between a value’s lifetime and its scope. When values go out of scope or their lifetimes end for some other reason, their destructors are called. A destructor is a function that removes traces of the value from the program by deleting references and freeing memory.

To provide a custom destructor for a type, implement Drop trait. This will typically be needed in cases where you have used unsafe blocks to allocate memory. Drop has one method, drop(&mut self) that you can use to conduct any necessary wind up activities. It is possible to return ownership of objects back to their original (caller’s scope) variables via functions’ return values.

In Rust, copy semantics can be controlled using Copy and Clone. Copy is implicit. It is a shallow, bitwise copy, which is what C++ does by default. Clone is an explicit and potentially deep, expensive copy (like a copy constructor). Copy implies Clone. That’s weird.
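The difference is easier to see side by side. This is a minimal sketch with two made-up types, Point and Label. The “Copy implies Clone” part comes from Copy being declared as a subtrait of Clone in the standard library, which is why the derive list for Point has to name both.

```rust
// Plain data: can be duplicated implicitly, bit for bit.
#[derive(Debug, Clone, Copy, PartialEq)]
struct Point {
    x: i32,
    y: i32,
}

// Owns heap data (a String), so it can be Clone but not Copy.
#[derive(Debug, Clone, PartialEq)]
struct Label {
    text: String,
}

fn main() {
    let p1 = Point { x: 1, y: 2 };
    let p2 = p1; // implicit copy; p1 stays usable
    println!("{:?} {:?}", p1, p2);

    let l1 = Label { text: String::from("sat") };
    let l2 = l1.clone(); // explicit deep copy of the String
    let l3 = l1;         // move; l1 is unusable from here on
    println!("{:?} {:?}", l2, l3);
    // println!("{:?}", l1); // would not compile: use of moved value
}
```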

The book mentions four general strategies that can help with ownership issues:

  1. Use references where full ownership is not required (pass-by-reference)
  2. Duplicate the value (copy-semantics)
  3. Refactoring code to reduce the number of long-lived objects
  4. Wrap your data in a type designed to assist with movement issues

I could not help but wonder what happens when an object is borrowed but there’s an exception before using it? Is the ownership passed back to the caller?

The main Rust highlights I learned from this chapter are as follows.

  • Attempting to overwrite a value that’s still available from elsewhere in the program will cause the compiler to refuse to compile your program.
  • There is a distinction between a value’s lifetime and its (lexical) scope. This is true in the case of C++ as well, although not by default.
  • An owner cleans up when its value’s lifetimes end. Basically, destructors.
  • When values go out of scope or their lifetimes end for some other reason, their destructors are called.
  • To provide a custom destructor for a type, implement Drop. This will typically be needed in cases where you have used unsafe blocks to allocate memory. Drop has one method, drop(&mut self) that you can use to conduct any necessary wind up activities. The argument is a reference to a mutable *this. 
  • It is possible to return ownership of objects back to their original (caller’s scope) variables via functions’ return values

The questions that remain unanswered for me are

  • What happens when an object is borrowed but there’s an exception before using it? Is the ownership passed back to the caller? In C++, just documenting this would suffice if the argument is of type Foo&&.
    • Move seems to happen conditionally in Rust. If the function is unsuccessful before using the moved value, the function must move the value back to the caller via the return type. This affects the api.
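Here is how I picture the two cases, as a minimal sketch under the assumption that failures are reported through Result (Rust has no exceptions in the C++ sense). Message, check, and send are hypothetical names. A borrow never transfers ownership, so there is nothing to hand back. A move does, so a function that wants to return the value on failure has to put it in the error.

```rust
// Hypothetical type standing in for any non-Copy value.
#[derive(Debug, PartialEq)]
struct Message {
    body: String,
}

// Borrowing: the caller keeps ownership whether or not this fails.
fn check(msg: &Message) -> Result<usize, String> {
    if msg.body.is_empty() {
        Err(String::from("empty message"))
    } else {
        Ok(msg.body.len())
    }
}

// Moving: on failure the value is handed back inside the error.
fn send(msg: Message) -> Result<usize, (Message, String)> {
    if msg.body.len() > 10 {
        return Err((msg, String::from("too long")));
    }
    Ok(msg.body.len()) // msg is dropped when this function returns
}

fn main() {
    let msg = Message { body: String::from("a rather long message") };
    println!("check: {:?}", check(&msg)); // msg is still owned here

    match send(msg) {
        Ok(n) => println!("sent {} bytes", n),
        Err((returned, why)) => println!("got {:?} back: {}", returned, why),
    }
}
```

The cost is visible in the signature of send: the error type now carries the original value, which is exactly the API impact mentioned above.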

What could be better in this chapter?

  • In section 4.5.4, a sentence suggests that Rust allows programmers to opt in to runtime garbage collection. The section talks about reference counting with Rc<T>. Reference counting is not runtime garbage collection. An important difference between the two is determinism. Garbage collection as in Java/C# is non-deterministic. Reference counting and destruction is deterministic, right?
  • Dereferencing is not explained in this chapter. It has been used earlier but it still remains elusive. Reference counting is introduced but no discussion about breaking cycles is presented. Not sure if it’s later in the book. Other than that this chapter is a good read.
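Both points can be checked with the standard library alone. The sketch below is mine, not the book’s: Node is a hypothetical tree node whose parent link is a Weak pointer so that parent and child do not keep each other alive, and the Drop impl prints a line to show when destruction happens.

```rust
use std::cell::RefCell;
use std::rc::{Rc, Weak};

// Hypothetical tree node: children are owned through Rc,
// the back-link to the parent is Weak to avoid a reference cycle.
struct Node {
    name: String,
    parent: RefCell<Weak<Node>>,
    children: RefCell<Vec<Rc<Node>>>,
}

impl Drop for Node {
    fn drop(&mut self) {
        println!("dropping {}", self.name);
    }
}

fn make_family() -> (Rc<Node>, Rc<Node>) {
    let parent = Rc::new(Node {
        name: String::from("parent"),
        parent: RefCell::new(Weak::new()),
        children: RefCell::new(Vec::new()),
    });
    let child = Rc::new(Node {
        name: String::from("child"),
        parent: RefCell::new(Rc::downgrade(&parent)),
        children: RefCell::new(Vec::new()),
    });
    parent.children.borrow_mut().push(Rc::clone(&child));
    (parent, child)
}

fn main() {
    let (parent, child) = make_family();
    // The Weak back-link does not add to the parent's strong count.
    println!("parent strong count = {}", Rc::strong_count(&parent));
    println!("child strong count = {}", Rc::strong_count(&child));

    // The last strong reference goes away here, so Node::drop runs on this line.
    drop(parent);
    let parent_alive = child.parent.borrow().upgrade().is_some();
    println!("child still sees a parent: {}", parent_alive);
}
```

Had the parent link been an Rc as well, the two nodes would have kept each other’s count above zero and neither destructor would ever run.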

Chapter 5: Data In Depth

This chapter goes in great detail to shed light on low-level bit manipulation and CPU instruction processing. This chapter gives programmers a taste of low level machine representation of integers. Integer overflow, underflow are interesting concepts known in systems programming circles but not necessarily to freshers. I skimmed this chapter as the pain of trying to understand low-level bit manipulation was really excruciating.

The following quote stood out from the chapter.

Developing strategies for preventing integer overflow is one of the ways that systems programmers are distinguished from others.

Chapter 6: Memory

The chapter on memory is ambitious and at the same time not enough. It leans towards computer memory management, operating systems, virtual memory, paging, and low-level program image kinds of things (an area dominated by C; forget C++ and Rust) rather than the “language” aspects that simplify programming. It tries to open up the low-level details of a computer, but if you already know them, there isn’t anything to learn here. At the same time, you feel slightly cheated that a chapter that would have been a fine place for Box, Rc, Arc, Arena, etc., gives them only lip service.

A chapter on memory in a high-level programming language book should perhaps focus on managing struct sizes, cache locality, combined control block and data optimization (std::make_shared kind of optimizations), small string optimizations (if they exist in Rust), weak pointers, shared_from_this, etc.

This chapter feels like someone rewrote a memory chapter from an operating systems book and sprinkled some Rust. I felt it overuses unwrap(). To bypass error handling, there are some usages like unwrap().unwrap(). This is simplistic and dying for “for comprehensions” for the Option type.
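For what it’s worth, Option already has something close to a for comprehension. The following is a minimal sketch with a hypothetical config lookup (the "port" key and the function names are made up) that compares unwrap().unwrap() with and_then chaining and with the ? operator.

```rust
use std::collections::HashMap;

// Both the lookup and the parse can fail,
// which is where unwrap().unwrap() tends to show up.
fn port_with_unwrap(cfg: &HashMap<String, String>) -> u16 {
    cfg.get("port").unwrap().parse::<u16>().unwrap() // panics on either failure
}

// Chaining with and_then keeps the failure as a value.
fn port_chained(cfg: &HashMap<String, String>) -> Option<u16> {
    cfg.get("port").and_then(|text| text.parse::<u16>().ok())
}

// The ? operator also works on Option inside a function returning Option.
fn port_question_mark(cfg: &HashMap<String, String>) -> Option<u16> {
    let text = cfg.get("port")?;
    let port = text.parse::<u16>().ok()?;
    Some(port)
}

fn main() {
    let mut cfg = HashMap::new();
    cfg.insert(String::from("port"), String::from("8080"));
    println!("{}", port_with_unwrap(&cfg));
    println!("{:?}", port_chained(&cfg));
    println!("{:?}", port_question_mark(&cfg));

    let empty: HashMap<String, String> = HashMap::new();
    println!("{:?}", port_chained(&empty)); // no panic, just None
}
```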

This chapter sets the expectation that you will learn about pointers, smart pointers, stack, heap, etc. It feels like reading a book on C. It also touches on Rust’s Foreign Function Interface (FFI).

The following things stand out in the chapter.

Rust’s std::os::raw::c_char is like C’s char—the sizeof char is not set by the standard. Same with Rust. Slices have no compile-time length. Internally, they’re a pointer to some part of an array.

As a library author, you can simplify downstream application code if your functions accept both &str and String types. For example, the following code does not work because pw is a &str.

fn is_strong(password: String) -> bool {
  password.len() > 5
}
let pw = "justok";
is_strong(pw); // Does not compile: expected String, found &str

Generics and implicit conversion strategies must be used to allow the program to work.

// Variant 1: take a String by value.
fn is_strong(passwd: String) -> bool {
  passwd.len() > 5
}
let s: String = String::from("justok");
is_strong(s); // OK
is_strong(s); // NOT OK. Use-after-move
is_strong("justok"); // NOT OK. Type mismatch

// Variant 2: take a &str.
fn is_strong(passwd: &str) -> bool {
  passwd.len() > 5
}
is_strong("justok"); // OK
let s: String = String::from("justok");
is_strong(s); // Does not compile

// Variant 3: take anything that is AsRef<str>.
fn is_strong<T: AsRef<str>>(passwd: T) -> bool {
  passwd.as_ref().len() > 5
}
is_strong("justok"); // OK
let passwd: String = String::from("justok");
is_strong(passwd); // OK
is_strong(passwd); // NOT OK. Huh!
// Use-after-move. It’s still a compound type.
let passwd: &str = "justok";
is_strong(passwd); // OK
is_strong(passwd); // OK. Not a use-after-move

// Variant 4: take anything that is Into<String>.
fn is_strong<T: Into<String>>(passwd: T) -> bool {
  passwd.into().len() > 5
}
let passwd: String = String::from("justok");
is_strong(passwd); // OK
is_strong(passwd); // NOT OK. Use-after-move
let passwd: &str = "justok";
is_strong(passwd); // OK
is_strong(passwd); // OK. Not a use-after-move

AsRef<String>: exploded. It doesn’t have a size known at compile time. Huh!

There are a lot of moving parts here. I struggled here to pick the right type just like rookie C++ programmers struggle in modern C++. Clearly, if you know C++ well, learning Rust requires some non-trivial unlearning and new learning. Hopefully, the samples above give you a decent idea of the level of thinking programmers have to do to pass a string ergonomically and efficiently. 

  • Overloading is disallowed in Rust. So you have to choose the right version. I feel powerless.
  • The book suggests this: “To improve dynamic allocation speed one alternative is to use arrays of uninitialized objects. But you’re circumventing Rust’s lifetime checks.”

For programmers well-versed in C and C++, there’s a difference between “a const pointer to T” and “a pointer to const T” and they can’t be used interchangeably. It would be very helpful to clarify if that distinction exists in Rust. I.e., “*mut T” and “*const T”.
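From what I can tell, the distinction does exist but is split across two places. The pointee’s mutability is in the pointer type (*const T versus *mut T), while the pointer variable’s own mutability is in the binding (let versus let mut). A minimal sketch, with demo as a made-up function name:

```rust
// Returns (value first read through p, value read through p after re-pointing,
// value read through q after writing).
fn demo() -> (i32, i32, i32) {
    let mut a: i32 = 1;
    let b: i32 = 2;

    // Re-pointable pointer to read-only data, roughly C's `const int *p`.
    let mut p: *const i32 = &a;
    let first = unsafe { *p }; // reads a
    p = &b;                    // fine: the binding is `mut`

    // Fixed pointer to writable data, roughly C's `int *const q`.
    let q: *mut i32 = &mut a;
    // q = &mut a; // would not compile: q is not declared `mut`

    unsafe {
        *q = 10;
        // *p = 20; // would not compile: cannot assign through a *const pointer
        (first, *p, *q)
    }
}

fn main() {
    println!("{:?}", demo());
}
```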

Chapter 7: Files and Storage

Chapter 7 deals with Storage, file I/O, checksums, endianness, HashMap and BTreeMap data structures. By the end of the chapter, you would have built a working key-value store that’s guaranteed to be durable to hardware failure at any stage.

Chapter 8: Networking

The chapter on networking looks encouraging as it peels the layers of the HTTP network stack one by one. There is a lot of error handling and a thorough discussion of trait objects.

Compiling ch-8/ch8-simple was a breeze. The following Cargo.toml downloaded all the necessary dependencies and compiled the main program in under a minute.

[package]
name = "ch8-simple"
version = "0.1.0"
authors = ["Tim McNamara <code@timmcnamara.co.nz>"]
edition = "2018"

[dependencies]
reqwest = "0.9"

And the program.

extern crate reqwest;

fn main() -> Result<(), Box<dyn std::error::Error>> {
  let url = "http://www.rustinaction.com/";
  let mut response = reqwest::get(url)?;

  let content = response.text()?;
  print!("{}", content);

  Ok(())
}

Box<dyn std::error::Error> is a trait object, which enables runtime polymorphism. Trait objects are a form of type erasure. The compiler does not have access to the original type. &Trait is a reference to something that implements “Trait” whereas &Type is a reference to an object of type Type. Trait objects are used to create collections of heterogeneous objects and for dynamic dispatch.
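The heterogeneous collection use case looks like this in practice. It is a minimal sketch of my own; Shape, Square, and Rect are hypothetical and have nothing to do with the networking code in the chapter.

```rust
trait Shape {
    fn area(&self) -> f64;
}

struct Square(f64);
struct Rect(f64, f64);

impl Shape for Square {
    fn area(&self) -> f64 {
        self.0 * self.0
    }
}

impl Shape for Rect {
    fn area(&self) -> f64 {
        self.0 * self.1
    }
}

// The slice only knows that each element implements Shape;
// the concrete type is erased and area() is dispatched at runtime.
fn total_area(shapes: &[Box<dyn Shape>]) -> f64 {
    shapes.iter().map(|s| s.area()).sum()
}

fn main() {
    let shapes: Vec<Box<dyn Shape>> = vec![
        Box::new(Square(2.0)),
        Box::new(Rect(2.0, 3.0)),
    ];
    println!("total area = {}", total_area(&shapes));
}
```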

Next, the chapter talks about ergonomic error handling. Rust has short form syntax for unwrapping a Result<T, E> type. It’s just a ? (question mark). Here’s an example.

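use std::error::Error;
use std::fs::File;
use std::net::Ipv6Addr;
// Box<dyn Error> accepts both io::Error and AddrParseError.
// With std::io::Error as the error type, the second ? would not compile.
fn main() -> Result<(), Box<dyn Error>> {
  let _f = File::open("invisible.txt")?;
  let _localhost = "::1".parse::<Ipv6Addr>()?;
  Ok(())
}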

Did you notice it? It’s so inconspicuous. The ? is roughly equivalent to the following pseudo-code.

let _f = match File::open("invisible.txt") {
  Result::Ok(val) => val,
  Result::Err(err) => {
    // Convert into the function's declared error type, then return early.
    let converted = std::convert::From::from(err);
    return Result::Err(converted);
  }
};

The chapter later on describes creating an enumeration type UpstreamError that is a union (Either type) of IO and ParseError. This technique enumerates the error types of two unrelated libraries into one. I’m unsure about the craftsmanship of such composite error types. As more and more libraries are used, such a union type may bloat over time, having to support conversion from a myriad of (unrelated) error types. On the flip side, it allows only enumerated error types to be converted, which adds stronger type safety.

use std::fs::File;
use std::io;
use std::net;
use std::net::Ipv6Addr;

#[derive(Debug)]
enum UpstreamError{
  IO(io::Error),
  Parsing(net::AddrParseError),
}
impl From<io::Error> for UpstreamError {
  fn from(error: io::Error) -> Self {
    UpstreamError::IO(error)
  }
}
impl From<net::AddrParseError> for UpstreamError {
  fn from(error: net::AddrParseError) -> Self {
    UpstreamError::Parsing(error)
  }
}
fn main() -> Result<(), UpstreamError> {
  let _f = File::open("invisible.txt")?; // Calls From::from
  let _localhost = "::1".parse::<Ipv6Addr>()?; // Calls From::from
  Ok(())
}

I liked how the Rust compiler inserts From::from(err) calls where ? is used. The extra conversion function (hook) allows descriptive error messages to be retained and bubble up. Rust-lang.org has an interesting account of how the try! macro evolved into the ? syntax.

An alternative is to use unwrap() and expect() but that’s like assuming that an Option will always have a value.

This chapter covers a lot of ground including MAC addresses, TCP, UDP, DNS, error handling, traits, and Rust crates such as smoltcp, std::net, trust_dns, etc. It has a good balance of language and systems knowledge.

Chapter 9: Time and Time Keeping

This chapter has interesting discussion of a variety of different clocks: realtime clock, system clock, monotonically increasing clock, steady clock, high accuracy clock, high resolution clock, fast clock, and atomic clock. This is the longest list of types of clocks I’ve ever seen. Neat.

It also talks about setjmp and longjmp—which never gets stale as it’s a really old nifty tool to hack program stack. 

My general thoughts after reading this chapter.

  • Rust has #[cfg(not(windows))] which works a lot like conditional macros in C. Basically, there’s no escape from platform-specific code unless you are using managed runtimes (JVM, .NET).
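A minimal sketch of what that looks like, with line_ending as a made-up function. Only one of the two definitions is compiled for any given target.

```rust
// Compiled only when targeting Windows.
#[cfg(windows)]
fn line_ending() -> &'static str {
    "\r\n"
}

// Compiled on every other target.
#[cfg(not(windows))]
fn line_ending() -> &'static str {
    "\n"
}

fn main() {
    // cfg! evaluates to a compile-time boolean and can be used in expressions.
    let os = if cfg!(windows) { "windows" } else { "not windows" };
    println!("{}: line ending is {:?}", os, line_ending());
}
```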

Chapter 10: Processes, Threads, and Containers

This chapter introduces Rust closures (anonymous functions). It also exposes you to the standard library and the crossbeam and rayon crates. Crossbeam is used for asynchronous message passing whereas rayon is used for parallel programming in Rust.

The book shows a comparison between thread::sleep and a spinning loop to kill wall clock time. Data shows that as the number of threads increases beyond the number of cores (hyper-threaded) in a CPU, the accuracy of an operating system sleep(20ms) is better than that of spinning for 20ms. At 500 threads and beyond the variance in “spinning pause” is significantly higher than “OS sleep”. What if the spin loop is much shorter than 20ms?

Even for a small number of threads (less than 20), comparing the time taken to wait for 20ms using sleep and spin loop strategies, shows that sleep is more accurate (less variance) than spinning.

Mutex and Arc (Atomic Reference Count) are not unified into a single type in Rust to provide programmers with added flexibility. Consider a struct with several fields. You may only need a Mutex on a single field, but you could put the Arc around the whole struct. This approach provides faster read access to the fields that are not protected by the Mutex. A single Mutex retains maximum protection for the field that has read/write access.
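The struct-with-one-guarded-field idea can be sketched as follows. Stats and count_hits are hypothetical names of mine, and only the standard library is used.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

// `name` is read-only after construction; only `hits` needs a lock.
struct Stats {
    name: String,
    hits: Mutex<u64>,
}

fn count_hits(workers: usize, per_worker: u64) -> u64 {
    // Arc wraps the whole struct so every thread can share it.
    let stats = Arc::new(Stats {
        name: String::from("demo"),
        hits: Mutex::new(0),
    });

    let mut handles = Vec::new();
    for _ in 0..workers {
        let shared = Arc::clone(&stats);
        handles.push(thread::spawn(move || {
            let _len = shared.name.len(); // lock-free read of an unguarded field
            for _ in 0..per_worker {
                *shared.hits.lock().unwrap() += 1; // guarded write
            }
        }));
    }
    for handle in handles {
        handle.join().unwrap();
    }

    let total = *stats.hits.lock().unwrap();
    total
}

fn main() {
    println!("hits = {}", count_hits(4, 1000));
}
```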

Chapter 11: Kernel

Chapter 11 in this book describes how a minimalistic operating system kernel can be built using Rust. Why would you do that? Well, very small embedded devices have just one program running in them. It is intriguing that Rust caters to such environments. I had no idea.

Rust’s build system comes ready for cross-compilation to many platforms. On my Mac installation of Rust, “rustup target list” showed 77 targets. That’s cool.

Rust enums can specify the size of their representation using the #[repr(u8)] annotation. The C++ equivalent would be “enum class Foo : uint8_t { …}”. Writing to raw memory referred to by a pointer can be done in two ways. The following two Rust snippets write the same byte to the same address, although only the first is a volatile write that the compiler must not optimize away.

let mut framebuffer = 0xb8000 as *mut u8;
  unsafe {
    framebuffer.offset(1).write_volatile(0x30);
  }

And writing through a dereferenced pointer after pointer arithmetic.

let mut framebuffer = 0xb8000 as *mut u8;
  unsafe {
    *framebuffer.offset(1) = 0x30;
  }

This chapter goes into a lot of low-level details such as writing to the VGA framebuffer, the kernel panic handler, the halt instruction, etc. This low-level fiddling with memory reminded me of MS-DOS programming back in the 1990s. “Advanced MS-DOS Programming” by Ray Duncan anyone?

Chapter 12: Signals, interrupts, exceptions

I feel this chapter’s name should not include interrupts and perhaps not exceptions either. Simply “signal handling” would do, because interrupts, as in hardware interrupts, are not discussed much. Most programmers think of exceptions as language-level control flow rather than Intel’s definition of exceptions. That is, unless the chapter is extended to include hardware interrupts and exceptions.

All in all, this book is a good start for aspiring system programmers. For seasoned system programmers wanting to learn the language the book may be a good start. For hardcore language fanatics, I would recommend the book by Carol Nichols and Steve Klabnik—The Rust Programming Language.

Learning Kafka Streams in a Week (or less)

Happy New Year!!!

Last year I had an end-of-the-year resolution rather than a new-year resolution. I wanted to make sure that I did not end the 2017 Christmas holiday break in an utter lack of productivity. Firing up a cold engine takes effort, you know!

So, I had signed up to review a brand new book on stream processing by Manning Publications: Kafka Streams in Action. It turned out to be just the right thing to keep the cylinders firing through the 11-day break.

This is the second book I have reviewed for Manning and provided pre-publication feedback on. (The first one was Functional Programming in C++.) It’s safe to say I like reviewing technical books. And why not? You get to learn something and you give something tangible back. A blog post like this is a bonus. And best of all, there’s a deadline, and a short one: three weeks in this case. In reality, I ended up getting just about a week to review 9 chapters.

TL;DR

If you use Kafka in your org already and anticipate developing a number of new applications that need to produce results in real-time, I would certainly recommend looking at Kafka Streams.

Kafka Streams in Action is the easiest way out there to learn Kafka Streams. Some background in Kafka, Stream Processing, and a little bit of functional programming background (comparable to Java 8 Streams API) will really accelerate learning—in a week or less!

Speed-Learning Kafka Streams

Part 1 of the book is about getting started with Kafka Streams. It’s introductory and you might want to read the first chapter if you are new to the stream processing world. Kafka knowledge is a must-have, so there’s a second chapter on that.

From chapter 3 onwards things get interesting. There’s a ZMart example that evolves as you progress through the chapters. There are stateless operators like filter, filterNot, selectKey, branch, map, etc. They do what one might expect with a little catch. The selectKey and map operators are special. Both operators allow modifying the key arbitrarily. In essence they repartition the data—logically. Let me repeat, LOGICALLY.

The consequences of modifying the key can’t be overstated. If an aggregation or join operator is used subsequently, during that step, Kafka Streams physically materializes the KStream as a Kafka topic. Basically, it creates a new topic, creates a producer, produces all the data to the new topic (from the previously logically repartitioned KStream), creates a consumer, and consumes the newly created topic. That’s some heavy lifting.

Physical materialization of the KStream after logical repartitioning is essential because if you have two independent instances of the same Kafka Streams application running, each application is looking at only half of the data (remember consumers distribute partitions among themselves). The keys are not mutually exclusive as two instances could be maintaining their own view of the keys. Changing the key on only half of the data is ok as long as there’s no per-key aggregation or any joins. As soon as you insert any of those, physical repartitioning occurs by sending data to a keyed topic and consuming it right back in. This time, however, data that belongs to the same key, ends up getting sent to the same partition (from the independent instances), and the internal consumers read the data back with mutually exclusive keys because consumers own partitions exclusively. Now, any subsequent per-key computations are mutually exclusive.
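The argument above can be simulated without Kafka at all. The sketch below is only a toy model under loud assumptions: Purchase is a made-up record, partition_for is a stand-in partitioner built on Rust’s DefaultHasher (Kafka’s own partitioner uses a different hash function), and each inner Vec plays the role of one partition.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Hypothetical record: first keyed by customer, later re-keyed by department.
#[derive(Clone, Debug)]
struct Purchase {
    customer: String,
    department: String,
}

// Stand-in partitioner (assumption): hash the key modulo the partition count.
fn partition_for(key: &str, partitions: usize) -> usize {
    let mut hasher = DefaultHasher::new();
    key.hash(&mut hasher);
    (hasher.finish() % partitions as u64) as usize
}

fn by_customer(p: &Purchase) -> &str {
    &p.customer
}

fn by_department(p: &Purchase) -> &str {
    &p.department
}

// Route every record to the partition chosen by its key.
fn partition_by(
    records: &[Purchase],
    partitions: usize,
    key_of: fn(&Purchase) -> &str,
) -> Vec<Vec<Purchase>> {
    let mut out: Vec<Vec<Purchase>> = vec![Vec::new(); partitions];
    for record in records {
        let index = partition_for(key_of(record), partitions);
        out[index].push(record.clone());
    }
    out
}

// How many partitions hold at least one record for this department?
fn partitions_holding(parts: &[Vec<Purchase>], department: &str) -> usize {
    parts
        .iter()
        .filter(|part| part.iter().any(|r| r.department == department))
        .count()
}

fn sample() -> Vec<Purchase> {
    let mut records = Vec::new();
    for customer in ["alice", "bob", "carol", "dave"].iter() {
        for department in ["books", "toys"].iter() {
            records.push(Purchase {
                customer: String::from(*customer),
                department: String::from(*department),
            });
        }
    }
    records
}

fn main() {
    let records = sample();

    // Keyed by customer: one department's records may sit in several partitions.
    let original = partition_by(&records, 4, by_customer);
    println!("books spread over {} partition(s)", partitions_holding(&original, "books"));

    // After a physical repartition by the new key, each department has one home.
    let repartitioned = partition_by(&records, 4, by_department);
    println!("books spread over {} partition(s)", partitions_holding(&repartitioned, "books"));
}
```

Whatever the hash function, the second number is always 1, which is the property a per-key aggregation or join depends on.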

This brings me to a slight diversion about what I call partitionable global data space (it has nothing to do with the book).

Kafka Streams—A DSL for manipulating Partitionable Global Data Space

WTH is a global data space? It’s a term that has seen some use in Industrial IoT space popularized by Data Distribution Service (DDS) community. A vendor-neutral explanation is found at the Object Management Group website.

Quoting OMG,

… To the application, the global data space looks like native memory accessed via an API… the global data space is a virtual concept that is really only a collection of local [in-memory] stores.

OK, that’s exactly how Kafka Streams makes it appear (as long as you know the name of the topic and have a serde for it). You know, map and filter are just local operators. But as soon as a join or per-key aggregation is used, repartitioning of the global data space (for that topic) happens so that the subsequent operators and state stores (RocksDB) operate on keys with strong locality (in a way, keys get pinned to a machine). There’s actually a place for the global data space; it’s stored physically in Kafka.

Scalability

Partitionable has another very important consequence—Scalability. In this day and age of big-data, no machine has enough resources to deal with the global data space in its entirety. So, divide and conquer, baby! Kafka with group-managed consumers…

Scaling an application is nearly trivial as you just have to launch a new instance and the workload gets distributed automatically and dynamically. Aggregation and join operators partition the global key space with mutual exclusion for efficient local key-value lookups. Note, however, that any existing locally stored key-value mappings might have to be copied to the new instance. So it may not be trivial. 😉

Oh, btw, the KStream.through operator will also materialize your stream, if you feel like it. This operator enables very flexible component boundaries. All operators in a Kafka Streams application need not have the same # of instances (cardinality). For instance, with some simple application-specific configuration and custom code, one could have the first half of the flow with N instances and the second half with 2N instances as long as the two components are separated by a materialized Kafka topic.

This is exactly the point made in the paper: Reactive Stream Processing for Data-centric Publish/Subscribe

“a stream of intermediate results can either be distributed over a DDS [also Kafka] topic for remote processing or can be used for local processing by chaining stream operators. The details of whether the “downstream” processing happens locally or remotely can be abstracted away using the Dependency Injection pattern. As a consequence, component boundaries become more agile and the decision of data distribution need not be taken at design time but can be deferred until deployment.”

If you are interested in learning why Kafka is a great choice for large-scale data movement for the Industrial IoT, you might be interested in my previous blogpost Kafka in Industrial IoT.

But, I digressed.

Chapter 3

It talks about the to, print, and foreach operators. These are terminal operators, i.e., you cannot chain operators downstream from them. It’s not clear why not. There’s a general-purpose KStream.peek operator that’s not terminal. It looks like one could write a non-terminal print using peek. Maybe I’m missing something, and the book did not help here.

You also realize that the application DAG is “traversed” in depth-first fashion. That’s a strong indication that the internal implementation of KStreams is push-based—later confirmed in chapter 6, the processor API.

Finally, I was left wondering whether it would be possible to process different groups of keys in different threads or partition the keys across multiple threads for parallelization. Sounds like the currently recommended way to partition processing of the key space is to launch more consumers (KStreamBuilder.stream). It may be worth adding an operator with a custom thread pool (ExecutorService) to the flow and distributing the local key space among the threads.
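To make that idea a bit more concrete, here is a minimal sketch using only java.util.concurrent. It is not a Kafka Streams operator and the class name KeyedExecutor is made up for illustration. It only shows how a local key space might be spread over a fixed set of single-threaded executors so that all work for one key lands on the same thread and keeps its relative order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical helper (not part of Kafka Streams): routes each key to one
// of N single-threaded executors, so work for a given key stays ordered.
final class KeyedExecutor {
  private final List<ExecutorService> lanes = new ArrayList<>();

  KeyedExecutor(int threads) {
    for (int i = 0; i < threads; i++) {
      lanes.add(Executors.newSingleThreadExecutor());
    }
  }

  // Same key -> same lane index, every time.
  int laneFor(Object key) {
    return (key.hashCode() & 0x7fffffff) % lanes.size();
  }

  void submit(Object key, Runnable task) {
    lanes.get(laneFor(key)).submit(task);
  }

  // Stops accepting work and waits (up to 10 s per lane) for queued tasks.
  void shutdownAndWait() {
    for (ExecutorService lane : lanes) {
      lane.shutdown();
    }
    try {
      for (ExecutorService lane : lanes) {
        lane.awaitTermination(10, TimeUnit.SECONDS);
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }

  public static void main(String[] args) {
    KeyedExecutor pool = new KeyedExecutor(4);
    for (String key : new String[] {"sensor-1", "sensor-2", "sensor-1"}) {
      pool.submit(key, () ->
          System.out.println(key + " on " + Thread.currentThread().getName()));
    }
    pool.shutdownAndWait();
  }
}
```

The trade-off in this sketch is the usual one: per-key ordering is preserved, but a hot key can still saturate its single lane.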

Chapter 4

The book now gets into stateful transformations. For that, KStream provides transformValues (instead of mapValues). transformValues accepts a ValueTransformerSupplier, which is a SAM interface for creating ValueTransformer objects. However, it’s not clear whether such a factory needs to provide a different instance each time it is called. Also, I’m still wondering under what circumstances it might be called multiple times.

Speaking of state stores, there’s actually a statestore-supplier-factory-creator. You got that right. There’s such a thing. For instance, you can configure KStreamBuilder with a state store supplier.

StateStoreSupplier stateStoreSupplier = 
  Stores.create("somename").withStringKeys().withIntegerValues().inMemory().build();
kStreamBuilder.addStateStore(stateStoreSupplier);

So what would you call Stores.create? It’s a statestore-supplier-factory-creator. Told you so!

I noted that branching (KStream.branch) and rekeying (repartitioning) are orthogonal. A non-partitioned stream, if branched, stays non-partitioned. Similarly, a partitioned stream, if branched, stays partitioned.

For KStream-KStream joins, the streams should be co-partitioned. I.e., both streams (underlying Kafka topics) must have the same number of partitions and the producers should use the same partitioner. Otherwise, the co-location of keys is lost and joins are going to be incomplete/inaccurate.
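A tiny sketch of why the partition counts matter. The hash below is a stand-in I made up for illustration, not Kafka’s actual default partitioner, and the class name CoPartitioning is hypothetical. The point is only that the same key maps to the same partition number in both topics when the partitioner and the partition count agree, and may not when they differ.

```java
// Hypothetical model of key-to-partition assignment; the hash below is a
// stand-in, not Kafka's actual default partitioner.
final class CoPartitioning {
  static int partitionFor(String key, int numPartitions) {
    return (key.hashCode() & 0x7fffffff) % numPartitions;
  }

  // True when a key lands on the same partition number in both topics,
  // which is what a local join instance relies on.
  static boolean coLocated(String key, int leftPartitions, int rightPartitions) {
    return partitionFor(key, leftPartitions) == partitionFor(key, rightPartitions);
  }

  public static void main(String[] args) {
    String[] keys = {"AAPL", "GOOG", "MSFT", "ORCL", "IBM"};
    for (String key : keys) {
      System.out.println(key
          + " same partition count: " + coLocated(key, 4, 4)
          + ", different partition counts: " + coLocated(key, 4, 6));
    }
  }
}
```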

Chapter 5

This is the heart of Kafka Streams… the fusion of streams and tables… KStream+KTable. How would you interpret a stream as a table and a table as a stream? It’s pretty straightforward. Quoting the book:

“if we consider our stream of events as a log, we can consider this stream of updates [implicitly grouped by key] as a change-log… The distinction between the two is in a log you want to read all records, but in change log, you only want the latest record for each key.”

KTable is an abstraction of a change-log stream from a primary-keyed table. KTable emits updates. It’s typically a subset of the events in a KStream (if there are updates to the same key). When a KTable saves the state of the processor (a.k.a. committing), it forces a cache flush and sends the latest updated, deduplicated records downstream. KTable is filtering with a twist: updates that are not the latest are not seen.

Logically,

KTable = KStream + cache + commit interval

The smaller the cache, the faster the updates; the shorter the commit interval, the faster the updates. So much so that a KTable with a zero-size cache or a zero commit interval is exactly the same as a KStream. The config names are cache.max.bytes.buffering and commit.interval.ms.
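Here is a toy model of that equation, written with plain collections. It is not Kafka Streams code; CachedTable, update, and commit are names I invented, and the cache limit is counted in keys rather than bytes to keep the sketch short. Updates are buffered per key and only the latest value per key goes downstream on a flush.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model (not Kafka Streams code): updates are buffered per key and only
// the latest value per key is forwarded when the cache is flushed.
final class CachedTable {
  private final int maxCachedKeys;
  private final Map<String, Integer> cache = new LinkedHashMap<>();
  private final List<String> downstream = new ArrayList<>();

  CachedTable(int maxCachedKeys) {
    this.maxCachedKeys = maxCachedKeys;
  }

  void update(String key, int value) {
    cache.put(key, value);              // overwrites any older value for the key
    if (cache.size() > maxCachedKeys) {
      flush();                          // cache is over its limit: forward early
    }
  }

  // Stands in for the periodic commit.
  void commit() {
    flush();
  }

  private void flush() {
    for (Map.Entry<String, Integer> e : cache.entrySet()) {
      downstream.add(e.getKey() + "=" + e.getValue());
    }
    cache.clear();
  }

  List<String> downstream() {
    return downstream;
  }

  public static void main(String[] args) {
    CachedTable table = new CachedTable(10);
    table.update("a", 1);
    table.update("a", 2);
    table.update("b", 7);
    table.commit();
    System.out.println(table.downstream()); // [a=2, b=7]

    CachedTable noCache = new CachedTable(0);
    noCache.update("a", 1);
    noCache.update("a", 2);
    System.out.println(noCache.downstream()); // [a=1, a=2]
  }
}
```

With a limit of zero every update is forwarded immediately, which is the "KTable degenerates into a KStream" case.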

There are two ways to create a KTable.

  1. Directly from a keyed topic
  2. Apply groupBy + reduce operators to a KStream

Here’s one of the best examples of the power of the Kafka Streams DSL, from Chapter 5 of the book.

KTable<String, ShareVolume> shareVolume = 
  kStreamBuilder.stream(EARLIEST, stringSerde, stockTransactionSerde, STOCK_TOPIC)
                .mapValues(st -> ShareVolume.newBuilder(st).build())
                .groupBy((k, v) -> v.getSymbol(), stringSerde, shareVolumeSerde)
                .reduce(ShareVolume::reduce, "stock-transaction-reductions");

A note for those familiar with RxJava. Observable.groupBy in Rx returns an Observable<GroupedObservable<K,V>>, a stream of keyed streams. This nesting often requires flatMap so that the inner GroupedObservables are “subscribed” and activated. That’s different from how KStream does it. In KStream, groupBy returns KGroupedStream<K,V> (a flat structure), which is an intermediate stream only to be converted to a KTable using some sort of per-key reduction/aggregation.

Windowed Computations

Joins are windowed computations because the streams are basically infinite. Joins are based on a coincidence window. If two events have the same key and they coincide, you can join them. You decide what coincidence means by specifying a JoinWindows instance. JoinWindows.of(2000) creates a window that spans 2 seconds before and 2 seconds after every event in the first stream. There’s JoinWindows.before if you want the window to end at the event and JoinWindows.after if you want the window to begin at the event. When you want to join three or more KStreams (or KTables), you will need to chain joins, map the records to a new key, and repeat the repartitioning process.
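The coincidence test itself boils down to a timestamp comparison. Below is a toy model of it; CoincidenceWindow and its methods are illustrative names of mine and only mimic the idea of a before/after bound around an event, not the real JoinWindows class.

```java
// Toy model of a join window; names are illustrative only and do not
// reproduce the Kafka Streams JoinWindows API.
final class CoincidenceWindow {
  private final long beforeMs;
  private final long afterMs;

  CoincidenceWindow(long beforeMs, long afterMs) {
    this.beforeMs = beforeMs;
    this.afterMs = afterMs;
  }

  // Symmetric window: the same span before and after the left event.
  static CoincidenceWindow of(long ms) {
    return new CoincidenceWindow(ms, ms);
  }

  // True when the right event falls in [leftTs - before, leftTs + after].
  boolean coincide(long leftTs, long rightTs) {
    return rightTs >= leftTs - beforeMs && rightTs <= leftTs + afterMs;
  }

  public static void main(String[] args) {
    CoincidenceWindow twoSeconds = CoincidenceWindow.of(2000);
    System.out.println(twoSeconds.coincide(10_000, 8_500));  // true
    System.out.println(twoSeconds.coincide(10_000, 12_500)); // false

    // Window that ends at the left event: nothing after it qualifies.
    CoincidenceWindow endsAtEvent = new CoincidenceWindow(2000, 0);
    System.out.println(endsAtEvent.coincide(10_000, 11_000)); // false
  }
}
```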

Using window-based operators turns KStream<K,V> and KTable<K,V> into KStream<Windowed<K>,V> and KTable<Windowed<K>,V>. It is straightforward and elegant at the same time. Separating key and value types clearly helps. If a windowed stream gets a typed representation, then why use just a flag for a to-be-repartitioned stream? It could also be well-typed as in KPartitionedStream<K,V> or something like that. I mean, what if KStream.through returned a KPartitionedStream and KPartitionedStream.map returned a KStream (non-partitioned), etc.? Having a properly partitioned stream is important for application correctness, and types are about enforcing correctness. Just a thought.

There are three types of windows.

  1. Session Window
  2. Tumbling Window
  3. Sliding/Hopping Window

Sliding windows perform a new calculation after waiting for an interval smaller than the duration of the entire window. TimeWindows.advanceBy converts a tumbling window to a sliding window.
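To see the difference between tumbling and sliding/hopping windows in numbers, here is a small sketch that lists the start times of every window containing a given timestamp. WindowSketch and windowStartsFor are hypothetical names, and the arithmetic is my own model of fixed-size windows that advance by a fixed step, not code from Kafka Streams.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of fixed-size time windows that advance by a fixed step.
// advance == size gives tumbling windows; advance < size gives overlapping ones.
final class WindowSketch {
  // Start times (ms) of all windows [start, start + size) containing the timestamp.
  static List<Long> windowStartsFor(long timestampMs, long sizeMs, long advanceMs) {
    List<Long> starts = new ArrayList<>();
    // Earliest aligned start whose window still covers the timestamp.
    long start = (Math.max(0, timestampMs - sizeMs + advanceMs) / advanceMs) * advanceMs;
    while (start <= timestampMs) {
      starts.add(start);
      start += advanceMs;
    }
    return starts;
  }

  public static void main(String[] args) {
    // Tumbling: 60 s windows, 60 s advance. One window per event.
    System.out.println(windowStartsFor(130_000, 60_000, 60_000)); // [120000]
    // Sliding/hopping: 60 s windows, 20 s advance. Three overlapping windows.
    System.out.println(windowStartsFor(130_000, 60_000, 20_000)); // [80000, 100000, 120000]
  }
}
```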

GlobalKTable allows non-key joins because all data is available locally. Also the event stream (that’s globally available) need not be partitioned by the key of the lookup data.

Chapter 6

This is my favorite chapter because it makes me the owner of a successful brewery. Beer and hops: what’s not to like about that?

This chapter is about the “processor” API that’s lower-level than the Streams DSL described before. In the KStream DSL, KTables have to rely on a combination of committing and cache flushing to forward records. In the lower-level API, there’s no such restriction. Forwarding events can be totally custom, dependent on data, or anything else for that matter. Cool.

Punctuator is a scheduler. That’s just plain English as it turns out. Local stores are RocksDB. It’s not clear why local stores need compaction. RocksDB should hide that complexity anyway, right? With the in-memory and LRU-based stores, infrequently used keys and values might eventually be removed. So to avoid that, use a persistent store.

Is KStream a Monad?

Beloved flatMap gets mentioned in Chapter 6 for the first time and, to my utter dismay, it’s not about flattening a nested KStream. There does not appear to be a way to create a nested KStream. So there’s no question of flattening one. So what is flatMap doing?

So KStream.flatMap is about flattening an Iterable. It accepts a mapper that returns an Iterable<KeyValue<K,V>>. The resulting KStream flattens the Iterable. In that sense the KStream API tries to be practical rather than purist. I suppose there’s probably some hocus-pocus way to convert a KStream<K,V> into an Iterable, but it does not appear to be built in. So no, KStream is not truly monadic, but it can still flatten.
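In plain-collections terms, that kind of flatMap looks roughly like the sketch below. It uses java.util.Map.Entry as a stand-in for KeyValue, and FlatMapSketch is a made-up name; this is an analogy for the shape of the operation, not the KStream implementation.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Analogy only: each input record is mapped to an Iterable of zero or more
// output records, and the results are concatenated into one flat sequence.
final class FlatMapSketch {
  static <K, V, K2, V2> List<Map.Entry<K2, V2>> flatMap(
      List<Map.Entry<K, V>> records,
      Function<Map.Entry<K, V>, Iterable<Map.Entry<K2, V2>>> mapper) {
    List<Map.Entry<K2, V2>> out = new ArrayList<>();
    for (Map.Entry<K, V> record : records) {
      for (Map.Entry<K2, V2> produced : mapper.apply(record)) {
        out.add(produced);
      }
    }
    return out;
  }

  public static void main(String[] args) {
    List<Map.Entry<String, String>> lines = new ArrayList<>();
    lines.add(new SimpleEntry<>("line-1", "hops malt"));
    lines.add(new SimpleEntry<>("line-2", "yeast"));

    // One (word, 1) record per word in each line.
    List<Map.Entry<String, Integer>> words = flatMap(lines, rec -> {
      List<Map.Entry<String, Integer>> perWord = new ArrayList<>();
      for (String w : rec.getValue().split(" ")) {
        perWord.add(new SimpleEntry<>(w, 1));
      }
      return perWord;
    });
    System.out.println(words); // [hops=1, malt=1, yeast=1]
  }
}
```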

Chapter 7

This chapter talks about monitoring Kafka Streams applications: how to monitor consumer lag, and how to use producer and consumer interceptors. For example, a consumer interceptor has onConsume and onCommit callbacks and a producer interceptor has onSend and onAcknowledgement callbacks. Kafka Streams applications have StateListener and StateRestoreListener with callbacks such as onRestoreStart, onBatchRestored, and onRestoreEnd. They are expected to be stateless.

Monitoring of state restoration from a change-log is built into Kafka Streams. The next chapter is about testing Kafka Streams applications. Kafka Streams allows testing the entire topology without Kafka running, in just a unit test. That’s cool.

Chapter 9

One of the coolest things in Kafka Streams is that it gives read-only access to the streaming application state store. It’s possible to expose it over REST or any other RPC mechanism of your choice. No RPC mechanism is provided, however. It’s just a plain K-V lookup.

When exposing the internal state store to the world, care should be exercised to allow queries only when the Kafka Streams application is in the running state. Any other state, such as shutting down or error, should disallow access to the local state store. This can be achieved by registering a StateListener.

Alright, this blog post ended up being a whirlwind tour of Kafka Streams and the book Kafka Streams in Action. Any inaccuracies are mine. I hope someone out there finds this blog post useful, or that it at least inspires someone to explore Kafka Streams.

 

Tuning Truly Global Kafka Pipelines

“Running Kafka pipelines is like flying a jet engine…”—someone commented during the Tuning Kafka Pipelines talk at the Silicon Valley Code Camp, 2017—“… You have to look at the ratios and not rpms like in a car engine….”

I could not agree more. Setting up Kafka is just the beginning. Monitoring and tuning Kafka pipelines for reliability, performance, latency, uptime, maintenance efficiency pose the true challenge in running Kafka in production.

In this talk, I shared my experience and some valuable lessons from tuning the performance of a truly global production Kafka pipeline at LinkedIn.

Furthermore, this was one of the best audiences I’ve seen at the SVCC in years. There were more than 20 questions from the audience. Very engaged and knowledgeable. Check it out!

1:46 Here’s what a truly global Kafka data pipeline looks like

5:30
Q: Is it not efficient to get data from a follower replica?
A: Theoretically, it is possible to read data from the follower replica as long as the follower is in sync with the leader. However, Kafka does not support that (just yet).

6:15
Q: So the replication is just to ensure resiliency in case broker #2 fails?
A: That’s right. If one of the brokers fails, the consumers can resume consumption from the follower replica.

8:56
Q: Why Kafka pipelines?
A: Kafka provides a very large, reliable, durable buffer between various stages in the pipeline. There are many other reasons too.

11:00
Q: What is the smallest Kafka pipeline?
A: One producer, one Kafka broker, one consumer

12:24 Running Kafka is not a trivial thing. Kafka needs a Zookeeper cluster.

12:38
Q: When is the right time to introduce Kafka?
A: Right now! Kafka should be multi-tenant. Multiple apps should be able to run through the same pipeline.

13:35 In a single process of mirror-maker there are multiple consumers but one producer

14:18 No single point of failure. Kafka achieves that by replicating data. Kafka pipelines achieve that by replicating data over multiple datacenters.

15:13 LinkedIn’s crossbar topology of Kafka pipelines. Active-active replication. Supports traffic shifts.

16:27 Durability interacts with latency and throughput. Highest durability with acks=all

18:11 If you want to increase throughput of the entire Kafka cluster, add more brokers and hosts. If you want to increase throughput of a specific application, increase the number of partitions. Pipeline throughput depends on a number of things, including colocation. “Remote consume” achieves the highest pipeline throughput.

20:16 Producer performance tuning. Effect of batching, linger.ms, compression type, acks, max in flight requests, etc.

23:30 Consumer performance tuning. More partitions more parallelization, fetch message max bytes.

25:13 Broker performance tuning. Performance depends on the degree of replication, the number of replication threads, and batching. Inter-broker replication over SSL has overheads.

26:55
Q: Do the number of partitions affect producer performance?
A: I totally screwed this one up. They do. Increasing partitions allows more parallelism and more throughput. The producer can send more batches in flight. What was I thinking?

29:17
Q: How many partitions should be used? How do you determine the # of partitions?
A: There’s a formula. The answer depends on qps (messages per second), average message size, and retention time. These three factors multiplied give you the total size on disk of a single replica. However, the size of a partition replica on a single machine should not exceed X GB (say 25 GB) because moving partitions (when needed) takes a very long time. Secondly, if a broker crashes and comes back up, it needs to rebuild the index from the stored log. The restart time is proportional to the size of the partition on disk.
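Here is that arithmetic as a quick sketch. The input numbers (10,000 messages per second, 1,000-byte messages, 3 days of retention) are made up for illustration and are not from the talk; only the 25 GB cap comes from the answer above. PartitionEstimate is a hypothetical name.

```java
// Worked example of the sizing rule of thumb; all inputs are illustrative.
final class PartitionEstimate {
  // Bytes that a single replica of the topic occupies on disk.
  static long replicaBytes(long messagesPerSecond, long avgMessageBytes, long retentionSeconds) {
    return messagesPerSecond * avgMessageBytes * retentionSeconds;
  }

  // Smallest partition count that keeps each partition at or under the cap.
  static long partitionsNeeded(long replicaBytes, long maxPartitionBytes) {
    return (replicaBytes + maxPartitionBytes - 1) / maxPartitionBytes; // ceiling division
  }

  public static void main(String[] args) {
    long threeDays = 3L * 24 * 60 * 60;                  // 259,200 seconds
    long total = replicaBytes(10_000, 1_000, threeDays); // 2,592,000,000,000 bytes
    long cap = 25_000_000_000L;                          // 25 GB per partition
    System.out.println(total);                           // 2592000000000
    System.out.println(partitionsNeeded(total, cap));    // 104
  }
}
```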

30:00 Kafka pipeline latency is typically a few hundred milliseconds. However, the SLA (for customers) of a pipeline would be much longer (in minutes) because pipeline maintenance may cause some downtime during which the SLA can’t be met.

32:53
Q: Do you talk about Zookeeper tuning?
A: High write throughput is a problem with Zookeeper.

33:00 Performance tuning of truly global production Kafka pipelines. Expected global replication SLA was under 30 minutes for 100 GB data (single topic). The pipeline to Asia had a really long tail (3 hrs). Kafka Mirror Maker is a CPU-bound process due to decompression.

38:00
Q: What’s happening on either side of the ramp in the chart?
A: It’s a Hadoop push of 100 GB data in 12 minutes. Hence ramp up and ramp down.

39:00 Production pipeline machine setup: 100 GB data pushed in 12 minutes, 840 mappers, 4 large brokers, broker replication over SSL, ACK=-1, 4 KMM clusters, 10 KMM processes each.

40:15
Q: Are all mirror-makers on the source datacenter or not?
A: Only one pipeline (out of 4) has mirror-makers on the destination datacenter.

41:16 Textbook solution for mirror-maker performance tuning: remote consume, local produce. However, that was not practical.

43:09
Q: Why are there under-replicated partitions in the source cluster?
A: Too many producers (Hadoop mappers in this case)

45:00 True production fire-fighting. A producer bug prevented us from sending 1 MB size batches. Mirror-makers used to bail out unpredictably due to a compression estimation bug (KAFKA-3995).

46:45 CPU utilization of 4 mirror-makers in a well-behaved global Kafka pipeline. Monitoring 100s of such metrics is the most important prerequisite in running a successful Kafka pipeline.

And here comes the best comment…

49:35 “… running Kafka pipelines is like flying with a Jet engine. Look at the ratios. They don’t maintain rpms like car …”  I agree!

50:45 The end!

This talk would not have been possible without continuous help from Kafka SREs and the Venice team at LinkedIn. See the Kafka and Venice articles on the LinkedIn Engineering Blog.

Java 8 Lambda Pet Peeves

I started using Java professionally when it was already at version 8. I anticipated Java to be a lot more fun compared to when I first learned it sometime in the early 2000s. With lambdas, the Streams API, and completable futures, Java now has a handful of functional goodies. Coming from the C++11 world, I did not want to give up on basic language capabilities such as lambdas. Java 8 seemed like a good point to enter.

Alas, my excitement did not last for too long. Using lambdas in Java is like scratching fingernails on a chalkboard. Here’s why.

Checked exceptions make verbose lambdas

Consider an example of mapping over a list of CompletableFutures. Suppose, you want to double the value enclosed in every future in a list. Here’s what you have to write today.

List<CompletableFuture<Long>> list =
  Arrays.asList(CompletableFuture.completedFuture(10L));
list.stream()
    .map(f -> {
           try {
             return 2*f.get();
           }
           catch(InterruptedException | ExecutionException e) {
             // return -1; // Return -1 for error? Yuck!
             throw new RuntimeException(e); // Sigh!
           }
         });

It’s verbose beyond my wildest imagination. The code should have been either 1) .map(CompletableFuture::get) or 2) .map(f -> f.get()). Neither of them works because the instance method CompletableFuture::get might throw the checked exceptions InterruptedException and ExecutionException. The functional interface that map relies on is Function. It has an apply method that is not suitable for any lambda or method reference that might throw a checked exception.

People have invented many tricks to work around this problem but they are all crutches and don’t cut down the verbosity to the cleanest possible code.

One of the possible techniques is to create wrappers that catch a generic exception and take generic actions like logging them.

Others have suggested a wrapper to simply convert the checked exception into a RuntimeException and let it escape.

A fancier technique is to pass exception types as generic type parameters and throw checked exceptions without declaring them. That’s sneaky but also not very useful, as it works only for local throw statements.

For example, method throwAsUnchecked below is fine as long as there’s only a local throw statement.

import java.io.IOException;

public class prog {
  Exception e = new IOException("ignore me");

  public static void main(String[] args) {
    prog p = new prog();
    p.iShouldDeclareACheckedExceptionButIDont();
  }

  private void iShouldDeclareACheckedExceptionButIDont() {
    throwAsUnchecked();
  }

  @SuppressWarnings("unchecked")
  private <E extends Throwable> void throwAsUnchecked() throws E {
     iMayThrowIOException(); // compiler error
     throw (E)e;
  }

  void iMayThrowIOException() throws IOException {}
}

The only reasonable and the right technique is to create checked exception friendly SAM (Single Abstract Method) interfaces that are parallel to the already existing standard SAM interfaces. Of course, it causes an explosion in the number of SAM interfaces, which is already bloated due to primitive specializations.
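For the record, here is roughly what such a parallel interface might look like for Function, together with the adapter you end up needing anyway to plug it into the Streams API. CheckedFunction and Checked.unchecked are names I made up for this sketch; they are not part of the JDK. Note that the adapter still wraps checked exceptions in a RuntimeException, so it tidies the call site but does not make the underlying problem go away.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical interface: a checked-exception-friendly twin of
// java.util.function.Function.
@FunctionalInterface
interface CheckedFunction<T, R> {
  R apply(T t) throws Exception;
}

final class Checked {
  // Adapts a CheckedFunction to a plain Function. Unchecked exceptions pass
  // through untouched; checked ones are wrapped in a RuntimeException.
  static <T, R> Function<T, R> unchecked(CheckedFunction<T, R> f) {
    return t -> {
      try {
        return f.apply(t);
      } catch (RuntimeException e) {
        throw e;
      } catch (Exception e) {
        throw new RuntimeException(e);
      }
    };
  }

  public static void main(String[] args) {
    List<CompletableFuture<Long>> list =
        Arrays.asList(CompletableFuture.completedFuture(10L));
    List<Long> doubled = list.stream()
        .map(unchecked(f -> 2 * f.get()))
        .collect(Collectors.toList());
    System.out.println(doubled); // [20]
  }
}
```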

No Immediately Invoked Function Expressions (IIFE pattern) in Java

In most languages I’ve used in the past, the IIFE pattern is quite convenient. It’s famous in JavaScript. I’ll take a Lua example, which is nearly identical.

local y = (function(x) return 2*x end)(5)
print (y) -- prints 10

The idea is to create an anonymous function and call it right away. This pattern comes in handy when there are multiple steps needed to initialize a value. Instead of creating a separate named function and passing (possibly many) arguments to it, IIFE carries you through the day. I like IIFE because it avoids pollution of the global namespace in scripting languages.

It’s not limited to just scripting languages. C++ supports it too; quite elegantly.

int y = [](int x){ return 2*x; }(5);
std::cout << y; // prints 10

Here’s a fun fact.

The smallest IIFE expression in C++ is just []{}()

On the other hand, Java’s equivalent of IIFE makes me cry.

Long y = ((Function<Long,Long>)(x -> 2*x)).apply(5L);

For one, the lambda expression must be cast to the functional interface you care about. You have to spell out all the parameter types. It’s not convenient at all when you have long types. Last but not least, you have to remember the SAM in the functional interface and use it to invoke the lambda. It’s the last part that really gets me.

I went on and created a list of all the SAM methods in the Java 8 SDK. There are dozens of names, literally. Here’s a small list for some (bad) taste: Function::apply, Supplier::get, Predicate::test, Consumer::accept, BiConsumer::accept, BiFunction::apply, BiPredicate::test, BooleanSupplier::getAsBoolean, ToDoubleFunction::applyAsDouble, Runnable::run, Comparator::compare
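One small crutch is a static generic helper that takes a Supplier and calls it immediately, which hides both the cast and the SAM method name at the call site. The helper name iife below is my own invention; as far as I know nothing like it ships with the JDK. It only covers the zero-argument case, so values have to be captured rather than passed in.

```java
import java.util.function.Supplier;

final class Iife {
  // Hypothetical helper: runs the supplied lambda right away and returns its result.
  static <T> T iife(Supplier<T> body) {
    return body.get();
  }

  public static void main(String[] args) {
    Long y = iife(() -> {
      long x = 5L;
      return 2 * x;
    });
    System.out.println(y); // prints 10
  }
}
```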

To be fair, C# IIFE is not the best of its kind either. Still, it definitely feels more function-like than Java’s.

int v = new Func<int, int>((x) => { return 2*x; })(5);

OK, so there you have it. My pet peeves about Java 8 lambdas.

Kafka in Industrial IoT

Sensors and smart data analysis algorithms are key to any Internet of Things (IoT) system. But they are not everything. The conduit that moves vast corpuses of data from sensors to data analysis systems is equally (if not more) important; it is what makes the whole wheel spin. I.e., data movement is the lifeblood of IoT systems.

Industrial IoT

Industrial IoT, the big brother of consumer IoT, is about collecting and analyzing data from machines, buildings, factories, power grids, hospitals and such. Like any other IoT system, IIoT systems need technologies to transport data reliably, securely, efficiently, and scalably (if there is a word like that). There is a plethora of technologies out there that claim to be the right choice for data movement: MQTT, CoAP, OPC, OPC-UA, DDS, AllJoyn, IoTivity, to name just a few.

If you step back from this alphabet-soup and try to distill what these technologies are trying to achieve, common architectural patterns begin to emerge. These patterns have been well-documented previously. Specifically, the Industrial Internet Consortium (IIC) has published a reference architecture that describes three architectural patterns.

  • Three-tier architecture pattern
  • Gateway-Mediated Edge Connectivity and Management architecture pattern
  • Layered Databus pattern

I’ll pick the Layered Databus pattern for further discussion here because it’s a more specific instance of the three-tier architecture pattern, I’m more familiar with it, and it strongly resembles the architecture of the applications that use Kafka.

The Layered Databus Pattern

[Figure: three-layer databus architecture]

What’s a databus anyway? And why does it rhyme so closely with a database?

The reference architecture defines it very generally as

“… a logical connected space that implements a set of common schema and communicates using those set of schema between endpoints.”

Then come the layers. The purpose of the layers is

“[to] federate these systems into a system-of-systems [to] enable complex, internet-scale, potentially-cloud-based, control, monitoring and analytic applications.”

That makes sense. The scope of the “databus” widens as you move up the hierarchy. Each databus layer has its own data model (a fancy word for a collection of schemas/types). The raw data is filtered/reduced/transformed/merged at the boundary where the gateways are. The higher layers are probably not piping through all the data generated at the lower layers, especially in cases such as wind turbines that generate 1 terabyte of data per week per turbine.

OMG Data Distribution Service (DDS) is a standard API to design and implement Industrial IoT systems using this architectural style. It’s a good way to implement a databus. DDS brings many unique aspects (e.g., declarative QoS model, < 100µs latency, interoperability) into the mix that are way beyond the scope of this article. It’s perhaps sufficient to say that I’ve seen the evolution of the Layered Databus pattern in the IIC reference architecture very closely during my 5+ year tenure at Real-Time Innovations, Inc.

So far so good…

What’s in it for Kafka, anyway?

I submit that one of the best technologies to implement the higher layers of “databus” is Kafka.

Kafkaesque IIoT Architecture

Kafka is, by definition, a “databus”. Kafka is a scalable, distributed, reliable, highly-available, persistent, broker-based, publish-subscribe data integration platform for connecting disparate systems together. The publishing and consuming systems are decoupled in time (they don’t have to be up at the same time), space (they are located at different places), and consumption rate (consumers poll at their own pace). Kafka is schema-agnostic as it does not care about the data encoder/decoder technology used to transmit data. Having said that, you are much better off adopting a well-supported serde technology such as Avro or Protobuf and managing the collection and evolution of schemas using some sort of schema-registry service.

Data-Centric Kafka: Log-Compaction

According to the IIC reference architecture, a data-centric pub-sub model is “central to the databus”. A data-centric pub-sub model focusses on

  1. Defining unique instances of data sources
  2. Managing the lifecycle of the instances
  3. Propagation of modifications to the instance attributes as instance updates (not just a message).

It’s conceptually similar to a database table where each row has a primary key that determines the “instance”. The main difference is that the column values of rows (independent instances) change frequently and the middleware communicates them as first-class instance updates. The instance lifecycle events (e.g., creation of an instance, deletion of an instance) are first-class and can be observed. Note that data-centric pub-sub in no way requires a database.

Kafka has a built-in abstraction for the data-centric pub-sub communication model: log-compacted topics. Nominally, in a Kafka topic with a finite retention time, key-value pairs are deleted when their time is up. In a log-compacted topic, on the other hand, only a single value survives for a given key: the latest one, and it survives until a tombstone for the key is written. Nominal retention isn’t applicable anymore. Only a tombstone deletes the key from the partition. Consumers can recognize the creation of a new key and the deletion of an existing key via the consumer API. As a result, Kafka is fully equipped for the data-centric communication model.
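The compaction semantics can be modeled in a few lines. The sketch below is a toy, not Kafka code: CompactedLog is a hypothetical name, records are plain string pairs, and a null value plays the role of the tombstone. Replaying the log leaves only the latest value per key, and a tombstone removes the key altogether.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Toy model of a log-compacted partition (not Kafka code): replaying the log
// keeps only the latest value per key, and a null value acts as a tombstone.
final class CompactedLog {
  static Map<String, String> compact(String[][] log) {
    Map<String, String> latest = new LinkedHashMap<>();
    for (String[] record : log) {
      String key = record[0];
      String value = record[1];
      if (value == null) {
        latest.remove(key);      // tombstone: the key disappears
      } else {
        latest.put(key, value);  // newer value replaces the older one
      }
    }
    return latest;
  }

  public static void main(String[] args) {
    String[][] log = {
      {"sensor-1", "20.5"},
      {"sensor-2", "19.0"},
      {"sensor-1", "21.0"},
      {"sensor-2", null},
    };
    System.out.println(compact(log)); // {sensor-1=21.0}
  }
}
```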

Any consumer that is subscribed to the log-compacted topic from the time the topic is created is able to see all updates to the keys produced into the topic. This is because there’s a finite delay before Kafka selects keys for compaction. A caught-up consumer observes all the changes to all the keys.

A consumer that subscribes long after the creation of the log-compacted topic only observes the final value of keys that have been compacted. Any such consumer, given enough time, catches up to the head of the topic and from that point on observes all the updates to the subsequently produced keys.

This feature enables many use-cases.

  • The first one is database replication. Large internet companies like LinkedIn have moved away from the native replication technology of, say, Oracle or MySQL and have replaced it with incremental data capture pipelines using Kafka.
  • The second use-case is backing up local key-value stores of stream-processing jobs. The processing state of Samza/Kafka-Streams jobs is periodically check-pointed locally and to Kafka log-compacted topics as a backup.

Data-Centric Kafka All the Way

There are two key aspects that make Kafka a great choice for the layered databus architectural pattern.

  1. Free (as in Freedom) Keys
  2. KafkaStreams for Dataflow-oriented Application Implementation

Free (as in Freedom) Keys

Kafka’s notion of a record (a.k.a message) is a key-value pair, where key and value are orthogonal to each other. The key, even if it’s just a subset of attributes in the value (e.g., a userId, sensorId, etc.), is separate from the value. The key could contain attributes outside the value altogether. The key structure (type) is in no way statically bound to the value structure (type). The key attributes can grow/shrink independently of value should the need arise.

This flexibility allows application designers to publish a stream of values to different topics, each with a potentially different key type. This could also be done in a serial pipeline where an application reads a log-compacted topic with (k1, v1) type and republishes it with (k2, v1) type. I.e., Kafka accommodates logical repartitioning of the dataspace to suit the target application semantics. (Note: The logical repartitioning is not to be confused with physical repartitioning by increasing/decreasing the number of partitions. The former is about domain-level semantics, while the latter is about managing parallelism and scalability.)

KafkaStreams for Dataflow-oriented Application Implementation

The data-centric principles should not be limited to just the communication substrate. If the consuming application is structured around the notion of logically parallel, stateful transformations of data in a pipeline (i.e., a flow) and the infrastructure handles state management around the core flow abstraction, there is much to gain from such an application design. First, the application can scale naturally with the increasing number of simultaneously active keys (e.g., growing number of sensors). It can also scale easily with the increasing data volume and update velocity (e.g., growing resolution/accuracy of the sensors). Scaling for both dimensions is achieved by adding processing nodes and/or threads because the dataflow design admits concurrency very easily. Managing growing scale requires effective strategies for distributed data partitioning and multi-threading on each processing node.

Not coincidentally, dataflow-oriented stream processing libraries have become increasingly popular due to their improved level of abstraction, ease of use, expressiveness, and support for flexible scale-out strategies. Dataflow-oriented design and implementation has been shown to be effective in easily distributing the workload across multiple processing nodes/cores. There are many options including Apache Beam, Apache Samza, Apache Spark, Apache Flink, Apache NiFi, and Kafka Streams for Kafka.

Check out how Kafka Streams relates to Global Data Space.

The most touted middleware for implementing the layered databus pattern, DDS, supports dataflow-oriented thinking at the middleware level but the API makes no attempt to structure the applications around the idea of instance lifecycle. Having said that, Rx4DDS is a research effort that attempts to bridge the gap and enforce data-centric design all the way to the application layer.

Once again, Kafka comes to the rescue with Kafka Streams, which includes high-level composable abstractions such as KStream, KTable, and GlobalKTable. By composable, I mean it’s an API DSL with a fluent interface. KStream is just a record stream. KTable and GlobalKTable are more interesting as they provide a table abstraction where only the last update to a key-value pair is visible. It’s ideal for consuming log-compacted topics. Application-level logic is implemented as transformations (map), sequencing (flatMap), aggregations (reduce), grouping (groupBy), and joins (product of multiple tables). Application state management (snapshots, failure-recovery, at-least-once processing) is delegated to the infrastructure.

These capabilities are by no means unique to Kafka Streams. The same ideas surface in a number of the previously mentioned stream processing frameworks. The point, therefore, is that Kafka together with stream processing frameworks provides an excellent alternative for implementing the layered databus pattern.

Kafka Pipelines for Federation (aka Layers)

It does not take a rocket scientist to realize that the “Layered Databus” is just a geographically distributed Kafka pipeline (or pipelines). Yeah, that’s everyday business for Kafka.

A common purpose of deploying Kafka pipelines in Internet companies is tracking. User activity events captured at front-end applications and personal devices are hauled to offline systems (e.g., Hadoop) in near real time. The tracking pipelines commonly involve “aggregate” clusters that collect data from all regional clusters. The regional clusters are deployed world-wide. Kafka Mirror-makers are used to replicate data from one cluster to another over high-latency links. It’s essentially a federation of Kafka clusters for data propagation in primarily one direction. The other direction is pretty similar.

Here’s how the same architecture can be applied to an abstract IIoT scenario.

kafka-pipeline-iiot

This picture shows how Kafka can be used to aggregate data from remote industrial sites.

A gateway publishes data from machines to a local Kafka cluster where it may be retained for a few days (configurable). The mirror-maker cluster in the data-centers securely replicates the source data into the aggregate clusters. Large volumes of data can be replicated efficiently by using built-in compression in Kafka. Enterprise apps consume data from the aggregate clusters.
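
The flow described above can be sketched with a toy model in plain Python. It is only an illustration under stated assumptions: the Log class, the mirror function, the site names, and the three-day retention value are all hypothetical, and the sketch ignores partitions, compression, and security. It shows the one property that matters here: a record that ages out of a site’s local log is still available in the aggregate log, provided it was mirrored before it expired.

```python
from dataclasses import dataclass, field
from typing import List, Tuple

Record = Tuple[float, str]  # (timestamp in seconds, payload)


@dataclass
class Log:
    """Toy stand-in for one topic on one cluster: an append-only, offset-addressed log."""
    base_offset: int = 0                                   # offset of records[0]
    records: List[Record] = field(default_factory=list)

    def append(self, record: Record) -> None:
        self.records.append(record)

    def end_offset(self) -> int:
        return self.base_offset + len(self.records)

    def read_from(self, offset: int) -> List[Record]:
        start = max(offset, self.base_offset) - self.base_offset
        return self.records[start:]

    def expire(self, now: float, retention_seconds: float) -> None:
        """Drop the oldest records once they fall outside the retention window."""
        while self.records and self.records[0][0] < now - retention_seconds:
            self.records.pop(0)
            self.base_offset += 1


def mirror(source: Log, destination: Log, next_offset: int) -> int:
    """Copy records at or after next_offset; return the offset to resume from."""
    for record in source.read_from(next_offset):
        destination.append(record)
    return source.end_offset()


DAY = 24 * 3600.0
site_a, site_b, aggregate = Log(), Log(), Log()

# Gateways at two hypothetical sites publish to their local logs.
site_a.append((0 * DAY, "site-a: temperature=71"))
site_b.append((0 * DAY, "site-b: pressure=30"))
resume_a = mirror(site_a, aggregate, 0)
resume_b = mirror(site_b, aggregate, 0)

# Four days later: new data arrives and the day-0 record ages out locally.
site_a.append((4 * DAY, "site-a: temperature=73"))
site_a.expire(now=4 * DAY, retention_seconds=3 * DAY)
resume_a = mirror(site_a, aggregate, resume_a)

print(len(site_a.records), len(aggregate.records))
```

Tracking a resume offset per source is what lets the mirror pick up where it left off without copying a record twice, even after the local log has trimmed its oldest entries.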

A pipeline in the reverse direction could be used for the control data. The mirror-maker cluster is separate due to direction and different latency requirements. The control pipeline in the replica data-center is passive.

Interestingly, a similar architecture is used by British Gas in production.

Kafka offers a number of advantages in this case.

  • Aggregate clusters support seamless failover in the face of regional unavailability.
  • Kafka and mirror-makers support no-loss data pipelines.
  • There’s no need for a separate service for durability as Kafka itself is a reliable, persistent commit log.
  • With growing scale (more topics, more keys, larger values, faster values, etc.), more instances of Kafka brokers and mirror-maker can be easily added to meet the throughput requirements.
  • Identity mirroring maintains the original instance lifecycle across the pipeline. Identity mirroring means that the source and destination clusters have the same number of partitions and that keys belonging to partition p in the source cluster also belong to partition p in the destination cluster. As a result, the order of records within each partition is preserved.
  • Whitelisting and blacklisting of topics in mirror-maker allows only a subset of data to pass through.
  • Replication is secured using SSL.
  • Kafka is designed for operability and multi-tenancy. Each Kafka cluster can be monitored as Kafka publishes numerous metrics (as JMX MBeans) for visibility, troubleshooting, and alerting during infrastructure problems.
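
Two of the points above, identity mirroring and topic whitelisting, are easy to picture with a small model. The following is a minimal sketch in plain Python, not mirror-maker itself. The names partition_for, publish, and identity_mirror, the topic names, and the use of a CRC32 hash are all assumptions made for illustration (Kafka’s own partitioner uses a different hash function), as is treating the whitelist as a regular expression.

```python
import re
import zlib
from typing import Dict, List, Tuple

Record = Tuple[str, str]                     # (key, value)
Cluster = Dict[str, List[List[Record]]]      # topic -> partitions -> ordered records


def partition_for(key: str, num_partitions: int) -> int:
    # Stand-in hash for illustration only; not the hash Kafka uses.
    return zlib.crc32(key.encode("utf-8")) % num_partitions


def publish(cluster: Cluster, topic: str, num_partitions: int, record: Record) -> None:
    partitions = cluster.setdefault(topic, [[] for _ in range(num_partitions)])
    partitions[partition_for(record[0], num_partitions)].append(record)


def identity_mirror(source: Cluster, whitelist: str) -> Cluster:
    """Copy whitelisted topics so that partition p maps to partition p, order intact."""
    pattern = re.compile(whitelist)
    return {
        topic: [list(partition) for partition in partitions]
        for topic, partitions in source.items()
        if pattern.fullmatch(topic)
    }


source: Cluster = {}
for key, value in [("pump-1", "on"), ("pump-2", "on"), ("pump-1", "off")]:
    publish(source, "telemetry.pumps", 4, (key, value))
publish(source, "debug.gateway", 4, ("gw-1", "heartbeat"))

# Only topics matching the (hypothetical) whitelist pattern cross the link.
destination = identity_mirror(source, whitelist=r"telemetry\..*")

p = partition_for("pump-1", 4)
print(destination["telemetry.pumps"][p])
```

Because both sides have the same partition count and the copy is partition-for-partition, every update to a given key stays in one partition in its original order, which is what keeps the instance lifecycle intact downstream.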

Summary

Kafka (the infrastructure) provides a powerful way to implement the layered databus architecture (a.k.a. system-of-systems) for Industrial IoT. Kafka Streams (the API) provides first-class abstractions for dataflow-oriented stream processing that lend themselves well to the notion of data-centric application design. Together, they allow system architects and other key stakeholders to position their assets (apps and infrastructure) for future growth in unique sensors, data volume/velocity, and analysis dimensions while maintaining a clean dataflow architecture.

Beginning Coditation

This blog is about meditation over code—Coditation. It’s a commentary over interesting code (a functional gem or object-oriented pattern) or a distributed system or a big data technology that’s worth talking about.