Brief introduction
Apache Kafka is a distributed publish-subscribe messaging system. It was originally developed at LinkedIn and later became part of the Apache project. Kafka is fast, scalable, distributed by design, partitioned, and provides a replicated commit log service.
Compared with traditional messaging systems, Apache Kafka differs in the following ways:
It is designed as a distributed system that is easy to scale out;
It offers high throughput for both publishing and subscribing;
It supports multiple subscribers and automatically balances consumers when one fails;
It persists messages to disk, so it can be used for batch consumption, such as ETL, as well as for real-time applications.
In this article, I will focus on the architecture, features, and characteristics of Apache Kafka that help us understand why Kafka is better than traditional messaging services.
I will compare the features of Kafka with those of the traditional messaging services RabbitMQ and Apache ActiveMQ, and discuss some scenarios in which Kafka outperforms a traditional messaging service. In the final section, we will look at a working sample application that demonstrates the use of Kafka as a messaging server. The complete source code of the sample application is available on GitHub; a detailed discussion of it appears in the last section of this article.
Architecture
First, let me introduce the basic concepts of Kafka. Its architecture includes the following components:
A Topic is a stream of messages of a particular type. Messages are byte payloads, and a topic is the category or feed name to which messages are published.
A Producer is any object that can publish messages to a topic.
Published messages are stored on a set of servers called Brokers, which together form a Kafka cluster.
A Consumer can subscribe to one or more topics and consume the published messages by pulling data from the brokers.
Figure 1: Kafka producer, consumer and broker environment
Producers can choose their preferred serialization method to encode the message content. To improve efficiency, a producer can send a set of messages in a single publish request. The following code demonstrates how to create a producer and send messages; a fuller sketch using the Kafka Java producer API follows the snippet.
Producer sample code:
producer = new Producer(...);
message = new Message("test message str".getBytes());
set = new MessageSet(message);
producer.send("topic1", set);
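The snippet above uses the abstract producer interface from the original Kafka design. For completeness, here is a minimal, self-contained sketch of roughly equivalent code against the Kafka 0.8 Java producer API (the same API used by the sample application later in this article); the topic name "topic1" and the localhost broker address are assumptions for a local, single-broker setup.

import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class SimpleProducerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Assumed local, single-broker setup with string messages.
        props.put("metadata.broker.list", "localhost:9092");
        props.put("serializer.class", "kafka.serializer.StringEncoder");

        Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(props));

        // Publish one message to the topic "topic1".
        producer.send(new KeyedMessage<String, String>("topic1", "test message str"));
        producer.close();
    }
}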
To subscribe to a topic, a consumer first creates one or more message streams for the topic. Messages published to that topic are evenly distributed into these streams. Each message stream provides an iterator interface over the continuous stream of produced messages. The consumer then iterates over every message in the stream and processes its payload. Unlike a traditional iterator, the message-stream iterator never terminates: if no messages are currently available, the iterator blocks until new messages are published to the topic. Kafka supports both the point-to-point delivery model, in which multiple consumers jointly consume a single copy of the messages in a queue, and the publish-subscribe model, in which multiple consumers each receive their own copy of every message. The following code demonstrates how a consumer consumes messages; a sketch after it shows how consumer groups select between the two models.
Consumer sample code:
streams[] = Consumer.createMessageStreams("topic1", 1)
for (message : streams[0]) {
    bytes = message.payload();
    // do something with the bytes
}
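The choice between the two delivery models described above is made through consumer groups rather than a separate API. The following is a minimal sketch, assuming the Kafka 0.8 high-level consumer and a local ZooKeeper at localhost:2181; the group names are illustrative. Consumers that share a group.id divide the messages of a topic among themselves (point-to-point), while consumers in different groups each receive every message (publish-subscribe).

import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.javaapi.consumer.ConsumerConnector;

public class ConsumerGroupSketch {
    // Builds a high-level consumer connector that joins the given group.
    // Two connectors created with the same groupId share the topic's messages (queue semantics);
    // connectors with different groupIds each get their own copy (publish-subscribe semantics).
    static ConsumerConnector connectorFor(String groupId) {
        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181"); // assumed local ZooKeeper
        props.put("group.id", groupId);
        return Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
    }
}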
The overall architecture of Kafka is shown in Figure 2. Because Kafka is distributed by nature, a Kafka cluster typically consists of multiple brokers. To balance load, a topic is divided into multiple partitions, and each broker stores one or more of those partitions. Multiple producers and consumers can publish and retrieve messages at the same time.
Figure 2: Kafka architecture
Kafka storage
Kafka's storage layout is very simple. Each partition of a topic corresponds to a logical log. Physically, a log is implemented as a set of segment files of equal size. Every time a producer publishes a message to a partition, the broker simply appends the message to the last segment file. The segment file is flushed to disk after a configurable number of messages has been published or after a certain period of time has elapsed. A message is only exposed to consumers after it has been flushed.
Unlike traditional messaging systems, a message stored in Kafka has no explicit message ID.
Messages are addressed by their logical offset in the log. This avoids the overhead of maintaining auxiliary, seek-intensive index structures that map message IDs to actual message locations. Message IDs are increasing but not consecutive: to compute the ID of the next message, add the length of the current message to its logical offset. A small worked example follows.
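A small worked example of this addressing scheme (the numbers are purely illustrative, not taken from a real log):

public class OffsetExample {
    public static void main(String[] args) {
        // A message stored at logical offset 1024 that occupies 200 bytes in the log.
        long currentOffset = 1024;
        int currentMessageLength = 200;

        // The ID of the next message is the current offset plus the current message length,
        // so offsets grow monotonically but are not consecutive integers.
        long nextOffset = currentOffset + currentMessageLength;
        System.out.println("next message offset = " + nextOffset); // prints 1224
    }
}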
A consumer always consumes messages from a particular partition sequentially. If the consumer acknowledges a particular message offset, it implies that the consumer has consumed all messages prior to that offset. The consumer issues an asynchronous pull request to the broker, providing a byte buffer ready to receive the data to be consumed. Each asynchronous pull request contains the offset of the message to consume. Kafka uses the sendfile API to efficiently deliver the bytes of the log segment files from the broker to the consumer.
Figure 3: Kafka storage architecture
Kafka broker
Unlike other messaging systems, the Kafka broker is stateless. This means that the consumer has to maintain how much it has consumed; the consumer keeps this state itself, and the broker is completely unaware of it. This design is very subtle and is an innovation in itself.
It is very tricky to delete a message from the broker, because the broker does not know whether the consumer has consumed it. Kafka solves this problem innovatively by applying a simple time-based SLA to its retention policy: a message is automatically deleted once it has been retained in the broker for longer than a configured period.
This innovative design has a big benefit: a consumer can deliberately rewind to an old offset and consume the data again. This violates the common contract of a queue, but it has proven to be an essential feature for many consumers. A minimal configuration sketch follows.
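As a rough illustration of how this is used in practice: retention is a broker-side setting (for example, the log.retention.hours property in the 0.8 broker configuration), and a consumer can replay retained data by starting from the earliest available offset. The sketch below is an assumption-laden example using the 0.8 high-level consumer; the ZooKeeper address and group name are hypothetical.

import java.util.Properties;
import kafka.consumer.ConsumerConfig;

public class ReplayConsumerConfig {
    // A consumer group with no previously committed offsets that sets auto.offset.reset
    // to "smallest" begins at the oldest message still retained by the broker,
    // effectively re-consuming everything the retention policy has kept.
    static ConsumerConfig buildConfig() {
        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181"); // hypothetical ZooKeeper address
        props.put("group.id", "replay-group");            // hypothetical group name
        props.put("auto.offset.reset", "smallest");       // start from the earliest retained offset
        return new ConsumerConfig(props);
    }
}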
ZooKeeper and Kafka
Consider a distributed system with multiple servers, each of which is responsible for holding data and performing operations on that data. Some potential examples are a distributed search engine, a distributed build system, or a well-known system such as Apache Hadoop. A common problem with all of these distributed systems is how you would determine, at any point in time, which servers are alive and working. Most importantly, how do you do these things reliably in the face of the difficulties of distributed computing, such as network failures, bandwidth limitations, variable-latency connections, security concerns, and anything else that can go wrong in a networked environment, perhaps even across multiple data centers? These are exactly the concerns that Apache ZooKeeper addresses: it is a fast, highly available, fault-tolerant, distributed coordination service. You can use ZooKeeper to build reliable, distributed data structures for group membership, leader election, coordinated workflow, and configuration services, as well as generalized distributed data structures such as locks, queues, barriers, and latches. Many well-known and successful projects depend on ZooKeeper, including HBase, Hadoop 2.0, Solr Cloud, Neo4J, Apache Blur (incubating), and Accumulo.
ZooKeeper is a distributed, hierarchical file system that facilitates loose coupling between its clients and provides an eventually consistent view of its Znodes, which are similar to the files and directories of a conventional file system. It provides basic operations such as creating, deleting, and checking the existence of Znodes. It also provides an event-driven model in which a client can watch for changes to a specific Znode, for example when a new child is added to an existing Znode. ZooKeeper achieves high availability by running multiple ZooKeeper servers, called an Ensemble, with each server holding an in-memory copy of the distributed file system to serve client read requests. The sketch below illustrates these basic Znode operations.
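The following is a minimal sketch of these Znode operations using the standard ZooKeeper Java client; the connection string, session timeout, and Znode path are assumptions for a local test server, not details from the original article.

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

public class ZnodeExample {
    public static void main(String[] args) throws Exception {
        // Connect to an assumed local ZooKeeper server with a 3-second session timeout.
        ZooKeeper zk = new ZooKeeper("localhost:2181", 3000, new Watcher() {
            public void process(WatchedEvent event) {
                // Event-driven model: called when a watched Znode changes.
                System.out.println("event: " + event.getType() + " on " + event.getPath());
            }
        });

        // Basic operations: create a Znode, check that it exists (registering a watch), delete it.
        zk.create("/demo", "hello".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        System.out.println("exists: " + (zk.exists("/demo", true) != null));
        zk.delete("/demo", -1); // -1 matches any version
        zk.close();
    }
}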
Figure 4: ZooKeeper Ensemble architecture
Figure 4 shows a typical ZooKeeper ensemble, with one server acting as the Leader and the others as Followers. When the Ensemble starts, a Leader is elected first, and then all Followers replicate the Leader's state. All write requests are routed through the Leader, and changes are broadcast to all Followers. Change broadcast is termed atomic broadcast.
Usage of ZooKeeper in Kafka: just as ZooKeeper is used for the coordination and facilitation of distributed systems, Kafka uses ZooKeeper for the same reasons. ZooKeeper is used to manage and coordinate the Kafka brokers. Each Kafka broker coordinates with the other Kafka brokers through ZooKeeper. When a new broker is added to the Kafka system or a broker fails, the ZooKeeper service notifies the producers and consumers, which then start coordinating their work with the other brokers accordingly. The overall Kafka system architecture is shown in Figure 5.
Figure 5: Overall architecture of the Kafka distributed system
Apache Kafka compared to other messaging services
Let's look at two projects that used Apache Kafka and contrasted it with other messaging services. These two projects are LinkedIn's study and my own project:
LinkedIn's study
The LinkedIn team ran an experimental study comparing the performance of Kafka with Apache ActiveMQ V5.4 and RabbitMQ V2.4. They used ActiveMQ's default message persistence library, KahaDB. LinkedIn ran the experiments on two Linux machines, each configured with 8 cores at 2GHz, 16GB of memory, and six disks in RAID 10. The two machines were connected by a 1Gb network link. One machine acted as the broker and the other as the producer or consumer.
Producer test
The LinkedIn team configured the broker in all systems to asynchronously flush messages to its persistence store. For each system, they ran a single producer that published a total of 10 million messages, each of 200 bytes. The Kafka producer sent messages in batches of 1 and 50. ActiveMQ and RabbitMQ do not seem to offer an easy way to batch messages, so LinkedIn assumed a batch size of 1 for them. The results are shown in Figure 6 below:
Figure 6: LinkedIn's producer performance results
The main reasons for Kafka's much better performance include:
The Kafka producer does not wait for acknowledgements from the broker, and sends messages as fast as the broker can handle them.
Kafka has a more efficient storage format. On average, a message in Kafka had an overhead of 9 bytes, versus 144 bytes in ActiveMQ. The difference comes from the heavyweight message headers required by JMS, as well as the overhead of maintaining various index structures. LinkedIn observed that one of the busiest threads in ActiveMQ spent most of its time accessing a B-Tree to maintain message metadata and state.
Consumer test
For the consumer test, LinkedIn used a single consumer to retrieve a total of 10 million messages. LinkedIn configured all systems so that each pull request prefetched roughly the same amount of data: up to 1000 messages or about 200KB. For ActiveMQ and RabbitMQ, LinkedIn set the consumers to automatic acknowledgement mode. The results are shown in Figure 7.
Figure 7: LinkedIn's consumer performance results
The main reasons for Kafka's much better performance include:
Kafka has a more efficient storage format; fewer bytes are transferred from the broker to the consumer in Kafka.
The brokers in both the ActiveMQ and RabbitMQ containers had to maintain the delivery state of every message. The LinkedIn team noticed that one of the ActiveMQ threads was busy writing KahaDB pages to disk during the test. In contrast, the Kafka broker performed no disk writes. Finally, Kafka reduces transmission overhead by using the sendfile API.
I am currently working on a project that provides real-time services for quickly and accurately extracting over-the-counter (OTC) market pricing content from messages. It is a very important project, handling financial information for nearly 25 asset classes, including bonds, loans, and ABS (asset-backed securities). The project's raw information sources cover the major financial markets of Europe, North America, Canada, and Latin America. Here are some statistics about the project that illustrate how important an efficient distributed messaging solution is:
More than 1,300,000 messages processed per day;
More than 12,000,000 OTC prices parsed per day;
Support for more than 25 asset classes;
More than 70,000 unique instruments parsed per day.
The messages arrive as PDF, Word documents, Excel files, and other formats. OTC pricing may also have to be extracted from attachments.
Because of the performance limitations of traditional messaging servers, the message queues became very large when processing large attachments, and our project faced serious problems: the JMS queue had to be restarted 2-3 times a day, and restarting the JMS queue could lose all of the messages in the queue. The project needed a framework that could retain messages regardless of how the parsers (consumers) behaved. Kafka's characteristics are well suited to the needs of our project.
The current project works as follows:
Remote mail messages are fetched with Fetchmail, then filtered and processed by Procmail, for example distributing separate attachments as separate messages.
Each message is fetched from a separate file and submitted to the messaging server as a single message, after which the file is processed (read and deleted).
Message content is fetched from the messaging server queue for parsing and information extraction.
Sample Application
The sample application is a modified version of the original application I use in my project. I have removed the use of multithreading and the logging artifacts to keep the sample application as simple as possible. The purpose of the sample application is to show how to use the Kafka producer and consumer APIs. The application includes a producer example (simple producer code demonstrating Kafka producer API usage and publishing messages to a specific topic), a consumer example (simple consumer code demonstrating Kafka consumer API usage), and a message-content generation API (an API that generates message content into a file under a specific path). The following figure shows the components and their relationships with other components in the system.
Figure 8: Sample Application component architecture
The structure of the sample application is similar to that of the example programs in the Kafka source code. The application source contains a 'src' folder with the Java sources and a 'config' folder with several configuration files and some shell scripts for running the sample application. To run the sample application, please refer to the descriptions in the ReadMe.md file or the Wiki pages on the GitHub site.
The application can be built with Apache Maven and is very easy to customize. If someone wants to modify or customize the sample application code, several Kafka build scripts have been modified and can be used to rebuild the sample application code. A detailed description of how to customize the sample application is available on the project's GitHub Wiki page.
Now, let's look at the core artifacts of the sample application.
Kafka producer code sample
/**
 * Instantiates a new Kafka producer.
 *
 * @param topic the topic
 * @param directoryPath the directory path
 */
public KafkaMailProducer(String topic, String directoryPath) {
       props.put("serializer.class", "kafka.serializer.StringEncoder");
       props.put("metadata.broker.list", "localhost:9092");
       producer = new kafka.javaapi.producer.Producer(new ProducerConfig(props));
       this.topic = topic;
       this.directoryPath = directoryPath;
}

public void run() {
       Path dir = Paths.get(directoryPath);
       try {
           new WatchDir(dir).start();
           new ReadDir(dir).start();
       } catch (IOException e) {
           e.printStackTrace();
       }
}
The code above shows the basic usage of the Kafka producer API: setting the producer properties, including which topic to publish messages to, which serializer class to use, and the broker information. The basic function of this class is to read mail messages from the mail directory and then publish them as messages to the Kafka broker. The directory is monitored with the java.nio.file.WatchService class; as soon as a new e-mail message is dropped into the dump directory, it is immediately read and published to the Kafka broker as a message. A minimal sketch of such directory monitoring follows.
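The WatchDir and ReadDir classes are not reproduced here; the following is a minimal sketch of how a dump directory can be monitored with java.nio.file.WatchService. The directory path is illustrative, and the println stands in for reading the file and publishing it to the broker.

import java.nio.file.FileSystems;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardWatchEventKinds;
import java.nio.file.WatchEvent;
import java.nio.file.WatchKey;
import java.nio.file.WatchService;

public class DirectoryWatchSketch {
    public static void main(String[] args) throws Exception {
        Path dir = Paths.get("/tmp/mail-dump"); // illustrative dump directory
        WatchService watcher = FileSystems.getDefault().newWatchService();
        // Register interest in newly created files (new mail messages dropped into the directory).
        dir.register(watcher, StandardWatchEventKinds.ENTRY_CREATE);

        while (true) {
            WatchKey key = watcher.take(); // blocks until at least one event is available
            for (WatchEvent<?> event : key.pollEvents()) {
                Path created = (Path) event.context();
                // In the sample application this is where the file would be read
                // and published to the Kafka broker as a message.
                System.out.println("new file: " + dir.resolve(created));
            }
            key.reset(); // re-arm the key so further events are delivered
        }
    }
}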
Kafka consumer code sample
public KafkaMailConsumer(String topic) {
       consumer =
       kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig());
       this.topic = topic;
}

/**
 * Creates the consumer config.
 *
 * @return the consumer config
 */
private static ConsumerConfig createConsumerConfig() {
       Properties props = new Properties();
       props.put("zookeeper.connect", KafkaMailProperties.zkConnect);
       props.put("group.id", KafkaMailProperties.groupId);
       props.put("zookeeper.session.timeout.ms", "400");
       props.put("zookeeper.sync.time.ms", "200");
       props.put("auto.commit.interval.ms", "1000");
       return new ConsumerConfig(props);
}

public void run() {
       Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
       topicCountMap.put(topic, new Integer(1));
       Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
       KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
       ConsumerIterator<byte[], byte[]> it = stream.iterator();
       while (it.hasNext())
           System.out.println(new String(it.next().message()));
}
The code above demonstrates the basic consumer API. As mentioned earlier, the consumer needs to set up the message streams to consume. In the run method, we do this and then print the received messages on the console. In my project, we feed them into the parsing system that extracts OTC pricing.
In our current quality assurance system, we use Kafka as the messaging server for a proof-of-concept (POC) project, and its overall performance has been better than that of the JMS messaging service. One feature we are very excited about is the re-consumption of messages, which allows our parsing system to re-parse certain messages according to business needs. Based on these good results with Kafka, we are planning to use it, rather than the Nagios system, for log aggregation and analysis.
Summary
Kafka is a novel system for processing large volumes of data. Kafka's pull-based consumption model lets consumers process messages at their own pace. If an exception occurs while processing a message, the consumer can always choose to consume that message again.
About the Author
Abhishek Sharma is a natural language processing (NLP), machine learning, and analytics programmer in the financial products domain. He designs and develops analytics algorithms for several companies. Abhishek's interests include distributed systems, natural language processing, and big data analysis with machine algorithms.