Introduction

Event Streaming

the practice of capturing data in real-time from event sources like databases, sensors, mobile devices, cloud services, and software applications in the form of streams of events

routing the event streams to different destination technologies as needed

ensures a continuous flow and interpretation of data so that the right information is at the right place, at the right time

Kafka’s Event Stream Purpose

  1. To publish (write) and subscribe to (read) streams of events, including continuous import/export of your data from other systems.
  2. To store streams of events durably and reliably for as long as you want.
  3. To process streams of events as they occur (RT) or
    retrospectively.

Server and Client in Kafka

Kafka’s servers and clients communicate with each other over a high-performance TCP network protocol

Server

  • run as a cluster of one or more servers that can span multiple datacenters or cloud regions
  • Brokers: servers that form the storage layer (Cache Proxy)
  • Other servers run Kafka Connect to continuously import and export data as event streams to integrate Kafka with your existing systems such as relational databases as well as other Kafka clusters
  • Fault-Tolerance: if any of its servers fails, the other servers will take over their work to ensure continuous operations without any data loss.

Client

  • write distributed applications and microservices that read, write, and process streams of events in parallel, at scale, and in a fault-tolerant manner even in the case of network problems or machine failures

Definitions

  • Event: records the fact that “something happened” in the world or in your
    business. Whenever you read or write data in Kafka, you do so in the form of events.
    • Example:
Event key: "Alice"
Event value: "Made a payment of $200 to Bob"
Event timestamp: "Jun. 25, 2020 at 2:06 p.m."
  • Producer: a client application that publishes (writes) events to Kafka
  • Consumer (subscriber): a client application that subscribes to (reads and processes) these events
    • producers and consumers are fully decoupled and agnostic of each other.
    • producers never need to wait for consumers
    • Kafka can guarantee that events are processed exactly-once (recall
      from the eecs482 distributed systems class: client at-least-once plus server
      at-most-once, combined with some fault-tolerance tricks, can
      give “exactly-once”)
  • Topic: events are organized and durably stored in topics.
    • a topic is similar to a folder in a filesystem, and the events are the files in that folder
    • events in a topic can be read as often as needed.
      • this differs from traditional messaging systems, where events are deleted after consumption.
      • you define, per topic, how long Kafka should retain your events
        (after which old events are discarded)
  • Partition: a topic is spread over a number of “buckets” located on different Kafka brokers
    • very important for scalability: it allows client apps to read from and write to
      many brokers at the same time.
    • when an event is published to a topic, it is actually appended to one of the
      topic’s partitions. Events with the same event key are written to the same
      partition.
      • Kafka will guarantee that any consumer of a given topic-partition will always read that partition’s events in exactly the same order as they were written.

[Figure: kafka_concept]

  • Fault Tolerance: every topic can be replicated (even across geo-regions or
    data centers), so that there are always multiple brokers holding a copy of the data (a common replication factor is 3). Replication is performed at the granularity of topic-partitions.

Use Cases

Messaging (Message Broker)

Advantages over traditional message brokers:

  • Decoupling: a producer sends messages without needing to know who will consume them.
  • Buffering: traffic bursts are absorbed by buffering unprocessed messages
  • High Throughput
  • Durability and Replayability

Website Activity Tracking

The original use case for Kafka was to rebuild a user activity tracking pipeline as a set of real-time publish-subscribe feeds (a feed here is an ordered, append-only stream of events/data that multiple consumers can subscribe to and consume)

site activity (page views, searches, or others) is published to central topics with one topic per activity type

Design

Motivation

  • to be able to act as a unified platform for handling all the real-time data feeds a large company might have
    • Requirements for large company
      • high-throughput to support high volume event streams such as real-time log aggregation
      • deal gracefully with large data backlogs to be able to support periodic data loads from offline systems
      • handle low-latency delivery to handle more traditional messaging use-cases
  • support partitioned, distributed, real-time processing of these feeds to create new, derived feeds. This motivated our partitioning and consumer model.
  • guarantee fault-tolerance in the presence of machine failures

Persistence

  • Persistent storage relies on disk, which is durable but slow; how the disk is used affects its performance a great deal.

  • Key performance factors: disk throughput and disk seek latency. The two
    have been diverging, so sequential access is far faster than random access.

  • Linear reads and writes: the most predictable of all usage patterns, and therefore heavily optimized by the OS with read-ahead
    (recall eecs482 lecture 19 file_system’s last page: for sequentially accessed
    files this can be a big win unless the file is scattered across the disk) and write-behind
    (recall eecs482 project 3, the defer-and-avoid-work idea behind copy-on-write: small logical writes are grouped into large physical writes)

  • Compensating for the performance divergence: modern OSes have become aggressive in their use of RAM for disk caching.

    • a modern OS will divert all free memory to disk caching

PageCache (recall that memory is the unified cache for the address space)

Since RAM acts as a cache for the file system, the following paragraphs treat
the pagecache as part of memory. As with any cache, all reads and writes to the
filesystem go through the pagecache first. The pagecache is the portion of
memory that the kernel uses for disk IO, while an in-process cache lives in
the process’s own memory. The pagecache is maintained by the OS (recall eecs482
project 3, file-backed pages), and the in-process cache is maintained by the app (process) itself (swap-backed pages). This leads to the cold-start problem: after a restart the in-process cache must be rebuilt from nothing, whereas the pagecache stays warm even if the service is restarted.

The JVM is known for the high memory overhead of its objects (headers and padding can double the size of the stored data, or worse), and
garbage collection (GC) becomes increasingly slow as the in-heap data grows. Kafka’s
design aims to avoid both problems.

Note: the JVM stores objects on the heap of its process, which
is the memory the GC has to manage

Terminology:

  • cold: load from nothing
  • warm: start with something in cache/memory

PageCache-Centric

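Use the pagecache instead of an in-process cache: every write is appended immediately
to a persistent log on the filesystem, which in effect just transfers it into the kernel’s pagecache without necessarily flushing to disk. The traditional approach instead
keeps as much as possible in process memory and flushes it all to disk when RAM runs out, which can become very slow.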

Constant Time Suffices

For persistence, messaging systems often use a per-consumer queue with an associated
BTree or other general-purpose random-access structure to maintain metadata about messages.
BTree operations are O(log N), which could be quite slow
for disk operations: a tree lookup may need several disk seeks when the pages are not in memory,
and each seek is expensive.

Log Structure (Append-Only-Commit-Log)

The idea comes from log append and sequential read operations.

  • append for enqueue: only add to the end of the log, which takes O(1) time
  • sequential read for dequeue: only read the next page, which takes O(1) time.

With these two operations, reads and writes do not block each other, and
both are sequential disk accesses.
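A minimal sketch of the append-only log idea, assuming an in-memory Python list stands in for the on-disk file (the `CommitLog` class and its method names are hypothetical, not part of Kafka):

```python
class CommitLog:
    """Toy append-only log; a list stands in for the on-disk segment file."""

    def __init__(self):
        self._records = []

    def append(self, record):
        """Enqueue: add to the end only and return the record's offset."""
        self._records.append(record)
        return len(self._records) - 1

    def read(self, offset, max_records=10):
        """Dequeue: sequential read starting at offset; never mutates the log."""
        return self._records[offset:offset + max_records]


log = CommitLog()
for payload in ("a", "b", "c"):
    log.append(payload)
```

Reading does not remove anything, so several readers can each keep their own offset into the same log.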

Comparison with traditional messaging systems: with a BTree-like
structure, a write to the last node might also modify earlier nodes
(recall eecs482 p4: writing to a file’s data block also updates the file’s inode). The
append-only commit log ensures that we only ever write to the block at the end (no
need to traverse a structure to find the destination).

Because performance is completely decoupled from the data size, the
server can store data on many cheap, slow disks.

Having access to virtually unlimited disk space without any performance penalty means that we can provide some features not usually found in a messaging system.
For example, we don’t need to delete data immediately after it is consumed; it can be retained for a relatively long time.

Efficiency

With disk efficiency addressed, two problems remain:

  • too many small I/O operations
  • excessive byte copying

Address the “Too many small IO operations”: Batching

Build a Message Set abstraction that naturally groups messages together.

Network requests group messages together and amortize the overhead of the network roundtrip rather than sending a single message at a time

Server appends chunks of messages to its log in one go

Consumer fetches large linear chunks at a time

RESULT: turn a bursty stream of random message writes into linear writes that flow to the consumers
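The batching idea can be sketched as follows. This is an illustration only: `BatchingSender` and its `send` callback are hypothetical names, and a real producer would also flush on a timer rather than on size alone.

```python
class BatchingSender:
    """Hypothetical producer-side buffer that flushes once batch_size messages accumulate."""

    def __init__(self, batch_size, send):
        self.batch_size = batch_size
        self.send = send          # callable taking a list of messages (one "request")
        self.buffer = []

    def add(self, message):
        self.buffer.append(message)
        if len(self.buffer) >= self.batch_size:
            self.flush()

    def flush(self):
        if self.buffer:
            self.send(self.buffer)
            self.buffer = []      # start a fresh list; the sent batch is left untouched


requests = []                     # each element models one network request
sender = BatchingSender(batch_size=3, send=requests.append)
for i in range(7):
    sender.add(f"msg-{i}")
sender.flush()                    # push out the final partial batch
```

Seven messages travel in three requests instead of seven, so the per-request overhead is amortized over the batch.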

Address the “Excessive byte copying”

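What is byte copying? Each time data moves between buffers or is converted
from one representation to another (for example, from a local in-memory form into wire bytes and back
again on the receiving host), the bytes are copied. At low message rates this is not an issue, but under load the impact is significant.

Solution: share a standardized binary message format among producer, broker, and consumer, so that data chunks can be transferred between them without modification.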

Traditional process of data sending:

  1. The operating system reads data from the disk into pagecache in kernel space
  2. The application reads the data from kernel space into a user-space buffer
  3. The application writes the data back into kernel space into a socket buffer
  4. The operating system copies the data from the socket buffer to the NIC buffer where it is sent over the network

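Unix sendfile optimization (zero-copy optimization): disk -> pagecache -> NIC buffer, avoiding the two copies through user space

With multiple consumers on a topic, data is copied into the pagecache once and reused on each consumption, instead of being copied out to user space on every read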
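As a rough illustration of the sendfile path, Python’s standard library exposes `socket.sendfile()`, which uses `os.sendfile()` where the platform supports it and falls back to plain `send()` otherwise. The helper name `send_log_segment` and the socket pair standing in for a broker-to-consumer connection are assumptions made for this sketch.

```python
import os
import socket
import tempfile


def send_log_segment(path, conn):
    """Send a whole file over a connected socket and return the bytes sent."""
    with open(path, "rb") as segment:
        # The file's bytes are handed to the socket without a user-space buffer
        # in our code; the kernel does the transfer when sendfile is available.
        return conn.sendfile(segment)


# Demo: a local socket pair stands in for the network connection.
payload = b"event-1\nevent-2\nevent-3\n"
with tempfile.NamedTemporaryFile(delete=False) as tmp:
    tmp.write(payload)
broker_side, consumer_side = socket.socketpair()
sent = send_log_segment(tmp.name, broker_side)
broker_side.close()
received = consumer_side.recv(4096)
consumer_side.close()
os.unlink(tmp.name)
```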

End-to-End Batch Compression

Efficient compression requires compressing multiple messages together rather than compressing each message individually.
Kafka supports this with an efficient batching format. A batch of messages can be grouped together, compressed, and sent to the server in this form. The broker decompresses the batch in order to validate it. For example, it validates that the number of records in the batch is same as what batch header states. This batch of messages is then written to disk in compressed form. The batch will remain compressed in the log and it will also be transmitted to the consumer in compressed form. The consumer decompresses any compressed data that it receives
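A small experiment shows why batch compression pays off. It uses `zlib` from the Python standard library as a stand-in codec, and the message contents are made up for the example.

```python
import json
import zlib

messages = [
    json.dumps({"user": f"user-{i}", "action": "page_view", "page": "/home"}).encode()
    for i in range(100)
]

# Compress each message on its own: redundancy across messages is not exploited.
individual_total = sum(len(zlib.compress(m)) for m in messages)

# Compress the whole batch at once: repeated field names compress away.
batch = b"\n".join(messages)
compressed_batch = zlib.compress(batch)

# The consumer side reverses the process.
restored = zlib.decompress(compressed_batch).split(b"\n")
```

Most of the redundancy sits between messages (the same field names repeat in every record), which is why the batch compresses much better than the sum of its parts.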

The Producer design

The producer sends data directly to the broker that leads the partition, without any intervening routing tier.
To make this possible, every Kafka node must be able to answer a metadata request about which
servers are alive and where the partition leaders are.

The client (producer) controls which partition it publishes to. This can be random (for load
balancing) or semantic (the user specifies a key to partition by,
and the key is hashed to a partition).

This in turn allows consumers to make locality assumptions about their consumption (controllable and
predictable).
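A minimal sketch of the two partitioning modes. The function name `choose_partition` is hypothetical, and CRC32 is only a stand-in hash; Kafka’s own clients use a different hash function.

```python
import itertools
import zlib

_round_robin = itertools.count()


def choose_partition(key, num_partitions):
    """Hash the key if there is one, otherwise rotate for load balancing."""
    if key is None:
        return next(_round_robin) % num_partitions
    # Same key -> same hash -> same partition, which preserves per-key ordering.
    return zlib.crc32(key.encode("utf-8")) % num_partitions
```

Because the mapping depends on `num_partitions`, changing the partition count would move keys to different partitions in this sketch.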

Asynchronous Send: to enable batching, the producer buffers data in memory up to a
certain size before sending it out in a single request, which requires an asynchronous
send.

The Consumer design

works by issuing “fetch” requests to the brokers leading the partitions it wants to consume

Each fetch request includes a log offset, and the consumer receives back a chunk of the log
beginning at that offset.

(Broker) Push vs. (Consumer) Pull

Pros of consumer pull:

  1. Controllable rate: the goal is generally for the consumer to consume at the maximum possible rate. With broker push, the broker controls the transfer rate without regard to what the consumer can actually handle, so a slow consumer can be overwhelmed; with pull, a consumer that falls behind simply catches up when it can.
  2. Batching support: with broker push, sending each message immediately gives poor
    batching (wasting bandwidth), while buffering for some time means the broker is guessing, since it does not
    know whether the consumer will be able to process the data right away (a lot of unconsumed data might pile up
    at the consumer side). With consumer pull, the consumer always pulls
    all the messages available after its current position in the log.

Cons of consumer pull:

  1. if the broker has no data the consumer may end up polling in a tight loop, effectively busy-waiting for data to arrive.
    Solution: allow the consumer request to block in a “long poll” waiting until data arrives
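The long-poll idea can be sketched with a blocking queue. Here `long_poll` is a hypothetical helper and a `queue.Queue` stands in for the broker’s partition; the point is that the caller sleeps until data arrives or a timeout expires instead of spinning.

```python
import queue
import threading


def long_poll(partition_queue, timeout_s):
    """Block until a record is available or the timeout expires; no busy-waiting."""
    try:
        return partition_queue.get(timeout=timeout_s)
    except queue.Empty:
        return None


records = queue.Queue()
# A hypothetical producer delivers one record shortly after the consumer starts waiting.
threading.Timer(0.05, records.put, args=("event-1",)).start()
first = long_poll(records, timeout_s=2.0)    # returns as soon as the record arrives
second = long_poll(records, timeout_s=0.1)   # nothing else arrives, so this times out
```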

Consumer Position

Keeping track of what has been consumed is one of the key performance points of a messaging system.

In a traditional broker, when a message is fetched by a consumer, the broker
either records that fact locally right away or waits for an acknowledgement
from the consumer. Both options have consistency problems.

Send -> Network -> Received

Any of these 3 steps might fail, which leads to the problems discussed in eecs482
(at-least-once, at-most-once, exactly-once).

Our topic is divided into a set of totally ordered partitions, each of which is consumed by exactly one consumer within each subscribing consumer group at any given time -> the position of a consumer in each partition is just a single integer, the offset of the next message to consume.

Pros:

  • This makes the state about what has been consumed very small, just one number for each partition.
  • This state can be periodically checkpointed.
  • This makes the equivalent of message acknowledgements very cheap.
  • A consumer can deliberately rewind back to an old offset and re-consume data. This violates the common contract of a queue, but turns out to be an essential feature for many consumers
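The points above can be illustrated with a toy offset store that keeps one integer per (group, partition). All names here (`GroupOffsets`, `poll`, the `billing` group) are hypothetical.

```python
class GroupOffsets:
    """Toy offset store: one integer per (group, partition)."""

    def __init__(self):
        self._next = {}

    def position(self, group, partition):
        return self._next.get((group, partition), 0)

    def commit(self, group, partition, next_offset):
        self._next[(group, partition)] = next_offset


partition_log = ["e0", "e1", "e2", "e3"]
offsets = GroupOffsets()


def poll(group, partition=0, max_records=2):
    """Read from the group's position, then checkpoint the new position."""
    start = offsets.position(group, partition)
    batch = partition_log[start:start + max_records]
    offsets.commit(group, partition, start + len(batch))
    return batch


first = poll("billing")
second = poll("billing")
offsets.commit("billing", 0, 1)   # deliberate rewind to offset 1
replayed = poll("billing")
```

Rewinding is just overwriting one integer; the log itself is never touched.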

Offline Data Load (Re-check this after reading the Hadoop)

Scalable persistence allows for the possibility of consumers that only periodically consume such as batch data loads that periodically bulk-load data into an offline system such as Hadoop or a relational data warehouse.

Static Membership

Aim: improve the availability of stream applications, consumer groups and other applications built on top of the group rebalance protocol

A dynamic member is assigned a new entity id each time it restarts, whereas
a static member keeps its id across restarts

Message Delivery Semantics

As discussed several times above, there are 3 possible semantics:

  • At-least-once
    • the problem is that the consumer may receive the same message more than
      once; one solution is to assign each message a unique id so that the consumer
      can detect and discard duplicates
    • another solution is to use a transaction log
  • At-most-once
    • when reading messages, the consumer first saves its position to the log and then processes the messages into
      the output
    • if a crash happens between saving the position and finishing the processing, the message is considered done even though it was never processed.
  • Exactly-once

The producer can specify the level of durability it wants: it can perform the send completely asynchronously, or wait only until the leader (but not necessarily the followers) has the message.

consumer:

  1. save-pos-then-process: described above in the at-most-once part
  2. process-then-save-pos: the consumer first processes the message and then saves the position of the last message it has processed; a crash between the two steps causes reprocessing, which gives the at-least-once behavior described above.
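The two orderings can be compared with a small simulation. It assumes a single crash that happens between the two steps at one chosen offset; the function `consume` and its parameters are hypothetical.

```python
def consume(log, crash_at, save_first):
    """Consume `log`, crashing once between the two steps at offset `crash_at`,
    then restarting from the saved position. Returns the processed records."""
    saved, processed, crashed = 0, [], False
    while saved < len(log):
        offset = saved
        if save_first:
            saved = offset + 1                       # step 1: save position
            if offset == crash_at and not crashed:
                crashed = True
                continue                             # crash: restart from saved position
            processed.append(log[offset])            # step 2: process
        else:
            processed.append(log[offset])            # step 1: process
            if offset == crash_at and not crashed:
                crashed = True
                continue                             # crash: position was never saved
            saved = offset + 1                       # step 2: save position
    return processed


log = ["m0", "m1", "m2"]
at_most_once = consume(log, crash_at=1, save_first=True)     # m1 is lost
at_least_once = consume(log, crash_at=1, save_first=False)   # m1 is processed twice
```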

Kafka Streams

Kafka Streams ensures that every message is processed exactly-once

namely, the consumer’s position is written to Kafka in the same transaction as the records
sent to the output topic, so both become visible together or not at all.

  • commit successful: the output is visible, and the next poll returns the next unprocessed
    message.
  • abort: the output is not visible, and the next poll returns the same message again, since the
    position was not updated either.

The remaining problem is that in external systems (not follow Kafka protocol), the position and the process output may be stored in different places. This can be handled more simply and generally by letting the consumer store its offset in the same place as its output.

Kafka Transaction

In Kafka, only producers are transactional, and they are able to make transactional updates to the consumer’s position (confusingly called the “committed offset”), and it is this which gives the overall exactly-once behavior.

There are three key aspects to exactly-once processing using the producer and consumer, which match how Kafka Streams works.

  1. The consumer uses partition assignment to ensure that it is the only consumer in the consumer group currently processing each partition.
  2. The producer uses transactions so that all the records it produces, and any offsets it updates on behalf of the consumer (the brokers will record the consumer’s offset in it), are performed atomically
  3. In order to handle transactions properly in combination with rebalancing, it is advisable to use one producer instance for each consumer instance. More complicated and efficient schemes are possible, but at the cost of greater complexity.
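The atomic “output plus offset” update can be sketched as below. This is a toy model, not the Kafka client API: `ToyTransaction`, `process_next`, and the doubling step are all made up for the illustration.

```python
class ToyTransaction:
    """Stages output records and a consumer offset, applying both or neither."""

    def __init__(self, output_topic, committed_offsets):
        self.output_topic = output_topic
        self.committed_offsets = committed_offsets
        self.staged_records = []
        self.staged_offsets = {}

    def send(self, record):
        self.staged_records.append(record)

    def send_offset(self, partition, next_offset):
        self.staged_offsets[partition] = next_offset

    def commit(self):
        self.output_topic.extend(self.staged_records)
        self.committed_offsets.update(self.staged_offsets)

    def abort(self):
        self.staged_records.clear()
        self.staged_offsets.clear()


input_partition = ["1", "2", "oops", "4"]
output_topic, committed = [], {0: 0}


def process_next():
    """Consume one record, produce its doubled value, and commit both atomically."""
    offset = committed[0]
    txn = ToyTransaction(output_topic, committed)
    try:
        txn.send(int(input_partition[offset]) * 2)   # raises ValueError on bad input
        txn.send_offset(0, offset + 1)
        txn.commit()
        return True
    except ValueError:
        txn.abort()                                   # neither output nor offset changes
        return False
```

After an abort the committed offset still points at the failed record, so the next attempt re-reads the same input rather than skipping it.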

Kafka Replication

Kafka replicates each topic partition across a configurable number of
servers (the replication factor can be set per topic)

If some servers in the cluster fail, Kafka fails over to the remaining replicas
automatically, which ensures the reliability of the system.

Leader and Follower

The unit of replication is the topic partition; each partition has one leader and zero or more follower replicas.

total number of replicas = 1 leader + N followers

Usually, # of partitions >> # of brokers, and the partition leaders are distributed
evenly among brokers

All followers sync the leader’s log, so their messages and offsets
are identical to, and in the same order as, those in the leader’s log.

Followers consume messages from the leader just as a normal Kafka consumer
would and apply them to their own log. Followers can pull logs from
their leader in batches.

How do we define that a node is alive? (automatic failure handling
requires a precise definition of “alive”)

  • brokers must maintain an active session with the controller in order to
    receive regular metadata updates.
  • brokers acting as followers must replicate the writes from the leader and not
    fall “too far” behind. (Close Following)

We refer to nodes satisfying these two conditions as being “in sync” (to avoid
the vagueness of “alive” or “failed”). The leader keeps track of the set of in-sync replicas (ISR).
If a replica fails to meet either condition, the leader removes it from the
ISR.

Failure Types in Distributed Systems: the failures we try to handle are of the
“fail/recover” kind, where a node suddenly stops
working and later recovers (it does not fail forever). We do not aim to handle
“Byzantine” failures, where a node produces arbitrary or malicious responses.

Precise Definition of Commit: a message is considered committed only when all
in-sync replicas of its partition have appended it to their logs. Only committed messages are ever given out to consumers, so a consumer never sees a message that could be lost if the leader fails.

Replication logs: Quorums, ISRs, and State Machines

The replicated log is at the heart of Kafka.

A replicated log can be used by other systems as a primitive for implementing
other distributed system in the state-machine style.

A replicated log models the process of agreeing on the order of a series of values. The simplest way is
to have a leader choose the order of the values, as long as the leader is
alive. All followers only need to copy the values in the order the leader chose.

If the leader crashes, we need to pick a new leader from the followers, but the problem
is that followers might themselves have fallen behind or crashed. So we need to make sure
that our leader “candidates” are in-sync followers with the latest data.

Quorum: a write is acknowledged only after a certain number of replicas have
written it successfully, and electing a leader requires comparing the logs of a certain number
of replicas. If you choose the number of acknowledgements required and the number of logs that must be compared to elect a leader such that there is guaranteed to be an overlap, then this is called a Quorum.
In short, a quorum-based strategy requires at least k
replicas to respond before an operation is considered successful.

Majority Vote: with 2f+1 replicas, a message is committed once f+1 replicas have received
it. If the leader fails and at most f replicas are down in total, any f+1 surviving replicas
include at least one with every committed message, so we can always find a good one to
become the next leader.
Advantage of MV: latency depends only on the fastest servers. With a replication factor
of 3, f+1 = 2, so a commit only waits for the leader and the faster of the two
followers.
Disadvantage of MV: it does not take many failures to leave you without
an electable leader. Tolerating 1 failure requires 2f+1 = 3 replicas, tolerating 2
failures requires 5 replicas, and so on. Every write is then done 3 or 5 times, which is
expensive in throughput and disk space.

Kafka’s Design on Replicas: Kafka dynamically maintains an ISR whose members
are all caught up with the leader. Only ISR members are eligible
for election as leader, and a message must be appended to the logs of all replicas in the ISR
before it is considered committed. (The ISR need not include all the replicas, but
a minimum ISR size can be specified by the user; a replica that follows too
slowly is temporarily removed from the ISR and is added back once it
catches up with the leader.) The ISR model tolerates up to f failures when there
are f+1 replicas, hence the name f+1 model.
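The replica arithmetic and the ISR commit rule can be written down directly. The function names below are hypothetical, and sets of broker names stand in for replicas.

```python
def replicas_for_majority_vote(f):
    """Replicas needed to tolerate f failures under majority vote."""
    return 2 * f + 1


def replicas_for_isr(f):
    """Replicas needed to tolerate f failures under the ISR approach."""
    return f + 1


def is_committed(acked_by, isr):
    """ISR rule: committed once every current ISR member has the message."""
    return isr <= acked_by   # subset test on sets of replica names
```

For f = 2 this gives 5 replicas under majority vote versus 3 under the ISR approach. The price is that an ISR commit waits for every in-sync replica, including the slowest one.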

This can be considered as the tradeoff between the efficiency and the
consistency.

Another important design difference is that Kafka does not require a crashed
node to recover with all its data intact (it does not fsync every write). Instead, before a failed-and-recovered node
can rejoin the ISR, it must fully re-sync its log (by pulling from the
leader), even if it lost unflushed data in the crash.

Unclean Leader Election: what if all nodes are down?

There are 2 ways to proceed if all replicas of a partition have failed:

  1. wait for a replica in the ISR to come back to life, and choose this replica
    as the leader.
  2. choose the first replica that comes back to life (not necessarily in the ISR) as the
    leader.

This is a tradeoff between availability and consistency.
With the first method, the partition stays unavailable for as long as the ISR
replicas are down;
with the second method, committed messages may be lost, because the new leader’s log becomes the source of truth even if it is behind.
Since version 0.11.0.0, Kafka chooses the first method by default; the second can be enabled with the unclean.leader.election.enable configuration property.

Availablity vs. Consistency

When writing to Kafka, the producer sets acks to choose how long it waits for a write to be acknowledged: acks = 0
means that it does not wait for any acknowledgement from the broker; acks = 1 means that it
waits until the leader has written the message to its own log; acks = -1 (all) means that it waits until all replicas in the ISR have received the message.

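We can favor consistency over availability with the following settings:

  1. disable unclean leader election: if all replicas become unavailable, wait for the most recent leader to recover
  2. set a minimum ISR size: the partition only accepts a write if the ISR is at least that large (this only takes effect when the producer uses acks = all)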
Replica Management

The replication strategy above concerns a single partition of a single topic,
but a Kafka cluster manages many such partitions (hundreds or thousands), and failures happen at
the granularity of a broker (a whole set of partitions). Kafka therefore
balances the partition load across the cluster
so that partitions are not concentrated on a few nodes. Leadership is
balanced as well, so that each node is the leader for a proportional share of its partitions.

Log Compaction

The log can grow very large even when the data set itself is
small: given unbounded time, the log of updates to a small data set keeps growing
until it is very large. This is the motivation for log compaction:
we need to discard the “useless” part of the log to save space.

Which log entries are useless? Logs are used for failure recovery, where the user
replays the logged operations to rebuild the state of the system. The traditional approach is to discard entries older than some time threshold (time-based retention), but then the full state can no longer be rebuilt from the log. Keeping everything instead makes restarts slow, since a single value may have been modified many times while most of the time we only need the final state of each key.

This is how log compaction works: retention is per key (as in a
database where all data entries are key-value pairs). A newer record for a
key supersedes the older ones, which are removed. As a result we only keep, for each
active key, its final state, and the total
disk usage is reduced.
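The keep-the-latest-value-per-key idea can be sketched in a few lines. This is an illustration only: `compact` is a hypothetical function, the records are made-up (offset, key, value) tuples, and it ignores details such as delete markers.

```python
def compact(log):
    """Keep only the latest record per key, preserving original offsets and order.

    `log` is a list of (offset, key, value) tuples in offset order.
    """
    # Later entries overwrite earlier ones, so each key maps to its last offset.
    latest_offset = {key: offset for offset, key, _ in log}
    return [rec for rec in log if latest_offset[rec[1]] == rec[0]]


log = [
    (0, "alice", "a@old.example"),
    (1, "bob", "b@example.com"),
    (2, "alice", "a@new.example"),
    (3, "carol", "c@example.com"),
]
compacted = compact(log)
```

Surviving records keep their original offsets, so the compacted log has gaps (offset 0 is gone here) but positions held by consumers remain meaningful.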

How to Achieve Log Compaction

The following high-level figure shows the logical structure
of a Kafka log, with the offset of each message.

[Figure: log_cleaner_anatomy]

Log Head: identical to a traditional Kafka log, with dense, sequential
offsets and all messages retained. Log compaction adds an option for handling the tail of the log.