After running performance tests on Kafka and Pulsar, Lakala replaced its messaging platform with Pulsar.

Founded in 2005, Lakala Payment is a leading third-party payment company in China, committed to integrating information technology to serve offline businesses, starting from payments and empowering small, medium, and micro merchants across the board. In 2011, it was among the first companies to obtain a Payment Business License, and in the first half of 2019 it served more than 21 million merchants. On April 25, 2019, it was listed on ChiNext.

Functional Requirements

Lakala has a large number of project teams, and each project chose its own messaging system according to its needs. This caused problems on several fronts: the business logic of many systems became coupled to a specific messaging system, complicating subsequent maintenance and upgrades; team members managed and used their messaging systems with varying levels of proficiency, making overall service quality and performance unstable; and maintaining multiple systems in parallel drove up physical resource usage and management costs. We therefore planned to build a single distributed messaging platform serving all teams. The platform needed to be highly reliable, loosely coupled, tenant-isolated, easy to scale horizontally, easy to operate and maintain, centrally managed, and available on demand, while supporting both traditional message queues and streaming queues. Table 1 shows the characteristics these two types of services should have.

Why choose Apache Pulsar

There are many mature open-source messaging platforms to choose from, and most share similar architectural designs. For example, Kafka and RocketMQ both use an architecture that integrates storage and computing, while Pulsar alone uses a multi-layer architecture that separates the two. We shortlisted three messaging systems: Kafka, RocketMQ, and Pulsar. Before testing, we compared their performance and features using publicly available data; Table 2 shows the results. Pulsar matched our needs best.

Pulsar's architectural advantages

Pulsar is a cloud-native distributed messaging and streaming platform that originated at Yahoo!, where it supported Yahoo! applications, serving 1.4 million topics and processing more than 100 billion messages per day. Yahoo! open-sourced Pulsar in 2016 and donated it to the Apache Software Foundation; in 2018, Pulsar became a top-level Apache project.

As a high-performance solution, Pulsar offers the following features: multi-tenancy, with per-tenant authentication mechanisms, storage quotas, and isolation policies; high throughput, low latency, and high fault tolerance; native support for multi-cluster deployment with seamless cross-cluster data replication; high scalability, supporting millions of topics; multi-language clients, including Java, Go, Python, and C++; and multiple subscription modes (exclusive, shared, failover, and Key_Shared).

Reasonable architecture

Kafka adopts an architecture that integrates computing and storage; when the number of topics grows large, Kafka's storage mechanism causes cache pollution and degrades performance. Pulsar separates computing from storage (see Figure 1). The stateless computing layer consists of a group of brokers that receive and deliver messages; brokers communicate with business systems and handle protocol conversion, serialization and deserialization, and topic ownership election. The stateful storage layer consists of a group of bookie storage nodes that persist messages.

Broker architecture

Broker is mainly composed of four modules. We can carry out secondary development of corresponding functions according to actual needs.

Dispatcher: Scheduling and distribution module, responsible for protocol conversion, serialization and deserialization, etc.

Load balancer: Load balancing module, which controls and manages access traffic.

Global replicator: Cross-cluster replication module, undertaking asynchronous cross-cluster message synchronization function.

Service discovery: The service discovery module selects a stateless master node for each topic.

Persistence layer (BookKeeper) architecture

Figure 3 shows the architecture of Pulsar's persistence layer. Bookies are BookKeeper's storage nodes and provide independent storage services; ZooKeeper stores metadata and provides service discovery and metadata management. The BookKeeper architecture is a typical "slave-slave" arrangement: all bookie nodes play the slave role, persisting data with identical processing logic on every node, while the BookKeeper client plays the leader role and handles coordination. Because the client is stateless, failover can be achieved quickly.

Isolation architecture

Pulsar's excellent performance rests mainly on the following design points:

IO isolation: writes, tailing reads, and catch-up reads are isolated from one another.

High-throughput writes exploit inbound network bandwidth and sequential disk writes: conventional disks offer high bandwidth for sequential writes, while scattered random I/O reduces effective disk bandwidth, so writing sequentially improves performance.

High-throughput reads exploit outbound network bandwidth and the aggregate IOPS of multiple disks: after data is received, it is first written to faster SSDs as a first-level cache, and asynchronous threads then move it to conventional HDDs, reducing storage costs.

Multi-level caching enables low-latency delivery: when a producer sends a message, it is written to the broker cache; for real-time consumption (tailing reads), data is read from the broker cache, avoiding reads from the bookie persistence layer and reducing delivery latency. When reading historical messages (catch-up reads), the bookie loads messages from disk into its read cache, avoiding a disk read for every request and lowering read latency.

Comparison summary

On the left is the architecture used by Kafka, RabbitMQ, and similar messaging systems: the broker node is responsible for both computing and storage. In some scenarios this achieves high throughput, but as the number of topics increases, the cache becomes polluted and performance suffers.

On the right is the architecture of Pulsar. Pulsar splits the broker and adds the BookKeeper persistence layer. Although this will increase the design complexity of the system, it can reduce the coupling of the system and make it easier to implement functions such as scaling and failover. Table 3 summarizes the main characteristics of the partition architecture and the shard architecture.

Based on Pulsar's architecture and features, we put it to the test. At the operating-system level we used NetData for monitoring and ran stress tests with data packets of varying sizes and frequencies. The key indicators were fluctuations in disk and network bandwidth.

The test conclusions are as follows:

**Deployment method:** hybrid deployment beats separate deployment. Brokers and bookies can be deployed on the same node or on separate nodes. With many nodes, separate deployment is preferable; with few nodes or high performance requirements, deploying both on the same node saves network bandwidth and reduces latency.

**Load size:** as the test load increases, TPS decreases while throughput remains stable.

**Flush mode:** asynchronous disk flushing outperforms synchronous flushing.

**Compression algorithm:** we recommend LZ4. We tested the compression codecs bundled with Pulsar; LZ4 gave the lowest CPU usage. Compression reduces network bandwidth usage, and we measured a compression ratio of 82%.

**Number of partitions:** If a single topic does not reach the upper limit of the physical resources of a single node, it is recommended to use a single partition; since Pulsar storage is not coupled with partitions, the number of partitions can be adjusted at any time according to business development.

**Number of topics:** increasing the number of topics during stress testing did not affect performance.

**Resource constraints:** on a Gigabit network, the network is the performance bottleneck, with network I/O reaching about 880 Mb/s; on a 10-Gigabit network, the disk becomes the bottleneck, with disk I/O utilization around 85%.

**Memory and threads:** on physical hosts, pay attention to the ratio of memory to thread count. By default, the number of IO threads equals twice the number of CPU cores; our physical machines have 48 cores, so if memory is set too low, OOM problems occur easily.

In addition to the tests above, we also reran the destructive test cases published by Jack Vanlightly (a RabbitMQ test engineer) and reached the following conclusions:

In all test scenarios, no messages were lost or reordered;

With message deduplication enabled, no messages were duplicated.

We had also been in contact with the core developers of Apache Pulsar early on. They have rich hands-on experience from Yahoo! and Twitter and were preparing to found a company to promote Pulsar worldwide, with China as their most important base, which gave us strong assurance for adopting it. As is now well known, they founded StreamNative, which has raised multiple rounds of financing and keeps growing its team.

Pulsar's practice in basic messaging platform

Our Pulsar-based messaging platform architecture is shown in the figure below; the green parts are functions or components we developed on top of Pulsar. This section uses concrete usage scenarios to describe in detail how we apply Pulsar and the components built on it.

Scenario 1: Streaming queue

1. OGG For Pulsar Adapter

The source data resides in Oracle. We want to capture Oracle's change data in real time, run real-time computation and analysis on it, and serve query workloads in downstream business systems.

We use Oracle's OGG (Oracle GoldenGate) tool for real-time capture; it comprises two modules, the source-side OGG and the target-side OGG. Since OGG does not officially provide a sink to Pulsar, we developed the OGG For Pulsar component ourselves. The figure below shows the data flow. OGG captures every insert, update, and delete on each table record and pushes each operation as a message to the OGG For Pulsar component, which calls the Pulsar client's producer interface to deliver it. Message order must be strictly preserved during delivery, so we use the table's primary key as the message key. When data volume is large, the topic can be partitioned by key: messages with the same key are delivered to the same partition, which preserves the order of inserts, updates, and deletes on records sharing a primary key.
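The per-key ordering guarantee rests on deterministic key-to-partition routing. Below is a minimal illustration of the invariant using a simple String hash; the hash function Pulsar actually uses for routing is a client configuration detail, so this sketch only demonstrates the idea.

```java
// Illustration: key-based partition routing guarantees that all operations on
// the same primary key land in the same partition, preserving per-key order.
// The hash here (Java's String hashCode) is for illustration only.
public class KeyPartitionRouter {
    // Map a message key to a partition index in [0, numPartitions).
    static int partitionFor(String key, int numPartitions) {
        int h = key.hashCode();
        // Guard against Integer.MIN_VALUE, whose absolute value overflows.
        return (h == Integer.MIN_VALUE ? 0 : Math.abs(h)) % numPartitions;
    }

    public static void main(String[] args) {
        // All changes to primary key "ORDER-1001" go to the same partition,
        // so insert/update/delete order for that row is preserved.
        int p1 = partitionFor("ORDER-1001", 8);
        int p2 = partitionFor("ORDER-1001", 8);
        System.out.println(p1 == p2); // true: same key, same partition
    }
}
```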

2. Pulsar To TiDB component

We store the captured change messages in TiDB through the Pulsar To TiDB component, and provide query services to downstream systems. The processing logic of this component is:

Consume Pulsar messages using a failover subscription.

Hash each message's key so that messages with the same key are routed to the same persistence thread.

Enable Pulsar's message deduplication to avoid duplicate delivery; if, say, MessageID 2 were delivered twice, data consistency would be broken.
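The second step above can be sketched with one single-threaded executor per persistence thread: the key hash picks the worker, so all changes for one primary key are applied to TiDB serially and in order. This is illustrative code, not the component's actual implementation.

```java
import java.util.*;
import java.util.concurrent.*;

// Sketch: a pool of single-threaded "persistence threads"; each message is
// dispatched by a hash of its key, so writes for the same primary key run
// on the same thread, in submission order.
public class KeyedPersistencePool {
    private final ExecutorService[] workers;

    KeyedPersistencePool(int n) {
        workers = new ExecutorService[n];
        for (int i = 0; i < n; i++) workers[i] = Executors.newSingleThreadExecutor();
    }

    // Same key always maps to the same worker index.
    static int indexFor(String key, int n) {
        int h = key.hashCode();
        return (h == Integer.MIN_VALUE ? 0 : Math.abs(h)) % n;
    }

    Future<?> submit(String key, Runnable task) {
        return workers[indexFor(key, workers.length)].submit(task);
    }

    void shutdown() { for (ExecutorService w : workers) w.shutdown(); }

    public static void main(String[] args) throws Exception {
        KeyedPersistencePool pool = new KeyedPersistencePool(4);
        List<String> applied = Collections.synchronizedList(new ArrayList<>());
        // Two changes to the same row are guaranteed to apply in order.
        pool.submit("row-1", () -> applied.add("INSERT row-1"));
        pool.submit("row-1", () -> applied.add("UPDATE row-1")).get();
        System.out.println(applied); // [INSERT row-1, UPDATE row-1]
        pool.shutdown();
    }
}
```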

3. Pulsar's message persistence process analysis

Pulsar's message persistence process includes the following four steps:

The OGG For Pulsar component calls the producer interface of the Pulsar client to deliver messages.

The Pulsar client picks one broker address from the broker list in its configuration file and issues a topic-ownership lookup to obtain the address of the broker serving the topic (Broker2 in the example below).

The Pulsar client delivers the message to Broker2.

Broker2 calls the BookKeeper client for persistent storage. The storage policy specifies the total number of bookies eligible for this write (the ensemble), the number of copies (the write quorum), and the number of successful acknowledgments required (the ack quorum).
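These three quorum parameters correspond to BookKeeper's persistence policy, which can be set per namespace with pulsar-admin. A sketch, assuming an illustrative namespace name (not Lakala's actual configuration):

```shell
# Set the persistence policy for a namespace (names are illustrative):
# an ensemble of 3 bookies per ledger, 3 copies of each entry, and
# 2 acknowledgments required before a write is reported successful.
bin/pulsar-admin namespaces set-persistence lakala/payments \
  --bookkeeper-ensemble 3 \
  --bookkeeper-write-quorum 3 \
  --bookkeeper-ack-quorum 2 \
  --ml-mark-delete-max-rate 0
```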

4. Dynamic transfer of database table structure

When OGG serializes with AVRO and multiple tables are delivered to the same topic, the AVRO schema has a two-level structure: a wrapper schema and a table schema. The wrapper schema is always the same and contains three fields: table_name, schema_fingerprint, and payload. When OGG captures data, it detects changes to the database table structure and notifies OGG For Pulsar; the table structure determines the table schema, from which the corresponding schema_fingerprint is generated.

We publish each table schema to a designated Schema topic, while messages in the Data topic carry only the schema_fingerprint, which keeps serialized messages small. When Pulsar To TiDB starts, it consumes the Schema topic and caches the table schemas in memory, keyed by schema_fingerprint. When deserializing a message from the Data topic, it fetches the table schema from the cache by schema_fingerprint and uses it to deserialize the payload.
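The fingerprint lookup described above amounts to a small in-memory cache. An illustrative sketch follows; the real component caches parsed AVRO table schemas rather than raw strings, and the names here are hypothetical.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Simplified sketch of the schema-fingerprint cache in Pulsar To TiDB.
public class SchemaCache {
    private final Map<String, String> byFingerprint = new ConcurrentHashMap<>();

    // Called while replaying the Schema topic at startup, and whenever a
    // new table schema arrives.
    void register(String fingerprint, String tableSchemaJson) {
        byFingerprint.put(fingerprint, tableSchemaJson);
    }

    // Called per Data-topic message: resolve the table schema by the
    // fingerprint carried in the wrapper, then deserialize the payload.
    String resolve(String fingerprint) {
        String schema = byFingerprint.get(fingerprint);
        if (schema == null)
            throw new IllegalStateException("unknown schema fingerprint: " + fingerprint);
        return schema;
    }

    public static void main(String[] args) {
        SchemaCache cache = new SchemaCache();
        cache.register("fp-123", "{\"type\":\"record\",\"name\":\"orders\"}");
        System.out.println(cache.resolve("fp-123").contains("orders")); // true
    }
}
```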

5. Consistency Guarantee

Guaranteeing message order and deduplication requires settings on three sides: broker, producer, and consumer.

Broker

Enable the deduplication function at the namespace level: bin/pulsar-admin namespaces set-deduplication namespace --enable

Fix/optimize the Pulsar client deadlock problem. It was fixed in version 2.7.1; for details, see PR 9552.

Producer

pulsar.producer.batchingEnabled=false

In the producer settings, disable batch sending; with batching enabled, messages may be delivered out of order.

pulsar.producer.blockIfQueueFull=true

For efficiency we send messages asynchronously, so blocking when the send queue is full must be enabled, or messages may be lost. When an asynchronous send times out, the message is written to an exception topic. Duplicates caused by resending after an asynchronous timeout are handled by the automatic deduplication feature; other send timeouts are handled separately: we store those messages in the exception topic, and a reconciliation program later fetches the final-state data directly from the source database.
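Taken together, the producer-side settings above map onto the Java client's builder API. The following is a minimal sketch, assuming a broker at pulsar://localhost:6650 and illustrative topic and key names (not Lakala's actual configuration); it requires a running broker to execute.

```java
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.*;

// Sketch of the producer-side settings using the Java client builder.
public class OrderedProducer {
    public static void main(String[] args) throws PulsarClientException {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")
                .build();

        Producer<byte[]> producer = client.newProducer()
                .topic("persistent://lakala/cdc/orders")  // illustrative topic
                .enableBatching(false)    // batching off: avoid reordering on resend
                .blockIfQueueFull(true)   // block async sends instead of failing
                .sendTimeout(30, TimeUnit.SECONDS)
                .create();

        // Key by primary key so per-row order is preserved across partitions.
        producer.newMessage()
                .key("ORDER-1001")
                .value("{\"op\":\"UPDATE\"}".getBytes())
                .sendAsync();

        producer.flush();
        producer.close();
        client.close();
    }
}
```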

Consumer

Implement an interceptor: ConsumerInterceptorImpl implements ConsumerInterceptor

Configure the acknowledgment timeout: pulsarClient.ackTimeout(3000, TimeUnit.MILLISECONDS).ackTimeoutTickTime(500, TimeUnit.MILLISECONDS)

Use cumulative acknowledgment: consumer.acknowledgeCumulative(sendMessageID)

**Note:** with the acknowledgment-timeout parameters configured, any message not acknowledged within ackTimeout is redelivered. To strictly guarantee consistency, we must use cumulative acknowledgment.
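The consumer-side settings above can likewise be combined in the Java client's builder API. A minimal sketch with illustrative service URL, topic, and subscription names follows; it requires a running broker to execute.

```java
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.*;

// Sketch of the consumer-side settings: failover subscription, ack timeout
// with a tick interval, and cumulative acknowledgment.
public class CumulativeAckConsumer {
    public static void main(String[] args) throws PulsarClientException {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")
                .build();

        Consumer<byte[]> consumer = client.newConsumer()
                .topic("persistent://lakala/cdc/orders")  // illustrative topic
                .subscriptionName("tidb-writer")          // illustrative name
                .subscriptionType(SubscriptionType.Failover)
                .ackTimeout(3000, TimeUnit.MILLISECONDS)
                .ackTimeoutTickTime(500, TimeUnit.MILLISECONDS)
                .subscribe();

        Message<byte[]> msg = consumer.receive();
        // ... persist to TiDB, then acknowledge everything up to this message:
        consumer.acknowledgeCumulative(msg.getMessageId());

        consumer.close();
        client.close();
    }
}
```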

6. Confirmation of message consumption

Suppose the message with MessageID 1 has already been acknowledged and, using cumulative acknowledgment, the message with MessageID 3 is acknowledged next; then the consumed-but-unacknowledged message with MessageID 2 is also marked as successfully consumed. If no acknowledgment arrives within the acknowledgment timeout, the messages with MessageID 2, 3, 4, and 5 are redelivered in their original order.

With individual acknowledgment, suppose the messages with MessageID 1, 3, and 4 in the figure are acknowledged and the message with MessageID 2 times out. If the application does not acknowledge messages one by one in consumption order, then when a timeout occurs only the timed-out message (MessageID 2) is redelivered, and the consumption order becomes confused.

**Summary:** individual acknowledgment is recommended for queue-style consumption; cumulative acknowledgment is recommended for stream-style consumption.

7. Message confirmation timeout (client side) detection mechanism

The acknowledgment-timeout mechanism has two parameters: the timeout and the polling (tick) interval. Timeout detection is implemented with a double-ended queue plus multiple HashSets; the number of HashSets is the timeout divided by the tick interval, rounded, so each polling pass processes a single HashSet, which avoids the performance cost of a global lock.
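The mechanism described above can be sketched in a few lines; this is illustrative code, not the Pulsar client's actual implementation.

```java
import java.util.ArrayDeque;
import java.util.HashSet;

// Bucketed ack-timeout tracker: a deque of HashSets, one bucket per tick.
// Each tick retires the oldest bucket; anything still in it has exceeded
// the timeout and must be redelivered. No global lock is required.
public class AckTimeoutTracker {
    private final ArrayDeque<HashSet<Long>> buckets = new ArrayDeque<>();

    AckTimeoutTracker(long timeoutMs, long tickMs) {
        int n = (int) Math.ceil((double) timeoutMs / tickMs);
        for (int i = 0; i < n; i++) buckets.addLast(new HashSet<>());
    }

    // Track an unacknowledged message in the newest bucket.
    void track(long messageId) { buckets.peekLast().add(messageId); }

    // Acknowledgment removes the id from whichever bucket holds it.
    void ack(long messageId) {
        for (HashSet<Long> b : buckets) if (b.remove(messageId)) return;
    }

    // One polling tick: retire the oldest bucket (its contents timed out)
    // and open a fresh bucket at the tail.
    HashSet<Long> tick() {
        HashSet<Long> timedOut = buckets.pollFirst();
        buckets.addLast(new HashSet<>());
        return timedOut;
    }

    public static void main(String[] args) {
        // timeout 3000 ms / tick 500 ms -> 6 buckets
        AckTimeoutTracker t = new AckTimeoutTracker(3000, 500);
        t.track(1L);
        t.ack(1L);   // acked in time: never reported as timed out
        t.track(2L); // never acked
        for (int i = 0; i < 5; i++) System.out.print(t.tick().size()); // 00000
        System.out.println(t.tick().contains(2L)); // true on the 6th tick
    }
}
```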

Scenario 2: Message queue: OpenMessaging protocol implementation (transparent layer protocol)

Many of our business systems were strongly coupled to their messaging systems, which made subsequent upgrades and maintenance troublesome, so we decided to use the OpenMessaging protocol as a decoupling middle layer.

Implement the OpenMessaging protocol through Pulsar.

The development framework (based on Spring Boot) calls the OpenMessaging interface to send and receive messages.

Scenario 3: Streaming queue: Custom Kafka 0.8-Source (Source development)

Pulsar IO connects easily to various data platforms. Some of our business systems still use Kafka 0.8, for which no official source connector exists, so we developed a Kafka 0.8 source component according to the Pulsar IO interface definition.

Scenario 4: Streaming queue: Function message filtering (message filtering)

We use Pulsar Functions to desensitize sensitive fields (such as ID numbers and mobile phone numbers) in messages in the Pulsar IDC cluster and synchronize them in real time to the cloud cluster for consumption by cloud applications.
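A Pulsar Function in Java can be as simple as a class implementing java.util.function.Function. The sketch below illustrates the desensitization idea with hypothetical masking rules for mobile numbers and ID numbers; it is not our production logic.

```java
import java.util.function.Function;

// Sketch of a desensitization function (illustrative masking rules). A class
// implementing java.util.function.Function<String, String> can be deployed
// directly as a "native" Java Pulsar Function.
public class MaskSensitiveFields implements Function<String, String> {
    @Override
    public String apply(String json) {
        // Mask 18-character ID numbers first, keeping the first 6 and last 4
        // characters (last character may be a digit or X).
        String out = json.replaceAll("(\\d{6})\\d{8}(\\d{3}[0-9Xx])", "$1********$2");
        // Then mask the middle digits of 11-digit mobile numbers: 138****5678
        out = out.replaceAll("(\\d{3})\\d{4}(\\d{4})", "$1****$2");
        return out;
    }

    public static void main(String[] args) {
        String masked = new MaskSensitiveFields().apply("{\"phone\":\"13812345678\"}");
        System.out.println(masked); // {"phone":"138****5678"}
    }
}
```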

Scenario 5: Streaming queue: Pulsar Flink Connector streaming computing (streaming computing)

In merchant business-analysis scenarios, Flink connects to Pulsar through the Pulsar Flink Connector, performs real-time computation over the pipeline data along different dimensions, and persists the results to TiDB through Pulsar. From our usage so far, the Pulsar Flink Connector performs well in both performance and stability.

Scenario 6: Streaming queue: TiDB CDC adaptation (TiDB adaptation)

We need to capture TiDB data changes in real time, but TiDB's CDC for Pulsar did not support AVRO serialization, so we did custom development for this scenario: the data emitted by TiDB is first wrapped and then delivered to Pulsar. The TiDB CDC For Pulsar component is written in Go.

Future plans

Our Pulsar-based messaging platform has noticeably improved physical resource utilization, and running a single messaging platform simplifies system maintenance and upgrades while raising overall service quality. Our future plans for Pulsar include the following two points:

Continue decommissioning the other messaging systems until everything runs on the Pulsar-based platform;

Make deeper use of Pulsar's resource isolation and flow-control mechanisms.

In practice, with the help of Pulsar's many native features and the components we developed on top of it, the new messaging platform has fully met our expected functional requirements.