1. Double Eleven

Double Eleven is a national shopping carnival, but for Alibaba engineers it is also the biggest test of the year. Every year, engineers exhaust every means to safeguard user experience, reliability, and stability: from the underlying network, infrastructure, and capacity planning, through performance optimization, to personalized recommendation, intelligent search, and complex marketing campaigns, every layer of the technical support system keeps evolving and producing a stream of innovations. Of the vast river, we take but one ladle: if you pay attention to Double Eleven, you will notice many venue pages like the "sports goods" page in the first picture below. Such venues have two characteristics:

  1. The goods share a common theme. (For example, the goods in Figure 1 are all sports goods.)

  2. A thousand faces for a thousand users. (The goods each user sees are personalized, maximizing the probability of a transaction.) How do we accurately select the goods that fit a venue's theme from hundreds of millions of items, deliver them to consumers in a personalized way, and ultimately maximize the probability of a transaction? We call the technical system behind this the "selection system". This article introduces the technical evolution of Xianyu's selection system, hoping to give you some ideas for solving this class of problem.

2. Evolution of the Xianyu selection system

2.1 Business problems solved by the selection system

Xianyu, like most e-commerce systems, has three core links:

  1. Commodity release: the seller publishes the goods they want to sell to the platform.

  2. Commodity display: the platform shows the right goods to the right buyers through various channels, and buyers form purchase intent by browsing.

  3. Commodity transaction: after the buyer confirms the purchase intent, the buyer places an order, the seller ships the goods, and after-sales service completes the exchange of money and goods. The problems solved by the selection system are concentrated in the commodity display link. Especially in large-scale campaigns like Double Eleven, the selection system combines the operations team's deep understanding of each industry with personalized algorithms, constructing a large number of venues and channels and presenting high-quality goods to consumers.

The selection system is roughly divided into three layers as shown above.

  • Basic data layer. Commodity-related data comes from very rich sources: it may be basic information entered by the seller at publish time, such as brand and size; it may be the algorithm's understanding and analysis of the goods, such as picture quality or product quality; it may also be statistical data, such as page-view counts or praise-rate statistics. The data volume at this layer often determines the kind of technical architecture that follows.

  • Operational experience layer. By configuring various selection conditions ("selection rules"), operations staff select from the mass of goods the collection that reflects their intent. For a sports goods venue, for example, operations can use rules such as well-known sports brands and high-quality sellers to circle the base collection of goods. Typically, this collection is already several orders of magnitude smaller than the underlying data.

  • Algorithm capability layer. Within the collection selected by operations, the recommendation engine combines each user's personalized information to generate the feed stream of the venue page that the user sees. The core of this layer is to rank the goods in the operations-circled collection with different ranking strategies, placing the goods most likely to generate purchase intent for this user at the front.

Xianyu's business scenario has a significant feature that distinguishes it from other e-commerce platforms: every Xianyu item is unique. Once someone buys it, it can no longer be sold and must be excluded from the feed stream. Likewise, a newly published item, or any change to an item's attributes, must immediately affect the downstream selection result sets. In short, the core differentiating demand of the Xianyu selection system is second-level real-time updates.

2.2 Search-based selection

In its early days, Xianyu, like many startups, had relatively few goods on the platform and no professional algorithm team, yet there were already plenty of selection demands. For example, when college students graduated in June, operations organized "graduation season" activities with selection conditions such as: the seller is a college student; the item title includes "graduation"; the item belongs to books, electrical appliances, clothing, or other designated categories; and the price is under 300 yuan. Buyers' feeds were sorted by distance, with the nearest items first. In the architecture of the time, search was the source of almost all feed pages, so selection was naturally built on search.

  • After the user publishes a commodity and the write to the commodity library completes, application A sends an asynchronous message to the messaging middleware; we used Alibaba's RocketMQ [1], which is now open source.

  • After application B receives the message from the middleware, it fetches, via RPC calls to other applications, the supplementary information (beyond the basic commodity information) required by the dump table, performs data conversion and merging, and assembles the data structure for updating the search engine. The data is first written to the search dump table, backed by MySQL; then the real-time index update service exposed by the search engine is called over HTTP to update the index in real time.

  • Every night, a scheduled task triggers a full dump of the search index: data is pulled from the search dump table to the engine side, a full index is built, and the online version is swapped.

  • A back-office selection page is built for operations, presenting the searchable conditions as a form. Operations staff assemble conditions on the form and preview the selection results under the assembled conditions; once satisfied, they save the search conditions and publish them to production.

  • A feed request from the front-end page looks up the selection rule by the id of the selection configuration, translates it into a search query string, and sends the request to the search system to obtain the feed results.

    Solr was chosen as the search engine. Besides the small business volume at the time (neither data volume nor query QPS was high), the main reason was that a team in the company had built a search platform for small and medium data volumes on Solr; using it directly saved a lot of operations and reliability work. The engine could equally be Elasticsearch or any other handy search engine.

    As mentioned before, the unique-item nature of Xianyu goods requires the selection platform to be real-time. At this stage, the core real-time guarantee was optimizing the index update delay until updates were visible in real time. Native Solr had many problems with real-time index updates, and the search team did a lot of optimization work (see the detailed share in [2]). In the end, real-time index updates were visible to foreground search with a delay of about 100 milliseconds.

    Although this architecture looks a little primitive, it supported our needs for a long time, and even now it remains of reference value for small and medium-sized companies.
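As a concrete illustration, here is a minimal sketch of how a saved selection rule might be translated into a search query string, as in the feed-request step above. The rule model, field names, and query syntax are all hypothetical, not Xianyu's actual schema; in the real system the query would also carry sort parameters (for example, distance-based sorting).

```java
import java.util.List;

/** Hypothetical selection rule saved by the operations back office. */
class SelectionRule {
    String titleKeyword;      // e.g. "graduation"
    List<String> categories;  // e.g. ["books", "appliances", "clothing"]
    double maxPrice;          // e.g. 300.0
    boolean sellerIsStudent;
}

public class RuleToQuery {
    /** Translate a rule into a Lucene/Solr-style query string. */
    static String toQuery(SelectionRule r) {
        StringBuilder q = new StringBuilder();
        q.append("title:").append(r.titleKeyword);
        q.append(" AND category:(").append(String.join(" OR ", r.categories)).append(")");
        q.append(" AND price:[* TO ").append(r.maxPrice).append("]");
        if (r.sellerIsStudent) {
            q.append(" AND seller_tag:college_student"); // hypothetical field
        }
        return q.toString();
    }

    public static void main(String[] args) {
        SelectionRule r = new SelectionRule();
        r.titleKeyword = "graduation";
        r.categories = List.of("books", "appliances", "clothing");
        r.maxPrice = 300.0;
        r.sellerIsStudent = true;
        // title:graduation AND category:(books OR appliances OR clothing)
        //   AND price:[* TO 300.0] AND seller_tag:college_student
        System.out.println(toQuery(r));
    }
}
```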

    Inside Alibaba there are still many search-based selection platforms, generally combined with an algorithmic recommendation layer. The core functions of the search engine there are:

    1. Real-time preview of selection results based on conditions

    2. Export of the goods set matching the conditions to the algorithm side

The selection result set is exported from the search engine and written into KV storage; when making recommendations, the recommendation algorithm recalls data from this result set and then ranks it using user behavior and other algorithmic data. The engineering support for the algorithm module is introduced below. Note the part in red in the architecture diagram: selection result data is exported in pull mode, which means there are only two ways to update the KV storage of a selection result set:

  1. When a selection rule is created or updated, the trigger system pulls the data matching the rule from the engine and writes it to KV storage.

  2. In the early hours of every morning, when traffic is low, scheduled tasks execute all selection rules configured in production and write the results to KV storage.

This means that changes to goods cannot be propagated into the selection result sets in real time. Possible remedies include:

  1. Periodic full data export. Not feasible: a full export is large, takes a long time, puts heavy strain on the system, and in no case can a continuous full export and recalculation be done in seconds.

  2. Incremental data export. Also not feasible: the engine itself is updated in real time, so even for the same rule, incremental recall based on index timestamps can produce duplicate data, and performance bottlenecks appear quickly once there are many rules. Given the second-level real-time requirement of Xianyu selection, this mode is unacceptable. But for e-commerce systems whose inventory changes little and whose real-time demands are modest, this solution fully meets the business need. Readers may consider: even with slow-changing inventory, goods can still sell out or be taken down as a penalty; how would you ensure such goods are effectively weeded out of the feeds finally delivered to users?

2.3 Selection based on offline computing

We call online real-time data query services, such as those provided by search engines, online services; offline computing refers to submitting computing tasks to a big data computing platform to perform complex calculations over massive data and output the results.

In this architecture, the core logic executes all selection rules through scheduled tasks on the big data platform, which fully exploits the platform's capacity for very large-scale, complex calculations without building auxiliary systems such as search engines. There are many big data computing platforms; Alibaba currently mainly uses MaxCompute [3]. One can equally assemble a stack such as Kafka (data ingestion) + Hadoop cluster + Hive (SQL queries) + Spark (programmatic access and batch processing) to support massive data processing. Obviously, in this mode it is still hard to meet Xianyu's second-level real-time requirement.
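For illustration only, here is a minimal sketch of executing one selection rule as a Spark batch job over a goods table; the input path, schema, and rule condition are hypothetical:

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class OfflineSelectionJob {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("offline-selection")
                .master("local[*]")   // for the sketch; a real job is cluster-submitted
                .getOrCreate();

        // Hypothetical snapshot of the full goods table, produced upstream.
        Dataset<Row> goods = spark.read().parquet("/warehouse/goods_snapshot/");

        // One selection rule expressed as a SQL condition, e.g. the
        // "graduation season" rule from section 2.2.
        Dataset<Row> selected = goods.filter(
                "title LIKE '%graduation%' AND price < 300 " +
                "AND category IN ('books', 'appliances', 'clothing')");

        // Export the result set for the downstream KV store to pick up.
        selected.select("item_id")
                .write().mode("overwrite")
                .parquet("/warehouse/selection_result/rule_graduation/");

        spark.stop();
    }
}
```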

2.4 Core links and technical challenges of a real-time selection system

Summing up the two previous selection schemes, the following core links must be realized to build a real-time selection system that meets Xianyu's business demands.

  • A commodity wide table updated in real time. A wide table is a table with many columns; data from all the sources that can feed selection rule calculation must be written into the wide table in real time as independent columns.

    Challenge 1: incremental and full data must be merged at the level of individual fields, because the selection platform's data sources are diverse and produce their output at different times.

    For example, Xianyu's publishing flow does not force the user to select category information; instead, the category is predicted in real time by an online algorithm from the title, description, and pictures the user enters. The online prediction algorithm sacrifices precision to guarantee real-time performance, so every day the algorithm re-predicts the category of all goods with high precision on the offline computing platform. This is why the raw data must be imported into the selection wide table in both full and incremental modes every day. It is a solution that balances data consistency against real-time performance, and most real-time search engines adopt a similar approach, using full and incremental dumps to keep the index in real-time sync with the DB. And as long as there are both incremental and full updates, there must be the problem of merging them.

    A common approach for search engines is to run two engine instances. Engine 1 serves online traffic in real-time incremental mode while engine 2 completes a full data dump; engine 2 then replays the incremental change messages received after the full data's latest timestamp to update its index, and once its incremental data catches up with engine 1's latest message timestamp, the online service switches over. Engine 2 now serves online traffic, and engine 1 performs the same full-plus-incremental rebuild that engine 2 just completed. This mode guarantees a complete merge of full and incremental data while keeping the service online. But as introduced earlier, using search directly as the selection engine cannot meet the real-time selection demand, and ordinary databases do not have the ability to switch indexes online.
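A minimal sketch of this dual-engine switchover, with engines, timestamps, and the message stream all simplified to illustrate only the catch-up-then-swap idea:

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicReference;

public class DualEngineSwitch {
    /** Simplified engine: it only tracks the timestamp of the last applied change. */
    static class Engine {
        long lastAppliedTs;
        void applyIncrement(long ts) { lastAppliedTs = ts; }
        void loadFullDump(long dumpTs) { lastAppliedTs = dumpTs; }
    }

    final AtomicReference<Engine> online = new AtomicReference<>(new Engine());
    // Incremental change messages retained since the full dump started.
    final Queue<Long> retainedIncrements = new ConcurrentLinkedQueue<>();

    /** Rebuild the standby engine from a full dump, replay increments, then swap. */
    void rebuildAndSwap(Engine standby, long fullDumpTs) {
        standby.loadFullDump(fullDumpTs);
        // Replay retained increments newer than the dump timestamp until the
        // standby has caught up with the online engine's latest timestamp.
        while (standby.lastAppliedTs < online.get().lastAppliedTs) {
            Long ts = retainedIncrements.poll();
            if (ts == null) break;                  // caught up with the stream
            if (ts > fullDumpTs) standby.applyIncrement(ts);
        }
        online.set(standby);                        // atomic switchover
    }
}
```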

The figure above shows full and incremental data colliding on a single field. When the wide table merge runs after the full prediction, the system must be able to decide precisely how the incremental and full values of the category field are combined.

Challenge 2: extremely high write (200K+/s) and read (200K+/s) throughput on the wide table, driven by full updates at the hundred-million scale. For example, updating 1 billion rows within 1 hour requires roughly 1,000,000,000 / 3600 ≈ 270K TPS.
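A minimal sketch of such a field-level merge, assuming each field value carries the timestamp at which it was produced (the same convention used by the _mergeRes structure shown later); all names are illustrative:

```java
import java.util.HashMap;
import java.util.Map;

public class FieldLevelMerge {
    /** One field value together with the timestamp at which it was produced. */
    record FieldValue(long ts, String value) {}

    /**
     * Merge an incoming record (full or incremental, it makes no difference)
     * into the held state: per field, the newer timestamp wins. A late full
     * re-prediction therefore cannot clobber a fresher incremental update.
     */
    static void merge(Map<String, FieldValue> state, Map<String, FieldValue> incoming) {
        incoming.forEach((field, candidate) ->
                state.merge(field, candidate,
                        (old, neu) -> neu.ts() >= old.ts() ? neu : old));
    }

    public static void main(String[] args) {
        Map<String, FieldValue> state = new HashMap<>();
        // Real-time (lower-precision) category prediction arrives first.
        merge(state, Map.of("category", new FieldValue(1541433610000L, "50025437")));
        // A full re-prediction carrying an older timestamp for this item
        // does not overwrite the fresher value.
        merge(state, Map.of("category", new FieldValue(1541430000000L, "50018401")));
        System.out.println(state.get("category").value()); // 50025437
    }
}
```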

  • Real-time calculation of online selection rules. Whenever any field of any commodity changes, the selection rules must be recalculated to determine whether the rule conditions still hold, and hence whether the downstream selection result set needs updating.

    Challenge 3: Selection rule execution efficiency.

  • Real-time full-data calculation and preview for new selection rules. When a selection rule is created or modified, all goods must be evaluated against the new rule to obtain the matching set. The point is that this full computation must finish within seconds, and the full data itself is real-time data.

    Challenge 4: with search engines ruled out, big data computing platforms usually cannot perform second-level complex computation across many columns. What solution should we choose?

  • Real-time selection result set update.

    Challenge 5: the selection result sets consumed by the algorithm are typically carried by KV storage. If KV storage had to be read and written after every rule calculation, then at 200,000 updates per second and 1,000 rules the required throughput would be 200,000 × 1,000 = 200 million TPS. No existing KV system can carry such throughput.

  • Real-time recommendation algorithm.

    Challenge 6: the recommendation algorithm recommends from the selection pool, but the most commonly used i2i collaborative filtering based on user behavior relies on recalling large numbers of associated goods. Once a small commodity pool is drawn, the recall stage will very likely come back empty and i2i fails.

    Challenge 7: how to bring offline i2i, u2i, and other data that is not updated in real time online.

    Challenge 8: how to collect online data in real time to enable online model training and real-time prediction based on real-time data.
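To make challenge 6 concrete, here is a minimal sketch of i2i recall intersected with a selection pool; the i2i table and pool contents are made up:

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

public class PoolRecall {
    public static void main(String[] args) {
        // Offline i2i table: item -> similar items (hypothetical data).
        Map<Long, List<Long>> i2i = Map.of(
                101L, List.of(201L, 202L, 203L),
                102L, List.of(204L, 205L));

        // A small venue selection pool.
        Set<Long> pool = Set.of(203L, 999L);

        // Items the user recently clicked (the i2i trigger items).
        List<Long> triggers = List.of(101L, 102L);

        // Recall = similar items that are also in the pool. With a small
        // pool this intersection is often empty, which is challenge 6.
        List<Long> recalled = triggers.stream()
                .flatMap(t -> i2i.getOrDefault(t, List.of()).stream())
                .filter(pool::contains)
                .distinct()
                .collect(Collectors.toList());

        System.out.println(recalled); // [203]
    }
}
```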

2.5 Attempts to adopt PostgreSQL

PostgreSQL, which supports both OLTP and OLAP, was the first system we introduced that matched the core demands of the selection system laid out in 2.4:

  • High write and query throughput: million-level QPS at the cluster level.

  • JSON data is natively supported. A two-column KEY : JSON-VALUE layout keeps key-based queries fast, while a specially designed JSON format plus a custom data-merge UDF supports field-level merging of incremental and full data.

  • PostgreSQL triggers fire rule calculation in real time; rules are implemented as custom functions (see [4]).

  • PostgreSQL pushes notification messages to downstream recommendation or search systems through its NOTIFY mechanism.
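A minimal consumer-side sketch of the NOTIFY flow, using the standard pgjdbc LISTEN/poll pattern; the channel name and payload format are hypothetical:

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;
import org.postgresql.PGConnection;
import org.postgresql.PGNotification;

public class SelectionChangeListener {
    public static void main(String[] args) throws Exception {
        Connection conn = DriverManager.getConnection(
                "jdbc:postgresql://localhost:5432/selection", "app", "secret");
        try (Statement st = conn.createStatement()) {
            // Subscribe to the (hypothetical) channel the rule UDF notifies.
            st.execute("LISTEN selection_result_change");
        }
        PGConnection pg = conn.unwrap(PGConnection.class);
        while (true) {
            // pgjdbc delivers notifications on the client's next round trip,
            // so we poll with a dummy query: this is exactly the "client
            // pull mode" criticized in problem 2 below.
            try (Statement st = conn.createStatement()) {
                st.execute("SELECT 1");
            }
            PGNotification[] batch = pg.getNotifications();
            if (batch != null) {
                for (PGNotification n : batch) {
                    System.out.println(n.getName() + ": " + n.getParameter());
                }
            }
            Thread.sleep(500);
        }
    }
}
```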

This system basically achieved the design goals and supported some business scenarios, but three serious problems remained:

1. After a UDF executes, triggers must be defined to send the NOTIFY message, and as the number of triggers grows, performance deteriorates sharply: writing 250,000 rows through 50 triggers takes 1,052 seconds (about 17 minutes). With hundreds of millions of rows written every day, a full write would take over 100 hours. This was already on physical hardware, with no headroom left.

2. The NOTIFY mechanism works in client pull mode, and clients in a microservice cluster cannot mutually exclude each other, so services must do a lot of work to guarantee that each message is consumed exactly once across the cluster.

3. In KV storage mode, the performance of preview and retrieval over the execution data also drops sharply, so real-time preview and export for new rules could not be supported. Constrained by these problems, the PG scheme was used only in a few scenarios and did not become the final scheme of Xianyu's real-time selection system.

2.6 Attempts to adopt a real-time computing platform

When a commodity data field changes, rule calculation is triggered in real time: a textbook event-driven stream processing problem. Blink is a general-purpose real-time computing platform built by Alibaba's real-time computing team on top of Flink, with many capabilities enhanced. Here is a quick comparison with Storm and Spark, which are also real-time computing frameworks.

When choosing, we paid most attention to three points:

  1. Whether real-time stream processing is supported, to guarantee low latency.

  2. Whether there is local storage: high-performance wide-table incremental/full merging and high-performance real-time writes to the selection result sets both depend on it.

  3. The cost of subsequent upgrades and maintenance, which is strongly affected by whether a dedicated team maintains and evolves the platform.

    On these three dimensions, Blink is the best choice.

How did Blink solve the technical challenges listed in section 2.4? The answer: rely on Blink's StateBackend.

First, a brief introduction to Blink's StateBackend. What is State?

State is the intermediate computation result or metadata attribute held by a compute node during stream processing. For example, in an aggregation, the intermediate aggregate result is recorded in State; with Apache Kafka as the data source, the offset of the records read is also recorded. State is persisted (inserted or updated) as the computation proceeds. In short, State in Blink is a time-dependent snapshot of a Blink task's internal data (computed data and metadata attributes).

The State persistence engines currently supported by Blink include:

    1. Gemini: a high-performance in-memory StateBackend that reduces serialization and JNI overhead compared with Niagara/RocksDB. State reads and writes are faster and CPU consumption is lower, but data size is limited by memory: the peak checkpoint size must not exceed (parallelism × 200) MB.

    2. RocksDBStateBackend: based on RocksDB, local files plus asynchronous HDFS persistence; Flink-native, and relatively slow.

    3. Niagara: a StateBackend with distributed persistence developed at Alibaba, much faster than RocksDB.

So how do we use the StateBackend to solve the performance challenges above? As described earlier, the State store comes into play during aggregation, and we drive it with a custom UDAF (user-defined aggregate function). The UDAF skeleton:


     
```java
/*
 * @param <T>   the type of the UDAF output.
 * @param <ACC> the UDAF accumulator type. The accumulator stores the
 *              intermediate results of the UDAF calculation; users design
 *              their accumulator according to their needs. For the simplest
 *              count UDAF, the accumulator can simply be a counter.
 */
public abstract class AggregateFunction<T, ACC> extends UserDefinedFunction {
    /*
     * Creates the accumulator of the AggregateFunction.
     * The system calls this method once, before the first aggregate
     * calculation is performed.
     */
    public abstract ACC createAccumulator();

    /*
     * The system calls this method after each aggregate calculation
     * to obtain the current output value.
     */
    public abstract T getValue(ACC accumulator);

    /*
     * The user implements an accumulate method describing how input data is
     * computed and merged into the accumulator. Its first parameter must be
     * an accumulator of type ACC. At runtime, the underlying code passes the
     * historical state accumulator, plus the user-specified upstream data
     * (any number, any type), to accumulate for calculation:
     *
     *     public void accumulate(ACC accumulator, ...user-specified inputs...);
     */
}
```

createAccumulator creates the aggregator object at task startup. When a new event arrives, the first parameter of accumulate, the accumulator, holds the result of the previous calculation; accumulate then combines the new event with the previously aggregated data. The aggregator object is designed as follows.


     
```json
{
    "_mergeRes": {
        "original_price": [
            1541433610000,
            "84.37"
        ],
        "title": [
            1541433610000,
            "[Resell] Curling irons Big Rollers Electric rollers don't hurt short hair Air Bangs inside buckle men..."
        ],
        "category": [
            1541433610000,
            "50025437"
        ],
        "status": [
            1541433610000,
            "0"
        ]
    },
    "_ruleRes": "4963_5:0;",
    "_changeFlag": "1",
    "_ruleDiff": "4963_5:0"
}
```
  • The _mergeRes field holds the merged business information, which in the Xianyu scenario is the product information. The Blink task merges the incoming item information with the merge result held in memory, and _mergeRes is the outcome. It is a key-value structure whose values are arrays: index 0 carries the timestamp of the property's latest change, index 1 the property value. The _mergeRes data is finally written to PetaData. Because every field carries its own timestamp, full and incremental messages are both consumed as plain incremental messages, and as long as each field records when its data was produced, the merge can be done precisely. This solves challenge 1 in 2.4.

  • _ruleRes records the rule-matching status of the current product: 0 means the rule is not matched, 1 means it is matched.

  • _changeFlag indicates whether the product information changed after the merge: 1 means changed, 0 means unchanged.

  • _ruleDiff stores the result of the last rule calculation in State. Each incremental message diffs the new rule-matching result against it directly, and only changed rule results are delivered downstream as MetaQ messages. This saves a huge number of KV calls against the result pool, addressing challenge 5 in 2.4 (a sketch of the diff follows this list).

  • Blink's built-in high-performance storage, Niagara, solves challenge 2 in 2.4. Niagara draws on the architecture of Seastar [5] and adopts a shared-nothing, thread-per-core design: thread lock contention and switching costs are almost zero, and the code need not consider multi-thread races, which greatly simplifies the logic. Niagara is a fully asynchronous execution engine that expresses asynchronous logic with futures, promises, and continuations. Its storage kernel uses an LSM structure, which makes it convenient to tune the read, write, and space amplification trade-offs. Compared with RocksDB, which shares the LSM architecture, it adds many optimizations to improve performance: a memtable based on Masstree [6], a snow-shoveling flush strategy, an adaptive level-expansion strategy, and real-time in-memory compaction.
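A minimal sketch of the _ruleDiff idea, diffing the previous and current rule-matching results so that only changes travel downstream; the result encoding is simplified from the "ruleId:flag" form shown above:

```java
import java.util.HashMap;
import java.util.Map;

public class RuleDiff {
    /**
     * Compare the previous and current rule-matching results and return only
     * the rules whose match flag changed; only these are emitted downstream.
     */
    static Map<String, Integer> diff(Map<String, Integer> previous,
                                     Map<String, Integer> current) {
        Map<String, Integer> changed = new HashMap<>();
        current.forEach((ruleId, flag) -> {
            if (!flag.equals(previous.get(ruleId))) {
                changed.put(ruleId, flag);
            }
        });
        return changed;
    }

    public static void main(String[] args) {
        // Previous state: rule 4963_5 did not match, rule 4963_7 matched.
        Map<String, Integer> prev = Map.of("4963_5", 0, "4963_7", 1);
        // Current evaluation: 4963_5 now matches, 4963_7 is unchanged.
        Map<String, Integer> curr = Map.of("4963_5", 1, "4963_7", 1);
        System.out.println(diff(prev, curr)); // {4963_5=1}
    }
}
```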

As pictured above, before data enters the Blink computation, a unified heterogeneous data access layer is set up. Its main job is to standardize access to data from different sources: full data from the offline computing platform, real-time data from the logging system, data from real-time database change monitoring, and calls from online services. The adapter normalizes the messages of all these sources into MetaQ messages and erases the data types in the message objects, while the metadata center handles unified registration, management, and validation of data types. The metadata center also serves the Blink real-time computing task for data parsing, guaranteeing a globally unified data schema.
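Purely illustrative: one way such an access-layer adapter might look, with every type and field name hypothetical:

```java
import java.util.Map;

/** Hypothetical unified message that all sources are normalized into. */
record UnifiedMessage(String itemId, String field, String value, long ts, String source) {}

/** One adapter per heterogeneous source. */
interface SourceAdapter<R> {
    UnifiedMessage normalize(R raw);
}

/** Example: adapting a database change-capture (binlog-style) event. */
class BinlogAdapter implements SourceAdapter<Map<String, String>> {
    @Override
    public UnifiedMessage normalize(Map<String, String> row) {
        // A metadata-center lookup would validate the field and its type
        // here, before the message is published to MetaQ.
        return new UnifiedMessage(
                row.get("item_id"),
                row.get("changed_field"),
                row.get("new_value"),
                Long.parseLong(row.get("change_ts")),
                "binlog");
    }
}
```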

Due to Blink's architectural design, if a Blink task is stopped for an upgrade, all data in State storage becomes invalid. The metadata center also ensures that when fields are added to the wide table, the UDAF code needs no modification, only a configuration push.

All full and incremental data arrives as MetaQ messages, so Blink's merge calculation never needs to know which kind of data it is handling; it only merges by field timestamp. New wide-table business fields can be brought in independently as message streams rather than as table joins in the offline computing layer, avoiding the slow full-data output caused by excessive join dependencies in offline computing. The messaging middleware also plays an important role in disaster recovery: in the extreme case of a Blink system failure, the day's full and incremental messages can be completely re-injected into the engine by replaying MetaQ message offsets, recovering the system within hours.

Separately, packing all the diff data into one aggregator object can make that object very large. A two-level aggregation scheme was considered, but because of the retract mechanism of real-time stream computation, the State data would be cleared whenever a State counter dropped to zero.

As for the expression calculation engine: some departments in the company have built high-performance expression engines on the RETE algorithm, but our expressions are fairly general, and a home-grown engine might lack some of the operators we need. We compared the characteristics of several mainstream expression engines.

Overall, we chose MVEL as our expression engine.
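For illustration, a minimal sketch of evaluating a selection rule with MVEL; the rule text and item fields are made up:

```java
import java.io.Serializable;
import java.util.Map;

import org.mvel2.MVEL;

public class RuleEvaluation {
    public static void main(String[] args) {
        // A selection rule written as an MVEL boolean expression.
        String rule = "price < 300 && category == '50025437'";

        // Compile once, execute per item: the usual pattern when the same
        // rule is evaluated against a high-throughput commodity stream.
        Serializable compiled = MVEL.compileExpression(rule);

        Map<String, Object> item = Map.of(
                "price", 84.37,
                "category", "50025437");

        Boolean matched = (Boolean) MVEL.executeExpression(compiled, item);
        System.out.println(matched); // true
    }
}
```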

2.7 Real-time full selection and preview

HybridDB for MySQL [7], a high-performance OLAP database developed by Alibaba Cloud, was selected, mainly for its millisecond-level real-time multi-dimensional analysis over trillion-row data and its good compatibility with the big data platforms MaxCompute and Blink. The following figure describes how, when a selection rule is created or changed, the online service is switched losslessly through incremental data double-write and online KV storage version switching.
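A minimal sketch of the double-write-then-switch idea, with a plain map standing in for the KV store and all key names hypothetical:

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class ResultSetVersionSwitch {
    // Stand-in for the online KV store.
    final Map<String, Object> kv = new ConcurrentHashMap<>();

    /** Readers always go through the version pointer. */
    @SuppressWarnings("unchecked")
    List<Long> read(String ruleId) {
        String version = (String) kv.get(ruleId + ":current");
        return (List<Long>) kv.get(ruleId + ":" + version);
    }

    /**
     * While a changed rule is recomputed over the full data, new results are
     * written under the next version (and incremental updates are
     * double-written to both versions). Flipping the pointer is the
     * lossless switchover.
     */
    void publishNewVersion(String ruleId, String nextVersion, List<Long> fullResult) {
        kv.put(ruleId + ":" + nextVersion, fullResult); // full recomputation
        kv.put(ruleId + ":current", nextVersion);       // atomic pointer flip
    }

    public static void main(String[] args) {
        ResultSetVersionSwitch s = new ResultSetVersionSwitch();
        s.kv.put("4963:v1", List.of(101L, 102L));
        s.kv.put("4963:current", "v1");
        System.out.println(s.read("4963"));             // serving v1
        s.publishNewVersion("4963", "v2", List.of(101L, 103L));
        System.out.println(s.read("4963"));             // now serving v2
    }
}
```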

2.8 Real-time recommendation layer

There is a great deal to say about the real-time recommendation layer; due to space limitations, this article will not cover it in detail. Please look forward to the next article.

2.9 Performance Specifications

We named Xianyu's real-time selection system "Mach", after the unit of the speed of sound.

Mach has taken over almost all online feed scenarios on Xianyu, including externally delivered feeds, and has become part of Xianyu's infrastructure. Going forward, we will keep investing deeply in ultimate system performance, online algorithm optimization, system stability, the upstream campaign-enrollment platform, and the downstream delivery platform.


References:

[1] RocketMQ: https://rocketmq.apache.org/

[2] A real-time search scheme based on Lucene: http://news.laiwang.com/message/news_view.htm?msg_id=82836187

[3] MaxCompute: https://www.aliyun.com/product/odps

[4] PostgreSQL NOTIFY: https://www.postgresql.org/docs/current/static/sql-notify.html

[5] Seastar: http://www.seastar-project.org

[6] Masstree: https://pdos.csail.mit.edu/papers/masstree:eurosys12.pdf

[7] HybridDB for MySQL: https://www.aliyun.com/product/petadata