Răsfoiți Sursa

Merge branch 'yifan' of ASJ_ADS/sync_amz_data into master

yifan_huang96 1 an în urmă
părinte
comite
7e3ac58cad
1 fișier modificat, cu 135 adăugiri și 72 ștergeri
  1. 135 72
      sync_amz_data/public/adjust_budget_bid.py

+ 135 - 72
sync_amz_data/public/adjust_budget_bid.py

@@ -1,36 +1,24 @@
 import pymysql
 import pandas as pd
 import numpy as np
-from datetime import datetime,timedelta,timezone
+from datetime import datetime, timedelta, timezone
+
 pd.set_option('display.max_columns', None)
 pd.set_option('expand_frame_repr', False)
 import warnings
 from typing import Literal
+import json
 
 warnings.filterwarnings('ignore')
 
-class AdjustB:
-    def __init__(self,campaign_id,time_period:Literal["1week","2weeks","1month","2months","45days"]):
-        self.campaign_id = campaign_id
-        self.time_period = time_period
 
-    def datatable_connect01(self):
-        conn = pymysql.connect(user="admin",
-                               password="NSYbBSPbkGQUbOSNOeyy",
-                               host="retail-data.cnrgrbcygoap.us-east-1.rds.amazonaws.com",
-                               database="amzn_retail_ad",
-                               port=3306)
-        return conn
-
-    def datatable_connect02(self):
-        conn = pymysql.connect(user="root",
-                               password="sandbox",
-                               host="192.168.1.225",
-                               database="amzn_retail_ad",
-                               port=3306)
-        return conn
+class Automation_Bid_Budget:
+    def __init__(self, campaign_id,
+                 time_period: Literal["1week", "2weeks", "4weeks", "6weeks", "8weeks", "12weeks"] = "8weeks"):
+        """Store the campaign to analyse and the look-back window.
+
+        Args:
+            campaign_id: Campaign identifier; it is interpolated into the SQL
+                filter built by ``add_condition``.
+            time_period: Length of the history window to query.
+        """
+        self.campaign_id = campaign_id
+        self.time_period = time_period  # history window defaults to 8 weeks
 
-    def datatable_connect03(self):
+    def database_conv_traf(self):  # 连接数据库conversion、traffic
         conn = pymysql.connect(user="admin",
                                password="pvmBNS8q3duiUvvp",
                                host="amzn-retail.cluster-cnrgrbcygoap.us-east-1.rds.amazonaws.com",
@@ -38,9 +26,8 @@ class AdjustB:
                                port=3306)
         return conn
 
-    # datatable_connect()
-    def get_sp_conversion(self):
-        conn = self.datatable_connect03()
+    def get_sp_conversion(self):  # 获取转化
+        conn = self.database_conv_traf()
         cursor = conn.cursor()
         sql = "select * from zosi_ad_marketing_stream.sp_conversion_raw"
         sql = sql + self.add_condition(isbudgetTable=False)
@@ -49,16 +36,14 @@ class AdjustB:
         rel = cursor.fetchall()
         df = pd.DataFrame(rel, columns=columns_name)
         df = df.groupby('idempotency_id').head(1)
-        # print(df)
-        # df.to_excel("ttt111.xlsx")
         return df
 
-    def get_sp_traffic(self):
-        conn = self.datatable_connect03()
+    def get_sp_traffic(self):  # 获取流量
+        conn = self.database_conv_traf()
         cursor = conn.cursor()
         sql = "select * from zosi_ad_marketing_stream.sp_traffic_raw"
         sql = sql + self.add_condition(isbudgetTable=False)
-        print(sql)
+        # print(sql)
         cursor.execute(sql)
         columns_name = [i[0] for i in cursor.description]
         rel = cursor.fetchall()
@@ -66,34 +51,35 @@ class AdjustB:
         df = df.groupby('idempotency_id').head(1)
         return df
 
-    def get_sp_budgetug(self):
-        conn = self.datatable_connect03()
+    def get_sp_budgetug(self):  # fetch budget-usage rows
+        """Return the ``sp_budget_usage`` rows selected by ``add_condition`` as a DataFrame."""
+        # NOTE(review): the connection and cursor opened here are never closed.
+        conn = self.database_conv_traf()
         cursor = conn.cursor()
         sql = "select * from zosi_ad_marketing_stream.sp_budget_usage"
-        sql = sql+self.add_condition(isbudgetTable=True)
+        sql = sql + self.add_condition(isbudgetTable=True)
         cursor.execute(sql)
         columns_name = [i[0] for i in cursor.description]
         rel = cursor.fetchall()
         df = pd.DataFrame(rel, columns=columns_name)
         return df
 
-    def add_condition(self,isbudgetTable=False):
-        if self.time_period =='1week':
+    def add_condition(self, isbudgetTable=False):  # build the time-window / campaign filter
+        """Return the SQL ``where`` clause appended to the raw-table queries.
+
+        The cutoff date is today's date minus the number of days that
+        corresponds to ``self.time_period``.
+
+        Args:
+            isbudgetTable: When true, filter on ``usage_updated_timestamp`` and
+                ``budget_scope_id``; otherwise on ``time_window_start`` and
+                ``campaign_id``.
+        """
+        # NOTE(review): a time_period outside the six handled values leaves
+        # `time_` unassigned, so the return statements raise UnboundLocalError.
+        # NOTE(review): campaign_id is interpolated directly into the SQL text.
+        if self.time_period == '1week':
             time_ = datetime.today().date() + timedelta(days=-7)
-        elif self.time_period =='2weeks':
+        elif self.time_period == '2weeks':
             time_ = datetime.today().date() + timedelta(days=-14)
-        elif self.time_period =='month':
-            time_ = datetime.today().date() + timedelta(days=-30)
-        elif self.time_period =='45days':
-            time_ = datetime.today().date() + timedelta(days=-45)
-        elif self.time_period == '2months':
-            time_ = datetime.today().date() + timedelta(days=-60)
-        # usage_updated_timestamp
+        elif self.time_period == '4weeks':
+            time_ = datetime.today().date() + timedelta(days=-28)
+        elif self.time_period == '6weeks':
+            time_ = datetime.today().date() + timedelta(days=-42)
+        elif self.time_period == '8weeks':
+            time_ = datetime.today().date() + timedelta(days=-56)
+        elif self.time_period == '12weeks':
+            time_ = datetime.today().date() + timedelta(days=-84)
         if isbudgetTable:
             return f" where usage_updated_timestamp>='{time_}' and budget_scope_id='{self.campaign_id}'"
         return f" where time_window_start>='{time_}' and campaign_id='{self.campaign_id}'"
 
-    def merge_common_operation(self):
+    def merge_common_operation(self):  # 转化与流量连表
         conversion = self.get_sp_conversion()
         conversion_ = conversion.groupby(
             ['advertiser_id', 'marketplace_id', 'time_window_start', 'campaign_id', 'ad_group_id', 'ad_id',
@@ -147,10 +133,12 @@ class AdjustB:
         # traffic_conversion['cpc'] = traffic_conversion['cpc'].replace([np.inf,np.nan,pd.NA],0)
         return traffic_conversion
 
-    def func_rule(self,traffic_conversion):
+    def pre_deal(self, traffic_conversion):  # 前处理,补全数据
+        if len(traffic_conversion) < 1:
+            return []
         pro_list = traffic_conversion.groupby(['campaign_id', 'ad_group_id', 'keyword_id']).head(1)[
             ['campaign_id', 'ad_group_id', 'keyword_id']].to_numpy().tolist()
-        for i in pro_list:
+        for i in pro_list:  # 补全24小时的数据
             cam_, adg, kid = i[0], i[1], i[2]
             df0 = traffic_conversion.query("campaign_id==@cam_ and ad_group_id==@adg and keyword_id==@kid")
             for hour in range(24):
@@ -166,6 +154,12 @@ class AdjustB:
         # 给当前没有竞价信息的赋予竞价,为该关键词最小竞价的45%
         traffic_conversion['cpc'] = traffic_conversion.apply(
             lambda x: x['cpc_min'] * 0.45 if pd.isna(x['cpc']) or x['cpc'] is None else x['cpc'], axis=1)
+        return traffic_conversion
+
+    def func_rule_budget(self, traffic_conversion):  # 预算规则
+        if len(traffic_conversion) < 1:
+            return pd.DataFrame(columns=['hour', 'pre_percent_s3'])
+        traffic_conversion = self.pre_deal(traffic_conversion)
         # total_spend = traffic_conversion['cpc'].sum()
         # 根据小时对竞价、转化、点击汇总
         tf_c = traffic_conversion.groupby(['hour']).agg(
@@ -192,38 +186,107 @@ class AdjustB:
         allocate_val = total_allocate / allocate_count if allocate_count != 0 else 0
         # 将超过25%的权重分配到其余小时区间内
         tf_c['pre_percent_s3'] = tf_c['pre_percent_s3'].map(lambda x: x + allocate_val if x != 0.25 else 0.25)
-        return tf_c[['hour','pre_percent_s3']]
+        return tf_c[['hour', 'pre_percent_s3']]
 
-    def merge_conv_traf(self): # 总结过去每天的数据,对单天预算分配
+    def budget_allocate_singleDay(self):  # summarise the history and allocate a single day's budget
+        """Return a JSON string keyed ``budget_allocate_singleDay``.
+
+        The value is a list of records with the keys ``hour`` and ``SingleDay``,
+        taken from the two columns returned by ``func_rule_budget``.
+        """
         traffic_conversion = self.merge_common_operation()
-        traffic_conversion = self.func_rule(traffic_conversion)
-        traffic_conversion.columns = ['hour','SingleDay']
-        return traffic_conversion
+        # NOTE(review): func_rule_budget calls pre_deal itself, so pre_deal runs twice on this path.
+        traffic_conversion = self.pre_deal(traffic_conversion)
+        traffic_conversion = self.func_rule_budget(traffic_conversion)
+        traffic_conversion.columns = ['hour', 'SingleDay']
+        return json.dumps({"budget_allocate_singleDay": traffic_conversion.to_dict(orient='records')})
 
-    def merge_cvtf_budt_accdday(self): # 总结过去每个不同工作日的数据,对每周每天预算都进行不同分配
+    def budget_allocate_week(self):  # allocate each weekday's budget from that weekday's own history
+        """Return a JSON string keyed ``budget_allocate_week``.
+
+        The rows of the merged traffic/conversion frame are split on their
+        ``day`` value (0..6, labelled Monday..Sunday here), each slice goes
+        through ``pre_deal`` and ``func_rule_budget``, and the seven results
+        are left-joined on ``hour`` starting from the day-0 frame.
+
+        Returns:
+            JSON text whose value is a list of records with the keys ``hour``
+            and ``Monday`` .. ``Sunday``, rounded to 4 decimals.
+        """
         traffic_conversion = self.merge_common_operation()
-        # TODO 单独筛选周一至周日每天的traffic,再进行后续步骤
-        Monday_df = self.func_rule(traffic_conversion[traffic_conversion['day']==0])
-        Tuesday_df = self.func_rule(traffic_conversion[traffic_conversion['day']==1])
-        Wednesday_df = self.func_rule(traffic_conversion[traffic_conversion['day']==2])
-        Thursday_df = self.func_rule(traffic_conversion[traffic_conversion['day']==3])
-        Friday_df = self.func_rule(traffic_conversion[traffic_conversion['day']==4])
-        Saturday_df = self.func_rule(traffic_conversion[traffic_conversion['day']==5])
-        Sunday_df = self.func_rule(traffic_conversion[traffic_conversion['day']==6])
-
-        weeksummary_percent = pd.merge(Monday_df,Tuesday_df,how='inner',on='hour')
-        weeksummary_percent = weeksummary_percent.merge(Wednesday_df,how='inner',on='hour')
-        weeksummary_percent = weeksummary_percent.merge(Thursday_df,how='inner',on='hour')
-        weeksummary_percent = weeksummary_percent.merge(Friday_df,how='inner',on='hour')
-        weeksummary_percent = weeksummary_percent.merge(Saturday_df,how='inner',on='hour')
-        weeksummary_percent = weeksummary_percent.merge(Sunday_df,how='inner',on='hour')
-        weeksummary_percent.columns = ["hour",'Monday','Tuesday','Wednesday','Thursday','Friday','Saturday','Sunday']
-        # weeksummary_percent.to_excel("S111.xlsx")
-        return weeksummary_percent
+        day_names = ['Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday', 'Sunday']
+        df = None
+        for day_index, day_name in enumerate(day_names):
+            day_df = self.pre_deal(traffic_conversion[traffic_conversion['day'] == day_index])
+            # Name the value column after its weekday before merging: merging
+            # seven frames that all carry 'pre_percent_s3' relies on pandas'
+            # _x/_y suffixes, which produce duplicate column names from the
+            # fourth merge onwards.
+            day_df = self.func_rule_budget(day_df).rename(columns={'pre_percent_s3': day_name})
+            df = day_df if df is None else pd.merge(df, day_df, how='left', on='hour')
 
+        return json.dumps({"budget_allocate_week": df.round(4).to_dict(orient='records')})
+
+    def rule_set_bid(self, avg_weight, cr, avg_cr, ctr, avg_ctr, weight_value, hour):  # bid rule for one hour
+        """Return the bid multiplier for one hour bucket.
+
+        Args:
+            avg_weight: Baseline that ``weight_value`` is compared against.
+            cr: Conversion rate of the hour (may be NaN or None).
+            avg_cr: Overall conversion rate.
+            ctr: Click-through rate of the hour (may be NaN or None).
+            avg_ctr: Overall click-through rate.
+            weight_value: Weight of the hour.
+            hour: Hour of day; 23 and 0-5 are handled as night hours.
+
+        Returns:
+            The multiplier. Two daytime tiers and the no-data case draw a
+            random component from ``np.random``.
+        """
+        night_hours = (23, 0, 1, 2, 3, 4, 5)
+        if weight_value > avg_weight * 1.5:  # outstanding hour
+            return 2
+        if weight_value > avg_weight * 1.25:  # good hour
+            if hour in night_hours:
+                return 1.5
+            return 1.5 + np.random.randint(100, 300) / 1000
+        if weight_value > avg_weight * 1.15:  # slightly better than average
+            if hour in night_hours:
+                return 1.25
+            # The jitter is added to this tier's own base (1.25); using 1.5 here
+            # would lift this tier to the level of the tier above it.
+            return 1.25 + np.random.randint(100, 200) / 1000
+        if weight_value > avg_weight:  # baseline weight
+            return 1
+
+        # No cr/ctr recorded for this hour. Checked before the comparisons
+        # below because comparing None with a number raises TypeError.
+        if (pd.isna(cr) and pd.isna(ctr)) or None in [cr, ctr]:
+            if hour not in night_hours:
+                choices = [0.5, 0.7, 0.8, 0.9, 1, 1.1]
+                # randint's upper bound is exclusive, so pass len(choices) to
+                # make the last entry reachable.
+                return choices[np.random.randint(0, len(choices))]
+            return 0.5
+
+        # Conversion rate is judged against the average conversion rate.
+        if ctr >= avg_ctr and cr >= 0.75 * avg_cr:  # clicks high, conversion slightly low
+            return 1
+        if cr > avg_cr:  # conversion high, clicks low
+            return 1.25
+        if cr > 0.75 * avg_cr:  # conversion somewhat low
+            return 0.75
+        return 0.5  # everything else
+
+    def func_rule_bid(self, traffic_conversion):  # apply the bid rule per hour
+        """Aggregate the frame by hour and return a bid multiplier per hour.
+
+        Args:
+            traffic_conversion: Frame with the columns ``hour``, ``cost``,
+                ``attributed_conversions_1d``, ``clicks`` and ``impressions``;
+                anything of length 0 yields an empty result frame.
+
+        Returns:
+            DataFrame with the columns ``hour`` and ``weight_allocate``,
+            rounded to 2 decimals.
+        """
+        if len(traffic_conversion) < 1:
+            return pd.DataFrame(columns=['hour', 'weight_allocate'])
+        tf_c = traffic_conversion.groupby(['hour']).agg(
+            {'cost': sum, 'attributed_conversions_1d': sum, 'clicks': sum, 'impressions': sum}).reset_index()
+        # Per-hour ratios. NOTE(review): hours with zero clicks or impressions
+        # divide by zero here, which pandas turns into NaN/inf.
+        tf_c['cpc'] = tf_c['cost'] / tf_c['clicks']
+        tf_c['cr'] = tf_c['attributed_conversions_1d'] / tf_c['clicks']
+        tf_c['ctr'] = tf_c['clicks'] / tf_c['impressions']
+        # avg_bid is the mean of the hourly cpc values; avg_cr and avg_ctr are
+        # computed from the totals over all hours.
+        avg_bid = tf_c['cpc'].mean()
+        avg_cr = tf_c['attributed_conversions_1d'].sum() / tf_c['clicks'].sum()
+        avg_ctr = tf_c['clicks'].sum() / tf_c['impressions'].sum()
+        # Weight of an hour: conversion rate per unit of cost-per-click.
+        tf_c['weight_value'] = tf_c['cr'] / tf_c['cpc']
+        avg_weight = avg_cr / avg_bid
+        # avg_weight = tf_c['weight_value'].mean()
+
+        tf_c['weight_allocate'] = tf_c.apply(
+            lambda x: self.rule_set_bid(avg_weight, x['cr'], avg_cr, x['ctr'], avg_ctr, x['weight_value'], x['hour']),
+            axis=1)
+        return tf_c[['hour', 'weight_allocate']].round(2)
+
+    def bid_adjust_singleDay(self):
+        """Return a JSON string keyed ``bid_adjust_singleDay``.
+
+        The value is a list of records with the keys ``hour`` and ``SingleDay``,
+        taken from the two columns returned by ``func_rule_bid``.
+        """
+        traffic_conversion = self.merge_common_operation()
+        # traffic_conversion = self.pre_deal(traffic_conversion)
+        tf_c = self.pre_deal(traffic_conversion)
+        tf_c = self.func_rule_bid(tf_c)
+        tf_c.columns = ['hour', 'SingleDay']
+        # done
+        return json.dumps({"bid_adjust_singleDay": tf_c.to_dict(orient='records')})
+
+    def bid_adjust_week(self):
+        """Return a JSON string keyed ``bid_adjust_week``.
+
+        The rows of the merged traffic/conversion frame are split on their
+        ``day`` value (0..6, labelled Monday..Sunday here), each slice goes
+        through ``pre_deal`` and ``func_rule_bid``, and the seven results are
+        left-joined on ``hour`` starting from the day-0 frame.
+
+        Returns:
+            JSON text whose value is a list of records with the keys ``hour``
+            and ``Monday`` .. ``Sunday``.
+        """
+        traffic_conversion = self.merge_common_operation()
+        day_names = ['Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday', 'Sunday']
+        df = None
+        # Process each day's traffic on its own, then join the results.
+        for day_index, day_name in enumerate(day_names):
+            day_df = self.pre_deal(traffic_conversion[traffic_conversion['day'] == day_index])
+            # Name the value column after its weekday before merging: merging
+            # seven frames that all carry 'weight_allocate' relies on pandas'
+            # _x/_y suffixes, which produce duplicate column names from the
+            # fourth merge onwards.
+            day_df = self.func_rule_bid(day_df).rename(columns={'weight_allocate': day_name})
+            df = day_df if df is None else pd.merge(df, day_df, how='left', on='hour')
+
+        return json.dumps({"bid_adjust_week": df.to_dict(orient='records')})
 
 
 if __name__ == '__main__':
-    adjust_ = AdjustB(campaign_id='281441197839505',time_period='45days')
-    rel = adjust_.merge_conv_traf()
-    print(rel)
+    adjust_ = Automation_Bid_Budget(campaign_id='325523075677132')
+
+    # bid allocation
+    bid_adjust = adjust_.bid_adjust_week()
+    print(bid_adjust)
+
+    print()
+
+    # budget allocation
+    budget_adjust = adjust_.budget_allocate_week()
+    print(budget_adjust)