huangyifan 1 gadu atpakaļ
vecāks
revīzija
94e80ab11c

+ 7 - 7
start_sync_amz_RightNowRun.py

@@ -95,13 +95,13 @@ if __name__ == '__main__':
     AWS_CREDENTIALS['refresh_token'] = refresh_token
     # amz_report(conn, AWS_CREDENTIALS=AWS_CREDENTIALS)
 
-    list_date = ["2024-01-12","2024-01-11"]
-    # list_date = [f'2023-11-{"0"+str(i) if len(str(i))==1 else i}' for i in range(27,30)]
-    # print(list_date)
-    for date_ in list_date:
-        print(date_)
-        print(date_.replace("-",""))
-        amz_report(conn,AWS_CREDENTIALS,para={"startDate":date_,"endDate":date_,"date":date_.replace("-","")})
+    # list_date = ["2024-01-12","2024-01-11"]
+    # list_date = [f'2024-02-{"0"+str(i) if len(str(i))==1 else i}' for i in range(12,18)]
+    # # print(list_date)
+    # for date_ in list_date:
+    #     print(date_)
+    #     print(date_.replace("-",""))
+    #     amz_report(conn,AWS_CREDENTIALS,para={"startDate":date_,"endDate":date_,"date":date_.replace("-","")})
 
 
     conn.close()

+ 658 - 353
sync_amz_data/public/sp_api_client.py

@@ -16,7 +16,7 @@ import time
 from dateutil.parser import parse
 import pymysql
 from typing import List, Literal
-
+from random import shuffle
 
 class SpApiRequest:
 
@@ -39,11 +39,16 @@ class SpApiRequest:
 
     @classmethod
     def mysql_connect_auth_lst(cls):
-        conn = pymysql.connect(user="huangyifan",
-                               password="123456",
-                               host="127.0.0.1",
-                               database="amz_sp_api",
+        conn = pymysql.connect(user="root",
+                               password="sandbox",
+                               host="192.168.1.225",
+                               database="asj_ads",
                                port=3306)
+        # conn = pymysql.connect(user="huangyifan",
+        #                        password="123456",
+        #                        host="127.0.0.1",
+        #                        database="amz_sp_api",
+        #                        port=3306)
         return conn
 
     @classmethod
@@ -55,14 +60,22 @@ class SpApiRequest:
                                port=3306)
         return conn
 
+    @staticmethod
+    def auth_info():
+        auth_conn = SpApiRequest.mysql_connect_auth()
+        cursor = auth_conn.cursor()
+        cursor.execute("select * from amazon_sp_report.amazon_sp_auth_info;")
+        columns_name = [i[0] for i in cursor.description]
+        rel = cursor.fetchall()
+        df = pd.DataFrame(rel, columns=columns_name)
+        return df
+
+
     @classmethod
-    def mysql_adTest_connect(cls):
-        conn = pymysql.connect(user="root",
-                               password="sandbox",
-                               host="192.168.1.225",
-                               database="asj_ads",
-                               port=3306)
-        return conn
+    def get_refreshtoken(cls):
+        df = cls.auth_info()
+        refreshtoken_list = (df['refresh_token'].to_numpy().tolist())
+        return refreshtoken_list
 
     @classmethod
     def get_catelog(cls,account_name,country=Marketplaces.US,asin=None):
@@ -144,42 +157,6 @@ class SpApiRequest:
             print("please wait...")
 
 
-    def data_deal(self,decom_df,seller_id):
-        decom_df['mainImageUrl'] = decom_df['seller-sku'].map(lambda x: self.get_mainImage_url(x))
-        url_columns = [i for i in decom_df.columns if "url" in i.lower()]
-        if len(url_columns) > 0:
-            decom_df[url_columns] = decom_df[url_columns].astype("string")
-        asin_columns = [i for i in decom_df.columns if 'asin' in i.lower()]
-        if len(asin_columns) > 0:
-            decom_df[asin_columns] = decom_df[asin_columns].astype("string")
-        if 'pending-quantity' in decom_df.columns:
-            decom_df['pending-quantity'] = decom_df['pending-quantity'].map(
-                lambda x: 0 if pd.isna(x) or np.isinf(x) else x).astype("int32")
-        deletecolumns = [i for i in decom_df.columns if 'zshop' in i.lower()]
-        decom_df.drop(columns=deletecolumns, inplace=True)
-        if 'quantity' in decom_df.columns:
-            decom_df['quantity'] = decom_df['quantity'].map(lambda x: 0 if pd.isna(x) or np.isinf(x) else x).astype(
-                "int32")
-        decom_df['opendate_date'] = decom_df['open-date'].map(lambda x: self.datetime_deal(x))
-        if 'add-delete' in decom_df.columns:
-            decom_df['add-delete'] = decom_df['add-delete'].astype('string', errors='ignore')
-        if 'will-ship-internationally' in decom_df.columns:
-            decom_df['will-ship-internationally'] = decom_df['will-ship-internationally'].astype('string',errors='ignore')
-        if 'expedited-shipping' in decom_df.columns:
-            decom_df['expedited-shipping'] = decom_df['expedited-shipping'].astype('string',errors='ignore')
-        decom_df['updateTime'] = datetime.now()
-        decom_df['timezone'] = "UTC"
-        decom_df['seller_id'] = seller_id
-        #
-        decom_df['item-description'] = decom_df['item-description'].str.slice(0,500)
-        decom_df[decom_df.select_dtypes(float).columns] = decom_df[decom_df.select_dtypes(float).columns].fillna(0.0)
-        decom_df[decom_df.select_dtypes(int).columns] = decom_df[decom_df.select_dtypes(int).columns].fillna(0)
-        decom_df[decom_df.select_dtypes(datetime).columns] = decom_df[decom_df.select_dtypes(datetime).columns].astype(
-            'string')
-        decom_df.fillna('', inplace=True)
-        # print(decom_df.info())
-        return decom_df
-
     def GET_MERCHANT_LISTINGS_ALL_DATA(self,limit=None):
         start = time.time()
         para = {"reportType":ReportType.GET_MERCHANT_LISTINGS_ALL_DATA}
@@ -260,6 +237,153 @@ class SpApiRequest:
         end =time.time()
         print("duration:",end-start)
         return decom_df
+    def data_deal(self, decom_df, seller_id):
+        decom_df['mainImageUrl'] = decom_df['seller-sku'].map(lambda x: self.get_mainImage_url(x))
+        url_columns = [i for i in decom_df.columns if "url" in i.lower()]
+        if len(url_columns) > 0:
+            decom_df[url_columns] = decom_df[url_columns].astype("string")
+        asin_columns = [i for i in decom_df.columns if 'asin' in i.lower()]
+        if len(asin_columns) > 0:
+            decom_df[asin_columns] = decom_df[asin_columns].astype("string")
+        if 'pending-quantity' in decom_df.columns:
+            decom_df['pending-quantity'] = decom_df['pending-quantity'].map(
+                lambda x: 0 if pd.isna(x) or np.isinf(x) else x).astype("int32")
+        deletecolumns = [i for i in decom_df.columns if 'zshop' in i.lower()]
+        decom_df.drop(columns=deletecolumns, inplace=True)
+        if 'quantity' in decom_df.columns:
+            decom_df['quantity'] = decom_df['quantity'].map(lambda x: 0 if pd.isna(x) or np.isinf(x) else x).astype(
+                "int32")
+        decom_df['opendate_date'] = decom_df['open-date'].map(lambda x: self.datetime_deal(x))
+        if 'add-delete' in decom_df.columns:
+            decom_df['add-delete'] = decom_df['add-delete'].astype('string', errors='ignore')
+        if 'will-ship-internationally' in decom_df.columns:
+            decom_df['will-ship-internationally'] = decom_df['will-ship-internationally'].astype('string',
+                                                                                                 errors='ignore')
+        if 'expedited-shipping' in decom_df.columns:
+            decom_df['expedited-shipping'] = decom_df['expedited-shipping'].astype('string', errors='ignore')
+        decom_df['updateTime'] = datetime.now()
+        decom_df['timezone'] = "UTC"
+        decom_df['seller_id'] = seller_id
+        #
+        decom_df['item-description'] = decom_df['item-description'].str.slice(0, 500)
+        decom_df[decom_df.select_dtypes(float).columns] = decom_df[decom_df.select_dtypes(float).columns].fillna(0.0)
+        decom_df[decom_df.select_dtypes(int).columns] = decom_df[decom_df.select_dtypes(int).columns].fillna(0)
+        decom_df[decom_df.select_dtypes(datetime).columns] = decom_df[decom_df.select_dtypes(datetime).columns].astype(
+            'string')
+        decom_df.fillna('', inplace=True)
+        # print(decom_df.info())
+        return decom_df
+
+
+    def GET_FLAT_FILE_OPEN_LISTINGS_DATA(self,refresh_token,conn=None,seller_id=None,days=-1):
+        para = {"reportType": ReportType.GET_MERCHANT_LISTINGS_ALL_DATA}
+        reportid = self.create_report(**para)
+        df = self.decompression(reportid)
+        if len(df)>0:
+            if self.marketplace.marketplace_id =='A1VC38T7YXB528':
+                df.columns = ['item-name','listing-id','seller-sku','price','quantity','open-date','product-id-type','item-description',
+                              'item-condition','overseas shipping','fast shipping','asin1','stock_number','fulfillment-channel','merchant-shipping-group','status']
+
+            df['seller_id'] = seller_id
+            df['marketplace_id'] = self.marketplace.marketplace_id
+            df['country_code'] = str(self.marketplace)[-2:]
+            if 'fulfilment-channel' in df.columns:
+                print("changed fulfilment-channel:")
+                print(seller_id,self.marketplace)
+                df['fulfillment-channel'] = df['fulfilment-channel'].copy()
+            df['fulfillment_channel'] = df['fulfillment-channel'].map(lambda x:"FBA" if not pd.isna(x) and len(x)>0 and str(x)[1:4] in "AMAZON" else x)
+            df['fulfillment_channel'] = df['fulfillment_channel'].map(lambda x: "FBM" if not pd.isna(x) and len(x)>0 and str(x)[1:4] in "DEFAULT" else x)
+
+            if 'asin1' not in df.columns:
+                df['asin1'] = ''
+            if 'product-id' not in df.columns:
+                df['product-id'] = ''
+
+            # 空值处理
+            df['quantity'] = df['quantity'].fillna(0).astype('int64',errors='ignore')
+            df[['listing-id','seller_id','asin1','seller-sku','country_code','marketplace_id','fulfillment_channel','status','product-id']] = df[['listing-id','seller_id','asin1','seller-sku','country_code','marketplace_id','fulfillment_channel','status','product-id']].fillna('').astype('string',errors='ignore')
+            df['price'] = df['price'].fillna(0.0).astype('float64',errors='ignore')
+            df.fillna('',inplace=True)
+
+            # 时间处理
+            df['opendate'] = df['open-date'].map(lambda x: self.datetime_deal(x))
+            df['update_datetime'] = datetime.now(pytz.UTC).date()
+
+            origin_columns = ['listing-id','seller_id',
+                              'asin1','seller-sku','title','image_link','country_code',
+                              'marketplace_id','quantity','fulfillment_channel',
+                              'price','opendate','status','update_datetime','product-id','product-id-type','modifier'
+                              ]
+            conn = SpApiRequest.mysql_connect_auth_lst()
+            cursor = conn.cursor()
+            cursor.execute("""select product_id,asin from (select * from asj_ads.seller_listings where asin is not null 
+                                and asin<>'' and product_id is not null and product_id <>'') t1 group by product_id,asin""")
+            query_ = cursor.fetchall()
+            col_name = [i[0] for i in cursor.description]
+            df_datatable = pd.DataFrame(query_, columns=col_name)
+            merged_df = df.merge(df_datatable[['product_id','asin']],how='left',left_on='product-id',right_on='product_id')
+            # print(merged_df.head())
+
+            def func_(asin,asin1,product_id,cred,market_p,seller_id,sku):
+                if 'B0' in str(product_id)[:3]:
+                    return str(product_id)
+                if (pd.isna(asin1) or asin1=='') and (pd.isna(asin)==False and asin !=''):
+                    if 'B0' in asin[:3]:
+                        return asin
+                elif (pd.isna(asin1)==False and asin1!=''):
+                    if 'B0' in asin1[:3]:
+                        return asin1
+
+                listingClient = ListingsItems(credentials=cred, marketplace=market_p)
+                try:
+                    r1 = listingClient.get_listings_item(sellerId=seller_id, sku=sku)
+                    print(r1.payload)
+                    asin = r1.payload.get("summaries")[0].get("asin")
+                    return asin
+                except Exception as e:
+                    print("获取图片url过程错误重试, 错误message: ", e)
+                    time.sleep(3)
+                    r1 = listingClient.get_listings_item(sellerId=seller_id, sku=sku)
+                    print(r1.payload)
+                    asin = r1.payload.get("summaries")[0].get("asin")
+                    return asin
+
+            merged_df['asin1'] = merged_df.apply(lambda x:func_(x['asin'],x['asin1'],x['product-id'],self.credentials,self.marketplace,seller_id,x['seller-sku']),axis=1) #x['asin'] if pd.isna(x['asin1']) or x['asin1']=='' else x['asin1']
+            # merged_df.to_csv("tmp.csv")
+            # merged_df = merged_df.loc[:10,:].copy()
+
+            # print("获取listing Info...")
+            # merged_df['temp_columns'] = merged_df.apply(lambda x: self.get_listing_info(x['seller-sku'],seller_id),axis=1)
+            # merged_df[['image_link','title']] = merged_df['temp_columns'].str.split("-----",expand=True)
+            merged_df['image_link'] = ''
+            merged_df['title'] = ''
+            merged_df['modifier'] = refresh_token
+
+
+            merged_df.fillna('',inplace=True)
+            df1 = merged_df.copy()
+            # print(df1[origin_columns].head(1))
+            update_df = self.update_data(df1,seller_id,str(self.marketplace)[-2:],conn)
+            if len(update_df)==0:
+                return '无更新数据插入'
+            # update_df['country_code'] = update_df['country_code'].map({"GB":"UK"})
+            conn = SpApiRequest.mysql_connect_auth_lst()
+            cursor = conn.cursor()
+
+            try:
+                insertsql = """insert into
+                asj_ads.seller_listings(listing_id,seller_id,asin,sku,title,image_link,country_code,marketplace_id,quantity,
+                        fulfillment_channel,price,launch_datetime,status,update_datetime,product_id,product_id_type,modifier)
+                values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)"""
+                conn.begin()
+                cursor.executemany(insertsql,tuple(update_df[origin_columns].to_numpy().tolist()))
+                conn.commit()
+                print("插入完成")
+                return '插入完成'
+            except Exception as e:
+                print("插入错误:",e)
+                conn.rollback()
+                return '出错回滚'
 
     def get_listing_info(self, sku,seller_id):
         listingClient = ListingsItems(credentials=self.credentials, marketplace=self.marketplace)
@@ -290,22 +414,11 @@ class SpApiRequest:
                 print(e)
                 return "###-----###"
 
-
     def datetime_deal(self,timestring):
-        timezone_ = {"AEST":"Australia/Sydney",
-                    "AEDT":"Australia/Sydney",
-                    "PST":"America/Los_Angeles",
-                    "PDT":"America/Los_Angeles",
-                    "CST":"America/Chicago",
-                    "CDT":"America/Chicago",
-                    "MET":"MET",
-                    "MEST":"MET",
-                    "BST":"Europe/London",
-                    "GMT":"GMT",
-                    "CET":"CET",
-                    "CEST":"CET",
-                    "JST":"Asia/Tokyo",
-                    "BRT":"America/Sao_Paulo"}
+        timezone_ = {"AEST":"Australia/Sydney","AEDT":"Australia/Sydney","PST":"America/Los_Angeles",
+                    "PDT":"America/Los_Angeles","CST":"America/Chicago","CDT":"America/Chicago",
+                    "MET":"MET","MEST":"MET","BST":"Europe/London","GMT":"GMT","CET":"CET",
+                    "CEST":"CET","JST":"Asia/Tokyo","BRT":"America/Sao_Paulo"}
 
         date_list = str.split(timestring,sep = ' ')
         if len(date_list)<3:
@@ -360,7 +473,7 @@ class SpApiRequest:
         marketplace_id = self.marketplace.marketplace_id
         try:
             cursor.execute(f"""select * from
-                                amz_sp_api.seller_listings where seller_id='{seller_id}' and marketplace_id='{marketplace_id}'""")
+                                asj_ads.seller_listings where seller_id='{seller_id}' and marketplace_id='{marketplace_id}'""")
             col = [i[0] for i in cursor.description]
             query_rel = cursor.fetchall()
             df_rel = pd.DataFrame(query_rel, columns=col)
@@ -371,8 +484,7 @@ class SpApiRequest:
 
             df['quantity'] = df['quantity'].fillna(0).astype('int64')
             df['price']= df['price'].fillna(0.0).astype('float64')
-            # print(df_rel.dtypes)
-            # print(df[columns].dtypes)
+
             row = 0
             while row < len(df):
                 temp_df = df.iloc[row, :]
@@ -386,8 +498,9 @@ class SpApiRequest:
                 product_id = temp_df['product-id']
                 title = temp_df['title']
                 imageurl = temp_df['image_link']
-                temp = df_rel.query("""listing_id==@listing_id and asin==@asin and sku==@sku and quantity==@quantity and fulfillment_channel==@fulfillment_channel and price==@price and product_id==@product_id and country_code==@country_code and seller_id==@seller_id and title==@title and image_link==@imageurl""")
-                print("需要关注数据(是否异常):",len(temp),temp.to_numpy().tolist()) if len(temp)>1 else 1
+                modifier = temp_df['modifier']
+                temp = df_rel.query("""listing_id==@listing_id and asin==@asin and sku==@sku and quantity==@quantity and fulfillment_channel==@fulfillment_channel and price==@price and product_id==@product_id and country_code==@country_code and seller_id==@seller_id and title==@title and image_link==@imageurl and modifier==@modifier""")
+                # print("需要关注数据(是否异常):",len(temp),temp.to_numpy().tolist()) if len(temp)>1 else 1
                 if len(temp)>1:
                     # temp = temp.head(1).to_numpy().tolist()
                     df_data = df_data.append(temp_df, ignore_index=True)
@@ -402,12 +515,12 @@ class SpApiRequest:
             try:
                 # print(tuple(delete_list))
                 if len(delete_list)>0:
-                    query = f"""delete from amz_sp_api.seller_listings 
+                    query = f"""delete from asj_ads.seller_listings 
                            where (seller_id,marketplace_id,sku,listing_id,product_id) in %s""" #where (seller_id,country_code) in %s"""
                     cursor.execute(query,(delete_list,))
 
                     conn.commit()
-                    print(delete_list)
+                    # print(delete_list)
                     print("进行中...")
             except Exception as e:
                 print(e)
@@ -418,160 +531,506 @@ class SpApiRequest:
             return df
 
 
-    def GET_FLAT_FILE_OPEN_LISTINGS_DATA(self,conn=None,seller_id=None):
-        para = {"reportType": ReportType.GET_MERCHANT_LISTINGS_ALL_DATA}
-        reportid = self.create_report(**para)
-        df = self.decompression(reportid)
-        if len(df)>0:
-            if self.marketplace.marketplace_id =='A1VC38T7YXB528':
-                df.columns = ['item-name','listing-id','seller-sku','price','quantity','open-date','product-id-type','item-description',
-                              'item-condition','overseas shipping','fast shipping','asin1','stock_number','fulfillment-channel','merchant-shipping-group','status']
 
-            df['seller_id'] = seller_id
-            df['marketplace_id'] = self.marketplace.marketplace_id
-            df['country_code'] = str(self.marketplace)[-2:]
-            if 'fulfilment-channel' in df.columns:
-                print("changed fulfilment-channel:")
-                print(seller_id,self.marketplace)
-                df['fulfillment-channel'] = df['fulfilment-channel'].copy()
-            df['fulfillment_channel'] = df['fulfillment-channel'].map(lambda x:"FBA" if not pd.isna(x) and len(x)>0 and str(x)[1:4] in "AMAZON" else x)
-            df['fulfillment_channel'] = df['fulfillment_channel'].map(lambda x: "FBM" if not pd.isna(x) and len(x)>0 and str(x)[1:4] in "DEFAULT" else x)
+    def GET_FLAT_FILE_RETURNS_DATA_BY_RETURN_DATE(self,seller_id,days=-2):
+        shopReportday = (datetime.now() + timedelta(days=days)).strftime("%Y-%m-%d")
+        # print(shopReportday)
+        para = {"reportType": ReportType.GET_SELLER_FEEDBACK_DATA,
+                "dataStartTime": shopReportday, "dataEndTime": shopReportday,
+                }
+        reportid = self.create_report(**para)  # {"ShowSalesChannel":"true"}
+        decom_df = self.decompression(reportid)
+        print(decom_df)
+        # print(decom_df.columns)
 
-            if 'asin1' not in df.columns:
-                df['asin1'] = ''
-            if 'product-id' not in df.columns:
-                df['product-id'] = ''
 
-            # 空值处理
-            df['quantity'] = df['quantity'].fillna(0).astype('int64',errors='ignore')
-            df[['listing-id','seller_id','asin1','seller-sku','country_code','marketplace_id','fulfillment_channel','status','product-id']] = df[['listing-id','seller_id','asin1','seller-sku','country_code','marketplace_id','fulfillment_channel','status','product-id']].fillna('').astype('string',errors='ignore')
-            df['price'] = df['price'].fillna(0.0).astype('float64',errors='ignore')
-            df.fillna('',inplace=True)
 
-            # 时间处理
-            df['opendate'] = df['open-date'].map(lambda x: self.datetime_deal(x))
-            df['update_datetime'] = datetime.now(pytz.UTC).date()
+    def GET_SALES_AND_TRAFFIC_REPORT(self, refresh_token,seller_id,days=-2,**kwargs):
+        # ,level:Literal["PARENT","CHILD","SKU"]="PARENT")
+        level = "PARENT" if len(kwargs.get("level"))==0 else kwargs.get("level")
+        countryCode = None if kwargs.get("countryCode")==None else kwargs.get("countryCode")
+        # print(level)
+        shopReportday = (datetime.now() + timedelta(days=days)).strftime("%Y-%m-%d")
+        print(shopReportday,countryCode,seller_id)
+        try:
+            conn = self.mysql_connect_auth_lst()
+            cursor = conn.cursor()
+        except:
+            time.sleep(5)
+            conn = self.mysql_connect_auth_lst()
+            cursor = conn.cursor()
+        query_judge = f"""select count(*) from asj_ads.SalesAndTrafficByAsin where data_date='{shopReportday}' and countryCode='{countryCode}'"""
+        print(query_judge)
+        cursor.execute(query_judge)
+        rel = cursor.fetchall()
+        # print()
+        if rel[0][0]!=0:
+            return '已存在'
+        # print(shopReportday)
+        para = {"reportType": ReportType.GET_SALES_AND_TRAFFIC_REPORT,
+                "dataStartTime": shopReportday, "dataEndTime": shopReportday,
+                "reportOptions":{"dateGranularity":"DAY","asinGranularity":level}
+                }
+        reportid = self.create_report(**para)  # {"ShowSalesChannel":"true"}
+        decom_df = self.decompression(reportid)
+        # print(decom_df.columns[0])
+        data_rel = self.sales_traffic_datadeal(decom_df.columns[0],seller_id,countryCode)
+        try:
+            conn = self.mysql_connect_auth_lst()
+            cursor = conn.cursor()
+        except:
+            time.sleep(5)
+            conn = self.mysql_connect_auth_lst()
+            cursor = conn.cursor()
+        # print(list(conn.query("select * from amz_sp_api.orderReport")))
+        sql = f"""
+            insert into asj_ads.SalesAndTrafficByAsin(data_date,data_marketpalceId,parent_asin,
+                childAsin,sku,sBA_unitsOrdered,sBA_unitsOrderedB2B,sBA_amount,
+                currencyCode,totalOrderItems,totalOrderItemsB2B,tBA_browserSessions,
+                tBA_browserSessionsB2B,tBA_mobileAppSessions,tBA_mobileAppSessionsB2B,
+                tBA_sessions,tBA_sessionsB2B,tBA_browserSessionPercentage,
+                tBA_browserSessionPercentageB2B,tBA_mobileAppSessionPercentage,
+                tBA_mobileAppSessionPercentageB2B,tBA_sessionPercentage,
+                tBA_sessionPercentageB2B,tBA_browserPageViews,tBA_browserPageViewsB2B,
+                tBA_mobileAppPageViews,tBA_mobileAppPageViewsB2B,tBA_pageViews,
+                tBA_pageViewsB2B,tBA_browserPageViewsPercentage,tBA_browserPageViewsPercentageB2B,
+                tBA_mobileAppPageViewsPercentage,tBA_mobileAppPageViewsPercentageB2B,
+                tBA_pageViewsPercentage,tBA_pageViewsPercentageB2B,tBA_buyBoxPercentage,
+                tBA_buyBoxPercentageB2B,tBA_unitSessionPercentage,tBA_unitSessionPercentageB2B,seller_id,countryCode)
+            values (%s,%s,%s,%s,%s,%s,%s, %s,%s,%s,%s,%s,%s,%s, %s,%s,%s,%s,%s,%s,%s, %s,%s,%s,%s,%s,%s,%s, %s,%s,%s,%s,%s,%s,%s, %s,%s,%s,%s,%s,%s)
+        """  # ok
+        try:
+            conn.begin()
+            cursor.executemany(sql, data_rel)
+            conn.commit()
+            print("插入完成")
+            conn.close()
+            time.sleep(1)
+        except Exception as e:
+            conn.rollback()
+            print(e)
 
-            origin_columns = ['listing-id','seller_id',
-                              'asin1','seller-sku','title','image_link','country_code',
-                              'marketplace_id','quantity','fulfillment_channel',
-                              'price','opendate','status','update_datetime','product-id','product-id-type'
-                              ]
-            conn = SpApiRequest.mysql_connect_auth_lst()
+    def sales_traffic_datadeal(self,data,seller_id,countryCode):
+        data = eval(data)
+        if len(data['salesAndTrafficByAsin'])==0:
+            return []
+        data_list = []
+        data_date = data["reportSpecification"]["dataEndTime"]
+        data_marketpalceId = data["reportSpecification"]["marketplaceIds"][0]
+        # print(data_marketpalceId)
+        for single_item in data["salesAndTrafficByAsin"]:
+            # print(single_item)
+            parent_asin = single_item.get("parentAsin")
+            childAsin = single_item.get("childAsin")
+            sku = single_item.get("sku")
+            salesByAsin = single_item.get("salesByAsin")
+            # if salesByAsin is not None:
+            sBA_unitsOrdered = salesByAsin.get("unitsOrdered") if salesByAsin is not None else ''
+            sBA_unitsOrderedB2B = salesByAsin.get("unitsOrderedB2B") if salesByAsin is not None else ''
+            orderedProductSales = salesByAsin.get("orderedProductSales")
+            sBA_amount = orderedProductSales.get("amount") if orderedProductSales is not None else ''
+            currencyCode = orderedProductSales.get("currencyCode") if orderedProductSales is not None else ''
+            orderedProductSalesB2B = salesByAsin.get("orderedProductSalesB2B") if salesByAsin is not None else None
+
+            # if orderedProductSalesB2B is not None:
+            oPS_amount = orderedProductSalesB2B.get("amount") if orderedProductSalesB2B is not None else ''
+            totalOrderItems = salesByAsin.get("totalOrderItems") if salesByAsin is not None else ''
+            totalOrderItemsB2B = salesByAsin.get("totalOrderItemsB2B") if salesByAsin is not None else ''
+            trafficByAsin = single_item.get("trafficByAsin")
+            # if trafficByAsin is not None:
+            tBA_browserSessions = trafficByAsin.get("browserSessions") if trafficByAsin is not None else ''
+            tBA_browserSessionsB2B = trafficByAsin.get("browserSessionsB2B") if trafficByAsin is not None else ''
+            tBA_mobileAppSessions = trafficByAsin.get("mobileAppSessions") if trafficByAsin is not None else ''
+            tBA_mobileAppSessionsB2B = trafficByAsin.get("mobileAppSessionsB2B") if trafficByAsin is not None else ''
+            tBA_sessions = trafficByAsin.get("sessions") if trafficByAsin is not None else ''
+            tBA_sessionsB2B = trafficByAsin.get("sessionsB2B") if trafficByAsin is not None else ''
+            tBA_browserSessionPercentage = trafficByAsin.get("browserSessionPercentage") if trafficByAsin is not None else ''
+            tBA_browserSessionPercentageB2B = trafficByAsin.get("browserSessionPercentageB2B") if trafficByAsin is not None else ''
+            tBA_mobileAppSessionPercentage = trafficByAsin.get("mobileAppSessionPercentage") if trafficByAsin is not None else ''
+            tBA_mobileAppSessionPercentageB2B = trafficByAsin.get("mobileAppSessionPercentageB2B") if trafficByAsin is not None else ''
+            tBA_sessionPercentage = trafficByAsin.get("sessionPercentage") if trafficByAsin is not None else ''
+            tBA_sessionPercentageB2B = trafficByAsin.get("sessionPercentageB2B") if trafficByAsin is not None else ''
+            tBA_browserPageViews = trafficByAsin.get("browserPageViews") if trafficByAsin is not None else ''
+            tBA_browserPageViewsB2B = trafficByAsin.get("browserPageViewsB2B") if trafficByAsin is not None else ''
+            tBA_mobileAppPageViews = trafficByAsin.get("mobileAppPageViews") if trafficByAsin is not None else ''
+            tBA_mobileAppPageViewsB2B = trafficByAsin.get("mobileAppPageViewsB2B") if trafficByAsin is not None else ''
+            tBA_pageViews = trafficByAsin.get("pageViews") if trafficByAsin is not None else ''
+            tBA_pageViewsB2B = trafficByAsin.get("pageViewsB2B") if trafficByAsin is not None else ''
+            tBA_browserPageViewsPercentage = trafficByAsin.get("browserPageViewsPercentage") if trafficByAsin is not None else ''
+            tBA_browserPageViewsPercentageB2B = trafficByAsin.get("browserPageViewsPercentageB2B") if trafficByAsin is not None else ''
+            tBA_mobileAppPageViewsPercentage = trafficByAsin.get("mobileAppPageViewsPercentage") if trafficByAsin is not None else ''
+            tBA_mobileAppPageViewsPercentageB2B = trafficByAsin.get("mobileAppPageViewsPercentageB2B") if trafficByAsin is not None else ''
+            tBA_pageViewsPercentage = trafficByAsin.get("pageViewsPercentage") if trafficByAsin is not None else ''
+            tBA_pageViewsPercentageB2B = trafficByAsin.get("pageViewsPercentageB2B") if trafficByAsin is not None else ''
+            tBA_buyBoxPercentage = trafficByAsin.get("buyBoxPercentage") if trafficByAsin is not None else ''
+            tBA_buyBoxPercentageB2B = trafficByAsin.get("buyBoxPercentageB2B") if trafficByAsin is not None else ''
+            tBA_unitSessionPercentage = trafficByAsin.get("unitSessionPercentage") if trafficByAsin is not None else ''
+            tBA_unitSessionPercentageB2B = trafficByAsin.get("unitSessionPercentageB2B") if trafficByAsin is not None else ''
+            data_list.append([data_date,data_marketpalceId,parent_asin,
+                childAsin,sku,sBA_unitsOrdered,sBA_unitsOrderedB2B,sBA_amount,
+                currencyCode,totalOrderItems,totalOrderItemsB2B,tBA_browserSessions,
+                tBA_browserSessionsB2B,tBA_mobileAppSessions,tBA_mobileAppSessionsB2B,
+                tBA_sessions,tBA_sessionsB2B,tBA_browserSessionPercentage,
+                tBA_browserSessionPercentageB2B,tBA_mobileAppSessionPercentage,
+                tBA_mobileAppSessionPercentageB2B,tBA_sessionPercentage,
+                tBA_sessionPercentageB2B,tBA_browserPageViews,tBA_browserPageViewsB2B,
+                tBA_mobileAppPageViews,tBA_mobileAppPageViewsB2B,tBA_pageViews,
+                tBA_pageViewsB2B,tBA_browserPageViewsPercentage,tBA_browserPageViewsPercentageB2B,
+                tBA_mobileAppPageViewsPercentage,tBA_mobileAppPageViewsPercentageB2B,
+                tBA_pageViewsPercentage,tBA_pageViewsPercentageB2B,tBA_buyBoxPercentage,
+                tBA_buyBoxPercentageB2B,tBA_unitSessionPercentage,tBA_unitSessionPercentageB2B,seller_id,countryCode
+                                ])
+        # print(data_list)
+        return data_list
+
+
+    def GET_FLAT_FILE_ALL_ORDERS_DATA_BY_ORDER_DATE_GENERAL(self, refresh_token,seller_id,days=-1,**kwargs): #refresh_token,seller_id,days,**a_kw
+        # timezone_ = pytz.timezone(self.timezone)
+        countryCode = None if kwargs.get("countryCode")==None else kwargs.get("countryCode")
+        shopReportday = (datetime.now() + timedelta(days=days)).strftime("%Y-%m-%d")
+        print(shopReportday)
+        try:
+            conn = self.mysql_connect_auth_lst()
             cursor = conn.cursor()
-            cursor.execute("""select product_id,asin from (select * from amz_sp_api.seller_listings where asin is not null 
-                                and asin<>'' and product_id is not null and product_id <>'') t1 group by product_id,asin""")
-            query_ = cursor.fetchall()
-            col_name = [i[0] for i in cursor.description]
-            df_datatable = pd.DataFrame(query_, columns=col_name)
-            merged_df = df.merge(df_datatable[['product_id','asin']],how='left',left_on='product-id',right_on='product_id')
-            print(merged_df.head())
+        except:
+            time.sleep(5)
+            conn = self.mysql_connect_auth_lst()
+            cursor = conn.cursor()
+        query_judge = f"""select count(*) from asj_ads.orderReport where ReportDate='{shopReportday}' and country_code='{countryCode}'"""
+        print(query_judge)
+        cursor.execute(query_judge)
+        rel = cursor.fetchall()
+        # print()
+        if rel[0][0]!=0:
+            print("已存在")
+            return '已存在'
 
-            def func_(asin,asin1,product_id,cred,market_p,seller_id,sku):
-                if 'B0' in str(product_id)[:3]:
-                    return str(product_id)
-                if (pd.isna(asin1) or asin1=='') and (pd.isna(asin)==False and asin !=''):
-                    if 'B0' in asin[:3]:
-                        return asin
-                elif (pd.isna(asin1)==False and asin1!=''):
-                    if 'B0' in asin1[:3]:
-                        return asin1
+        para = {"reportType": ReportType.GET_FLAT_FILE_ALL_ORDERS_DATA_BY_ORDER_DATE_GENERAL,
+                "dataStartTime": shopReportday, "dataEndTime": shopReportday,
+                "reportOptions": {"ShowSalesChannel": "true"}}
+        reportid = self.create_report(**para)  # {"ShowSalesChannel":"true"}
+        decom_df = self.decompression(reportid)
+        decom_df[decom_df.select_dtypes(float).columns] = decom_df[decom_df.select_dtypes(float).columns].fillna(0.0)
+        decom_df[decom_df.select_dtypes(int).columns] = decom_df[decom_df.select_dtypes(int).columns].fillna(0)
+        decom_df[decom_df.select_dtypes(datetime).columns] = decom_df[decom_df.select_dtypes(datetime).columns].astype(
+            'string')
+        if "purchase-order-number" in decom_df.columns:
+            decom_df['purchase-order-number'] = decom_df['purchase-order-number'].astype("string")
 
-                listingClient = ListingsItems(credentials=cred, marketplace=market_p)
-                try:
-                    r1 = listingClient.get_listings_item(sellerId=seller_id, sku=sku)
-                    print(r1.payload)
-                    asin = r1.payload.get("summaries")[0].get("asin")
-                    return asin
-                except Exception as e:
-                    print("获取图片url过程错误重试, 错误message: ", e)
-                    time.sleep(3)
-                    r1 = listingClient.get_listings_item(sellerId=seller_id, sku=sku)
-                    print(r1.payload)
-                    asin = r1.payload.get("summaries")[0].get("asin")
-                    return asin
+        decom_df.fillna('', inplace=True)
+        # decom_df.to_csv('order.csv')
+        decom_df["ReportDate"] = parse(shopReportday)
+        # decom_df['timezone'] =  decom_df["purchase-date"].map(lambda x: parse(x).tzname()).fillna(method='bfill')
+        decom_df['timezone'] = "UTC"
+        print("==========================================================")
+        decom_df[["purchase-date", "last-updated-date"]] = decom_df[["purchase-date", "last-updated-date"]].applymap(
+            lambda x: self.timeDeal(x) if pd.isna(x) == False or x != None else x)
+        if 'is-business-order' not in decom_df.columns:
+            decom_df['is-business-order'] = None
+        if 'purchase-order-number' not in decom_df.columns:
+            decom_df['purchase-order-number'] = '-'
+        if 'price-designation' not in decom_df.columns:
+            decom_df['price-designation'] = '-'
 
-            merged_df['asin1'] = merged_df.apply(lambda x:func_(x['asin'],x['asin1'],x['product-id'],self.credentials,self.marketplace,seller_id,x['seller-sku']),axis=1) #x['asin'] if pd.isna(x['asin1']) or x['asin1']=='' else x['asin1']
-            # merged_df.to_csv("tmp.csv")
-            # merged_df = merged_df.loc[:10,:].copy()
+        decom_df['seller_id'] = seller_id
+        country_code = str(self.marketplace)[-2:]
+        if country_code == 'GB':
+            country_code = "UK"
+            # decom_df['country_code'] = "UK"
+        decom_df['country_code'] = country_code
+        decom_df['insert_time'] = datetime.now()
+        # print(decom_df[])
+        reserve_columns = ["amazon-order-id", "merchant-order-id", "purchase-date", "last-updated-date", "order-status",
+                           "fulfillment-channel", "sales-channel", "order-channel", "ship-service-level",
+                           "product-name",
+                           "sku", "asin", "item-status", "quantity", "currency", "item-price", "item-tax",
+                           "shipping-price",
+                           "shipping-tax", "gift-wrap-price", "gift-wrap-tax", "item-promotion-discount",
+                           "ship-promotion-discount", "ship-city", "ship-state", "ship-postal-code", "ship-country",
+                           "promotion-ids", "is-business-order", "purchase-order-number", "price-designation",
+                           "ReportDate",
+                           "timezone", "seller_id", "country_code",'insert_time'
+                           ]
+        list_df = decom_df[reserve_columns].to_numpy().tolist()
+        try:
+            conn = self.mysql_connect_auth_lst()
+            cursor = conn.cursor()
+        except:
+            time.sleep(5)
+            conn = self.mysql_connect_auth_lst()
+            cursor = conn.cursor()
+        # print(list(conn.query("select * from amz_sp_api.orderReport")))
+        sql = f"""
+            insert into asj_ads.orderReport
+            values (%s,%s,%s,%s,%s,%s,%s, %s,%s,%s,%s,%s,%s,%s, %s,%s,%s,%s,%s,%s,%s, %s,%s,%s,%s,%s,%s,%s, %s,%s,%s,%s,%s,%s,%s,%s)
+        """  # ok
+        try:
+            conn.begin()
+            cursor.executemany(sql, list_df)
+            conn.commit()
+            print("插入完成")
+            conn.close()
+            time.sleep(1)
+        except Exception as e:
+            conn.rollback()
+            print(e)
+    def timeDeal(self, orgTime):
+        orgTime = parse(orgTime)
+        timezone = pytz.timezone("UTC")
+        shopTime = orgTime.astimezone(timezone)
+        shopTime_datetime = datetime(shopTime.year, shopTime.month, shopTime.day, shopTime.hour, shopTime.minute,
+                                     shopTime.second)
+        return shopTime_datetime
 
-            print("获取listing Info...")
-            merged_df['temp_columns'] = merged_df.apply(lambda x: self.get_listing_info(x['seller-sku'],seller_id),axis=1)
-            merged_df[['image_link','title']] = merged_df['temp_columns'].str.split("-----",expand=True)
-            # merged_df['image_link'] = ''
-            # merged_df['title'] = ''
 
+    @classmethod
+    def listing_infoTable(cls):
+        conn = SpApiRequest.mysql_connect_auth_lst()
+        cursor = conn.cursor()
+        cursor.execute(f"""select seller_id,country_code,asin,modifier from asj_ads.seller_listings where modifier is not null and (seller_id,country_code,asin) not in (select seller_id,country_code,asin from asj_ads.Goods where update_time>='{datetime.today().date()}') group by seller_id,modifier,country_code,asin""")
+        query_ = cursor.fetchall()
+        col_name = [i[0] for i in cursor.description]
+        df_datatable = pd.DataFrame(query_, columns=col_name)
+        count=0
+        distance = 50
+        print(len(df_datatable))
+        while count<len(df_datatable):
+            df = df_datatable.iloc[count:count+distance,:]
+            count = count+distance
+            df['detail_info'] = df.apply(lambda x: cls.get_listing_info01(x['modifier'],x['country_code'],x['asin'],x['seller_id']),axis=1)
+            detail_info_k = df['detail_info'].map(lambda x: list(x.keys())).to_numpy().tolist()
+            detail_info_v = df['detail_info'].map(lambda x: list(x.values())).to_numpy().tolist()
 
-            merged_df.fillna('',inplace=True)
-            df1 = merged_df.copy()
-            print(df1[origin_columns].head(1))
-            update_df = self.update_data(df1,seller_id,str(self.marketplace)[-2:],conn)
-            if len(update_df)==0:
-                return '无更新数据插入'
-            # update_df['country_code'] = update_df['country_code'].map({"GB":"UK"})
             conn = SpApiRequest.mysql_connect_auth_lst()
-            cursor = conn.cursor()
+            print(count)
+            SpApiRequest.Goods_insert(conn,detail_info_v,detail_info_k)
+
+            if count%distance==0:
+                cursor = conn.cursor()
+                cursor.execute(
+                    f"""select seller_id,countryCode,asin from asj_ads.Goods where update_time>='{datetime.today().date()}'""")
+                query_d = cursor.fetchall()
+                # print(query_d)
+                try:
+                    # print(tuple(delete_list))
+                    query = f"""delete from asj_ads.Goods where update_time<'{datetime.today().date()}' 
+                        and (seller_id,countryCode,asin) in {query_d}
+                        """ #where (seller_id,country_code) in %s"""
+                    cursor.execute(query)
+
+                    conn.commit()
+                    # print(delete_list)
+                    # print("进行中...")
+                    print(f"进度:{round(count/len(df_datatable)*100,2)}%")
+                except Exception as e:
+                    print(e)
+                    conn.rollback()
+        print("Success")
+        conn.close()
+
+    @staticmethod
+    def get_listing_info01(refresh_token, countryCode, asin, seller_id):
+        aws_credentials = {
+            'refresh_token': refresh_token,
+            'lwa_app_id': 'amzn1.application-oa2-client.1f9d3d4747e14b22b4b598e54e6b922e',  # 卖家中心里面开发者资料LWA凭证
+            'lwa_client_secret': 'amzn1.oa2-cs.v1.3af0f5649f5b8e151cd5bd25c10f2bf3113172485cd6ffc52ccc6a5e8512b490',
+            'aws_access_key': 'AKIARBAGHTGOZC7544GN',
+            'aws_secret_key': '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',
+            'role_arn': 'arn:aws:iam::070880041373:role/Amazon_SP_API_ROLE'
+        }
+        mak = {'AE': Marketplaces.AE, 'BE': Marketplaces.BE, 'DE': Marketplaces.DE,
+               'PL': Marketplaces.PL, 'EG': Marketplaces.EG, 'ES': Marketplaces.ES,
+               'FR': Marketplaces.FR, 'GB': Marketplaces.GB, 'IN': Marketplaces.IN,
+               'IT': Marketplaces.IT, 'NL': Marketplaces.NL, 'SA': Marketplaces.SA,
+               'SE': Marketplaces.SE, 'TR': Marketplaces.TR, 'UK': Marketplaces.UK,
+               'AU': Marketplaces.AU, 'JP': Marketplaces.JP, 'SG': Marketplaces.SG,
+               'US': Marketplaces.US,
+               'BR': Marketplaces.BR, 'CA': Marketplaces.CA, 'MX': Marketplaces.MX
+               }
+
+        cate_item = CatalogItems(credentials=aws_credentials, marketplace=mak[countryCode])
+        try:
+            variations_info = SpApiRequest.variations_judge(cate_item, asin)
+        except:
+            time.sleep(2.5)
+            variations_info = SpApiRequest.variations_judge(cate_item, asin)
+        try:
+            detail_info = SpApiRequest.get_detail_cat(cate_item, asin, mak, countryCode)
+        except:
+            time.sleep(2.5)
+            detail_info = SpApiRequest.get_detail_cat(cate_item, asin, mak, countryCode)
+        # print(countryCode,asin,detail_info,variations_info,)
+        detail_info.update(variations_info)
+        detail_info['asin'] = asin
+        detail_info['countryCode'] = countryCode
+        detail_info['marketplace_id'] = mak[countryCode].marketplace_id
+        detail_info['seller_id'] = seller_id
+        detail_info['update_time'] = datetime.now()
+        return detail_info
 
+    @staticmethod
+    def variations_judge(cate_item, asin):
+        try:
+            variations = cate_item.get_catalog_item(asin=asin, **{"includedData": ['variations']})
+            var_info = variations.payload
+            IsParent = 'Y'
+            parent_asin = ''
+            if len(var_info['variations']) > 0:
+                variationType = var_info['variations'][0]['variationType']
+                # print(variationType)
+                if variationType == "CHILD":
+                    parent_asin = var_info['variations'][0]['asins']
+                    IsParent = 'N'
+                else:
+                    parent_asin = variations.payload['asin']
+            else:
+                IsParent = 'SG'
+                parent_asin = variations.payload['asin']
+            return {"IsParent": IsParent, "parent_asin": parent_asin}
+        except:
             try:
-                insertsql = """insert into
-                amz_sp_api.seller_listings(listing_id,seller_id,asin,sku,title,image_link,country_code,marketplace_id,quantity,
-                        fulfillment_channel,price,launch_datetime,status,update_datetime,product_id,product_id_type)
-                values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)"""
-                conn.begin()
-                cursor.executemany(insertsql,tuple(update_df[origin_columns].to_numpy().tolist()))
-                conn.commit()
-                print("插入完成")
-                return '插入完成'
+                time.sleep(3.5)
+                variations = cate_item.get_catalog_item(asin=asin, **{"includedData": ['variations']})
+                var_info = variations.payload
+                IsParent = 'Y'
+                parent_asin = ''
+                if len(var_info['variations']) > 0:
+                    variationType = var_info['variations'][0]['variationType']
+                    # print(variationType)
+                    if variationType == "CHILD":
+                        parent_asin = var_info['variations'][0]['asins']
+                        IsParent = 'N'
+                    else:
+                        parent_asin = variations.payload['asin']
+                else:
+                    IsParent = 'SG'
+                    parent_asin = variations.payload['asin']
+                return {"IsParent": IsParent, "parent_asin": parent_asin}
             except Exception as e:
-                print("插入错误:",e)
-                conn.rollback()
-                return '出错回滚'
+                print("判断是否为父子asin时出错:", e)
+                return {"IsParent": 'Erro', "parent_asin": 'Erro'}
 
     @staticmethod
-    def auth_info():
-        auth_conn = SpApiRequest.mysql_connect_auth()
-        cursor = auth_conn.cursor()
-        cursor.execute("select * from amazon_sp_report.amazon_sp_auth_info;")
-        columns_name = [i[0] for i in cursor.description]
-        rel = cursor.fetchall()
-        df = pd.DataFrame(rel, columns=columns_name)
-        return df
+    def Goods_insert(conn,detail_info_v,detail_info_k):
+        try:
+            cursor = conn.cursor()
+        except:
+            time.sleep(2.5)
+            conn = SpApiRequest.mysql_connect_auth_lst()
+            cursor = conn.cursor()
 
-    @classmethod
-    def get_orders_allShops(cls):
-        pass
+        try:
+            insertsql = """insert into
+                        asj_ads.Goods(main_image, productTypes, BigCat_rank, BigCat_title, SmallCat_rank, SmallCat_title, brandName, browseNode, itemName, IsParent, parent_asin, asin, countryCode, marketplace_id, seller_id, update_time)
+                        values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)"""
+            conn.begin()
+            cursor.executemany(insertsql, tuple(detail_info_v))
+            conn.commit()
+            print("插入完成Goods")
+            # return '插入完成'
+        except Exception as e:
+            print("插入错误Goods:", e)
+            conn.rollback()
+
+        sales_rankData = []
+        for i in detail_info_v:
+            tmp_list = []
+            for j in [2,3,4,5,6,8,11,12,13,14]:
+                tmp_list.extend([i[j]])
+            tmp_list.extend([datetime.now(),datetime.utcnow()])
+            sales_rankData.append(tmp_list)
+        # print(sales_rankData,len(sales_rankData))
+        try:
+            insertsql = """insert into
+                        asj_ads.ProductRank(BigCat_rank, BigCat_title, SmallCat_rank, SmallCat_title, brandName, itemName, asin, countryCode, marketplace_id, seller_id, update_time,time_UTC)
+                        values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)"""
+            conn.begin()
+            cursor.executemany(insertsql, tuple(sales_rankData))
+            conn.commit()
+            print("插入完成rank")
+            # return '插入完成'
+        except Exception as e:
+            print("插入错误rank:", e)
+            conn.rollback()
 
     @staticmethod
-    def data_judge_secondTry(sp_api,data_type,seller_id,auth_conn):
+    def get_detail_cat(cate_item, asin, mak, countryCode):
         try:
-            SpApiRequest.data_judge(sp_api, data_type, seller_id, auth_conn)
+            detail_info = cate_item.get_catalog_item(asin=asin, **{
+                "includedData": ["images,productTypes,salesRanks,summaries"],
+                "marketplaceIds": [str(mak[countryCode].marketplace_id)]})
+            payload = detail_info.payload
+            # print(payload)
+            try:
+                main_image = payload['images'][0]['images'][0]['link'] if len(payload['images']) > 0 else "#"
+            except:
+                main_image = '#-'
+            try:
+                productTypes = payload['productTypes'][0]['productType'] if len(payload['productTypes'][0]) > 0 else "#"
+            except:
+                productTypes = '#-'
+            try:
+                # print(payload['ranks'][0])
+                if len(payload['ranks'][0]) > 0:
+                    BigCat_rank = payload['ranks'][0]['ranks'][0]['rank']
+                    BigCat_title = payload['ranks'][0]['ranks'][0]['title']
+                    SmallCat_rank = payload['ranks'][0]['ranks'][1]['rank']
+                    SmallCat_title = payload['ranks'][0]['ranks'][1]['title']
+                else:
+                    BigCat_rank, BigCat_title, SmallCat_rank, SmallCat_title = 0, '#', 0, '#'
+            except:
+                BigCat_rank, BigCat_title, SmallCat_rank, SmallCat_title = 0, '#-', 0, '#-'
+            try:
+                if len(payload['summaries'][0]) > 0:
+                    brandName = payload['summaries'][0]['brandName']
+                    browseNode = payload['summaries'][0]['browseNode']
+                    itemName = payload['summaries'][0]['itemName']
+                else:
+                    brandName, browseNode, itemName = '#', '#', '#'
+            except:
+                brandName, browseNode, itemName = '#-', '#-', '#-'
+            return {'main_image': main_image, 'productTypes': productTypes, 'BigCat_rank': BigCat_rank,
+                    'BigCat_title': BigCat_title, 'SmallCat_rank': SmallCat_rank, 'SmallCat_title': SmallCat_title,
+                    'brandName': brandName, 'browseNode': browseNode, 'itemName': itemName}
         except:
-            time.sleep(3)
-            SpApiRequest.data_judge(sp_api, data_type, seller_id, auth_conn)
+            return {'main_image': '', 'productTypes': '', 'BigCat_rank': 0,
+                    'BigCat_title': '', 'SmallCat_rank': 0, 'SmallCat_title': '',
+                    'brandName': '', 'browseNode': '', 'itemName': ''}
 
+    @staticmethod
+    def data_judge_secondTry(refresh_token,sp_api,data_type,seller_id,auth_conn,days=-1,**kwargs):
+        # print(kwargs)
+        a_kw = kwargs
+        try:
+            SpApiRequest.data_judge(refresh_token,sp_api, data_type, seller_id, auth_conn,days=days,**a_kw)
+        except Exception as e:
+            print(e)
+            time.sleep(10)
+            SpApiRequest.data_judge(refresh_token,sp_api, data_type, seller_id, auth_conn,days=days,**a_kw)
 
     @staticmethod
-    def data_judge(sp_api,data_type,seller_id,auth_conn):
+    def data_judge(refresh_token,sp_api,data_type,seller_id,auth_conn,days=-1,**kwargs):
+        a_kw = kwargs
         if data_type == "GET_FLAT_FILE_OPEN_LISTINGS_DATA":
-            return sp_api.GET_FLAT_FILE_OPEN_LISTINGS_DATA(auth_conn,seller_id)
+            return sp_api.GET_FLAT_FILE_OPEN_LISTINGS_DATA(refresh_token,auth_conn,seller_id,days)
         elif data_type =="GET_FLAT_FILE_ALL_ORDERS_DATA_BY_ORDER_DATE_GENERAL":
-            return sp_api.GET_FLAT_FILE_ALL_ORDERS_DATA_BY_ORDER_DATE_GENERAL(seller_id)
+            # for day_ in range(31,1):
+            #     sp_api.GET_FLAT_FILE_ALL_ORDERS_DATA_BY_ORDER_DATE_GENERAL(seller_id,days=day_*-1)
+            return sp_api.GET_FLAT_FILE_ALL_ORDERS_DATA_BY_ORDER_DATE_GENERAL(refresh_token,seller_id,days,**a_kw)
         elif data_type =="GET_FLAT_FILE_RETURNS_DATA_BY_RETURN_DATE":
-            return sp_api.GET_FLAT_FILE_RETURNS_DATA_BY_RETURN_DATE(seller_id)
+            return sp_api.GET_FLAT_FILE_RETURNS_DATA_BY_RETURN_DATE(refresh_token,seller_id,days)
+        elif data_type =="GET_SALES_AND_TRAFFIC_REPORT":
+            return sp_api.GET_SALES_AND_TRAFFIC_REPORT(refresh_token,seller_id,days,**a_kw)
         else:
             return ""
 
     @classmethod
-    def get_refreshtoken(cls):
+    def get_allShops(cls,data_type=Literal["GET_FLAT_FILE_OPEN_LISTINGS_DATA","GET_FLAT_FILE_ALL_ORDERS_DATA_BY_ORDER_DATE_GENERAL"],days=-1,**kwargs):
         df = cls.auth_info()
         refreshtoken_list = (df['refresh_token'].to_numpy().tolist())
-        return refreshtoken_list
-
-    @classmethod
-    def get_allShops(cls,data_type=Literal["GET_FLAT_FILE_OPEN_LISTINGS_DATA","GET_FLAT_FILE_ALL_ORDERS_DATA_BY_ORDER_DATE_GENERAL"]):
-        df = cls.auth_info()
-        refreshtoken_list = (df['refresh_token'].to_numpy().tolist())
-        refreshtoken_list.reverse()
+        shuffle(refreshtoken_list)
+        # print(type)
+        a_kw = kwargs
         for refresh_token in refreshtoken_list:
             aws_credentials = {
                 'refresh_token': refresh_token,
@@ -590,9 +1049,11 @@ class SpApiRequest:
                 pass
                 for marketplace in [Marketplaces.US, Marketplaces.BR, Marketplaces.CA,Marketplaces.MX]:
                     sp_api = SpApiRequest(aws_credentials, marketplace)
+                    a_kw['countryCode'] = str(marketplace)[-2:]
                     try:
                         auth_conn = SpApiRequest.mysql_connect_auth()
-                        cls.data_judge_secondTry(sp_api, data_type, seller_id, auth_conn)
+                        # print(a_kw)
+                        cls.data_judge_secondTry(refresh_token,sp_api, data_type, seller_id, auth_conn,days,**a_kw)
                         ## sp_api.GET_FLAT_FILE_OPEN_LISTINGS_DATA(auth_conn, seller_id)
                     except Exception as e:
                         print(e)
@@ -603,9 +1064,10 @@ class SpApiRequest:
                                     Marketplaces.NL, Marketplaces.SA, Marketplaces.SE, Marketplaces.TR,Marketplaces.UK,Marketplaces.FR,
                                     ]:
                     sp_api = SpApiRequest(aws_credentials, marketplace)
+                    a_kw['countryCode'] = str(marketplace)[-2:]
                     try:
                         auth_conn = SpApiRequest.mysql_connect_auth()
-                        cls.data_judge_secondTry(sp_api, data_type, seller_id, auth_conn)
+                        cls.data_judge_secondTry(refresh_token,sp_api, data_type, seller_id, auth_conn,days,**a_kw)
                         ## sp_api.GET_FLAT_FILE_OPEN_LISTINGS_DATA(auth_conn, seller_id)
                     except Exception as e:
                         print(e)
@@ -616,171 +1078,14 @@ class SpApiRequest:
                 print(region_circle)
                 marketplace = eval(f'Marketplaces.{region_circle}')
                 sp_api = SpApiRequest(aws_credentials, marketplace)
-                cls.data_judge_secondTry(sp_api, data_type, seller_id, auth_conn)
+                a_kw['countryCode'] = str(marketplace)[-2:]
+                cls.data_judge_secondTry(refresh_token,sp_api, data_type, seller_id, auth_conn,days,**a_kw)
             ## sp_api.GET_FLAT_FILE_OPEN_LISTINGS_DATA(auth_conn, seller_id)
 
-    def timeDeal(self, orgTime):
-        orgTime = parse(orgTime)
-        timezone = pytz.timezone("UTC")
-        shopTime = orgTime.astimezone(timezone)
-        shopTime_datetime = datetime(shopTime.year, shopTime.month, shopTime.day, shopTime.hour, shopTime.minute,
-                                     shopTime.second)
-        return shopTime_datetime
-
-    def GET_FLAT_FILE_RETURNS_DATA_BY_RETURN_DATE(self,seller_id):
-        shopReportday = (datetime.now() + timedelta(days=-2)).strftime("%Y-%m-%d")
-        # print(shopReportday)
-        para = {"reportType": ReportType.GET_SELLER_FEEDBACK_DATA,
-                "dataStartTime": shopReportday, "dataEndTime": shopReportday,
-                }
-        reportid = self.create_report(**para)  # {"ShowSalesChannel":"true"}
-        decom_df = self.decompression(reportid)
-        print(decom_df)
-        # print(decom_df.columns)
-
-    def GET_FLAT_FILE_ALL_ORDERS_DATA_BY_ORDER_DATE_GENERAL(self,seller_id):
-        # timezone_ = pytz.timezone(self.timezone)
-        shopReportday = (datetime.now() + timedelta(days=-1)).strftime("%Y-%m-%d")
-        # print(shopReportday)
-        para = {"reportType":ReportType.GET_FLAT_FILE_ALL_ORDERS_DATA_BY_ORDER_DATE_GENERAL,"dataStartTime":shopReportday,"dataEndTime":shopReportday,"reportOptions":{"ShowSalesChannel":"true"}}
-        reportid = self.create_report(**para) #{"ShowSalesChannel":"true"}
-        decom_df = self.decompression(reportid)
-        decom_df[decom_df.select_dtypes(float).columns] = decom_df[decom_df.select_dtypes(float).columns].fillna(0.0)
-        decom_df[decom_df.select_dtypes(int).columns] = decom_df[decom_df.select_dtypes(int).columns].fillna(0)
-        decom_df[decom_df.select_dtypes(datetime).columns] = decom_df[decom_df.select_dtypes(datetime).columns].astype('string')
-        if "purchase-order-number" in decom_df.columns:
-            decom_df['purchase-order-number'] = decom_df['purchase-order-number'].astype("string")
-
-        decom_df.fillna('',inplace=True)
-        # decom_df.to_csv('order.csv')
-        decom_df["ReportDate"] = parse(shopReportday)
-        # decom_df['timezone'] =  decom_df["purchase-date"].map(lambda x: parse(x).tzname()).fillna(method='bfill')
-        decom_df['timezone'] = "UTC"
-        print("==========================================================")
-        decom_df[["purchase-date", "last-updated-date"]] = decom_df[["purchase-date", "last-updated-date"]].applymap(
-            lambda x: self.timeDeal(x) if pd.isna(x) == False or x != None else x)
-        if 'is-business-order' not in decom_df.columns:
-            decom_df['is-business-order'] = None
-        if 'purchase-order-number' not in decom_df.columns:
-            decom_df['purchase-order-number'] = '-'
-        if 'price-designation' not in decom_df.columns:
-            decom_df['price-designation'] = '-'
-
-        decom_df['seller_id'] = seller_id
-        country_code = str(self.marketplace)[-2:]
-        if country_code=='GB':
-            country_code="UK"
-            # decom_df['country_code'] = "UK"
-        decom_df['country_code'] = country_code
-        # print(decom_df[])
-        reserve_columns = ["amazon-order-id","merchant-order-id","purchase-date","last-updated-date","order-status",
-                        "fulfillment-channel","sales-channel","order-channel","ship-service-level","product-name",
-                        "sku","asin","item-status","quantity","currency","item-price","item-tax","shipping-price",
-                        "shipping-tax","gift-wrap-price","gift-wrap-tax","item-promotion-discount",
-                        "ship-promotion-discount","ship-city","ship-state","ship-postal-code","ship-country",
-                        "promotion-ids","is-business-order","purchase-order-number","price-designation","ReportDate",
-                        "timezone","seller_id","country_code"
-                        ]
-        list_df = decom_df[reserve_columns].to_numpy().tolist()
-
-        # tuple_data = [tuple(i) for i in list_df]
-        conn = self.mysql_connect()
-        cursor = conn.cursor()
-        # print(list(conn.query("select * from amz_sp_api.orderReport")))
-        sql = f"""
-            insert into amz_sp_api.orderreport_renew1
-            values (%s,%s,%s,%s,%s,%s,%s, %s,%s,%s,%s,%s,%s,%s, %s,%s,%s,%s,%s,%s,%s, %s,%s,%s,%s,%s,%s,%s, %s,%s,%s,%s,%s,%s,%s)
-        """ #ok
-        # print(sql)
-        try:
-            conn.begin()
-            cursor.executemany(sql,list_df)
-            conn.commit()
-            print("插入完成")
-        except Exception as e:
-            conn.rollback()
-            print(e)
-
-
 if __name__ == '__main__':
-    SpApiRequest.get_allShops("GET_FLAT_FILE_ALL_ORDERS_DATA_BY_ORDER_DATE_GENERAL")
-
+    for days in range(2,35):
+        SpApiRequest.get_allShops("GET_FLAT_FILE_ALL_ORDERS_DATA_BY_ORDER_DATE_GENERAL",days=-days,**{"level":"SKU"})
+    # SpApiRequest.listing_infoTable()
     # rel = SpApiRequest.get_catelog(account_name='ANLAPUS_US',country=Marketplaces.US,asin='B0BVXB4KT9')
     # print(rel)
 
-"""
-create database amz_sp_api;
-"""
-"""
-
-        create table amz_sp_api.productInfo
-        (
-        `item-name`	VARCHAR(300),
-        `item-description`	VARCHAR(1000),
-        `listing-id`	VARCHAR(50),
-        `seller-sku`	VARCHAR(50),
-        `price`	FLOAT,
-        `quantity`	INT,
-        `open-date`	VARCHAR(70),
-        `image-url`	VARCHAR(300),
-        `item-is-marketplace`	VARCHAR(50),
-        `product-id-type`	INT,
-        `item-note`	VARCHAR(300),
-        `item-condition`	INT,
-        `asin1`	VARCHAR(50),
-        `asin2`	VARCHAR(50),
-        `asin3`	VARCHAR(50),
-        `will-ship-internationally`	VARCHAR(50),
-        `expedited-shipping`	VARCHAR(50),
-        `product-id`	VARCHAR(50),
-        `bid-for-featured-placement`	FLOAT,
-        `add-delete`	VARCHAR(50),
-        `pending-quantity`	INT,
-        `fulfillment-channel`	VARCHAR(50),
-        `merchant-shipping-group`	VARCHAR(50),
-        `status`	VARCHAR(50),
-        `mainImageUrl`	VARCHAR(300),
-        `opendate_date`	Date,
-        `updateTime`	Date,
-        `timezone`	VARCHAR(30)
-        )
-"""
-"""
-create table amz_sp_api.orderReport
-(`amazon-order-id` VARCHAR(40),
-`merchant-order-id` VARCHAR(40),
-`purchase-date` DATETIME,
-`last-updated-date` DATETIME,
-`order-status` VARCHAR(40),
-`fulfillment-channel` VARCHAR(40),
-`sales-channel` VARCHAR(40),
-`order-channel` VARCHAR(40),
-`ship-service-level` VARCHAR(40),
-`product-name` VARCHAR(250),
-`sku` VARCHAR(50),
-`asin` VARCHAR(40),
-`item-status` VARCHAR(40),
-`quantity` INT,
-`currency` VARCHAR(40),
-`item-price` FLOAT,
-`item-tax` FLOAT,
-`shipping-price` FLOAT,
-`shipping-tax` FLOAT,
-`gift-wrap-price` FLOAT,
-`gift-wrap-tax` FLOAT,
-`item-promotion-discount` FLOAT,
-`ship-promotion-discount` FLOAT,
-`ship-city` VARCHAR(40),
-`ship-state` VARCHAR(40),
-`ship-postal-code` VARCHAR(40),
-`ship-country` VARCHAR(40),
-`promotion-ids` VARCHAR(50),
-`cpf` VARCHAR(40),
-`is-business-order` BOOL,
-`purchase-order-number` VARCHAR(50),
-`price-designation` VARCHAR(40),
-`signature-confirmation-recommended` BOOL,
-`ReportDate` DATE not null,
-`timezone` VARCHAR(20) not null
-);
-"""

+ 13 - 795
sync_get_open_listing_data.py

@@ -1,804 +1,22 @@
-import clickhouse_connect
-import time
 import warnings
 warnings.filterwarnings('ignore')
-import numpy as np
-from pymysql import Timestamp
-from sp_api.util import throttle_retry, load_all_pages
-from sp_api.api import Orders,ListingsItems,Inventories,Reports,CatalogItems
-from sp_api.base import Marketplaces,ReportType,ProcessingStatus
-import pandas as pd
-from random import shuffle
-import gzip
-from io import BytesIO,StringIO
-from datetime import datetime, timedelta,timezone
-import pytz
-import time
-from dateutil.parser import parse
-import pymysql
-from typing import List, Literal
 from apscheduler.schedulers.blocking import BlockingScheduler
+from sync_amz_data.public import sp_api_client
+def func_run():
+    try:
+        sp_api_client.SpApiRequest.get_allShops("GET_FLAT_FILE_OPEN_LISTINGS_DATA")
+    except:
+        sp_api_client.SpApiRequest.get_allShops("GET_FLAT_FILE_OPEN_LISTINGS_DATA")
+    print("="*40)
+    try:
+        sp_api_client.SpApiRequest.listing_infoTable()
+    except:
+        sp_api_client.SpApiRequest.listing_infoTable()
+    # func_run()
 
-class SpApiRequest:
-
-    def __init__(self, credentials,marketplace):
-        self.credentials = credentials
-        self.marketplace = marketplace
-        # self.shopInfo = shop_infos('3006125408623189')
-        # self.timezone = self.shopInfo['time_zone']
-        # self.profileid = '3006125408623189'
-
-    @classmethod
-    def mysql_connect_auth(cls):
-        conn = pymysql.connect(user="admin",
-        password="NSYbBSPbkGQUbOSNOeyy",
-        host="retail-data.cnrgrbcygoap.us-east-1.rds.amazonaws.com",
-        database="ansjer_dvadmin",
-        port=3306)
-
-        return conn
-
-    @classmethod
-    def mysql_connect_auth_lst(cls):
-        conn = pymysql.connect(user="root",
-                               password="sandbox",
-                               host="192.168.1.225",
-                               database="asj_ads",
-                               port=3306)
-        # conn = pymysql.connect(user="huangyifan",
-        #                        password="123456",
-        #                        host="127.0.0.1",
-        #                        database="amz_sp_api",
-        #                        port=3306)
-        return conn
-
-    @classmethod
-    def mysql_connect(cls):
-        conn = pymysql.connect(user="huangyifan",
-                               password="123456",
-                               host="127.0.0.1",
-                               database="amz_sp_api",
-                               port=3306)
-        return conn
-
-    @classmethod
-    def mysql_adTest_connect(cls):
-        conn = pymysql.connect(user="root",
-                               password="sandbox",
-                               host="192.168.1.225",
-                               database="asj_ads",
-                               port=3306)
-        return conn
-
-    @classmethod
-    def get_catelog(cls,account_name,country=Marketplaces.US,asin=None):
-        if country in [Marketplaces.US, Marketplaces.BR, Marketplaces.CA,Marketplaces.MX]:
-            region = 'NA'
-        elif country in [Marketplaces.DE,Marketplaces.AE, Marketplaces.BE,  Marketplaces.PL,
-                        Marketplaces.EG,Marketplaces.ES,  Marketplaces.GB, Marketplaces.IN, Marketplaces.IT,
-                        Marketplaces.NL, Marketplaces.SA, Marketplaces.SE, Marketplaces.TR,Marketplaces.UK,Marketplaces.FR,
-                        ]:
-            region = 'EU'
-        else:
-            region = str(country)[-2:]
-        df = cls.auth_info()
-        try:
-            refresh_token = df.query("account_name==@account_name and region==@region")['refresh_token'].values[0]
-        except:
-            print("请输入正确的account name与Marketplace")
-            return '获取失败'
-        cred = {
-                'refresh_token': refresh_token,
-                'lwa_app_id': 'amzn1.application-oa2-client.1f9d3d4747e14b22b4b598e54e6b922e',  # 卖家中心里面开发者资料LWA凭证
-                'lwa_client_secret': 'amzn1.oa2-cs.v1.3af0f5649f5b8e151cd5bd25c10f2bf3113172485cd6ffc52ccc6a5e8512b490',
-                'aws_access_key': 'AKIARBAGHTGOZC7544GN',
-                'aws_secret_key': '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',
-                'role_arn': 'arn:aws:iam::070880041373:role/Amazon_SP_API_ROLE'
-            }
-        cate_item = CatalogItems(credentials=cred, marketplace=country)
-        images_info = cate_item.get_catalog_item(asin=asin,**{"includedData":['images']})
-        images = images_info.images[0].get('images')[0]['link']
-        title_info = cate_item.get_catalog_item(asin=asin)
-        title = title_info.payload['summaries'][0]['itemName']
-        return {'images':images,'title':title}
-
-
-    def create_report(self,**kwargs): # 创建报告
-        reportType = kwargs['reportType']
-        reportOptions =kwargs.get("reportOptions")
-
-        # give a time period
-        dataStartTime = datetime.now().strftime("%Y-%m-%dT%H:%M:%S") if kwargs.get("dataStartTime") is None else kwargs.get("dataStartTime")+"T00:00:00"
-        dataEndTime = datetime.now().strftime("%Y-%m-%dT%H:%M:%S") if kwargs.get("dataEndTime") is None else kwargs.get("dataEndTime")+"T23:59:59"
-
-        report = Reports(credentials=self.credentials, marketplace=self.marketplace)
-        rel = report.create_report(
-            reportType=reportType,marketplaceIds=[kwargs['marketpalceids'] if kwargs.get('marketpalceids') is not None else self.marketplace.marketplace_id],
-            reportOptions=reportOptions,dataStartTime=dataStartTime,dataEndTime=dataEndTime
-                        )
-        reportId = rel.payload.get("reportId")
-        # print(reportId)
-        return reportId
-
-    def decompression(self,reportId): #解压生成的报告
-        report = Reports(credentials=self.credentials, marketplace=self.marketplace)
-        while True:
-            reportId_info = report.get_report(reportId=reportId)
-            # print(reportId_info.payload)
-            print("please wait...")
-            if reportId_info.payload.get("processingStatus")==ProcessingStatus.DONE:
-                reportDocumentId = reportId_info.payload.get("reportDocumentId")
-                rp_table = report.get_report_document(reportDocumentId=reportDocumentId,download=False)
-                print(rp_table)
-                if rp_table.payload.get('compressionAlgorithm') is not None and self.marketplace.marketplace_id not in ['A1VC38T7YXB528']:#
-                    df = pd.read_table(filepath_or_buffer=rp_table.payload['url'],compression={"method":'gzip'},encoding='iso-8859-1')
-                    return df
-                elif rp_table.payload.get('compressionAlgorithm') is not None and self.marketplace.marketplace_id in ['A1VC38T7YXB528']:
-                    df = pd.read_table(filepath_or_buffer=rp_table.payload['url'], compression={"method": 'gzip'},
-                                       encoding='Shift-JIS')
-                    # df.columns =
-                    return df
-                elif rp_table.payload.get('compressionAlgorithm') is None and self.marketplace.marketplace_id not in ['A1VC38T7YXB528']:
-                    df = pd.read_table(rp_table.payload.get("url"),encoding='iso-8859-1')
-                    return df
-                elif rp_table.payload.get('compressionAlgorithm') is None and self.marketplace.marketplace_id in ['A1VC38T7YXB528']:
-                    df = pd.read_table(rp_table.payload.get("url"),encoding='Shift-JIS')
-                    return df
-            elif reportId_info.payload.get("processingStatus") in [ProcessingStatus.CANCELLED,ProcessingStatus.FATAL]:
-                print(reportId_info)
-                print("取消或失败")
-                break
-            time.sleep(15)
-            print("please wait...")
-
-
-    def data_deal(self,decom_df,seller_id): # 数据处理-for listing SUB
-        decom_df['mainImageUrl'] = decom_df['seller-sku'].map(lambda x: self.get_mainImage_url(x))
-        url_columns = [i for i in decom_df.columns if "url" in i.lower()]
-        if len(url_columns) > 0:
-            decom_df[url_columns] = decom_df[url_columns].astype("string")
-        asin_columns = [i for i in decom_df.columns if 'asin' in i.lower()]
-        if len(asin_columns) > 0:
-            decom_df[asin_columns] = decom_df[asin_columns].astype("string")
-        if 'pending-quantity' in decom_df.columns:
-            decom_df['pending-quantity'] = decom_df['pending-quantity'].map(
-                lambda x: 0 if pd.isna(x) or np.isinf(x) else x).astype("int32")
-        deletecolumns = [i for i in decom_df.columns if 'zshop' in i.lower()]
-        decom_df.drop(columns=deletecolumns, inplace=True)
-        if 'quantity' in decom_df.columns:
-            decom_df['quantity'] = decom_df['quantity'].map(lambda x: 0 if pd.isna(x) or np.isinf(x) else x).astype(
-                "int32")
-        decom_df['opendate_date'] = decom_df['open-date'].map(lambda x: self.datetime_deal(x))
-        if 'add-delete' in decom_df.columns:
-            decom_df['add-delete'] = decom_df['add-delete'].astype('string', errors='ignore')
-        if 'will-ship-internationally' in decom_df.columns:
-            decom_df['will-ship-internationally'] = decom_df['will-ship-internationally'].astype('string',errors='ignore')
-        if 'expedited-shipping' in decom_df.columns:
-            decom_df['expedited-shipping'] = decom_df['expedited-shipping'].astype('string',errors='ignore')
-        decom_df['updateTime'] = datetime.now()
-        decom_df['timezone'] = "UTC"
-        decom_df['seller_id'] = seller_id
-        #
-        decom_df['item-description'] = decom_df['item-description'].str.slice(0,500)
-        decom_df[decom_df.select_dtypes(float).columns] = decom_df[decom_df.select_dtypes(float).columns].fillna(0.0)
-        decom_df[decom_df.select_dtypes(int).columns] = decom_df[decom_df.select_dtypes(int).columns].fillna(0)
-        decom_df[decom_df.select_dtypes(datetime).columns] = decom_df[decom_df.select_dtypes(datetime).columns].astype(
-            'string')
-        decom_df.fillna('', inplace=True)
-        # print(decom_df.info())
-        return decom_df
-
-    def GET_MERCHANT_LISTINGS_ALL_DATA(self,limit=None): # 获取listing信息 SUB
-        start = time.time()
-        para = {"reportType":ReportType.GET_MERCHANT_LISTINGS_ALL_DATA}
-        reportid = self.create_report(**para)
-        decom_df = self.decompression(reportid)
-        print("连接数据库")
-        conn = self.mysql_connect()
-        print("连接成功")
-        cursor = conn.cursor()
-        timezone = "UTC" #pytz.timezone(self.timezone)
-        bondary_date = (datetime.now()).strftime("%Y-%m-%d") #+ timedelta(days=-28)
-        cursor.execute(f"""select * from amz_sp_api.productInfo where (mainImageUrl is not null and mainImageUrl not in ('', ' ')) and 
-                        (`seller-sku` not in ('',' ') and `seller-sku` is not null) and 
-                        `updateTime`>='{bondary_date}'""") #`seller-sku`,`updateTime`,`mainImageUrl`
-        col = [i[0] for i in cursor.description]
-        query_rel = cursor.fetchall()
-
-        if len(query_rel)!=0:
-            print(query_rel[0])
-            df = pd.DataFrame(query_rel,columns=col)
-            listingid = df['listing-id'].to_numpy().tolist()
-            decom_df = decom_df.query("`listing-id` not in @listingid")
-            print("数据条数: ",len(decom_df))
-            # print(f"delete * from amz_sp_api.productInfo where `listing-id` not in {tuple(listingid)}")
-
-            # conn.commit()
-
-        if len(decom_df)==0:
-            return "Done"
-
-        if limit != None:
-            decom_df = decom_df.iloc[:limit,:]
-        print("getting mainImageInfo...")
-        rowcount = 0
-        while rowcount < len(decom_df):
-            df_insert = decom_df.copy()
-            df_insert = df_insert.iloc[rowcount:rowcount + 200, :]
-
-            df_insert = self.data_deal(df_insert)
-            list_df = df_insert.to_numpy().tolist()
-
-            # print(list(conn.query("select * from amz_sp_api.orderReport")))
-            sql = f"""
-                        insert into amz_sp_api.productInfo
-                        values (%s,%s,%s,%s,%s,%s,%s, %s,%s,%s,%s,%s,%s,%s, %s,%s,%s,%s,%s,%s,%s, %s,%s,%s,%s,%s,%s,%s,%s)
-                    """ #ok
-            # print(sql)
-            conn = self.mysql_connect()
-            cursor = conn.cursor()
-            try:
-                conn.begin()
-                cursor.executemany(sql, list_df)
-                conn.commit()
-                print("插入中...")
-                insert_listingid = df_insert['listing-id'].to_numpy().tolist()
-                cursor.execute(f"delete from amz_sp_api.productInfo where `listing-id` not in {tuple(insert_listingid)} and `updateTime`<'{bondary_date}'")
-                conn.commit()
-                rowcount += 200
-            except Exception as e:
-                conn.rollback()
-                print(e)
-                try:
-                    conn = self.mysql_connect()
-                    cursor = conn.cursor()
-                    conn.begin()
-                    cursor.executemany(sql, list_df)
-                    conn.commit()
-                    insert_listingid = df_insert['listing-id'].to_numpy().tolist()
-                    cursor.execute(f"delete from amz_sp_api.productInfo where `listing-id` not in {tuple(insert_listingid)} and `updateTime`<'{bondary_date}'")
-                    conn.commit()
-                except Exception as e:
-                    conn.rollback()
-                    print(e)
-                    break
-                # break
-        conn.close()
-        print("全部完成")
-        end =time.time()
-        print("duration:",end-start)
-        return decom_df
-
-    def GET_FLAT_FILE_OPEN_LISTINGS_DATA(self,conn=None,seller_id=None): # MAIN
-        para = {"reportType": ReportType.GET_MERCHANT_LISTINGS_ALL_DATA}
-        reportid = self.create_report(**para)
-        df = self.decompression(reportid)
-        if len(df)>0:
-            if self.marketplace.marketplace_id =='A1VC38T7YXB528':
-                df.columns = ['item-name','listing-id','seller-sku','price','quantity','open-date','product-id-type','item-description',
-                              'item-condition','overseas shipping','fast shipping','asin1','stock_number','fulfillment-channel','merchant-shipping-group','status']
-
-            df['seller_id'] = seller_id
-            df['marketplace_id'] = self.marketplace.marketplace_id
-            df['country_code'] = str(self.marketplace)[-2:]
-            if 'fulfilment-channel' in df.columns:
-                print("changed fulfilment-channel:")
-                print(seller_id,self.marketplace)
-                df['fulfillment-channel'] = df['fulfilment-channel'].copy()
-            df['fulfillment_channel'] = df['fulfillment-channel'].map(lambda x:"FBA" if not pd.isna(x) and len(x)>0 and str(x)[1:4] in "AMAZON" else x)
-            df['fulfillment_channel'] = df['fulfillment_channel'].map(lambda x: "FBM" if not pd.isna(x) and len(x)>0 and str(x)[1:4] in "DEFAULT" else x)
-
-            if 'asin1' not in df.columns:
-                df['asin1'] = ''
-            if 'product-id' not in df.columns:
-                df['product-id'] = ''
-
-            # 空值处理
-            df['quantity'] = df['quantity'].fillna(0).astype('int64',errors='ignore')
-            df[['listing-id','seller_id','asin1','seller-sku','country_code','marketplace_id','fulfillment_channel','status','product-id']] = df[['listing-id','seller_id','asin1','seller-sku','country_code','marketplace_id','fulfillment_channel','status','product-id']].fillna('').astype('string',errors='ignore')
-            df['price'] = df['price'].fillna(0.0).astype('float64',errors='ignore')
-            df.fillna('',inplace=True)
-
-            # 时间处理
-            df['opendate'] = df['open-date'].map(lambda x: self.datetime_deal(x))
-            df['update_datetime'] = datetime.now(pytz.UTC).date()
-
-            origin_columns = ['listing-id','seller_id',
-                              'asin1','seller-sku','title','image_link','country_code',
-                              'marketplace_id','quantity','fulfillment_channel',
-                              'price','opendate','status','update_datetime','product-id','product-id-type'
-                              ]
-
-            # DONE 连数据库
-            conn = SpApiRequest.mysql_connect_auth_lst()
-            cursor = conn.cursor()
-            # DONE 修改查询条件
-            cursor.execute("""select product_id,asin from (select * from asj_ads.seller_listings_dailyUP where asin is not null 
-                                and asin<>'' and product_id is not null and product_id <>'') t1 group by product_id,asin""")
-            query_ = cursor.fetchall()
-            col_name = [i[0] for i in cursor.description]
-            df_datatable = pd.DataFrame(query_, columns=col_name)
-            merged_df = df.merge(df_datatable[['product_id','asin']],how='left',left_on='product-id',right_on='product_id')
-            print(merged_df.head())
-
-            def func_(asin,asin1,product_id,cred,market_p,seller_id,sku): # 创建功能函数
-                if 'B0' in str(product_id)[:3]:
-                    return str(product_id)
-                if (pd.isna(asin1) or asin1=='') and (pd.isna(asin)==False and asin !=''):
-                    if 'B0' in asin[:3]:
-                        return asin
-                elif (pd.isna(asin1)==False and asin1!=''):
-                    if 'B0' in asin1[:3]:
-                        return asin1
-
-                listingClient = ListingsItems(credentials=cred, marketplace=market_p) # 获取listing 信息
-                try:
-                    r1 = listingClient.get_listings_item(sellerId=seller_id, sku=sku)
-                    print(r1.payload)
-                    asin = r1.payload.get("summaries")[0].get("asin")
-                    return asin
-                except Exception as e:
-                    print("获取图片url过程错误重试, 错误message: ", e)
-                    time.sleep(3)
-                    r1 = listingClient.get_listings_item(sellerId=seller_id, sku=sku)
-                    print(r1.payload)
-                    asin = r1.payload.get("summaries")[0].get("asin")
-                    return asin
-
-            # 应用自定函数
-            merged_df['asin1'] = merged_df.apply(lambda x:func_(x['asin'],x['asin1'],x['product-id'],self.credentials,self.marketplace,seller_id,x['seller-sku']),axis=1) #x['asin'] if pd.isna(x['asin1']) or x['asin1']=='' else x['asin1']
-
-            print("获取listing Info...")
-            merged_df['temp_columns'] = merged_df.apply(lambda x: self.get_listing_info(x['seller-sku'],seller_id),axis=1)
-            merged_df[['image_link','title']] = merged_df['temp_columns'].str.split("-----",expand=True)
-            # merged_df['image_link'] = ''
-            # merged_df['title'] = ''
-
-
-            merged_df.fillna('',inplace=True)
-            df1 = merged_df.copy()
-            print(df1[origin_columns].head(1))
-
-            # DONE 更新数据
-            update_df = self.update_data(df1,seller_id,str(self.marketplace)[-2:],conn)
-
-            if len(update_df)==0:
-                return '无更新数据插入'
-            # update_df['country_code'] = update_df['country_code'].map({"GB":"UK"})
-
-            # DONE 连接数据库
-            conn = SpApiRequest.mysql_connect_auth_lst()
-            cursor = conn.cursor()
-
-            try:
-                insertsql = """insert into
-                asj_ads.seller_listings_dailyUP(listing_id,seller_id,asin,sku,title,image_link,country_code,marketplace_id,quantity,
-                        fulfillment_channel,price,launch_datetime,status,update_datetime,product_id,product_id_type)
-                values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)"""
-                conn.begin()
-                cursor.executemany(insertsql,tuple(update_df[origin_columns].to_numpy().tolist()))
-                conn.commit()
-                print("插入完成")
-                return '插入完成'
-            except Exception as e:
-                print("插入错误:",e)
-                conn.rollback()
-                return '出错回滚'
-
-    def get_listing_info(self, sku,seller_id): # MAIN—GET LISTING
-        listingClient = ListingsItems(credentials=self.credentials, marketplace=self.marketplace)
-        try:
-            r1 = listingClient.get_listings_item(sellerId=seller_id, sku=sku)
-            # print(r1.payload)
-            json_content = r1.payload.get("summaries")[0]
-            item_name = json_content.get("itemName")
-            item_name ='###' if item_name==None else item_name
-            img = json_content.get("mainImage")
-            img_url = '###' if img is None else img.get("link")
-            # print(str(img_url)+"-----"+ str(item_name))
-            return str(img_url)+"-----"+ str(item_name)
-        except Exception as e:
-            try:
-                print("获取图片url过程错误重试, 错误message: ",e)
-                time.sleep(3)
-                r1 = listingClient.get_listings_item(sellerId=seller_id, sku=sku)
-                print(r1.payload)
-                json_content = r1.payload.get("summaries")[0]
-
-                item_name = json_content.get("itemName")
-                item_name = '###' if item_name == None else item_name
-                img = json_content.get("mainImage")
-                img_url = '###' if img is None else img.get("link")
-                return str(img_url)+"-----"+ str(item_name)
-            except Exception as e:
-                print(e)
-                return "###-----###"
-
-
-    def datetime_deal(self,timestring): #时间处理-Main
-        timezone_ = {"AEST":"Australia/Sydney",
-                    "AEDT":"Australia/Sydney",
-                    "PST":"America/Los_Angeles",
-                    "PDT":"America/Los_Angeles",
-                    "CST":"America/Chicago",
-                    "CDT":"America/Chicago",
-                    "MET":"MET",
-                    "MEST":"MET",
-                    "BST":"Europe/London",
-                    "GMT":"GMT",
-                    "CET":"CET",
-                    "CEST":"CET",
-                    "JST":"Asia/Tokyo",
-                    "BRT":"America/Sao_Paulo"}
-
-        date_list = str.split(timestring,sep = ' ')
-        if len(date_list)<3:
-            try:
-                return datetime.strptime(date_list[0],"%Y-%m-%d")
-            except:
-                try:
-                    return datetime.strptime(date_list[0], "%Y/%m/%d")
-                except:
-                    try:
-                        return datetime.strptime(date_list[0], "%d/%m/%Y")
-                    except Exception as e:
-                        print(e)
-                        return datetime(1999, 12, 31, 0, 0, 0)
-        try:
-            time_date = datetime.strptime(date_list[0]+date_list[1],"%Y-%m-%d%H:%M:%S")
-            timezone = pytz.timezone(timezone_[date_list[2]])
-            time_ = timezone.localize(time_date)
-            return time_.astimezone(pytz.UTC)
-        except:
-            try:
-                time_date = datetime.strptime(date_list[0] + date_list[1], "%d/%m/%Y%H:%M:%S")
-                timezone = pytz.timezone(timezone_[date_list[2]])
-                time_ = timezone.localize(time_date)
-                return time_.astimezone(pytz.UTC)
-            except :
-                try:
-                    time_date = datetime.strptime(date_list[0] + date_list[1], "%Y/%m/%d%H:%M:%S")
-                    timezone = pytz.timezone(timezone_[date_list[2]])
-                    time_ = timezone.localize(time_date)
-                    return time_.astimezone(pytz.UTC)
-                except Exception as e1:
-                    print(e1)
-                    return datetime(1999,12,31,0,0,0)
-
-    def update_data(self,df,seller_id,country_code,conn): # 更新-Main
-        conn = SpApiRequest.mysql_connect_auth_lst()
-        cursor = conn.cursor()
-        columns = ['listing-id', 'seller_id',
-         'asin1', 'seller-sku', 'title', 'image_link', 'country_code',
-         'marketplace_id', 'quantity', 'fulfillment_channel',
-         'price', 'opendate', 'status', 'update_datetime', 'product-id', 'product-id-type'
-         ]
-        if country_code=='GB':
-            country_code="UK"
-            df['country_code'] = "UK"
-
-
-        df_data = pd.DataFrame(columns=columns)
-        delete_list = []
-
-        marketplace_id = self.marketplace.marketplace_id
-        try:
-            # DONE 更改查询语句
-            cursor.execute(f"""select * from
-                                asj_ads.seller_listings_dailyUP where seller_id='{seller_id}' and marketplace_id='{marketplace_id}'""")
-            col = [i[0] for i in cursor.description]
-            query_rel = cursor.fetchall()
-            df_rel = pd.DataFrame(query_rel, columns=col)
-            df_rel['quantity'] = df_rel['quantity'].fillna(0).astype('int64')
-            df_rel['price'] = df_rel['price'].fillna(0.0).astype('float64')
-            df_rel['product_id_type'] = df_rel['product_id_type'].astype('int64')
-            df['update_datetime'] =df['update_datetime'].astype('datetime64[ns]')
-
-            df['quantity'] = df['quantity'].fillna(0).astype('int64')
-            df['price']= df['price'].fillna(0.0).astype('float64')
-            # print(df_rel.dtypes)
-            # print(df[columns].dtypes)
-            row = 0
-            while row < len(df):
-                temp_df = df.iloc[row, :]
-
-                listing_id = temp_df['listing-id']
-                asin = temp_df['asin1']
-                sku = temp_df['seller-sku']
-                quantity = temp_df['quantity']
-                fulfillment_channel = temp_df['fulfillment_channel']
-                price = temp_df['price']
-                product_id = temp_df['product-id']
-                title = temp_df['title']
-                imageurl = temp_df['image_link']
-                temp = df_rel.query("""listing_id==@listing_id and asin==@asin and sku==@sku and quantity==@quantity and fulfillment_channel==@fulfillment_channel and price==@price and product_id==@product_id and country_code==@country_code and seller_id==@seller_id and title==@title and image_link==@imageurl""")
-                print("需要关注数据(是否异常):",len(temp),temp.to_numpy().tolist()) if len(temp)>1 else 1
-
-                df_data = df_data.append(temp_df, ignore_index=True)
-                delete_list.append((seller_id, marketplace_id, sku, listing_id, product_id))
-
-                row += 1
-            print("判断不同数据条数",len(delete_list))
-            print("预计更新数据条数",len(df_data))
-            try:
-                # print(tuple(delete_list))
-                if len(delete_list)>0:
-                    query = f"""delete from asj_ads.seller_listings_dailyUP 
-                           where (seller_id,marketplace_id,sku,listing_id,product_id) in %s""" #where (seller_id,country_code) in %s"""
-                    cursor.execute(query,(delete_list,))
-
-                    conn.commit()
-                    print(delete_list)
-                    print("进行中...")
-            except Exception as e:
-                print(e)
-                conn.rollback()
-            return df_data
-        except Exception as e:
-            print("错误:", e)
-            return df
-
-
-
-
-    @staticmethod
-    def auth_info():
-        auth_conn = SpApiRequest.mysql_connect_auth()
-        cursor = auth_conn.cursor()
-        cursor.execute("select * from amazon_sp_report.amazon_sp_auth_info;")
-        columns_name = [i[0] for i in cursor.description]
-        rel = cursor.fetchall()
-        df = pd.DataFrame(rel, columns=columns_name)
-        return df
-
-    @classmethod
-    def get_orders_allShops(cls):
-        pass
-
-    @staticmethod
-    def data_judge_secondTry(sp_api,data_type,seller_id,auth_conn):
-        try:
-            SpApiRequest.data_judge(sp_api, data_type, seller_id, auth_conn)
-        except:
-            time.sleep(3)
-            SpApiRequest.data_judge(sp_api, data_type, seller_id, auth_conn)
-
-
-    @staticmethod
-    def data_judge(sp_api,data_type,seller_id,auth_conn):
-        if data_type == "GET_FLAT_FILE_OPEN_LISTINGS_DATA":
-            return sp_api.GET_FLAT_FILE_OPEN_LISTINGS_DATA(auth_conn,seller_id)
-        elif data_type =="GET_FLAT_FILE_ALL_ORDERS_DATA_BY_ORDER_DATE_GENERAL":
-            return sp_api.GET_FLAT_FILE_ALL_ORDERS_DATA_BY_ORDER_DATE_GENERAL(seller_id)
-        elif data_type =="GET_FLAT_FILE_RETURNS_DATA_BY_RETURN_DATE":
-            return sp_api.GET_FLAT_FILE_RETURNS_DATA_BY_RETURN_DATE(seller_id)
-        else:
-            return ""
-
-    @classmethod
-    def get_refreshtoken(cls):
-        df = cls.auth_info()
-        refreshtoken_list = (df['refresh_token'].to_numpy().tolist())
-        return refreshtoken_list
-
-    @classmethod
-    def get_allShops(cls,data_type=Literal["GET_FLAT_FILE_OPEN_LISTINGS_DATA","GET_FLAT_FILE_ALL_ORDERS_DATA_BY_ORDER_DATE_GENERAL"]):
-        df = cls.auth_info()
-        refreshtoken_list = (df['refresh_token'].to_numpy().tolist())
-        shuffle(refreshtoken_list)
-        for refresh_token in refreshtoken_list:
-            aws_credentials = {
-                'refresh_token': refresh_token,
-                'lwa_app_id': 'amzn1.application-oa2-client.1f9d3d4747e14b22b4b598e54e6b922e',  # 卖家中心里面开发者资料LWA凭证
-                'lwa_client_secret': 'amzn1.oa2-cs.v1.3af0f5649f5b8e151cd5bd25c10f2bf3113172485cd6ffc52ccc6a5e8512b490',
-                'aws_access_key': 'AKIARBAGHTGOZC7544GN',
-                'aws_secret_key': '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',
-                'role_arn': 'arn:aws:iam::070880041373:role/Amazon_SP_API_ROLE'
-            }
-
-            single_info = df.query("refresh_token==@refresh_token")
-            region_circle = single_info['region'].values[0]
-            seller_id = single_info['selling_partner_id'].values[0]
-            account_name = single_info['account_name'].values[0]
-            if region_circle == 'NA':
-                pass
-                for marketplace in [Marketplaces.US, Marketplaces.BR, Marketplaces.CA,Marketplaces.MX]:
-                    sp_api = SpApiRequest(aws_credentials, marketplace)
-                    try:
-                        auth_conn = SpApiRequest.mysql_connect_auth()
-                        cls.data_judge_secondTry(sp_api, data_type, seller_id, auth_conn)
-                        ## sp_api.GET_FLAT_FILE_OPEN_LISTINGS_DATA(auth_conn, seller_id)
-                    except Exception as e:
-                        print(e)
-            elif region_circle == 'EU':
-                pass
-                for marketplace in [Marketplaces.DE,Marketplaces.AE, Marketplaces.BE,  Marketplaces.PL,
-                                    Marketplaces.EG,Marketplaces.ES,  Marketplaces.GB, Marketplaces.IN, Marketplaces.IT,
-                                    Marketplaces.NL, Marketplaces.SA, Marketplaces.SE, Marketplaces.TR,Marketplaces.UK,Marketplaces.FR,
-                                    ]:
-                    sp_api = SpApiRequest(aws_credentials, marketplace)
-                    try:
-                        auth_conn = SpApiRequest.mysql_connect_auth()
-                        cls.data_judge_secondTry(sp_api, data_type, seller_id, auth_conn)
-                        ## sp_api.GET_FLAT_FILE_OPEN_LISTINGS_DATA(auth_conn, seller_id)
-                    except Exception as e:
-                        print(e)
-
-            else:
-            # if region_circle not in ['NA','EU']:
-                auth_conn = SpApiRequest.mysql_connect_auth()
-                print(region_circle)
-                marketplace = eval(f'Marketplaces.{region_circle}')
-                sp_api = SpApiRequest(aws_credentials, marketplace)
-                cls.data_judge_secondTry(sp_api, data_type, seller_id, auth_conn)
-            ## sp_api.GET_FLAT_FILE_OPEN_LISTINGS_DATA(auth_conn, seller_id)
-
-    def timeDeal(self, orgTime):
-        orgTime = parse(orgTime)
-        timezone = pytz.timezone("UTC")
-        shopTime = orgTime.astimezone(timezone)
-        shopTime_datetime = datetime(shopTime.year, shopTime.month, shopTime.day, shopTime.hour, shopTime.minute,
-                                     shopTime.second)
-        return shopTime_datetime
-
-    def GET_FLAT_FILE_RETURNS_DATA_BY_RETURN_DATE(self,seller_id):
-        shopReportday = (datetime.now() + timedelta(days=-2)).strftime("%Y-%m-%d")
-        # print(shopReportday)
-        para = {"reportType": ReportType.GET_SELLER_FEEDBACK_DATA,
-                "dataStartTime": shopReportday, "dataEndTime": shopReportday,
-                }
-        reportid = self.create_report(**para)  # {"ShowSalesChannel":"true"}
-        decom_df = self.decompression(reportid)
-        print(decom_df)
-        # print(decom_df.columns)
-
-    def GET_FLAT_FILE_ALL_ORDERS_DATA_BY_ORDER_DATE_GENERAL(self,seller_id):
-        # timezone_ = pytz.timezone(self.timezone)
-        shopReportday = (datetime.now() + timedelta(days=-1)).strftime("%Y-%m-%d")
-        # print(shopReportday)
-        para = {"reportType":ReportType.GET_FLAT_FILE_ALL_ORDERS_DATA_BY_ORDER_DATE_GENERAL,"dataStartTime":shopReportday,"dataEndTime":shopReportday,"reportOptions":{"ShowSalesChannel":"true"}}
-        reportid = self.create_report(**para) #{"ShowSalesChannel":"true"}
-        decom_df = self.decompression(reportid)
-        decom_df[decom_df.select_dtypes(float).columns] = decom_df[decom_df.select_dtypes(float).columns].fillna(0.0)
-        decom_df[decom_df.select_dtypes(int).columns] = decom_df[decom_df.select_dtypes(int).columns].fillna(0)
-        decom_df[decom_df.select_dtypes(datetime).columns] = decom_df[decom_df.select_dtypes(datetime).columns].astype('string')
-        if "purchase-order-number" in decom_df.columns:
-            decom_df['purchase-order-number'] = decom_df['purchase-order-number'].astype("string")
-
-        decom_df.fillna('',inplace=True)
-        # decom_df.to_csv('order.csv')
-        decom_df["ReportDate"] = parse(shopReportday)
-        # decom_df['timezone'] =  decom_df["purchase-date"].map(lambda x: parse(x).tzname()).fillna(method='bfill')
-        decom_df['timezone'] = "UTC"
-        print("==========================================================")
-        decom_df[["purchase-date", "last-updated-date"]] = decom_df[["purchase-date", "last-updated-date"]].applymap(
-            lambda x: self.timeDeal(x) if pd.isna(x) == False or x != None else x)
-        if 'is-business-order' not in decom_df.columns:
-            decom_df['is-business-order'] = None
-        if 'purchase-order-number' not in decom_df.columns:
-            decom_df['purchase-order-number'] = '-'
-        if 'price-designation' not in decom_df.columns:
-            decom_df['price-designation'] = '-'
 
-        decom_df['seller_id'] = seller_id
-        country_code = str(self.marketplace)[-2:]
-        if country_code=='GB':
-            country_code="UK"
-            # decom_df['country_code'] = "UK"
-        decom_df['country_code'] = country_code
-        # print(decom_df[])
-        reserve_columns = ["amazon-order-id","merchant-order-id","purchase-date","last-updated-date","order-status",
-                        "fulfillment-channel","sales-channel","order-channel","ship-service-level","product-name",
-                        "sku","asin","item-status","quantity","currency","item-price","item-tax","shipping-price",
-                        "shipping-tax","gift-wrap-price","gift-wrap-tax","item-promotion-discount",
-                        "ship-promotion-discount","ship-city","ship-state","ship-postal-code","ship-country",
-                        "promotion-ids","is-business-order","purchase-order-number","price-designation","ReportDate",
-                        "timezone","seller_id","country_code"
-                        ]
-        list_df = decom_df[reserve_columns].to_numpy().tolist()
-        # print(list_df)
-        # print(list_df[0])
-        # tuple_data = [tuple(i) for i in list_df]
-        conn = self.mysql_connect()
-        cursor = conn.cursor()
-        # print(list(conn.query("select * from amz_sp_api.orderReport")))
-        sql = f"""
-            insert into amz_sp_api.orderreport_renew1
-            values (%s,%s,%s,%s,%s,%s,%s, %s,%s,%s,%s,%s,%s,%s, %s,%s,%s,%s,%s,%s,%s, %s,%s,%s,%s,%s,%s,%s, %s,%s,%s,%s,%s,%s,%s)
-        """ #ok
-        # print(sql)
-        try:
-            conn.begin()
-            cursor.executemany(sql,list_df)
-            conn.commit()
-            print("插入完成")
-        except Exception as e:
-            conn.rollback()
-            print(e)
 
-def func_run():
-    SpApiRequest.get_allShops("GET_FLAT_FILE_OPEN_LISTINGS_DATA")
-
-# func_run()
 if __name__ == '__main__':
     sched = BlockingScheduler()
-    sched.add_job(func_run,'cron',hour=1,minute=30,second=0)
+    sched.add_job(func_run,'cron',hour=4,minute=30,second=0)
     sched.start()
-
-
-"""
-create database amz_sp_api;
-"""
-"""
-
-        create table amz_sp_api.productInfo
-        (
-        `item-name`	VARCHAR(300),
-        `item-description`	VARCHAR(1000),
-        `listing-id`	VARCHAR(50),
-        `seller-sku`	VARCHAR(50),
-        `price`	FLOAT,
-        `quantity`	INT,
-        `open-date`	VARCHAR(70),
-        `image-url`	VARCHAR(300),
-        `item-is-marketplace`	VARCHAR(50),
-        `product-id-type`	INT,
-        `item-note`	VARCHAR(300),
-        `item-condition`	INT,
-        `asin1`	VARCHAR(50),
-        `asin2`	VARCHAR(50),
-        `asin3`	VARCHAR(50),
-        `will-ship-internationally`	VARCHAR(50),
-        `expedited-shipping`	VARCHAR(50),
-        `product-id`	VARCHAR(50),
-        `bid-for-featured-placement`	FLOAT,
-        `add-delete`	VARCHAR(50),
-        `pending-quantity`	INT,
-        `fulfillment-channel`	VARCHAR(50),
-        `merchant-shipping-group`	VARCHAR(50),
-        `status`	VARCHAR(50),
-        `mainImageUrl`	VARCHAR(300),
-        `opendate_date`	Date,
-        `updateTime`	Date,
-        `timezone`	VARCHAR(30)
-        )
-"""
-"""
-create table amz_sp_api.orderReport
-(`amazon-order-id` VARCHAR(40),
-`merchant-order-id` VARCHAR(40),
-`purchase-date` DATETIME,
-`last-updated-date` DATETIME,
-`order-status` VARCHAR(40),
-`fulfillment-channel` VARCHAR(40),
-`sales-channel` VARCHAR(40),
-`order-channel` VARCHAR(40),
-`ship-service-level` VARCHAR(40),
-`product-name` VARCHAR(250),
-`sku` VARCHAR(50),
-`asin` VARCHAR(40),
-`item-status` VARCHAR(40),
-`quantity` INT,
-`currency` VARCHAR(40),
-`item-price` FLOAT,
-`item-tax` FLOAT,
-`shipping-price` FLOAT,
-`shipping-tax` FLOAT,
-`gift-wrap-price` FLOAT,
-`gift-wrap-tax` FLOAT,
-`item-promotion-discount` FLOAT,
-`ship-promotion-discount` FLOAT,
-`ship-city` VARCHAR(40),
-`ship-state` VARCHAR(40),
-`ship-postal-code` VARCHAR(40),
-`ship-country` VARCHAR(40),
-`promotion-ids` VARCHAR(50),
-`cpf` VARCHAR(40),
-`is-business-order` BOOL,
-`purchase-order-number` VARCHAR(50),
-`price-designation` VARCHAR(40),
-`signature-confirmation-recommended` BOOL,
-`ReportDate` DATE not null,
-`timezone` VARCHAR(20) not null
-);
-"""

+ 16 - 806
sync_get_order_data.py

@@ -1,817 +1,27 @@
-import clickhouse_connect
-import time
+
 import warnings
 warnings.filterwarnings('ignore')
-import numpy as np
-from pymysql import Timestamp
-from sp_api.util import throttle_retry, load_all_pages
-from sp_api.api import Orders,ListingsItems,Inventories,Reports,CatalogItems
-from sp_api.base import Marketplaces,ReportType,ProcessingStatus
-import pandas as pd
-import gzip
-from io import BytesIO,StringIO
-from datetime import datetime, timedelta,timezone
-import pytz
-import time
-from dateutil.parser import parse
-import pymysql
-from typing import List, Literal
 from apscheduler.schedulers.blocking import BlockingScheduler
 
-class SpApiRequest:
-
-    def __init__(self, credentials,marketplace):
-        self.credentials = credentials
-        self.marketplace = marketplace
-        # self.shopInfo = shop_infos('3006125408623189')
-        # self.timezone = self.shopInfo['time_zone']
-        # self.profileid = '3006125408623189'
-
-    @classmethod
-    def mysql_connect_auth(cls):
-        conn = pymysql.connect(user="admin",
-        password="NSYbBSPbkGQUbOSNOeyy",
-        host="retail-data.cnrgrbcygoap.us-east-1.rds.amazonaws.com",
-        database="ansjer_dvadmin",
-        port=3306)
-
-        return conn
-
-    @classmethod
-    def mysql_connect_auth_lst(cls):
-        conn = pymysql.connect(user="root",
-                               password="sandbox",
-                               host="192.168.1.225",
-                               database="asj_ads",
-                               port=3306)
-        # conn = pymysql.connect(user="huangyifan",
-        #                        password="123456",
-        #                        host="127.0.0.1",
-        #                        database="amz_sp_api",
-        #                        port=3306)
-        return conn
-
-    @classmethod
-    def mysql_connect(cls):
-        conn = pymysql.connect(user="huangyifan",
-                               password="123456",
-                               host="127.0.0.1",
-                               database="amz_sp_api",
-                               port=3306)
-        return conn
-
-    @classmethod
-    def mysql_adTest_connect(cls):
-        conn = pymysql.connect(user="root",
-                               password="sandbox",
-                               host="192.168.1.225",
-                               database="asj_ads",
-                               port=3306)
-        return conn
-
-    @classmethod
-    def get_catelog(cls,account_name,country=Marketplaces.US,asin=None):
-        if country in [Marketplaces.US, Marketplaces.BR, Marketplaces.CA,Marketplaces.MX]:
-            region = 'NA'
-        elif country in [Marketplaces.DE,Marketplaces.AE, Marketplaces.BE,  Marketplaces.PL,
-                        Marketplaces.EG,Marketplaces.ES,  Marketplaces.GB, Marketplaces.IN, Marketplaces.IT,
-                        Marketplaces.NL, Marketplaces.SA, Marketplaces.SE, Marketplaces.TR,Marketplaces.UK,Marketplaces.FR,
-                        ]:
-            region = 'EU'
-        else:
-            region = str(country)[-2:]
-        df = cls.auth_info()
-        try:
-            refresh_token = df.query("account_name==@account_name and region==@region")['refresh_token'].values[0]
-        except:
-            print("请输入正确的account name与Marketplace")
-            return '获取失败'
-        cred = {
-                'refresh_token': refresh_token,
-                'lwa_app_id': 'amzn1.application-oa2-client.1f9d3d4747e14b22b4b598e54e6b922e',  # 卖家中心里面开发者资料LWA凭证
-                'lwa_client_secret': 'amzn1.oa2-cs.v1.3af0f5649f5b8e151cd5bd25c10f2bf3113172485cd6ffc52ccc6a5e8512b490',
-                'aws_access_key': 'AKIARBAGHTGOZC7544GN',
-                'aws_secret_key': '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',
-                'role_arn': 'arn:aws:iam::070880041373:role/Amazon_SP_API_ROLE'
-            }
-        cate_item = CatalogItems(credentials=cred, marketplace=country)
-        images_info = cate_item.get_catalog_item(asin=asin,**{"includedData":['images']})
-        images = images_info.images[0].get('images')[0]['link']
-        title_info = cate_item.get_catalog_item(asin=asin)
-        title = title_info.payload['summaries'][0]['itemName']
-        return {'images':images,'title':title}
-
-
-    def create_report(self,**kwargs): # 创建报告
-        reportType = kwargs['reportType']
-        reportOptions =kwargs.get("reportOptions")
-
-        # give a time period
-        dataStartTime = datetime.now().strftime("%Y-%m-%dT%H:%M:%S") if kwargs.get("dataStartTime") is None else kwargs.get("dataStartTime")+"T00:00:00"
-        dataEndTime = datetime.now().strftime("%Y-%m-%dT%H:%M:%S") if kwargs.get("dataEndTime") is None else kwargs.get("dataEndTime")+"T23:59:59"
-
-        report = Reports(credentials=self.credentials, marketplace=self.marketplace)
-        rel = report.create_report(
-            reportType=reportType,marketplaceIds=[kwargs['marketpalceids'] if kwargs.get('marketpalceids') is not None else self.marketplace.marketplace_id],
-            reportOptions=reportOptions,dataStartTime=dataStartTime,dataEndTime=dataEndTime
-                        )
-        reportId = rel.payload.get("reportId")
-        # print(reportId)
-        return reportId
-
-    def decompression(self,reportId): #解压生成的报告
-        report = Reports(credentials=self.credentials, marketplace=self.marketplace)
-        while True:
-            reportId_info = report.get_report(reportId=reportId)
-            # print(reportId_info.payload)
-            print("please wait...")
-            if reportId_info.payload.get("processingStatus")==ProcessingStatus.DONE:
-                reportDocumentId = reportId_info.payload.get("reportDocumentId")
-                rp_table = report.get_report_document(reportDocumentId=reportDocumentId,download=False)
-                print(rp_table)
-                if rp_table.payload.get('compressionAlgorithm') is not None and self.marketplace.marketplace_id not in ['A1VC38T7YXB528']:#
-                    df = pd.read_table(filepath_or_buffer=rp_table.payload['url'],compression={"method":'gzip'},encoding='iso-8859-1')
-                    return df
-                elif rp_table.payload.get('compressionAlgorithm') is not None and self.marketplace.marketplace_id in ['A1VC38T7YXB528']:
-                    df = pd.read_table(filepath_or_buffer=rp_table.payload['url'], compression={"method": 'gzip'},
-                                       encoding='Shift-JIS')
-                    # df.columns =
-                    return df
-                elif rp_table.payload.get('compressionAlgorithm') is None and self.marketplace.marketplace_id not in ['A1VC38T7YXB528']:
-                    df = pd.read_table(rp_table.payload.get("url"),encoding='iso-8859-1')
-                    return df
-                elif rp_table.payload.get('compressionAlgorithm') is None and self.marketplace.marketplace_id in ['A1VC38T7YXB528']:
-                    df = pd.read_table(rp_table.payload.get("url"),encoding='Shift-JIS')
-                    return df
-            elif reportId_info.payload.get("processingStatus") in [ProcessingStatus.CANCELLED,ProcessingStatus.FATAL]:
-                print(reportId_info)
-                print("取消或失败")
-                break
-            time.sleep(15)
-            print("please wait...")
-
-
-    def data_deal(self,decom_df,seller_id): # 数据处理-for listing SUB
-        decom_df['mainImageUrl'] = decom_df['seller-sku'].map(lambda x: self.get_mainImage_url(x))
-        url_columns = [i for i in decom_df.columns if "url" in i.lower()]
-        if len(url_columns) > 0:
-            decom_df[url_columns] = decom_df[url_columns].astype("string")
-        asin_columns = [i for i in decom_df.columns if 'asin' in i.lower()]
-        if len(asin_columns) > 0:
-            decom_df[asin_columns] = decom_df[asin_columns].astype("string")
-        if 'pending-quantity' in decom_df.columns:
-            decom_df['pending-quantity'] = decom_df['pending-quantity'].map(
-                lambda x: 0 if pd.isna(x) or np.isinf(x) else x).astype("int32")
-        deletecolumns = [i for i in decom_df.columns if 'zshop' in i.lower()]
-        decom_df.drop(columns=deletecolumns, inplace=True)
-        if 'quantity' in decom_df.columns:
-            decom_df['quantity'] = decom_df['quantity'].map(lambda x: 0 if pd.isna(x) or np.isinf(x) else x).astype(
-                "int32")
-        decom_df['opendate_date'] = decom_df['open-date'].map(lambda x: self.datetime_deal(x))
-        if 'add-delete' in decom_df.columns:
-            decom_df['add-delete'] = decom_df['add-delete'].astype('string', errors='ignore')
-        if 'will-ship-internationally' in decom_df.columns:
-            decom_df['will-ship-internationally'] = decom_df['will-ship-internationally'].astype('string',errors='ignore')
-        if 'expedited-shipping' in decom_df.columns:
-            decom_df['expedited-shipping'] = decom_df['expedited-shipping'].astype('string',errors='ignore')
-        decom_df['updateTime'] = datetime.now()
-        decom_df['timezone'] = "UTC"
-        decom_df['seller_id'] = seller_id
-        #
-        decom_df['item-description'] = decom_df['item-description'].str.slice(0,500)
-        decom_df[decom_df.select_dtypes(float).columns] = decom_df[decom_df.select_dtypes(float).columns].fillna(0.0)
-        decom_df[decom_df.select_dtypes(int).columns] = decom_df[decom_df.select_dtypes(int).columns].fillna(0)
-        decom_df[decom_df.select_dtypes(datetime).columns] = decom_df[decom_df.select_dtypes(datetime).columns].astype(
-            'string')
-        decom_df.fillna('', inplace=True)
-        # print(decom_df.info())
-        return decom_df
-
-    def GET_MERCHANT_LISTINGS_ALL_DATA(self,limit=None): # 获取listing信息 SUB
-        start = time.time()
-        para = {"reportType":ReportType.GET_MERCHANT_LISTINGS_ALL_DATA}
-        reportid = self.create_report(**para)
-        decom_df = self.decompression(reportid)
-        print("连接数据库")
-        conn = self.mysql_connect()
-        print("连接成功")
-        cursor = conn.cursor()
-        timezone = "UTC" #pytz.timezone(self.timezone)
-        bondary_date = (datetime.now()).strftime("%Y-%m-%d") #+ timedelta(days=-28)
-        cursor.execute(f"""select * from amz_sp_api.productInfo where (mainImageUrl is not null and mainImageUrl not in ('', ' ')) and 
-                        (`seller-sku` not in ('',' ') and `seller-sku` is not null) and 
-                        `updateTime`>='{bondary_date}'""") #`seller-sku`,`updateTime`,`mainImageUrl`
-        col = [i[0] for i in cursor.description]
-        query_rel = cursor.fetchall()
-
-        if len(query_rel)!=0:
-            print(query_rel[0])
-            df = pd.DataFrame(query_rel,columns=col)
-            listingid = df['listing-id'].to_numpy().tolist()
-            decom_df = decom_df.query("`listing-id` not in @listingid")
-            print("数据条数: ",len(decom_df))
-            # print(f"delete * from amz_sp_api.productInfo where `listing-id` not in {tuple(listingid)}")
-
-            # conn.commit()
-
-        if len(decom_df)==0:
-            return "Done"
-
-        if limit != None:
-            decom_df = decom_df.iloc[:limit,:]
-        print("getting mainImageInfo...")
-        rowcount = 0
-        while rowcount < len(decom_df):
-            df_insert = decom_df.copy()
-            df_insert = df_insert.iloc[rowcount:rowcount + 200, :]
-
-            df_insert = self.data_deal(df_insert)
-            list_df = df_insert.to_numpy().tolist()
-
-            # print(list(conn.query("select * from amz_sp_api.orderReport")))
-            sql = f"""
-                        insert into amz_sp_api.productInfo
-                        values (%s,%s,%s,%s,%s,%s,%s, %s,%s,%s,%s,%s,%s,%s, %s,%s,%s,%s,%s,%s,%s, %s,%s,%s,%s,%s,%s,%s,%s)
-                    """ #ok
-            # print(sql)
-            conn = self.mysql_connect()
-            cursor = conn.cursor()
-            try:
-                conn.begin()
-                cursor.executemany(sql, list_df)
-                conn.commit()
-                print("插入中...")
-                insert_listingid = df_insert['listing-id'].to_numpy().tolist()
-                cursor.execute(f"delete from amz_sp_api.productInfo where `listing-id` not in {tuple(insert_listingid)} and `updateTime`<'{bondary_date}'")
-                conn.commit()
-                rowcount += 200
-            except Exception as e:
-                conn.rollback()
-                print(e)
-                try:
-                    conn = self.mysql_connect()
-                    cursor = conn.cursor()
-                    conn.begin()
-                    cursor.executemany(sql, list_df)
-                    conn.commit()
-                    insert_listingid = df_insert['listing-id'].to_numpy().tolist()
-                    cursor.execute(f"delete from amz_sp_api.productInfo where `listing-id` not in {tuple(insert_listingid)} and `updateTime`<'{bondary_date}'")
-                    conn.commit()
-                except Exception as e:
-                    conn.rollback()
-                    print(e)
-                    break
-                # break
-        conn.close()
-        print("全部完成")
-        end =time.time()
-        print("duration:",end-start)
-        return decom_df
-
-    def GET_FLAT_FILE_OPEN_LISTINGS_DATA(self,conn=None,seller_id=None): # MAIN
-        para = {"reportType": ReportType.GET_MERCHANT_LISTINGS_ALL_DATA}
-        reportid = self.create_report(**para)
-        df = self.decompression(reportid)
-        if len(df)>0:
-            if self.marketplace.marketplace_id =='A1VC38T7YXB528':
-                df.columns = ['item-name','listing-id','seller-sku','price','quantity','open-date','product-id-type','item-description',
-                              'item-condition','overseas shipping','fast shipping','asin1','stock_number','fulfillment-channel','merchant-shipping-group','status']
-
-            df['seller_id'] = seller_id
-            df['marketplace_id'] = self.marketplace.marketplace_id
-            df['country_code'] = str(self.marketplace)[-2:]
-            if 'fulfilment-channel' in df.columns:
-                print("changed fulfilment-channel:")
-                print(seller_id,self.marketplace)
-                df['fulfillment-channel'] = df['fulfilment-channel'].copy()
-            df['fulfillment_channel'] = df['fulfillment-channel'].map(lambda x:"FBA" if not pd.isna(x) and len(x)>0 and str(x)[1:4] in "AMAZON" else x)
-            df['fulfillment_channel'] = df['fulfillment_channel'].map(lambda x: "FBM" if not pd.isna(x) and len(x)>0 and str(x)[1:4] in "DEFAULT" else x)
-
-            if 'asin1' not in df.columns:
-                df['asin1'] = ''
-            if 'product-id' not in df.columns:
-                df['product-id'] = ''
-
-            # 空值处理
-            df['quantity'] = df['quantity'].fillna(0).astype('int64',errors='ignore')
-            df[['listing-id','seller_id','asin1','seller-sku','country_code','marketplace_id','fulfillment_channel','status','product-id']] = df[['listing-id','seller_id','asin1','seller-sku','country_code','marketplace_id','fulfillment_channel','status','product-id']].fillna('').astype('string',errors='ignore')
-            df['price'] = df['price'].fillna(0.0).astype('float64',errors='ignore')
-            df.fillna('',inplace=True)
-
-            # 时间处理
-            df['opendate'] = df['open-date'].map(lambda x: self.datetime_deal(x))
-            df['update_datetime'] = datetime.now(pytz.UTC).date()
-
-            origin_columns = ['listing-id','seller_id',
-                              'asin1','seller-sku','title','image_link','country_code',
-                              'marketplace_id','quantity','fulfillment_channel',
-                              'price','opendate','status','update_datetime','product-id','product-id-type'
-                              ]
-
-            # DONE 连数据库
-            conn = SpApiRequest.mysql_connect_auth_lst()
-            cursor = conn.cursor()
-            # DONE 修改查询条件
-            cursor.execute("""select product_id,asin from (select * from asj_ads.seller_listings_dailyUP where asin is not null 
-                                and asin<>'' and product_id is not null and product_id <>'') t1 group by product_id,asin""")
-            query_ = cursor.fetchall()
-            col_name = [i[0] for i in cursor.description]
-            df_datatable = pd.DataFrame(query_, columns=col_name)
-            merged_df = df.merge(df_datatable[['product_id','asin']],how='left',left_on='product-id',right_on='product_id')
-            print(merged_df.head())
-
-            def func_(asin,asin1,product_id,cred,market_p,seller_id,sku): # 创建功能函数
-                if 'B0' in str(product_id)[:3]:
-                    return str(product_id)
-                if (pd.isna(asin1) or asin1=='') and (pd.isna(asin)==False and asin !=''):
-                    if 'B0' in asin[:3]:
-                        return asin
-                elif (pd.isna(asin1)==False and asin1!=''):
-                    if 'B0' in asin1[:3]:
-                        return asin1
-
-                listingClient = ListingsItems(credentials=cred, marketplace=market_p) # 获取listing 信息
-                try:
-                    r1 = listingClient.get_listings_item(sellerId=seller_id, sku=sku)
-                    print(r1.payload)
-                    asin = r1.payload.get("summaries")[0].get("asin")
-                    return asin
-                except Exception as e:
-                    print("获取图片url过程错误重试, 错误message: ", e)
-                    time.sleep(3)
-                    r1 = listingClient.get_listings_item(sellerId=seller_id, sku=sku)
-                    print(r1.payload)
-                    asin = r1.payload.get("summaries")[0].get("asin")
-                    return asin
-
-            # 应用自定函数
-            merged_df['asin1'] = merged_df.apply(lambda x:func_(x['asin'],x['asin1'],x['product-id'],self.credentials,self.marketplace,seller_id,x['seller-sku']),axis=1) #x['asin'] if pd.isna(x['asin1']) or x['asin1']=='' else x['asin1']
-
-            print("获取listing Info...")
-            merged_df['temp_columns'] = merged_df.apply(lambda x: self.get_listing_info(x['seller-sku'],seller_id),axis=1)
-            merged_df[['image_link','title']] = merged_df['temp_columns'].str.split("-----",expand=True)
-            # merged_df['image_link'] = ''
-            # merged_df['title'] = ''
-
-
-            merged_df.fillna('',inplace=True)
-            df1 = merged_df.copy()
-            print(df1[origin_columns].head(1))
-
-            # DONE 更新数据
-            update_df = self.update_data(df1,seller_id,str(self.marketplace)[-2:],conn)
-
-            if len(update_df)==0:
-                return '无更新数据插入'
-            # update_df['country_code'] = update_df['country_code'].map({"GB":"UK"})
-
-            # DONE 连接数据库
-            conn = SpApiRequest.mysql_connect_auth_lst()
-            cursor = conn.cursor()
-
-            try:
-                insertsql = """insert into
-                asj_ads.seller_listings_dailyUP(listing_id,seller_id,asin,sku,title,image_link,country_code,marketplace_id,quantity,
-                        fulfillment_channel,price,launch_datetime,status,update_datetime,product_id,product_id_type)
-                values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)"""
-                conn.begin()
-                cursor.executemany(insertsql,tuple(update_df[origin_columns].to_numpy().tolist()))
-                conn.commit()
-                print("插入完成")
-                return '插入完成'
-            except Exception as e:
-                print("插入错误:",e)
-                conn.rollback()
-                return '出错回滚'
-
-    def get_listing_info(self, sku,seller_id): # MAIN—GET LISTING
-        listingClient = ListingsItems(credentials=self.credentials, marketplace=self.marketplace)
-        try:
-            r1 = listingClient.get_listings_item(sellerId=seller_id, sku=sku)
-            # print(r1.payload)
-            json_content = r1.payload.get("summaries")[0]
-            item_name = json_content.get("itemName")
-            item_name ='###' if item_name==None else item_name
-            img = json_content.get("mainImage")
-            img_url = '###' if img is None else img.get("link")
-            # print(str(img_url)+"-----"+ str(item_name))
-            return str(img_url)+"-----"+ str(item_name)
-        except Exception as e:
-            try:
-                print("获取图片url过程错误重试, 错误message: ",e)
-                time.sleep(3)
-                r1 = listingClient.get_listings_item(sellerId=seller_id, sku=sku)
-                print(r1.payload)
-                json_content = r1.payload.get("summaries")[0]
-
-                item_name = json_content.get("itemName")
-                item_name = '###' if item_name == None else item_name
-                img = json_content.get("mainImage")
-                img_url = '###' if img is None else img.get("link")
-                return str(img_url)+"-----"+ str(item_name)
-            except Exception as e:
-                print(e)
-                return "###-----###"
-
-
-    def datetime_deal(self,timestring): #时间处理-Main
-        timezone_ = {"AEST":"Australia/Sydney",
-                    "AEDT":"Australia/Sydney",
-                    "PST":"America/Los_Angeles",
-                    "PDT":"America/Los_Angeles",
-                    "CST":"America/Chicago",
-                    "CDT":"America/Chicago",
-                    "MET":"MET",
-                    "MEST":"MET",
-                    "BST":"Europe/London",
-                    "GMT":"GMT",
-                    "CET":"CET",
-                    "CEST":"CET",
-                    "JST":"Asia/Tokyo",
-                    "BRT":"America/Sao_Paulo"}
-
-        date_list = str.split(timestring,sep = ' ')
-        if len(date_list)<3:
-            try:
-                return datetime.strptime(date_list[0],"%Y-%m-%d")
-            except:
-                try:
-                    return datetime.strptime(date_list[0], "%Y/%m/%d")
-                except:
-                    try:
-                        return datetime.strptime(date_list[0], "%d/%m/%Y")
-                    except Exception as e:
-                        print(e)
-                        return datetime(1999, 12, 31, 0, 0, 0)
-        try:
-            time_date = datetime.strptime(date_list[0]+date_list[1],"%Y-%m-%d%H:%M:%S")
-            timezone = pytz.timezone(timezone_[date_list[2]])
-            time_ = timezone.localize(time_date)
-            return time_.astimezone(pytz.UTC)
-        except:
-            try:
-                time_date = datetime.strptime(date_list[0] + date_list[1], "%d/%m/%Y%H:%M:%S")
-                timezone = pytz.timezone(timezone_[date_list[2]])
-                time_ = timezone.localize(time_date)
-                return time_.astimezone(pytz.UTC)
-            except :
-                try:
-                    time_date = datetime.strptime(date_list[0] + date_list[1], "%Y/%m/%d%H:%M:%S")
-                    timezone = pytz.timezone(timezone_[date_list[2]])
-                    time_ = timezone.localize(time_date)
-                    return time_.astimezone(pytz.UTC)
-                except Exception as e1:
-                    print(e1)
-                    return datetime(1999,12,31,0,0,0)
-
-    def update_data(self,df,seller_id,country_code,conn): # 更新-Main
-        conn = SpApiRequest.mysql_connect_auth_lst()
-        cursor = conn.cursor()
-        columns = ['listing-id', 'seller_id',
-         'asin1', 'seller-sku', 'title', 'image_link', 'country_code',
-         'marketplace_id', 'quantity', 'fulfillment_channel',
-         'price', 'opendate', 'status', 'update_datetime', 'product-id', 'product-id-type'
-         ]
-        if country_code=='GB':
-            country_code="UK"
-            df['country_code'] = "UK"
-
-
-        df_data = pd.DataFrame(columns=columns)
-        delete_list = []
-
-        marketplace_id = self.marketplace.marketplace_id
-        try:
-            # DONE 更改查询语句
-            cursor.execute(f"""select * from
-                                asj_ads.seller_listings_dailyUP where seller_id='{seller_id}' and marketplace_id='{marketplace_id}'""")
-            col = [i[0] for i in cursor.description]
-            query_rel = cursor.fetchall()
-            df_rel = pd.DataFrame(query_rel, columns=col)
-            df_rel['quantity'] = df_rel['quantity'].fillna(0).astype('int64')
-            df_rel['price'] = df_rel['price'].fillna(0.0).astype('float64')
-            df_rel['product_id_type'] = df_rel['product_id_type'].astype('int64')
-            df['update_datetime'] =df['update_datetime'].astype('datetime64[ns]')
-
-            df['quantity'] = df['quantity'].fillna(0).astype('int64')
-            df['price']= df['price'].fillna(0.0).astype('float64')
-            # print(df_rel.dtypes)
-            # print(df[columns].dtypes)
-            row = 0
-            while row < len(df):
-                temp_df = df.iloc[row, :]
-
-                listing_id = temp_df['listing-id']
-                asin = temp_df['asin1']
-                sku = temp_df['seller-sku']
-                quantity = temp_df['quantity']
-                fulfillment_channel = temp_df['fulfillment_channel']
-                price = temp_df['price']
-                product_id = temp_df['product-id']
-                title = temp_df['title']
-                imageurl = temp_df['image_link']
-                temp = df_rel.query("""listing_id==@listing_id and asin==@asin and sku==@sku and quantity==@quantity and fulfillment_channel==@fulfillment_channel and price==@price and product_id==@product_id and country_code==@country_code and seller_id==@seller_id and title==@title and image_link==@imageurl""")
-                print("需要关注数据(是否异常):",len(temp),temp.to_numpy().tolist()) if len(temp)>1 else 1
-                if len(temp)>1:
-                    # temp = temp.head(1).to_numpy().tolist()
-                    df_data = df_data.append(temp_df, ignore_index=True)
-                    delete_list.append((seller_id, marketplace_id, sku, listing_id, product_id))
-                # print(len(temp))
-                elif len(temp)==0:
-                    df_data = df_data.append(temp_df,ignore_index=True)
-                    delete_list.append((seller_id,marketplace_id,sku,listing_id,product_id))
-                row += 1
-            print("判断不同数据条数",len(delete_list))
-            print("预计更新数据条数",len(df_data))
-            try:
-                # print(tuple(delete_list))
-                if len(delete_list)>0:
-                    query = f"""delete from asj_ads.seller_listings_dailyUP 
-                           where (seller_id,marketplace_id,sku,listing_id,product_id) in %s""" #where (seller_id,country_code) in %s"""
-                    cursor.execute(query,(delete_list,))
-
-                    conn.commit()
-                    print(delete_list)
-                    print("进行中...")
-            except Exception as e:
-                print(e)
-                conn.rollback()
-            return df_data
-        except Exception as e:
-            print("错误:", e)
-            return df
-
-
-
-
-    @staticmethod
-    def auth_info():
-        auth_conn = SpApiRequest.mysql_connect_auth()
-        cursor = auth_conn.cursor()
-        cursor.execute("select * from amazon_sp_report.amazon_sp_auth_info;")
-        columns_name = [i[0] for i in cursor.description]
-        rel = cursor.fetchall()
-        df = pd.DataFrame(rel, columns=columns_name)
-        return df
-
-    @classmethod
-    def get_orders_allShops(cls):
-        pass
-
-    @staticmethod
-    def data_judge_secondTry(sp_api,data_type,seller_id,auth_conn):
-        try:
-            SpApiRequest.data_judge(sp_api, data_type, seller_id, auth_conn)
-        except:
-            time.sleep(3)
-            SpApiRequest.data_judge(sp_api, data_type, seller_id, auth_conn)
-
-
-    @staticmethod
-    def data_judge(sp_api,data_type,seller_id,auth_conn):
-        if data_type == "GET_FLAT_FILE_OPEN_LISTINGS_DATA":
-            return sp_api.GET_FLAT_FILE_OPEN_LISTINGS_DATA(auth_conn,seller_id)
-        elif data_type =="GET_FLAT_FILE_ALL_ORDERS_DATA_BY_ORDER_DATE_GENERAL":
-            return sp_api.GET_FLAT_FILE_ALL_ORDERS_DATA_BY_ORDER_DATE_GENERAL(seller_id)
-        elif data_type =="GET_FLAT_FILE_RETURNS_DATA_BY_RETURN_DATE":
-            return sp_api.GET_FLAT_FILE_RETURNS_DATA_BY_RETURN_DATE(seller_id)
-        else:
-            return ""
-
-    @classmethod
-    def get_refreshtoken(cls):
-        df = cls.auth_info()
-        refreshtoken_list = (df['refresh_token'].to_numpy().tolist())
-        return refreshtoken_list
-
-    @classmethod
-    def get_allShops(cls,data_type=Literal["GET_FLAT_FILE_OPEN_LISTINGS_DATA","GET_FLAT_FILE_ALL_ORDERS_DATA_BY_ORDER_DATE_GENERAL"]):
-        df = cls.auth_info()
-        refreshtoken_list = (df['refresh_token'].to_numpy().tolist())
-        refreshtoken_list.reverse()
-        for refresh_token in refreshtoken_list:
-            aws_credentials = {
-                'refresh_token': refresh_token,
-                'lwa_app_id': 'amzn1.application-oa2-client.1f9d3d4747e14b22b4b598e54e6b922e',  # 卖家中心里面开发者资料LWA凭证
-                'lwa_client_secret': 'amzn1.oa2-cs.v1.3af0f5649f5b8e151cd5bd25c10f2bf3113172485cd6ffc52ccc6a5e8512b490',
-                'aws_access_key': 'AKIARBAGHTGOZC7544GN',
-                'aws_secret_key': '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',
-                'role_arn': 'arn:aws:iam::070880041373:role/Amazon_SP_API_ROLE'
-            }
-
-            single_info = df.query("refresh_token==@refresh_token")
-            region_circle = single_info['region'].values[0]
-            seller_id = single_info['selling_partner_id'].values[0]
-            account_name = single_info['account_name'].values[0]
-            if region_circle == 'NA':
-                pass
-                for marketplace in [Marketplaces.US, Marketplaces.BR, Marketplaces.CA,Marketplaces.MX]:
-                    sp_api = SpApiRequest(aws_credentials, marketplace)
-                    try:
-                        auth_conn = SpApiRequest.mysql_connect_auth()
-                        cls.data_judge_secondTry(sp_api, data_type, seller_id, auth_conn)
-                        ## sp_api.GET_FLAT_FILE_OPEN_LISTINGS_DATA(auth_conn, seller_id)
-                    except Exception as e:
-                        print(e)
-            elif region_circle == 'EU':
-                pass
-                for marketplace in [Marketplaces.DE,Marketplaces.AE, Marketplaces.BE,  Marketplaces.PL,
-                                    Marketplaces.EG,Marketplaces.ES,  Marketplaces.GB, Marketplaces.IN, Marketplaces.IT,
-                                    Marketplaces.NL, Marketplaces.SA, Marketplaces.SE, Marketplaces.TR,Marketplaces.UK,Marketplaces.FR,
-                                    ]:
-                    sp_api = SpApiRequest(aws_credentials, marketplace)
-                    try:
-                        auth_conn = SpApiRequest.mysql_connect_auth()
-                        cls.data_judge_secondTry(sp_api, data_type, seller_id, auth_conn)
-                        ## sp_api.GET_FLAT_FILE_OPEN_LISTINGS_DATA(auth_conn, seller_id)
-                    except Exception as e:
-                        print(e)
-
-            else:
-            # if region_circle not in ['NA','EU']:
-                auth_conn = SpApiRequest.mysql_connect_auth()
-                print(region_circle)
-                marketplace = eval(f'Marketplaces.{region_circle}')
-                sp_api = SpApiRequest(aws_credentials, marketplace)
-                cls.data_judge_secondTry(sp_api, data_type, seller_id, auth_conn)
-            ## sp_api.GET_FLAT_FILE_OPEN_LISTINGS_DATA(auth_conn, seller_id)
-
-    def timeDeal(self, orgTime):
-        orgTime = parse(orgTime)
-        timezone = pytz.timezone("UTC")
-        shopTime = orgTime.astimezone(timezone)
-        shopTime_datetime = datetime(shopTime.year, shopTime.month, shopTime.day, shopTime.hour, shopTime.minute,
-                                     shopTime.second)
-        return shopTime_datetime
-
-    def GET_FLAT_FILE_RETURNS_DATA_BY_RETURN_DATE(self,seller_id):
-        shopReportday = (datetime.now() + timedelta(days=-2)).strftime("%Y-%m-%d")
-        # print(shopReportday)
-        para = {"reportType": ReportType.GET_SELLER_FEEDBACK_DATA,
-                "dataStartTime": shopReportday, "dataEndTime": shopReportday,
-                }
-        reportid = self.create_report(**para)  # {"ShowSalesChannel":"true"}
-        decom_df = self.decompression(reportid)
-        print(decom_df)
-        # print(decom_df.columns)
-
-    def GET_FLAT_FILE_ALL_ORDERS_DATA_BY_ORDER_DATE_GENERAL(self,seller_id):
-        # timezone_ = pytz.timezone(self.timezone)
-        shopReportday = (datetime.now() + timedelta(days=-1)).strftime("%Y-%m-%d")
-        # print(shopReportday)
-        para = {"reportType":ReportType.GET_FLAT_FILE_ALL_ORDERS_DATA_BY_ORDER_DATE_GENERAL,"dataStartTime":shopReportday,"dataEndTime":shopReportday,"reportOptions":{"ShowSalesChannel":"true"}}
-        reportid = self.create_report(**para) #{"ShowSalesChannel":"true"}
-        decom_df = self.decompression(reportid)
-        decom_df[decom_df.select_dtypes(float).columns] = decom_df[decom_df.select_dtypes(float).columns].fillna(0.0)
-        decom_df[decom_df.select_dtypes(int).columns] = decom_df[decom_df.select_dtypes(int).columns].fillna(0)
-        decom_df[decom_df.select_dtypes(datetime).columns] = decom_df[decom_df.select_dtypes(datetime).columns].astype('string')
-        if "purchase-order-number" in decom_df.columns:
-            decom_df['purchase-order-number'] = decom_df['purchase-order-number'].astype("string")
-
-        decom_df.fillna('',inplace=True)
-        # decom_df.to_csv('order.csv')
-        decom_df["ReportDate"] = parse(shopReportday)
-        # decom_df['timezone'] =  decom_df["purchase-date"].map(lambda x: parse(x).tzname()).fillna(method='bfill')
-        decom_df['timezone'] = "UTC"
-        print("==========================================================")
-        decom_df[["purchase-date", "last-updated-date"]] = decom_df[["purchase-date", "last-updated-date"]].applymap(
-            lambda x: self.timeDeal(x) if pd.isna(x) == False or x != None else x)
-        if 'is-business-order' not in decom_df.columns:
-            decom_df['is-business-order'] = None
-        if 'purchase-order-number' not in decom_df.columns:
-            decom_df['purchase-order-number'] = '-'
-        if 'price-designation' not in decom_df.columns:
-            decom_df['price-designation'] = '-'
-
-        decom_df['seller_id'] = seller_id
-        country_code = str(self.marketplace)[-2:]
-        if country_code=='GB':
-            country_code="UK"
-            # decom_df['country_code'] = "UK"
-        decom_df['country_code'] = country_code
-        # print(decom_df[])
-        reserve_columns = ["amazon-order-id","merchant-order-id","purchase-date","last-updated-date","order-status",
-                        "fulfillment-channel","sales-channel","order-channel","ship-service-level","product-name",
-                        "sku","asin","item-status","quantity","currency","item-price","item-tax","shipping-price",
-                        "shipping-tax","gift-wrap-price","gift-wrap-tax","item-promotion-discount",
-                        "ship-promotion-discount","ship-city","ship-state","ship-postal-code","ship-country",
-                        "promotion-ids","is-business-order","purchase-order-number","price-designation","ReportDate",
-                        "timezone","seller_id","country_code"
-                        ]
-        list_df = decom_df[reserve_columns].to_numpy().tolist()
-        # print(list_df)
-        # print(list_df[0])
-        # tuple_data = [tuple(i) for i in list_df]
-        try:
-            conn = self.mysql_connect_auth_lst()
-            cursor = conn.cursor()
-        except:
-            time.sleep(5)
-            conn = self.mysql_connect_auth_lst()
-            cursor = conn.cursor()
-        # print(list(conn.query("select * from amz_sp_api.orderReport")))
-        sql = f"""
-            insert into asj_ads.orderReport
-            values (%s,%s,%s,%s,%s,%s,%s, %s,%s,%s,%s,%s,%s,%s, %s,%s,%s,%s,%s,%s,%s, %s,%s,%s,%s,%s,%s,%s, %s,%s,%s,%s,%s,%s,%s)
-        """ #ok
-        # print(sql)
-        try:
-            conn.begin()
-            cursor.executemany(sql,list_df)
-            conn.commit()
-            print("插入完成")
-            conn.close()
-            time.sleep(1)
-        except Exception as e:
-            conn.rollback()
-            print(e)
-
-
+from sync_amz_data.public import sp_api_client
 def func_run():
-    SpApiRequest.get_allShops("GET_FLAT_FILE_ALL_ORDERS_DATA_BY_ORDER_DATE_GENERAL")
-
-
+    try:
+        for days in (-2,-3):
+            sp_api_client.SpApiRequest.get_allShops("GET_FLAT_FILE_ALL_ORDERS_DATA_BY_ORDER_DATE_GENERAL",days=days,**{})
+    except Exception as e:
+        print(e)
+    try:
+        for days in (-2, -3):
+           sp_api_client.SpApiRequest.get_allShops("GET_SALES_AND_TRAFFIC_REPORT",days=days,**{"level":"SKU"})
+    except Exception as e:
+        print(e)
+
+
+# func_run()
 if __name__ == '__main__':
-    # func_run()
+
     sched = BlockingScheduler()
     sched.add_job(func_run, 'cron', hour=0, minute=0,
                   second=30)
     sched.start()
 
-
-"""
-create database amz_sp_api;
-"""
-"""
-
-        create table amz_sp_api.productInfo
-        (
-        `item-name`	VARCHAR(300),
-        `item-description`	VARCHAR(1000),
-        `listing-id`	VARCHAR(50),
-        `seller-sku`	VARCHAR(50),
-        `price`	FLOAT,
-        `quantity`	INT,
-        `open-date`	VARCHAR(70),
-        `image-url`	VARCHAR(300),
-        `item-is-marketplace`	VARCHAR(50),
-        `product-id-type`	INT,
-        `item-note`	VARCHAR(300),
-        `item-condition`	INT,
-        `asin1`	VARCHAR(50),
-        `asin2`	VARCHAR(50),
-        `asin3`	VARCHAR(50),
-        `will-ship-internationally`	VARCHAR(50),
-        `expedited-shipping`	VARCHAR(50),
-        `product-id`	VARCHAR(50),
-        `bid-for-featured-placement`	FLOAT,
-        `add-delete`	VARCHAR(50),
-        `pending-quantity`	INT,
-        `fulfillment-channel`	VARCHAR(50),
-        `merchant-shipping-group`	VARCHAR(50),
-        `status`	VARCHAR(50),
-        `mainImageUrl`	VARCHAR(300),
-        `opendate_date`	Date,
-        `updateTime`	Date,
-        `timezone`	VARCHAR(30)
-        )
-"""
-"""
-create table amz_sp_api.orderReport
-(`amazon-order-id` VARCHAR(40),
-`merchant-order-id` VARCHAR(40),
-`purchase-date` DATETIME,
-`last-updated-date` DATETIME,
-`order-status` VARCHAR(40),
-`fulfillment-channel` VARCHAR(40),
-`sales-channel` VARCHAR(40),
-`order-channel` VARCHAR(40),
-`ship-service-level` VARCHAR(40),
-`product-name` VARCHAR(250),
-`sku` VARCHAR(50),
-`asin` VARCHAR(40),
-`item-status` VARCHAR(40),
-`quantity` INT,
-`currency` VARCHAR(40),
-`item-price` FLOAT,
-`item-tax` FLOAT,
-`shipping-price` FLOAT,
-`shipping-tax` FLOAT,
-`gift-wrap-price` FLOAT,
-`gift-wrap-tax` FLOAT,
-`item-promotion-discount` FLOAT,
-`ship-promotion-discount` FLOAT,
-`ship-city` VARCHAR(40),
-`ship-state` VARCHAR(40),
-`ship-postal-code` VARCHAR(40),
-`ship-country` VARCHAR(40),
-`promotion-ids` VARCHAR(50),
-`cpf` VARCHAR(40),
-`is-business-order` BOOL,
-`purchase-order-number` VARCHAR(50),
-`price-designation` VARCHAR(40),
-`signature-confirmation-recommended` BOOL,
-`ReportDate` DATE not null,
-`timezone` VARCHAR(20) not null
-);
-"""