import clickhouse_connect
import time
import warnings
warnings.filterwarnings('ignore')
import numpy as np
from pymysql import Timestamp
from sp_api.util import throttle_retry, load_all_pages
from sp_api.api import Orders, ListingsItems, Inventories, Reports, CatalogItems
from sp_api.base import Marketplaces, ReportType, ProcessingStatus
import pandas as pd
import gzip
from io import BytesIO, StringIO
from datetime import datetime, timedelta, timezone
import pytz
import time
from dateutil.parser import parse
import pymysql
from typing import List, Literal, Optional
from contextlib import closing
from apscheduler.schedulers.blocking import BlockingScheduler


# NOTE(security): the LWA / AWS secrets below (and the MySQL passwords in the
# mysql_connect* methods) are committed in source. They should be rotated and
# loaded from the environment or a secrets manager. They are gathered here only
# so that every caller shares one copy instead of several pasted duplicates.
_LWA_APP_ID = 'amzn1.application-oa2-client.1f9d3d4747e14b22b4b598e54e6b922e'  # LWA credential from the Seller Central developer profile
_LWA_CLIENT_SECRET = 'amzn1.oa2-cs.v1.3af0f5649f5b8e151cd5bd25c10f2bf3113172485cd6ffc52ccc6a5e8512b490'
_AWS_ACCESS_KEY = 'AKIARBAGHTGOZC7544GN'
_AWS_SECRET_KEY = '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'
_ROLE_ARN = 'arn:aws:iam::070880041373:role/Amazon_SP_API_ROLE'

# Marketplaces that an auth-table row with region 'NA' / 'EU' is expanded into.
_NA_MARKETPLACES = (Marketplaces.US, Marketplaces.BR, Marketplaces.CA, Marketplaces.MX)
_EU_MARKETPLACES = (
    Marketplaces.DE, Marketplaces.AE, Marketplaces.BE, Marketplaces.PL,
    Marketplaces.EG, Marketplaces.ES, Marketplaces.GB, Marketplaces.IN,
    Marketplaces.IT, Marketplaces.NL, Marketplaces.SA, Marketplaces.SE,
    Marketplaces.TR, Marketplaces.UK, Marketplaces.FR,
)

# Amazon.co.jp marketplace id. Its reports are decoded as Shift-JIS, and its
# listing-report columns are renamed positionally (see GET_FLAT_FILE_OPEN_LISTINGS_DATA).
_JP_MARKETPLACE_ID = 'A1VC38T7YXB528'

# Number of report rows written to amz_sp_api.productInfo per transaction.
_PRODUCT_BATCH_SIZE = 200


class SpApiRequest:
    """Pull SP-API reports / listing data for one marketplace and store them in MySQL."""

    # Timezone abbreviations that appear in report date strings -> pytz zone names.
    _TZ_ABBREVIATIONS = {
        "AEST": "Australia/Sydney",
        "AEDT": "Australia/Sydney",
        "PST": "America/Los_Angeles",
        "PDT": "America/Los_Angeles",
        "CST": "America/Chicago",
        "CDT": "America/Chicago",
        "MET": "MET",
        "MEST": "MET",
        "BST": "Europe/London",
        "GMT": "GMT",
        "CET": "CET",
        "CEST": "CET",
        "JST": "Asia/Tokyo",
        "BRT": "America/Sao_Paulo",
    }

    def __init__(self, credentials, marketplace):
        """Bind SP-API ``credentials`` (dict) and a ``Marketplaces`` member to this instance."""
        self.credentials = credentials
        self.marketplace = marketplace

    # ------------------------------------------------------------------ #
    # Connections / credentials
    # ------------------------------------------------------------------ #
    @staticmethod
    def _build_credentials(refresh_token):
        """Return the SP-API credentials dict for one seller's ``refresh_token``."""
        return {
            'refresh_token': refresh_token,
            'lwa_app_id': _LWA_APP_ID,
            'lwa_client_secret': _LWA_CLIENT_SECRET,
            'aws_access_key': _AWS_ACCESS_KEY,
            'aws_secret_key': _AWS_SECRET_KEY,
            'role_arn': _ROLE_ARN,
        }

    @classmethod
    def mysql_connect_auth(cls):
        """Open a connection to the RDS database that holds the SP-API auth table.

        The caller owns the returned connection and must close it.
        """
        return pymysql.connect(user="admin",
                               password="NSYbBSPbkGQUbOSNOeyy",
                               host="retail-data.cnrgrbcygoap.us-east-1.rds.amazonaws.com",
                               database="ansjer_dvadmin",
                               port=3306)

    @classmethod
    def mysql_connect_auth_lst(cls):
        """Open a connection to the ``asj_ads`` database (caller must close it)."""
        return pymysql.connect(user="root",
                               password="sandbox",
                               host="192.168.1.225",
                               database="asj_ads",
                               port=3306)

    @classmethod
    def mysql_connect(cls):
        """Open a connection to the local ``amz_sp_api`` database (caller must close it)."""
        return pymysql.connect(user="huangyifan",
                               password="123456",
                               host="127.0.0.1",
                               database="amz_sp_api",
                               port=3306)

    @classmethod
    def mysql_adTest_connect(cls):
        """Open a connection to ``asj_ads`` (same target as mysql_connect_auth_lst)."""
        return pymysql.connect(user="root",
                               password="sandbox",
                               host="192.168.1.225",
                               database="asj_ads",
                               port=3306)

    # ------------------------------------------------------------------ #
    # Catalog lookup
    # ------------------------------------------------------------------ #
    @classmethod
    def get_catelog(cls, account_name, country=Marketplaces.US, asin=None):
        """Fetch the first image link and the item name of ``asin`` from the Catalog Items API.

        The refresh token is looked up in the auth table by ``account_name`` and by the
        region derived from ``country`` ('NA', 'EU' or the last two chars of the member name).

        Returns:
            ``{'images': <image link>, 'title': <itemName>}``, or the string
            '获取失败' ("fetch failed") when no matching auth row exists.
        """
        if country in _NA_MARKETPLACES:
            region = 'NA'
        elif country in _EU_MARKETPLACES:
            region = 'EU'
        else:
            region = str(country)[-2:]
        df = cls.auth_info()
        try:
            refresh_token = df.query("account_name==@account_name and region==@region")['refresh_token'].values[0]
        except (IndexError, KeyError):
            # "Please enter a correct account name and marketplace"
            print("请输入正确的account name与Marketplace")
            return '获取失败'
        cred = cls._build_credentials(refresh_token)
        cate_item = CatalogItems(credentials=cred, marketplace=country)
        images_info = cate_item.get_catalog_item(asin=asin, **{"includedData": ['images']})
        images = images_info.images[0].get('images')[0]['link']
        title_info = cate_item.get_catalog_item(asin=asin)
        title = title_info.payload['summaries'][0]['itemName']
        return {'images': images, 'title': title}

    # ------------------------------------------------------------------ #
    # Report creation / download
    # ------------------------------------------------------------------ #
    def create_report(self, **kwargs):
        """Create an SP-API report and return its ``reportId``.

        Keyword args:
            reportType: required report type.
            reportOptions: optional report options dict.
            dataStartTime / dataEndTime: optional 'YYYY-MM-DD' day bounds. They are
                expanded to 00:00:00 / 23:59:59; when omitted, the current local time is used.
            marketpalceids: optional marketplace id (legacy misspelled key, still honoured);
                ``marketplaceids`` is accepted too. Defaults to this instance's marketplace.
        """
        report_type = kwargs['reportType']
        report_options = kwargs.get("reportOptions")
        now_str = datetime.now().strftime("%Y-%m-%dT%H:%M:%S")
        start_day = kwargs.get("dataStartTime")
        end_day = kwargs.get("dataEndTime")
        data_start = now_str if start_day is None else start_day + "T00:00:00"
        data_end = now_str if end_day is None else end_day + "T23:59:59"

        marketplace_id = kwargs.get('marketpalceids')
        if marketplace_id is None:
            marketplace_id = kwargs.get('marketplaceids')
        if marketplace_id is None:
            marketplace_id = self.marketplace.marketplace_id

        report = Reports(credentials=self.credentials, marketplace=self.marketplace)
        rel = report.create_report(
            reportType=report_type, marketplaceIds=[marketplace_id],
            reportOptions=report_options, dataStartTime=data_start, dataEndTime=data_end,
        )
        return rel.payload.get("reportId")

    def decompression(self, reportId):
        """Poll report ``reportId`` every 15 s until it finishes, then load it as a DataFrame.

        The document is read as tab-separated text, gunzipped when the document metadata
        reports a compression algorithm. It is decoded as Shift-JIS for the JP marketplace
        and as ISO-8859-1 otherwise.

        Returns:
            The report DataFrame, or ``None`` if the report was CANCELLED or FATAL.
        """
        report = Reports(credentials=self.credentials, marketplace=self.marketplace)
        encoding = 'Shift-JIS' if self.marketplace.marketplace_id == _JP_MARKETPLACE_ID else 'iso-8859-1'
        while True:
            report_info = report.get_report(reportId=reportId)
            print("please wait...")
            status = report_info.payload.get("processingStatus")
            if status == ProcessingStatus.DONE:
                document_id = report_info.payload.get("reportDocumentId")
                rp_table = report.get_report_document(reportDocumentId=document_id, download=False)
                print(rp_table)
                url = rp_table.payload.get("url")
                if rp_table.payload.get('compressionAlgorithm') is not None:
                    return pd.read_table(url, compression={"method": 'gzip'}, encoding=encoding)
                return pd.read_table(url, encoding=encoding)
            if status in [ProcessingStatus.CANCELLED, ProcessingStatus.FATAL]:
                print(report_info)
                print("取消或失败")  # "cancelled or failed"
                return None
            time.sleep(15)
            print("please wait...")

    # ------------------------------------------------------------------ #
    # amz_sp_api.productInfo sync (GET_MERCHANT_LISTINGS_ALL_DATA)
    # ------------------------------------------------------------------ #
    def get_mainImage_url(self, sku, seller_id=None):
        """Return the main image link of ``sku``, or '' when the API gives none.

        NOTE(review): reconstructed on top of get_listing_info, whose retry message
        refers to fetching the image URL. Confirm this is the intended image source.
        """
        image_link = self.get_listing_info(sku, seller_id).split("-----", 1)[0]
        # get_listing_info uses '###' for "missing"; str(None) appears when the link is null.
        return '' if image_link in ('###', 'None') else image_link

    def data_deal(self, decom_df, seller_id=None):
        """Clean one batch of the listings report in place for amz_sp_api.productInfo.

        Adds mainImageUrl / opendate_date / updateTime / timezone / seller_id columns,
        casts URL, ASIN and flag columns to string, and zero-fills quantity columns.
        It drops 'zshop' columns and fills remaining gaps with 0 / 0.0 / ''.

        Args:
            decom_df: a slice of the report. It is mutated and also returned.
            seller_id: selling partner id used for image lookup and stored per row.
        """
        decom_df['mainImageUrl'] = decom_df['seller-sku'].map(lambda sku: self.get_mainImage_url(sku, seller_id))
        url_columns = [c for c in decom_df.columns if "url" in c.lower()]
        if url_columns:
            decom_df[url_columns] = decom_df[url_columns].astype("string")
        asin_columns = [c for c in decom_df.columns if 'asin' in c.lower()]
        if asin_columns:
            decom_df[asin_columns] = decom_df[asin_columns].astype("string")
        if 'pending-quantity' in decom_df.columns:
            decom_df['pending-quantity'] = decom_df['pending-quantity'].map(
                lambda x: 0 if pd.isna(x) or np.isinf(x) else x).astype("int32")
        zshop_columns = [c for c in decom_df.columns if 'zshop' in c.lower()]
        decom_df.drop(columns=zshop_columns, inplace=True)
        if 'quantity' in decom_df.columns:
            decom_df['quantity'] = decom_df['quantity'].map(
                lambda x: 0 if pd.isna(x) or np.isinf(x) else x).astype("int32")
        decom_df['opendate_date'] = decom_df['open-date'].map(self.datetime_deal)
        for flag_column in ('add-delete', 'will-ship-internationally', 'expedited-shipping'):
            if flag_column in decom_df.columns:
                decom_df[flag_column] = decom_df[flag_column].astype('string', errors='ignore')
        decom_df['updateTime'] = datetime.now()
        decom_df['timezone'] = "UTC"
        decom_df['seller_id'] = seller_id
        float_columns = decom_df.select_dtypes(float).columns
        decom_df[float_columns] = decom_df[float_columns].fillna(0.0)
        int_columns = decom_df.select_dtypes(int).columns
        decom_df[int_columns] = decom_df[int_columns].fillna(0)
        datetime_columns = decom_df.select_dtypes(datetime).columns
        decom_df[datetime_columns] = decom_df[datetime_columns].astype('string')
        decom_df.fillna('', inplace=True)
        return decom_df

    def _write_product_batch(self, rows, listing_ids, bondary_date):
        """Insert ``rows`` into productInfo and delete older rows not in ``listing_ids``.

        Uses a fresh connection that is always closed. Returns True on success and
        False (after rollback) on any database error.
        """
        # NOTE(review): 29 placeholders vs. 28 columns in the productInfo DDL at the end
        # of this file (data_deal also adds seller_id) — confirm the live table schema.
        insert_sql = """
            insert into amz_sp_api.productInfo
            values (%s,%s,%s,%s,%s,%s,%s,
                    %s,%s,%s,%s,%s,%s,%s,
                    %s,%s,%s,%s,%s,%s,%s,
                    %s,%s,%s,%s,%s,%s,%s,%s)
        """
        # pymysql expands a list parameter to a parenthesised, escaped list. This stays
        # valid for a single id, unlike the former str(tuple(...)) which rendered "('x',)".
        delete_sql = ("delete from amz_sp_api.productInfo "
                      "where `listing-id` not in %s and `updateTime`<%s")
        with closing(self.mysql_connect()) as conn:
            cursor = conn.cursor()
            try:
                conn.begin()
                cursor.executemany(insert_sql, rows)
                conn.commit()
                print("插入中...")  # "inserting..."
                cursor.execute(delete_sql, (listing_ids, bondary_date))
                conn.commit()
                return True
            except Exception as e:
                conn.rollback()
                print(e)
                return False

    def GET_MERCHANT_LISTINGS_ALL_DATA(self, limit=None, seller_id=None):
        """Refresh amz_sp_api.productInfo from a GET_MERCHANT_LISTINGS_ALL_DATA report.

        Rows already refreshed today (with a non-blank image and SKU) are skipped. The
        rest are cleaned by data_deal and written in batches of _PRODUCT_BATCH_SIZE.
        Each batch gets one retry; processing stops after a batch fails twice.

        Args:
            limit: optional cap on the number of report rows processed.
            seller_id: selling partner id passed to data_deal (new, optional).

        Returns:
            None if the report failed, "Done" if nothing needed writing, otherwise the
            DataFrame of report rows that were processed.
        """
        start = time.time()
        reportid = self.create_report(reportType=ReportType.GET_MERCHANT_LISTINGS_ALL_DATA)
        decom_df = self.decompression(reportid)
        if decom_df is None:
            return None
        bondary_date = datetime.now().strftime("%Y-%m-%d")

        print("连接数据库")  # "connecting to database"
        with closing(self.mysql_connect()) as conn:
            print("连接成功")  # "connected"
            cursor = conn.cursor()
            cursor.execute("""select * from amz_sp_api.productInfo
                              where (mainImageUrl is not null and mainImageUrl not in ('', ' '))
                                and (`seller-sku` not in ('',' ') and `seller-sku` is not null)
                                and `updateTime`>=%s""", (bondary_date,))
            col = [i[0] for i in cursor.description]
            query_rel = cursor.fetchall()

        if len(query_rel) != 0:
            print(query_rel[0])
            existing = pd.DataFrame(query_rel, columns=col)
            listingid = existing['listing-id'].to_numpy().tolist()
            decom_df = decom_df.query("`listing-id` not in @listingid")
        print("数据条数: ", len(decom_df))  # "row count"
        if len(decom_df) == 0:
            return "Done"
        if limit is not None:
            decom_df = decom_df.iloc[:limit, :]

        print("getting mainImageInfo...")
        rowcount = 0
        while rowcount < len(decom_df):
            batch = decom_df.iloc[rowcount:rowcount + _PRODUCT_BATCH_SIZE, :].copy()
            df_insert = self.data_deal(batch, seller_id)
            list_df = df_insert.to_numpy().tolist()
            insert_listingid = df_insert['listing-id'].to_numpy().tolist()
            if not self._write_product_batch(list_df, insert_listingid, bondary_date):
                # One retry on a fresh connection; give up on remaining batches if it fails too.
                if not self._write_product_batch(list_df, insert_listingid, bondary_date):
                    break
            # Advance after either a first-try or a retry success (the retry path
            # previously left rowcount unchanged and re-inserted the same batch).
            rowcount += _PRODUCT_BATCH_SIZE
        print("全部完成")  # "all done"
        print("duration:", time.time() - start)
        return decom_df

    # ------------------------------------------------------------------ #
    # asj_ads.seller_listings_dailyUP sync (MAIN)
    # ------------------------------------------------------------------ #
    def _resolve_asin(self, asin, asin1, product_id, seller_id, sku):
        """Pick the ASIN for one listing row, falling back to the Listings Items API.

        Candidates are checked in order:
        1. ``product_id``.
        2. The stored ``asin`` when the report's ``asin1`` is empty; otherwise ``asin1``.

        A candidate is used only if 'B0' occurs in its first three characters. The API
        fallback is tried twice, and a second failure propagates to the caller.
        """
        if 'B0' in str(product_id)[:3]:
            return str(product_id)
        if (pd.isna(asin1) or asin1 == '') and (not pd.isna(asin) and asin != ''):
            if 'B0' in asin[:3]:
                return asin
        elif not pd.isna(asin1) and asin1 != '':
            if 'B0' in asin1[:3]:
                return asin1
        listing_client = ListingsItems(credentials=self.credentials, marketplace=self.marketplace)
        try:
            r1 = listing_client.get_listings_item(sellerId=seller_id, sku=sku)
            print(r1.payload)
            return r1.payload.get("summaries")[0].get("asin")
        except Exception as e:
            # "error while fetching image url, retrying"
            print("获取图片url过程错误重试, 错误message: ", e)
            time.sleep(3)
        r1 = listing_client.get_listings_item(sellerId=seller_id, sku=sku)
        print(r1.payload)
        return r1.payload.get("summaries")[0].get("asin")

    def GET_FLAT_FILE_OPEN_LISTINGS_DATA(self, conn=None, seller_id=None):
        """Sync this seller/marketplace's listings into asj_ads.seller_listings_dailyUP.

        Steps:
        1. Request a GET_MERCHANT_LISTINGS_ALL_DATA report and normalise its columns.
        2. Resolve each row's ASIN, image link and title.
        3. Diff against the stored rows via update_data, which also deletes the changed rows.
        4. Insert the changed rows.

        Args:
            conn: unused; kept for the data_judge call signature.
            seller_id: selling partner id stored with every row.

        Returns:
            ``None`` when the report failed or is empty. Otherwise one of:
            '无更新数据插入' (nothing to update), '插入完成' (inserted) or
            '出错回滚' (error, rolled back).
        """
        reportid = self.create_report(reportType=ReportType.GET_MERCHANT_LISTINGS_ALL_DATA)
        df = self.decompression(reportid)
        if df is None or len(df) == 0:
            return None

        if self.marketplace.marketplace_id == _JP_MARKETPLACE_ID:
            # The JP report's headers are replaced positionally with these names.
            df.columns = ['item-name', 'listing-id', 'seller-sku', 'price', 'quantity', 'open-date',
                          'product-id-type', 'item-description', 'item-condition', 'overseas shipping',
                          'fast shipping', 'asin1', 'stock_number', 'fulfillment-channel',
                          'merchant-shipping-group', 'status']
        df['seller_id'] = seller_id
        df['marketplace_id'] = self.marketplace.marketplace_id
        df['country_code'] = str(self.marketplace)[-2:]
        if 'fulfilment-channel' in df.columns:
            # Some reports spell the header 'fulfilment-channel'; copy it to the usual name.
            print("changed fulfilment-channel:")
            print(seller_id, self.marketplace)
            df['fulfillment-channel'] = df['fulfilment-channel'].copy()
        # Map channel codes to FBA / FBM by a 3-char substring test (chars 1..3 of the value).
        df['fulfillment_channel'] = df['fulfillment-channel'].map(
            lambda x: "FBA" if not pd.isna(x) and len(x) > 0 and str(x)[1:4] in "AMAZON" else x)
        df['fulfillment_channel'] = df['fulfillment_channel'].map(
            lambda x: "FBM" if not pd.isna(x) and len(x) > 0 and str(x)[1:4] in "DEFAULT" else x)
        if 'asin1' not in df.columns:
            df['asin1'] = ''
        if 'product-id' not in df.columns:
            df['product-id'] = ''

        # Missing-value handling.
        df['quantity'] = df['quantity'].fillna(0).astype('int64', errors='ignore')
        string_columns = ['listing-id', 'seller_id', 'asin1', 'seller-sku', 'country_code',
                          'marketplace_id', 'fulfillment_channel', 'status', 'product-id']
        df[string_columns] = df[string_columns].fillna('').astype('string', errors='ignore')
        df['price'] = df['price'].fillna(0.0).astype('float64', errors='ignore')
        df.fillna('', inplace=True)
        # Date handling.
        df['opendate'] = df['open-date'].map(self.datetime_deal)
        df['update_datetime'] = datetime.now(pytz.UTC).date()
        origin_columns = ['listing-id', 'seller_id', 'asin1', 'seller-sku', 'title', 'image_link',
                          'country_code', 'marketplace_id', 'quantity', 'fulfillment_channel', 'price',
                          'opendate', 'status', 'update_datetime', 'product-id', 'product-id-type']

        # Known (product_id, asin) pairs, used to fill ASINs missing from the report.
        with closing(SpApiRequest.mysql_connect_auth_lst()) as lookup_conn:
            cursor = lookup_conn.cursor()
            cursor.execute("""select product_id,asin from
                              (select * from asj_ads.seller_listings_dailyUP
                               where asin is not null and asin<>'' and product_id is not null and product_id <>'') t1
                              group by product_id,asin""")
            query_ = cursor.fetchall()
            col_name = [i[0] for i in cursor.description]
        df_datatable = pd.DataFrame(query_, columns=col_name)
        merged_df = df.merge(df_datatable[['product_id', 'asin']], how='left',
                             left_on='product-id', right_on='product_id')
        print(merged_df.head())

        merged_df['asin1'] = merged_df.apply(
            lambda row: self._resolve_asin(row['asin'], row['asin1'], row['product-id'],
                                           seller_id, row['seller-sku']),
            axis=1)
        print("获取listing Info...")  # "fetching listing info..."
        merged_df['temp_columns'] = merged_df['seller-sku'].map(lambda sku: self.get_listing_info(sku, seller_id))
        # n=1: a title that itself contains '-----' must not create extra columns.
        merged_df[['image_link', 'title']] = merged_df['temp_columns'].str.split("-----", n=1, expand=True)
        merged_df.fillna('', inplace=True)
        df1 = merged_df.copy()
        print(df1[origin_columns].head(1))

        # update_data opens its own connection; the conn argument is not used there.
        update_df = self.update_data(df1, seller_id, str(self.marketplace)[-2:], None)
        if len(update_df) == 0:
            return '无更新数据插入'

        insertsql = """insert into asj_ads.seller_listings_dailyUP(listing_id,seller_id,asin,sku,title,image_link,country_code,marketplace_id,quantity,
                       fulfillment_channel,price,launch_datetime,status,update_datetime,product_id,product_id_type)
                       values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)"""
        with closing(SpApiRequest.mysql_connect_auth_lst()) as insert_conn:
            cursor = insert_conn.cursor()
            try:
                insert_conn.begin()
                cursor.executemany(insertsql, tuple(update_df[origin_columns].to_numpy().tolist()))
                insert_conn.commit()
                print("插入完成")
                return '插入完成'
            except Exception as e:
                print("插入错误:", e)
                insert_conn.rollback()
                return '出错回滚'

    @staticmethod
    def _format_listing_summary(payload):
        """Return '<image link>-----<item name>' from a get_listings_item payload ('###' if missing)."""
        summary = payload.get("summaries")[0]
        item_name = summary.get("itemName")
        item_name = '###' if item_name is None else item_name
        img = summary.get("mainImage")
        img_url = '###' if img is None else img.get("link")
        return str(img_url) + "-----" + str(item_name)

    def get_listing_info(self, sku, seller_id):
        """Fetch the image link and title of ``sku`` as '<image link>-----<item name>'.

        Retries once after 3 seconds. If both attempts fail, returns '###-----###'.
        """
        listing_client = ListingsItems(credentials=self.credentials, marketplace=self.marketplace)
        try:
            r1 = listing_client.get_listings_item(sellerId=seller_id, sku=sku)
            return self._format_listing_summary(r1.payload)
        except Exception as e:
            # "error while fetching image url, retrying"
            print("获取图片url过程错误重试, 错误message: ", e)
            time.sleep(3)
        try:
            r1 = listing_client.get_listings_item(sellerId=seller_id, sku=sku)
            print(r1.payload)
            return self._format_listing_summary(r1.payload)
        except Exception as e:
            print(e)
            return "###-----###"

    def datetime_deal(self, timestring):
        """Parse a report date string.

        'DATE' (fewer than three space-separated parts) returns a naive date-only datetime.
        'DATE TIME TZ' is localised via _TZ_ABBREVIATIONS and returned as an aware UTC datetime.
        Several Y-M-D / D/M/Y formats are tried. If all fail, or the timezone abbreviation
        is unknown, the last error is printed and the sentinel datetime(1999, 12, 31) is returned.
        """
        date_list = str.split(timestring, sep=' ')
        last_error = None
        if len(date_list) < 3:
            for fmt in ("%Y-%m-%d", "%Y/%m/%d", "%d/%m/%Y"):
                try:
                    return datetime.strptime(date_list[0], fmt)
                except Exception as e:
                    last_error = e
            print(last_error)
            return datetime(1999, 12, 31, 0, 0, 0)
        for fmt in ("%Y-%m-%d%H:%M:%S", "%d/%m/%Y%H:%M:%S", "%Y/%m/%d%H:%M:%S"):
            try:
                time_date = datetime.strptime(date_list[0] + date_list[1], fmt)
                zone = pytz.timezone(self._TZ_ABBREVIATIONS[date_list[2]])
                return zone.localize(time_date).astimezone(pytz.UTC)
            except Exception as e:
                last_error = e
        print(last_error)
        return datetime(1999, 12, 31, 0, 0, 0)

    def update_data(self, df, seller_id, country_code, conn):
        """Find listing rows that differ from asj_ads.seller_listings_dailyUP and delete their old versions.

        A row is "changed" when the number of stored rows matching it on every compared
        field is not exactly one. 'GB' is stored as 'UK'. ``df`` is mutated (dtypes,
        country_code).

        Args:
            df: prepared listing rows (see GET_FLAT_FILE_OPEN_LISTINGS_DATA).
            seller_id, country_code: identify the stored rows to compare against.
            conn: unused; a dedicated connection is opened and closed here.

        Returns:
            The changed rows (re-indexed from 0). If reading or comparing fails, the
            whole ``df`` is returned.
        """
        if country_code == 'GB':
            country_code = "UK"
            df['country_code'] = "UK"
        marketplace_id = self.marketplace.marketplace_id
        with closing(SpApiRequest.mysql_connect_auth_lst()) as db:
            cursor = db.cursor()
            try:
                cursor.execute("select * from asj_ads.seller_listings_dailyUP "
                               "where seller_id=%s and marketplace_id=%s",
                               (seller_id, marketplace_id))
                col = [i[0] for i in cursor.description]
                df_rel = pd.DataFrame(cursor.fetchall(), columns=col)
                df_rel['quantity'] = df_rel['quantity'].fillna(0).astype('int64')
                df_rel['price'] = df_rel['price'].fillna(0.0).astype('float64')
                df_rel['product_id_type'] = df_rel['product_id_type'].astype('int64')
                df['update_datetime'] = df['update_datetime'].astype('datetime64[ns]')
                df['quantity'] = df['quantity'].fillna(0).astype('int64')
                df['price'] = df['price'].fillna(0.0).astype('float64')

                keep_rows = []
                delete_list = []
                for pos in range(len(df)):
                    temp_df = df.iloc[pos, :]
                    # These names are referenced by @ in the query below.
                    listing_id = temp_df['listing-id']
                    asin = temp_df['asin1']
                    sku = temp_df['seller-sku']
                    quantity = temp_df['quantity']
                    fulfillment_channel = temp_df['fulfillment_channel']
                    price = temp_df['price']
                    product_id = temp_df['product-id']
                    title = temp_df['title']
                    imageurl = temp_df['image_link']
                    temp = df_rel.query("""listing_id==@listing_id and asin==@asin and sku==@sku and quantity==@quantity
                                           and fulfillment_channel==@fulfillment_channel and price==@price
                                           and product_id==@product_id and country_code==@country_code
                                           and seller_id==@seller_id and title==@title and image_link==@imageurl""")
                    if len(temp) > 1:
                        # "data needing attention (possibly abnormal)"
                        print("需要关注数据(是否异常):", len(temp), temp.to_numpy().tolist())
                    if len(temp) != 1:
                        keep_rows.append(pos)
                        delete_list.append((seller_id, marketplace_id, sku, listing_id, product_id))
                # Replaces the removed-in-pandas-2.0 DataFrame.append row-by-row accumulation.
                df_data = df.iloc[keep_rows].reset_index(drop=True)

                print("判断不同数据条数", len(delete_list))  # "rows detected as different"
                print("预计更新数据条数", len(df_data))      # "rows expected to be updated"
                try:
                    if delete_list:
                        cursor.execute("""delete from asj_ads.seller_listings_dailyUP
                                          where (seller_id,marketplace_id,sku,listing_id,product_id) in %s""",
                                       (delete_list,))
                        db.commit()
                        print(delete_list)
                        print("进行中...")  # "in progress..."
                except Exception as e:
                    print(e)
                    db.rollback()
                return df_data
            except Exception as e:
                print("错误:", e)
                return df

    # ------------------------------------------------------------------ #
    # Shop iteration / dispatch
    # ------------------------------------------------------------------ #
    @staticmethod
    def auth_info():
        """Return the full amazon_sp_report.amazon_sp_auth_info table as a DataFrame."""
        with closing(SpApiRequest.mysql_connect_auth()) as auth_conn:
            cursor = auth_conn.cursor()
            cursor.execute("select * from amazon_sp_report.amazon_sp_auth_info;")
            columns_name = [i[0] for i in cursor.description]
            rel = cursor.fetchall()
        return pd.DataFrame(rel, columns=columns_name)

    @classmethod
    def get_orders_allShops(cls):
        """Placeholder; not implemented."""
        pass

    @staticmethod
    def data_judge_secondTry(sp_api, data_type, seller_id, auth_conn):
        """Run data_judge. After any failure, wait 3 s and run it once more (errors then propagate)."""
        try:
            return SpApiRequest.data_judge(sp_api, data_type, seller_id, auth_conn)
        except Exception:
            time.sleep(3)
            return SpApiRequest.data_judge(sp_api, data_type, seller_id, auth_conn)

    @staticmethod
    def data_judge(sp_api, data_type, seller_id, auth_conn):
        """Dispatch ``data_type`` to the matching report method; unknown types return ''."""
        if data_type == "GET_FLAT_FILE_OPEN_LISTINGS_DATA":
            return sp_api.GET_FLAT_FILE_OPEN_LISTINGS_DATA(auth_conn, seller_id)
        if data_type == "GET_FLAT_FILE_ALL_ORDERS_DATA_BY_ORDER_DATE_GENERAL":
            return sp_api.GET_FLAT_FILE_ALL_ORDERS_DATA_BY_ORDER_DATE_GENERAL(seller_id)
        if data_type == "GET_FLAT_FILE_RETURNS_DATA_BY_RETURN_DATE":
            return sp_api.GET_FLAT_FILE_RETURNS_DATA_BY_RETURN_DATE(seller_id)
        return ""

    @classmethod
    def get_refreshtoken(cls):
        """Return every refresh token in the auth table as a list."""
        return cls.auth_info()['refresh_token'].to_numpy().tolist()

    @staticmethod
    def _marketplaces_for_region(region):
        """Expand an auth-table region into marketplaces.

        Raises AttributeError for an unknown region name.
        """
        if region == 'NA':
            return _NA_MARKETPLACES
        if region == 'EU':
            return _EU_MARKETPLACES
        print(region)
        # getattr instead of eval(): the region string comes from the database.
        return (getattr(Marketplaces, region),)

    @classmethod
    def get_allShops(cls, data_type: Optional[Literal["GET_FLAT_FILE_OPEN_LISTINGS_DATA",
                                                      "GET_FLAT_FILE_ALL_ORDERS_DATA_BY_ORDER_DATE_GENERAL",
                                                      "GET_FLAT_FILE_RETURNS_DATA_BY_RETURN_DATE"]] = None):
        """Run the ``data_type`` job for every shop in the auth table (in reverse table order).

        Each shop's region is expanded into marketplaces. A failure is printed and
        skipped, so one bad shop or marketplace does not stop the rest. With the
        default ``None``, data_judge matches nothing and the call is a no-op per marketplace.
        """
        df = cls.auth_info()
        refreshtoken_list = df['refresh_token'].to_numpy().tolist()
        refreshtoken_list.reverse()
        for refresh_token in refreshtoken_list:
            aws_credentials = cls._build_credentials(refresh_token)
            single_info = df.query("refresh_token==@refresh_token")
            region_circle = single_info['region'].values[0]
            seller_id = single_info['selling_partner_id'].values[0]
            try:
                marketplaces = cls._marketplaces_for_region(region_circle)
            except AttributeError as e:
                print(e)
                continue
            for marketplace in marketplaces:
                sp_api = SpApiRequest(aws_credentials, marketplace)
                try:
                    with closing(SpApiRequest.mysql_connect_auth()) as auth_conn:
                        cls.data_judge_secondTry(sp_api, data_type, seller_id, auth_conn)
                except Exception as e:
                    print(e)

    # ------------------------------------------------------------------ #
    # Orders / returns reports
    # ------------------------------------------------------------------ #
    def timeDeal(self, orgTime):
        """Parse ``orgTime`` and return it as a naive UTC datetime truncated to whole seconds.

        A string without an offset is treated as host local time by ``astimezone``.
        """
        shop_time = parse(orgTime).astimezone(pytz.timezone("UTC"))
        return datetime(shop_time.year, shop_time.month, shop_time.day,
                        shop_time.hour, shop_time.minute, shop_time.second)

    def GET_FLAT_FILE_RETURNS_DATA_BY_RETURN_DATE(self, seller_id):
        """Request the report for the day before yesterday and print it; nothing is stored.

        NOTE(review): despite the method name, this requests ReportType.GET_SELLER_FEEDBACK_DATA,
        and ``seller_id`` is unused. Confirm which report is intended.
        """
        shopReportday = (datetime.now() + timedelta(days=-2)).strftime("%Y-%m-%d")
        reportid = self.create_report(reportType=ReportType.GET_SELLER_FEEDBACK_DATA,
                                      dataStartTime=shopReportday, dataEndTime=shopReportday)
        decom_df = self.decompression(reportid)
        print(decom_df)

    def GET_FLAT_FILE_ALL_ORDERS_DATA_BY_ORDER_DATE_GENERAL(self, seller_id):
        """Load yesterday's all-orders report into asj_ads.orderReport.

        Purchase / last-updated dates are converted to naive UTC (empty dates become NULL).
        Optional columns are defaulted, and seller_id / country_code ('GB' stored as 'UK')
        are added. Returns None when the report failed; errors during insert are printed
        and rolled back.
        """
        shopReportday = (datetime.now() + timedelta(days=-1)).strftime("%Y-%m-%d")
        reportid = self.create_report(reportType=ReportType.GET_FLAT_FILE_ALL_ORDERS_DATA_BY_ORDER_DATE_GENERAL,
                                      dataStartTime=shopReportday, dataEndTime=shopReportday,
                                      reportOptions={"ShowSalesChannel": "true"})
        decom_df = self.decompression(reportid)
        if decom_df is None:
            return None
        float_columns = decom_df.select_dtypes(float).columns
        decom_df[float_columns] = decom_df[float_columns].fillna(0.0)
        int_columns = decom_df.select_dtypes(int).columns
        decom_df[int_columns] = decom_df[int_columns].fillna(0)
        datetime_columns = decom_df.select_dtypes(datetime).columns
        decom_df[datetime_columns] = decom_df[datetime_columns].astype('string')
        if "purchase-order-number" in decom_df.columns:
            decom_df['purchase-order-number'] = decom_df['purchase-order-number'].astype("string")
        decom_df.fillna('', inplace=True)
        decom_df["ReportDate"] = parse(shopReportday)
        decom_df['timezone'] = "UTC"
        print("==========================================================")

        def to_utc(value):
            # fillna('') above turns missing dates into '', which dateutil cannot parse.
            if value is None or (isinstance(value, str) and value.strip() == '') or pd.isna(value):
                return None
            return self.timeDeal(value)

        for date_column in ("purchase-date", "last-updated-date"):
            decom_df[date_column] = decom_df[date_column].map(to_utc)
        if 'is-business-order' not in decom_df.columns:
            decom_df['is-business-order'] = None
        if 'purchase-order-number' not in decom_df.columns:
            decom_df['purchase-order-number'] = '-'
        if 'price-designation' not in decom_df.columns:
            decom_df['price-designation'] = '-'
        decom_df['seller_id'] = seller_id
        country_code = str(self.marketplace)[-2:]
        if country_code == 'GB':
            country_code = "UK"
        decom_df['country_code'] = country_code

        reserve_columns = ["amazon-order-id", "merchant-order-id", "purchase-date", "last-updated-date", "order-status",
                           "fulfillment-channel", "sales-channel", "order-channel", "ship-service-level", "product-name",
                           "sku", "asin", "item-status", "quantity", "currency", "item-price", "item-tax", "shipping-price",
                           "shipping-tax", "gift-wrap-price", "gift-wrap-tax", "item-promotion-discount",
                           "ship-promotion-discount", "ship-city", "ship-state", "ship-postal-code", "ship-country",
                           "promotion-ids", "is-business-order", "purchase-order-number", "price-designation", "ReportDate",
                           "timezone", "seller_id", "country_code"]
        list_df = decom_df[reserve_columns].to_numpy().tolist()
        # One %s per entry of reserve_columns (35).
        sql = """
            insert into asj_ads.orderReport
            values (%s,%s,%s,%s,%s,%s,%s,
                    %s,%s,%s,%s,%s,%s,%s,
                    %s,%s,%s,%s,%s,%s,%s,
                    %s,%s,%s,%s,%s,%s,%s,
                    %s,%s,%s,%s,%s,%s,%s)
        """
        with closing(self.mysql_connect_auth_lst()) as conn:
            cursor = conn.cursor()
            try:
                conn.begin()
                cursor.executemany(sql, list_df)
                conn.commit()
                print("插入完成")  # "insert complete"
            except Exception as e:
                conn.rollback()
                print(e)


def func_run():
    """Entry point: load yesterday's orders for every shop."""
    SpApiRequest.get_allShops("GET_FLAT_FILE_ALL_ORDERS_DATA_BY_ORDER_DATE_GENERAL")


if __name__ == '__main__':
    func_run()
    # To run daily instead of once:
    # sched = BlockingScheduler()
    # sched.add_job(func_run, 'cron', hour=0, minute=0, second=30)
    # sched.start()


# Reference DDL kept as unused string literals. NOTE(review): these may be stale —
# the code writes 29 values to productInfo and writes asj_ads.orderReport with
# seller_id/country_code columns that this orderReport DDL lacks.
"""
create database amz_sp_api;
"""

"""
create table amz_sp_api.productInfo
(
    `item-name` VARCHAR(300),
    `item-description` VARCHAR(1000),
    `listing-id` VARCHAR(50),
    `seller-sku` VARCHAR(50),
    `price` FLOAT,
    `quantity` INT,
    `open-date` VARCHAR(70),
    `image-url` VARCHAR(300),
    `item-is-marketplace` VARCHAR(50),
    `product-id-type` INT,
    `item-note` VARCHAR(300),
    `item-condition` INT,
    `asin1` VARCHAR(50),
    `asin2` VARCHAR(50),
    `asin3` VARCHAR(50),
    `will-ship-internationally` VARCHAR(50),
    `expedited-shipping` VARCHAR(50),
    `product-id` VARCHAR(50),
    `bid-for-featured-placement` FLOAT,
    `add-delete` VARCHAR(50),
    `pending-quantity` INT,
    `fulfillment-channel` VARCHAR(50),
    `merchant-shipping-group` VARCHAR(50),
    `status` VARCHAR(50),
    `mainImageUrl` VARCHAR(300),
    `opendate_date` Date,
    `updateTime` Date,
    `timezone` VARCHAR(30)
)
"""

"""
create table amz_sp_api.orderReport
(
    `amazon-order-id` VARCHAR(40),
    `merchant-order-id` VARCHAR(40),
    `purchase-date` DATETIME,
    `last-updated-date` DATETIME,
    `order-status` VARCHAR(40),
    `fulfillment-channel` VARCHAR(40),
    `sales-channel` VARCHAR(40),
    `order-channel` VARCHAR(40),
    `ship-service-level` VARCHAR(40),
    `product-name` VARCHAR(250),
    `sku` VARCHAR(50),
    `asin` VARCHAR(40),
    `item-status` VARCHAR(40),
    `quantity` INT,
    `currency` VARCHAR(40),
    `item-price` FLOAT,
    `item-tax` FLOAT,
    `shipping-price` FLOAT,
    `shipping-tax` FLOAT,
    `gift-wrap-price` FLOAT,
    `gift-wrap-tax` FLOAT,
    `item-promotion-discount` FLOAT,
    `ship-promotion-discount` FLOAT,
    `ship-city` VARCHAR(40),
    `ship-state` VARCHAR(40),
    `ship-postal-code` VARCHAR(40),
    `ship-country` VARCHAR(40),
    `promotion-ids` VARCHAR(50),
    `cpf` VARCHAR(40),
    `is-business-order` BOOL,
    `purchase-order-number` VARCHAR(50),
    `price-designation` VARCHAR(40),
    `signature-confirmation-recommended` BOOL,
    `ReportDate` DATE not null,
    `timezone` VARCHAR(20) not null
);
"""