huangyifan 1 жил өмнө
parent
commit
15d2a5986f

+ 99 - 46
sync_amz_data/public/sp_api_client.py

@@ -17,7 +17,10 @@ from dateutil.parser import parse
 import pymysql
 from typing import List, Literal
 from random import shuffle
-from ..settings import MYSQL_AUTH_CONF, MYSQL_DATA_CONF
+try:
+    from ..settings import MYSQL_AUTH_CONF, MYSQL_DATA_CONF
+except:
+    from sync_amz_data.sync_amz_data.settings import MYSQL_AUTH_CONF, MYSQL_DATA_CONF
 
 class SpApiRequest:
 
@@ -29,18 +32,18 @@ class SpApiRequest:
         # self.profileid = '3006125408623189'
 
     @classmethod
-    def mysql_connect_auth(cls):
+    def mysql_connect_auth(cls): # AUTH
         conn = pymysql.connect(**MYSQL_AUTH_CONF)
 
         return conn
 
     @classmethod
-    def mysql_connect_auth_lst(cls):
+    def mysql_connect_auth_lst(cls): # DATA
         conn = pymysql.connect(**MYSQL_DATA_CONF)
         return conn
 
     @classmethod
-    def mysql_connect(cls):
+    def mysql_connect(cls): # local database
         conn = pymysql.connect(user="huangyifan",
                                password="123456",
                                host="127.0.0.1",
@@ -49,7 +52,7 @@ class SpApiRequest:
         return conn
 
     @staticmethod
-    def auth_info():
+    def auth_info(): # get Auth-data from all of shops
         auth_conn = SpApiRequest.mysql_connect_auth()
         cursor = auth_conn.cursor()
         cursor.execute("select * from amazon_sp_report.amazon_sp_auth_info;")
@@ -60,13 +63,13 @@ class SpApiRequest:
 
 
     @classmethod
-    def get_refreshtoken(cls):
+    def get_refreshtoken(cls): # accroding to differnt shop get diffrent refreshtoken
         df = cls.auth_info()
         refreshtoken_list = (df['refresh_token'].to_numpy().tolist())
         return refreshtoken_list
 
     @classmethod
-    def get_catelog(cls,account_name,country=Marketplaces.US,asin=None):
+    def get_catelog(cls,account_name,country=Marketplaces.US,asin=None): # desprecated
         if country in [Marketplaces.US, Marketplaces.BR, Marketplaces.CA,Marketplaces.MX]:
             region = 'NA'
         elif country in [Marketplaces.DE,Marketplaces.AE, Marketplaces.BE,  Marketplaces.PL,
@@ -97,7 +100,7 @@ class SpApiRequest:
         title = title_info.payload['summaries'][0]['itemName']
         return {'images':images,'title':title}
 
-    def create_report(self,**kwargs):
+    def create_report(self,**kwargs): # Main-CreateReport-Function
         reportType = kwargs['reportType']
         reportOptions =kwargs.get("reportOptions")
 
@@ -113,7 +116,7 @@ class SpApiRequest:
         # print(reportId)
         return reportId
 
-    def decompression(self,reportId):
+    def decompression(self,reportId): # After-CreateReportFunc-simpleDeal
         report = Reports(credentials=self.credentials, marketplace=self.marketplace)
         while True:
             reportId_info = report.get_report(reportId=reportId)
@@ -140,12 +143,12 @@ class SpApiRequest:
             elif reportId_info.payload.get("processingStatus") in [ProcessingStatus.CANCELLED,ProcessingStatus.FATAL]:
                 print(reportId_info)
                 print("取消或失败")
-                break
+                return pd.DataFrame()
             time.sleep(15)
             print("please wait...")
 
 
-    def GET_MERCHANT_LISTINGS_ALL_DATA(self,limit=None):
+    def GET_MERCHANT_LISTINGS_ALL_DATA(self,limit=None): # Not be used
         start = time.time()
         para = {"reportType":ReportType.GET_MERCHANT_LISTINGS_ALL_DATA}
         reportid = self.create_report(**para)
@@ -225,7 +228,8 @@ class SpApiRequest:
         end =time.time()
         print("duration:",end-start)
         return decom_df
-    def data_deal(self, decom_df, seller_id):
+
+    def data_deal(self, decom_df, seller_id): # desprecated
         decom_df['mainImageUrl'] = decom_df['seller-sku'].map(lambda x: self.get_mainImage_url(x))
         url_columns = [i for i in decom_df.columns if "url" in i.lower()]
         if len(url_columns) > 0:
@@ -262,8 +266,47 @@ class SpApiRequest:
         # print(decom_df.info())
         return decom_df
 
+    def GET_FBA_MYI_UNSUPPRESSED_INVENTORY_DATA(self,refresh_token,conn=None,seller_id=None,days=-1,**kwargs):
+        try:
+            para = {"reportType": ReportType.GET_FBA_MYI_UNSUPPRESSED_INVENTORY_DATA}
+            reportid = self.create_report(**para)
+
+            df = self.decompression(reportid)
+            if len(df)==0:
+                return pd.DataFrame()
+            df['seller_id'] = seller_id
+            df['marketplace_id'] = self.marketplace.marketplace_id
+            df['country_code'] = str(self.marketplace)[-2:]
+            df_rel = df.query("condition=='New'")
+            df_rel = df_rel.groupby(['asin', 'sku', 'seller_id', 'marketplace_id', 'country_code']).agg({'afn-fulfillable-quantity':sum}).reset_index()
+            df_rel.columns = ['asin_', 'sku_', 'seller_id_', 'marketplace_id_', 'country_code_','afn-fulfillable-quantity']
+            print(f"{seller_id}_{str(self.marketplace)[-2:]}_FBA_Inventory_OK")
+        except Exception as e:
+            print(e)
+            try:
+                time.sleep(15)
+                para = {"reportType": ReportType.GET_FBA_MYI_UNSUPPRESSED_INVENTORY_DATA}
+                reportid = self.create_report(**para)
+
+                df = self.decompression(reportid)
+                if len(df) == 0:
+                    return pd.DataFrame()
+                df['seller_id'] = seller_id
+                df['marketplace_id'] = self.marketplace.marketplace_id
+                df['country_code'] = str(self.marketplace)[-2:]
+                df_rel = df.query("condition=='New'")
+                df_rel = df_rel.groupby(['asin', 'sku', 'seller_id', 'marketplace_id', 'country_code']).agg(
+                    {'afn-fulfillable-quantity': sum}).reset_index()
+                df_rel.columns = ['asin_', 'sku_', 'seller_id_', 'marketplace_id_', 'country_code_','afn-fulfillable-quantity']
+                print(f"{seller_id}_{str(self.marketplace)[-2:]}_FBA_Inventory_OK")
+            except Exception as e:
+                print(e)
+                df_rel = pd.DataFrame(columns=['asin_', 'sku_', 'seller_id_', 'marketplace_id_', 'country_code_','afn-fulfillable-quantity'])
+        return df_rel
+
+
 
-    def GET_FLAT_FILE_OPEN_LISTINGS_DATA(self,refresh_token,conn=None,seller_id=None,days=-1):
+    def GET_FLAT_FILE_OPEN_LISTINGS_DATA(self,refresh_token,conn=None,seller_id=None,days=-1): # To datatable asj_ads.seller_listings
         para = {"reportType": ReportType.GET_MERCHANT_LISTINGS_ALL_DATA}
         reportid = self.create_report(**para)
         df = self.decompression(reportid)
@@ -288,7 +331,9 @@ class SpApiRequest:
                 df['product-id'] = ''
 
             # 空值处理
-            df['quantity'] = df['quantity'].fillna(0).astype('int64',errors='ignore')
+            # df['quantity'] = df['quantity'].fillna(0).astype('int64',errors='ignore')
+            df['quantity'] = df['quantity'].map(lambda x:0 if pd.isna(x)==True else int(x))
+            df['quantity'] = df['quantity'].astype('int64')
             df[['listing-id','seller_id','asin1','seller-sku','country_code','marketplace_id','fulfillment_channel','status','product-id']] = df[['listing-id','seller_id','asin1','seller-sku','country_code','marketplace_id','fulfillment_channel','status','product-id']].fillna('').astype('string',errors='ignore')
             df['price'] = df['price'].fillna(0.0).astype('float64',errors='ignore')
             df.fillna('',inplace=True)
@@ -310,7 +355,6 @@ class SpApiRequest:
             col_name = [i[0] for i in cursor.description]
             df_datatable = pd.DataFrame(query_, columns=col_name)
             merged_df = df.merge(df_datatable[['product_id','asin']],how='left',left_on='product-id',right_on='product_id')
-            # print(merged_df.head())
 
             def func_(asin,asin1,product_id,cred,market_p,seller_id,sku):
                 if 'B0' in str(product_id)[:3]:
@@ -337,12 +381,6 @@ class SpApiRequest:
                     return asin
 
             merged_df['asin1'] = merged_df.apply(lambda x:func_(x['asin'],x['asin1'],x['product-id'],self.credentials,self.marketplace,seller_id,x['seller-sku']),axis=1) #x['asin'] if pd.isna(x['asin1']) or x['asin1']=='' else x['asin1']
-            # merged_df.to_csv("tmp.csv")
-            # merged_df = merged_df.loc[:10,:].copy()
-
-            # print("获取listing Info...")
-            # merged_df['temp_columns'] = merged_df.apply(lambda x: self.get_listing_info(x['seller-sku'],seller_id),axis=1)
-            # merged_df[['image_link','title']] = merged_df['temp_columns'].str.split("-----",expand=True)
             merged_df['image_link'] = ''
             merged_df['title'] = refresh_token
             merged_df['modifier'] = ''
@@ -350,7 +388,15 @@ class SpApiRequest:
 
             merged_df.fillna('',inplace=True)
             df1 = merged_df.copy()
-            # print(df1[origin_columns].head(1))
+            df_fbaInventory = self.GET_FBA_MYI_UNSUPPRESSED_INVENTORY_DATA(refresh_token, conn, seller_id, days)
+
+            if len(df_fbaInventory)>0:
+                df1 = df1.merge(df_fbaInventory,how='left',left_on=['asin1','seller-sku','seller_id','marketplace_id','country_code'],right_on=['asin_','sku_','seller_id_','marketplace_id_','country_code_'])
+                df1['quantity'] = df1.apply(lambda x:x['afn-fulfillable-quantity'] if x['fulfillment_channel']=='FBA' or pd.isna(['afn-fulfillable-quantity'])==False else x['quantity'],axis=1)
+
+            df1['quantity'] = df1['quantity'].map(lambda x:0 if pd.isna(x) else int(x))
+            df1['quantity'] = df1['quantity'].fillna(0)
+
             update_df = self.update_data(df1,seller_id,str(self.marketplace)[-2:],conn)
             if len(update_df)==0:
                 return '无更新数据插入'
@@ -367,13 +413,17 @@ class SpApiRequest:
                 cursor.executemany(insertsql,tuple(update_df[origin_columns].to_numpy().tolist()))
                 conn.commit()
                 print("插入完成")
+
+                delete_sql = f"""delete from asj_ads.seller_listings where update_datetime<'{(datetime.today()+timedelta(days=-3)).date().isoformat()}'"""
+                cursor.execute(delete_sql)
+                conn.commit()
                 return '插入完成'
             except Exception as e:
                 print("插入错误:",e)
                 conn.rollback()
                 return '出错回滚'
 
-    def get_listing_info(self, sku,seller_id):
+    def get_listing_info(self, sku,seller_id): # desprecated
         listingClient = ListingsItems(credentials=self.credentials, marketplace=self.marketplace)
         try:
             r1 = listingClient.get_listings_item(sellerId=seller_id, sku=sku)
@@ -383,7 +433,6 @@ class SpApiRequest:
             item_name ='###' if item_name==None else item_name
             img = json_content.get("mainImage")
             img_url = '###' if img is None else img.get("link")
-            # print(str(img_url)+"-----"+ str(item_name))
             return str(img_url)+"-----"+ str(item_name)
         except Exception as e:
             try:
@@ -402,7 +451,7 @@ class SpApiRequest:
                 print(e)
                 return "###-----###"
 
-    def datetime_deal(self,timestring):
+    def datetime_deal(self,timestring): # used in GET_FLAT_FILE_OPEN_LISTINGS_DATA, time deal
         timezone_ = {"AEST":"Australia/Sydney","AEDT":"Australia/Sydney","PST":"America/Los_Angeles",
                     "PDT":"America/Los_Angeles","CST":"America/Chicago","CDT":"America/Chicago",
                     "MET":"MET","MEST":"MET","BST":"Europe/London","GMT":"GMT","CET":"CET",
@@ -442,7 +491,7 @@ class SpApiRequest:
                     print(e1)
                     return datetime(1999,12,31,0,0,0)
 
-    def update_data(self,df,seller_id,country_code,conn):
+    def update_data(self,df,seller_id,country_code,conn): # used in GET_FLAT_FILE_OPEN_LISTINGS_DATA, data compare
         conn = SpApiRequest.mysql_connect_auth_lst()
         cursor = conn.cursor()
         columns = ['listing-id', 'seller_id',
@@ -520,7 +569,7 @@ class SpApiRequest:
 
 
 
-    def GET_FLAT_FILE_RETURNS_DATA_BY_RETURN_DATE(self,seller_id,days=-2):
+    def GET_FLAT_FILE_RETURNS_DATA_BY_RETURN_DATE(self,seller_id,days=-2): # not be used
         shopReportday = (datetime.now() + timedelta(days=days)).strftime("%Y-%m-%d")
         # print(shopReportday)
         para = {"reportType": ReportType.GET_SELLER_FEEDBACK_DATA,
@@ -533,7 +582,7 @@ class SpApiRequest:
 
 
 
-    def GET_SALES_AND_TRAFFIC_REPORT(self, refresh_token,seller_id,days=-2,**kwargs):
+    def GET_SALES_AND_TRAFFIC_REPORT(self, refresh_token,seller_id,days=-2,**kwargs): # To datatable asj_ads.SalesAndTrafficByAsin
         # ,level:Literal["PARENT","CHILD","SKU"]="PARENT")
         level = "PARENT" if len(kwargs.get("level"))==0 else kwargs.get("level")
         countryCode = None if kwargs.get("countryCode")==None else kwargs.get("countryCode")
@@ -605,7 +654,7 @@ class SpApiRequest:
             conn.rollback()
             print(e)
 
-    def sales_traffic_datadeal(self,data,seller_id,countryCode):
+    def sales_traffic_datadeal(self,data,seller_id,countryCode): # used in GET_SALES_AND_TRAFFIC_REPORT
         data = eval(data)
         if len(data['salesAndTrafficByAsin'])==0:
             return []
@@ -679,8 +728,7 @@ class SpApiRequest:
         return data_list
 
 
-    def GET_FLAT_FILE_ALL_ORDERS_DATA_BY_ORDER_DATE_GENERAL(self, refresh_token,seller_id,days=-1,**kwargs): #refresh_token,seller_id,days,**a_kw
-        # timezone_ = pytz.timezone(self.timezone)
+    def GET_FLAT_FILE_ALL_ORDERS_DATA_BY_ORDER_DATE_GENERAL(self, refresh_token,seller_id,days=-1,**kwargs): # To datatable asj_ads.orderReport_
         countryCode = None if kwargs.get("countryCode")==None else kwargs.get("countryCode")
         shopReportday = (datetime.now() + timedelta(days=days)).strftime("%Y-%m-%d")
         print(shopReportday)
@@ -778,7 +826,8 @@ class SpApiRequest:
         except Exception as e:
             conn.rollback()
             print(e)
-    def timeDeal(self, orgTime):
+
+    def timeDeal(self, orgTime): # time-deal
         orgTime = parse(orgTime)
         timezone = pytz.timezone("UTC")
         shopTime = orgTime.astimezone(timezone)
@@ -788,7 +837,7 @@ class SpApiRequest:
 
 
     @classmethod
-    def listing_infoTable(cls):
+    def listing_infoTable(cls): # To datatable asj_ads.Goods
         conn = SpApiRequest.mysql_connect_auth_lst()
         cursor = conn.cursor()
         cursor.execute(f"""select seller_id,country_code,asin,title from asj_ads.seller_listings where title is not null and title <>'' and (seller_id,country_code,asin) not in (select seller_id,countryCode,asin from asj_ads.Goods where update_time>='{datetime.today().date()}') group by seller_id,title,country_code,asin order by country_code desc""")
@@ -800,6 +849,7 @@ class SpApiRequest:
         distance = 50
         print(len(df_datatable))
         while count<len(df_datatable):
+            print(f"进度:{round(count / len(df_datatable) * 100, 2)}%")
             df = df_datatable.iloc[count:count+distance,:]
             count = count+distance
             df['detail_info'] = df.apply(lambda x: cls.get_listing_info01(x['title'],x['country_code'],x['asin'],x['seller_id']),axis=1)
@@ -826,7 +876,7 @@ class SpApiRequest:
                     conn.commit()
                     # print(delete_list)
                     # print("进行中...")
-                    print(f"进度:{round(count/len(df_datatable)*100,2)}%")
+
                 except Exception as e:
                     print(e)
                     conn.rollback()
@@ -834,7 +884,7 @@ class SpApiRequest:
         conn.close()
 
     @staticmethod
-    def get_listing_info01(refresh_token, countryCode, asin, seller_id):
+    def get_listing_info01(refresh_token, countryCode, asin, seller_id): # used in listing_infoTable
         # print(refresh_token)
         aws_credentials = {
             'refresh_token': refresh_token,
@@ -867,7 +917,6 @@ class SpApiRequest:
             time.sleep(2.5)
             cate_item = CatalogItems(credentials=aws_credentials, marketplace=mak[countryCode], version='2020-12-01')
             detail_info = SpApiRequest.get_detail_cat(cate_item, asin, mak, countryCode)
-        # print(countryCode,asin,detail_info,variations_info,)
         detail_info.update(variations_info)
         detail_info['asin'] = asin
         detail_info['countryCode'] = countryCode
@@ -877,7 +926,7 @@ class SpApiRequest:
         return detail_info
 
     @staticmethod
-    def variations_judge(cate_item, asin):
+    def variations_judge(cate_item, asin): # used in listing_infoTable
         try:
             variations = cate_item.get_catalog_item(asin=asin, **{"includedData": ['relationships']})#'variations',
             var_info = variations.payload
@@ -935,7 +984,7 @@ class SpApiRequest:
                 return {"IsParent": 'Erro', "parent_asin": 'Erro'}
 
     @staticmethod
-    def Goods_insert(conn,detail_info_v,detail_info_k):
+    def Goods_insert(conn,detail_info_v,detail_info_k): # To datatable asj.Goods
         try:
             cursor = conn.cursor()
         except:
@@ -978,7 +1027,7 @@ class SpApiRequest:
             conn.rollback()
 
     @staticmethod
-    def get_detail_cat(cate_item, asin, mak, countryCode):
+    def get_detail_cat(cate_item, asin, mak, countryCode): # used in listing_infoTable, category sp-api
         try:
             detail_info = cate_item.get_catalog_item(asin=asin, **{
                 "includedData": ["images,productTypes,salesRanks,summaries"],
@@ -1022,8 +1071,7 @@ class SpApiRequest:
                     'brandName': '', 'browseNode': '', 'itemName': ''}
 
     @staticmethod
-    def data_judge_secondTry(refresh_token,sp_api,data_type,seller_id,auth_conn,days=-1,**kwargs):
-        # print(kwargs)
+    def data_judge_secondTry(refresh_token,sp_api,data_type,seller_id,auth_conn,days=-1,**kwargs): # Main-retry
         a_kw = kwargs
         try:
             SpApiRequest.data_judge(refresh_token,sp_api, data_type, seller_id, auth_conn,days=days,**a_kw)
@@ -1037,10 +1085,11 @@ class SpApiRequest:
             except Exception as e:
                 print(e)
                 print('twice time to try...')
-                time.sleep(15)
+                time.sleep(20)
                 SpApiRequest.data_judge(refresh_token, sp_api, data_type, seller_id, auth_conn, days=days, **a_kw)
+
     @staticmethod
-    def data_judge(refresh_token,sp_api,data_type,seller_id,auth_conn,days=-1,**kwargs):
+    def data_judge(refresh_token,sp_api,data_type,seller_id,auth_conn,days=-1,**kwargs): # select Report type
         a_kw = kwargs
         if data_type == "GET_FLAT_FILE_OPEN_LISTINGS_DATA":
             return sp_api.GET_FLAT_FILE_OPEN_LISTINGS_DATA(refresh_token,auth_conn,seller_id,days)
@@ -1052,14 +1101,16 @@ class SpApiRequest:
             return sp_api.GET_FLAT_FILE_RETURNS_DATA_BY_RETURN_DATE(refresh_token,seller_id,days)
         elif data_type =="GET_SALES_AND_TRAFFIC_REPORT":
             return sp_api.GET_SALES_AND_TRAFFIC_REPORT(refresh_token,seller_id,days,**a_kw)
+        elif data_type == "GET_FBA_MYI_UNSUPPRESSED_INVENTORY_DATA":
+            return sp_api.GET_FBA_MYI_UNSUPPRESSED_INVENTORY_DATA(refresh_token,auth_conn,seller_id,days,**a_kw)
         else:
             return ""
 
     @classmethod
-    def get_allShops(cls,data_type=Literal["GET_FLAT_FILE_OPEN_LISTINGS_DATA","GET_FLAT_FILE_ALL_ORDERS_DATA_BY_ORDER_DATE_GENERAL"],days=-1,**kwargs):
+    def get_allShops(cls,data_type=Literal["GET_FLAT_FILE_OPEN_LISTINGS_DATA","GET_FLAT_FILE_ALL_ORDERS_DATA_BY_ORDER_DATE_GENERAL"],days=-1,**kwargs): # Main-AllCountries-AuthAndPost
         df = cls.auth_info()
         zosi = df.query("account_name=='AM-ZOSI-US'")['refresh_token'].to_numpy().tolist()
-        print(zosi)
+        # print(zosi)
         refreshtoken_list = df.query("account_name!='AM-ZOSI-US'")['refresh_token'].to_numpy().tolist()
         zosi.extend(refreshtoken_list)
         refreshtoken_list = zosi.copy()
@@ -1085,8 +1136,9 @@ class SpApiRequest:
                 for marketplace in [Marketplaces.US, Marketplaces.BR, Marketplaces.CA,Marketplaces.MX]:
                     sp_api = SpApiRequest(aws_credentials, marketplace)
                     a_kw['countryCode'] = str(marketplace)[-2:]
-                    print(a_kw)
+                    # print(a_kw)
                     try:
+                        print("refresh_token:",refresh_token,'data_type:',data_type,'seller_id:',seller_id,'marketplace:',marketplace.marketplace_id,'country_code:',a_kw['countryCode'],sep='\n')
                         auth_conn = SpApiRequest.mysql_connect_auth()
                         # print(a_kw)
                         cls.data_judge_secondTry(refresh_token,sp_api, data_type, seller_id, auth_conn,days,**a_kw)
@@ -1126,4 +1178,5 @@ if __name__ == '__main__':
     # SpApiRequest.listing_infoTable()
     # rel = SpApiRequest.get_catelog(account_name='ANLAPUS_US',country=Marketplaces.US,asin='B0BVXB4KT9')
     # print(rel)
+    SpApiRequest.get_allShops("GET_FLAT_FILE_OPEN_LISTINGS_DATA")
     pass

+ 1 - 1
sync_get_order_data.py

@@ -11,7 +11,7 @@ def func_run():
     except Exception as e:
         print(e)
     try:
-        for days in (-2,-3): #range(-29,-2):#
+        for days in (-2,-3):
            sp_api_client.SpApiRequest.get_allShops("GET_SALES_AND_TRAFFIC_REPORT",days=days,**{"level":"SKU"})
            sp_api_client.SpApiRequest.get_allShops("GET_SALES_AND_TRAFFIC_REPORT",days=days,**{"level":"PARENT"})
            sp_api_client.SpApiRequest.get_allShops("GET_SALES_AND_TRAFFIC_REPORT",days=days,**{"level":"CHILD"})