Abstract: This article is based on a talk delivered at Flink Forward Asia 2021 by Liu Chenglong, project leader of the real-time financial data warehouse at CITIC Jiantou Securities, and Cai Yue, R&D engineer for financial information data. The main contents include:
- The Flink framework at CITIC Jiantou Securities
- Flink stream processing scenarios
- Real-time transformation of financial information
- Future prospects
Founded in 2005, CITIC Jiantou Securities was listed on the Hong Kong Stock Exchange in 2016 and on the main board of the Shanghai Stock Exchange in 2018. Its investment banking business has ranked among the industry's top 3 for 8 consecutive years, the scale of securities under custody ranks second in the industry, and its main operating indicators now sit in the industry's top 10. With the rapid development of the company's business and technology, digital transformation has become our focus in recent years.
Because the financial industry spans many lines of business, the company has accumulated a large volume of complex, strongly business-related data over the years. In the course of discovering, analyzing and solving problems, the key challenge is how to coordinate the front, middle and back offices with the technology department to sort out business definitions and develop the corresponding processing logic.
I. The Flink Framework at CITIC Jiantou Securities
The data center architecture is shown in Figure 1. It is mainly divided into the following parts: a data center layer composed of the Greenplum data warehouse and the Hadoop big data platform; a data development layer covering offline development, real-time development and data exchange; and supporting modules such as the data portal, data gateway, data governance and operation management.
At present, data development tasks are dominated by offline development and the offline data processing of data exchange. However, as requirements on data timeliness rise, the T+1 business model based on offline batch processing can no longer fully satisfy the market's demand for timely information. That is why we are investing heavily in real-time development and striving to provide customers with more timely data services.
Taking an end-to-end real-time development link as an example, we illustrate how the parts of the data center interact.
Real-time development is entered through the unified entrance of the data portal. First, real-time incremental data from business systems such as centralized trading and margin trading is pulled into Kafka message queues. Flink consumes the Kafka streams and processes them together with dimension-table data. When the processing logic involves a large amount of dimension-table data, offline development and data exchange are used to prepare the dimension tables through offline batch jobs. Finally, the result data is written to relational or NoSQL databases, and the data gateway generates API interfaces over the result data to provide data services to downstream systems.
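Below is a minimal sketch of one way the dimension-table enrichment described above could look with Flink's DataStream API: the offline-prepared dimension table is loaded once when the task starts and used to widen each event. The JDBC URL, credentials, table and field names are illustrative assumptions, not the production setup.

```java
// Hedged sketch: enrich a (accountId, branchId) stream with a branch name loaded from an
// offline-prepared dimension table. All connection details and names are assumptions.
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.HashMap;
import java.util.Map;

/** Input: (accountId, branchId); output: (accountId, branchId, branchName). */
public class BranchDimEnricher
        extends RichMapFunction<Tuple2<String, String>, Tuple3<String, String, String>> {

    private transient Map<String, String> branchNameById;

    @Override
    public void open(Configuration parameters) throws Exception {
        // Load the offline-prepared dimension table into memory (assumed small enough to cache).
        branchNameById = new HashMap<>();
        try (Connection conn = DriverManager.getConnection(
                     "jdbc:mysql://dim-db:3306/dim", "reader", "secret"); // assumed JDBC endpoint
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery("SELECT branch_id, branch_name FROM dim_branch")) {
            while (rs.next()) {
                branchNameById.put(rs.getString("branch_id"), rs.getString("branch_name"));
            }
        }
    }

    @Override
    public Tuple3<String, String, String> map(Tuple2<String, String> event) {
        // Widen the event with the cached dimension value.
        return Tuple3.of(event.f0, event.f1,
                branchNameById.getOrDefault(event.f1, "UNKNOWN"));
    }
}
```

When the dimension data is too large to cache in memory, the text above instead falls back on the offline-prepared tables and batch jobs; this in-memory variant is only one option for small dimensions.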
The data management and control module in the data governance section manages the metadata of database tables in the data center and of business-related tables. Users can subscribe in the data portal to change notifications for the tables they care about. When a subscribed table changes, the operation center notifies subscribers through multiple channels via the unified alarm module, so that developers can adjust their data processing tasks in time.
The Flink real-time stream processing architecture first collects CDC logs from the business databases with the Attunity tool and writes the table changes of each source system into a single Kafka topic. Each topic therefore carries data from multiple tables, so the Flink Kafka source must first filter on the schema and tablename fields to obtain the CDC stream of the target table before applying the subsequent dimension-table processing logic, as sketched below. The processed data is written into result tables and stored in different databases according to the requirements.
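A minimal sketch of that filtering step, assuming a JSON CDC payload that carries schema and tablename fields; the topic, field and table names are illustrative assumptions:

```java
// Hedged sketch: pick one table's CDC records out of a multi-table Kafka topic.
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Properties;

public class CdcTableFilterJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka:9092"); // assumption
        props.setProperty("group.id", "cdc-filter-demo");     // assumption

        // One topic per source system; each record carries schema + tablename fields.
        DataStream<String> rawCdc = env.addSource(
                new FlinkKafkaConsumer<>("trade_system_cdc", new SimpleStringSchema(), props));

        ObjectMapper mapper = new ObjectMapper();
        DataStream<String> targetTable = rawCdc.filter(value -> {
            JsonNode node = mapper.readTree(value);
            return "trade".equals(node.path("schema").asText())
                    && "t_order_flow".equals(node.path("tablename").asText()); // assumed table
        });

        targetTable.print(); // downstream: dimension-table processing, then sink to the result store
        env.execute("cdc-table-filter");
    }
}
```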
The selection of database generally follows the following principles:
- When the amount of data is relatively small and does not require high concurrency, relational databases are usually used for storage.
- When the data volume is large and high concurrency is required, HBase is used as the storage medium.
- When the data volume is small but concurrency is high, Redis is selected for caching.
- ES is generally chosen as the storage component in cases involving a large amount of data retrieval.
Securities industry data has two distinct features:
- First, trading hours are fixed: after the market closes, the data volume of many businesses drops sharply, and some businesses generate no new data at all. To save resources, we set start and stop times, according to actual needs, for the tasks that are closely tied to trading hours.
- Second, financial data is critical, and deviations are not tolerated in many scenarios. Given this high reliability requirement, we schedule offline correction jobs at night for a large number of real-time tasks to guarantee data correctness.
II. Flink Stream Processing Scenarios
The following practical scenarios illustrate how Flink stream processing is applied. They fall into three categories: real-time indicator statistics for the retail business, real-time indicator statistics for fund investment, and detailed queries of capital flows.
2.1 Retail Service Scenario
Real-time indicators of the retail business line are an important part of the management cockpit. By analyzing the company's operating indicators, decision makers can make sound decisions about the company's operation and development.
The real-time data warehouse for the retail business must produce statistical indicators for account opening, customer service and APP operation. Following the real-time data processing architecture and the layered design of the data warehouse, it is built in the following steps (a simplified sketch of the ODS-to-DWD step follows the list):
- First, the ODS layer is built: CDC logs of the customer information table, business flow table, channel table and other related base tables are collected in real time, and the tables of each business library are landed in Kafka topics that form the ODS layer of the real-time data warehouse.
- Second, the DWD layer is modeled: Flink tasks consume the ODS-layer Kafka messages and perform cleaning, filtering, desensitization, association and conversion. The data is merged at the granularity of customer accounts and widened with offline dimension tables to produce account-level detail records, establishing the DWD layer.
- Next, the DWS layer is modeled by summarizing the DWD-layer data. Based on the business requirements, DWD data is split by subject, and wide tables of common indicators, such as the channel service subject table, the business department operation subject table and the traded product subject table, are aggregated to establish the DWS layer.
- Finally, business indicators are calculated according to actual requirements to build the ADS layer. Indicators at user-account granularity can be computed directly from DWD details, while coarser-grained indicators, such as the number of customers served by an APP channel or the number of customers buying a product, are computed from the DWS layer. The final results are connected to the data gateway, which provides them uniformly to downstream systems or displays them through the BI system.
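As referenced above, here is a simplified sketch of the ODS-to-DWD step expressed in Flink SQL embedded in Java. The topic, table and column names, and the desensitization rule, are illustrative assumptions rather than the production model:

```java
// Hedged sketch of the ODS -> DWD layering described in the list above.
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class RetailDwdJob {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // ODS layer: raw CDC records from the business library, landed in Kafka (names assumed).
        tEnv.executeSql(
            "CREATE TABLE ods_customer_flow (" +
            "  account_id STRING," +
            "  channel_id STRING," +
            "  id_number  STRING," +
            "  amount     DECIMAL(18, 2)," +
            "  op_ts      TIMESTAMP(3)" +
            ") WITH (" +
            "  'connector' = 'kafka'," +
            "  'topic' = 'ods_retail_customer_flow'," +
            "  'properties.bootstrap.servers' = 'kafka:9092'," +
            "  'format' = 'json'" +
            ")");

        // DWD layer: cleansed, desensitized, account-grained detail written back to Kafka.
        tEnv.executeSql(
            "CREATE TABLE dwd_customer_flow (" +
            "  account_id STRING," +
            "  channel_id STRING," +
            "  id_masked  STRING," +
            "  amount     DECIMAL(18, 2)," +
            "  op_ts      TIMESTAMP(3)" +
            ") WITH (" +
            "  'connector' = 'kafka'," +
            "  'topic' = 'dwd_retail_customer_flow'," +
            "  'properties.bootstrap.servers' = 'kafka:9092'," +
            "  'format' = 'json'" +
            ")");

        // Cleaning, filtering and a simple desensitization rule in one continuous INSERT.
        tEnv.executeSql(
            "INSERT INTO dwd_customer_flow " +
            "SELECT account_id, channel_id, " +
            "       CONCAT(SUBSTRING(id_number FROM 1 FOR 4), '**********') AS id_masked, " +
            "       amount, op_ts " +
            "FROM ods_customer_flow " +
            "WHERE account_id IS NOT NULL");
    }
}
```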
Hierarchical management of the real-time data warehouse brings two benefits:
- First, it avoids a smokestack style of data development: not every task has to consume the ODS-layer Kafka data from scratch, which reduces time overhead, makes data recovery easier, and supports flexible analysis of different business subjects;
- Second, when a data processing error occurs, it is easier to determine which layer's processing logic is at fault, shortening the time needed to fix it.
2.2 Real-time Indicator Statistics for Fund Investment
The fund business is becoming increasingly important in the securities industry. Real-time processing can provide up-to-date sales information on fund investment products and supply the data support needed when fund investment strategies are adjusted. The data in this scenario has three characteristics:
- First, the scale of data involved is relatively small;
- Second, the data is provided to internal staff for review during trading hours;
- Third, the accuracy requirements are particularly high.
Because the data volume is small, we write the indicator results to an Oracle relational database. Because the data only needs to be available to internal staff during trading hours, the real-time tasks use a start-stop schedule, leaving more resources for the jobs that run at night. And because accuracy requirements are high, the data is corrected by offline batch jobs at night to ensure its correctness.
In the original solution, the page triggered a stored procedure to read the data; that data did not come from the source system and had minute-level latency. The real-time processing scheme pushes real-time indicators across dimensions such as new customers, additions, signings, retention, signing rate and scale, enabling business departments to grasp core data more efficiently.
2.3 Real-time ETL: Capital Flow Scenario
This scenario allows business staff to quickly query a customer's transaction details within a given period during trading hours. Three problems need to be solved:
- First, the capital flow details amount to billions of records; how can they be queried quickly at such a scale?
- Second, queries are only needed during trading hours, and little data is generated outside them; should scheduled start and stop be adopted?
- Third, the capital flow must not contain errors; how can data accuracy be guaranteed?
Given the large data volume, we finally chose HBase as the storage component. By designing the rowkey carefully and pre-splitting regions, the details of a fund flow within a specified period can be queried quickly (a sketch of one possible rowkey design follows). Given the small transaction volume outside trading hours, the scheduled start-stop strategy is enabled to save resources for the nighttime batch jobs. Given the high accuracy requirement, offline data correction is used to meet it.
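The article does not show the actual rowkey layout, so the following is only a hedged sketch of one common design that fits the description: a salt prefix to spread writes across pre-split regions, followed by the customer id and the transaction timestamp so that one customer's flows over a time range form a contiguous scan. The names and bucket count are assumptions.

```java
// Hedged sketch of a salted rowkey for the fund-flow detail table and a range scan over it.
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;

public class FundFlowRowKey {

    private static final int SALT_BUCKETS = 16; // assumed to match the number of pre-split regions

    /** rowkey = salt (2 chars) | customerId | transaction timestamp (13-digit millis). */
    public static byte[] build(String customerId, long txnTimeMillis) {
        int salt = Math.abs(customerId.hashCode()) % SALT_BUCKETS;
        return Bytes.toBytes(String.format("%02d%s%013d", salt, customerId, txnTimeMillis));
    }

    /** Scan one customer's flow details between two timestamps (inclusive start, exclusive stop). */
    public static Scan rangeScan(String customerId, long fromMillis, long toMillis) {
        return new Scan()
                .withStartRow(build(customerId, fromMillis))
                .withStopRow(build(customerId, toMillis));
    }
}
```

Because the salt is derived from the customer id, all rows of one customer stay in one bucket, so a single range scan covers the requested period without touching other regions.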
III. Real-time Transformation of Financial Information
In the financial sector, news, announcements and other information are what every market participant reads and follows most frequently. Our company's definition of information goes beyond this traditional sense: taking into account the complexity and diversity of the data itself and the actual flow of collection, management and application, we have redefined information so that all data that is neither user-related nor transaction-related counts as financial information.
Our center collects four categories of financial information data. The most common are news, announcements and research reports; next comes securities market data related to the trading markets, such as currencies, stocks, bonds, funds and derivatives; then macro and industry data of various dimensions. The last category is the catch-all of other and derived data, covering everything that third parties derive from raw market data, such as company public opinion, fundamental analysis and forecasts.
If trading and users are the bones and meridians of the financial market, then information data is its blood: produced by the former and flowing continuously throughout the body.
So how does information data of all these categories flow? It is quite simple, as shown in the figure: the bottom layer is the data sources we introduce. Most information data has already been collected and organized by vendors such as Wind and Flush, so we can obtain all kinds of basic data without spending too much time or cost.
But as more data vendors are introduced, problems arise. If one vendor has a problem and the cooperation cannot continue, the data service is inevitably affected. To solve this, we introduced the concept of a central library and built our own financial data model. All downstream systems connect to the data structures of the central library, and we take on the job of shielding them from the individual information vendors. This is the second layer in the figure.
On the far right of the figure there is also a small module called data direct connection. In practice, not every downstream system is suited to connecting to the central library; some still rely on the original vendor structures, so this small interface remains and provides data services in parallel with the central library.
The top layer is the service objects: the various business lines within the company, whose business systems are continuously fed with data.
Under this three-tier architecture, the growing number of data sources and data types improves the overall quality of our data services and lets us serve more customers. At the same time, with the central library at the core of the architecture, the service as a whole is more resilient to risk, and risk prevention is paramount for a financial company.
Our early work focused on these two points: richer sources and a risk-resistant central library. As those capabilities matured and stabilized, our focus shifted to two further points: the transmission of information data and the optimization of its content. The market changes rapidly, and the less time data spends traveling across the link, the higher its time value; there is no upper limit on transmission speed, and the faster the better. But when data moves fast and the quality of upstream vendors is uneven, a service that is fast but inaccurate delivers problematic data to users, so controlling the quality of data content without giving up the earlier gains became another thorny problem.
To optimize these last two points, transmission efficiency and content quality, we carried out an architecture transformation with the Flink engine at its core. Two scenarios are shared below.
3.1 Dragonfly Gold APP F10 News Scenario
The Dragonfly Gold APP mainly provides financial information and data services for investors to browse. The figure shows the first version of the solution. The main flow is: news is labeled by the upstream tagging system, flows into Kafka, and then into the central library described above; the data is extracted and transformed for downstream use, transferred to the interface library, and finally served through APIs.
To capture database changes promptly, we chose Canal from among the many CDC tools because it is lightweight and easy to integrate. Our developers wrote a program that subscribed to the Canal change stream in real time, parsed and assembled the data into the format required by the business, and then actively wrote the updates into the Redis cache at the top, so that downstream users always get the latest information through the related interfaces instead of waiting passively for the cache to expire.
After the first solution had been running for a while, we found two problems. First, the link is too long, which costs timeliness. Second, actively writing the cache has gradually become an important part of the overall information service, yet Canal, as an open-source tool, is still maturing: functions such as program monitoring and alarming have to be developed in-house, and it is also somewhat lacking in stability and high availability.
For this reason, we introduced Flink to restructure the data link and the data processing. For processing, Flink's efficient ETL capability suits information data scenarios with high timeliness requirements. As a streaming compute engine, Flink integrates naturally with Kafka and connects seamlessly to systems that already output information to Kafka, such as the news tagging system. The community keeps refining the connectors: CDC connectors give Flink's ETL capability more room, and Redis sink support allows the original caching program and its business logic to be folded into Flink and implemented in one place. As a result, the whole information processing flow is managed centrally, the link is shortened, and transmission time is saved. A minimal sketch of this Kafka-to-Redis link is shown below.
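The following is only a hedged sketch of how such a Kafka-to-Redis job could look, assuming the Bahir Flink Redis connector; the topic name, key layout and JSON field are illustrative assumptions:

```java
// Hedged sketch: consume tagged news from Kafka and cache the latest record per news id in Redis,
// replacing the separate Canal-based cache writer described above.
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;

import java.util.Properties;

public class NewsToRedisJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka:9092"); // assumption
        props.setProperty("group.id", "news-to-redis");       // assumption

        DataStream<String> taggedNews = env.addSource(
                new FlinkKafkaConsumer<>("tagged_news", new SimpleStringSchema(), props));

        FlinkJedisPoolConfig redisConf =
                new FlinkJedisPoolConfig.Builder().setHost("redis").setPort(6379).build(); // assumption

        taggedNews.addSink(new RedisSink<>(redisConf, new RedisMapper<String>() {
            private final ObjectMapper mapper = new ObjectMapper();

            @Override
            public RedisCommandDescription getCommandDescription() {
                return new RedisCommandDescription(RedisCommand.SET);
            }

            @Override
            public String getKeyFromData(String json) {
                try {
                    return "news:" + mapper.readTree(json).path("newsId").asText(); // assumed field
                } catch (Exception e) {
                    return "news:unknown";
                }
            }

            @Override
            public String getValueFromData(String json) {
                return json; // cache the full tagged record for the API layer to read
            }
        }));

        env.execute("tagged-news-to-redis");
    }
}
```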
The powerful ETL capability reduces the complexity of the architecture and retires the legacy components. As a whole, a distributed, highly available architecture runs through upstream, midstream and downstream, so information services can be delivered steadily and efficiently. In the longer term, information data has a wide range of applications, with many sources and outputs, and Flink's increasingly rich connectors will support further expansion of sources and destinations, making it possible to handle more information scenarios.
3.2 Cross-checking Multi-source Data
It would be nice if a single architecture could solve every problem, and we tried to move in that direction in a scenario that cross-checks data from multiple sources. This scenario addresses the control of data content quality: making data faster can be solved by technical means, but making it more accurate is not something we, as an intermediate link, can fully control.
Upstream we rely on many data vendors, who may obtain data through crawling, manual entry or data feeds. The diverse data types and channels mean the quality of the data we receive is uneven, and problems are passed straight downstream and amplified step by step. Because we are far from the source, we cannot provide accurate data services on the vendors' behalf, so the next best thing is to provide timely error correction.
Competition among data vendors is fierce, so fortunately we can obtain multiple copies of most basic data, which makes it possible to discover discrepancies. By cross-checking multi-source data, we can detect differences and issue reminders and corrections in time.
The earlier a problem is detected along the service link, the smaller its impact. So how do we spot problems earlier? There are three steps:
- The first step is ID alignment. For exchange-traded stocks the market has well-known trading and coding standards, so a single code runs through all the data; but bonds and funds are less standardized, and vendors tend to design their own codes for financial entities that are only meaningful inside their own data. So before any cross-check can be done, the IDs from different sources must first be aligned, which is largely manual work.
- The second step is indexation. Verification needs are usually specific, for example checking the daily closing price of stocks, but different vendors often structure the data behind the same check point quite differently. Indexation pulls the heterogeneous structures into a common form: the indicator-generation logic over the multi-source libraries is written in Flink SQL (see the sketch after this list).
- The third step is the real-time verification window. Getting started is easy: just run a script periodically and compare the numbers. But as the verification requirements grew and the data volume increased, batch scripts became too weak. So, using Flink's window features, we built a real-time verification window: the required verification indicators are aggregated, the window computation is triggered on both the time and count dimensions, and the results are output to Kafka, which supports real-time message push.
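As referenced in the second step, here is a hedged sketch of what the indexation logic could look like with Flink SQL embedded in Java: two vendor tables with different schemas are normalized into one unified indicator stream. All topic, table and column names are assumptions for illustration.

```java
// Hedged sketch: normalize two vendors' closing-price feeds into one indicator stream
// so the verification window can compare values across sources.
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class CloseIndicatorJob {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        tEnv.executeSql(
            "CREATE TABLE vendor_a_quote (" +
            "  sec_code STRING, close_px DECIMAL(18,4), trade_date STRING" +
            ") WITH ('connector'='kafka', 'topic'='vendor_a_quote', " +
            "        'properties.bootstrap.servers'='kafka:9092', 'format'='json')");

        tEnv.executeSql(
            "CREATE TABLE vendor_b_eod (" +
            "  ticker STRING, closing_price DECIMAL(18,4), biz_date STRING" +
            ") WITH ('connector'='kafka', 'topic'='vendor_b_eod', " +
            "        'properties.bootstrap.servers'='kafka:9092', 'format'='json')");

        tEnv.executeSql(
            "CREATE TABLE unified_indicator (" +
            "  security_id STRING, indicator STRING, src STRING, ind_value DECIMAL(18,4)" +
            ") WITH ('connector'='kafka', 'topic'='unified_indicator', " +
            "        'properties.bootstrap.servers'='kafka:9092', 'format'='json')");

        // Map both vendors onto the aligned security id and a common indicator name.
        tEnv.executeSql(
            "INSERT INTO unified_indicator " +
            "SELECT sec_code, 'daily_close', 'vendor_a', close_px FROM vendor_a_quote " +
            "UNION ALL " +
            "SELECT ticker, 'daily_close', 'vendor_b', closing_price FROM vendor_b_eod");
    }
}
```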
Flink supports two kinds of windows, one based on time and one based on count. To control both the time and the count dimension, a custom windowing behavior can be built from a global window plus a trigger; a few lines of pseudocode appear in the diagram. On the global window, the trigger decides what to do when an element arrives and when the timer fires.
In the verification window, maxCount is used to judge whether the indicator values from all sources have arrived; once they have, the window function is triggered and the values are compared. If one source has a transmission problem and its value never arrives, the time dimension takes over: a maximum window duration is defined, after which the window function fires without further waiting and the missing source's indicator is marked as late. The final output is shown in the table at the upper right. Both technical and business staff can respond to the verification results promptly, so differences are found and handled in time and the impact on downstream systems is minimized, improving the accuracy of the data service by this indirect route. A hedged sketch of such a count-or-timeout trigger follows.
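Since only pseudocode appears in the original diagram, the following is a sketch under assumed names of a trigger that fires a global window either when maxCount source values have arrived or when a maximum wait time elapses; the verification itself would live in the window function applied afterwards.

```java
// Hedged sketch: fire the global window when all expected sources have reported, or on timeout.
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;

public class CountOrTimeoutTrigger<T> extends Trigger<T, GlobalWindow> {

    private final long maxCount;   // number of sources expected per indicator
    private final long maxWaitMs;  // maximum time to wait for a slow source

    private final ReducingStateDescriptor<Long> countDesc =
            new ReducingStateDescriptor<>("count", (ReduceFunction<Long>) Long::sum, Types.LONG);
    private final ValueStateDescriptor<Long> deadlineDesc =
            new ValueStateDescriptor<>("deadline", Types.LONG);

    public CountOrTimeoutTrigger(long maxCount, long maxWaitMs) {
        this.maxCount = maxCount;
        this.maxWaitMs = maxWaitMs;
    }

    @Override
    public TriggerResult onElement(T element, long timestamp, GlobalWindow window, TriggerContext ctx)
            throws Exception {
        ValueState<Long> deadline = ctx.getPartitionedState(deadlineDesc);
        if (deadline.value() == null) {
            // First value for this indicator: start the timeout clock.
            long when = ctx.getCurrentProcessingTime() + maxWaitMs;
            ctx.registerProcessingTimeTimer(when);
            deadline.update(when);
        }

        ReducingState<Long> count = ctx.getPartitionedState(countDesc);
        count.add(1L);
        if (count.get() >= maxCount) {
            clear(window, ctx);
            return TriggerResult.FIRE_AND_PURGE; // all sources arrived: compare immediately
        }
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx)
            throws Exception {
        clear(window, ctx);
        return TriggerResult.FIRE_AND_PURGE; // timeout: compare what arrived, flag the rest as late
    }

    @Override
    public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(GlobalWindow window, TriggerContext ctx) throws Exception {
        ValueState<Long> deadline = ctx.getPartitionedState(deadlineDesc);
        if (deadline.value() != null) {
            ctx.deleteProcessingTimeTimer(deadline.value());
            deadline.clear();
        }
        ctx.getPartitionedState(countDesc).clear();
    }
}
```

In use it would be attached roughly as a stream keyed by security id and indicator name, then `.window(GlobalWindows.create()).trigger(new CountOrTimeoutTrigger<>(sourceCount, maxWaitMs))`, followed by a process window function that compares the collected indicator values and flags any discrepancy or late source.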
I believe there are many more scenarios in the financial industry worth exploring with Flink. By riding the express train of the open-source community, our brokerage's financial information services can achieve a qualitative improvement.
IV. Future Prospects
Finally, we share some future prospects for real-time stream processing, including scenarios currently under discussion and an exploration of stream-batch unification.
The scenarios under discussion fall into the following areas:
- Account assets, including real-time statistics of asset and position indicators, customer trading profit and loss, and analysis of trading records;
- Marketing insight, including MOT reminders and win-back for churning customers, reminders and follow-up for customers whose account opening failed, mining potential new customers for the margin trading and securities lending business, and activity and content operation for the e-commerce APP;
- Risk control, including analysis and statistics of indicators such as position concentration at the customer level and the ratio of financing amounts to net capital at the company level.
In addition, our project team is investigating OLAP multidimensional analysis components. Because the current real-time development still uses a Lambda architecture, the result-table storage involves relational databases such as MySQL, SQL Server and Oracle as well as NoSQL stores such as HBase, ES and Redis, and data silos are a serious problem we face. We hope an OLAP component can unify the writing of real-time and offline data, realize stream-batch integration, break the current data silos, and achieve unified storage, unified external services, and unified analysis and processing at the stream-batch storage layer.