Preface
Hello everyone, I am ChinaManor, which literally translates to "Chinese code farmer". I hope to become a pathfinder on the road of national rejuvenation, a ploughman in the field of big data, and an ordinary person who refuses to be mediocre.
This is the mind map for real-time technology.
This post continues my reading notes on "Alibaba Big Data Practice", Chapter 5: Real-Time Technology.
2 Streaming Technology Architecture
In stream computing, several subsystems depend on one another to form a data processing pipeline that produces results and ultimately provides real-time data services. There are many open source options to choose from in practice, and their overall architectures are similar; they differ mainly in how each subsystem is implemented. The systems in a streaming architecture also overlap with offline processing: the two technology stacks are not fully independent, and the industry is trending toward merging them. By function, the subsystems fall into the following parts.
1 Data Collection
Data usually originates on the log servers of each business system (for example, website browsing logs and order change logs). It is collected in real time into data middleware, where downstream consumers subscribe to it in real time.
2 Data Processing
Once data has been collected into the middleware, downstream consumers subscribe to it in real time and pull it into stream computing tasks for processing. A stream computing engine is needed to run these tasks.
3 Data Storage
After data has been processed in real time (aggregated, cleaned, and so on), it is written to an online storage system for downstream callers to use. These writes are incremental and continuous.
4 Data Services
A unified data service layer (for example, an HSF interface or an HTTP service) is built on top of the storage system to serve the real-time computing results.
As Figure 5.2 shows, real-time and offline processing share the data collection and data service layers, because neither layer needs to care about the timeliness of the data. Sharing them also unifies the data source, which avoids inconsistent results between streaming and offline processing.
2.1 Data Collection
Data collection is the source of the whole data processing pipeline and the root node of every processing link. Since the goal is real-time computing, collection naturally has to be real-time as well. All collected data comes from business servers and falls into two types:
· Database change logs: for example, MySQL binlog, HBase HLog, OceanBase change logs, and Oracle change logs.
· Engine access logs: for example, Apache engine logs generated when users visit websites, and search engine interface query logs.
Both kinds of logs are written to files on the business server, so the collection tool only has to watch those files and pick up the latest data whenever their content changes. For throughput and system load reasons, data is normally not collected once per new record. Instead it is collected in batches according to the following rules:
· Data size limit: when the limit is reached, the new data collected so far is treated as one batch (for example, one batch per 512 KB).
· Time limit: when the time threshold is reached, the new data collected so far is also treated as one batch, so that a trickle of data does not go uncollected indefinitely (for example, one batch every 30 seconds).
As soon as either condition is met, the new data is collected into the data middleware as a batch. Both parameters should be tuned to business requirements: collecting batches more frequently lowers latency but inevitably reduces throughput.
The collected data needs a data exchange platform to distribute it downstream, and that platform is the data middleware. Data middleware can be implemented in many ways. Kafka is an open source example, while Alibaba Group mostly uses TimeTunnel (similar in principle to Kafka), alongside messaging systems such as MetaQ and Notify.
As Figure 5.3 shows, the messaging system sits upstream of the database change node, so its latency is much lower than that of the data middleware, but the throughput it supports is limited. The messaging system is therefore typically used to relay business database change messages such as order placement and payment. Other, larger business data (tens of TB per day) generally goes through the data middleware: its latency is on the order of seconds, but it supports high throughput. Table 5.1 compares the performance of messaging systems and data middleware.
In addition, some businesses do not update the database through the messaging system (for example, the order data of some sub-businesses is imported into MySQL by synchronization). Data obtained from the messaging system is therefore not complete, whereas the business change history obtained from database change logs is guaranteed to be complete.
Therefore, to stay consistent with offline data sources, real-time data is generally obtained by having the data middleware collect database change data. This requires merging by business primary key in the data processing layer: an order may be changed several times and produce multiple change records, which must be merged to get the latest state.
Timeliness and throughput pull against each other in data processing. In many cases, the choice of system for data transfer has to be weighed from the business perspective.
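The batching rule described in this section (flush when either the size limit or the time limit is reached) can be sketched in a few lines of Python. This is only an illustration under stated assumptions: `BatchCollector`, its `flush` callback, and the `tick` method are hypothetical names and not part of TimeTunnel or any real collection tool. The 512 KB and 30 second defaults are the example values from the text.

```python
import time
from typing import Callable, List, Optional


class BatchCollector:
    """Hypothetical helper: buffers new log records and hands them on as one
    batch when either the size limit or the time limit is reached."""

    def __init__(self, flush: Callable[[List[bytes]], None],
                 max_bytes: int = 512 * 1024, max_wait_s: float = 30.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self._flush = flush            # assumed sink, e.g. a write to the middleware
        self._max_bytes = max_bytes
        self._max_wait_s = max_wait_s
        self._clock = clock            # injectable so the time limit can be tested
        self._buffer: List[bytes] = []
        self._buffered_bytes = 0
        self._batch_started: Optional[float] = None

    def add(self, record: bytes) -> None:
        """Buffer one record; flush immediately if the size limit is reached."""
        if self._batch_started is None:
            self._batch_started = self._clock()
        self._buffer.append(record)
        self._buffered_bytes += len(record)
        if self._buffered_bytes >= self._max_bytes:
            self._emit()

    def tick(self) -> None:
        """Call periodically; flushes a non-empty batch once the time limit passes."""
        if (self._batch_started is not None
                and self._clock() - self._batch_started >= self._max_wait_s):
            self._emit()

    def _emit(self) -> None:
        batch, self._buffer = self._buffer, []
        self._buffered_bytes = 0
        self._batch_started = None
        self._flush(batch)
```

Lowering `max_bytes` or `max_wait_s` makes batches smaller and more frequent, which is the latency versus throughput trade-off the text describes.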
2.2 Data Processing
Real-time computing tasks are deployed on a stream computing system, which obtains real-time source data through the data middleware and processes it. Major Internet companies use a variety of open source and proprietary stream computing engines. Widely used ones include Twitter's Storm, Yahoo's S4, Apache Spark Streaming, and, in recent years, Flink. Their overall architectures are similar, but they differ in many implementation details and suit different application scenarios.
Within Alibaba Group, the most widely used system is StreamCompute, provided by Alibaba Cloud. Described as the industry's first full-link stream computing development platform, it covers every step from data collection to data production, aiming to make stream computing development rigorous and reliable. Its StreamSQL capability provides SQL semantics for streaming data analysis and lowers the barrier to entry. It wraps a layer of SQL semantics on top of Storm, so developers can implement real-time computation by writing SQL without worrying about the details of computation state, which greatly improves development efficiency. It also supports the traditional development model, much like the relationship between Hive and MapReduce in Hadoop, so the approach can be chosen per scenario. StreamCompute additionally provides a development platform on which application O&M (operations and maintenance) can be carried out without logging in to servers, which greatly improves O&M efficiency.
The following uses Storm as an example to briefly explain how stream data processing works. The entire topology of a real-time application is a directed acyclic graph (see the Apache Storm website: http://storm.apache.org/index.html), as shown in Figure 5.4.
• Spout: the input of the topology. It reads data from the data middleware and sends it to downstream bolts according to custom distribution rules. A topology can have multiple input sources.
• Bolt: the business processing unit. Processing can be split into multiple steps according to the logic, with custom data distribution rules between them.
For performance reasons, real-time computing tasks are often multithreaded. Data is usually divided into buckets by business primary key, and most of the data needed for computation is kept in memory, which greatly improves application throughput. To avoid memory overflow, expired data in memory must be cleared periodically.
Expired data can be evicted with the LRU (least recently used) algorithm, or grouped by business time and cleared on a schedule (for example, data whose business time falls on T-1 is cleared in the early morning of the current day).
Here are some typical problems encountered with real-time tasks.
1. Deduplication indicators
In real-time BI (business intelligence) statistics, deduplication indicators are among the most resource-intensive. Because real-time tasks pursue processing performance, computation is generally done in memory and intermediate results are cached in memory as well, which leads to heavy memory consumption. Deduplication necessarily has to keep the detail records being deduplicated. What can be done when those records reach hundreds of millions or even billions and no longer fit in memory? There are two cases:
· Exact deduplication. The detail data must be kept. When memory becomes a problem, the bucketing approach used for data skew can spread one node's memory pressure across multiple nodes.
· Fuzzy deduplication. When the volume of detail data is very large and the business does not require high precision, approximate deduplication algorithms can cut memory usage to 1/1000 or even 1/10000.
(1) Bloom filter
This algorithm is an application of bit arrays. It does not store the actual detail data, only the marker bits corresponding to the hash values of the detail data. Hash collisions can occur, but the error rate is controllable, and the computed distinct count comes out no larger than the true value. Storing hundreds of millions of records this way takes only about 100 MB.
Application scenario: statistical precision requirements are not high and the dimension has a very large number of values. For example, computing UV for every business across the whole site, where the result set reaches tens of millions of records. This works because a Bloom filter can be shared across dimension values.
(2) Cardinality estimation
This algorithm also relies on hashing. It estimates the bounds of the current set from how dispersed the data is, and from that derives an approximate distinct count. The estimate may be larger or smaller than the true value. Storing hundreds of millions of records takes only a few KB.
Application scenario: statistical precision requirements are not high and the dimension is very coarse. For example, site-wide UV, where there is only one result record per day. Cardinality estimators cannot be shared across dimension values: each value being counted needs its own estimator object (for example, one object for a full day's UV), so the approach is not suitable for fine-grained statistics.
The implementation details of these two algorithms are easy to find online, so I won't go into them here.
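To make the Bloom filter idea above more concrete, here is a minimal Python sketch of approximate UV counting. It is an assumption-laden illustration, not the implementation used at Alibaba: the class name, the double-hashing scheme built on SHA-256, and the default sizes are all choices made for this example. A visitor is counted only if at least one of its bits was not yet set, so duplicates are never counted twice and collisions can only make the result smaller than the true value, which matches the behaviour described in the text.

```python
import hashlib
from typing import Iterable, Iterator


class BloomFilter:
    """Bit-array filter: stores only marker bits, never the visitor IDs themselves."""

    def __init__(self, num_bits: int, num_hashes: int) -> None:
        self.num_bits = num_bits
        self.num_hashes = num_hashes
        self.bits = bytearray((num_bits + 7) // 8)

    def _positions(self, item: str) -> Iterator[int]:
        # Assumed scheme: derive k positions from two halves of one SHA-256 digest.
        digest = hashlib.sha256(item.encode("utf-8")).digest()
        h1 = int.from_bytes(digest[:8], "big")
        h2 = int.from_bytes(digest[8:16], "big") | 1
        for i in range(self.num_hashes):
            yield (h1 + i * h2) % self.num_bits

    def add_if_new(self, item: str) -> bool:
        """Set the item's bits; return True if at least one bit was previously unset."""
        is_new = False
        for pos in self._positions(item):
            byte_index, mask = pos >> 3, 1 << (pos & 7)
            if not (self.bits[byte_index] & mask):
                is_new = True
                self.bits[byte_index] |= mask
        return is_new


def approximate_uv(visitor_ids: Iterable[str],
                   num_bits: int = 1 << 20, num_hashes: int = 5) -> int:
    """Approximate distinct-visitor count; never exceeds the true distinct count."""
    seen = BloomFilter(num_bits, num_hashes)
    return sum(1 for visitor in visitor_ids if seen.add_if_new(visitor))
```

A larger `num_bits` lowers the collision rate at the cost of memory, which is the knob behind the "controllable error rate" mentioned above.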
2. Data skew
Data skew is a common problem in ETL. For example, when computing the total number of visitors or the total transaction amount for a day, there is only one final result, so the computation would normally be completed on a single node. When the data volume is very large, a single node's processing capacity is limited and a performance bottleneck is inevitable. The data then has to be processed in buckets, following the same idea as in offline processing.
(1) Bucketing for deduplication indicators: values are assigned to buckets by hash, so identical values always land in the same bucket and are deduplicated there. The total is then obtained by summing the per-bucket values. This uses the CPU and memory resources of every bucket.
(2) Bucketing for non-deduplication indicators: data is distributed randomly across buckets and the per-bucket values are summarized at the end. This mainly uses the processing capacity of every bucket.
3. Transaction processing
Because real-time computing is distributed, system instability inevitably means that data processing can fail: for example, data cannot be sent because of network jitter, or data is lost when a machine restarts. How can data be processed precisely in these cases? Almost all of the stream computing systems mentioned above provide mechanisms for automatic ACK, retransmission on failure, and transaction information.
· Timeout period: data is processed in batches. If a batch times out, the data is resent from the spout end of the topology. Batches should also not be too large, and a rate-limiting function (capping the number of records or the size of a batch) should be added to avoid processing timeouts.
· Transaction information: each batch carries a transaction ID. On retransmission, developers can use it to apply different logic for a first arrival and for a resend.
· Backup mechanism: developers must ensure that in-memory data can be restored from external storage, so intermediate results used in the computation need to be backed up to external storage.
All of these mechanisms serve to keep data processing idempotent.
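As a rough illustration of how the transaction ID lets a task tell a first arrival from a retransmission, consider the Python sketch below. `IdempotentCounter` and its method names are hypothetical and do not correspond to the Storm API. The applied-ID set is kept in memory purely for brevity; as the backup mechanism above suggests, a real task would need to persist it together with the totals in external storage.

```python
from typing import Dict, Iterable, Set, Tuple


class IdempotentCounter:
    """Hypothetical aggregator that applies each batch at most once,
    keyed by the batch's transaction ID."""

    def __init__(self) -> None:
        self.totals: Dict[str, int] = {}
        self._applied: Set[int] = set()   # assumption: would be backed up externally

    def process_batch(self, txn_id: int,
                      records: Iterable[Tuple[str, int]]) -> bool:
        """Apply a batch of (key, amount) records.

        Returns True on first arrival and False for a retransmission,
        in which case the totals are left untouched.
        """
        if txn_id in self._applied:
            return False
        for key, amount in records:
            self.totals[key] = self.totals.get(key, 0) + amount
        self._applied.add(txn_id)
        return True
```

Replaying the same batch after a timeout therefore leaves the totals unchanged, which is the idempotent behaviour the mechanisms above aim for.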
2.3 Data Storage
A real-time task computes many dimensions and indicators while it runs, and this data has to be kept in a storage system for recovery or for joins. Three types of data are involved:
· Intermediate calculation results: state saved while the real-time application runs (such as the detail data behind deduplication indicators), used to restore the in-memory state from the database after a failure.
· Final result data: the real-time results produced by ETL processing. They are updated in real time, written at high frequency, and can be used directly by downstream consumers.
· Dimension table data: in the offline computing system, a synchronization tool imports it into the online storage system so that real-time tasks can join it with the real-time stream. The use of dimension tables is described in a later section.
There are many types of databases: relational, column-oriented, document, and so on. Which characteristics matter when choosing one for real-time tasks?
As noted above, real-time tasks are multithreaded, so the storage system must support highly concurrent reads and writes with millisecond-level latency to meet real-time requirements. In practice, non-relational stores such as HBase, Tair, and MongoDB are generally used. These systems write to memory first and then flush to disk, so write latency is at the millisecond level. Reads are cached too, so read latency also stays at the millisecond level under high concurrency.
The drawbacks of these systems are just as obvious. Take HBase as an example. Every table needs a rowkey, and rowkeys are sorted in ASCII order, much like an index in a relational database, so the rowkey layout constrains how data can be read. If the business needs to read the data in a different way, the data has to be written out again under a new rowkey. From this angle, HBase is less convenient than a relational database. On the other hand, a single HBase table can hold TB or even tens of TB of data, whereas a relational database has to be split across multiple tables to reach that scale. For real-time computation over massive data, a non-relational database is therefore generally used to handle the large volume of concurrent reads and writes.
Here are some practical lessons on table name design and rowkey design for statistics data.
Table name design
Design rule: summary layer identifier + data domain + primary dimension + time dimension
For example, dws_trd_slr_dtr denotes transaction data at the summary layer, aggregated by the seller (slr) primary dimension over the period from 00:00 up to the current day (dtr).
The advantage is that all data for the same primary dimension lives in one physical table, which avoids having too many tables to maintain. The table name also shows at a glance what data it holds, which helps with troubleshooting.
Rowkey design
Design rule: MD5 + primary dimension + dimension identifier + subdimension 1 + time dimension + subdimension 2
For example: first four characters of the MD5 of the seller ID + seller ID + app + level-1 category ID + ddd + level-2 category ID
Using the first four characters of the MD5 as the first part of the rowkey hashes the data across servers, balancing load and avoiding hotspots. In this example the seller ID is the primary dimension and must be supplied when querying. Each statistical dimension gets its own dimension identifier so that it can be distinguished in the rowkey.
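The rowkey rule above can be sketched as a small Python helper. This is only an illustration: the function names, the "_" separator, and the sample values are assumptions made for this example, since the text specifies the order of the parts but not how they are joined or encoded.

```python
import hashlib


def build_rowkey(seller_id: str, dim_flag: str, sub_dim_1: str,
                 time_dim: str, sub_dim_2: str, sep: str = "_") -> str:
    """Compose a rowkey as: MD5 prefix + primary dimension + dimension
    identifier + subdimension 1 + time dimension + subdimension 2."""
    # The first four hex characters of the MD5 spread sellers across regions.
    md5_prefix = hashlib.md5(seller_id.encode("utf-8")).hexdigest()[:4]
    return sep.join([md5_prefix, seller_id, dim_flag,
                     sub_dim_1, time_dim, sub_dim_2])


def seller_scan_prefix(seller_id: str, sep: str = "_") -> str:
    """Prefix shared by all rows of one seller. It can only be computed when
    the seller ID is known, which is why the primary dimension is mandatory."""
    md5_prefix = hashlib.md5(seller_id.encode("utf-8")).hexdigest()[:4]
    return sep.join([md5_prefix, seller_id]) + sep


# Hypothetical values: seller 12345, "app" dimension identifier,
# level-1 category 50, date 20240101, level-2 category 5001.
example_key = build_rowkey("12345", "app", "50", "20240101", "5001")
```

Because the MD5 prefix is derived from the seller ID, all rows of one seller stay contiguous for prefix scans while different sellers are scattered across the key space.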
2.4 Data Services
Once real-time data has landed in the storage system, users obtain it through a unified data service, such as OneService, which is covered in the next chapter. The benefits are:
· No direct database connections are needed. Data source details are maintained in the data service layer, so a storage system migration is transparent to downstream consumers.
· Callers only use the interfaces exposed by the service layer and do not need to care how the underlying data retrieval logic is implemented.
· Differences between storage systems are masked and call logs are produced in a unified format, which makes it easy to analyze and monitor downstream usage.
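The idea of hiding the storage system behind a service layer can be illustrated with a short Python sketch. None of the names here come from OneService, whose interface is not described in this post: `ResultStore`, `DictStore`, and `RealtimeDataService` are hypothetical, and the in-memory store merely stands in for a real backend such as HBase.

```python
from typing import Dict, List, Optional, Protocol, Tuple


class ResultStore(Protocol):
    """Minimal read interface that any storage backend is assumed to offer."""

    def get(self, key: str) -> Optional[str]: ...


class DictStore:
    """Stand-in backend for the sketch; a real one would wrap a database client."""

    def __init__(self, data: Dict[str, str]) -> None:
        self._data = dict(data)

    def get(self, key: str) -> Optional[str]:
        return self._data.get(key)


class RealtimeDataService:
    """Callers talk to this layer only and never connect to storage directly."""

    def __init__(self, store: ResultStore) -> None:
        self._store = store
        self.call_log: List[Tuple[str, str]] = []   # unified call log

    def query(self, caller: str, key: str) -> Optional[str]:
        self.call_log.append((caller, key))
        return self._store.get(key)

    def migrate(self, new_store: ResultStore) -> None:
        """Swap the backend; callers keep using query() unchanged."""
        self._store = new_store
```

Since callers depend only on `query`, replacing the backend through `migrate` does not change any calling code, and `call_log` gives one place to analyze downstream usage.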
Conclusion
That covers "Alibaba Big Data Practice", Real-Time Technology: streaming technology architecture (part II). The next post will cover Alibaba's streaming data model.
I hope you took something away from this post. If you did, a like, a bookmark, and a comment would be much appreciated.
See you next time 👋