Welcome to the Tencent Cloud+ Community, where you can find more of Tencent's hands-on articles about large-scale technology in practice.

This article was published by Week in the Cloud+ Community.

I. Data Source

1. Similar-user data is stored in the TDW database. The table definition and data dictionary are as follows:

CREATE TABLE sim_people_tdw_tbl(
    uid STRING COMMENT 'reader id',
    sim_uids STRING COMMENT 'sim_uids',
    sim_num BIGINT COMMENT 'sim_num',
    update_date STRING COMMENT 'update_date'
)
Copy the code
field | type | meaning
uid | string | The user id
sim_uids | string | Users whose preferences are similar to those of uid. Each entry has the form `user id:number of articles read in common`; entries are separated by commas
sim_num | bigint | The number of similar users
update_date | string | The date of the data

2. Basic user portraits are stored in MongoDB

field | meaning
_id | The user id
profile (offline) / positive (real-time) | Positive portrait. Dimensions are separated by semicolons, and the entries inside a dimension are separated by commas. Each entry has the form key_id:weight. The dimensions are, in order: primary category, secondary category, keyword, topic, and reading source
negative | Negative portrait (dislikes); its format is the same as that of the positive portrait
update_time | Update time
CityCode or city | City code

3. Portraits of similar people also exist in MongoDB

II. Overall approach

Because the TESLA cluster cannot operate on MongoDB directly, the user portrait data in TDW must first be exported to HDFS through the lowe subsystem, and is then merged with the existing group portraits stored in MongoDB.

III. Algorithm flow

Core code

#!/usr/bin/python2.7
# -*- coding: utf8 -*-
import decimal
import time
import math
import sys
import os
import param_map
from pymongo.collection import Collection
from decimal import Decimal
import datetime

# Python 2 only: re-import sys so that setdefaultencoding is available again,
# then make UTF-8 the default codec for implicit str/unicode conversions.
reload(sys)
sys.setdefaultencoding("utf-8")
# Make the parent directory importable so the project-local ``utils`` package
# resolves. The path must be exactly "../" (no embedded space).
sys.path.append("../")
from utils import mongoUtils, confUtils

# All Decimal arithmetic in this module is rounded to 6 significant digits.
decimal.getcontext().prec = 6
# Maximum number of user ids sent to MongoDB in a single ``$in`` query.
BATCH_NUM = 100000

# Unix timestamp of "30 days before this module was loaded"; basic profiles
# whose update_time is not greater than this value are ignored.
now_time = datetime.datetime.now()
delta = datetime.timedelta(days=30)
delta30 = now_time - delta
time_limit = int(time.mktime(delta30.timetuple()))
print(time_limit)


def split_uid_similarity(uid_num_str):
    """Parse one ``uid:similarity`` token.

    :param uid_num_str: text such as ``"1_757155427:8"``.
    :return: tuple ``(uid, similarity)``; the similarity is converted to float.
    :raises IndexError: if the text contains no ``:``.
    :raises ValueError: if the second field is not a number.
    """
    fields = uid_num_str.split(":")
    uid = fields[0]
    similarity = float(fields[1])
    return uid, similarity


def split_uid_sim_user(user_hd):
    """Split one tab-separated input line into the uid and its similar users.

    :param user_hd: a line whose first two tab-separated columns are the uid
        and the still-unparsed similar-user text.
    :return: tuple ``(uid, similar_users_text)``; surrounding whitespace of the
        line is stripped first and any further columns are ignored.
    :raises IndexError: if the line has fewer than two columns.
    """
    columns = user_hd.strip().split("\t")
    uid, sim_users = columns[0], columns[1]
    return uid, sim_users


def dimension_profile_limit(dimension_profile, min_i, max_i, limit, cluster_profile_str):
    """Append the strongest features of one dimension to a profile string.

    The features are ranked by weight (highest first) and only the first
    ``limit`` of them are considered. Each considered weight is capped at
    ``max_i``; features whose capped weight is below ``min_i`` are dropped.
    The survivors are rendered as ``tag_id:weight`` and joined with commas.

    :param dimension_profile: dict mapping tag id to weight for one dimension.
    :param min_i: smallest weight that is still emitted.
    :param max_i: upper bound applied to every emitted weight.
    :param limit: how many of the top-ranked features are considered.
    :param cluster_profile_str: text already built by the caller; the rendered
        features are appended to it.
    :return: ``cluster_profile_str`` followed by the rendered features. The
        prefix is returned untouched when no feature qualifies.
    """
    if not dimension_profile:
        return cluster_profile_str

    ranked = sorted(dimension_profile.items(), key=lambda c: c[1], reverse=True)
    kept = []
    # max(..., 0) keeps a negative limit meaning "nothing" instead of letting
    # the slice count from the end of the list.
    for tag_id, tag_value in ranked[:max(limit, 0)]:
        if tag_value > max_i:
            tag_value = max_i
        if tag_value >= min_i:
            kept.append(str(tag_id) + ":" + str(tag_value))
    # Joining avoids stripping a "trailing comma" that may not exist: when no
    # feature qualifies, the caller's prefix must not lose its last character.
    return cluster_profile_str + ",".join(kept)


def cluster_profile_dic2list(cluster_profile, dimension_param_dic):
    """Convert a group profile dictionary into a list of per-dimension strings.

    :param cluster_profile: dict mapping a dimension key to that dimension's
        ``{tag_id: weight}`` dict (or ``None``).
    :param dimension_param_dic: dict mapping ``str(dimension key)`` to a dict
        holding the ``"min"``, ``"max"`` and ``"limit"`` thresholds.
    :return: ``None`` when ``cluster_profile`` is empty; otherwise a list with
        one string per dimension, in ascending key order. A dimension that has
        no thresholds configured, or whose profile is ``None``, yields ``""``.
    """
    if len(cluster_profile) == 0:
        return None

    cluster_profile_list = []
    # Visit the dimensions in key order so that the position in the result
    # does not depend on the dictionary's iteration order.
    for key in sorted(cluster_profile):
        dimension_profile = cluster_profile[key]
        # Fetch the thresholds configured for this dimension.
        dimension_param = dimension_param_dic.get(str(key))
        dimension_str = ""
        if dimension_param is not None and dimension_profile is not None:
            dimension_str = dimension_profile_limit(
                dimension_profile,
                dimension_param.get("min"),
                dimension_param.get("max"),
                dimension_param.get("limit"),
                "")
        # Every dimension contributes exactly one entry, even an empty one.
        cluster_profile_list.append(dimension_str)
    return cluster_profile_list


def sim_users_dic2list(cluster_dic, sim_users_max_size):
    """Keep the most similar users and render them as ``uid:similarity``.

    :param cluster_dic: dict mapping a similar user's id (a string) to the
        similarity value.
    :param sim_users_max_size: maximum number of similar users to keep.
    :return: tuple ``(sim_users_list, new_cluster_dic)``. ``sim_users_list``
        holds the kept users as ``"uid:similarity"`` strings, most similar
        first; ``new_cluster_dic`` maps the same users to their similarity.
        Both are empty when nothing is kept, so callers never receive a
        placeholder ``""`` entry that cannot be parsed as ``uid:similarity``.
    """
    ranked = sorted(cluster_dic.items(), key=lambda b: b[1], reverse=True)
    # max(..., 0) keeps a negative size meaning "nothing" instead of letting
    # the slice count from the end of the list.
    top = ranked[:max(sim_users_max_size, 0)]
    sim_users_list = [key + ":" + str(value) for key, value in top]
    return sim_users_list, dict(top)


class ClusterProfileComputer(object):
    """Compute and store the group portrait of every user's similar users.

    For each input file the similar users of every uid are merged with the
    previously stored ones, the basic profiles of those users are loaded for
    the platforms "XW", "PAC" and "OM", and one document per uid is written
    to the cluster-profile collection of the "XW" database.
    """

    # Parsed once, when the class body is executed, and shared by all instances.
    cf = confUtils.getConfig("../conf/setting.conf")

    def __init__(self, environment):
        """Open the MongoDB connections and read the tuning parameters.

        :param environment: ``"local"`` selects the ``LOCAL_SIM_USERS_PATH``
            configuration item; any other value selects ``SIM_USERS_PATH``.
        """
        self.xw_database, self.xw_client = mongoUtils.getMongodb("XW")
        self.pac_database, self.pac_client = mongoUtils.getMongodb("PAC")
        # The OM client gets its own attribute so it does not replace pac_client.
        self.om_database, self.om_client = mongoUtils.getMongodb("OM")
        item = "LOCAL_SIM_USERS_PATH" if environment == "local" else "SIM_USERS_PATH"
        self.sim_users_path = confUtils.getFilePath(self.cf, "SIM_USERS", item)
        self.decay_factor = param_map.SIM_USERS_PARAM.get("decay_factor")
        self.sim_users_max_size = param_map.SIM_USERS_PARAM.get("sim_users_max_size")
        self.similarity_low = param_map.SIM_USERS_PARAM.get("similarity_low")
        self.similarity_high = param_map.SIM_USERS_PARAM.get("similarity_high")

    @staticmethod
    def basic_cursor2dic(platform, mongodb_cursor):
        """Index basic-profile documents by user id.

        :param platform: platform name; for ``"PAC"`` the user id is read from
            the ``name`` field, for every other platform from ``_id``.
        :param mongodb_cursor: iterable of profile documents.
        :return: dict mapping user id to its profile document.
        """
        id_field = "name" if platform == "PAC" else "_id"
        users_profile_map = {}
        for user_profile in mongodb_cursor:
            users_profile_map[user_profile[id_field]] = user_profile
        return users_profile_map

    @staticmethod
    def get_sim_users_profile(all_users_profile, users_similarity):
        """Collect the basic profiles of the given similar users.

        :param all_users_profile: dict mapping user id to profile document.
        :param users_similarity: iterable of ``"uid:similarity"`` strings.
        :return: list with the profiles of the similar users that have one;
            users without a profile are skipped.
        """
        rs = []
        for uid_similarity in users_similarity:
            uid, _ = split_uid_similarity(uid_similarity)
            profile = all_users_profile.get(uid)
            if profile is not None:
                rs.append(profile)
        return rs

    def dump_basic_profile(self, all_uid, batch_num, platform, profile_collection):
        """Load the recently updated basic profiles of the given users.

        Only documents whose ``update_time`` is greater than the module-level
        ``time_limit`` are returned. The ids are queried in batches of
        ``batch_num``.

        :param all_uid: list of user ids.
        :param batch_num: number of ids per ``$in`` query.
        :param platform: platform name; ``"PAC"`` matches ids against the
            ``name`` field, every other platform against ``_id``.
        :param profile_collection: collection holding the basic profiles.
        :return: dict mapping user id to its profile document.
        """
        rs = {}
        key = "name" if platform == "PAC" else "_id"
        for start in range(0, len(all_uid), batch_num):
            cursor = profile_collection.find(
                {"$and": [{key: {'$in': all_uid[start:start + batch_num]}},
                          {"update_time": {"$gt": time_limit}}]},
                no_cursor_timeout=True)
            # The cursor was opened with no_cursor_timeout, so it has to be
            # closed explicitly even when reading from it fails.
            try:
                rs.update(self.basic_cursor2dic(platform, cursor))
            finally:
                cursor.close()
        return rs

    def compute_single_file(self, path, xw_profile_collection, pac_profile_collection, om_profile_collection):
        """Process one similar-user file and store a document per uid.

        Every line of the file holds a uid and its comma-separated similar
        users, separated by a tab. The stored document contains the merged
        similar users, the group profile of each platform, ``create_time``
        (kept from an existing document) and ``update_time``.

        :param path: path of the similar-user file.
        :param xw_profile_collection: basic-profile collection of "XW".
        :param pac_profile_collection: basic-profile collection of "PAC".
        :param om_profile_collection: basic-profile collection of "OM".
        """
        all_uid_list = []
        uid_sim_map = {}
        # ``with`` releases the file handle even if a line cannot be parsed.
        with open(path) as users:
            for user_str in users:
                uid_hf, sim_users_hd = split_uid_sim_user(user_str)
                uid_sim_map[uid_hf] = sim_users_hd.split(",")
                all_uid_list.append(uid_hf)
        print("uid_sim_map : %d" % len(uid_sim_map))

        # Load the basic profiles per platform. The platforms are kept in an
        # explicit sequence because each result is stored under a
        # platform-specific field below.
        platform_collections = (("XW", xw_profile_collection),
                                ("PAC", pac_profile_collection),
                                ("OM", om_profile_collection))
        platform_basic_profiles = [
            (name, self.dump_basic_profile(all_uid_list, BATCH_NUM, name, collection))
            for name, collection in platform_collections]

        # Load the previously stored group portraits of these users.
        cluster_profile_collection = self.xw_database.get_collection(
            param_map.MONGODB_CLUSTER_PROFILE_MAP["Cluster"])  # type: Collection
        old_cluster_profile_map = dump_cluster_profile_history(self, all_uid_list, cluster_profile_collection,
                                                               BATCH_NUM)
        print("dump cluster profile %d records" % len(old_cluster_profile_map))

        for uid, sim_users_list in uid_sim_map.items():
            print ("uid = %s" % uid)
            # Merge the stored similar users (decayed) with the new ones.
            users_similarity_dic = merge_sim_users(uid, sim_users_list, self.decay_factor, self.similarity_low,
                                                   self.similarity_high, old_cluster_profile_map)
            # Keep only the most similar users, as a list that MongoDB can store.
            sim_users_list, users_similarity_dic = sim_users_dic2list(users_similarity_dic, self.sim_users_max_size)
            print("similar people len: %d" % len(sim_users_list))

            cluster_profiles = {}
            for platform_name, platform_basic_profile in platform_basic_profiles:
                # Basic profiles of this uid's similar users on the platform.
                sim_users_profile_list = self.get_sim_users_profile(platform_basic_profile, sim_users_list)
                cluster_profile_dic = cluster_profile_compute(platform_name, sim_users_profile_list,
                                                              users_similarity_dic)
                # Convert the dictionary to a list so it is easy to store in MongoDB.
                cluster_profiles[platform_name] = cluster_profile_dic2list(cluster_profile_dic,
                                                                           param_map.DIMENSION_PARAM)

            now = int(time.time())
            old_profile = cluster_profile_collection.find_one({"_id": uid})
            # Keep the original creation time when the document already exists.
            create_time = now if old_profile is None else old_profile["create_time"]
            document = {"_id": uid, "sim_users": sim_users_list,
                        "xw_cluster_profile": cluster_profiles["XW"],
                        "pac_cluster_profile": cluster_profiles["PAC"],
                        "om_cluster_profile": cluster_profiles["OM"],
                        "create_time": create_time,
                        "update_time": now}
            cluster_profile_collection.save(document)
        print("end")

    def run(self):
        """Process every file under ``sim_users_path`` whose name contains ``attempt_``."""
        xw_profile_collection = self.xw_database.get_collection(param_map.MONGODB_PROFILE_MAP["XW"])
        pac_profile_collection = self.pac_database.get_collection(param_map.MONGODB_PROFILE_MAP["PAC"])
        om_profile_collection = self.om_database.get_collection(param_map.MONGODB_PROFILE_MAP["OM"])
        for dir_path, dir_names, file_names in os.walk(self.sim_users_path):
            print(dir_names)
            for file_name in file_names:
                # NOTE(review): presumably these are the task-attempt output
                # files of the HDFS export - confirm against the export job.
                if "attempt_" not in file_name:
                    continue
                print(file_name)
                path = os.path.join(dir_path, file_name)
                self.compute_single_file(path, xw_profile_collection, pac_profile_collection, om_profile_collection)


def accumulate_dimension_profile(cluster_dimension_feature, user_dimension, ratio):
    """Add one user's features of a dimension to the group's features.

    :param cluster_dimension_feature: dict mapping feature key to the weight
        accumulated so far for this dimension; it is updated in place.
    :param user_dimension: the user's features of this dimension as
        comma-separated ``key:weight`` entries; entries that do not consist of
        exactly two ``:``-separated fields are ignored.
    :param ratio: multiplier applied to each of the user's weights. The caller
        in this file passes ``similarity / (10 + similarity)`` as a Decimal.
    :return: ``cluster_dimension_feature`` (the same dict object).
    :raises decimal.InvalidOperation: if a weight is not a valid number.
    """
    if not user_dimension:
        return cluster_dimension_feature

    for feature in user_dimension.split(","):
        atom = feature.split(":")
        if len(atom) != 2:
            continue
        k, w = atom
        weighted = Decimal(w) * ratio
        previous = cluster_dimension_feature.get(k)
        if previous is None:
            cluster_dimension_feature[k] = weighted
        else:
            cluster_dimension_feature[k] = weighted + Decimal(previous)
    return cluster_dimension_feature


def dump_cluster_profile_history(self, all_uid, collection, batch_num):
    """Load the stored cluster profiles of the given users.

    :param self: unused; kept because callers pass it positionally.
    :param all_uid: list of user ids to look up by ``_id``.
    :param collection: collection holding the cluster profiles.
    :param batch_num: number of ids per ``$in`` query.
    :return: dict mapping ``_id`` to the stored document.
    """
    rs = {}
    for start in range(0, len(all_uid), batch_num):
        cursor = collection.find({'_id': {'$in': all_uid[start:start + batch_num]}},
                                 no_cursor_timeout=True)
        # The cursor was opened with no_cursor_timeout, so it has to be closed
        # explicitly even when reading from it fails.
        try:
            rs.update(cluster_cursor2dic(cursor))
        finally:
            cursor.close()
    return rs


def cluster_cursor2dic(mongodb_cursor):
    """Index cluster-profile documents by their ``_id``.

    :param mongodb_cursor: iterable of documents that each have an ``_id`` key.
    :return: dict mapping ``_id`` to the document; when an ``_id`` occurs more
        than once the last document wins.
    """
    return {user_profile["_id"]: user_profile for user_profile in mongodb_cursor}


def merge_sim_users(uid_hdf, sim_users_new, decay_factor, similarity_low, similarity_high, old_cluster_profile_dic):
    """Merge a user's newly computed similar users with the stored ones.

    The stored similarities are multiplied by ``decay_factor`` first. A user
    that only appears in the stored list is kept if its decayed similarity is
    at least ``similarity_low``. For a user that appears in both lists the
    decayed and the new similarity are added, the sum is capped at
    ``similarity_high`` and the user is dropped if the result is below
    ``similarity_low``. Malformed stored entries are skipped.

    :param uid_hdf: id of the user whose similar users are merged.
    :param sim_users_new: iterable of new ``"uid:similarity"`` strings.
    :param decay_factor: multiplier applied to stored similarities.
    :param similarity_low: lower similarity threshold.
    :param similarity_high: upper bound for a merged similarity.
    :param old_cluster_profile_dic: dict mapping user id to its stored cluster
        document; the document's ``sim_users`` list is read.
    :return: dict mapping similar user id to the merged similarity.
    """
    cluster_union_dic = {}
    # Start from the new similar users.
    for user_similarity in sim_users_new:
        _uid, similarity = split_uid_similarity(user_similarity)
        cluster_union_dic[_uid] = similarity

    old = old_cluster_profile_dic.get(uid_hdf)
    if old is None:
        return cluster_union_dic

    for uid_similarity_old in old.get('sim_users') or []:
        uid_similarity_old_list = uid_similarity_old.split(":")
        if len(uid_similarity_old_list) != 2:
            continue
        sim_uid_old = uid_similarity_old_list[0]
        try:
            weight_old = float(uid_similarity_old_list[1]) * float(decay_factor)
        except ValueError:
            # The stored similarity is not a number; ignore the entry.
            continue

        if sim_uid_old not in cluster_union_dic:
            # Only known from the stored list: keep it while it is still
            # similar enough. There is nothing to add to or delete otherwise.
            if weight_old >= similarity_low:
                cluster_union_dic[sim_uid_old] = weight_old
            continue

        weight_new = min(weight_old + cluster_union_dic[sim_uid_old], similarity_high)
        if weight_new < similarity_low:
            del cluster_union_dic[sim_uid_old]
        else:
            cluster_union_dic[sim_uid_old] = weight_new
    return cluster_union_dic


def cluster_profile_compute(platform, sim_users_profile_array, sim_users_dic):
    # type: (str, list, dict) -> dict
    """Compute the group profile of a set of similar users.

    Each user's ``profile`` text is split into dimensions at ``;`` and every
    dimension is accumulated into the group profile, weighted by
    ``similarity / (10 + similarity)``.

    :param platform: platform name; for ``"PAC"`` the user id is read from the
        ``name`` field of a profile, for every other platform from ``_id``.
    :param sim_users_profile_array: list of the similar users' profile
        documents.
    :param sim_users_dic: dict mapping similar user id to similarity; users
        missing from it are skipped.
    :return: dict mapping the dimension index (0-based position in the
        ``profile`` text) to that dimension's ``{feature key: weight}`` dict.
    """
    cluster_profile_rs = {}
    key = "name" if platform == "PAC" else "_id"
    for sim_user_obj in sim_users_profile_array:
        # Similarity between the target user and this similar user.
        similarity = sim_users_dic.get(sim_user_obj.get(key))
        if similarity is None:
            continue
        sim_num = Decimal(similarity)
        # Weight of this user's features in the group profile.
        rate = sim_num / (10 + sim_num)
        profile = sim_user_obj.get("profile")
        if profile is None:
            profile = ""
        for i, u_dimension in enumerate(profile.split(";")):
            dimension_feature = cluster_profile_rs.get(i)
            if dimension_feature is None:
                dimension_feature = {}
            cluster_profile_rs[i] = accumulate_dimension_profile(dimension_feature, u_dimension, rate)
    return cluster_profile_rs


if __name__ == "__main__":
    # The environment name is taken from the command line only when exactly
    # one argument is given; in every other case "local" is used.
    env = sys.argv[1] if len(sys.argv) == 2 else "local"
    computer = ClusterProfileComputer(env)
    computer.run()
Copy the code

Question and answer

Linux Real-time scheduling algorithm?

reading

5 Docker logging best practices

Is your Nginx access too slow? Add a module!

This section describes the changes in MySQL 8.0

This article is published by the Tencent Cloud+ Community with the author's authorization. Original link: https://cloud.tencent.com/developer/article/1159230?fromSource=waitui

Welcome to the Tencent Cloud+ Community. You can also follow the WeChat official account (QcloudCommunity) to be the first to receive more hands-on articles about large-scale technology in practice.

Massive technical practice experience, all in the cloud plus community!