Abstract: In 2017, Ofo put more than 10 million bikes on the market, and launching, operating, and dispatching them depends on a great deal of data. This article traces Ofo's big data journey on MaxCompute, covering why Ofo chose MaxCompute and its hands-on experience with data integrity, task scheduling, and the Proxy service.

Limin Long, Deputy Director of Big Data at Ofo.


PPT material download address: click.aliyun.com/m/100000306…

Video address: click.aliyun.com/m/100000306…

Product address: click.aliyun.com/m/100000306…

This talk covers the following topics:

I. Why Ofo chose MaxCompute

II. Applications in practice

1. Data integrity

2. Task scheduling

3. The Proxy service

I. Why Ofo chose MaxCompute

First, a look back at 2016. At the time, Ofo's data analysts were still building reports with primitive tools like Excel and MySQL. Against that background, a single developer was asked to build a data platform in two weeks.



So how do you accomplish this task? Start by breaking a data platform into its main parts. The first problem is the cluster: at big data scale, MySQL alone often cannot even finish a query. Once you have a cluster, you need to load data into it, which means ETL. Users mostly care about the data itself, so you also need a BI platform, which demands significant investment of its own. And once reports are built on the BI platform, report scheduling comes into play.



Of these, the most important question is the cluster: should you build your own or use a cloud service? The choice was made mainly along the following six dimensions.

· Storage: storage largely determines performance. ORC, for example, is a columnar format; MaxCompute likewise uses columnar storage.

· Computing: what matters is keeping execution time down. If a single SQL statement runs for 20 or 30 minutes, that level of computing performance is clearly unacceptable.

· Expense: cost was not the main constraint here. For a typical small business, MaxCompute's postpaid (pay-as-you-go) model is the best option.

· Stability: stability only reveals itself through long-term use, so it was not weighted heavily at this stage.

· UDF: the nature of shared bikes means heavy computation over large numbers of geographic "points". UDFs are indispensable for this, so any platform without UDF support was ruled out.

· Documentation: MaxCompute documentation is very detailed.


We chose MaxCompute based on these factors. So what were the results after a year and a half of use? Here is some hard evidence.

· Hard evidence 1: one colleague wrote more SQL in a single year at Ofo than in the previous five years combined;

· Hard evidence 2: comparing a self-built EMR cluster with MaxCompute, cluster cost ran roughly 2:1 and O&M cost roughly 6:1 (EMR : MaxCompute);

· Hard evidence 3: with the business running well, a newly incubated project costs less than 50 yuan per day.


II. Applications in practice

The previous section covered why we chose MaxCompute; this section shares some experience from using it.

1. Data integrity: inaccurate data is a data analyst's biggest fear. Worse still is looking at data without knowing whether it is accurate at all. An important cause of this problem is incomplete data: for example, one million records were generated yesterday but only 990,000 were uploaded. Ensuring data integrity is therefore essential.

· Definition of data integrity: the pipeline guarantees that each day's (T+1) data is complete and indivisible, i.e., atomic;

· What ignoring data integrity looks like: jobs are scheduled purely by agreed-upon times, and dependencies between datasets are likewise expressed only as times;

· Consequences of ignoring data integrity: data errors are hard to detect and require manual troubleshooting, and unless the root cause is fixed in the logic, the errors recur.

The expected behavior allows only two states: either the data exists and is guaranteed correct, or there is no data at all.



In practice, how do you solve the data integrity problem? The solution comes down to the following two points (a minimal sketch follows the list).

· Use the tunnel upload command to upload data instead of the SDK. (Tunnel upload works at file level and is atomic: either the whole file is uploaded successfully or the upload fails. The SDK uploads at row level.)

· Maintain data-ready markers. (When a day's data has been uploaded to MaxCompute, mark that day as ready; all subsequent computation keys off this marker, and data that is not ready is never computed.)
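As a rough illustration, the two measures might be combined like this. This is a hedged sketch, not Ofo's published code: it assumes the odpscmd CLI is installed and that a hypothetical MySQL table `data_ready(table_name, ds)` holds the markers.

```python
# Sketch only: assumes the odpscmd CLI is available and a hypothetical
# marker table data_ready(table_name, ds) exists in a MySQL metadata db.
import subprocess
import pymysql

def upload_day(local_file: str, table: str, ds: str) -> None:
    # File-level tunnel upload: either the whole file lands or the command
    # fails, so a partition is never left half-written (unlike row-level
    # SDK writes).
    subprocess.run(
        ["odpscmd", "-e", f"tunnel upload {local_file} {table}/ds={ds};"],
        check=True,
    )
    # Mark the day's data as ready only after the upload succeeds;
    # downstream jobs compute nothing until they see this marker.
    conn = pymysql.connect(host="127.0.0.1", user="etl",
                           password="...", database="meta")
    try:
        with conn.cursor() as cur:
            cur.execute(
                "INSERT INTO data_ready (table_name, ds) VALUES (%s, %s)",
                (table, ds),
            )
        conn.commit()
    finally:
        conn.close()
```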



With these measures in place, the effect in practice has been significant: no more inaccurate results caused by incomplete data.

Once data integrity is guaranteed programmatically, another problem remains: how to guarantee integrity for ad-hoc queries. For example, a user querying data through HUE has no way of knowing whether the data is complete. The solution is only hinted at here and discussed in detail later.

2. Task scheduling: nearly a thousand reports have to be scheduled and computed every day, and reports may depend on one another. Making them cooperate efficiently is the problem task scheduling has to solve.

The tables involved fall into the following three types.



Intermediate and wide tables: the most primitive tables are the ones generated daily, such as order tables and coupon tables. Real queries, however, need these tables joined together: for example, to query the coupon information attached to an order, you would have to write a JOIN for every single query unless a wide table is created (a sketch of materializing such a wide table follows this list).

Calculation reports: tables that hold the statistics produced by computation.

Result wide tables: storing every calculation report in the database would leave it with a very large number of tables, so result wide tables are built to let analysts find the indicators they want to analyze in one place.
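As flagged above, here is a hedged sketch of materializing an order wide table with pyodps; the table and column names (`orders`, `coupons`, `order_wide`) are illustrative assumptions, not Ofo's actual schema.

```python
# Sketch: build the day's wide table once so analysts never rewrite the JOIN.
from odps import ODPS

o = ODPS("<access_id>", "<secret_key>", project="ofo_dw", endpoint="<endpoint>")

def build_order_wide(ds: str) -> None:
    # Materialize the order-coupon JOIN for partition ds; downstream
    # queries then read order_wide directly instead of joining ad hoc.
    o.execute_sql(f"""
        INSERT OVERWRITE TABLE order_wide PARTITION (ds='{ds}')
        SELECT o.order_id, o.user_id, o.amount,
               c.coupon_id, c.discount
        FROM   orders o
        LEFT OUTER JOIN coupons c
               ON o.coupon_id = c.coupon_id AND c.ds = '{ds}'
        WHERE  o.ds = '{ds}'
    """)
```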

The following figure shows the expectations for task scheduling.



First, concurrency: multiple processes on multiple machines, to limit the impact of a process or a server going down.

Second, collaboration: the ability to declare dependencies, so that a table is computed first and the tables that depend on it are computed afterwards.

Third, monitoring: failures must raise timely alerts.

Fourth, extensibility: scheduled tasks are not limited to SQL; Python scripts and shell scripts must run as well.

Fifth, resource isolation: scheduling must keep a single large SQL job from hogging resources, because once resources are exhausted the whole computation pipeline stalls.

The following describes the task-scheduling architecture used in practice. The producer fetches tasks from the database, verifies data integrity and dependencies, and checks whether each task's state is ready; verified tasks enter the queue with state WAITING. The consumer takes tasks from the queue, sets their state to RUNNING, and finally writes the resulting state back to the database. Throughout, every component synchronizes its heartbeat to the database: if a producer dies and there were no heartbeat mechanism, the tasks it had claimed could stay in the WAITING state forever.
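A simplified sketch of that producer/consumer loop follows; `db` stands for a hypothetical data-access object over the scheduler's metadata tables, and `run_task` for the actual executor (SQL, Python script, or shell).

```python
# Sketch of the scheduler's state machine: NEW -> WAITING -> RUNNING ->
# SUCCESS/FAILED, with a heartbeat so crashed workers can be detected.
import queue
import threading
import time

task_queue: queue.Queue = queue.Queue()

def heartbeat(db, task_id, stop: threading.Event) -> None:
    # Each worker periodically stamps its heartbeat; a monitor can reset
    # tasks whose heartbeat has gone stale instead of leaving them hung.
    while not stop.is_set():
        db.touch_heartbeat(task_id)
        time.sleep(5)

def producer(db) -> None:
    while True:
        for task in db.fetch_new_tasks():
            # Enqueue only when the data-ready marker and dependencies check out.
            if db.data_ready(task) and db.dependencies_done(task):
                db.set_state(task["id"], "WAITING")
                task_queue.put(task)
        time.sleep(10)

def consumer(db, run_task) -> None:
    while True:
        task = task_queue.get()
        db.set_state(task["id"], "RUNNING")
        stop = threading.Event()
        threading.Thread(target=heartbeat, args=(db, task["id"], stop),
                         daemon=True).start()
        try:
            run_task(task)
            db.set_state(task["id"], "SUCCESS")
        except Exception:
            db.set_state(task["id"], "FAILED")  # this is what triggers alerts
        finally:
            stop.set()
```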



Task scheduling resource optimization and isolation

MaxCompute offers two payment modes: prepaid and postpaid. With prepaid, you get a dedicated resource pool: its resources are guaranteed but capped, and already paid for. With postpaid, you draw from a shared resource pool and must compete with others for resources.



In practice, we follow these rules (sketched in code below):

· Run large tasks postpaid

· Use prepaid for high-priority tasks

· Fill the prepaid quota first

· When the prepaid queue is full, overflow to postpaid
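In code, the routing decision can be as small as this sketch; the field names `is_large` and `priority` and the `prepaid_queue_full` flag are assumptions standing in for the real checks.

```python
# Sketch: route each task to the prepaid or postpaid quota per the rules above.
def pick_quota(task: dict, prepaid_queue_full: bool) -> str:
    if task.get("is_large"):
        return "postpaid"   # big SQL must not starve the fixed prepaid pool
    if task.get("priority") == "high":
        return "prepaid"    # guaranteed, already-paid-for resources
    if not prepaid_queue_full:
        return "prepaid"    # fill the prepaid quota first
    return "postpaid"       # overflow to the shared pool
```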

3. The Proxy service

The following figure shows the problems the Proxy service solves.



· Eliminating repeated execution: for example, two people run the same SQL and the data has not been updated in between, so the second run simply returns the previous result. The second query consumes no MaxCompute resources, so execution time drops, the experience improves, and resource costs are saved as well (see the cache sketch after this list).

· Security control: the secret key is never exposed to the outside, and per-business accounts are created so that different people use different accounts. An intranet IP whitelist is also built: MaxCompute's own whitelist applies to the public network, and with so many machines on the intranet, granting every intranet machine access would be insecure.

· Convenient accounting: SQL costs are attributed to individual people and can easily be charged back by department.
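The de-duplication point can be illustrated with a minimal cache sketch. Keying on the SQL text plus the source tables' last-modified times (how those are obtained is covered under Difficulty 1 below) keeps a stale result from ever being served; this is an assumption-laden simplification, not the production code.

```python
# Sketch: serve repeated SQL from a local cache when the data is unchanged.
import hashlib

_cache: dict = {}

def cached_result(sql: str, last_modified: dict, run_sql) -> str:
    # The key changes whenever the SQL text or any source table's
    # last-modified time changes, so updated data invalidates the cache.
    key_src = sql + "|" + "|".join(
        f"{t}={ts}" for t, ts in sorted(last_modified.items()))
    key = hashlib.md5(key_src.encode("utf-8")).hexdigest()
    if key not in _cache:          # first execution actually hits MaxCompute
        _cache[key] = run_sql(sql)
    return _cache[key]             # repeats are answered locally
```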

So how is this implemented in practice? Broadly, there are two possible schemes.



Scheme 1: Proxy forwarding. Requests are forwarded to MaxCompute and the responses are relayed back.

Scheme 2: The server calls the SDK. With the MaxCompute SDK, each incoming request is parsed for its parameters, which are then handed to the SDK.

Because scheme 2 involves far more work, we chose scheme 1, which has the following advantages (a minimal forwarding sketch follows the list).

· Small development workload

· Unaffected by pyodps upgrades

· Compatible with API interfaces we have not yet run into

· Only our own account system needs to be implemented

· IP whitelist control
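A minimal sketch of scheme 1, assuming Flask and the requests library; the upstream endpoint shown is illustrative, and the real service layers account mapping, whitelisting, and caching on top of this pass-through.

```python
# Sketch: a pass-through proxy in front of the MaxCompute REST endpoint.
import requests
from flask import Flask, Response, request

app = Flask(__name__)
UPSTREAM = "http://service.odps.aliyun.com/api"  # assumed endpoint

@app.route("/<path:path>", methods=["GET", "POST", "PUT", "DELETE"])
def forward(path: str) -> Response:
    # Re-signing with the proxy's own key would happen here, so the real
    # secret_key never leaves the proxy.
    upstream = requests.request(
        method=request.method,
        url=f"{UPSTREAM}/{path}",
        headers={k: v for k, v in request.headers if k != "Host"},
        params=request.args,
        data=request.get_data(),
    )
    return Response(upstream.content, upstream.status_code,
                    list(upstream.headers.items()))
```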


The following figure shows the core code.



Here’s a quick look at some of the code.



All URLs are matched against a set of regular-expression rules; the earlier a pattern appears in the rule list, the higher its matching priority.
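A hedged illustration of that rule-based dispatch; the patterns and handler names here are hypothetical.

```python
# Sketch: dispatch each request URL to the first matching rule.
import re

def handle_instance_status(url): ...
def handle_create_instance(url): ...
def forward_unmodified(url): ...

RULES = [
    # Earlier patterns take precedence over later ones.
    (re.compile(r"^/projects/[^/]+/instances/[^/]+$"), handle_instance_status),
    (re.compile(r"^/projects/[^/]+/instances$"), handle_create_instance),
]

def dispatch(url: str):
    for pattern, handler in RULES:
        if pattern.match(url):
            return handler
    return forward_unmodified  # anything unmatched is proxied untouched
```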



It is mainly used to solve the problem of repeated execution of SQL code.



This part handles the command line. MaxCompute has two main entrances: the SDK and the command line. The SDK side is relatively easy to handle. The command-line client generates its own task name, and every request checks that task name.



Also, when building the security controls, be sure to apply your own signature: the proxy can rely only on the AccessKey ID prefix sent by the client to identify the account, never on the signature the client uploads.

The code above implements the overall flow, but a few difficulties arose in the concrete implementation.

Difficulty 1: how do you guarantee that a cached ("optimized") result is consistent with what actual execution would return?

· Extract table and partition information from the SQL

· Obtain update information for table data within a bounded delay

Solution (sketched after the list):

· Build a SQL syntax tree and extract the tables from it; partition extraction is not solved yet

· Start a separate process that captures the last modification time of tables and partitions
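A rough sketch of that freshness check with pyodps. The regex-based table extraction is a crude stand-in for a real syntax tree, and the `last_data_modified_time` attribute name is an assumption; treat the whole block as illustrative.

```python
# Sketch: derive a cache key from the last-modified times of the tables
# referenced by a SQL statement.
import re
from odps import ODPS

o = ODPS("<access_id>", "<secret_key>", project="ofo_dw", endpoint="<endpoint>")

def referenced_tables(sql: str) -> set:
    # Crude extraction: identifiers following FROM/JOIN keywords.
    return set(re.findall(r"(?:from|join)\s+([\w.]+)", sql, re.IGNORECASE))

def freshness_key(sql: str) -> str:
    # Embed each table's last modification time, so any data update within
    # the polling delay invalidates previously cached results.
    parts = []
    for name in sorted(referenced_tables(sql)):
        t = o.get_table(name)
        parts.append(f"{name}={t.last_data_modified_time}")
    return "|".join(parts)
```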

Difficulty 2: adapting the command-line client's responses. Why is this needed?

· The task name is generated by the client, for example console_query_task_152696399361

· Both TaskStatus and InstanceProgress look for the task name in the message returned by the server; if it does not match the client's own task name, the client reports FAILED: Task status unknown

Solution: let the client find its own task name among all the task names the server returns (a sketch follows the list below).

· Keep a history of the task names from all requests

· Return all of those task names
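One way to realize this, as a hedged sketch: the proxy remembers every task name that touched a cached instance and includes all of them in status responses, so each console client finds its own generated name and never reports "Task status unknown".

```python
# Sketch: per-instance history of client task names, echoed back in full.
task_name_history: dict = {}  # instance_id -> list of task names

def remember_task_name(instance_id: str, task_name: str) -> None:
    names = task_name_history.setdefault(instance_id, [])
    if task_name not in names:
        names.append(task_name)

def task_status_payload(instance_id: str, status: str) -> list:
    # One entry per historical name; each client matches its own.
    return [{"Name": name, "Status": status}
            for name in task_name_history.get(instance_id, [])]
```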

The Proxy service has delivered good results:

· Improved experience. A concrete example: the first execution of a SQL statement took 70 seconds, while the second took just over 0.9 seconds;

· Reduced cost: overall cost was cut in half;

· Improved security control: the secret_key is never exposed to colleagues;

· Each business is assigned its own account, which makes cost accounting easy;

· Solved the data integrity issues mentioned earlier.

At present, the core code of the ODPS Proxy, task scheduling, and data processing used by Ofo is open source.


