  Apache Kafka: the next generation of distributed messaging system
  Add Date : 2018-11-21      
  Brief introduction

Apache Kafka is a distributed publish-subscribe messaging system. It was originally developed at LinkedIn and later became part of the Apache project. Kafka is a fast, scalable commit log service that is distributed by design, partitioned and replicated.

Apache Kafka differs from traditional messaging systems in the following ways:

- It is designed as a distributed system that is easy to scale out.
- It offers high throughput for both publishing and subscribing.
- It supports multiple subscribers and automatically rebalances consumers when one fails.
- It persists messages to disk, so it can serve batch consumers such as ETL jobs as well as real-time applications.

In this article, I will focus on the architecture, features and characteristics of Apache Kafka that help explain why Kafka can be a better choice than traditional messaging services.

I will compare Kafka's features with those of the traditional messaging services RabbitMQ and Apache ActiveMQ, and discuss some scenarios in which Kafka is a better fit than a traditional message service. The last section walks through a sample application that demonstrates the use of Kafka as a messaging server. The complete source code of the sample application is available on GitHub.


First, let me introduce the basic concepts of Kafka. Its architecture includes the following components:

- A topic is a stream of messages of a particular type. A message is a payload of bytes, and the topic is the category or feed name to which messages are published.
- A producer is any object that can publish messages to a topic.
- Published messages are stored on a set of servers called brokers, which together form the Kafka cluster.
- A consumer can subscribe to one or more topics and consume the published messages by pulling data from the brokers.


Figure 1: Kafka producer, consumer and broker environment

Producers can choose their preferred serialization method to encode the message content. For efficiency, a producer can send a set of messages in a single publish request. The following pseudocode shows how to create a producer and send a message.

Producer sample code:

producer = new Producer(...);
message = new Message("test message str".getBytes());
set = new MessageSet(message);
producer.send("topic1", set);

To subscribe to a topic, a consumer first creates one or more message streams for the topic. Messages published to that topic are distributed evenly across these streams. Each message stream provides an iterator interface over the continual stream of messages being produced. The consumer then iterates over every message in the stream and processes its payload. Unlike a traditional iterator, the message stream iterator never terminates: if there are currently no messages, the iterator blocks until new messages are published to the topic. Kafka supports both the point-to-point delivery model, in which multiple consumers jointly consume a single copy of the messages in a queue, and the publish-subscribe model, in which multiple consumers each retrieve their own copy of the messages.
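The blocking, never-ending iterator described above can be imitated with a plain BlockingQueue. The sketch below is not Kafka's stream implementation: the class name BlockingMessageStream and its publish method are made up for illustration, and the code only shows the blocking behaviour that a consumer loop relies on.

```java
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-in for a message stream; not a Kafka class.
class BlockingMessageStream implements Iterator<byte[]> {
    private final BlockingQueue<byte[]> queue = new LinkedBlockingQueue<>();

    // Called by the "broker side" of the sketch to deliver a payload.
    void publish(byte[] payload) {
        queue.add(payload);
    }

    // The stream never ends, so there is always a next message to wait for.
    @Override
    public boolean hasNext() {
        return true;
    }

    // Blocks until a message is available instead of signalling end-of-stream.
    @Override
    public byte[] next() {
        try {
            return queue.take();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for a message", e);
        }
    }

    public static void main(String[] args) {
        BlockingMessageStream stream = new BlockingMessageStream();
        // Publish from another thread after a short delay so that next() has to wait.
        new Thread(() -> {
            try {
                Thread.sleep(200);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            stream.publish("test message str".getBytes(StandardCharsets.UTF_8));
        }).start();
        byte[] bytes = stream.next(); // blocks until the other thread publishes
        System.out.println(new String(bytes, StandardCharsets.UTF_8));
    }
}
```

A real consumer would run the next() call inside a loop; the single call here keeps the demo finite.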

Consumer sample code:

streams[] = Consumer.createMessageStreams("topic1", 1)
for (message : streams[0]) {
    bytes = message.payload();
    // Do something with the bytes
}

Overall architecture

The overall architecture of Kafka is shown in Figure 2. Because Kafka is distributed by nature, a Kafka cluster typically consists of multiple brokers. To balance load, a topic is divided into multiple partitions, and each broker stores one or more of those partitions. Multiple producers and consumers can publish and retrieve messages at the same time.

Figure 2: Kafka architecture

Kafka storage

Kafka's storage layout is very simple. Each partition of a topic corresponds to a logical log. Physically, a log is implemented as a set of segment files of equal size. Every time a producer publishes a message to a partition, the broker appends the message to the last segment file. A segment file is flushed to disk after a configurable number of messages have been published or after a certain amount of time has elapsed. Messages are exposed to consumers only after they have been flushed.

Unlike traditional messaging systems, a message stored in Kafka has no explicit message ID. Instead, each message is addressed by its logical offset in the log. This avoids the overhead of maintaining auxiliary, seek-intensive random-access index structures that map message IDs to actual message locations. Message IDs are increasing but not consecutive. To compute the ID of the next message, add the length of the current message to its logical offset.

A consumer always consumes messages from a particular partition sequentially, so if the consumer acknowledges a particular message offset, it implies that the consumer has consumed all prior messages. The consumer issues asynchronous pull requests to the broker to have a buffer of bytes ready to consume. Each asynchronous pull request contains the offset of the message to consume. Kafka uses the sendfile API to efficiently deliver bytes from a broker's log segment file to the consumer.

Figure 3: Kafka storage architecture
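The offset arithmetic from the storage section can be tried out on a small in-memory log. This is only a sketch under stated assumptions: the class OffsetLog is hypothetical, the log lives in a byte array rather than in segment files, and each record is stored as a 4-byte length prefix followed by the payload, which is a layout chosen for the example and not Kafka's on-disk format.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical append-only log addressed by byte offset; not Kafka's file format.
class OffsetLog {
    private static final int HEADER_BYTES = 4; // assumed length prefix per record
    private byte[] data = new byte[64];
    private int size = 0;

    // Appends a record and returns the offset at which it starts.
    long append(byte[] payload) {
        int needed = size + HEADER_BYTES + payload.length;
        if (needed > data.length) {
            data = Arrays.copyOf(data, Math.max(needed, data.length * 2));
        }
        long offset = size;
        ByteBuffer.wrap(data, size, HEADER_BYTES).putInt(payload.length);
        System.arraycopy(payload, 0, data, size + HEADER_BYTES, payload.length);
        size = needed;
        return offset;
    }

    // Reads the payload of the record that starts at the given offset.
    byte[] read(long offset) {
        int start = (int) offset;
        int length = ByteBuffer.wrap(data, start, HEADER_BYTES).getInt();
        return Arrays.copyOfRange(data, start + HEADER_BYTES, start + HEADER_BYTES + length);
    }

    // Next ID = current offset + length of the current record (header included).
    long nextOffset(long offset) {
        return offset + HEADER_BYTES + read(offset).length;
    }

    public static void main(String[] args) {
        OffsetLog log = new OffsetLog();
        long first = log.append("hello".getBytes(StandardCharsets.UTF_8));
        long second = log.append("kafka!".getBytes(StandardCharsets.UTF_8));
        // prints "0 9 19": the IDs increase but are not consecutive
        System.out.println(first + " " + second + " " + log.nextOffset(second));
    }
}
```

No lookup table from ID to position is needed, because the ID is the position.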

Kafka broker

Unlike in other messaging systems, Kafka brokers are stateless. The consumer therefore has to keep track of how much it has consumed. It maintains this information itself, and the broker does nothing with it. The design is subtle and innovative in its own right.
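A minimal sketch of this division of labour, with made-up class names (ConsumerOffsetSketch, Broker, TrackingConsumer) and message indexes standing in for byte offsets: the broker answers fetch requests for whatever offset it is given, and the consumer alone remembers where it is. Because the position lives in the consumer, nothing stops it from moving that position back and reading old messages again.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical classes for illustration; none of this is Kafka's API.
class ConsumerOffsetSketch {

    // The broker stores messages but keeps no per-consumer state.
    static class Broker {
        private final List<String> log = new ArrayList<>();

        void append(String message) {
            log.add(message);
        }

        // Returns up to max messages starting at the offset the caller supplies.
        List<String> fetch(int offset, int max) {
            int from = Math.min(Math.max(offset, 0), log.size());
            int to = Math.min(from + max, log.size());
            return new ArrayList<>(log.subList(from, to));
        }
    }

    // The consumer owns its position in the log.
    static class TrackingConsumer {
        private final Broker broker;
        private int offset = 0;

        TrackingConsumer(Broker broker) {
            this.broker = broker;
        }

        List<String> poll(int max) {
            List<String> batch = broker.fetch(offset, max);
            offset += batch.size(); // only the consumer's own bookkeeping changes
            return batch;
        }

        // Moving the offset back makes the next poll return old messages again.
        void rewindTo(int earlierOffset) {
            offset = earlierOffset;
        }
    }

    public static void main(String[] args) {
        Broker broker = new Broker();
        broker.append("m0");
        broker.append("m1");
        broker.append("m2");
        TrackingConsumer consumer = new TrackingConsumer(broker);
        System.out.println(consumer.poll(2)); // prints [m0, m1]
        System.out.println(consumer.poll(2)); // prints [m2]
        consumer.rewindTo(1);
        System.out.println(consumer.poll(2)); // prints [m1, m2]
    }
}
```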

Deleting messages from the broker becomes tricky, because the broker does not know whether consumers have consumed them. Kafka solves this problem with a simple time-based SLA as its retention policy: a message is automatically deleted once it has been in the broker longer than a certain period. This design has a major benefit: a consumer can deliberately rewind to an old offset and consume data again. This violates the usual contract of a queue, but it proves to be an essential feature for many consumers.

ZooKeeper and Kafka

Consider a distributed system with multiple servers, each of which is responsible for holding data and performing operations on that data. Potential examples include a distributed search engine, a distributed build system, or a well-known system such as Apache Hadoop. A common problem with all of these distributed systems is how to determine, at any point in time, which servers are alive and operating. Most importantly, how do you do this reliably in the face of the difficulties of distributed computing, such as network failures, bandwidth limitations, variable-latency connections, security concerns, and anything else that can go wrong in a networked environment, perhaps even across multiple data centers? These are precisely the concerns of Apache ZooKeeper, a fast, highly available, fault-tolerant, distributed coordination service. With ZooKeeper you can build reliable distributed data structures for group membership, leader election, coordinated workflow and configuration services, as well as generalized distributed data structures such as locks, queues, barriers and latches. Many well-known and successful projects rely on ZooKeeper, including HBase, Hadoop 2.0, Solr Cloud, Neo4J, Apache Blur (Incubating) and Accumulo.

ZooKeeper is a distributed, hierarchical file system that facilitates loose coupling between clients and provides an eventually consistent view of its znodes, which are like the files and directories of a traditional file system. It provides basic operations such as creating and deleting znodes and checking whether a znode exists. It also provides an event-driven model in which clients can watch for changes to specific znodes, for example when a new child is added to an existing znode. ZooKeeper achieves high availability by running multiple ZooKeeper servers, called an ensemble. Each server holds an in-memory copy of the distributed file system and uses it to serve client read requests.
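The znode operations and the watch model can be pictured with a small in-memory tree. This is an imitation for illustration, not the ZooKeeper client API: the class ZnodeTreeSketch, its method names and the /brokers paths are all assumptions of the sketch, and it makes watches one-shot (a watch fires once and must be registered again), which is how ZooKeeper's classic watches are generally documented to behave.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Consumer;

// In-memory imitation of the znode model; not the ZooKeeper client API.
class ZnodeTreeSketch {
    private final Map<String, byte[]> nodes = new TreeMap<>();
    private final Map<String, List<Consumer<String>>> childWatches = new TreeMap<>();

    ZnodeTreeSketch() {
        nodes.put("/", new byte[0]); // the root always exists
    }

    boolean exists(String path) {
        return nodes.containsKey(path);
    }

    void create(String path, byte[] data) {
        String parent = parentOf(path);
        if (!exists(parent)) throw new IllegalStateException("no parent: " + parent);
        if (exists(path)) throw new IllegalStateException("node exists: " + path);
        nodes.put(path, data);
        fireChildWatches(parent);
    }

    // Simplification: does not check whether the node still has children.
    void delete(String path) {
        if (nodes.remove(path) == null) throw new IllegalStateException("no node: " + path);
        fireChildWatches(parentOf(path));
    }

    // Registers a one-shot watch for the next change to the children of path.
    void watchChildren(String path, Consumer<String> watcher) {
        childWatches.computeIfAbsent(path, p -> new ArrayList<>()).add(watcher);
    }

    private void fireChildWatches(String parent) {
        // Watches are removed before they fire, so each one triggers at most once.
        List<Consumer<String>> watchers = childWatches.remove(parent);
        if (watchers != null) {
            for (Consumer<String> watcher : watchers) {
                watcher.accept(parent);
            }
        }
    }

    private static String parentOf(String path) {
        int slash = path.lastIndexOf('/');
        return slash <= 0 ? "/" : path.substring(0, slash);
    }

    public static void main(String[] args) {
        ZnodeTreeSketch tree = new ZnodeTreeSketch();
        tree.create("/brokers", new byte[0]);
        tree.watchChildren("/brokers", p -> System.out.println("children of " + p + " changed"));
        tree.create("/brokers/0", new byte[0]); // triggers the watch
        tree.create("/brokers/1", new byte[0]); // no output: the watch was already used
    }
}
```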

Figure 4: ZooKeeper Ensemble architecture

Figure 4 shows a typical ZooKeeper ensemble, in which one server acts as the leader and the others act as followers. When the ensemble starts, a leader is elected first, and then all followers replicate their state from the leader. All write requests are routed through the leader, and changes are broadcast to all followers. This change broadcast is called atomic broadcast.

Use of ZooKeeper in Kafka: Just as ZooKeeper is used to coordinate and facilitate other distributed systems, Kafka uses it for the same reasons. ZooKeeper manages and coordinates the Kafka brokers, and each Kafka broker coordinates with the other brokers through ZooKeeper. When a broker is added to the Kafka system or an existing broker fails, the ZooKeeper service notifies producers and consumers, which then start coordinating their work with the remaining brokers accordingly. The overall Kafka system architecture is shown in Figure 5.

Figure 5: Overall architecture of the Kafka distributed system

Apache Kafka compared to other messaging services

Let's look at two projects that compared Apache Kafka with other messaging services: a study by LinkedIn and my own project.

The LinkedIn study

The LinkedIn team conducted an experimental study comparing the performance of Kafka with that of Apache ActiveMQ 5.4 and RabbitMQ 2.4. They used ActiveMQ's default persistent message store, KahaDB. LinkedIn ran the experiments on two Linux machines, each with eight 2 GHz cores, 16 GB of memory and six disks in RAID 10. The two machines were connected by a 1 Gb network link. One machine served as the broker and the other as the producer or consumer.

Producer test

The LinkedIn team configured the broker in every system to flush messages to its persistent store asynchronously. For each system, they ran a single producer that published a total of 10 million messages of 200 bytes each. The Kafka producer sent messages in batches of 1 and 50. ActiveMQ and RabbitMQ did not seem to offer an easy way to batch messages, so LinkedIn assumed a batch size of 1 for them. The results are shown in Figure 6:

Figure 6: Producer performance results from the LinkedIn experiment
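To see what the two batch sizes mean for the number of publish requests, here is a minimal batching sketch. The class BatchingSketch is hypothetical and only counts requests instead of talking to a broker; the 10 million messages of 200 bytes and the batch sizes of 1 and 50 are taken from the test setup above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical batching buffer; it counts "requests" instead of contacting a broker.
class BatchingSketch {
    private final int batchSize;
    private final List<byte[]> buffer = new ArrayList<>();
    private long requestsSent = 0;

    BatchingSketch(int batchSize) {
        if (batchSize < 1) throw new IllegalArgumentException("batchSize must be >= 1");
        this.batchSize = batchSize;
    }

    // Buffers the message and sends the whole set once the batch is full.
    void send(byte[] message) {
        buffer.add(message);
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    // Sends whatever is buffered as a single request.
    void flush() {
        if (buffer.isEmpty()) return;
        requestsSent++;
        buffer.clear();
    }

    long requestsSent() {
        return requestsSent;
    }

    // Number of requests needed to publish the given number of messages.
    static long requestsFor(int messages, int batchSize) {
        BatchingSketch producer = new BatchingSketch(batchSize);
        byte[] payload = new byte[200]; // same size as the messages in the test
        for (int i = 0; i < messages; i++) {
            producer.send(payload);
        }
        producer.flush(); // send a trailing partial batch, if any
        return producer.requestsSent();
    }

    public static void main(String[] args) {
        System.out.println(requestsFor(10_000_000, 1));  // prints 10000000
        System.out.println(requestsFor(10_000_000, 50)); // prints 200000
    }
}
```

With a batch size of 50, the same 10 million messages travel in one fiftieth of the requests, which is one plausible reason why batching helps throughput.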

The main reasons for Kafka's much better performance include:

- The Kafka producer does not wait for acknowledgements from the broker and sends messages as fast as the broker can handle them.
- Kafka has a more efficient storage format. On average, each message had an overhead of 9 bytes in Kafka versus 144 bytes in ActiveMQ. The difference comes from the heavy message headers required by JMS and the overhead of maintaining various indexing structures. LinkedIn observed that one of the busiest threads in ActiveMQ spent most of its time accessing a B-Tree to maintain message metadata and state.

Consumer test

For the consumer test, LinkedIn used a single consumer to retrieve a total of 10 million messages. LinkedIn configured all systems so that each pull request prefetched approximately the same amount of data, up to 1,000 messages or about 200 KB. For both ActiveMQ and RabbitMQ, LinkedIn set the consumer acknowledgement mode to automatic. The results are shown in Figure 7.

Figure 7: Consumer performance results from the LinkedIn experiment

The main reasons for Kafka's much better performance include:

- Kafka has a more efficient storage format, so fewer bytes are transferred from the broker to the consumer.
- The brokers in both ActiveMQ and RabbitMQ have to maintain the delivery state of every message. The LinkedIn team observed that one of the ActiveMQ threads was busy writing KahaDB pages to disk throughout the test. In contrast, the Kafka broker performed no disk writes.
- Finally, Kafka reduces transmission overhead by using the sendfile API.
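The storage-format argument can be made concrete with the numbers quoted in the producer test: 10 million messages of 200 bytes each, with an average overhead of 9 bytes per message in Kafka and 144 bytes in ActiveMQ. The calculation below is a back-of-the-envelope sketch that assumes those averages apply uniformly to every message; the class and method names are made up for the example.

```java
import java.util.Locale;

// Back-of-the-envelope arithmetic; assumes the average overhead applies to every message.
class OverheadSketch {

    // Total bytes stored or transferred for the given number of messages.
    static long totalBytes(long messages, int payloadBytes, int overheadBytes) {
        return messages * (payloadBytes + overheadBytes);
    }

    // Fraction of each stored message that is overhead rather than payload.
    static double overheadShare(int payloadBytes, int overheadBytes) {
        return (double) overheadBytes / (payloadBytes + overheadBytes);
    }

    public static void main(String[] args) {
        long messages = 10_000_000L;
        int payload = 200;

        System.out.println(totalBytes(messages, payload, 9));   // prints 2090000000
        System.out.println(totalBytes(messages, payload, 144)); // prints 3440000000

        // prints 4.3% and 41.9%
        System.out.println(String.format(Locale.ROOT, "%.1f%%", 100 * overheadShare(payload, 9)));
        System.out.println(String.format(Locale.ROOT, "%.1f%%", 100 * overheadShare(payload, 144)));
    }
}
```

Under these assumptions, roughly 2.09 GB moves through Kafka against 3.44 GB through ActiveMQ for the same 2 GB of payload.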

I am currently working on a project that provides a real-time service for quickly and accurately extracting over-the-counter (OTC) pricing content from messages. It is a very critical project that handles financial information for nearly 25 asset classes, including bonds, loans and ABS (asset-backed securities). The project's raw information sources cover the major financial markets of Europe, North America, Canada and Latin America. Below are some statistics about the project that show how important an efficient distributed messaging service is to the solution:

- More than 1,300,000 messages are processed per day.
- More than 12,000,000 OTC prices are parsed per day.
- More than 25 asset classes are supported.
- More than 70,000 unique instruments are parsed per day.

The messages contain PDF, Word, Excel and other formats. OTC prices may also need to be extracted from attachments.

Because of the performance limitations of traditional messaging servers, the message queue grew very large when processing large attachments, and our project faced serious problems: the JMS queue had to be restarted 2-3 times a day. Restarting a JMS queue can lose all the messages in the queue. The project needed a framework that can hold on to messages regardless of how the parsers (consumers) behave. Kafka's features are well suited to the requirements of our project.

Features of the current project:

    - Fetchmail retrieves remote mail messages, which Procmail then filters and processes, for example by distributing attachment-based messages separately.
    - Each message is retrieved into a separate file, which is processed (read and deleted) and inserted as a message into the message server.
    - The message content is retrieved from the message queue for parsing and information extraction.

    Sample Application

    This example is a modified version of the original application that I use in my project. I have removed the logging and multi-threading features to keep the sample application's artifacts as simple as possible. The purpose of the sample application is to show how to use the Kafka producer and consumer APIs. The application includes a producer example (simple producer code that demonstrates Kafka producer API usage and publishes messages on a particular topic), a consumer example (simple consumer code that demonstrates Kafka consumer API usage) and a message content generation API (which generates message content in a file at a particular path). The following figure shows the components and their relationships with the other components of the system.

    Figure 8: Sample Application component architecture

    The structure of the sample application is similar to that of the example programs in the Kafka source code. The application's source code contains a 'src' folder with the Java sources and a 'config' folder with several configuration files and some shell scripts for running the sample application. To run the sample application, follow the instructions in the ReadMe.md file or on the GitHub Wiki page.

    The application can be built with Apache Maven and is easy to customize. For anyone who wants to modify or customize the sample application's code, several Kafka build scripts have been modified so that they can be used to rebuild it. A detailed description of how to customize the sample application is on the project's GitHub Wiki page.

    Now, let's look at the core artifacts of the sample application.

    Kafka producer code sample

    /**
     * Instantiates a new Kafka producer.
     *
     * @param topic the topic
     * @param directoryPath the directory path
     */
    public KafkaMailProducer(String topic, String directoryPath) {
        // Serialize message payloads as strings.
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        // Broker to contact for cluster metadata.
        props.put("metadata.broker.list", "localhost:9092");
        producer = new kafka.javaapi.producer.Producer(new ProducerConfig(props));
        this.topic = topic;
        this.directoryPath = directoryPath;
    }

    public void run() {
        Path dir = Paths.get(directoryPath);
        try {
            // Start the helpers that watch and read the mail directory.
            new WatchDir(dir).start();
            new ReadDir(dir).start();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    The code above shows basic usage of the Kafka producer API, such as setting producer properties: which topic to publish to, which serializer class to use, and which broker to contact. The basic function of this class is to read mail messages from a mail directory and publish them to the Kafka broker as messages. The directory is monitored with the java.nio.file.WatchService class: as soon as a new mail message is dumped into the directory, it is read and published to the Kafka broker as a message.
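The directory monitoring mentioned above can be sketched with the standard java.nio.file.WatchService API. This is not the WatchDir class from the sample application, whose source is not shown here; the class MailDirWatchSketch, the temporary directory and the file name are assumptions made for the example, and the Kafka publishing step is left out.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardWatchEventKinds;
import java.nio.file.WatchEvent;
import java.nio.file.WatchKey;
import java.nio.file.WatchService;
import java.util.concurrent.TimeUnit;

// Hypothetical demo of directory watching; not the sample application's WatchDir.
class MailDirWatchSketch {

    // Watches a fresh temporary directory, drops one file into it, and returns the
    // file name reported by the watch service (or null if no event arrives in time).
    // The temporary directory is left on disk to keep the sketch short.
    static String detectNewFile(String fileName) {
        try {
            Path dir = Files.createTempDirectory("mail-dump");
            try (WatchService watcher = dir.getFileSystem().newWatchService()) {
                dir.register(watcher, StandardWatchEventKinds.ENTRY_CREATE);

                // Simulate a new mail message being dumped into the directory.
                Files.write(dir.resolve(fileName), "mail body".getBytes(StandardCharsets.UTF_8));

                // Wait for the creation event; some platforms deliver it with a delay.
                WatchKey key = watcher.poll(30, TimeUnit.SECONDS);
                if (key == null) {
                    return null;
                }
                for (WatchEvent<?> event : key.pollEvents()) {
                    if (event.kind() == StandardWatchEventKinds.ENTRY_CREATE) {
                        // For ENTRY_CREATE the context is the path relative to the directory.
                        return event.context().toString();
                    }
                }
                return null;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for a file event", e);
        }
    }

    public static void main(String[] args) {
        System.out.println("new file: " + detectNewFile("mail-1.txt"));
    }
}
```

In a producer like the one above, the point where the file name is returned is where the file would be read and its content published to the topic.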

    Kafka consumer code sample

    public KafkaMailConsumer(String topic) {
        consumer =
            kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig());
        this.topic = topic;
    }

    /**
     * Creates the consumer config.
     *
     * @return the consumer config
     */
    private static ConsumerConfig createConsumerConfig() {
        Properties props = new Properties();
        props.put("zookeeper.connect", KafkaMailProperties.zkConnect);
        props.put("group.id", KafkaMailProperties.groupId);
        props.put("zookeeper.session.timeout.ms", "400");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        return new ConsumerConfig(props);
    }

    public void run() {
        // Ask for a single stream for the topic.
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, 1);
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
        KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        // Print every message received on the stream.
        while (it.hasNext()) {
            System.out.println(new String(it.next().message()));
        }
    }

    The code above demonstrates the basic consumer API. As mentioned earlier, the consumer needs to set up a message stream for consumption. In the run method we do exactly that and print each received message to the console. In my project, we feed the messages into the parsing system to extract OTC prices.

    We currently use Kafka as the messaging server in our quality assurance system for a proof of concept (POC) project, and its overall performance is better than that of the JMS messaging service. One feature we are particularly excited about is the re-consumption of messages, which lets our parsing system re-parse certain messages as business needs dictate. Based on these good results, we now plan to use Kafka, instead of the Nagios system, for log aggregation and analysis.

    Conclusion

    Kafka is a novel system for processing large volumes of data. Kafka's pull-based consumption model lets a consumer process messages at its own pace. If an exception occurs while processing a message, the consumer can always choose to consume the message again.

    About the Author

    Abhishek Sharma is a natural language processing (NLP), machine learning and parsing programmer for financial domain products. He provides algorithm design and parsing development for a number of companies. Abhishek's interests include distributed systems, natural language processing, and big data analytics using machine learning algorithms.
