123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301 |
- import pymysql
- import pandas as pd
- import numpy as np
- from datetime import datetime, timedelta, timezone
- pd.set_option('display.max_columns', None)
- pd.set_option('expand_frame_repr', False)
- import warnings
- from typing import Literal
- import json
- warnings.filterwarnings('ignore')
class Automation_Bid_Budget:
    """Derive hourly bid multipliers and budget weights for one campaign.

    Historical traffic and conversion rows are read from the
    ``zosi_ad_marketing_stream`` database, aggregated per keyword and hour,
    and turned into either a single 24-hour profile or a Monday..Sunday
    table. The public methods return JSON strings.
    """

    # Look-back window, in days, for every accepted ``time_period`` value.
    _PERIOD_DAYS = {
        "1week": 7,
        "2weeks": 14,
        "4weeks": 28,
        "6weeks": 42,
        "8weeks": 56,
        "12weeks": 84,
    }
    # Hours (23:00-05:59) that the rules treat as low-value night traffic.
    _NIGHT_HOURS = (23, 0, 1, 2, 3, 4, 5)
    # Column names of the weekly tables; position == pandas ``dayofweek``.
    _WEEKDAYS = ('Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday',
                 'Saturday', 'Sunday')
    _KEYWORD_KEYS = ('campaign_id', 'ad_group_id', 'keyword_id')
    _TRAFFIC_METRICS = ('impressions', 'clicks', 'cost')
    # NOTE(review): 'attributed_conversions_7d_same_sku' and
    # 'attributed_conversions_14d' are absent from the original list; this
    # looks like an omission but is kept as-is because the table schema is
    # not visible here. Only 'attributed_conversions_1d' is used downstream.
    _CONVERSION_METRICS = (
        'attributed_sales_1d',
        'attributed_sales_1d_same_sku',
        'attributed_sales_7d',
        'attributed_sales_7d_same_sku',
        'attributed_sales_14d',
        'attributed_sales_14d_same_sku',
        'attributed_sales_30d',
        'attributed_sales_30d_same_sku',
        'attributed_conversions_1d',
        'attributed_conversions_1d_same_sku',
        'attributed_conversions_7d',
        'attributed_conversions_14d_same_sku',
        'attributed_conversions_30d',
        'attributed_conversions_30d_same_sku',
        'attributed_units_ordered_1d',
        'attributed_units_ordered_1d_same_sku',
        'attributed_units_ordered_7d',
        'attributed_units_ordered_7d_same_sku',
        'attributed_units_ordered_14d',
        'attributed_units_ordered_14d_same_sku',
        'attributed_units_ordered_30d',
        'attributed_units_ordered_30d_same_sku',
    )
    # No single hour may receive more than this share of the daily budget.
    _MAX_HOURLY_SHARE = 0.25

    def __init__(self, campaign_id,
                 time_period: Literal["1week", "2weeks", "4weeks", "6weeks", "8weeks", "12weeks"] = "8weeks"
                 ):
        """Store the campaign to analyse and the history window (default 8 weeks)."""
        self.campaign_id = campaign_id
        self.time_period = time_period

    # ------------------------------------------------------------------ #
    # Data access
    # ------------------------------------------------------------------ #
    def database_conv_traf(self):
        """Open and return a new pymysql connection to the conversion/traffic database.

        The caller owns the connection and must close it.
        """
        # SECURITY NOTE(review): credentials are hard-coded in source. They are
        # kept unchanged so existing deployments keep working, but they should
        # be moved to configuration / a secret store and rotated.
        conn = pymysql.connect(user="admin",
                               password="pvmBNS8q3duiUvvp",
                               host="amzn-retail.cluster-cnrgrbcygoap.us-east-1.rds.amazonaws.com",
                               database="zosi_ad_marketing_stream",
                               port=3306)
        return conn

    def _cutoff_date(self):
        """Return the first date of the look-back window for ``self.time_period``.

        Raises:
            ValueError: if ``self.time_period`` is not a supported value
                (the original code failed with ``UnboundLocalError``).
        """
        try:
            days = self._PERIOD_DAYS[self.time_period]
        except KeyError:
            raise ValueError(
                f"unsupported time_period {self.time_period!r}; "
                f"expected one of {sorted(self._PERIOD_DAYS)}") from None
        return datetime.today().date() - timedelta(days=days)

    def _fetch_table(self, table, is_budget_table=False):
        """Read the rows of ``table`` for this campaign and window into a DataFrame.

        ``table`` is always one of the fixed table names used by the
        ``get_sp_*`` methods. The date and the campaign id are bound as query
        parameters instead of being formatted into the SQL text, and both the
        cursor and the connection are closed even when the query fails.
        """
        if is_budget_table:
            where = " where usage_updated_timestamp>=%s and budget_scope_id=%s"
        else:
            where = " where time_window_start>=%s and campaign_id=%s"
        sql = "select * from zosi_ad_marketing_stream." + table + where
        # Both values are sent as strings so the server sees the same quoted
        # literals the original string-built query produced.
        params = (str(self._cutoff_date()), str(self.campaign_id))

        conn = self.database_conv_traf()
        try:
            cursor = conn.cursor()
            try:
                cursor.execute(sql, params)
                columns_name = [col[0] for col in cursor.description]
                rows = cursor.fetchall()
            finally:
                cursor.close()
        finally:
            conn.close()
        return pd.DataFrame(list(rows), columns=columns_name)

    def get_sp_conversion(self):
        """Fetch conversion rows, keeping the first row per ``idempotency_id``."""
        df = self._fetch_table('sp_conversion_raw')
        return df.groupby('idempotency_id').head(1)

    def get_sp_traffic(self):
        """Fetch traffic rows, keeping the first row per ``idempotency_id``."""
        df = self._fetch_table('sp_traffic_raw')
        return df.groupby('idempotency_id').head(1)

    def get_sp_budgetug(self):
        """Fetch budget-usage rows for the campaign (no de-duplication)."""
        return self._fetch_table('sp_budget_usage', is_budget_table=True)

    def add_condition(self, isbudgetTable=False):
        """Return the SQL ``where`` clause for the campaign and look-back window.

        Kept for backward compatibility; the ``get_sp_*`` methods bind the
        same two values as query parameters rather than using this string.

        Raises:
            ValueError: if ``self.time_period`` is not a supported value.
        """
        time_ = self._cutoff_date()
        if isbudgetTable:
            return f" where usage_updated_timestamp>='{time_}' and budget_scope_id='{self.campaign_id}'"
        return f" where time_window_start>='{time_}' and campaign_id='{self.campaign_id}'"

    # ------------------------------------------------------------------ #
    # Aggregation
    # ------------------------------------------------------------------ #
    def merge_common_operation(self, by_day=False):
        """Join traffic with conversions and aggregate per keyword and hour.

        Args:
            by_day: when True the result is additionally split by weekday
                (column ``day``, 0 = Monday). The weekly methods need this;
                the original summed the ``day`` column away during the hourly
                aggregation, so filtering on it afterwards was meaningless.

        Returns:
            DataFrame with the grouping keys, the summed traffic and
            conversion metrics and ``cpc`` (cost / clicks). An empty
            DataFrame is returned when traffic and conversions do not overlap.
            Only the metric columns are summed; identifier and timestamp
            columns are no longer fed to ``sum()``.
        """
        conversion = self.get_sp_conversion()
        conversion_ = conversion.groupby(
            ['advertiser_id', 'marketplace_id', 'time_window_start', 'campaign_id', 'ad_group_id', 'ad_id',
             'keyword_id', 'placement', 'currency']
        ).agg({metric: 'sum' for metric in self._CONVERSION_METRICS}).reset_index()

        traffic = self.get_sp_traffic()
        traffic[['impressions', 'clicks']] = traffic[['impressions', 'clicks']].astype('int64')
        traffic['cost'] = traffic['cost'].astype('float64')
        traffic_ = traffic.groupby(
            ['advertiser_id', 'marketplace_id', 'time_window_start', 'campaign_id', 'ad_group_id', 'ad_id',
             'keyword_id', 'keyword_text', 'placement', 'match_type', 'currency']
        ).agg({metric: 'sum' for metric in self._TRAFFIC_METRICS}).reset_index()

        traffic_conversion = traffic_.merge(
            conversion_,
            on=['advertiser_id', 'marketplace_id', 'campaign_id', 'ad_group_id',
                'ad_id', 'keyword_id', 'placement', 'time_window_start', 'currency'],
            how='inner')
        if len(traffic_conversion) < 1:
            return pd.DataFrame()

        # to_datetime is a no-op for datetime columns and protects against the
        # driver handing the timestamps back as strings.
        window_start = pd.to_datetime(traffic_conversion['time_window_start'])
        traffic_conversion['hour'] = window_start.dt.hour
        traffic_conversion['day'] = window_start.dt.dayofweek

        group_keys = list(self._KEYWORD_KEYS) + (['day'] if by_day else []) + ['hour']
        metric_columns = list(self._TRAFFIC_METRICS) + list(self._CONVERSION_METRICS)
        traffic_conversion = traffic_conversion.groupby(group_keys)[metric_columns].sum().reset_index()
        # Zero-click rows yield inf/NaN here; NaN is back-filled by pre_deal.
        traffic_conversion['cpc'] = traffic_conversion['cost'] / traffic_conversion['clicks']
        return traffic_conversion

    def pre_deal(self, traffic_conversion):
        """Pad every keyword to a full 24-hour profile and fill missing CPCs.

        For each (campaign, ad group, keyword) a row is added for every hour
        0-23 that has no data. A ``cpc_min`` column (the keyword's lowest
        CPC) is added, rows are sorted by keyword and hour, and every missing
        ``cpc`` is set to 45% of that keyword's ``cpc_min``.

        The input frame is not modified. Rows are added with ``pd.concat``
        because ``DataFrame.append`` was removed in pandas 2.0.

        Returns:
            The padded DataFrame, or ``[]`` for empty input (kept for
            backward compatibility with existing callers).
        """
        if len(traffic_conversion) < 1:
            return []
        keys = list(self._KEYWORD_KEYS)

        missing_rows = []
        for (cam_, adg, kid), rows in traffic_conversion.groupby(keys, sort=False):
            present_hours = set(rows['hour'].tolist())
            missing_rows.extend(
                {'campaign_id': cam_, 'ad_group_id': adg, 'keyword_id': kid, 'hour': hour}
                for hour in range(24) if hour not in present_hours)

        if missing_rows:
            traffic_conversion = pd.concat(
                [traffic_conversion, pd.DataFrame(missing_rows)], ignore_index=True)
        else:
            traffic_conversion = traffic_conversion.copy()

        traffic_conversion['cpc_min'] = traffic_conversion.groupby(keys)['cpc'].transform('min')
        traffic_conversion = traffic_conversion.sort_values(by=keys + ['hour'])
        # Hours without bid information get 45% of the keyword's lowest CPC.
        traffic_conversion['cpc'] = traffic_conversion['cpc'].fillna(traffic_conversion['cpc_min'] * 0.45)
        return traffic_conversion

    # ------------------------------------------------------------------ #
    # Budget rules
    # ------------------------------------------------------------------ #
    def func_rule_budget(self, traffic_conversion):
        """Compute the share of the daily budget each hour should receive.

        Steps:
          1. pad the data with ``pre_deal`` and sum cpc / conversions / clicks
             per hour;
          2. score each hour with
             ``sqrt((conv^3 - (clicks - conv)^3) / cpc^3 + 1.001)``, which
             rewards high conversions and low CPC (the square root keeps the
             hours from drifting too far apart);
          3. replace invalid scores with 1.0001;
          4. scale night hours (23:00-05:59) to 60%;
          5. normalise to shares, cap every hour at 25% and spread the
             removed excess evenly over the hours that were not capped.

        Fixes relative to the original: the excess is now divided by the
        number of *uncapped* hours (the original counted rows whose excess
        equalled 0.25, which is almost always zero, so the excess was dropped
        and the shares no longer summed to 1), and infinite scores (zero CPC)
        are treated as invalid like NaN.

        Returns:
            DataFrame with columns ``hour`` and ``pre_percent_s3``.
        """
        if len(traffic_conversion) < 1:
            return pd.DataFrame(columns=['hour', 'pre_percent_s3'])
        traffic_conversion = self.pre_deal(traffic_conversion)

        tf_c = traffic_conversion.groupby(['hour']).agg(
            {'cpc': 'sum', 'attributed_conversions_1d': 'sum', 'clicks': 'sum'}).reset_index()
        conversions = tf_c['attributed_conversions_1d'].astype('float64')
        clicks = tf_c['clicks'].astype('float64')
        cpc = tf_c['cpc'].astype('float64')

        # Division by zero and sqrt of a negative are expected for sparse
        # hours; they are mapped to the neutral score right below.
        with np.errstate(divide='ignore', invalid='ignore'):
            score = np.sqrt((conversions ** 3 - (clicks - conversions) ** 3) / cpc ** 3 + 1.001)
        score = score.where(np.isfinite(score), 1.0001)

        night = tf_c['hour'].isin(self._NIGHT_HOURS)
        score = score.where(~night, score * 0.6)
        share = score / score.sum()

        cap = self._MAX_HOURLY_SHARE
        capped = share >= cap
        limited = share.clip(upper=cap)
        excess = (share - limited).sum()
        receivers = int((~capped).sum())
        if receivers:
            limited = limited.where(capped, limited + excess / receivers)

        tf_c['pre_percent_s3'] = limited
        return tf_c[['hour', 'pre_percent_s3']]

    def budget_allocate_singleDay(self):
        """Return JSON with one 24-hour budget split built from the whole history."""
        traffic_conversion = self.merge_common_operation()
        # func_rule_budget pads the data itself, so no separate pre_deal call.
        weights = self.func_rule_budget(traffic_conversion)
        weights.columns = ['hour', 'SingleDay']
        return json.dumps({"budget_allocate_singleDay": weights.to_dict(orient='records')})

    def budget_allocate_week(self):
        """Return JSON with a separate 24-hour budget split for every weekday.

        Returns ``"{}"`` when there is no history.
        """
        traffic_conversion = self.merge_common_operation(by_day=True)
        if len(traffic_conversion) < 1:
            return json.dumps({})
        table = self._weekly_table(traffic_conversion, self.func_rule_budget, 'pre_percent_s3')
        return json.dumps({"budget_allocate_week": table.round(4).to_dict(orient='records')})

    def _weekly_table(self, hourly_by_day, score_day, value_column):
        """Build an ``hour`` x Monday..Sunday table by scoring each weekday separately.

        ``score_day`` receives one weekday's rows and returns a frame with
        ``hour`` and ``value_column``. Each result is renamed to the weekday
        before joining, so the joins never collide on column names; weekdays
        without data become NaN columns.
        """
        table = pd.DataFrame({'hour': list(range(24))})
        for day_index, day_name in enumerate(self._WEEKDAYS):
            day_rows = hourly_by_day[hourly_by_day['day'] == day_index].drop(columns='day')
            scored = score_day(day_rows)
            if len(scored) < 1:
                table[day_name] = np.nan
                continue
            scored = scored.rename(columns={value_column: day_name})
            table = table.merge(scored[['hour', day_name]], how='left', on='hour')
        return table

    # ------------------------------------------------------------------ #
    # Bid rules
    # ------------------------------------------------------------------ #
    def rule_set_bid(self, avg_weight, cr, avg_cr, ctr, avg_ctr, weight_value, hour):
        """Return the bid multiplier for one hour.

        Args:
            avg_weight: campaign-wide conversion rate divided by average CPC.
            cr / avg_cr: the hour's and the overall conversion rate.
            ctr / avg_ctr: the hour's and the overall click-through rate.
            weight_value: the hour's conversion rate divided by its CPC.
            hour: hour of day, 0-23.

        Daytime results of the two "better than average" tiers and the
        no-data fallback include a random component drawn from ``np.random``.

        Fixes relative to the original:
          * the conversion rate is compared with ``avg_cr`` (two branches
            compared it with ``avg_ctr``);
          * the daytime "slightly better" tier starts from 1.25, matching its
            night value, instead of 1.5 (which overlapped the tier above);
          * the random fallback can now select every listed value (the upper
            bound of ``randint`` is exclusive, so 1.1 was unreachable).
        """
        is_night = hour in self._NIGHT_HOURS

        if weight_value > avg_weight * 1.5:  # outstanding hour
            return 2
        if weight_value > avg_weight * 1.25:  # clearly better than average
            if is_night:
                return 1.5
            return 1.5 + np.random.randint(100, 300) / 1000
        if weight_value > avg_weight * 1.15:  # slightly better than average
            if is_night:
                return 1.25
            return 1.25 + np.random.randint(100, 200) / 1000
        if weight_value > avg_weight:  # average hour
            return 1

        # Below-average weight: decide from click and conversion rates.
        if ctr >= avg_ctr and cr >= 0.75 * avg_cr:  # good clicks, conversions slightly low
            return 1
        if cr > avg_cr:  # converts well despite poor clicks
            return 1.25
        if cr > 0.75 * avg_cr:  # conversions somewhat low
            return 0.75

        # Daytime hours with no cr/ctr record get a random exploratory factor.
        no_record = (pd.isna(cr) and pd.isna(ctr)) or None in [cr, ctr]
        if no_record and not is_night:
            options = (0.5, 0.7, 0.8, 0.9, 1, 1.1)
            return options[np.random.randint(0, len(options))]
        return 0.5  # everything else

    def func_rule_bid(self, traffic_conversion):
        """Apply ``rule_set_bid`` to every hour of the given data.

        Cost, conversions, clicks and impressions are summed per hour; each
        hour's cpc, cr and ctr are compared with the overall averages.

        Returns:
            DataFrame with columns ``hour`` and ``weight_allocate`` rounded
            to two decimals (empty, with those columns, for empty input).
        """
        if len(traffic_conversion) < 1:
            return pd.DataFrame(columns=['hour', 'weight_allocate'])
        tf_c = traffic_conversion.groupby(['hour']).agg(
            {'cost': 'sum', 'attributed_conversions_1d': 'sum', 'clicks': 'sum',
             'impressions': 'sum'}).reset_index()
        tf_c['cpc'] = tf_c['cost'] / tf_c['clicks']
        tf_c['cr'] = tf_c['attributed_conversions_1d'] / tf_c['clicks']
        tf_c['ctr'] = tf_c['clicks'] / tf_c['impressions']
        tf_c['weight_value'] = tf_c['cr'] / tf_c['cpc']

        avg_bid = tf_c['cpc'].mean()
        avg_cr = tf_c['attributed_conversions_1d'].sum() / tf_c['clicks'].sum()
        avg_ctr = tf_c['clicks'].sum() / tf_c['impressions'].sum()
        avg_weight = avg_cr / avg_bid

        tf_c['weight_allocate'] = tf_c.apply(
            lambda row: self.rule_set_bid(avg_weight, row['cr'], avg_cr, row['ctr'], avg_ctr,
                                          row['weight_value'], row['hour']),
            axis=1)
        return tf_c[['hour', 'weight_allocate']].round(2)

    def bid_adjust_singleDay(self):
        """Return JSON with one 24-hour set of bid multipliers built from the whole history."""
        traffic_conversion = self.merge_common_operation()
        tf_c = self.func_rule_bid(self.pre_deal(traffic_conversion))
        tf_c.columns = ['hour', 'SingleDay']
        return json.dumps({"bid_adjust_singleDay": tf_c.to_dict(orient='records')})

    def bid_adjust_week(self):
        """Return JSON with separate 24-hour bid multipliers for every weekday.

        Each weekday's traffic is selected on its own, padded to 24 hours and
        scored. Returns ``"{}"`` when there is no history.
        """
        traffic_conversion = self.merge_common_operation(by_day=True)
        if len(traffic_conversion) < 1:
            return json.dumps({})
        table = self._weekly_table(
            traffic_conversion,
            lambda day_rows: self.func_rule_bid(self.pre_deal(day_rows)),
            'weight_allocate')
        return json.dumps({"bid_adjust_week": table.to_dict(orient='records')})
if __name__ == '__main__':
    # Manual smoke run against the live database: print the weekly bid
    # multipliers first, then (after a blank line) the weekly budget split,
    # both for one campaign over the last 12 weeks.
    adjust_ = Automation_Bid_Budget(time_period='12weeks', campaign_id='532194419483669')

    # Bid allocation
    bid_adjust = adjust_.bid_adjust_week()
    print(bid_adjust, end='\n\n')

    # Budget allocation
    budget_adjust = adjust_.budget_allocate_week()
    print(budget_adjust)
|