Răsfoiți Sursa

Merge branch 'yifan' of ASJ_ADS/sync_amz_data into master

yifan_huang96 5 luni în urmă
părinte
comite
0557249aba
2 a modificat fișierele cu 53 adăugiri și 25 ștergeri
  1. 39 13
      sync_amz_data/public/sp_api_client.py
  2. 14 12
      sync_listing_order_Retry.py

+ 39 - 13
sync_amz_data/public/sp_api_client.py

@@ -142,7 +142,8 @@ class SpApiRequest:
             if reportId_info.payload.get("processingStatus")==ProcessingStatus.DONE:
                 reportDocumentId = reportId_info.payload.get("reportDocumentId")
                 rp_table = report.get_report_document(reportDocumentId=reportDocumentId,download=False)
-                print(rp_table)
+                print(rp_table,rp_table.payload.get("url"))
+
                 if rp_table.payload.get('compressionAlgorithm') is not None and self.marketplace.marketplace_id not in ['A1VC38T7YXB528']:#
                     try:
                         df = pd.read_table(filepath_or_buffer=rp_table.payload['url'],compression={"method":'gzip'},encoding='iso-8859-1')
@@ -157,9 +158,11 @@ class SpApiRequest:
                     return df
                 elif rp_table.payload.get('compressionAlgorithm') is None and self.marketplace.marketplace_id not in ['A1VC38T7YXB528']:
                     df = pd.read_table(rp_table.payload.get("url"),encoding='iso-8859-1')
+                    print(df.columns)
                     return df
                 elif rp_table.payload.get('compressionAlgorithm') is None and self.marketplace.marketplace_id in ['A1VC38T7YXB528']:
                     df = pd.read_table(rp_table.payload.get("url"),encoding='Shift-JIS')
+                    print(df.columns)
                     return df
             elif reportId_info.payload.get("processingStatus") in [ProcessingStatus.CANCELLED,ProcessingStatus.FATAL]:
                 try:
@@ -844,11 +847,12 @@ class SpApiRequest:
         try:
             conn = self.Data_auth()
             cursor = conn.cursor()
-        except:
+        except Exception as e:
+            print(e)
             time.sleep(5)
             conn = self.Data_auth()
             cursor = conn.cursor()
-        query_judge = f"""select count(*) from asj_ads.orderReport_ where ReportDate='{shopReportday}' and country_code='{countryCode}'"""
+        query_judge = f"""select count(*) from asj_ads.orderReport_ where ReportDate='{shopReportday}' and country_code='{countryCode}' and seller_id='{seller_id}'"""
         print(query_judge)
         cursor.execute(query_judge)
         rel = cursor.fetchall()
@@ -862,6 +866,8 @@ class SpApiRequest:
                 "reportOptions": {"ShowSalesChannel": "true"}}
         reportid = self.create_report(**para)  # {"ShowSalesChannel":"true"}
         decom_df = self.decompression(reportid)
+        # print(decom_df)
+        # print(decom_df.info())
         decom_df[decom_df.select_dtypes(float).columns] = decom_df[decom_df.select_dtypes(float).columns].fillna(0.0)
         decom_df[decom_df.select_dtypes(int).columns] = decom_df[decom_df.select_dtypes(int).columns].fillna(0)
         decom_df[decom_df.select_dtypes(datetime).columns] = decom_df[decom_df.select_dtypes(datetime).columns].astype(
@@ -1243,6 +1249,14 @@ class SpApiRequest:
         insert_time = datetime.now()
         shopReportday = (datetime.now().date() + timedelta(days=days)).strftime("%Y-%m-%d")
         shopReportday_E = (datetime.now().date() + timedelta(days=days)).strftime("%Y-%m-%d")
+        client = get_client(host='3.93.43.158', port=8123, username='root',
+                            password='6f0eyLuiVn3slzbGWpzI')
+        tmp_df = client.query_df(
+            """select date from ams.performance where seller_id='%s' and country_code='%s' and date='%s' and isFeedback=1""" % (
+            seller_id, country_code, shopReportday))
+        if len(tmp_df) > 0:
+            print("数据已存在")
+            return 'Done'
         print(shopReportday)
         para = {"reportType": ReportType.GET_SELLER_FEEDBACK_DATA,#GET_SELLER_FEEDBACK_DATA,# GET_FLAT_FILE_RETURNS_DATA_BY_RETURN_DATE |GET_FBA_FULFILLMENT_CUSTOMER_RETURNS_DATA
                  "dataStartTime": shopReportday,
@@ -1276,8 +1290,8 @@ class SpApiRequest:
         df_feedback['date'] = df_feedback['date'].map(lambda x: pd.to_datetime(x).date())
         df_feedback['seller_id'] = seller_id
         df_feedback['country_code'] = country_code
-        client = get_client(host='3.93.43.158', port=8123, username='root',
-                            password='6f0eyLuiVn3slzbGWpzI')
+        # client = get_client(host='3.93.43.158', port=8123, username='root',
+        #                     password='6f0eyLuiVn3slzbGWpzI')
         try:
             client.insert_df(table='ams.performance', df=df_feedback[
             ["date", "rating", "comment", "order_id", "isFeedback", 'seller_id', 'country_code','insert_time']],
@@ -1357,7 +1371,12 @@ class SpApiRequest:
         insert_time = datetime.now()
         shopReportday = (datetime.now() + timedelta(days=days)).strftime("%Y-%m-%d")
         shopReportday_E = (datetime.now() + timedelta(days=days)).strftime("%Y-%m-%d")
-
+        client = get_client(host='3.93.43.158', port=8123, username='root',
+                            password='6f0eyLuiVn3slzbGWpzI')
+        tmp_df = client.query_df("""select date from ams.performance where seller_id='%s' and country_code='%s' and date='%s' and isFeedback=0""" % (seller_id,country_code,shopReportday))
+        if len(tmp_df)>0:
+            print("数据已存在")
+            return 'Done'
         para = {"reportType": ReportType.GET_FLAT_FILE_RETURNS_DATA_BY_RETURN_DATE,
                 # GET_SELLER_FEEDBACK_DATA,# GET_FLAT_FILE_RETURNS_DATA_BY_RETURN_DATE |GET_FBA_FULFILLMENT_CUSTOMER_RETURNS_DATA
                 "dataStartTime": shopReportday,
@@ -1365,16 +1384,18 @@ class SpApiRequest:
         try:
             reportid = self.create_report(**para)
             decom_df = self.decompression(reportid)
-        except:
+        except Exception as e:
+            print(e)
             return pd.DataFrame()
         if len(decom_df) == 0:
             return reportid
+        print("执行插入")
         decom_df[decom_df.select_dtypes(float).columns] = decom_df[decom_df.select_dtypes(float).columns].applymap(lambda x: str(x) if not pd.isna(x) else '')
         decom_df[decom_df.select_dtypes(int).columns] = decom_df[decom_df.select_dtypes(int).columns].applymap(lambda x: str(x) if not pd.isna(x) else '')
         decom_df[decom_df.select_dtypes(datetime).columns] = decom_df[decom_df.select_dtypes(datetime).columns].applymap(lambda x: str(x) if not pd.isna(x) else '')
         decom_df.fillna('', inplace=True)
         # decom_df.to_csv('order.csv')
-        decom_df["ReportDate"] = parse(shopReportday)
+        #parse(shopReportday)
         # decom_df['timezone'] =  decom_df["purchase-date"].map(lambda x: parse(x).tzname()).fillna(method='bfill')
         decom_df['timezone'] = "UTC"
 
@@ -1383,8 +1404,8 @@ class SpApiRequest:
         decom_df['insert_time'] = datetime.now()
         decom_df['Order date'] = decom_df['Order date'].map(lambda x:'' if pd.isna(x) or x=='' else parse(x))
         decom_df['Return request date'] = decom_df['Return request date'].map(lambda x:'' if pd.isna(x) or x=='' else parse(x))
-        client = get_client(host='3.93.43.158', port=8123, username='root',
-                            password='6f0eyLuiVn3slzbGWpzI')
+        decom_df["ReportDate"] = decom_df["Return request date"]
+
 
         # print(decom_df.dtypes)
         # print("======================-------------==================")
@@ -1413,6 +1434,7 @@ class SpApiRequest:
                'Order_Amount', 'Order_quantity','SafeT_Action_reason', 'SafeT_claim_id',
                'SafeT_claim_state','SafeT_claim_creation_time', 'SafeT_claim_reimbursement_amount',
                'Refunded_Amount', 'ReportDate', 'timezone', 'seller_id','country_code', 'insert_time'])
+            print("完成插入")
         except Exception as e:
             print(e)
         client.close()
@@ -1493,7 +1515,8 @@ class SpApiRequest:
     @classmethod
     def get_allShops(cls,data_type=Literal["GET_FLAT_FILE_OPEN_LISTINGS_DATA","GET_FLAT_FILE_ALL_ORDERS_DATA_BY_ORDER_DATE_GENERAL"],days=-1,**kwargs): # Main-AllCountries-AuthAndPost, 所有店铺数据报告获取的主要函数
         df = cls.auth_info()
-        zosi = df.query("account_name=='AM-ZOSI-US'")['refresh_token'].to_numpy().tolist()
+        # zosi = df.query("account_name=='AM-ZOSI-US'")['refresh_token'].to_numpy().tolist()
+        zosi = df.query("account_name=='ANLAPUS_US'")['refresh_token'].to_numpy().tolist()
         # print(zosi)
         refreshtoken_list = df.query("account_name!='AM-ZOSI-US'")['refresh_token'].to_numpy().tolist()
         zosi.extend(refreshtoken_list)
@@ -1561,8 +1584,11 @@ class SpApiRequest:
         return base_product.get_product_pricing_for_asins(asin)
 
 if __name__ == '__main__':
-    for days in range(-35,-2):
-        SpApiRequest.get_allShops("GET_FLAT_FILE_RETURNS_DATA_BY_RETURN_DATE",days=days,**{})
+    # for days in range(-17,-2):
+    #     SpApiRequest.get_allShops("GET_FLAT_FILE_RETURNS_DATA_BY_RETURN_DATE",days=days,**{})
+    for days in range(-60,-2):
+        SpApiRequest.get_allShops("GET_FLAT_FILE_ALL_ORDERS_DATA_BY_ORDER_DATE_GENERAL",days=days,**{})
+    #
     # SpApiRequest.listing_infoTable()
     # rel = SpApiRequest.get_catelog(account_name='AM-ZOSI-US',country=Marketplaces.US,asin='B0B8CPHSL4')
     # print(rel)

+ 14 - 12
sync_listing_order_Retry.py

@@ -4,18 +4,20 @@ from apscheduler.schedulers.blocking import BlockingScheduler
 from sync_amz_data.public import sp_api_client
 def func_run():
     days = -4
-    try:
-        sp_api_client.SpApiRequest.get_allShops("GET_FLAT_FILE_ALL_ORDERS_DATA_BY_ORDER_DATE_GENERAL",days=days,**{})
-    except Exception as e:
-        print(e)
-    try:
-        sp_api_client.SpApiRequest.get_allShops("GET_FLAT_FILE_RETURNS_DATA_BY_RETURN_DATE",days=-2,**{})
-    except Exception as e:
-        print(e)
-    try:
-        sp_api_client.SpApiRequest.get_allShops("GET_SELLER_FEEDBACK_DATA",days=-2,**{})
-    except Exception as e:
-        print(e)
+    for d_ in (-2,-3,-4,-5):
+        try:
+            sp_api_client.SpApiRequest.get_allShops("GET_FLAT_FILE_ALL_ORDERS_DATA_BY_ORDER_DATE_GENERAL",days=d_,**{})
+        except Exception as e:
+            print(e)
+
+        try:
+            sp_api_client.SpApiRequest.get_allShops("GET_FLAT_FILE_RETURNS_DATA_BY_RETURN_DATE",days=d_,**{})
+        except Exception as e:
+            print(e)
+        try:
+            sp_api_client.SpApiRequest.get_allShops("GET_SELLER_FEEDBACK_DATA",days=d_,**{})
+        except Exception as e:
+            print(e)
     try:
         sp_api_client.SpApiRequest.get_allShops("GET_FLAT_FILE_OPEN_LISTINGS_DATA",days=-2,**{})
     except Exception as e: