import clickhouse_connect import time import warnings warnings.filterwarnings('ignore') import numpy as np from pymysql import Timestamp from sp_api.util import throttle_retry, load_all_pages from sp_api.api import Orders,ListingsItems,Inventories,Reports,CatalogItems,Products from sp_api.base import Marketplaces,ReportType,ProcessingStatus import pandas as pd import gzip from io import BytesIO,StringIO from datetime import datetime, timedelta,timezone import pytz import time from dateutil.parser import parse import pymysql from typing import List, Literal from random import shuffle from retry import retry import requests from urllib import request import json from clickhouse_connect import get_client try: from ..settings import MYSQL_AUTH_CONF, MYSQL_DATA_CONF except: from sync_amz_data.sync_amz_data.settings import MYSQL_AUTH_CONF, MYSQL_DATA_CONF class SpApiRequest: def __init__(self, credentials,marketplace): self.credentials = credentials self.marketplace = marketplace @classmethod @retry(tries=3, delay=5, backoff=2, ) def Token_auth(cls): # AUTH - mysql_connect_auth conn = pymysql.connect(**MYSQL_AUTH_CONF) return conn @classmethod @retry(tries=3, delay=5, backoff=2, ) def Data_auth(cls): # DATA - mysql_connect_auth_lst conn = pymysql.connect(**MYSQL_DATA_CONF) return conn @classmethod @retry(tries=3, delay=5, backoff=2, ) def LocalHost_Auth(cls): # local database- conn = pymysql.connect(user="huangyifan", password="123456", host="127.0.0.1", database="amz_sp_api", port=3306) return conn @staticmethod @retry(tries=3, delay=5, backoff=2, ) def auth_info(): # get Auth-data from all of shops - 此连接获取refreshtoken供账户授权使用 auth_conn = SpApiRequest.Token_auth() cursor = auth_conn.cursor() cursor.execute("select * from amazon_sp_report.amazon_sp_auth_info;") columns_name = [i[0] for i in cursor.description] rel = cursor.fetchall() df = pd.DataFrame(rel, columns=columns_name) return df @classmethod @retry(tries=3, delay=5, backoff=2, ) def get_refreshtoken(cls): # accroding to differnt shop get diffrent refreshtoken - 获取授权后拿到refreshtoken df = cls.auth_info() refreshtoken_list = (df['refresh_token'].to_numpy().tolist()) return refreshtoken_list @classmethod def get_catelog(cls,account_name,country=Marketplaces.US,asin=None): # active - 可以使用但未进行使用 if country in [Marketplaces.US, Marketplaces.BR, Marketplaces.CA,Marketplaces.MX]: region = 'NA' elif country in [Marketplaces.DE,Marketplaces.AE, Marketplaces.BE, Marketplaces.PL, Marketplaces.EG,Marketplaces.ES, Marketplaces.GB, Marketplaces.IN, Marketplaces.IT, Marketplaces.NL, Marketplaces.SA, Marketplaces.SE, Marketplaces.TR,Marketplaces.UK,Marketplaces.FR, ]: region = 'EU' else: region = str(country)[-2:] df = cls.auth_info() try: refresh_token = df.query("account_name==@account_name and region==@region")['refresh_token'].values[0] except: print("请输入正确的account name与Marketplace") return '获取失败' cred = { 'refresh_token': refresh_token, 'lwa_app_id': 'amzn1.application-oa2-client.1f9d3d4747e14b22b4b598e54e6b922e', # 卖家中心里面开发者资料LWA凭证 'lwa_client_secret': 'amzn1.oa2-cs.v1.3af0f5649f5b8e151cd5bd25c10f2bf3113172485cd6ffc52ccc6a5e8512b490', 'aws_access_key': 'AKIARBAGHTGOZC7544GN', 'aws_secret_key': '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', 'role_arn': 'arn:aws:iam::070880041373:role/Amazon_SP_API_ROLE' } # cate_item = CatalogItems(credentials=cred, marketplace=country) # images_info = cate_item.get_catalog_item(asin=asin,**{"includedData":['images']}) # print(images_info) # images = images_info.images[0].get('images')[0]['link'] # title_info = cate_item.get_catalog_item(asin=asin) # print(title_info) # title = title_info.payload['summaries'][0]['itemName'] cate_item = CatalogItems(credentials=cred, marketplace=country,version='2022-04-01') test_bundle = cate_item.get_catalog_item(asin=asin,**{"includedData":['salesRanks']}) print(test_bundle) # return {'images':images,'title':title} @retry(tries=3, delay=5, backoff=2,) def create_report(self,**kwargs): # Main-CreateReport-Function - 创建报告请求 reportType = kwargs['reportType'] reportOptions =kwargs.get("reportOptions") dataStartTime = datetime.now().strftime("%Y-%m-%dT%H:%M:%S") if kwargs.get("dataStartTime") is None else kwargs.get("dataStartTime")+"T00:00:00" dataEndTime = datetime.now().strftime("%Y-%m-%dT%H:%M:%S") if kwargs.get("dataEndTime") is None else kwargs.get("dataEndTime")+"T23:59:59" report = Reports(credentials=self.credentials, marketplace=self.marketplace) rel = report.create_report( reportType=reportType,marketplaceIds=[kwargs['marketpalceids'] if kwargs.get('marketpalceids') is not None else self.marketplace.marketplace_id], reportOptions=reportOptions,dataStartTime=dataStartTime,dataEndTime=dataEndTime ) reportId = rel.payload.get("reportId") # print(reportId) return reportId # @retry(tries=2, delay=3, backoff=2,) def decompression(self,reportId): # After-CreateReportFunc-simpleDeal - 根据获取到的报告id进行解压获取 report = Reports(credentials=self.credentials, marketplace=self.marketplace) while True: reportId_info = report.get_report(reportId=reportId) # print(reportId_info.payload) print("please wait...") if reportId_info.payload.get("processingStatus")==ProcessingStatus.DONE: reportDocumentId = reportId_info.payload.get("reportDocumentId") rp_table = report.get_report_document(reportDocumentId=reportDocumentId,download=False) print(rp_table) if rp_table.payload.get('compressionAlgorithm') is not None and self.marketplace.marketplace_id not in ['A1VC38T7YXB528']:# try: df = pd.read_table(filepath_or_buffer=rp_table.payload['url'],compression={"method":'gzip'},encoding='iso-8859-1') except: df = pd.read_csv(filepath_or_buffer=rp_table.payload['url'], compression={"method": 'gzip'}, encoding='iso-8859-1') return df elif rp_table.payload.get('compressionAlgorithm') is not None and self.marketplace.marketplace_id in ['A1VC38T7YXB528']: df = pd.read_table(filepath_or_buffer=rp_table.payload['url'], compression={"method": 'gzip'}, encoding='Shift-JIS') # df.columns = return df elif rp_table.payload.get('compressionAlgorithm') is None and self.marketplace.marketplace_id not in ['A1VC38T7YXB528']: df = pd.read_table(rp_table.payload.get("url"),encoding='iso-8859-1') return df elif rp_table.payload.get('compressionAlgorithm') is None and self.marketplace.marketplace_id in ['A1VC38T7YXB528']: df = pd.read_table(rp_table.payload.get("url"),encoding='Shift-JIS') return df elif reportId_info.payload.get("processingStatus") in [ProcessingStatus.CANCELLED,ProcessingStatus.FATAL]: try: print(reportId_info) reportDocumentId = reportId_info.payload.get("reportDocumentId") rp_table = report.get_report_document(reportDocumentId=reportDocumentId, download=True) print("取消或失败",rp_table) return pd.DataFrame() except Exception as e: print(e) return pd.DataFrame() time.sleep(15) print("please wait...") def decompression2(self,reportId): # After-CreateReportFunc-simpleDeal - 根据获取到的报告id进行解压获取 report = Reports(credentials=self.credentials, marketplace=self.marketplace) while True: reportId_info = report.get_report(reportId=reportId) # print(reportId_info.payload) print("please wait...") if reportId_info.payload.get("processingStatus")==ProcessingStatus.DONE: reportDocumentId = reportId_info.payload.get("reportDocumentId") rp_table = report.get_report_document(reportDocumentId=reportDocumentId,download=False) print(rp_table) if rp_table.payload.get('compressionAlgorithm') is not None and self.marketplace.marketplace_id not in ['A1VC38T7YXB528']:# # df = pd.read_json(path_or_buf=rp_table.payload['url'],compression={"method":'gzip'},encoding='iso-8859-1') response = request.urlopen(rp_table.payload['url']) response.encoding='iso-8859-1' df = json.loads(response.read().decode()) # pd.json_normalize() return df elif rp_table.payload.get('compressionAlgorithm') is not None and self.marketplace.marketplace_id in ['A1VC38T7YXB528']: df = pd.read_json(path_or_buf=rp_table.payload['url'], compression={"method": 'gzip'}, encoding='Shift-JIS') # df.columns = return df elif rp_table.payload.get('compressionAlgorithm') is None and self.marketplace.marketplace_id not in ['A1VC38T7YXB528']: df = pd.read_json(path_or_buf=rp_table.payload.get("url"),encoding='iso-8859-1') return df elif rp_table.payload.get('compressionAlgorithm') is None and self.marketplace.marketplace_id in ['A1VC38T7YXB528']: df = pd.read_json(path_or_buf=rp_table.payload.get("url"),encoding='Shift-JIS') return df elif reportId_info.payload.get("processingStatus") in [ProcessingStatus.CANCELLED,ProcessingStatus.FATAL]: print(reportId_info) reportDocumentId = reportId_info.payload.get("reportDocumentId") rp_table = report.get_report_document(reportDocumentId=reportDocumentId, download=True) print("取消或失败",rp_table) return pd.DataFrame() time.sleep(15) print("please wait...") # def GET_MERCHANT_LISTINGS_ALL_DATA(self,limit=None): # Not be used # start = time.time() # para = {"reportType":ReportType.GET_MERCHANT_LISTINGS_ALL_DATA} # reportid = self.create_report(**para) # decom_df = self.decompression(reportid) # print("连接数据库") # conn = self.LocalHost_Auth() # print("连接成功") # cursor = conn.cursor() # timezone = "UTC" #pytz.timezone(self.timezone) # bondary_date = (datetime.now()).strftime("%Y-%m-%d") #+ timedelta(days=-28) # cursor.execute(f"""select * from amz_sp_api.productInfo where (mainImageUrl is not null and mainImageUrl not in ('', ' ')) and # (`seller-sku` not in ('',' ') and `seller-sku` is not null) and # `updateTime`>='{bondary_date}'""") #`seller-sku`,`updateTime`,`mainImageUrl` # col = [i[0] for i in cursor.description] # query_rel = cursor.fetchall() # # if len(query_rel)!=0: # print(query_rel[0]) # df = pd.DataFrame(query_rel,columns=col) # listingid = df['listing-id'].to_numpy().tolist() # decom_df = decom_df.query("`listing-id` not in @listingid") # print("数据条数: ",len(decom_df)) # # print(f"delete * from amz_sp_api.productInfo where `listing-id` not in {tuple(listingid)}") # # # conn.commit() # # if len(decom_df)==0: # return "Done" # # if limit != None: # decom_df = decom_df.iloc[:limit,:] # print("getting mainImageInfo...") # rowcount = 0 # while rowcount < len(decom_df): # df_insert = decom_df.copy() # df_insert = df_insert.iloc[rowcount:rowcount + 200, :] # # df_insert = self.data_deal(df_insert) # list_df = df_insert.to_numpy().tolist() # # # print(list(conn.query("select * from amz_sp_api.orderReport"))) # sql = f""" # insert into amz_sp_api.productInfo # values (%s,%s,%s,%s,%s,%s,%s, %s,%s,%s,%s,%s,%s,%s, %s,%s,%s,%s,%s,%s,%s, %s,%s,%s,%s,%s,%s,%s,%s) # """ #ok # # print(sql) # conn = self.LocalHost_Auth() # cursor = conn.cursor() # try: # conn.begin() # cursor.executemany(sql, list_df) # conn.commit() # print("插入中...") # insert_listingid = df_insert['listing-id'].to_numpy().tolist() # cursor.execute(f"delete from amz_sp_api.productInfo where `listing-id` not in {tuple(insert_listingid)} and `updateTime`<'{bondary_date}'") # conn.commit() # rowcount += 200 # except Exception as e: # conn.rollback() # print(e) # try: # conn = self.LocalHost_Auth() # cursor = conn.cursor() # conn.begin() # cursor.executemany(sql, list_df) # conn.commit() # insert_listingid = df_insert['listing-id'].to_numpy().tolist() # cursor.execute(f"delete from amz_sp_api.productInfo where `listing-id` not in {tuple(insert_listingid)} and `updateTime`<'{bondary_date}'") # conn.commit() # except Exception as e: # conn.rollback() # print(e) # break # # break # conn.close() # print("全部完成") # end =time.time() # print("duration:",end-start) # return decom_df # def data_deal(self, decom_df, seller_id): # desprecated # decom_df['mainImageUrl'] = decom_df['seller-sku'].map(lambda x: self.get_mainImage_url(x)) # url_columns = [i for i in decom_df.columns if "url" in i.lower()] # if len(url_columns) > 0: # decom_df[url_columns] = decom_df[url_columns].astype("string") # asin_columns = [i for i in decom_df.columns if 'asin' in i.lower()] # if len(asin_columns) > 0: # decom_df[asin_columns] = decom_df[asin_columns].astype("string") # if 'pending-quantity' in decom_df.columns: # decom_df['pending-quantity'] = decom_df['pending-quantity'].map( # lambda x: 0 if pd.isna(x) or np.isinf(x) else x).astype("int32") # deletecolumns = [i for i in decom_df.columns if 'zshop' in i.lower()] # decom_df.drop(columns=deletecolumns, inplace=True) # if 'quantity' in decom_df.columns: # decom_df['quantity'] = decom_df['quantity'].map(lambda x: 0 if pd.isna(x) or np.isinf(x) else x).astype( # "int32") # decom_df['opendate_date'] = decom_df['open-date'].map(lambda x: self.datetime_deal(x)) # if 'add-delete' in decom_df.columns: # decom_df['add-delete'] = decom_df['add-delete'].astype('string', errors='ignore') # if 'will-ship-internationally' in decom_df.columns: # decom_df['will-ship-internationally'] = decom_df['will-ship-internationally'].astype('string', # errors='ignore') # if 'expedited-shipping' in decom_df.columns: # decom_df['expedited-shipping'] = decom_df['expedited-shipping'].astype('string', errors='ignore') # decom_df['updateTime'] = datetime.now() # decom_df['timezone'] = "UTC" # decom_df['seller_id'] = seller_id # # # decom_df['item-description'] = decom_df['item-description'].str.slice(0, 500) # decom_df[decom_df.select_dtypes(float).columns] = decom_df[decom_df.select_dtypes(float).columns].fillna(0.0) # decom_df[decom_df.select_dtypes(int).columns] = decom_df[decom_df.select_dtypes(int).columns].fillna(0) # decom_df[decom_df.select_dtypes(datetime).columns] = decom_df[decom_df.select_dtypes(datetime).columns].astype( # 'string') # decom_df.fillna('', inplace=True) # # print(decom_df.info()) # return decom_df def fba_inventorySQL(self,conn,seller_id): cursor = conn.cursor() # 执行语句 cursor.execute(f""" select asin,sku,seller_id,marketplace_id,country_code,quantity,fulfillment_channel from (select asin,sku,seller_id,marketplace_id,country_code,quantity,fulfillment_channel, ROW_NUMBER() over(partition by asin,sku,seller_id,marketplace_id,country_code order by update_datetime desc) as row_ from asj_ads.seller_listings) as t where row_=1 and seller_id='{seller_id}' and marketplace_id='{self.marketplace.marketplace_id}' and fulfillment_channel='FBA' """)#,quantity query_ = cursor.fetchall() col_name = [i[0] for i in cursor.description] df_datatable = pd.DataFrame(query_, columns=col_name) df_datatable.columns = ['asin_', 'sku_', 'seller_id_', 'marketplace_id_', 'country_code_', 'afn-fulfillable-quantity'] return df_datatable def get_fba_neccessary_segment(self,conn,seller_id): para = {"reportType": ReportType.GET_FBA_MYI_UNSUPPRESSED_INVENTORY_DATA} reportid = self.create_report(**para) df = self.decompression(reportid) if len(df) == 0: return self.fba_inventorySQL(conn, seller_id) # pd.DataFrame() df['seller_id'] = seller_id df['marketplace_id'] = self.marketplace.marketplace_id df['country_code'] = str(self.marketplace)[-2:] df_rel = df.query("condition=='New'") df_rel = df_rel.groupby(['asin', 'sku', 'seller_id', 'marketplace_id', 'country_code']).agg( {'afn-fulfillable-quantity': sum}).reset_index() df_rel.columns = ['asin_', 'sku_', 'seller_id_', 'marketplace_id_', 'country_code_', 'afn-fulfillable-quantity'] print(f"{seller_id}_{str(self.marketplace)[-2:]}_FBA_Inventory_OK") return df_rel def GET_FBA_MYI_UNSUPPRESSED_INVENTORY_DATA(self,refresh_token,conn=None,seller_id=None,days=-1,**kwargs): # FBA库存信息 try: return self.get_fba_neccessary_segment(conn,seller_id) except Exception as e: print(e) try: time.sleep(15) return self.get_fba_neccessary_segment(conn, seller_id) except Exception as e: print(e) df_rel = pd.DataFrame(columns=['asin_', 'sku_', 'seller_id_', 'marketplace_id_', 'country_code_','afn-fulfillable-quantity']) return df_rel def GET_FLAT_FILE_OPEN_LISTINGS_DATA(self,refresh_token,conn=None,seller_id=None,days=-1): # To datatable asj_ads.seller_listings- listing信息,包括fbm库存 para = {"reportType": ReportType.GET_MERCHANT_LISTINGS_ALL_DATA} reportid = self.create_report(**para) df = self.decompression(reportid) if len(df)>0: if self.marketplace.marketplace_id =='A1VC38T7YXB528': # 该站点的数据列有所不同 df.columns = ['item-name','listing-id','seller-sku','price','quantity','open-date','product-id-type','item-description', 'item-condition','overseas shipping','fast shipping','asin1','stock_number','fulfillment-channel','merchant-shipping-group','status'] df['seller_id'] = seller_id df['marketplace_id'] = self.marketplace.marketplace_id df['country_code'] = str(self.marketplace)[-2:] if 'fulfilment-channel' in df.columns: # 判断是否存在’fulfilment‘字段(1个film),如果存在则添加一个’fulfillment‘字段(两个fillm) df['fulfillment-channel'] = df['fulfilment-channel'].copy() # 如果是amazon,则字段改为FBA df['fulfillment_channel'] = df['fulfillment-channel'].map(lambda x:"FBA" if not pd.isna(x) and len(x)>0 and str(x)[1:4] in "AMAZON" else x) # 如果是DEFAULT,则字段该为FBM df['fulfillment_channel'] = df['fulfillment_channel'].map(lambda x: "FBM" if not pd.isna(x) and len(x)>0 and str(x)[1:4] in "DEFAULT" else x) if 'asin1' not in df.columns: # 如果不存在asin1,则添加asin1字段 df['asin1'] = '' if 'product-id' not in df.columns: # 如果不存在product-id,则添加product-id字段 df['product-id'] = '' # 空值处理 # df['quantity'] = df['quantity'].fillna(0).astype('int64',errors='ignore') df['quantity'] = df['quantity'].map(lambda x:0 if pd.isna(x)==True else int(x)) #库存数量格式处理 df['quantity'] = df['quantity'].astype('int64') # 填充NA值 df[['listing-id','seller_id','asin1','seller-sku','country_code','marketplace_id','fulfillment_channel','status','product-id']] = df[['listing-id','seller_id','asin1','seller-sku','country_code','marketplace_id','fulfillment_channel','status','product-id']].fillna('').astype('string',errors='ignore') # 为NA的价格填充0 df['price'] = df['price'].fillna(0.0).astype('float64',errors='ignore') df.fillna('',inplace=True) # 时间处理 df['opendate'] = df['open-date'].map(lambda x: self.datetime_deal(x)) df['update_datetime'] = datetime.now(pytz.UTC).date() # 保留列 origin_columns = ['listing-id','seller_id', 'asin1','seller-sku','title','image_link','country_code', 'marketplace_id','quantity','fulfillment_channel', 'price','opendate','status','update_datetime','product-id','product-id-type','modifier' ] # 连接数据库 conn = SpApiRequest.Data_auth() cursor = conn.cursor() # 执行语句,筛选出asin不为空并且product_id不为空的两列唯一数据。 cursor.execute("""select product_id,asin from (select * from asj_ads.seller_listings where asin is not null and asin<>'' and product_id is not null and product_id <>'') t1 group by product_id,asin""") query_ = cursor.fetchall() col_name = [i[0] for i in cursor.description] df_datatable = pd.DataFrame(query_, columns=col_name) # 合并数据,左表为新下载的数据,右表为数据库查询的数据 merged_df = df.merge(df_datatable[['product_id','asin']],how='left',left_on='product-id',right_on='product_id') # 功能函数,提取asin def func_(asin,asin1,product_id,cred,market_p,seller_id,sku): if 'B0' in str(product_id)[:3]: return str(product_id) if (pd.isna(asin1) or asin1=='') and (pd.isna(asin)==False and asin !=''): if 'B0' in asin[:3]: return asin elif (pd.isna(asin1)==False and asin1!=''): if 'B0' in asin1[:3]: return asin1 listingClient = ListingsItems(credentials=cred, marketplace=market_p) try: r1 = listingClient.get_listings_item(sellerId=seller_id, sku=sku) print(r1.payload) asin = r1.payload.get("summaries")[0].get("asin") return asin except Exception as e: print("获取图片url过程错误重试, 错误message: ", e) time.sleep(3) r1 = listingClient.get_listings_item(sellerId=seller_id, sku=sku) print(r1.payload) asin = r1.payload.get("summaries")[0].get("asin") return asin # 应用处理函数,返回asin merged_df['asin1'] = merged_df.apply(lambda x:func_(x['asin'],x['asin1'],x['product-id'],self.credentials,self.marketplace,seller_id,x['seller-sku']),axis=1) #x['asin'] if pd.isna(x['asin1']) or x['asin1']=='' else x['asin1'] merged_df['image_link'] = '' # 暂时将refresh_token添加在title列,后面获取asin详细数据时需要用到 merged_df['title'] = refresh_token merged_df['modifier'] = '' # 填充NA值 merged_df.fillna('',inplace=True) df1 = merged_df.copy() # df1.to_csv("第一次合并处理后.csv") #获取FBA库存数据 df_fbaInventory = self.GET_FBA_MYI_UNSUPPRESSED_INVENTORY_DATA(refresh_token, conn, seller_id, days) # 合并fba库存数据 if len(df_fbaInventory)>0: df1 = df1.merge(df_fbaInventory,how='left',left_on=['asin1','seller-sku','seller_id','marketplace_id','country_code'],right_on=['asin_','sku_','seller_id_','marketplace_id_','country_code_']) df1['quantity'] = df1.apply(lambda x:x['afn-fulfillable-quantity'] if x['fulfillment_channel']=='FBA' or pd.isna(['afn-fulfillable-quantity'])==False else x['quantity'],axis=1) df1['quantity'] = df1['quantity'].map(lambda x:0 if pd.isna(x) else int(x)) df1['quantity'] = df1['quantity'].fillna(0) # df1.to_csv("第二次合并处理后的数据.csv") # 判断更新数据 update_df = self.update_data(df1,seller_id,str(self.marketplace)[-2:],conn) if len(update_df)==0: return '无更新数据插入' # update_df['country_code'] = update_df['country_code'].map({"GB":"UK"}) conn = SpApiRequest.Data_auth() cursor = conn.cursor() # 插入更新的数据 try: insertsql = """insert into asj_ads.seller_listings(listing_id,seller_id,asin,sku,title,image_link,country_code,marketplace_id,quantity, fulfillment_channel,price,launch_datetime,status,update_datetime,product_id,product_id_type,modifier) values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)""" conn.begin() cursor.executemany(insertsql,tuple(update_df[origin_columns].to_numpy().tolist())) conn.commit() print("插入完成") delete_sql = f"""delete from asj_ads.seller_listings where update_datetime<'{(datetime.today()+timedelta(days=-3)).date().isoformat()}'""" cursor.execute(delete_sql) conn.commit() return '插入完成' except Exception as e: print("插入错误:",e) conn.rollback() return '出错回滚' return '' # def get_listing_info(self, sku,seller_id): # desprecated # listingClient = ListingsItems(credentials=self.credentials, marketplace=self.marketplace) # try: # r1 = listingClient.get_listings_item(sellerId=seller_id, sku=sku) # # print(r1.payload) # json_content = r1.payload.get("summaries")[0] # item_name = json_content.get("itemName") # item_name ='###' if item_name==None else item_name # img = json_content.get("mainImage") # img_url = '###' if img is None else img.get("link") # return str(img_url)+"-----"+ str(item_name) # except Exception as e: # try: # print("获取图片url过程错误重试, 错误message: ",e) # time.sleep(3) # r1 = listingClient.get_listings_item(sellerId=seller_id, sku=sku) # print(r1.payload) # json_content = r1.payload.get("summaries")[0] # # item_name = json_content.get("itemName") # item_name = '###' if item_name == None else item_name # img = json_content.get("mainImage") # img_url = '###' if img is None else img.get("link") # return str(img_url)+"-----"+ str(item_name) # except Exception as e: # print(e) # return "###-----###" def datetime_deal(self,timestring): # used in GET_FLAT_FILE_OPEN_LISTINGS_DATA, time deal -时间处理函数 timezone_ = {"AEST":"Australia/Sydney","AEDT":"Australia/Sydney","PST":"America/Los_Angeles", "PDT":"America/Los_Angeles","CST":"America/Chicago","CDT":"America/Chicago", "MET":"MET","MEST":"MET","BST":"Europe/London","GMT":"GMT","CET":"CET", "CEST":"CET","JST":"Asia/Tokyo","BRT":"America/Sao_Paulo"} date_list = str.split(timestring,sep = ' ') if len(date_list)<3: try: return datetime.strptime(date_list[0],"%Y-%m-%d") except: try: return datetime.strptime(date_list[0], "%Y/%m/%d") except: try: return datetime.strptime(date_list[0], "%d/%m/%Y") except Exception as e: print(e) return datetime(1999, 12, 31, 0, 0, 0) try: time_date = datetime.strptime(date_list[0]+date_list[1],"%Y-%m-%d%H:%M:%S") timezone = pytz.timezone(timezone_[date_list[2]]) time_ = timezone.localize(time_date) return time_.astimezone(pytz.UTC) except: try: time_date = datetime.strptime(date_list[0] + date_list[1], "%d/%m/%Y%H:%M:%S") timezone = pytz.timezone(timezone_[date_list[2]]) time_ = timezone.localize(time_date) return time_.astimezone(pytz.UTC) except : try: time_date = datetime.strptime(date_list[0] + date_list[1], "%Y/%m/%d%H:%M:%S") timezone = pytz.timezone(timezone_[date_list[2]]) time_ = timezone.localize(time_date) return time_.astimezone(pytz.UTC) except Exception as e1: print(e1) return datetime(1999,12,31,0,0,0) def update_data(self,df,seller_id,country_code,conn): # used in GET_FLAT_FILE_OPEN_LISTINGS_DATA, data compare conn = SpApiRequest.Data_auth() cursor = conn.cursor() columns = ['listing-id', 'seller_id', 'asin1', 'seller-sku', 'title', 'image_link', 'country_code', 'marketplace_id', 'quantity', 'fulfillment_channel', 'price', 'opendate', 'status', 'update_datetime', 'product-id', 'product-id-type','modifier' ] if country_code=='GB': country_code="UK" df['country_code'] = "UK" df_data = pd.DataFrame(columns=columns) delete_list = [] marketplace_id = self.marketplace.marketplace_id try: cursor.execute(f"""select * from asj_ads.seller_listings where seller_id='{seller_id}' and marketplace_id='{marketplace_id}'""") col = [i[0] for i in cursor.description] query_rel = cursor.fetchall() df_rel = pd.DataFrame(query_rel, columns=col) # df_rel.to_csv("数据库数据.csv") #数据库数据 df_rel['quantity'] = df_rel['quantity'].fillna(0).astype('int64') df_rel['price'] = df_rel['price'].fillna(0.0).astype('float64') df_rel['product_id_type'] = df_rel['product_id_type'].astype('int64') # 新数据 df['update_datetime'] =df['update_datetime'].astype('datetime64[ns]') df['quantity'] = df['quantity'].fillna(0).astype('int64') df['price']= df['price'].fillna(0.0).astype('float64') row = 0 while row < len(df): temp_df = df.iloc[row, :] temp_d = df.iloc[row:row+1, :] listing_id = temp_df['listing-id'] asin = temp_df['asin1'] sku = temp_df['seller-sku'] quantity = temp_df['quantity'] fulfillment_channel = temp_df['fulfillment_channel'] price = temp_df['price'] product_id = temp_df['product-id'] title = temp_df['title'] imageurl = temp_df['image_link'] modifier = temp_df['modifier'] temp = df_rel.query("""listing_id==@listing_id and asin==@asin and sku==@sku and quantity==@quantity and fulfillment_channel==@fulfillment_channel and price==@price and product_id==@product_id and country_code==@country_code and seller_id==@seller_id and title==@title and image_link==@imageurl and modifier==@modifier""") # print("需要关注数据(是否异常):",len(temp),temp.to_numpy().tolist()) if len(temp)>1 else 1 if len(temp)>1: # temp = temp.head(1).to_numpy().tolist() df_data = pd.concat((df_data,temp_d),ignore_index=True) #df_data.append(temp_df, ignore_index=True) delete_list.append((seller_id, marketplace_id, sku, listing_id, product_id)) # print(len(temp)) elif len(temp)==0: df_data = pd.concat((df_data,temp_d),ignore_index=True) delete_list.append((seller_id,marketplace_id,sku,listing_id,product_id)) row += 1 print("判断不同数据条数",len(delete_list)) print("预计更新数据条数",len(df_data)) try: # print(tuple(delete_list)) if len(delete_list)>0: query = f"""delete from asj_ads.seller_listings where (seller_id,marketplace_id,sku,listing_id,product_id) in %s""" #where (seller_id,country_code) in %s""" cursor.execute(query,(delete_list,)) conn.commit() # print(delete_list) print("进行中...") except Exception as e: print(e) conn.rollback() return df_data except Exception as e: print("错误:", e) return df # def GET_FLAT_FILE_RETURNS_DATA_BY_RETURN_DATE(self,seller_id,days=-2): # not be used,退货报告,空数据 # shopReportday = (datetime.now() + timedelta(days=days)).strftime("%Y-%m-%d") # # print(shopReportday) # para = {"reportType": ReportType.GET_SELLER_FEEDBACK_DATA, # "dataStartTime": shopReportday, "dataEndTime": shopReportday, # } # reportid = self.create_report(**para) # {"ShowSalesChannel":"true"} # decom_df = self.decompression(reportid) # print(decom_df) # print(decom_df.columns) def GET_SALES_AND_TRAFFIC_REPORT(self, refresh_token,seller_id,days=-2,**kwargs): # To datatable asj_ads.SalesAndTrafficByAsin,销售流量表 # ,level:Literal["PARENT","CHILD","SKU"]="PARENT") level = "PARENT" if len(kwargs.get("level"))==0 else kwargs.get("level") countryCode = None if kwargs.get("countryCode")==None else kwargs.get("countryCode") # print(level) shopReportday = (datetime.now() + timedelta(days=days)).strftime("%Y-%m-%d") print(shopReportday,countryCode,seller_id) try: conn = self.Data_auth() cursor = conn.cursor() except: time.sleep(5) conn = self.Data_auth() cursor = conn.cursor() if level == 'SKU': query_judge = f"""select count(*) from asj_ads.SalesAndTrafficByAsin where seller_id='{seller_id}' and data_date='{shopReportday}' and countryCode='{countryCode}' and childAsin is not Null and sku is not Null""" elif level == 'CHILD': query_judge = f"""select count(*) from asj_ads.SalesAndTrafficByAsin where seller_id='{seller_id}' and data_date='{shopReportday}' and countryCode='{countryCode}' and sku is null and childAsin is not null""" elif level == 'PARENT': query_judge = f"""select count(*) from asj_ads.SalesAndTrafficByAsin where seller_id='{seller_id}' and data_date='{shopReportday}' and countryCode='{countryCode}' and sku is null and childAsin is null""" else: return '' print(query_judge) cursor.execute(query_judge) rel = cursor.fetchall() # print() if rel[0][0]!=0: return '已存在' # print(shopReportday) para = {"reportType": ReportType.GET_SALES_AND_TRAFFIC_REPORT, "dataStartTime": shopReportday, "dataEndTime": shopReportday, "reportOptions":{"dateGranularity":"DAY","asinGranularity":level} } reportid = self.create_report(**para) # {"ShowSalesChannel":"true"} decom_df = self.decompression(reportid) # print(decom_df.columns[0]) data_rel = self.sales_traffic_datadeal(decom_df.columns[0],seller_id,countryCode) try: conn = self.Data_auth() cursor = conn.cursor() except: time.sleep(5) conn = self.Data_auth() cursor = conn.cursor() # print(list(conn.query("select * from amz_sp_api.orderReport"))) sql = f""" insert into asj_ads.SalesAndTrafficByAsin(data_date,data_marketpalceId,parent_asin, childAsin,sku,sBA_unitsOrdered,sBA_unitsOrderedB2B,sBA_amount, currencyCode,totalOrderItems,totalOrderItemsB2B,tBA_browserSessions, tBA_browserSessionsB2B,tBA_mobileAppSessions,tBA_mobileAppSessionsB2B, tBA_sessions,tBA_sessionsB2B,tBA_browserSessionPercentage, tBA_browserSessionPercentageB2B,tBA_mobileAppSessionPercentage, tBA_mobileAppSessionPercentageB2B,tBA_sessionPercentage, tBA_sessionPercentageB2B,tBA_browserPageViews,tBA_browserPageViewsB2B, tBA_mobileAppPageViews,tBA_mobileAppPageViewsB2B,tBA_pageViews, tBA_pageViewsB2B,tBA_browserPageViewsPercentage,tBA_browserPageViewsPercentageB2B, tBA_mobileAppPageViewsPercentage,tBA_mobileAppPageViewsPercentageB2B, tBA_pageViewsPercentage,tBA_pageViewsPercentageB2B,tBA_buyBoxPercentage, tBA_buyBoxPercentageB2B,tBA_unitSessionPercentage,tBA_unitSessionPercentageB2B,seller_id,countryCode) values (%s,%s,%s,%s,%s,%s,%s, %s,%s,%s,%s,%s,%s,%s, %s,%s,%s,%s,%s,%s,%s, %s,%s,%s,%s,%s,%s,%s, %s,%s,%s,%s,%s,%s,%s, %s,%s,%s,%s,%s,%s) """ # ok try: # TODO conn.begin() cursor.executemany(sql, data_rel) conn.commit() print("插入完成") conn.close() time.sleep(1) except Exception as e: conn.rollback() print(e) def sales_traffic_datadeal(self,data,seller_id,countryCode): # used in GET_SALES_AND_TRAFFIC_REPORT data = eval(data) if len(data['salesAndTrafficByAsin'])==0: return [] data_list = [] data_date = data["reportSpecification"]["dataEndTime"] data_marketpalceId = data["reportSpecification"]["marketplaceIds"][0] # print(data_marketpalceId) for single_item in data["salesAndTrafficByAsin"]: # print(single_item) parent_asin = single_item.get("parentAsin") childAsin = single_item.get("childAsin") sku = single_item.get("sku") salesByAsin = single_item.get("salesByAsin") # if salesByAsin is not None: sBA_unitsOrdered = salesByAsin.get("unitsOrdered") if salesByAsin is not None else '' sBA_unitsOrderedB2B = salesByAsin.get("unitsOrderedB2B") if salesByAsin is not None else '' orderedProductSales = salesByAsin.get("orderedProductSales") sBA_amount = orderedProductSales.get("amount") if orderedProductSales is not None else '' currencyCode = orderedProductSales.get("currencyCode") if orderedProductSales is not None else '' orderedProductSalesB2B = salesByAsin.get("orderedProductSalesB2B") if salesByAsin is not None else None # if orderedProductSalesB2B is not None: oPS_amount = orderedProductSalesB2B.get("amount") if orderedProductSalesB2B is not None else '' totalOrderItems = salesByAsin.get("totalOrderItems") if salesByAsin is not None else '' totalOrderItemsB2B = salesByAsin.get("totalOrderItemsB2B") if salesByAsin is not None else '' trafficByAsin = single_item.get("trafficByAsin") # if trafficByAsin is not None: tBA_browserSessions = trafficByAsin.get("browserSessions") if trafficByAsin is not None else '' tBA_browserSessionsB2B = trafficByAsin.get("browserSessionsB2B") if trafficByAsin is not None else '' tBA_mobileAppSessions = trafficByAsin.get("mobileAppSessions") if trafficByAsin is not None else '' tBA_mobileAppSessionsB2B = trafficByAsin.get("mobileAppSessionsB2B") if trafficByAsin is not None else '' tBA_sessions = trafficByAsin.get("sessions") if trafficByAsin is not None else '' tBA_sessionsB2B = trafficByAsin.get("sessionsB2B") if trafficByAsin is not None else '' tBA_browserSessionPercentage = trafficByAsin.get("browserSessionPercentage") if trafficByAsin is not None else '' tBA_browserSessionPercentageB2B = trafficByAsin.get("browserSessionPercentageB2B") if trafficByAsin is not None else '' tBA_mobileAppSessionPercentage = trafficByAsin.get("mobileAppSessionPercentage") if trafficByAsin is not None else '' tBA_mobileAppSessionPercentageB2B = trafficByAsin.get("mobileAppSessionPercentageB2B") if trafficByAsin is not None else '' tBA_sessionPercentage = trafficByAsin.get("sessionPercentage") if trafficByAsin is not None else '' tBA_sessionPercentageB2B = trafficByAsin.get("sessionPercentageB2B") if trafficByAsin is not None else '' tBA_browserPageViews = trafficByAsin.get("browserPageViews") if trafficByAsin is not None else '' tBA_browserPageViewsB2B = trafficByAsin.get("browserPageViewsB2B") if trafficByAsin is not None else '' tBA_mobileAppPageViews = trafficByAsin.get("mobileAppPageViews") if trafficByAsin is not None else '' tBA_mobileAppPageViewsB2B = trafficByAsin.get("mobileAppPageViewsB2B") if trafficByAsin is not None else '' tBA_pageViews = trafficByAsin.get("pageViews") if trafficByAsin is not None else '' tBA_pageViewsB2B = trafficByAsin.get("pageViewsB2B") if trafficByAsin is not None else '' tBA_browserPageViewsPercentage = trafficByAsin.get("browserPageViewsPercentage") if trafficByAsin is not None else '' tBA_browserPageViewsPercentageB2B = trafficByAsin.get("browserPageViewsPercentageB2B") if trafficByAsin is not None else '' tBA_mobileAppPageViewsPercentage = trafficByAsin.get("mobileAppPageViewsPercentage") if trafficByAsin is not None else '' tBA_mobileAppPageViewsPercentageB2B = trafficByAsin.get("mobileAppPageViewsPercentageB2B") if trafficByAsin is not None else '' tBA_pageViewsPercentage = trafficByAsin.get("pageViewsPercentage") if trafficByAsin is not None else '' tBA_pageViewsPercentageB2B = trafficByAsin.get("pageViewsPercentageB2B") if trafficByAsin is not None else '' tBA_buyBoxPercentage = trafficByAsin.get("buyBoxPercentage") if trafficByAsin is not None else '' tBA_buyBoxPercentageB2B = trafficByAsin.get("buyBoxPercentageB2B") if trafficByAsin is not None else '' tBA_unitSessionPercentage = trafficByAsin.get("unitSessionPercentage") if trafficByAsin is not None else '' tBA_unitSessionPercentageB2B = trafficByAsin.get("unitSessionPercentageB2B") if trafficByAsin is not None else '' data_list.append([data_date,data_marketpalceId,parent_asin, childAsin,sku,sBA_unitsOrdered,sBA_unitsOrderedB2B,sBA_amount, currencyCode,totalOrderItems,totalOrderItemsB2B,tBA_browserSessions, tBA_browserSessionsB2B,tBA_mobileAppSessions,tBA_mobileAppSessionsB2B, tBA_sessions,tBA_sessionsB2B,tBA_browserSessionPercentage, tBA_browserSessionPercentageB2B,tBA_mobileAppSessionPercentage, tBA_mobileAppSessionPercentageB2B,tBA_sessionPercentage, tBA_sessionPercentageB2B,tBA_browserPageViews,tBA_browserPageViewsB2B, tBA_mobileAppPageViews,tBA_mobileAppPageViewsB2B,tBA_pageViews, tBA_pageViewsB2B,tBA_browserPageViewsPercentage,tBA_browserPageViewsPercentageB2B, tBA_mobileAppPageViewsPercentage,tBA_mobileAppPageViewsPercentageB2B, tBA_pageViewsPercentage,tBA_pageViewsPercentageB2B,tBA_buyBoxPercentage, tBA_buyBoxPercentageB2B,tBA_unitSessionPercentage,tBA_unitSessionPercentageB2B,seller_id,countryCode ]) # print(data_list) return data_list def GET_FLAT_FILE_ALL_ORDERS_DATA_BY_ORDER_DATE_GENERAL(self, refresh_token,seller_id,days=-1,**kwargs): # To datatable asj_ads.orderReport_ - 获取订单报告 countryCode = None if kwargs.get("countryCode")==None else kwargs.get("countryCode") shopReportday = (datetime.now() + timedelta(days=days)).strftime("%Y-%m-%d") print(shopReportday) try: conn = self.Data_auth() cursor = conn.cursor() except: time.sleep(5) conn = self.Data_auth() cursor = conn.cursor() query_judge = f"""select count(*) from asj_ads.orderReport_ where ReportDate='{shopReportday}' and country_code='{countryCode}'""" print(query_judge) cursor.execute(query_judge) rel = cursor.fetchall() # print() if rel[0][0]!=0: print("已存在") return '已存在' para = {"reportType": ReportType.GET_FLAT_FILE_ALL_ORDERS_DATA_BY_ORDER_DATE_GENERAL, "dataStartTime": shopReportday, "dataEndTime": shopReportday, "reportOptions": {"ShowSalesChannel": "true"}} reportid = self.create_report(**para) # {"ShowSalesChannel":"true"} decom_df = self.decompression(reportid) decom_df[decom_df.select_dtypes(float).columns] = decom_df[decom_df.select_dtypes(float).columns].fillna(0.0) decom_df[decom_df.select_dtypes(int).columns] = decom_df[decom_df.select_dtypes(int).columns].fillna(0) decom_df[decom_df.select_dtypes(datetime).columns] = decom_df[decom_df.select_dtypes(datetime).columns].astype( 'string') if "purchase-order-number" in decom_df.columns: decom_df['purchase-order-number'] = decom_df['purchase-order-number'].astype("string") decom_df.fillna('', inplace=True) # decom_df.to_csv('order.csv') decom_df["ReportDate"] = parse(shopReportday) # decom_df['timezone'] = decom_df["purchase-date"].map(lambda x: parse(x).tzname()).fillna(method='bfill') decom_df['timezone'] = "UTC" print("==========================================================") decom_df[["purchase-date", "last-updated-date"]] = decom_df[["purchase-date", "last-updated-date"]].applymap( lambda x: self.timeDeal(x) if pd.isna(x) == False or x != None else x) if 'is-business-order' not in decom_df.columns: decom_df['is-business-order'] = None if 'purchase-order-number' not in decom_df.columns: decom_df['purchase-order-number'] = '-' if 'price-designation' not in decom_df.columns: decom_df['price-designation'] = '-' decom_df['seller_id'] = seller_id country_code = str(self.marketplace)[-2:] if country_code == 'GB': country_code = "UK" # decom_df['country_code'] = "UK" decom_df['country_code'] = country_code decom_df['insert_time'] = datetime.now() # print(decom_df[]) reserve_columns = ["amazon-order-id", "merchant-order-id", "purchase-date", "last-updated-date", "order-status", "fulfillment-channel", "sales-channel", "order-channel", "ship-service-level", "product-name", "sku", "asin", "item-status", "quantity", "currency", "item-price", "item-tax", "shipping-price", "shipping-tax", "gift-wrap-price", "gift-wrap-tax", "item-promotion-discount", "ship-promotion-discount", "ship-city", "ship-state", "ship-postal-code", "ship-country", "promotion-ids", "is-business-order", "purchase-order-number", "price-designation", "ReportDate", "timezone", "seller_id", "country_code",'insert_time' ] list_df = decom_df[reserve_columns].to_numpy().tolist() try: conn = self.Data_auth() cursor = conn.cursor() except: time.sleep(5) conn = self.Data_auth() cursor = conn.cursor() # print(list(conn.query("select * from amz_sp_api.orderReport"))) sql = f""" insert into asj_ads.orderReport_(`amazon_order_id`, `merchant_order_id`, `purchase_date`, `last_updated_date`, `order_status`, `fulfillment_channel`, `sales_channel`, `order_channel`, `ship_service_level`, `product_name`, `sku`, `asin`, `item_status`, `quantity`, `currency`, `item_price`, `item_tax`, `shipping_price`, `shipping_tax`, `gift_wrap_price`, `gift_wrap_tax`, `item_promotion_discount`, `ship_promotion_discount`, `ship_city`, `ship_state`, `ship_postal_code`, `ship_country`, `promotion_ids`, `is_business_order`, `purchase_order_number`, `price_designation`, `ReportDate`, `timezone`, `seller_id`, `country_code`,`insert_time`) values (%s,%s,%s,%s,%s,%s,%s, %s,%s,%s,%s,%s,%s,%s, %s,%s,%s,%s,%s,%s,%s, %s,%s,%s,%s,%s,%s,%s, %s,%s,%s,%s,%s,%s,%s,%s) """ # ok try: conn.begin() cursor.executemany(sql, list_df) conn.commit() print("插入完成") conn.close() time.sleep(1) except Exception as e: conn.rollback() print(e) def timeDeal(self, orgTime): # time-deal orgTime = parse(orgTime) timezone = pytz.timezone("UTC") shopTime = orgTime.astimezone(timezone) shopTime_datetime = datetime(shopTime.year, shopTime.month, shopTime.day, shopTime.hour, shopTime.minute, shopTime.second) return shopTime_datetime @classmethod def listing_infoTable(cls): # To datatable asj_ads.Goods - Goods表,包括排名,图片,父子关系 conn = SpApiRequest.Data_auth() cursor = conn.cursor() # cursor.execute(f"""select seller_id,country_code,asin,title from asj_ads.seller_listings where title is not null and title <>'' and (seller_id,country_code,asin) not in (select seller_id,countryCode,asin from asj_ads.Goods where update_time>='{datetime.today().date()}') group by seller_id,title,country_code,asin order by country_code desc""") query_ = cursor.fetchall() # print(query_) col_name = [i[0] for i in cursor.description] df_datatable = pd.DataFrame(query_, columns=col_name) count=0 distance = 50 print(len(df_datatable)) while count='{datetime.today().date()}'""") query_d = cursor.fetchall() # print(query_d) try: # print(tuple(delete_list)) query = f"""delete from asj_ads.Goods where update_time<'{datetime.today().date()}' and (seller_id,countryCode,asin) in {query_d} """ #where (seller_id,country_code) in %s""" cursor.execute(query) conn.commit() # print(delete_list) # print("进行中...") except Exception as e: print(e) conn.rollback() print("Success") conn.close() @classmethod def Goods_drop_duplicates(cls): conn = SpApiRequest.Data_auth() cursor = conn.cursor() cursor.execute( f"""select seller_id,countryCode,asin as count_ from asj_ads.Goods group by seller_id,countryCode,asin having count(asin)>=2""") query_ = cursor.fetchall() print(len(query_)) if len(query_)>0: query_sql2 = cursor.execute( f"""select distinct main_image, productTypes, BigCat_rank, BigCat_title, SmallCat_rank, SmallCat_title, brandName, browseNode, itemName, IsParent, parent_asin, asin, countryCode, marketplace_id, seller_id, update_time from asj_ads.Goods where (seller_id,countryCode,asin) in {query_}""")# query_2 = cursor.fetchall() try: query = f"""delete from asj_ads.Goods where (seller_id,countryCode,asin) in {query_} """ # where (seller_id,country_code) in %s""" cursor.execute(query) conn.commit() except Exception as e: print("错误过程1",e) conn.rollback() sql = """insert into asj_ads.Goods(main_image, productTypes, BigCat_rank, BigCat_title, SmallCat_rank, SmallCat_title, brandName, browseNode, itemName, IsParent, parent_asin, asin, countryCode, marketplace_id, seller_id, update_time) values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)""" try: conn.begin() cursor.executemany(sql, query_2) conn.commit() print("插入完成") conn.close() time.sleep(1) except Exception as e: conn.rollback() print("错误过程2",e) @staticmethod def get_listing_info01(refresh_token, countryCode, asin, seller_id): # used in listing_infoTable # print(refresh_token) aws_credentials = { 'refresh_token': refresh_token, 'lwa_app_id': 'amzn1.application-oa2-client.1f9d3d4747e14b22b4b598e54e6b922e', # 卖家中心里面开发者资料LWA凭证 'lwa_client_secret': 'amzn1.oa2-cs.v1.13ebfa8ea7723b478215a2b61f4dbf5ca6f6927fa097c2dd0941a66d510aca7f', 'aws_access_key': 'AKIARBAGHTGOZC7544GN', 'aws_secret_key': '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', 'role_arn': 'arn:aws:iam::070880041373:role/Amazon_SP_API_ROLE' } mak = {'AE': Marketplaces.AE, 'BE': Marketplaces.BE, 'DE': Marketplaces.DE, 'PL': Marketplaces.PL, 'EG': Marketplaces.EG, 'ES': Marketplaces.ES, 'FR': Marketplaces.FR, 'GB': Marketplaces.GB, 'IN': Marketplaces.IN, 'IT': Marketplaces.IT, 'NL': Marketplaces.NL, 'SA': Marketplaces.SA, 'SE': Marketplaces.SE, 'TR': Marketplaces.TR, 'UK': Marketplaces.UK, 'AU': Marketplaces.AU, 'JP': Marketplaces.JP, 'SG': Marketplaces.SG, 'US': Marketplaces.US, 'BR': Marketplaces.BR, 'CA': Marketplaces.CA, 'MX': Marketplaces.MX } cate_item = CatalogItems(credentials=aws_credentials, marketplace=mak[countryCode],version='2022-04-01') try: variations_info = SpApiRequest.variations_judge(cate_item, asin) except: time.sleep(2.5) variations_info = SpApiRequest.variations_judge(cate_item, asin) try: cate_item = CatalogItems(credentials=aws_credentials, marketplace=mak[countryCode], version='2020-12-01') detail_info = SpApiRequest.get_detail_cat(cate_item, asin, mak, countryCode) except: time.sleep(2.5) cate_item = CatalogItems(credentials=aws_credentials, marketplace=mak[countryCode], version='2020-12-01') detail_info = SpApiRequest.get_detail_cat(cate_item, asin, mak, countryCode) detail_info.update(variations_info) detail_info['asin'] = asin detail_info['countryCode'] = countryCode detail_info['marketplace_id'] = mak[countryCode].marketplace_id detail_info['seller_id'] = seller_id detail_info['update_time'] = datetime.now() return detail_info @staticmethod def variations_judge(cate_item, asin): # used in listing_infoTable, 判断是否有父子关系 def temp_func(cate_item, asin): variations = cate_item.get_catalog_item(asin=asin, **{"includedData": ['relationships']}) # 'variations', var_info = variations.payload # print(variations) IsParent = 'Y' parent_asin = '' try: relationships = var_info.get("relationships")[0]['relationships'] except: relationships = [] if len(relationships) > 0: variationType = relationships[0]['type'] if relationships[0].get('parentAsins') is not None: parent_asin = relationships[0]['parentAsins'][0] IsParent = 'N' elif relationships[0].get('childAsins') is not None: IsParent = 'Y' parent_asin = asin else: parent_asin = 'Erro_01' else: IsParent = 'SG' parent_asin = asin print(IsParent, '父asin:', parent_asin, '子asin', asin) return {"IsParent": IsParent, "parent_asin": parent_asin} try: return temp_func(cate_item, asin) except: try: time.sleep(12) return temp_func(cate_item, asin) except Exception as e: print("判断是否为父子asin时出错:", e) return {"IsParent": 'Erro', "parent_asin": 'Erro'} @staticmethod def Goods_insert(conn,detail_info,detail_info_k): # To datatable asj.Goods # print(detail_info) df = pd.DataFrame(detail_info) # print(df.columns) try: cursor = conn.cursor() except: time.sleep(2.5) conn = SpApiRequest.Data_auth() cursor = conn.cursor() query_sql = "select * from asj_ads.Goods" cursor.execute(query_sql) col = [i[0] for i in cursor.description] query_rel = cursor.fetchall() df_query = pd.DataFrame(query_rel, columns=col) merge_df = df.merge(df_query,how='left',on=['asin', 'countryCode', 'marketplace_id', 'seller_id'],suffixes=['','_right']) merge_df['IsParent'] = merge_df.apply(lambda x:x['IsParent_right'] if x['IsParent']=='Erro' else x['IsParent'],axis=1) merge_df['parent_asin'] = merge_df.apply(lambda x: x['parent_asin_right'] if x['parent_asin'] == 'Erro' else x['parent_asin'], axis=1) merge_df = merge_df.query("IsParent!='Erro' and parent_asin!='Erro'") detail_info_value = merge_df[['main_image', 'productTypes', 'BigCat_rank', 'BigCat_title', 'SmallCat_rank', 'SmallCat_title', 'brandName', 'browseNode', 'itemName', 'IsParent', 'parent_asin', 'asin', 'countryCode', 'marketplace_id', 'seller_id', 'update_time']].to_numpy().tolist() try: insertsql = """insert into asj_ads.Goods(main_image, productTypes, BigCat_rank, BigCat_title, SmallCat_rank, SmallCat_title, brandName, browseNode, itemName, IsParent, parent_asin, asin, countryCode, marketplace_id, seller_id, update_time) values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)""" conn.begin() cursor.executemany(insertsql, tuple(detail_info_value)) conn.commit() print("插入完成Goods") except Exception as e: print("插入错误Goods:", e) conn.rollback() sales_rankData = [] for i in detail_info_value: tmp_list = [] for j in [2,3,4,5,6,8,11,12,13,14]: tmp_list.extend([i[j]]) tmp_list.extend([datetime.now(),datetime.utcnow()]) sales_rankData.append(tmp_list) # print(sales_rankData,len(sales_rankData)) try: insertsql = """insert into asj_ads.ProductRank(BigCat_rank, BigCat_title, SmallCat_rank, SmallCat_title, brandName, itemName, asin, countryCode, marketplace_id, seller_id, update_time,time_UTC) values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)""" conn.begin() cursor.executemany(insertsql, tuple(sales_rankData)) conn.commit() print("插入完成rank") # return '插入完成' except Exception as e: print("插入错误rank:", e) conn.rollback() @staticmethod def get_detail_cat(cate_item, asin, mak, countryCode): # used in listing_infoTable, category sp-api - 获取listing的信息数据 try: detail_info = cate_item.get_catalog_item(asin=asin, **{ "includedData": ["images,productTypes,salesRanks,summaries"], "marketplaceIds": [str(mak[countryCode].marketplace_id)]}) payload = detail_info.payload # print(payload) try: main_image = payload['images'][0]['images'][0]['link'] if len(payload['images']) > 0 else "#" except: main_image = '#-' try: productTypes = payload['productTypes'][0]['productType'] if len(payload['productTypes'][0]) > 0 else "#" except: productTypes = '#-' try: # print(payload['ranks'][0]) if len(payload['ranks'][0]) > 0: BigCat_rank = payload['ranks'][0]['ranks'][0]['rank'] BigCat_title = payload['ranks'][0]['ranks'][0]['title'] SmallCat_rank = payload['ranks'][0]['ranks'][1]['rank'] SmallCat_title = payload['ranks'][0]['ranks'][1]['title'] else: BigCat_rank, BigCat_title, SmallCat_rank, SmallCat_title = 0, '#', 0, '#' except: BigCat_rank, BigCat_title, SmallCat_rank, SmallCat_title = 0, '#-', 0, '#-' try: if len(payload['summaries'][0]) > 0: brandName = payload['summaries'][0]['brandName'] browseNode = payload['summaries'][0]['browseNode'] itemName = payload['summaries'][0]['itemName'] else: brandName, browseNode, itemName = '#', '#', '#' except: brandName, browseNode, itemName = '#-', '#-', '#-' return {'main_image': main_image, 'productTypes': productTypes, 'BigCat_rank': BigCat_rank, 'BigCat_title': BigCat_title, 'SmallCat_rank': SmallCat_rank, 'SmallCat_title': SmallCat_title, 'brandName': brandName, 'browseNode': browseNode, 'itemName': itemName} except: return {'main_image': '', 'productTypes': '', 'BigCat_rank': 0, 'BigCat_title': '', 'SmallCat_rank': 0, 'SmallCat_title': '', 'brandName': '', 'browseNode': '', 'itemName': ''} def GET_BRAND_ANALYTICS_SEARCH_TERMS_REPORT(self, refresh_token,seller_id,days=-3,**kwargs): shopReportday = (datetime.now() + timedelta(days=days)).strftime("%Y-04-21") shopReportday_E = (datetime.now() + timedelta(days=days)).strftime("%Y-04-27") print(shopReportday) para = {"reportType": ReportType.GET_BRAND_ANALYTICS_SEARCH_TERMS_REPORT,"reportOptions":{"reportPeriod": "WEEK"},"dataStartTime": shopReportday, "dataEndTime": shopReportday_E,} reportid = self.create_report(**para) @staticmethod def GET_BRAND_ANALYTICS_SEARCH_TERMS_REPORT_DAY(self,**kwargs): shopReportday = "2024-09-04" shopReportday_E = "2024-09-04" print(shopReportday) para = {"reportType": ReportType.GET_BRAND_ANALYTICS_SEARCH_TERMS_REPORT,"reportOptions":{"reportPeriod": "DAY"},"dataStartTime": shopReportday, "dataEndTime": shopReportday_E,} reportid = self.create_report(**para) def BRAND_ANALYTICS_TEXT_deal(self,text): pass def GET_SELLER_FEEDBACK_DATA(self,refresh_token,seller_id,days,**a_kw): kwargs = a_kw country_code = str(self.marketplace)[-2:] if country_code == 'GB': country_code = "UK" insert_time = datetime.now() shopReportday = (datetime.now().date() + timedelta(days=days)).strftime("%Y-%m-%d") shopReportday_E = (datetime.now().date() + timedelta(days=days)).strftime("%Y-%m-%d") print(shopReportday) para = {"reportType": ReportType.GET_SELLER_FEEDBACK_DATA,#GET_SELLER_FEEDBACK_DATA,# GET_FLAT_FILE_RETURNS_DATA_BY_RETURN_DATE |GET_FBA_FULFILLMENT_CUSTOMER_RETURNS_DATA "dataStartTime": shopReportday, "dataEndTime": shopReportday_E, "reportOptions":{"reportPeriod": "DAY"}} try: reportid = self.create_report(**para) decom_df = self.decompression(reportid) except: return pd.DataFrame() if len(decom_df) == 0: return reportid decom_df[decom_df.select_dtypes(float).columns] = decom_df[decom_df.select_dtypes(float).columns].applymap( lambda x: str(x) if not pd.isna(x) else '') decom_df[decom_df.select_dtypes(int).columns] = decom_df[decom_df.select_dtypes(int).columns].applymap( lambda x: str(x) if not pd.isna(x) else '') decom_df[decom_df.select_dtypes(datetime).columns] = decom_df[ decom_df.select_dtypes(datetime).columns].applymap(lambda x: str(x) if not pd.isna(x) else '') decom_df.fillna('', inplace=True) # decom_df.to_csv('order.csv') decom_df["ReportDate"] = parse(shopReportday) # decom_df['timezone'] = decom_df["purchase-date"].map(lambda x: parse(x).tzname()).fillna(method='bfill') decom_df['timezone'] = "UTC" # print("==========================================================") decom_df['seller_id'] = seller_id decom_df['country_code'] = country_code decom_df['insert_time'] = datetime.now() df_feedback = decom_df.copy() df_feedback[["date", "rating", "comment", "order_id"]] = df_feedback[["Date", "Rating", "Comments", "Order ID"]] df_feedback['isFeedback'] = 1 df_feedback['date'] = df_feedback['date'].map(lambda x: pd.to_datetime(x).date()) df_feedback['seller_id'] = seller_id df_feedback['country_code'] = country_code client = get_client(host='3.93.43.158', port=8123, username='root', password='6f0eyLuiVn3slzbGWpzI') try: client.insert_df(table='ams.performance', df=df_feedback[ ["date", "rating", "comment", "order_id", "isFeedback", 'seller_id', 'country_code','insert_time']], column_names=["date", "rating", "comment", "order_id", "isFeedback", 'seller_id', 'country_code','insert_time']) except Exception as e: print(e) client.close() # print(decom_df.columns) # print(decom_df) return reportid def GET_FBA_FULFILLMENT_CUSTOMER_RETURNS_DATA(self,refresh_token,seller_id,days,**a_kw): kwargs = a_kw country_code = str(self.marketplace)[-2:] if country_code == 'GB': country_code = "UK" insert_time = datetime.now() shopReportday = (datetime.now().date() + timedelta(days=days)).strftime("%Y-%m-%d") shopReportday_E = (datetime.now().date() + timedelta(days=days)).strftime("%Y-%m-%d") # print(shopReportday) # print("1"*50) para = {"reportType": ReportType.GET_FBA_FULFILLMENT_CUSTOMER_RETURNS_DATA,#GET_SELLER_FEEDBACK_DATA,# GET_FLAT_FILE_RETURNS_DATA_BY_RETURN_DATE |GET_FBA_FULFILLMENT_CUSTOMER_RETURNS_DATA "dataStartTime": shopReportday, "dataEndTime": shopReportday_E, "reportOptions":{"reportPeriod": "DAY"}} try: reportid = self.create_report(**para) decom_df = self.decompression(reportid) except: return pd.DataFrame() if len(decom_df) == 0: return reportid # print("2"*100) decom_df[decom_df.select_dtypes(float).columns] = decom_df[decom_df.select_dtypes(float).columns].applymap( lambda x: str(x) if not pd.isna(x) else '') decom_df[decom_df.select_dtypes(int).columns] = decom_df[decom_df.select_dtypes(int).columns].applymap( lambda x: str(x) if not pd.isna(x) else '') decom_df[decom_df.select_dtypes(datetime).columns] = decom_df[ decom_df.select_dtypes(datetime).columns].applymap(lambda x: str(x) if not pd.isna(x) else '') decom_df.fillna('', inplace=True) # decom_df.to_csv('order.csv') decom_df["ReportDate"] = parse(shopReportday) # decom_df['timezone'] = decom_df["purchase-date"].map(lambda x: parse(x).tzname()).fillna(method='bfill') decom_df['timezone'] = "UTC" print("==========================================================") decom_df['seller_id'] = seller_id decom_df['country_code'] = country_code decom_df['insert_time'] = datetime.now() df_fbaR = decom_df.copy() df_fbaR[['date', 'order_id', 'product_name', 'detailed_disposition', 'comment']] = df_fbaR[ ['return-date', 'order-id', 'product-name', 'detailed-disposition', 'customer-comments']] # df_fbaR[['return-date','order-id','sku','asin','fnsku','product-name','quantity','reason','detailed-disposition','customer-comments']] df_fbaR['isFeedback'] = 0 df_fbaR['date'] = df_fbaR['date'].map(lambda x: pd.to_datetime(x).date()) df_fbaR['comment'].fillna("", inplace=True) # df_fbaR[['order_id','sku','asin','fnsku','detailed_disposition','product_name','isFeedback','date']] client = get_client(host='3.93.43.158', port=8123, username='root', password='6f0eyLuiVn3slzbGWpzI') try: client.insert_df(table='ams.performance', df=df_fbaR[ ['order_id', 'sku', 'asin', 'fnsku', 'detailed_disposition', 'product_name', 'isFeedback', 'date', 'comment', 'seller_id', 'country_code','insert_time']], column_names=['order_id', 'sku', 'asin', 'fnsku', 'detailed_disposition', 'product_name', 'isFeedback', 'date', 'comment', 'seller_id', 'country_code','insert_time']) except Exception as e: print(e) client.close() # print(decom_df.columns) # print(decom_df) return reportid def GET_FLAT_FILE_RETURNS_DATA_BY_RETURN_DATE(self, refresh_token, seller_id, days, **a_kw): kwargs = a_kw country_code = str(self.marketplace)[-2:] if country_code == 'GB': country_code = "UK" insert_time = datetime.now() shopReportday = (datetime.now() + timedelta(days=days)).strftime("%Y-%m-%d") shopReportday_E = (datetime.now() + timedelta(days=days)).strftime("%Y-%m-%d") para = {"reportType": ReportType.GET_FLAT_FILE_RETURNS_DATA_BY_RETURN_DATE, # GET_SELLER_FEEDBACK_DATA,# GET_FLAT_FILE_RETURNS_DATA_BY_RETURN_DATE |GET_FBA_FULFILLMENT_CUSTOMER_RETURNS_DATA "dataStartTime": shopReportday, "dataEndTime": shopReportday_E, "reportOptions": {"reportPeriod": "DAY"}} try: reportid = self.create_report(**para) decom_df = self.decompression(reportid) except: return pd.DataFrame() if len(decom_df) == 0: return reportid decom_df[decom_df.select_dtypes(float).columns] = decom_df[decom_df.select_dtypes(float).columns].applymap(lambda x: str(x) if not pd.isna(x) else '') decom_df[decom_df.select_dtypes(int).columns] = decom_df[decom_df.select_dtypes(int).columns].applymap(lambda x: str(x) if not pd.isna(x) else '') decom_df[decom_df.select_dtypes(datetime).columns] = decom_df[decom_df.select_dtypes(datetime).columns].applymap(lambda x: str(x) if not pd.isna(x) else '') decom_df.fillna('', inplace=True) # decom_df.to_csv('order.csv') decom_df["ReportDate"] = parse(shopReportday) # decom_df['timezone'] = decom_df["purchase-date"].map(lambda x: parse(x).tzname()).fillna(method='bfill') decom_df['timezone'] = "UTC" decom_df['seller_id'] = seller_id decom_df['country_code'] = country_code decom_df['insert_time'] = datetime.now() decom_df['Order date'] = decom_df['Order date'].map(lambda x:'' if pd.isna(x) or x=='' else parse(x)) decom_df['Return request date'] = decom_df['Return request date'].map(lambda x:'' if pd.isna(x) or x=='' else parse(x)) client = get_client(host='3.93.43.158', port=8123, username='root', password='6f0eyLuiVn3slzbGWpzI') # print(decom_df.dtypes) # print("======================-------------==================") # decom_df['SafeT claim reimbursement amount'] = decom_df['SafeT claim reimbursement amount'].map(str) # decom_df['Refunded Amount'] = decom_df['Refunded Amount'].map(str) # decom_df['Label cost'] = decom_df['Label cost'].astype('string') # print(decom_df.dtypes) print("="*100) try: client.insert_df('ams.return_record',df=decom_df[['Order ID', 'Order date', 'Return request date', 'Return request status', 'Amazon RMA ID', 'Merchant RMA ID', 'Label type', 'Label cost', 'Currency code', 'Return carrier', 'Tracking ID', 'Label to be paid by', 'A-to-Z Claim', 'Is prime', 'ASIN', 'Merchant SKU', 'Item Name', 'Return quantity', 'Return Reason', 'In policy', 'Return type', 'Resolution', 'Invoice number', 'Return delivery date', 'Order Amount', 'Order quantity', 'SafeT Action reason', 'SafeT claim id', 'SafeT claim state', 'SafeT claim creation time', 'SafeT claim reimbursement amount', 'Refunded Amount', 'ReportDate', 'timezone', 'seller_id', 'country_code', 'insert_time']],column_names=['Order_ID', 'Order_date', 'Return_request_date','Return_request_status', 'Amazon_RMA_ID', 'Merchant_RMA_ID','Label_type', 'Label_cost', 'Currency_code', 'Return_carrier','Tracking_ID', 'Label_to_be_paid_by', 'A-to-Z_Claim', 'Is_prime', 'ASIN', 'Merchant_SKU', 'Item_Name', 'Return_quantity', 'Return_Reason', 'In_policy', 'Return_type', 'Resolution', 'Invoice_number','Return_delivery_date', 'Order_Amount', 'Order_quantity','SafeT_Action_reason', 'SafeT_claim_id', 'SafeT_claim_state','SafeT_claim_creation_time', 'SafeT_claim_reimbursement_amount', 'Refunded_Amount', 'ReportDate', 'timezone', 'seller_id','country_code', 'insert_time']) except Exception as e: print(e) client.close() return reportid # def GET_FLAT_FILE_RETURNS_DATA_BY_RETURN_DATE(self, refresh_token,seller_id,days=-1,**kwargs): # countryCode = None if kwargs.get("countryCode") == None else kwargs.get("countryCode") # shopReportday = (datetime.now() + timedelta(days=days)).strftime("%Y-%m-%d") # print(shopReportday) # para = {"reportType": ReportType.GET_FLAT_FILE_ALL_ORDERS_DATA_BY_ORDER_DATE_GENERAL, # "dataStartTime": shopReportday, "dataEndTime": shopReportday, # "reportOptions": {"ShowSalesChannel": "true"}} # reportid = self.create_report(**para) # {"ShowSalesChannel":"true"} # decom_df = self.decompression(reportid) # decom_df['seller_id'] = seller_id # country_code = str(self.marketplace)[-2:] # if country_code == 'GB': # country_code = "UK" # # decom_df['country_code'] = "UK" # decom_df['country_code'] = country_code # decom_df['insert_time'] = datetime.now() # shopReportday = "2024-11-04" # shopReportday_E = "2024-11-04" # print(shopReportday) # para = {"reportType": ReportType.GET_FLAT_FILE_RETURNS_DATA_BY_RETURN_DATE, # # GET_FLAT_FILE_RETURNS_DATA_BY_RETURN_DATE |GET_FBA_FULFILLMENT_CUSTOMER_RETURNS_DATA # "dataStartTime": shopReportday, # "dataEndTime": shopReportday_E, "reportOptions": {"reportPeriod": "DAY"}} # reportid = self.create_report(**para) # print(reportid) # return reportid @staticmethod def data_judge_secondTry(refresh_token,sp_api,data_type,seller_id,auth_conn,days=-1,**kwargs): # Main-retry, 重试函数,重试次数2次 a_kw = kwargs try: SpApiRequest.data_judge(refresh_token,sp_api, data_type, seller_id, auth_conn,days=days,**a_kw) except Exception as e: try: if e.args[0][0]['code']=='InvalidInput': return '' print('first time to try...') time.sleep(15) SpApiRequest.data_judge(refresh_token,sp_api, data_type, seller_id, auth_conn,days=days,**a_kw) except Exception as e: print(e) print('twice time to try...') time.sleep(20) SpApiRequest.data_judge(refresh_token, sp_api, data_type, seller_id, auth_conn, days=days, **a_kw) @staticmethod def data_judge(refresh_token,sp_api,data_type,seller_id,auth_conn,days=-1,**kwargs): # select Report type - 报告获取类型判断 a_kw = kwargs if data_type == "GET_FLAT_FILE_OPEN_LISTINGS_DATA": return sp_api.GET_FLAT_FILE_OPEN_LISTINGS_DATA(refresh_token,auth_conn,seller_id,days) elif data_type =="GET_FLAT_FILE_ALL_ORDERS_DATA_BY_ORDER_DATE_GENERAL": # for day_ in range(31,1): # sp_api.GET_FLAT_FILE_ALL_ORDERS_DATA_BY_ORDER_DATE_GENERAL(seller_id,days=day_*-1) return sp_api.GET_FLAT_FILE_ALL_ORDERS_DATA_BY_ORDER_DATE_GENERAL(refresh_token,seller_id,days,**a_kw) # elif data_type =="GET_FLAT_FILE_RETURNS_DATA_BY_RETURN_DATE": # return sp_api.GET_FLAT_FILE_RETURNS_DATA_BY_RETURN_DATE(refresh_token,seller_id,days) elif data_type =="GET_SALES_AND_TRAFFIC_REPORT": return sp_api.GET_SALES_AND_TRAFFIC_REPORT(refresh_token,seller_id,days,**a_kw) elif data_type == "GET_FBA_MYI_UNSUPPRESSED_INVENTORY_DATA": return sp_api.GET_FBA_MYI_UNSUPPRESSED_INVENTORY_DATA(refresh_token,auth_conn,seller_id,days,**a_kw) elif data_type=="GET_BRAND_ANALYTICS_SEARCH_TERMS_REPORT": return sp_api.GET_BRAND_ANALYTICS_SEARCH_TERMS_REPORT(refresh_token,seller_id,days,**a_kw) elif data_type=="GET_SELLER_FEEDBACK_DATA": return sp_api.GET_SELLER_FEEDBACK_DATA(refresh_token,seller_id,days,**a_kw) elif data_type=="GET_FBA_FULFILLMENT_CUSTOMER_RETURNS_DATA": return sp_api.GET_FBA_FULFILLMENT_CUSTOMER_RETURNS_DATA(refresh_token,seller_id,days,**a_kw) elif data_type=="GET_FLAT_FILE_RETURNS_DATA_BY_RETURN_DATE": return sp_api.GET_FLAT_FILE_RETURNS_DATA_BY_RETURN_DATE(refresh_token,seller_id,days,**a_kw) else: return "" @classmethod def get_allShops(cls,data_type=Literal["GET_FLAT_FILE_OPEN_LISTINGS_DATA","GET_FLAT_FILE_ALL_ORDERS_DATA_BY_ORDER_DATE_GENERAL"],days=-1,**kwargs): # Main-AllCountries-AuthAndPost, 所有店铺数据报告获取的主要函数 df = cls.auth_info() zosi = df.query("account_name=='AM-ZOSI-US'")['refresh_token'].to_numpy().tolist() # print(zosi) refreshtoken_list = df.query("account_name!='AM-ZOSI-US'")['refresh_token'].to_numpy().tolist() zosi.extend(refreshtoken_list) refreshtoken_list = zosi.copy() print(len(refreshtoken_list)) # refreshtoken_list = refreshtoken_list+refreshtoken_li a_kw = kwargs for refresh_token in refreshtoken_list: aws_credentials = { 'refresh_token': refresh_token, 'lwa_app_id': 'amzn1.application-oa2-client.1f9d3d4747e14b22b4b598e54e6b922e', # 卖家中心里面开发者资料LWA凭证 'lwa_client_secret': 'amzn1.oa2-cs.v1.13ebfa8ea7723b478215a2b61f4dbf5ca6f6927fa097c2dd0941a66d510aca7f', 'aws_access_key': 'AKIARBAGHTGOZC7544GN', 'aws_secret_key': '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', 'role_arn': 'arn:aws:iam::070880041373:role/Amazon_SP_API_ROLE' } single_info = df.query("refresh_token==@refresh_token") region_circle = single_info['region'].values[0] seller_id = single_info['selling_partner_id'].values[0] account_name = single_info['account_name'].values[0] if region_circle == 'NA': pass for marketplace in [Marketplaces.US, Marketplaces.BR, Marketplaces.CA,Marketplaces.MX]: sp_api = SpApiRequest(aws_credentials, marketplace) a_kw['countryCode'] = str(marketplace)[-2:] # print(a_kw) try: print("refresh_token:",refresh_token,'data_type:',data_type,'seller_id:',seller_id,'marketplace:',marketplace.marketplace_id,'country_code:',a_kw['countryCode'],sep='\n') auth_conn = SpApiRequest.Token_auth() print("CHECK>>>>>>>>>>>>>>>>>>>>>>>>>") # print(a_kw) cls.data_judge_secondTry(refresh_token,sp_api, data_type, seller_id, auth_conn,days,**a_kw) ## sp_api.GET_FLAT_FILE_OPEN_LISTINGS_DATA(auth_conn, seller_id) except Exception as e: print(e) time.sleep(3.5) elif region_circle == 'EU': pass for marketplace in [Marketplaces.DE,Marketplaces.AE, Marketplaces.BE, Marketplaces.PL, Marketplaces.EG,Marketplaces.ES, Marketplaces.GB, Marketplaces.IN, Marketplaces.IT, Marketplaces.NL, Marketplaces.SA, Marketplaces.SE, Marketplaces.TR,Marketplaces.UK,Marketplaces.FR, ]: sp_api = SpApiRequest(aws_credentials, marketplace) a_kw['countryCode'] = str(marketplace)[-2:] try: auth_conn = SpApiRequest.Token_auth() cls.data_judge_secondTry(refresh_token,sp_api, data_type, seller_id, auth_conn,days,**a_kw) ## sp_api.GET_FLAT_FILE_OPEN_LISTINGS_DATA(auth_conn, seller_id) except Exception as e: print(e) time.sleep(3.5) else: # if region_circle not in ['NA','EU']: auth_conn = SpApiRequest.Token_auth() print(region_circle) marketplace = eval(f'Marketplaces.{region_circle}') sp_api = SpApiRequest(aws_credentials, marketplace) a_kw['countryCode'] = str(marketplace)[-2:] cls.data_judge_secondTry(refresh_token,sp_api, data_type, seller_id, auth_conn,days,**a_kw) ## sp_api.GET_FLAT_FILE_OPEN_LISTINGS_DATA(auth_conn, seller_id) def price_tracker(self,asin): base_product = Products(credentials=self.credentials,marketplace=Marketplaces.US) return base_product.get_product_pricing_for_asins(asin) if __name__ == '__main__': for days in range(-35,-2): SpApiRequest.get_allShops("GET_FLAT_FILE_RETURNS_DATA_BY_RETURN_DATE",days=days,**{}) # SpApiRequest.listing_infoTable() # rel = SpApiRequest.get_catelog(account_name='AM-ZOSI-US',country=Marketplaces.US,asin='B0B8CPHSL4') # print(rel) # SpApiRequest ###TEST # df = SpApiRequest.auth_info() # print(df) # zosi = df.query("account_name=='AM-ZOSI-US'")['refresh_token'].to_numpy().tolist() # # print(zosi) # refreshtoken_list = df.query("account_name!='AM-ZOSI-US'")['refresh_token'].to_numpy().tolist() # zosi.extend(refreshtoken_list) # refreshtoken_list = zosi.copy() # aws_credentials = { # 'refresh_token': refreshtoken_list[0], # 'lwa_app_id': 'amzn1.application-oa2-client.1f9d3d4747e14b22b4b598e54e6b922e', # 卖家中心里面开发者资料LWA凭证 # 'lwa_client_secret': 'amzn1.oa2-cs.v1.13ebfa8ea7723b478215a2b61f4dbf5ca6f6927fa097c2dd0941a66d510aca7f', # 'aws_access_key': 'AKIARBAGHTGOZC7544GN', # 'aws_secret_key': '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', # 'role_arn': 'arn:aws:iam::070880041373:role/Amazon_SP_API_ROLE' # } # sp_api = SpApiRequest(aws_credentials, Marketplaces.US) # id = sp_api.GET_SELLER_FEEDBACK_DATA() # print(id) # df= sp_api.decompression(str(id)) # print(df) # df.to_csv('GET_SELLER_FEEDBACK_DATA.csv') #### END # print(sp_api.price_tracker(['B00L3W2QJ2'])) # shopReportday = "2024-08-25" # shopReportday_E = "2024-08-31" # print(shopReportday) # # para = {"reportType": ReportType.GET_BRAND_ANALYTICS_SEARCH_TERMS_REPORT, "reportOptions": {"reportPeriod": "WEEK"}, # # "dataStartTime": shopReportday, "dataEndTime": shopReportday_E, } # # report = Reports(credentials=aws, marketplace=self.marketplace) # report = Reports(credentials=aws_credentials, marketplace=Marketplaces.US) # rel = report.create_report( # reportType=ReportType.GET_BRAND_ANALYTICS_SEARCH_TERMS_REPORT, marketplaceIds=['ATVPDKIKX0DER'], # reportOptions={"reportPeriod": "WEEK"}, dataStartTime=shopReportday, dataEndTime=shopReportday_E # ) # reportId = rel.payload.get("reportId") # # while True: # reportId_info = report.get_report(reportId=reportId) # # print(reportId_info.payload) # print("please wait...",reportId_info.payload.get("processingStatus")) # # print(reportId_info) # if reportId_info.payload.get("processingStatus") == ProcessingStatus.DONE: # print(reportId_info.payload) # reportDocumentId = reportId_info.payload.get("reportDocumentId") # rp_table = report.get_report_document(reportDocumentId=reportDocumentId, download=False) # print(rp_table) # elif reportId_info.payload.get("processingStatus") == ProcessingStatus.FATAL: # reportDocumentId = reportId_info.payload.get("reportDocumentId") # rp_table = report.get_report_document(reportDocumentId=reportDocumentId, download=False) # import requests # # requests.get(rp_table['payload']['url']) # # df = pd.read_table(filepath_or_buffer=rp_table.payload['url'], compression={"method": 'gzip'}, # encoding='iso-8859-1') # # df = pandas.read_csv(rp_table['payload']['url'],) # print(df.to_dict(orient='records')) # break