Tens of thousands of AI applications have been deployed across many industries, such as anti-fraud in finance, news recommendation in media, and pipeline inspection in energy. SparkSQL plays an important role in rapidly implementing feature transformation for these AI applications:

In multi-table scenarios, it is used for join operations between tables, such as joining the transaction table with the account table.

UDFs are used for simple feature transformations, such as extracting the hour from a timestamp with the hour function.

So far, SparkSQL has solved the feature transformation problem for offline model training well. However, as AI applications mature, a model is no longer expected merely to perform well in offline research; it is expected to deliver value in real business scenarios. Real business scenarios are model serving scenarios, which demand high performance and real-time inference, and this is where we run into the following problems:

Mapping offline tables to online serving. Batch training takes many tables as input, so in what form should these tables exist in the online environment? The answer affects the whole system architecture: done well it improves efficiency, done poorly it greatly increases the cost of turning the model into business value.

Translating SQL into real-time execution is expensive. Online inference demands high performance, and data scientists can produce thousands of features; converting each one by hand adds significantly to the engineering cost.

It is difficult to keep offline and online features consistent; with manual conversion, consistency is often hard to guarantee.

The model performs well offline, but its online effect does not meet business requirements.

In the specific anti-fraud scenario, the model service must detect whether a transaction is fraudulent within a TP99 latency of 20ms, so the performance requirements on model serving are very high.

## How does the 4Paradigm feature engineering database (FeDB) solve these problems

1. In the form of a database, it solves the offline-to-online table mapping problem: how tables are distributed offline and how they are distributed online is answered up front.
2. The same set of code executes feature transformations both offline and online, so the offline results carry over to the online model.
3. Data scientists and business development teams collaborate with SQL as the transfer medium.
4. SQL accelerated by LLVM can run 2~3 times faster than the Scala implementations of Spark 2.x and 3.x on complex feature scenarios, and the online in-memory storage ensures that SQL returns results with very low latency.

## Demo: quickly turning a SparkSQL model into a live service

The demo's training scenario is predicting how long a taxi trip will take to reach its destination. We will use FeDB, PySpark, LightGBM, and other tools to build an HTTP model inference service; this also serves as a practical example of Spark in machine learning scenarios.

The entire demo is just over 200 lines of code and took less than half an hour to build:

1. train_sql.py: feature computation and training, 80 lines of code
2. predict_server.py: model inference HTTP service, 129 lines of code

## Scenario data and features

The entire training data looks as follows:

id,vendor_id,pickup_datetime,dropoff_datetime,passenger_count,pickup_longitude,pickup_latitude,dropoff_longitude,dropoff_latitude,store_and_fwd_flag,trip_duration
id3097625,1,2016-01-22 16:01:00,2016-01-22 16:15:16,2,-73.97746276855469,40.7613525390625,-73.95573425292969,40.772396087646484,N,856
id3196697,1,2016-01-28 07:20:18,2016-01-28 07:40:16,1,-73.98524475097656,40.75959777832031,-73.99615478515625,40.72945785522461,N,1198
id0224515,2,2016-01-31 00:48:27,2016-01-31 00:53:30,1,-73.98342895507812,40.7500114440918,-73.97383880615234,40.74980163574219,N,303
id3370903,1,2016-01-14 11:46:43,2016-01-14 12:25:33,2,-74.00027465820312,40.74786376953125,-73.86485290527344,40.77039337158203,N,2330
id2763851,2,2016-02-20 13:21:00,2016-02-20 13:45:56,1,-73.95218658447266,40.772220611572266,-73.9920425415039,40.74932098388672,N,1496
id0904926,1,2016-02-20 19:17:44,2016-02-20 19:33:19,4,-73.97344207763672,40.75189971923828,-73.98480224609375,40.76243209838867,N,935
id2026293,1,2016-02-25 01:16:23,2016-02-25 01:31:27,1,-73.9871597290039,40.68777847290039,-73.9115219116211,40.68180847167969,N,904
id1349988,1,2016-01-28 20:16:05,2016-01-28 20:21:36,1,-74.0028076171875,40.7338752746582,-73.9968032836914,40.743770599365234,N,331
id3218692,2,2016-02-17 16:43:27,2016-02-17 16:54:41,5,-73.98147583007812,40.77408218383789,-73.97216796875,40.76400375366211,N,674

The feature transformation SQL script for this scenario:

select trip_duration, passenger_count,
sum(pickup_latitude) over w as vendor_sum_pl,
max(pickup_latitude) over w as vendor_max_pl,
min(pickup_latitude) over w as vendor_min_pl,
avg(pickup_latitude) over w as vendor_avg_pl,
sum(pickup_latitude) over w2 as pc_sum_pl,
max(pickup_latitude) over w2 as pc_max_pl,
min(pickup_latitude) over w2 as pc_min_pl,
avg(pickup_latitude) over w2 as pc_avg_pl,
count(vendor_id) over w2 as pc_cnt,
count(vendor_id) over w as vendor_cnt
from {}
window w as (partition by vendor_id order by pickup_datetime ROWS_RANGE BETWEEN 1d PRECEDING AND CURRENT ROW),
w2 as (partition by passenger_count order by pickup_datetime ROWS_RANGE BETWEEN 1d PRECEDING AND CURRENT ROW)
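For intuition, each window collects the rows that share the current row's key (vendor_id for w, passenger_count for w2) and whose pickup_datetime falls within the preceding 1 day, up to and including the current row. A rough pandas equivalent of the vendor_sum_pl column, purely as a sketch (it assumes df holds the table with pickup_datetime parsed as datetimes; this is not the demo's code):

import pandas as pd

# vendor_sum_pl: for each row, sum of pickup_latitude over same-vendor rows
# within the preceding 1 day (hypothetical pandas illustration of window w)
df = df.sort_values('pickup_datetime').set_index('pickup_datetime')
vendor_sum_pl = (df.groupby('vendor_id')['pickup_latitude']
                   .transform(lambda s: s.rolling('1d').sum()))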

We chose vendor_id and passenger_count as the keys for the time-series window features.
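Before the training snippet below can run, the {} placeholder in the SQL needs a concrete table and the SQL string must be built. A minimal sketch of that glue code (the names here are assumptions, not the demo's exact code; note that ROWS_RANGE is an extension, so the SQL requires the Spark distribution shipped with the demo repo rather than vanilla Spark):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("taxi-feature-train").getOrCreate()
# register the raw CSV as table t1 so the SQL's {} placeholder can reference it
raw = spark.read.csv('data/taxi_tour_table_train_simple.csv',
                     header=True, inferSchema=True)
raw.createOrReplaceTempView('t1')
train_sql = feature_sql.format('t1')  # feature_sql holds the SQL template above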


import lightgbm as lgb

train_df = spark.sql(train_sql)
# specify your configurations as a dict
params = {
    'boosting_type': 'gbdt',
    'objective': 'regression',
    'metric': {'l2', 'l1'},
    'num_leaves': 31,
    'learning_rate': 0.05,
    'feature_fraction': 0.9,
    'bagging_fraction': 0.8,
    'bagging_freq': 5,
    'verbose': 0
}
print('Starting training...')
gbm = lgb.train(params,
                lgb_train,
                num_boost_round=20,
                valid_sets=lgb_eval,
                early_stopping_rounds=5)
gbm.save_model('model.txt')
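The snippet above references lgb_train and lgb_eval without defining them. A minimal sketch of how they could be derived from train_df (the variable names and the 80/20 split are assumptions, not necessarily the demo's exact code):

import lightgbm as lgb

# convert the Spark feature table to pandas and separate out the label column
pdf = train_df.toPandas()
label = pdf.pop('trip_duration')

# assumed 80/20 train/validation split
split = int(len(pdf) * 0.8)
lgb_train = lgb.Dataset(pdf.iloc[:split], label.iloc[:split])
lgb_eval = lgb.Dataset(pdf.iloc[split:], label.iloc[split:], reference=lgb_train)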

Executing the model training process produces model.txt.
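For example, assuming the script name from the file list above:

python3 train_sql.py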

## Model inference process

Data import code:

import datetime

# driver is an initialized FeDB SQL router (see the demo repo for its setup)
def insert_row(line):
    row = line.split(',')
    # convert the two datetime columns to millisecond timestamps
    row[2] = '%dl' % int(datetime.datetime.strptime(row[2], '%Y-%m-%d %H:%M:%S').timestamp() * 1000)
    row[3] = '%dl' % int(datetime.datetime.strptime(row[3], '%Y-%m-%d %H:%M:%S').timestamp() * 1000)
    insert = "insert into t1 values('%s', %s, %s, %s, %s, %s, %s, %s, %s, '%s', %s);" % tuple(row)
    driver.executeInsert('db_test', insert)

with open('data/taxi_tour_table_train_simple.csv', 'r') as fd:
    idx = 0
    for line in fd:
        if idx == 0:  # skip the CSV header line
            idx = idx + 1
            continue
        insert_row(line.replace('\n', ''))
        idx = idx + 1

Note: train.csv is the CSV version of the training data.
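The import code assumes the database db_test and the table t1 already exist in FeDB. A hypothetical sketch of that DDL (column types are inferred from the CSV and the index clauses mirror the two window keys; the demo repo's actual statement and driver call may differ):

ddl = """
create table t1(
    id string, vendor_id int,
    pickup_datetime timestamp, dropoff_datetime timestamp,
    passenger_count int,
    pickup_longitude double, pickup_latitude double,
    dropoff_longitude double, dropoff_latitude double,
    store_and_fwd_flag string, trip_duration int,
    index(key=vendor_id, ts=pickup_datetime),
    index(key=passenger_count, ts=pickup_datetime));
"""
# executed through the FeDB driver before running the inserts above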

Model inference logic, predict_server.py:

# excerpt from predict_server.py: json, sql_router_sdk, fedb_driver, sql (the
# same feature SQL used for training), bst (the LightGBM booster loaded from
# model.txt) and build_feature are defined at module level
def post(self):
    row = json.loads(self.request.body)
    ok, req = fedb_driver.getRequestBuilder('db_test', sql)
    if not ok or not req:
        self.write("fail to get req")
        return
    input_schema = req.GetSchema()
    if not input_schema:
        self.write("no schema found")
        return
    # pre-compute the total string length the request row builder needs
    str_length = 0
    for i in range(input_schema.GetColumnCnt()):
        if sql_router_sdk.DataTypeName(input_schema.GetColumnType(i)) == 'string':
            str_length = str_length + len(row.get(input_schema.GetColumnName(i), ''))
    req.Init(str_length)
    # append each column value in schema order
    for i in range(input_schema.GetColumnCnt()):
        tname = sql_router_sdk.DataTypeName(input_schema.GetColumnType(i))
        if tname == 'string':
            req.AppendString(row.get(input_schema.GetColumnName(i), ''))
        elif tname == 'int32':
            req.AppendInt32(int(row.get(input_schema.GetColumnName(i), 0)))
        elif tname == 'double':
            req.AppendDouble(float(row.get(input_schema.GetColumnName(i), 0)))
        elif tname == 'timestamp':
            req.AppendTimestamp(int(row.get(input_schema.GetColumnName(i), 0)))
        else:
            req.AppendNULL()
    if not req.Build():
        self.write("fail to build request")
        return

    # execute the feature SQL in request mode, then feed the features to the model
    ok, rs = fedb_driver.executeQuery('db_test', sql, req)
    if not ok:
        self.write("fail to execute sql")
        return
    rs.Next()
    ins = build_feature(rs)
    self.write("----------------ins---------------\n")
    self.write(str(ins) + "\n")
    duration = bst.predict(ins)
    self.write("---------------predict trip_duration -------------\n")
    self.write("%s s" % str(duration[0]))
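To test the service, a client posts one raw row as JSON; FeDB computes the window features in request mode and the model predicts the duration. A hypothetical client call (the endpoint path and port are assumptions; in the demo repo, predict.py plays this role):

import requests

# sample row taken from the training data shown earlier; the two datetime
# fields are millisecond epoch timestamps (assuming UTC)
row = {
    "id": "id3097625", "vendor_id": 1, "passenger_count": 2,
    "pickup_datetime": 1453478460000, "dropoff_datetime": 1453479316000,
    "pickup_longitude": -73.97746276855469, "pickup_latitude": 40.7613525390625,
    "dropoff_longitude": -73.95573425292969, "dropoff_latitude": 40.772396087646484,
    "store_and_fwd_flag": "N", "trip_duration": 856,
}
resp = requests.post("http://127.0.0.1:8887/predict", json=row)  # assumed URL
print(resp.text)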

## Final execution result

Send an inference request and you will see the following output:

python3 predict.py
----------------ins---------------
[[ 2.       40.774097 40.774097 40.774097 40.774097 40.774097 40.774097
  40.774097 40.774097  1.        1.      ]]
---------------predict trip_duration -------------
859.3298781277192 s

To run the demo, please go to https://github.com/4paradigm/SparkSQLWithFeDB