
add Goods drop_duplicates func

huangyifan 11 months ago
parent commit 1bef053438

+ 6 - 0
sync_amz_data/public/adjust_budget_bid.py

@@ -198,6 +198,9 @@ class Automation_Bid_Budget:
 
     def budget_allocate_week(self):  # summarize past data for each distinct day of the week and allocate a different budget to each weekday
         traffic_conversion = self.merge_common_operation()
+        # print(traffic_conversion.columns)
+        if len(traffic_conversion) < 1:
+            return json.dumps({})
         df = self.pre_deal(traffic_conversion[traffic_conversion['day'] == 0])
         df = self.func_rule_budget(df)
         for i in range(1, 7):
@@ -266,6 +269,9 @@ class Automation_Bid_Budget:
 
     def bid_adjust_week(self):
         traffic_conversion = self.merge_common_operation()
+        # print(traffic_conversion.columns)
+        if len(traffic_conversion) < 1:
+            return json.dumps({})
         # filter the traffic for each day from Monday to Sunday separately, then aggregate
         df = self.pre_deal(traffic_conversion[traffic_conversion['day'] == 0])
         df = self.func_rule_bid(df)
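
Both hunks above add the same early-exit guard. A minimal sketch of that pattern, assuming merge_common_operation() yields a pandas DataFrame with a day column (0 = Monday); allocate_example is an illustrative name, not part of the module:

import json

import pandas as pd

def allocate_example(traffic_conversion: pd.DataFrame) -> str:
    # Guard from this commit: with no merged traffic/conversion rows there is
    # nothing to allocate, so return an empty JSON object instead of failing
    # on the later day == 0 filtering and aggregation steps.
    if len(traffic_conversion) < 1:
        return json.dumps({})
    monday = traffic_conversion[traffic_conversion["day"] == 0]
    return json.dumps({"monday_rows": int(len(monday))})

print(allocate_example(pd.DataFrame(columns=["day", "budget"])))  # prints {}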

+ 94 - 4
sync_amz_data/public/sp_api_client.py

@@ -18,6 +18,9 @@ import pymysql
 from typing import List, Literal
 from random import shuffle
 from retry import retry
+import requests
+from urllib import request
+import json
 
 try:
     from ..settings import MYSQL_AUTH_CONF, MYSQL_DATA_CONF
@@ -155,12 +158,50 @@ class SpApiRequest:
                     return df
             elif reportId_info.payload.get("processingStatus") in [ProcessingStatus.CANCELLED,ProcessingStatus.FATAL]:
                 print(reportId_info)
-                print("cancelled or failed")
+                reportDocumentId = reportId_info.payload.get("reportDocumentId")
+                rp_table = report.get_report_document(reportDocumentId=reportDocumentId, download=True)
+                print("cancelled or failed",rp_table)
                 return pd.DataFrame()
             time.sleep(15)
             print("please wait...")
 
+    def decompression2(self,reportId): # After-CreateReportFunc-simpleDeal - decompress and fetch the report for the given report id
+        report = Reports(credentials=self.credentials, marketplace=self.marketplace)
+        while True:
+            reportId_info = report.get_report(reportId=reportId)
+            # print(reportId_info.payload)
+            print("please wait...")
+            if reportId_info.payload.get("processingStatus")==ProcessingStatus.DONE:
+                reportDocumentId = reportId_info.payload.get("reportDocumentId")
+                rp_table = report.get_report_document(reportDocumentId=reportDocumentId,download=False)
+                print(rp_table)
+                if rp_table.payload.get('compressionAlgorithm') is not None and self.marketplace.marketplace_id not in ['A1VC38T7YXB528']:#
+                    # df = pd.read_json(path_or_buf=rp_table.payload['url'],compression={"method":'gzip'},encoding='iso-8859-1')
+                    response = request.urlopen(rp_table.payload['url'])
+                    response.encoding='iso-8859-1'
+                    df = json.loads(response.read().decode())
+                    # pd.json_normalize()
+                    return df
 
+                elif rp_table.payload.get('compressionAlgorithm') is not None and self.marketplace.marketplace_id in ['A1VC38T7YXB528']:
+                    df = pd.read_json(path_or_buf=rp_table.payload['url'], compression={"method": 'gzip'},
+                                       encoding='Shift-JIS')
+                    # df.columns =
+                    return df
+                elif rp_table.payload.get('compressionAlgorithm') is None and self.marketplace.marketplace_id not in ['A1VC38T7YXB528']:
+                    df = pd.read_json(path_or_buf=rp_table.payload.get("url"),encoding='iso-8859-1')
+                    return df
+                elif rp_table.payload.get('compressionAlgorithm') is None and self.marketplace.marketplace_id in ['A1VC38T7YXB528']:
+                    df = pd.read_json(path_or_buf=rp_table.payload.get("url"),encoding='Shift-JIS')
+                    return df
+            elif reportId_info.payload.get("processingStatus") in [ProcessingStatus.CANCELLED,ProcessingStatus.FATAL]:
+                print(reportId_info)
+                reportDocumentId = reportId_info.payload.get("reportDocumentId")
+                rp_table = report.get_report_document(reportDocumentId=reportDocumentId, download=True)
+                print("cancelled or failed",rp_table)
+                return pd.DataFrame()
+            time.sleep(15)
+            print("please wait...")
     # def GET_MERCHANT_LISTINGS_ALL_DATA(self,limit=None): # Not be used
     #     start = time.time()
     #     para = {"reportType":ReportType.GET_MERCHANT_LISTINGS_ALL_DATA}
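
decompression2 selects how to read the finished report from four combinations of compression and marketplace: the Japan marketplace A1VC38T7YXB528 is read as Shift-JIS, everything else as iso-8859-1, and the non-Japan gzip case is downloaded with urllib and parsed with json.loads, so it returns a dict rather than a DataFrame. A condensed sketch of that branching, assuming the document url and the compressionAlgorithm flag come from get_report_document; read_report_document is an illustrative name only:

import gzip
import json
from urllib import request

import pandas as pd

JP_MARKETPLACE_ID = "A1VC38T7YXB528"  # Amazon Japan reports are Shift-JIS encoded

def read_report_document(url: str, compressed: bool, marketplace_id: str):
    # Condensed view of the four branches in decompression2.
    if compressed and marketplace_id != JP_MARKETPLACE_ID:
        # The commit fetches this case with urllib and json.loads; the payload
        # is gunzipped explicitly here so the sketch stands on its own.
        raw = gzip.decompress(request.urlopen(url).read())
        return json.loads(raw.decode("iso-8859-1"))
    encoding = "Shift-JIS" if marketplace_id == JP_MARKETPLACE_ID else "iso-8859-1"
    compression = {"method": "gzip"} if compressed else None
    return pd.read_json(path_or_buf=url, compression=compression, encoding=encoding)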
@@ -941,8 +982,45 @@ class SpApiRequest:
                     print(e)
                     conn.rollback()
         print("Success")
+
         conn.close()
 
+    @classmethod
+    def Goods_drop_duplicates(cls):
+        conn = SpApiRequest.Data_auth()
+        cursor = conn.cursor()
+        cursor.execute(
+            f"""select seller_id,countryCode,asin as count_ from asj_ads.Goods group by seller_id,countryCode,asin having count(asin)>=2""")
+        query_ = cursor.fetchall()
+        print(len(query_))
+        if len(query_)>0:
+            query_sql2 = cursor.execute(
+                f"""select distinct main_image, productTypes, BigCat_rank, BigCat_title, SmallCat_rank, SmallCat_title, brandName, browseNode, itemName, IsParent, parent_asin, asin, countryCode, marketplace_id, seller_id, update_time from asj_ads.Goods where (seller_id,countryCode,asin) in {query_}""")#
+            query_2 = cursor.fetchall()
+            try:
+                query = f"""delete from asj_ads.Goods where (seller_id,countryCode,asin) in {query_}
+                    """  # where (seller_id,country_code) in %s"""
+                cursor.execute(query)
+                conn.commit()
+            except Exception as e:
+                print("error in step 1",e)
+                conn.rollback()
+
+            sql = """insert into
+                            asj_ads.Goods(main_image, productTypes, BigCat_rank, BigCat_title, SmallCat_rank, SmallCat_title, brandName, browseNode, itemName, IsParent, parent_asin, asin, countryCode, marketplace_id, seller_id, update_time)
+                            values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)"""
+            try:
+                conn.begin()
+                cursor.executemany(sql, query_2)
+                conn.commit()
+                print("insert complete")
+                conn.close()
+                time.sleep(1)
+            except Exception as e:
+                conn.rollback()
+                print("error in step 2",e)
+
+
     @staticmethod
     def get_listing_info01(refresh_token, countryCode, asin, seller_id): # used in listing_infoTable
         # print(refresh_token)
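
The new Goods_drop_duplicates works in four steps: find (seller_id, countryCode, asin) keys that occur at least twice, read the distinct rows for those keys, delete every row for those keys, then re-insert the distinct rows. A hedged sketch of the same flow; dedupe_goods is an illustrative name and the connection is assumed to come from SpApiRequest.Data_auth():

import pymysql

GOODS_COLUMNS = ("main_image, productTypes, BigCat_rank, BigCat_title, SmallCat_rank, "
                 "SmallCat_title, brandName, browseNode, itemName, IsParent, parent_asin, "
                 "asin, countryCode, marketplace_id, seller_id, update_time")

def dedupe_goods(conn: pymysql.connections.Connection) -> None:
    # Same four steps as Goods_drop_duplicates: find duplicated keys, keep the
    # distinct rows, delete all rows for those keys, re-insert the kept rows.
    with conn.cursor() as cur:
        cur.execute("""select seller_id, countryCode, asin from asj_ads.Goods
                       group by seller_id, countryCode, asin having count(*) >= 2""")
        dup_keys = cur.fetchall()
        if not dup_keys:
            return
        # Note: interpolating fetchall() output into the IN clause mirrors the
        # commit; it assumes at least two duplicate keys and trusted values.
        cur.execute(f"select distinct {GOODS_COLUMNS} from asj_ads.Goods "
                    f"where (seller_id, countryCode, asin) in {dup_keys}")
        keep_rows = cur.fetchall()
        try:
            conn.begin()
            cur.execute(f"delete from asj_ads.Goods "
                        f"where (seller_id, countryCode, asin) in {dup_keys}")
            placeholders = ", ".join(["%s"] * 16)
            cur.executemany(f"insert into asj_ads.Goods({GOODS_COLUMNS}) "
                            f"values({placeholders})", keep_rows)
            conn.commit()
        except Exception:
            conn.rollback()
            raise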
@@ -1130,6 +1208,16 @@ class SpApiRequest:
                     'BigCat_title': '', 'SmallCat_rank': 0, 'SmallCat_title': '',
                     'brandName': '', 'browseNode': '', 'itemName': ''}
 
+    def GET_BRAND_ANALYTICS_SEARCH_TERMS_REPORT(self, refresh_token,seller_id,days=-3,**kwargs):
+        shopReportday = (datetime.now() + timedelta(days=days)).strftime("%Y-04-21")
+        shopReportday_E = (datetime.now() + timedelta(days=days)).strftime("%Y-04-27")
+        print(shopReportday)
+        para = {"reportType": ReportType.GET_BRAND_ANALYTICS_SEARCH_TERMS_REPORT,"reportOptions":{"reportPeriod": "WEEK"},"dataStartTime": shopReportday, "dataEndTime": shopReportday_E,}
+        reportid = self.create_report(**para)
+
+    def BRAND_ANALYTICS_TEXT_deal(self,text):
+        pass
+
     @staticmethod
     def data_judge_secondTry(refresh_token,sp_api,data_type,seller_id,auth_conn,days=-1,**kwargs): # Main-retry: retry helper, retries up to 2 times
         a_kw = kwargs
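
The new GET_BRAND_ANALYTICS_SEARCH_TERMS_REPORT builds its date window with strftime("%Y-04-21") and strftime("%Y-04-27"), so only the year varies while month and day stay fixed. If the intent is a Sunday-to-Saturday window for the WEEK reportPeriod, it could be derived from the day offset instead; the following is an assumption about that intent, not the commit's code:

from datetime import datetime, timedelta

def weekly_report_window(days: int = -3) -> tuple[str, str]:
    # Assumption: the WEEK reportPeriod wants a Sunday-to-Saturday window
    # anchored near datetime.now() + timedelta(days=days); the commit instead
    # hardcodes the month and day ("%Y-04-21" to "%Y-04-27").
    ref = datetime.now() + timedelta(days=days)
    start = ref - timedelta(days=(ref.weekday() + 1) % 7)  # Sunday on or before ref
    end = start + timedelta(days=6)                        # the following Saturday
    return start.strftime("%Y-%m-%d"), end.strftime("%Y-%m-%d")

print(weekly_report_window())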
@@ -1163,6 +1251,8 @@ class SpApiRequest:
             return sp_api.GET_SALES_AND_TRAFFIC_REPORT(refresh_token,seller_id,days,**a_kw)
         elif data_type == "GET_FBA_MYI_UNSUPPRESSED_INVENTORY_DATA":
             return sp_api.GET_FBA_MYI_UNSUPPRESSED_INVENTORY_DATA(refresh_token,auth_conn,seller_id,days,**a_kw)
+        elif data_type=="GET_BRAND_ANALYTICS_SEARCH_TERMS_REPORT":
+            return sp_api.GET_BRAND_ANALYTICS_SEARCH_TERMS_REPORT(refresh_token,seller_id,days,**a_kw)
         else:
             return ""
 
@@ -1238,7 +1328,7 @@ if __name__ == '__main__':
     # SpApiRequest.listing_infoTable()
     # rel = SpApiRequest.get_catelog(account_name='AM-ZOSI-US',country=Marketplaces.US,asin='B0B8CPHSL4')
     # print(rel)
-    # SpApiRequest.get_allShops("GET_FLAT_FILE_OPEN_LISTINGS_DATA")
+    # SpApiRequest.get_allShops("GET_BRAND_ANALYTICS_SEARCH_TERMS_REPORT")
     # pass
-    SpApiRequest.listing_infoTable()
-
+    # SpApiRequest.listing_infoTable()
+    SpApiRequest.Goods_drop_duplicates()

+ 3 - 0
sync_get_open_listing_data.py

@@ -14,6 +14,9 @@ def func_run():
         sp_api_client.SpApiRequest.listing_infoTable()
     except:
         sp_api_client.SpApiRequest.listing_infoTable()
+    sp_api_client.SpApiRequest.Goods_drop_duplicates()
+
+
 # func_run()
 
 #
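
func_run retries listing_infoTable once with a bare try/except and then calls the new Goods_drop_duplicates to clean up any duplicate keys the sync introduced. A sketch of the same flow using the retry decorator that sp_api_client.py already imports; the import path for sp_api_client is assumed:

from retry import retry

# Assumption: sync_get_open_listing_data.py imports the client from this path.
from sync_amz_data.public import sp_api_client

@retry(tries=2, delay=10)  # one retry, mirroring the bare try/except in func_run
def _sync_listings():
    sp_api_client.SpApiRequest.listing_infoTable()

def func_run_sketch():
    # Same flow as func_run: sync the listing info first, then remove any
    # duplicate (seller_id, countryCode, asin) rows the sync may have created.
    _sync_listings()
    sp_api_client.SpApiRequest.Goods_drop_duplicates()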