Pārlūkot izejas kodu

Merge remote-tracking branch 'origin/master'

# Conflicts:
#	sync_amz_data/DataTransform/Data_ETL.py
wengao 1 gadu atpakaļ
vecāks
revīzija
a8140002ee

+ 8 - 1
start_sync_amz.py

@@ -1,4 +1,3 @@
-from sync_amz_data.tasks.account import AccountTask
 from sync_amz_data.settings import LOG_CONF
 import logging.config
 logging.config.dictConfig(LOG_CONF)
@@ -13,6 +12,14 @@ def amz_report(AWS_CREDENTIALS,para=None):
 
     conn = SB_ETL(**AWS_CREDENTIALS).clickhouse_connect()
     sb_report = SB_ETL(**AWS_CREDENTIALS)
+    sb_report.reportV3_campaign_sbCampaigns_ETL(conn,params=para)
+    sb_report.reportV3_adGroup_sbAdGroup_ETL(conn,params=para)
+    sb_report.reportV3_sbCampaignPlacement_ETL(conn,params=para)
+    sb_report.reportV3_sbTargeting_ETL(conn,params=para)
+    sb_report.reportV3_sbSearchTerm_ETL(conn,params=para)
+    sb_report.reportV3_sbAds_ETL(conn, params=para)
+    sb_report.reportV3_purchasedAsinRecord_ETL(conn,params=para)
+
     sb_report.reportV3_purchasedAsinRecord_ETL(conn,params=para)
     sb_report.reportV2_campaignsRecord_ETL(conn,params=para)
     sb_report.reportV2_campaignsVideo_ETL(conn,params=para)

+ 15 - 7
start_sync_amz_RightNowRun.py

@@ -1,7 +1,7 @@
-from sync_amz_data.tasks.account import AccountTask
-from sync_amz_data.settings import LOG_CONF
-import logging.config
-logging.config.dictConfig(LOG_CONF)
+# from sync_amz_data.tasks.account import AccountTask
+# from sync_amz_data.settings import LOG_CONF
+# import logging.config
+# logging.config.dictConfig(LOG_CONF)
 import time
 import requests
 
@@ -12,7 +12,14 @@ def amz_report(conn,AWS_CREDENTIALS,para=None):
     refresh_token = shop_infos(AWS_CREDENTIALS['profile_id'])['refresh_token']
     AWS_CREDENTIALS['refresh_token'] = refresh_token
     sb_report = SB_ETL(**AWS_CREDENTIALS)
+    sb_report.reportV3_campaign_sbCampaigns_ETL(conn,params=para)
+    sb_report.reportV3_adGroup_sbAdGroup_ETL(conn,params=para)
+    sb_report.reportV3_sbCampaignPlacement_ETL(conn,params=para)
+    sb_report.reportV3_sbTargeting_ETL(conn,params=para)
+    sb_report.reportV3_sbSearchTerm_ETL(conn,params=para)
+    sb_report.reportV3_sbAds_ETL(conn, params=para)
     sb_report.reportV3_purchasedAsinRecord_ETL(conn,params=para)
+
     sb_report.reportV2_campaignsRecord_ETL(conn,params=para)
     sb_report.reportV2_campaignsVideo_ETL(conn,params=para)
     sb_report.reportV2_adGroupsRecord_ETL(conn,params=para)
@@ -87,9 +94,10 @@ if __name__ == '__main__':
     refresh_token = shop_infos(AWS_CREDENTIALS['profile_id'])['refresh_token']
     AWS_CREDENTIALS['refresh_token'] = refresh_token
     amz_report(conn, AWS_CREDENTIALS=AWS_CREDENTIALS)
-    # list_date = ['2023-11-17',"2023-11-18",]
-    # list_date = [f'2023-11-{"0"+str(i) if len(str(i))==1 else i}' for i in range(13,16)]
-    # print(list_date)
+
+    # list_date = ["2023-10-22",]
+    # # list_date = [f'2023-10-{"0"+str(i) if len(str(i))==1 else i}' for i in range(23,32)]
+    # # print(list_date)
     # for date_ in list_date:
     #     print(date_)
     #     print(date_.replace("-",""))

+ 5 - 5
sync_amz_data/public/amz_ad_client.py

@@ -946,14 +946,14 @@ class SBClient(BaseClient):
             except:
                 try:
                     with open(
-                            f"{report_info['record_type']}_{report_info['report_date']}_{report_info['metrics']}_{report_info['segment']}_{report_info['tactic']}_{str(self.profile_id)}.json.gz",
+                            f"{report_info['record_type']}_{report_info['report_date']}_{report_info['metrics']}_{report_info['segment']}_{report_info['creative_type']}_{str(self.profile_id)}.json.gz",
                             'wb') as f:
                         for data in resp.iter_content(chunk_size=10 * 1024):
                             f.write(data)
                     if not decompress:
                         return file_path
                     with open(
-                            f"{report_info['record_type']}_{report_info['report_date']}_{report_info['metrics']}_{report_info['segment']}_{report_info['tactic']}_{str(self.profile_id)}.json.gz",
+                            f"{report_info['record_type']}_{report_info['report_date']}_{report_info['metrics']}_{report_info['segment']}_{report_info['creative_type']}_{str(self.profile_id)}.json.gz",
                             'rb') as f:  # 读取s3数据
                         data = gzip.GzipFile(fileobj=f, mode='rb')
                         de_file = json.load(data)
@@ -963,12 +963,12 @@ class SBClient(BaseClient):
                 except:
                     logger.info(f"过期开始重试")
                     self.get_v2_report(report_info['record_type'], report_info['report_date'], report_info['metrics'],
-                                       report_info['segment'], report_info['tactic'], report_info['download'])
+                                       report_info['segment'], report_info['creative_type'], report_info['download'])
 
         else:
             logger.info(f"状态码{resp.status_code},开始重试")
             self.get_v2_report(report_info['record_type'], report_info['report_date'], report_info['metrics'],
-                               report_info['segment'], report_info['tactic'], report_info['download'])
+                               report_info['segment'], report_info['creative_type'], report_info['download'])
 
 class SDClient(BaseClient):
     def get_campaigns(self, **params) -> List[dict]:
@@ -1331,6 +1331,6 @@ if __name__ == '__main__':
         'profile_id': "3006125408623189"
     }
     sp = SPClient(**AWS_CREDENTIALS)
-    rel = sp.iter_campaignNegativetargeting()
+    rel = sp.iter_negativekeyword()
     print(list(rel))
     # print(rel)

+ 40 - 9
sync_amz_data/public/sp_api_client.py

@@ -24,6 +24,7 @@ class SpApiRequest:
         self.marketplace = marketplace
         self.shopInfo = shop_infos(profile_id)
         self.timezone = self.shopInfo['time_zone']
+        self.profileid = profile_id
 
     def mysql_connect(self):
         conn = pymysql.connect(user="huangyifan",
@@ -132,35 +133,42 @@ class SpApiRequest:
             decom_df['expedited-shipping'] = decom_df['expedited-shipping'].astype('string',errors='ignore')
         decom_df['updateTime'] = datetime.now(tz=pytz.timezone(self.timezone))
         decom_df['timezone'] = self.timezone
+        decom_df['profileid'] = str(self.profileid)
         decom_df['item-description'] = decom_df['item-description'].str.slice(0,500)
         decom_df[decom_df.select_dtypes(float).columns] = decom_df[decom_df.select_dtypes(float).columns].fillna(0.0)
         decom_df[decom_df.select_dtypes(int).columns] = decom_df[decom_df.select_dtypes(int).columns].fillna(0)
         decom_df[decom_df.select_dtypes(datetime).columns] = decom_df[decom_df.select_dtypes(datetime).columns].astype(
             'string')
         decom_df.fillna('', inplace=True)
+        # print(decom_df.info())
         return decom_df
 
     def GET_MERCHANT_LISTINGS_ALL_DATA(self,limit=None):
+        start = time.time()
         para = {"reportType":ReportType.GET_MERCHANT_LISTINGS_ALL_DATA}
         reportid = self.create_report(**para)
         decom_df = self.decompression(reportid)
         print("连接数据库")
         conn = self.mysql_connect()
+        print("连接成功")
         cursor = conn.cursor()
-        bondary_date = (datetime.today() + timedelta(days=-28)).strftime("%Y-%m-%d")
+        timezone = pytz.timezone(self.timezone)
+        bondary_date = (datetime.now(tz=timezone)).strftime("%Y-%m-%d") #+ timedelta(days=-28)
         cursor.execute(f"""select * from amz_sp_api.productInfo where (mainImageUrl is not null and mainImageUrl not in ('', ' ')) and 
                         (`seller-sku` not in ('',' ') and `seller-sku` is not null) and 
                         `updateTime`>='{bondary_date}'""") #`seller-sku`,`updateTime`,`mainImageUrl`
         col = [i[0] for i in cursor.description]
         query_rel = cursor.fetchall()
-        print(query_rel[0])
+
         if len(query_rel)!=0:
+            print(query_rel[0])
             df = pd.DataFrame(query_rel,columns=col)
             listingid = df['listing-id'].to_numpy().tolist()
             decom_df = decom_df.query("`listing-id` not in @listingid")
+            print("数据条数: ",len(decom_df))
             # print(f"delete * from amz_sp_api.productInfo where `listing-id` not in {tuple(listingid)}")
-            cursor.execute(f"delete from amz_sp_api.productInfo where `listing-id` not in {tuple(listingid)}")
-            conn.commit()
+
+            # conn.commit()
 
         if len(decom_df)==0:
             return "Done"
@@ -179,19 +187,41 @@ class SpApiRequest:
             # print(list(conn.query("select * from amz_sp_api.orderReport")))
             sql = f"""
                         insert into amz_sp_api.productInfo
-                        values (%s,%s,%s,%s,%s,%s,%s, %s,%s,%s,%s,%s,%s,%s, %s,%s,%s,%s,%s,%s,%s, %s,%s,%s,%s,%s,%s,%s)
-                    """
+                        values (%s,%s,%s,%s,%s,%s,%s, %s,%s,%s,%s,%s,%s,%s, %s,%s,%s,%s,%s,%s,%s, %s,%s,%s,%s,%s,%s,%s,%s)
+                    """ #ok
             # print(sql)
+            conn = self.mysql_connect()
+            cursor = conn.cursor()
             try:
                 conn.begin()
                 cursor.executemany(sql, list_df)
                 conn.commit()
                 print("插入中...")
+                insert_listingid = df_insert['listing-id'].to_numpy().tolist()
+                cursor.execute(f"delete from amz_sp_api.productInfo where `listing-id` not in {tuple(insert_listingid)} and `updateTime`<'{bondary_date}'")
+                conn.commit()
                 rowcount += 200
             except Exception as e:
                 conn.rollback()
                 print(e)
+                try:
+                    conn = self.mysql_connect()
+                    cursor = conn.cursor()
+                    conn.begin()
+                    cursor.executemany(sql, list_df)
+                    conn.commit()
+                    insert_listingid = df_insert['listing-id'].to_numpy().tolist()
+                    cursor.execute(f"delete from amz_sp_api.productInfo where `listing-id` not in {tuple(insert_listingid)} and `updateTime`<'{bondary_date}'")
+                    conn.commit()
+                except Exception as e:
+                    conn.rollback()
+                    print(e)
+                    break
+                # break
+        conn.close()
         print("全部完成")
+        end =time.time()
+        print("duration:",end-start)
         return decom_df
 
     def get_mainImage_url(self, sku):
@@ -201,7 +231,7 @@ class SpApiRequest:
             img = r1.payload.get("summaries")[0].get("mainImage")
             img_url = None if img is None else img.get("link")
         except Exception as e:
-            print(e)
+            print("获取图片url过程错误重试, 错误message: ",e)
             time.sleep(3)
             r1 = listingClient.get_listings_item(sellerId=self.shopInfo['advertiser_id'], sku=sku)
             img = r1.payload.get("summaries")[0].get("mainImage")
@@ -225,6 +255,7 @@ class SpApiRequest:
         decom_df.fillna('',inplace=True)
         decom_df["ReportDate"] = parse(shopReportday)
         decom_df['timezone'] = self.timezone
+        decom_df['profileid'] = str(self.profileid)
         list_df = decom_df.to_numpy().tolist()
         print(list_df[0])
         # tuple_data = [tuple(i) for i in list_df]
@@ -233,8 +264,8 @@ class SpApiRequest:
         # print(list(conn.query("select * from amz_sp_api.orderReport")))
         sql = f"""
             insert into amz_sp_api.orderReport
-            values (%s,%s,%s,%s,%s,%s,%s, %s,%s,%s,%s,%s,%s,%s, %s,%s,%s,%s,%s,%s,%s, %s,%s,%s,%s,%s,%s,%s, %s,%s,%s,%s,%s,%s,%s)
-        """
+            values (%s,%s,%s,%s,%s,%s,%s, %s,%s,%s,%s,%s,%s,%s, %s,%s,%s,%s,%s,%s,%s, %s,%s,%s,%s,%s,%s,%s, %s,%s,%s,%s,%s,%s,%s,%s)
+        """ #ok
         # print(sql)
         try:
             conn.begin()

+ 0 - 65
sync_amz_data/tasks/_base.py

@@ -1,65 +0,0 @@
-from functools import lru_cache
-import logging
-from typing import List
-
-from sync_amz_data.public import BaseClient, asj_api
-from sync_amz_data.settings import AWS_LWA_CLIENT, DATA_PATH
-
-logger = logging.getLogger(__name__)
-ASJ_URL_BASE = "http://127.0.0.1:8000/api/ad_manage/"
-
-
-@lru_cache(maxsize=100)
-def query_shop_info(profile_id: str) -> dict:
-    # todo 临时
-    if profile_id == "3006125408623189":
-        return {
-            "profile_id": profile_id,
-            "id": 1,
-            "shop_name": "ZosiDirect",
-            "region": "NA",
-            "access_token": "Atza|IwEBION12cAeJLW1CrhirYbH8ianuTfh1JQtC2nbnZjWcCk_J0v9XVF00Pm7AYzZPHXAuFgdul0vuQt4XUAcwVdzQF9AzDfk5wKAUXe9fuGZhb0nq0mdJxg08u2BR_rUggxKWSd1sg6OW7szIEq8xQzok9hcr-Ai-WupyaA-CCznOt7STmyEZltNsK8VuJb7ySxTlxwf-DbuX2Tn9JdsEta7DwQIsHcYv2QwsfLYGnk2LcLUUXG-6TtwCWFGHMxHfoScOvN92hOHiPl3CsdTs5RmKO5eVFdf0XUu8OU5Z9icnjuP1tYBy7_e9s2oTL9fVLLfH_ATUJplPsfm1MhtW6ioX9IXxfJSmOJ0ntEv45ndb9t-wHE_vLukuy_4jwmy_50NU_TOU_9pbFJLQhVyAB0f4HVcb5fgPn--feAX89ANhRJnIn5zibVk_rY_rte7Xu7JMJNNW41PoCUvfVdxjnorhGTIGh2u2JHQPqdww2xLcZ93SQ",
-            "status": 1,
-            "refresh_token": "Atzr|IwEBIL4ur8kbcwRyxVu_srprAAoTYzujnBvA6jU-0SMxkRgOhGjYJSUNGKvw24EQwJa1jG5RM76mQD2P22AKSq8qSD94LddoXGdKDO74eQVYl0RhuqOMFqdrEZpp1p4bIR6_N8VeSJDHr7UCuo8FiabkSHrkq7tsNvRP-yI-bnpQv4EayPBh7YwHVX3hYdRbhxaBvgJENgCuiEPb35Q2-Z6w6ujjiKUAK2VSbCFpENlEfcHNsjDeY7RCvFlwlCoHj1IeiNIaFTE9yXFu3aEWlExe3LzHv6PZyunEi88QJSXKSh56Um0e0eEg05rMv-VBM83cAqc5POmZnTP1vUdZO8fQv3NFLZ-xU6e1WQVxVPi5Cyqk4jYhGf1Y9t98N654y0tVvw74qNIsTrB-8bGS0Uhfe24oBEWmzObvBY3zhtT1d42myGUJv4pMTU6yPoS83zhPKm3LbUDEpBA1hvvc_09jHk7vUEAuFB-UAZzlht2C1yklzQ",
-            "update_time": 1688351991,
-            "token_expires_time": 1688355491,
-            "create_time": 1683702488
-        }
-    return {}
-
-
-class BaseTask:
-    AmzAdClientClass = BaseClient
-
-    def __init__(self, profile_id: str):
-        self.shop_info = query_shop_info(profile_id)
-        self.ad_cil = self.AmzAdClientClass(
-            profile_id=profile_id,
-            refresh_token=self.shop_info["refresh_token"],
-            data_path=DATA_PATH, **AWS_LWA_CLIENT)
-
-    def do(self, task_info: dict):
-        record = task_info["record"]
-        iter_records = getattr(self.ad_cil, f"iter_{record}", None)
-        change_func = getattr(self, f"change_{record}", None)
-        ad_api_params = task_info.get("params")
-        if ad_api_params is None:
-            records_iterator = iter_records()
-        else:
-            records_iterator = iter_records(ad_api_params)
-        for data in records_iterator:
-            if change_func:
-                data = change_func(data)
-            self.to_mysql(record, data)
-            logger.info(data)
-
-    def batch_do(self, task_info: dict):
-        """
-        适用于使用pandas等进行批处理
-        @param task_info:
-        @return:
-        """
-        pass
-
-    def to_mysql(self, record: str, data: [dict, List[dict]]):
-        asj_api.create(f"{ASJ_URL_BASE}{record}/", data)

+ 0 - 19
sync_amz_data/tasks/account.py

@@ -1,19 +0,0 @@
-from ._base import BaseTask
-from sync_amz_data.public import AccountClient
-from sync_amz_data.tools import timestamp2utc_dt
-
-
-class AccountTask(BaseTask):
-    AmzAdClientClass = AccountClient
-
-    def change_portfolios(self, data: dict):
-        data["shop"] = self.shop_info["profile_id"]
-        for key in ["lastUpdatedDate", "creationDate"]:
-            if key in data:
-                data[key] = timestamp2utc_dt(data[key])
-
-        budget: dict = data.pop("budget", None)
-        if budget:
-            for key, val in budget.items():
-                data[f"budget_{key}"] = val
-        return data

+ 0 - 16
sync_amz_data/tasks/sb.py

@@ -1,16 +0,0 @@
-from sync_amz_data.public import SBClient
-from sync_amz_data.tasks._base import BaseTask
-
-import logging
-
-logger = logging.getLogger(__name__)
-
-
-class SBTask(BaseTask):
-    AmzAdClientClass = SBClient
-
-    def change_campaigns(self, data: dict):
-        return data
-
-    def change_groups(self, data: dict):
-        return data

+ 0 - 10
sync_amz_data/tasks/sd.py

@@ -1,10 +0,0 @@
-from sync_amz_data.public import SDClient
-from sync_amz_data.tasks._base import BaseTask
-
-import logging
-
-logger = logging.getLogger(__name__)
-
-
-class SDTask(BaseTask):
-    AmzAdClientClass = SDClient

+ 0 - 9
sync_amz_data/tasks/sp.py

@@ -1,9 +0,0 @@
-from sync_amz_data.public import SPClient
-from sync_amz_data.tasks._base import BaseTask
-
-
-class SpTask(BaseTask):
-    AmzAdClientClass = SPClient
-
-    def change_campaigns(self, data: dict):
-        return data