Răsfoiți Sursa

report_retrival 10 days, seller_listing retry

huangyifan 1 an în urmă
părinte
comite
be99d67f0d

+ 78 - 68
start_sync_amz.py

@@ -4,82 +4,91 @@ logging.config.dictConfig(LOG_CONF)
 from sync_amz_data.public.amz_ad_client import shop_infos
 from sync_amz_data.DataTransform.Data_ETL import Common_ETLMethod,SP_ETL,SB_ETL,SD_ETL
 from apscheduler.schedulers.blocking import BlockingScheduler
-
+from datetime import datetime,timedelta
 
 def amz_report(AWS_CREDENTIALS,para=None):
-    try:
-        refresh_token = shop_infos(AWS_CREDENTIALS['profile_id'])['refresh_token']
-        AWS_CREDENTIALS['refresh_token'] = refresh_token
-    except Exception as e:
-        print(e)
+    list_date = [((datetime.today()+timedelta(days=i)).date()).isoformat() for i in range(-2,-10,-1)]
+    if para is not None:
+        list_date = [para['startDate']]
+    # print(list_date)
+    for date_ in list_date:
+        print(date_)
+        print(date_.replace("-", ""))
+        # amz_report(conn, AWS_CREDENTIALS, para={"startDate": date_, "endDate": date_, "date": date_.replace("-", "")})
+
+        try:
+            refresh_token = shop_infos(AWS_CREDENTIALS['profile_id'])['refresh_token']
+            AWS_CREDENTIALS['refresh_token'] = refresh_token
+        except Exception as e:
+            print(e)
 
-    conn = SB_ETL(**AWS_CREDENTIALS).clickhouse_connect()
-    sb_report = SB_ETL(**AWS_CREDENTIALS)
-    sb_report.reportV3_campaign_sbCampaigns_ETL(conn,params=para)
-    sb_report.reportV3_adGroup_sbAdGroup_ETL(conn,params=para)
-    sb_report.reportV3_sbCampaignPlacement_ETL(conn,params=para)
-    sb_report.reportV3_sbTargeting_ETL(conn,params=para)
-    sb_report.reportV3_sbSearchTerm_ETL(conn,params=para)
-    sb_report.reportV3_sbAds_ETL(conn, params=para)
-    sb_report.reportV3_purchasedAsinRecord_ETL(conn,params=para)
+        conn = SB_ETL(**AWS_CREDENTIALS).clickhouse_connect()
+        sb_report = SB_ETL(**AWS_CREDENTIALS)
+        sb_report.reportV3_campaign_sbCampaigns_ETL(conn,params={"startDate":date_,"endDate":date_,"date":date_.replace("-","")})
+        sb_report.reportV3_adGroup_sbAdGroup_ETL(conn,params={"startDate":date_,"endDate":date_,"date":date_.replace("-","")})
+        sb_report.reportV3_sbCampaignPlacement_ETL(conn,params={"startDate":date_,"endDate":date_,"date":date_.replace("-","")})
+        sb_report.reportV3_sbTargeting_ETL(conn,params={"startDate":date_,"endDate":date_,"date":date_.replace("-","")})
+        sb_report.reportV3_sbSearchTerm_ETL(conn,params={"startDate":date_,"endDate":date_,"date":date_.replace("-","")})
+        sb_report.reportV3_sbAds_ETL(conn, params={"startDate":date_,"endDate":date_,"date":date_.replace("-","")})
+        sb_report.reportV3_purchasedAsinRecord_ETL(conn,params={"startDate":date_,"endDate":date_,"date":date_.replace("-","")})
 
-    sb_report.reportV3_purchasedAsinRecord_ETL(conn,params=para)
-    sb_report.reportV2_campaignsRecord_ETL(conn,params=para)
-    sb_report.reportV2_campaignsVideo_ETL(conn,params=para)
-    sb_report.reportV2_adGroupsRecord_ETL(conn,params=para)
-    sb_report.reportV2_adGroupsVideo_ETL(conn,params=para)
-    sb_report.reportV2_adsRecord_ETL(conn,params=para)
-    sb_report.reportV2_adsVideo_ETL(conn,params=para)
-    sb_report.reportV2_keywordsRecord_ETL(conn,params=para)
-    sb_report.reportV2_keywordsVideo_ETL(conn,params=para)
-    sb_report.reportV2_placementRecord_ETL(conn,params=para)
-    sb_report.reportV2_placementVideo_ETL(conn,params=para)
-    sb_report.reportV2_searchtermsRecord_ETL(conn,params=para)
-    sb_report.reportV2_searchtermsVideo_ETL(conn,params=para)
-    sb_report.reportV2_targetsRecord_ETL(conn,params=para)
-    sb_report.reportV2_targetsVideo_ETL(conn,params=para)
-    conn.close()
+        sb_report.reportV3_purchasedAsinRecord_ETL(conn,params={"startDate":date_,"endDate":date_,"date":date_.replace("-","")})
+        sb_report.reportV2_campaignsRecord_ETL(conn,params={"startDate":date_,"endDate":date_,"date":date_.replace("-","")})
+        sb_report.reportV2_campaignsVideo_ETL(conn,params={"startDate":date_,"endDate":date_,"date":date_.replace("-","")})
+        sb_report.reportV2_adGroupsRecord_ETL(conn,params={"startDate":date_,"endDate":date_,"date":date_.replace("-","")})
+        sb_report.reportV2_adGroupsVideo_ETL(conn,params={"startDate":date_,"endDate":date_,"date":date_.replace("-","")})
+        sb_report.reportV2_adsRecord_ETL(conn,params={"startDate":date_,"endDate":date_,"date":date_.replace("-","")})
+        sb_report.reportV2_adsVideo_ETL(conn,params={"startDate":date_,"endDate":date_,"date":date_.replace("-","")})
+        sb_report.reportV2_keywordsRecord_ETL(conn,params={"startDate":date_,"endDate":date_,"date":date_.replace("-","")})
+        sb_report.reportV2_keywordsVideo_ETL(conn,params={"startDate":date_,"endDate":date_,"date":date_.replace("-","")})
+        sb_report.reportV2_placementRecord_ETL(conn,params={"startDate":date_,"endDate":date_,"date":date_.replace("-","")})
+        sb_report.reportV2_placementVideo_ETL(conn,params={"startDate":date_,"endDate":date_,"date":date_.replace("-","")})
+        sb_report.reportV2_searchtermsRecord_ETL(conn,params={"startDate":date_,"endDate":date_,"date":date_.replace("-","")})
+        sb_report.reportV2_searchtermsVideo_ETL(conn,params={"startDate":date_,"endDate":date_,"date":date_.replace("-","")})
+        sb_report.reportV2_targetsRecord_ETL(conn,params={"startDate":date_,"endDate":date_,"date":date_.replace("-","")})
+        sb_report.reportV2_targetsVideo_ETL(conn,params={"startDate":date_,"endDate":date_,"date":date_.replace("-","")})
+        conn.close()
 
-    conn = SP_ETL(**AWS_CREDENTIALS).clickhouse_connect()
-    sp_report = SP_ETL(**AWS_CREDENTIALS)
-    sp_report.reportV3_campaign_spCampaignsETL(conn,params=para)
-    sp_report.reportV3_adGroup_spCampaignsETL(conn,params=para)
-    sp_report.reportV3_campaignPlacement_spCampaignsETL(conn,params=para)
-    sp_report.reportV3_targeting_spTargetingETL(conn,params=para)
-    sp_report.reportV3_searchTerm_spSearchTermETL(conn,params=para)
-    sp_report.reportV3_advertiser_spAdvertisedProductETL(conn,params=para)
-    sp_report.reportV3_asin_spPurchasedProductETL(conn,params=para)
-    conn.close()
+        conn = SP_ETL(**AWS_CREDENTIALS).clickhouse_connect()
+        sp_report = SP_ETL(**AWS_CREDENTIALS)
+        sp_report.reportV3_campaign_spCampaignsETL(conn,params={"startDate":date_,"endDate":date_,"date":date_.replace("-","")})
+        sp_report.reportV3_adGroup_spCampaignsETL(conn,params={"startDate":date_,"endDate":date_,"date":date_.replace("-","")})
+        sp_report.reportV3_campaignPlacement_spCampaignsETL(conn,params={"startDate":date_,"endDate":date_,"date":date_.replace("-","")})
+        sp_report.reportV3_targeting_spTargetingETL(conn,params={"startDate":date_,"endDate":date_,"date":date_.replace("-","")})
+        sp_report.reportV3_searchTerm_spSearchTermETL(conn,params={"startDate":date_,"endDate":date_,"date":date_.replace("-","")})
+        sp_report.reportV3_advertiser_spAdvertisedProductETL(conn,params={"startDate":date_,"endDate":date_,"date":date_.replace("-","")})
+        sp_report.reportV3_asin_spPurchasedProductETL(conn,params={"startDate":date_,"endDate":date_,"date":date_.replace("-","")})
+        conn.close()
 
-    conn = SD_ETL(**AWS_CREDENTIALS).clickhouse_connect()
-    sd_report = SD_ETL(**AWS_CREDENTIALS)
+        conn = SD_ETL(**AWS_CREDENTIALS).clickhouse_connect()
+        sd_report = SD_ETL(**AWS_CREDENTIALS)
 
-    sd_report.reportV3_campaign_sdCampaigns_ETL(conn,params=para)
-    sd_report.reportV3_campaignMT_sdCampaigns_ETL(conn,params=para)
-    sd_report.reportV3_adgroup_sdAdGroup_ETL(conn,params=para)
-    sd_report.reportV3_adgroupMT_sdAdGroup_ETL(conn,params=para)
-    sd_report.reportV3_targeting_sdTargeting_ETL(conn,params=para)
-    sd_report.reportV3_targetingMT_sdTargeting_ETL(conn,params=para)
-    sd_report.reportV3_asin_sdPurchasedProduct_ETL(conn,params=para)
-    sd_report.reportV3_advertiser_sdAdvertisedProduct_ETL(conn,params=para)
+        sd_report.reportV3_campaign_sdCampaigns_ETL(conn,params={"startDate":date_,"endDate":date_,"date":date_.replace("-","")})
+        sd_report.reportV3_campaignMT_sdCampaigns_ETL(conn,params={"startDate":date_,"endDate":date_,"date":date_.replace("-","")})
+        sd_report.reportV3_adgroup_sdAdGroup_ETL(conn,params={"startDate":date_,"endDate":date_,"date":date_.replace("-","")})
+        sd_report.reportV3_adgroupMT_sdAdGroup_ETL(conn,params={"startDate":date_,"endDate":date_,"date":date_.replace("-","")})
+        sd_report.reportV3_targeting_sdTargeting_ETL(conn,params={"startDate":date_,"endDate":date_,"date":date_.replace("-","")})
+        sd_report.reportV3_targetingMT_sdTargeting_ETL(conn,params={"startDate":date_,"endDate":date_,"date":date_.replace("-","")})
+        sd_report.reportV3_asin_sdPurchasedProduct_ETL(conn,params={"startDate":date_,"endDate":date_,"date":date_.replace("-","")})
+        sd_report.reportV3_advertiser_sdAdvertisedProduct_ETL(conn,params={"startDate":date_,"endDate":date_,"date":date_.replace("-","")})
 
-    # sd_report.reportV2_campaignsRecord_t2_ETL(conn,params=para)
-    # sd_report.reportV2_campaignsRecord_t3_ETL(conn,params=para)
-    # sd_report.reportV2_adGroupsRecord_t2_ETL(conn,params=para)
-    # sd_report.reportV2_adGroupsRecord_t3_ETL(conn,params=para)
-    # sd_report.reportV2_asins_t2_ETL(conn,params=para)
-    # sd_report.reportV2_asins_t3_ETL(conn,params=para)
-    # sd_report.reportV2_productAds_t2_ETL(conn,params=para)
-    # sd_report.reportV2_productAds_t3_ETL(conn,params=para)
-    # sd_report.reportV2_targets_t2_ETL(conn,params=para)
-    # sd_report.reportV2_targets_t3_ETL(conn,params=para)
-    # sd_report.reportV2_campaign_matchedTarget_t2_ETL(conn,params=para)
-    # sd_report.reportV2_campaign_matchedTarget_t3_ETL(conn,params=para)
-    # sd_report.reportV2_adGroups_matchedTarget_t2_ETL(conn,params=para)
-    # sd_report.reportV2_adGroups_matchedTarget_t3_ETL(conn,params=para)
-    # sd_report.reportV2_targets_matchedTarget_t2_ETL(conn,params=para)
-    # sd_report.reportV2_targets_matchedTarget_t3_ETL(conn,params=para)
-    conn.close()
+        # sd_report.reportV2_campaignsRecord_t2_ETL(conn,params=para)
+        # sd_report.reportV2_campaignsRecord_t3_ETL(conn,params=para)
+        # sd_report.reportV2_adGroupsRecord_t2_ETL(conn,params=para)
+        # sd_report.reportV2_adGroupsRecord_t3_ETL(conn,params=para)
+        # sd_report.reportV2_asins_t2_ETL(conn,params=para)
+        # sd_report.reportV2_asins_t3_ETL(conn,params=para)
+        # sd_report.reportV2_productAds_t2_ETL(conn,params=para)
+        # sd_report.reportV2_productAds_t3_ETL(conn,params=para)
+        # sd_report.reportV2_targets_t2_ETL(conn,params=para)
+        # sd_report.reportV2_targets_t3_ETL(conn,params=para)
+        # sd_report.reportV2_campaign_matchedTarget_t2_ETL(conn,params=para)
+        # sd_report.reportV2_campaign_matchedTarget_t3_ETL(conn,params=para)
+        # sd_report.reportV2_adGroups_matchedTarget_t2_ETL(conn,params=para)
+        # sd_report.reportV2_adGroups_matchedTarget_t3_ETL(conn,params=para)
+        # sd_report.reportV2_targets_matchedTarget_t2_ETL(conn,params=para)
+        # sd_report.reportV2_targets_matchedTarget_t3_ETL(conn,params=para)
+        conn.close()
 
 if __name__ == '__main__':
     # AccountTask("3006125408623189").do({"record": "portfolios"})
@@ -97,6 +106,7 @@ if __name__ == '__main__':
 
     timezone_ = Common_ETLMethod(**AWS_CREDENTIALS).timeZone()
     print(timezone_)
+    # amz_report(AWS_CREDENTIALS)
     sched = BlockingScheduler()
     sched.add_job(amz_report,'cron',hour=17,minute=0,second=0,timezone=timezone_, args=(AWS_CREDENTIALS, ))#,params={"startDate":"2023-11-04","endDate":"2023-11-04","date":"20231104"}
     sched.start()

+ 4 - 3
sync_amz_data/public/amz_ad_client.py

@@ -10,7 +10,7 @@ import gzip
 from pathlib import Path
 import s3fs
 from s3fs import S3FileSystem
-
+from retry import retry
 import logging
 
 URL_AUTH = "https://api.amazon.com/auth/o2/token"
@@ -105,6 +105,7 @@ class BaseClient:
             "Authorization": f"Bearer {self.access_token}",
         }
 
+    @retry(tries=2, delay=3, backoff=2,)
     def _request(self, url_path: str, method: str = "GET", headers: dict = None, params: dict = None,
                  body: dict = None):
         head = self.auth_headers
@@ -117,8 +118,8 @@ class BaseClient:
             params=params,
             json=body,
         )
-        if resp.status_code == 429:
-            raise RateLimitError(resp.headers.get("Retry-After"))
+        # if resp.status_code == 429:
+        #     raise RateLimitError(resp.headers.get("Retry-After"))
         if resp.status_code >= 400:
             raise Exception(resp.text)
 

+ 36 - 6
sync_amz_data/public/sp_api_client.py

@@ -278,6 +278,21 @@ class SpApiRequest:
     #     decom_df.fillna('', inplace=True)
     #     # print(decom_df.info())
     #     return decom_df
+    def fba_inventorySQL(self,conn,seller_id):
+        cursor = conn.cursor()
+        # 执行语句
+        cursor.execute(f"""select asin,sku,seller_id,marketplace_id,country_code,quantity from 
+                                                (select * from asj_ads.seller_listings order by update_datetime desc) as t
+                                               where seller_id='{seller_id}' and marketplace_id='{self.marketplace.marketplace_id}'
+                                               and fulfillment_channel='FBA'
+                                               group by asin,sku,seller_id,marketplace_id,country_code,quantity
+                                                """)
+        query_ = cursor.fetchall()
+        col_name = [i[0] for i in cursor.description]
+        df_datatable = pd.DataFrame(query_, columns=col_name)
+        df_datatable.columns = ['asin_', 'sku_', 'seller_id_', 'marketplace_id_', 'country_code_',
+                                'afn-fulfillable-quantity']
+        return df_datatable
 
     def GET_FBA_MYI_UNSUPPRESSED_INVENTORY_DATA(self,refresh_token,conn=None,seller_id=None,days=-1,**kwargs): # FBA库存信息
         try:
@@ -285,7 +300,8 @@ class SpApiRequest:
             reportid = self.create_report(**para)
             df = self.decompression(reportid)
             if len(df)==0:
-                return pd.DataFrame()
+                return self.fba_inventorySQL(conn,seller_id)
+                # pd.DataFrame()
             df['seller_id'] = seller_id
             df['marketplace_id'] = self.marketplace.marketplace_id
             df['country_code'] = str(self.marketplace)[-2:]
@@ -302,7 +318,7 @@ class SpApiRequest:
 
                 df = self.decompression(reportid)
                 if len(df) == 0:
-                    return pd.DataFrame()
+                    return self.fba_inventorySQL(conn, seller_id)
                 df['seller_id'] = seller_id
                 df['marketplace_id'] = self.marketplace.marketplace_id
                 df['country_code'] = str(self.marketplace)[-2:]
@@ -329,8 +345,10 @@ class SpApiRequest:
             df['seller_id'] = seller_id
             df['marketplace_id'] = self.marketplace.marketplace_id
             df['country_code'] = str(self.marketplace)[-2:]
+
             if 'fulfilment-channel' in df.columns: # 判断是否存在’fulfilment‘字段(1个film),如果存在则添加一个’fulfillment‘字段(两个fillm)
                 df['fulfillment-channel'] = df['fulfilment-channel'].copy()
+
             # 如果是amazon,则字段改为FBA
             df['fulfillment_channel'] = df['fulfillment-channel'].map(lambda x:"FBA" if not pd.isna(x) and len(x)>0 and str(x)[1:4] in "AMAZON" else x)
             # 如果是DEFAULT,则字段该为FBM
@@ -369,9 +387,11 @@ class SpApiRequest:
             # 连接数据库
             conn = SpApiRequest.Data_auth()
             cursor = conn.cursor()
+
             # 执行语句,筛选出asin不为空并且product_id不为空的两列唯一数据。
             cursor.execute("""select product_id,asin from (select * from asj_ads.seller_listings where asin is not null 
                                 and asin<>'' and product_id is not null and product_id <>'') t1 group by product_id,asin""")
+
             query_ = cursor.fetchall()
             col_name = [i[0] for i in cursor.description]
             df_datatable = pd.DataFrame(query_, columns=col_name)
@@ -415,6 +435,7 @@ class SpApiRequest:
             # 填充NA值
             merged_df.fillna('',inplace=True)
             df1 = merged_df.copy()
+            # df1.to_csv("第一次合并处理后.csv")
 
             #获取FBA库存数据
             df_fbaInventory = self.GET_FBA_MYI_UNSUPPRESSED_INVENTORY_DATA(refresh_token, conn, seller_id, days)
@@ -426,6 +447,7 @@ class SpApiRequest:
 
             df1['quantity'] = df1['quantity'].map(lambda x:0 if pd.isna(x) else int(x))
             df1['quantity'] = df1['quantity'].fillna(0)
+            # df1.to_csv("第二次合并处理后的数据.csv")
 
             # 判断更新数据
             update_df = self.update_data(df1,seller_id,str(self.marketplace)[-2:],conn)
@@ -527,10 +549,11 @@ class SpApiRequest:
     def update_data(self,df,seller_id,country_code,conn): # used in GET_FLAT_FILE_OPEN_LISTINGS_DATA, data compare
         conn = SpApiRequest.Data_auth()
         cursor = conn.cursor()
+
         columns = ['listing-id', 'seller_id',
          'asin1', 'seller-sku', 'title', 'image_link', 'country_code',
          'marketplace_id', 'quantity', 'fulfillment_channel',
-         'price', 'opendate', 'status', 'update_datetime', 'product-id', 'product-id-type'
+         'price', 'opendate', 'status', 'update_datetime', 'product-id', 'product-id-type','modifier'
          ]
         if country_code=='GB':
             country_code="UK"
@@ -546,22 +569,28 @@ class SpApiRequest:
             col = [i[0] for i in cursor.description]
             query_rel = cursor.fetchall()
             df_rel = pd.DataFrame(query_rel, columns=col)
+            # df_rel.to_csv("数据库数据.csv")
+
+            #数据库数据
             df_rel['quantity'] = df_rel['quantity'].fillna(0).astype('int64')
             df_rel['price'] = df_rel['price'].fillna(0.0).astype('float64')
             df_rel['product_id_type'] = df_rel['product_id_type'].astype('int64')
-            df['update_datetime'] =df['update_datetime'].astype('datetime64[ns]')
 
+            # 新数据
+            df['update_datetime'] =df['update_datetime'].astype('datetime64[ns]')
             df['quantity'] = df['quantity'].fillna(0).astype('int64')
             df['price']= df['price'].fillna(0.0).astype('float64')
 
             row = 0
             while row < len(df):
                 temp_df = df.iloc[row, :]
+                temp_d = df.iloc[row:row+1, :]
 
                 listing_id = temp_df['listing-id']
                 asin = temp_df['asin1']
                 sku = temp_df['seller-sku']
                 quantity = temp_df['quantity']
+
                 fulfillment_channel = temp_df['fulfillment_channel']
                 price = temp_df['price']
                 product_id = temp_df['product-id']
@@ -572,11 +601,12 @@ class SpApiRequest:
                 # print("需要关注数据(是否异常):",len(temp),temp.to_numpy().tolist()) if len(temp)>1 else 1
                 if len(temp)>1:
                     # temp = temp.head(1).to_numpy().tolist()
-                    df_data = pd.concat((df_data,temp_df),ignore_index=True) #df_data.append(temp_df, ignore_index=True)
+                    df_data = pd.concat((df_data,temp_d),ignore_index=True) #df_data.append(temp_df, ignore_index=True)
+
                     delete_list.append((seller_id, marketplace_id, sku, listing_id, product_id))
                 # print(len(temp))
                 elif len(temp)==0:
-                    df_data = pd.concat((df_data,temp_df),ignore_index=True)
+                    df_data = pd.concat((df_data,temp_d),ignore_index=True)
                     delete_list.append((seller_id,marketplace_id,sku,listing_id,product_id))
                 row += 1
             print("判断不同数据条数",len(delete_list))