12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804 |
import gzip
import time
import warnings
from contextlib import closing
from datetime import datetime, timedelta, timezone
from io import BytesIO, StringIO
from random import shuffle
from typing import List, Literal

# Silence library warnings before the heavy third-party imports below.
warnings.filterwarnings('ignore')

import clickhouse_connect
import numpy as np
import pandas as pd
import pymysql
import pytz
from apscheduler.schedulers.blocking import BlockingScheduler
from dateutil.parser import parse
from pymysql import Timestamp
from sp_api.api import Orders, ListingsItems, Inventories, Reports, CatalogItems
from sp_api.base import Marketplaces, ReportType, ProcessingStatus
from sp_api.util import throttle_retry, load_all_pages
class SpApiRequest:
    """Pull Amazon SP-API reports/listings for one marketplace and store them in MySQL.

    An instance is bound to one set of SP-API credentials and one marketplace.
    The class-level helpers (``get_allShops``, ``auth_info``, ...) iterate over
    every authorised shop stored in the auth database.
    """

    # Marketplace id that needs Shift-JIS decoding and positional column names.
    _JP_MARKETPLACE_ID = 'A1VC38T7YXB528'
    # get_listing_info packs "<image url>-----<title>" into one string.
    _INFO_SEPARATOR = "-----"
    _INFO_PLACEHOLDER = "###"
    # Sentinel returned by datetime_deal when a value cannot be parsed.
    _FALLBACK_DATETIME = datetime(1999, 12, 31, 0, 0, 0)
    # Timezone abbreviations found in report timestamps -> pytz zone names.
    _TZ_NAMES = {
        "AEST": "Australia/Sydney",
        "AEDT": "Australia/Sydney",
        "PST": "America/Los_Angeles",
        "PDT": "America/Los_Angeles",
        "CST": "America/Chicago",
        "CDT": "America/Chicago",
        "MET": "MET",
        "MEST": "MET",
        "BST": "Europe/London",
        "GMT": "GMT",
        "CET": "CET",
        "CEST": "CET",
        "JST": "Asia/Tokyo",
        "BRT": "America/Sao_Paulo",
    }

    def __init__(self, credentials, marketplace):
        """Store the SP-API credential dict and the ``Marketplaces`` member to query."""
        self.credentials = credentials
        self.marketplace = marketplace

    # ------------------------------------------------------------------
    # Database connections
    # NOTE(security): connection secrets are hard-coded below exactly as in
    # the original file; they should be moved to configuration/secret storage.
    # ------------------------------------------------------------------
    @classmethod
    def mysql_connect_auth(cls):
        """Open a connection to the database holding the SP-API auth table."""
        return pymysql.connect(user="admin",
                               password="NSYbBSPbkGQUbOSNOeyy",
                               host="retail-data.cnrgrbcygoap.us-east-1.rds.amazonaws.com",
                               database="ansjer_dvadmin",
                               port=3306)

    @classmethod
    def mysql_connect_auth_lst(cls):
        """Open a connection to the ``asj_ads`` listings database."""
        return pymysql.connect(user="root",
                               password="sandbox",
                               host="192.168.1.225",
                               database="asj_ads",
                               port=3306)

    @classmethod
    def mysql_connect(cls):
        """Open a connection to the local ``amz_sp_api`` database."""
        return pymysql.connect(user="huangyifan",
                               password="123456",
                               host="127.0.0.1",
                               database="amz_sp_api",
                               port=3306)

    @classmethod
    def mysql_adTest_connect(cls):
        """Open a connection to ``asj_ads`` (same target as ``mysql_connect_auth_lst``)."""
        return cls.mysql_connect_auth_lst()

    # ------------------------------------------------------------------
    # Shared helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _build_credentials(refresh_token):
        """Return the SP-API credential dict for one shop's refresh token.

        NOTE(security): the application secrets are hard-coded as in the
        original file; only the refresh token varies per shop.
        """
        return {
            'refresh_token': refresh_token,
            # LWA credentials from the developer profile in Seller Central.
            'lwa_app_id': 'amzn1.application-oa2-client.1f9d3d4747e14b22b4b598e54e6b922e',
            'lwa_client_secret': 'amzn1.oa2-cs.v1.3af0f5649f5b8e151cd5bd25c10f2bf3113172485cd6ffc52ccc6a5e8512b490',
            'aws_access_key': 'AKIARBAGHTGOZC7544GN',
            'aws_secret_key': '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',
            'role_arn': 'arn:aws:iam::070880041373:role/Amazon_SP_API_ROLE'
        }

    @staticmethod
    def _na_marketplaces():
        """Marketplaces served by an 'NA' region authorisation."""
        return [Marketplaces.US, Marketplaces.BR, Marketplaces.CA, Marketplaces.MX]

    @staticmethod
    def _eu_marketplaces():
        """Marketplaces served by an 'EU' region authorisation."""
        return [Marketplaces.DE, Marketplaces.AE, Marketplaces.BE, Marketplaces.PL,
                Marketplaces.EG, Marketplaces.ES, Marketplaces.GB, Marketplaces.IN,
                Marketplaces.IT, Marketplaces.NL, Marketplaces.SA, Marketplaces.SE,
                Marketplaces.TR, Marketplaces.UK, Marketplaces.FR]

    @staticmethod
    def _classify_fulfillment(value):
        """Map a raw fulfillment-channel value to "FBA"/"FBM" where recognised.

        Keeps the original matching rule: characters 1..3 of the value are
        looked up as a substring of "AMAZON" (-> FBA) and then of "DEFAULT"
        (-> FBM). Missing, empty and non-string values are returned unchanged.
        """
        if not isinstance(value, str) or value == "":
            return value
        fragment = value[1:4]
        if fragment in "AMAZON":
            return "FBA"
        if fragment in "DEFAULT":
            return "FBM"
        return value

    def _main_image_url(self, sku, seller_id):
        """Return the main image URL for ``sku`` ('' when it is unavailable).

        Built on ``get_listing_info``; the placeholder it uses for a missing
        image is mapped to '' because the productInfo queries treat '' as
        "no image" (assumption based on the select in
        ``GET_MERCHANT_LISTINGS_ALL_DATA``).
        """
        packed = self.get_listing_info(sku, seller_id)
        image_url = packed.split(self._INFO_SEPARATOR, 1)[0]
        return '' if image_url == self._INFO_PLACEHOLDER else image_url

    def _to_utc_or_keep(self, value):
        """Convert a timestamp string to naive UTC; blanks/missing become None."""
        if value is None or pd.isna(value):
            return None
        if isinstance(value, str) and not value.strip():
            return None
        return self.timeDeal(value)

    # ------------------------------------------------------------------
    # Catalog
    # ------------------------------------------------------------------
    @classmethod
    def get_catelog(cls, account_name, country=None, asin=None):
        """Fetch the first image link and the title of ``asin`` from the catalog API.

        Args:
            account_name: account name as stored in the auth table.
            country: ``Marketplaces`` member; ``None`` means ``Marketplaces.US``.
            asin: ASIN to look up.

        Returns:
            ``{'images': <link>, 'title': <item name>}``, or the string
            '获取失败' when no refresh token matches the account/region.
        """
        if country is None:
            country = Marketplaces.US
        if country in cls._na_marketplaces():
            region = 'NA'
        elif country in cls._eu_marketplaces():
            region = 'EU'
        else:
            region = str(country)[-2:]
        df = cls.auth_info()
        try:
            refresh_token = df.query(
                "account_name==@account_name and region==@region")['refresh_token'].values[0]
        except Exception:
            print("请输入正确的account name与Marketplace")
            return '获取失败'
        cred = cls._build_credentials(refresh_token)
        cate_item = CatalogItems(credentials=cred, marketplace=country)
        images_info = cate_item.get_catalog_item(asin=asin, **{"includedData": ['images']})
        images = images_info.images[0].get('images')[0]['link']
        title_info = cate_item.get_catalog_item(asin=asin)
        title = title_info.payload['summaries'][0]['itemName']
        return {'images': images, 'title': title}

    # ------------------------------------------------------------------
    # Report creation / download
    # ------------------------------------------------------------------
    def create_report(self, **kwargs):
        """Request a report and return its ``reportId``.

        Recognised keyword arguments:
            reportType (required), reportOptions,
            dataStartTime / dataEndTime: 'YYYY-MM-DD' day strings (the start
                gets 'T00:00:00' appended, the end 'T23:59:59'); the current
                time is used when omitted,
            marketpalceids: marketplace id override (historic spelling kept;
                the correctly spelled ``marketplaceids`` is accepted too).
        """
        reportType = kwargs['reportType']
        reportOptions = kwargs.get("reportOptions")
        start_day = kwargs.get("dataStartTime")
        end_day = kwargs.get("dataEndTime")
        now_text = datetime.now().strftime("%Y-%m-%dT%H:%M:%S")
        dataStartTime = now_text if start_day is None else start_day + "T00:00:00"
        dataEndTime = now_text if end_day is None else end_day + "T23:59:59"

        marketplace_id = kwargs.get('marketpalceids')
        if marketplace_id is None:
            marketplace_id = kwargs.get('marketplaceids')
        if marketplace_id is None:
            marketplace_id = self.marketplace.marketplace_id

        report = Reports(credentials=self.credentials, marketplace=self.marketplace)
        rel = report.create_report(
            reportType=reportType, marketplaceIds=[marketplace_id],
            reportOptions=reportOptions, dataStartTime=dataStartTime, dataEndTime=dataEndTime
        )
        return rel.payload.get("reportId")

    def decompression(self, reportId, poll_interval=15, max_wait=None):
        """Poll a report until it is finished and load it into a DataFrame.

        Args:
            reportId: id returned by ``create_report``.
            poll_interval: seconds to sleep between status checks.
            max_wait: optional upper bound in seconds; ``None`` polls forever
                (the original behaviour).

        Returns:
            The report as a DataFrame, or ``None`` when the report was
            cancelled, failed, or ``max_wait`` elapsed.
        """
        report = Reports(credentials=self.credentials, marketplace=self.marketplace)
        is_jp = self.marketplace.marketplace_id == self._JP_MARKETPLACE_ID
        encoding = 'Shift-JIS' if is_jp else 'iso-8859-1'
        deadline = None if max_wait is None else time.monotonic() + max_wait
        while True:
            reportId_info = report.get_report(reportId=reportId)
            status = reportId_info.payload.get("processingStatus")
            print("please wait...")
            if status == ProcessingStatus.DONE:
                reportDocumentId = reportId_info.payload.get("reportDocumentId")
                rp_table = report.get_report_document(reportDocumentId=reportDocumentId, download=False)
                print(rp_table)
                read_kwargs = {"encoding": encoding}
                if rp_table.payload.get('compressionAlgorithm') is not None:
                    read_kwargs["compression"] = {"method": 'gzip'}
                return pd.read_table(rp_table.payload['url'], **read_kwargs)
            if status in [ProcessingStatus.CANCELLED, ProcessingStatus.FATAL]:
                print(reportId_info)
                print("取消或失败")
                return None
            if deadline is not None and time.monotonic() >= deadline:
                print("report polling timed out:", reportId)
                return None
            time.sleep(poll_interval)

    # ------------------------------------------------------------------
    # productInfo sync (local amz_sp_api database)
    # ------------------------------------------------------------------
    def data_deal(self, decom_df, seller_id):
        """Normalise a listings report chunk in place for the productInfo insert.

        Adds ``mainImageUrl`` (one listing API call per row), ``opendate_date``,
        ``updateTime``, ``timezone`` and ``seller_id``; drops 'zshop' columns;
        coerces types and blanks out missing values. Returns the same frame.
        """
        decom_df['mainImageUrl'] = decom_df['seller-sku'].map(
            lambda sku: self._main_image_url(sku, seller_id))

        for keyword in ("url", "asin"):
            matching = [c for c in decom_df.columns if keyword in c.lower()]
            if len(matching) > 0:
                decom_df[matching] = decom_df[matching].astype("string")

        def clean_count(x):
            # Missing or infinite counts are stored as 0.
            return 0 if pd.isna(x) or np.isinf(x) else x

        if 'pending-quantity' in decom_df.columns:
            decom_df['pending-quantity'] = decom_df['pending-quantity'].map(clean_count).astype("int32")
        deletecolumns = [c for c in decom_df.columns if 'zshop' in c.lower()]
        decom_df.drop(columns=deletecolumns, inplace=True)
        if 'quantity' in decom_df.columns:
            decom_df['quantity'] = decom_df['quantity'].map(clean_count).astype("int32")

        decom_df['opendate_date'] = decom_df['open-date'].map(self.datetime_deal)
        for flag_column in ('add-delete', 'will-ship-internationally', 'expedited-shipping'):
            if flag_column in decom_df.columns:
                decom_df[flag_column] = decom_df[flag_column].astype('string', errors='ignore')

        decom_df['updateTime'] = datetime.now()
        decom_df['timezone'] = "UTC"
        decom_df['seller_id'] = seller_id
        decom_df['item-description'] = decom_df['item-description'].str.slice(0, 500)

        float_cols = decom_df.select_dtypes(float).columns
        decom_df[float_cols] = decom_df[float_cols].fillna(0.0)
        int_cols = decom_df.select_dtypes(int).columns
        decom_df[int_cols] = decom_df[int_cols].fillna(0)
        # NOTE(review): passing the `datetime` class appears to be resolved as
        # the generic object dtype, so this likely stringifies every object
        # column rather than only datetime columns — confirm the intent.
        dt_cols = decom_df.select_dtypes(datetime).columns
        decom_df[dt_cols] = decom_df[dt_cols].astype('string')
        decom_df.fillna('', inplace=True)
        return decom_df

    def _store_listing_batch(self, insert_sql, delete_sql, rows, listing_ids, boundary_date):
        """Insert one productInfo batch and purge stale rows; return True on success.

        Uses its own connection, which is always closed. Errors are printed and
        reported through the return value so the caller can retry.
        """
        try:
            conn = self.mysql_connect()
        except Exception as e:
            print(e)
            return False
        try:
            cursor = conn.cursor()
            conn.begin()
            cursor.executemany(insert_sql, rows)
            conn.commit()
            print("插入中...")
            # Parameterised so a single-id batch no longer renders as ('x',).
            cursor.execute(delete_sql, (listing_ids, boundary_date))
            conn.commit()
            return True
        except Exception as e:
            conn.rollback()
            print(e)
            return False
        finally:
            conn.close()

    def GET_MERCHANT_LISTINGS_ALL_DATA(self, limit=None, seller_id=None):
        """Download the all-listings report and load new rows into ``productInfo``.

        Rows whose ``listing-id`` was already stored today (with an image URL)
        are skipped. The rest is enriched via ``data_deal`` and inserted in
        batches of 200; each batch is retried once and the run stops at the
        first batch that fails twice.

        Args:
            limit: optional cap on the number of new rows processed.
            seller_id: seller id forwarded to ``data_deal``.

        Returns:
            "Done" when nothing is new, ``None`` when the report failed,
            otherwise the DataFrame of rows that were selected for insertion.
        """
        start = time.time()
        reportid = self.create_report(reportType=ReportType.GET_MERCHANT_LISTINGS_ALL_DATA)
        decom_df = self.decompression(reportid)
        if decom_df is None:
            print("report unavailable, nothing inserted")
            return None

        bondary_date = datetime.now().strftime("%Y-%m-%d")
        print("连接数据库")
        with closing(self.mysql_connect()) as conn:
            print("连接成功")
            cursor = conn.cursor()
            cursor.execute(
                """select * from amz_sp_api.productInfo
                   where (mainImageUrl is not null and mainImageUrl not in ('', ' '))
                     and (`seller-sku` not in ('',' ') and `seller-sku` is not null)
                     and `updateTime`>=%s""",
                (bondary_date,))
            col = [i[0] for i in cursor.description]
            query_rel = cursor.fetchall()

        if len(query_rel) != 0:
            print(query_rel[0])
            df = pd.DataFrame(query_rel, columns=col)
            listingid = df['listing-id'].to_numpy().tolist()
            decom_df = decom_df.query("`listing-id` not in @listingid")
        print("数据条数: ", len(decom_df))
        if len(decom_df) == 0:
            return "Done"
        if limit is not None:
            decom_df = decom_df.iloc[:limit, :]

        print("getting mainImageInfo...")
        insert_sql = """
            insert into amz_sp_api.productInfo
            values (%s,%s,%s,%s,%s,%s,%s, %s,%s,%s,%s,%s,%s,%s, %s,%s,%s,%s,%s,%s,%s, %s,%s,%s,%s,%s,%s,%s,%s)
        """
        delete_sql = ("delete from amz_sp_api.productInfo "
                      "where `listing-id` not in %s and `updateTime`<%s")
        batch_size = 200
        for offset in range(0, len(decom_df), batch_size):
            df_insert = self.data_deal(decom_df.iloc[offset:offset + batch_size, :].copy(), seller_id)
            rows = df_insert.to_numpy().tolist()
            batch_ids = df_insert['listing-id'].to_numpy().tolist()
            # One retry per batch; moving on after a successful retry fixes the
            # original loop, which re-processed the same batch forever.
            stored = False
            for _attempt in range(2):
                if self._store_listing_batch(insert_sql, delete_sql, rows, batch_ids, bondary_date):
                    stored = True
                    break
            if not stored:
                break

        print("全部完成")
        end = time.time()
        print("duration:", end - start)
        return decom_df

    # ------------------------------------------------------------------
    # seller_listings_dailyUP sync (asj_ads database)
    # ------------------------------------------------------------------
    def _resolve_asin(self, asin, asin1, product_id, seller_id, sku):
        """Pick the ASIN for one listing row, asking the listings API as a last resort.

        Preference order: ``product_id`` when it looks like an ASIN ('B0' in
        its first three characters), then the report's ``asin1``, then the
        ``asin`` already stored in the database, then the listings API (one
        retry after 3 seconds). Returns '' when the API fails twice so a
        single bad SKU does not abort the whole sync.
        """
        if 'B0' in str(product_id)[:3]:
            return str(product_id)
        if not pd.isna(asin1) and asin1 != '':
            if 'B0' in asin1[:3]:
                return asin1
        elif not pd.isna(asin) and asin != '':
            if 'B0' in asin[:3]:
                return asin

        listing_client = ListingsItems(credentials=self.credentials, marketplace=self.marketplace)
        for attempt in range(2):
            try:
                r1 = listing_client.get_listings_item(sellerId=seller_id, sku=sku)
                print(r1.payload)
                return r1.payload.get("summaries")[0].get("asin")
            except Exception as e:
                print("获取图片url过程错误重试, 错误message: ", e)
                if attempt == 0:
                    time.sleep(3)
        return ''

    def GET_FLAT_FILE_OPEN_LISTINGS_DATA(self, conn=None, seller_id=None):
        """Refresh ``asj_ads.seller_listings_dailyUP`` for this seller/marketplace.

        Downloads the all-listings report, normalises it, resolves ASIN, image
        and title per row (listing API calls), deletes the rows being replaced
        via ``update_data`` and inserts the fresh ones.

        Args:
            conn: unused; kept for interface compatibility (the method opens
                and closes its own connections).
            seller_id: selling partner id of the shop.

        Returns:
            '插入完成' on success, '出错回滚' when the insert failed,
            '无更新数据插入' when there is nothing to insert, or ``None`` when
            the report is empty or unavailable.
        """
        reportid = self.create_report(reportType=ReportType.GET_MERCHANT_LISTINGS_ALL_DATA)
        df = self.decompression(reportid)
        if df is None or len(df) == 0:
            return None

        marketplace_id = self.marketplace.marketplace_id
        country_code = str(self.marketplace)[-2:]
        if marketplace_id == self._JP_MARKETPLACE_ID:
            # This marketplace's report is renamed positionally to the common names.
            df.columns = ['item-name', 'listing-id', 'seller-sku', 'price', 'quantity', 'open-date',
                          'product-id-type', 'item-description', 'item-condition', 'overseas shipping',
                          'fast shipping', 'asin1', 'stock_number', 'fulfillment-channel',
                          'merchant-shipping-group', 'status']
        df['seller_id'] = seller_id
        df['marketplace_id'] = marketplace_id
        df['country_code'] = country_code
        if 'fulfilment-channel' in df.columns:
            # Some reports use the British spelling for this column.
            print("changed fulfilment-channel:")
            print(seller_id, self.marketplace)
            df['fulfillment-channel'] = df['fulfilment-channel'].copy()
        df['fulfillment_channel'] = df['fulfillment-channel'].map(self._classify_fulfillment)
        for optional_column in ('asin1', 'product-id'):
            if optional_column not in df.columns:
                df[optional_column] = ''

        # Missing-value handling.
        df['quantity'] = df['quantity'].fillna(0).astype('int64', errors='ignore')
        text_columns = ['listing-id', 'seller_id', 'asin1', 'seller-sku', 'country_code',
                        'marketplace_id', 'fulfillment_channel', 'status', 'product-id']
        df[text_columns] = df[text_columns].fillna('').astype('string', errors='ignore')
        df['price'] = df['price'].fillna(0.0).astype('float64', errors='ignore')
        df.fillna('', inplace=True)

        # Time handling.
        df['opendate'] = df['open-date'].map(self.datetime_deal)
        df['update_datetime'] = datetime.now(pytz.UTC).date()

        origin_columns = ['listing-id', 'seller_id',
                          'asin1', 'seller-sku', 'title', 'image_link', 'country_code',
                          'marketplace_id', 'quantity', 'fulfillment_channel',
                          'price', 'opendate', 'status', 'update_datetime', 'product-id', 'product-id-type']

        # Known product_id -> asin pairs already stored in the table.
        with closing(SpApiRequest.mysql_connect_auth_lst()) as lookup_conn:
            cursor = lookup_conn.cursor()
            cursor.execute("""select product_id,asin from (select * from asj_ads.seller_listings_dailyUP where asin is not null
                              and asin<>'' and product_id is not null and product_id <>'') t1 group by product_id,asin""")
            query_ = cursor.fetchall()
            col_name = [i[0] for i in cursor.description]
        df_datatable = pd.DataFrame(query_, columns=col_name)
        merged_df = df.merge(df_datatable[['product_id', 'asin']], how='left',
                             left_on='product-id', right_on='product_id')
        print(merged_df.head())

        merged_df['asin1'] = merged_df.apply(
            lambda row: self._resolve_asin(row['asin'], row['asin1'], row['product-id'],
                                           seller_id, row['seller-sku']),
            axis=1)
        print("获取listing Info...")
        merged_df['temp_columns'] = merged_df.apply(
            lambda row: self.get_listing_info(row['seller-sku'], seller_id), axis=1)
        # n=1: a title that itself contains the separator must not create extra columns.
        merged_df[['image_link', 'title']] = merged_df['temp_columns'].str.split(
            self._INFO_SEPARATOR, n=1, expand=True)
        merged_df.fillna('', inplace=True)
        df1 = merged_df.copy()
        print(df1[origin_columns].head(1))

        # Delete the rows that are about to be replaced.
        update_df = self.update_data(df1, seller_id, country_code, None)
        if len(update_df) == 0:
            return '无更新数据插入'

        insertsql = """insert into
            asj_ads.seller_listings_dailyUP(listing_id,seller_id,asin,sku,title,image_link,country_code,marketplace_id,quantity,
            fulfillment_channel,price,launch_datetime,status,update_datetime,product_id,product_id_type)
            values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)"""
        insert_conn = SpApiRequest.mysql_connect_auth_lst()
        try:
            cursor = insert_conn.cursor()
            insert_conn.begin()
            cursor.executemany(insertsql, tuple(update_df[origin_columns].to_numpy().tolist()))
            insert_conn.commit()
            print("插入完成")
            return '插入完成'
        except Exception as e:
            print("插入错误:", e)
            insert_conn.rollback()
            return '出错回滚'
        finally:
            insert_conn.close()

    def get_listing_info(self, sku, seller_id):
        """Return "<main image url>-----<item name>" for one SKU.

        Missing fields are replaced by '###'. The listings API is tried twice
        (3 seconds apart); "###-----###" is returned when both attempts fail.
        """
        listingClient = ListingsItems(credentials=self.credentials, marketplace=self.marketplace)
        for attempt in range(2):
            try:
                r1 = listingClient.get_listings_item(sellerId=seller_id, sku=sku)
                if attempt:
                    print(r1.payload)
                json_content = r1.payload.get("summaries")[0]
                item_name = json_content.get("itemName")
                if item_name is None:
                    item_name = self._INFO_PLACEHOLDER
                img = json_content.get("mainImage")
                img_url = self._INFO_PLACEHOLDER if img is None else img.get("link")
                return str(img_url) + self._INFO_SEPARATOR + str(item_name)
            except Exception as e:
                if attempt == 0:
                    print("获取图片url过程错误重试, 错误message: ", e)
                    time.sleep(3)
                else:
                    print(e)
        return "###-----###"

    def datetime_deal(self, timestring):
        """Parse a report date string.

        * "<date>" or "<date> <x>" (fewer than three space-separated parts):
          the first part is parsed as Y-m-d, Y/m/d or d/m/Y and returned as a
          naive ``datetime``.
        * "<date> <time> <TZ>": parsed with the same three date layouts,
          localised using the abbreviation table and returned as an aware
          UTC ``datetime``.

        Anything that cannot be parsed — including non-string input such as
        NaN/None — yields ``datetime(1999, 12, 31)``.
        """
        fallback = self._FALLBACK_DATETIME
        if not isinstance(timestring, str):
            return fallback
        date_list = timestring.split(' ')
        last_error = None

        if len(date_list) < 3:
            for fmt in ("%Y-%m-%d", "%Y/%m/%d", "%d/%m/%Y"):
                try:
                    return datetime.strptime(date_list[0], fmt)
                except (ValueError, TypeError) as e:
                    last_error = e
            print(last_error)
            return fallback

        stamp = date_list[0] + date_list[1]
        for fmt in ("%Y-%m-%d%H:%M:%S", "%d/%m/%Y%H:%M:%S", "%Y/%m/%d%H:%M:%S"):
            try:
                naive = datetime.strptime(stamp, fmt)
                zone = pytz.timezone(self._TZ_NAMES[date_list[2]])
                return zone.localize(naive).astimezone(pytz.UTC)
            except Exception as e:  # bad layout or unknown timezone abbreviation
                last_error = e
        print(last_error)
        return fallback

    def update_data(self, df, seller_id, country_code, conn):
        """Delete the stored rows that ``df`` is about to replace.

        Every row of ``df`` is scheduled for replacement: its key
        (seller, marketplace, sku, listing id, product id) is deleted from
        ``seller_listings_dailyUP`` and the row is returned for re-insertion.
        Stored rows are also compared with the new ones, but only to print a
        warning when more than one identical stored row exists.

        Args:
            df: normalised listings frame (mutated: dtypes are coerced and
                country code 'GB' is rewritten to 'UK').
            seller_id: selling partner id.
            country_code: two-letter code of the marketplace.
            conn: unused; kept for interface compatibility.

        Returns:
            The rows to insert; the unmodified ``df`` when preparation fails.
        """
        columns = ['listing-id', 'seller_id',
                   'asin1', 'seller-sku', 'title', 'image_link', 'country_code',
                   'marketplace_id', 'quantity', 'fulfillment_channel',
                   'price', 'opendate', 'status', 'update_datetime', 'product-id', 'product-id-type']
        if country_code == 'GB':
            country_code = "UK"
            df['country_code'] = "UK"
        marketplace_id = self.marketplace.marketplace_id

        db = SpApiRequest.mysql_connect_auth_lst()
        try:
            cursor = db.cursor()
            try:
                cursor.execute(
                    "select * from asj_ads.seller_listings_dailyUP "
                    "where seller_id=%s and marketplace_id=%s",
                    (seller_id, marketplace_id))
                col = [i[0] for i in cursor.description]
                df_rel = pd.DataFrame(cursor.fetchall(), columns=col)
                df_rel['quantity'] = df_rel['quantity'].fillna(0).astype('int64')
                df_rel['price'] = df_rel['price'].fillna(0.0).astype('float64')
                df_rel['product_id_type'] = df_rel['product_id_type'].astype('int64')
                df['update_datetime'] = df['update_datetime'].astype('datetime64[ns]')
                df['quantity'] = df['quantity'].fillna(0).astype('int64')
                df['price'] = df['price'].fillna(0.0).astype('float64')

                collected = []
                delete_list = []
                for position in range(len(df)):
                    temp_df = df.iloc[position, :]
                    # The names below are referenced via @ in the query string.
                    listing_id = temp_df['listing-id']
                    asin = temp_df['asin1']
                    sku = temp_df['seller-sku']
                    quantity = temp_df['quantity']
                    fulfillment_channel = temp_df['fulfillment_channel']
                    price = temp_df['price']
                    product_id = temp_df['product-id']
                    title = temp_df['title']
                    imageurl = temp_df['image_link']
                    temp = df_rel.query("""listing_id==@listing_id and asin==@asin and sku==@sku and quantity==@quantity and fulfillment_channel==@fulfillment_channel and price==@price and product_id==@product_id and country_code==@country_code and seller_id==@seller_id and title==@title and image_link==@imageurl""")
                    if len(temp) > 1:
                        print("需要关注数据(是否异常):", len(temp), temp.to_numpy().tolist())
                    collected.append(temp_df)
                    delete_list.append((seller_id, marketplace_id, sku, listing_id, product_id))

                # DataFrame.append was removed in pandas 2.0; build the frame in one go.
                if collected:
                    df_data = pd.DataFrame(collected).reset_index(drop=True)
                else:
                    df_data = pd.DataFrame(columns=columns)
                print("判断不同数据条数", len(delete_list))
                print("预计更新数据条数", len(df_data))

                try:
                    if len(delete_list) > 0:
                        query = """delete from asj_ads.seller_listings_dailyUP
                                   where (seller_id,marketplace_id,sku,listing_id,product_id) in %s"""
                        cursor.execute(query, (delete_list,))
                        db.commit()
                        print(delete_list)
                        print("进行中...")
                except Exception as e:
                    print(e)
                    db.rollback()
                return df_data
            except Exception as e:
                print("错误:", e)
                return df
        finally:
            db.close()

    # ------------------------------------------------------------------
    # Shop iteration
    # ------------------------------------------------------------------
    @staticmethod
    def auth_info():
        """Return the whole ``amazon_sp_auth_info`` table as a DataFrame."""
        with closing(SpApiRequest.mysql_connect_auth()) as auth_conn:
            cursor = auth_conn.cursor()
            cursor.execute("select * from amazon_sp_report.amazon_sp_auth_info;")
            columns_name = [i[0] for i in cursor.description]
            rel = cursor.fetchall()
        return pd.DataFrame(rel, columns=columns_name)

    @classmethod
    def get_orders_allShops(cls):
        """Placeholder; not implemented."""
        pass

    @staticmethod
    def data_judge_secondTry(sp_api, data_type, seller_id, auth_conn):
        """Run ``data_judge``; on any error wait 3 seconds and run it once more.

        Returns the result of the successful attempt. An error raised by the
        second attempt propagates to the caller.
        """
        try:
            return SpApiRequest.data_judge(sp_api, data_type, seller_id, auth_conn)
        except Exception:
            time.sleep(3)
            return SpApiRequest.data_judge(sp_api, data_type, seller_id, auth_conn)

    @staticmethod
    def data_judge(sp_api, data_type, seller_id, auth_conn):
        """Dispatch ``data_type`` to the matching report method of ``sp_api``.

        Returns that method's result, or "" for an unknown ``data_type``.
        """
        if data_type == "GET_FLAT_FILE_OPEN_LISTINGS_DATA":
            return sp_api.GET_FLAT_FILE_OPEN_LISTINGS_DATA(auth_conn, seller_id)
        if data_type == "GET_FLAT_FILE_ALL_ORDERS_DATA_BY_ORDER_DATE_GENERAL":
            return sp_api.GET_FLAT_FILE_ALL_ORDERS_DATA_BY_ORDER_DATE_GENERAL(seller_id)
        if data_type == "GET_FLAT_FILE_RETURNS_DATA_BY_RETURN_DATE":
            return sp_api.GET_FLAT_FILE_RETURNS_DATA_BY_RETURN_DATE(seller_id)
        return ""

    @classmethod
    def get_refreshtoken(cls):
        """Return every stored refresh token as a list."""
        return cls.auth_info()['refresh_token'].to_numpy().tolist()

    @classmethod
    def _sync_marketplace(cls, aws_credentials, marketplace, data_type, seller_id):
        """Run one report job for one marketplace; errors are printed, not raised."""
        sp_api = SpApiRequest(aws_credentials, marketplace)
        try:
            with closing(SpApiRequest.mysql_connect_auth()) as auth_conn:
                cls.data_judge_secondTry(sp_api, data_type, seller_id, auth_conn)
        except Exception as e:
            print(e)

    @classmethod
    def get_allShops(
        cls,
        data_type: Literal[
            "GET_FLAT_FILE_OPEN_LISTINGS_DATA",
            "GET_FLAT_FILE_ALL_ORDERS_DATA_BY_ORDER_DATE_GENERAL",
            "GET_FLAT_FILE_RETURNS_DATA_BY_RETURN_DATE",
        ] = "GET_FLAT_FILE_OPEN_LISTINGS_DATA",
    ):
        """Run ``data_type`` for every authorised shop, in random order.

        'NA' and 'EU' authorisations fan out over their marketplace lists; any
        other region value is looked up as a ``Marketplaces`` member name.
        A failure in one marketplace is printed and the run continues.

        The original signature used the ``Literal`` type as the default
        *value*, which made a bare ``get_allShops()`` a no-op; it is now an
        annotation and the default is the listings job.
        """
        df = cls.auth_info()
        refreshtoken_list = df['refresh_token'].to_numpy().tolist()
        shuffle(refreshtoken_list)
        for refresh_token in refreshtoken_list:
            aws_credentials = cls._build_credentials(refresh_token)
            single_info = df.query("refresh_token==@refresh_token")
            region_circle = single_info['region'].values[0]
            seller_id = single_info['selling_partner_id'].values[0]

            if region_circle == 'NA':
                marketplaces = cls._na_marketplaces()
            elif region_circle == 'EU':
                marketplaces = cls._eu_marketplaces()
            else:
                print(region_circle)
                try:
                    # Attribute lookup instead of eval() on a database value.
                    marketplaces = [getattr(Marketplaces, str(region_circle))]
                except AttributeError as e:
                    print(e)
                    continue
            for marketplace in marketplaces:
                cls._sync_marketplace(aws_credentials, marketplace, data_type, seller_id)

    # ------------------------------------------------------------------
    # Orders / returns reports
    # ------------------------------------------------------------------
    def timeDeal(self, orgTime):
        """Parse a timestamp string and return it as a naive UTC ``datetime`` (whole seconds)."""
        shopTime = parse(orgTime).astimezone(pytz.timezone("UTC"))
        return datetime(shopTime.year, shopTime.month, shopTime.day,
                        shopTime.hour, shopTime.minute, shopTime.second)

    def GET_FLAT_FILE_RETURNS_DATA_BY_RETURN_DATE(self, seller_id):
        """Download and print the report for the day two days ago; returns the DataFrame.

        NOTE(review): the report requested is GET_SELLER_FEEDBACK_DATA although
        the method is named after the returns report — confirm which is intended.
        """
        shopReportday = (datetime.now() + timedelta(days=-2)).strftime("%Y-%m-%d")
        para = {"reportType": ReportType.GET_SELLER_FEEDBACK_DATA,
                "dataStartTime": shopReportday, "dataEndTime": shopReportday}
        reportid = self.create_report(**para)
        decom_df = self.decompression(reportid)
        print(decom_df)
        return decom_df

    def GET_FLAT_FILE_ALL_ORDERS_DATA_BY_ORDER_DATE_GENERAL(self, seller_id):
        """Load yesterday's order report into ``amz_sp_api.orderreport_renew1``.

        Purchase/update timestamps are converted to naive UTC (blank values
        become NULL), optional columns are added when absent, and the rows are
        inserted in a single transaction that is rolled back on error.
        Returns ``None``.
        """
        shopReportday = (datetime.now() + timedelta(days=-1)).strftime("%Y-%m-%d")
        para = {"reportType": ReportType.GET_FLAT_FILE_ALL_ORDERS_DATA_BY_ORDER_DATE_GENERAL,
                "dataStartTime": shopReportday, "dataEndTime": shopReportday,
                "reportOptions": {"ShowSalesChannel": "true"}}
        reportid = self.create_report(**para)
        decom_df = self.decompression(reportid)
        if decom_df is None:
            print("report unavailable, nothing inserted")
            return None

        float_cols = decom_df.select_dtypes(float).columns
        decom_df[float_cols] = decom_df[float_cols].fillna(0.0)
        int_cols = decom_df.select_dtypes(int).columns
        decom_df[int_cols] = decom_df[int_cols].fillna(0)
        # NOTE(review): see data_deal — `datetime` here likely selects object columns.
        dt_cols = decom_df.select_dtypes(datetime).columns
        decom_df[dt_cols] = decom_df[dt_cols].astype('string')
        if "purchase-order-number" in decom_df.columns:
            decom_df['purchase-order-number'] = decom_df['purchase-order-number'].astype("string")
        decom_df.fillna('', inplace=True)
        decom_df["ReportDate"] = parse(shopReportday)
        decom_df['timezone'] = "UTC"
        print("==========================================================")
        # The original predicate was always true, so blank cells reached the
        # date parser and raised; blanks are now passed through as None.
        for time_column in ("purchase-date", "last-updated-date"):
            decom_df[time_column] = decom_df[time_column].map(self._to_utc_or_keep)

        optional_defaults = {'is-business-order': None,
                             'purchase-order-number': '-',
                             'price-designation': '-'}
        for column, default in optional_defaults.items():
            if column not in decom_df.columns:
                decom_df[column] = default
        decom_df['seller_id'] = seller_id
        country_code = str(self.marketplace)[-2:]
        if country_code == 'GB':
            country_code = "UK"
        decom_df['country_code'] = country_code

        reserve_columns = ["amazon-order-id", "merchant-order-id", "purchase-date", "last-updated-date", "order-status",
                           "fulfillment-channel", "sales-channel", "order-channel", "ship-service-level", "product-name",
                           "sku", "asin", "item-status", "quantity", "currency", "item-price", "item-tax", "shipping-price",
                           "shipping-tax", "gift-wrap-price", "gift-wrap-tax", "item-promotion-discount",
                           "ship-promotion-discount", "ship-city", "ship-state", "ship-postal-code", "ship-country",
                           "promotion-ids", "is-business-order", "purchase-order-number", "price-designation", "ReportDate",
                           "timezone", "seller_id", "country_code"]
        list_df = decom_df[reserve_columns].to_numpy().tolist()
        sql = """
            insert into amz_sp_api.orderreport_renew1
            values (%s,%s,%s,%s,%s,%s,%s, %s,%s,%s,%s,%s,%s,%s, %s,%s,%s,%s,%s,%s,%s, %s,%s,%s,%s,%s,%s,%s, %s,%s,%s,%s,%s,%s,%s)
        """
        conn = self.mysql_connect()
        try:
            cursor = conn.cursor()
            conn.begin()
            cursor.executemany(sql, list_df)
            conn.commit()
            print("插入完成")
        except Exception as e:
            conn.rollback()
            print(e)
        finally:
            conn.close()
def func_run():
    """Scheduled job: refresh the listings table for every authorised shop."""
    job_type = "GET_FLAT_FILE_OPEN_LISTINGS_DATA"
    SpApiRequest.get_allShops(job_type)
- # func_run()
if __name__ == '__main__':
    # Run func_run every day at 01:30:00; start() blocks the process.
    cron_schedule = {"hour": 1, "minute": 30, "second": 0}
    scheduler = BlockingScheduler()
    scheduler.add_job(func_run, 'cron', **cron_schedule)
    scheduler.start()
"""
create database amz_sp_api;
"""
"""
create table amz_sp_api.productInfo
(
    `item-name` VARCHAR(300),
    `item-description` VARCHAR(1000),
    `listing-id` VARCHAR(50),
    `seller-sku` VARCHAR(50),
    `price` FLOAT,
    `quantity` INT,
    `open-date` VARCHAR(70),
    `image-url` VARCHAR(300),
    `item-is-marketplace` VARCHAR(50),
    `product-id-type` INT,
    `item-note` VARCHAR(300),
    `item-condition` INT,
    `asin1` VARCHAR(50),
    `asin2` VARCHAR(50),
    `asin3` VARCHAR(50),
    `will-ship-internationally` VARCHAR(50),
    `expedited-shipping` VARCHAR(50),
    `product-id` VARCHAR(50),
    `bid-for-featured-placement` FLOAT,
    `add-delete` VARCHAR(50),
    `pending-quantity` INT,
    `fulfillment-channel` VARCHAR(50),
    `merchant-shipping-group` VARCHAR(50),
    `status` VARCHAR(50),
    `mainImageUrl` VARCHAR(300),
    `opendate_date` Date,
    `updateTime` Date,
    `timezone` VARCHAR(30)
)
"""
"""
create table amz_sp_api.orderReport
(
    `amazon-order-id` VARCHAR(40),
    `merchant-order-id` VARCHAR(40),
    `purchase-date` DATETIME,
    `last-updated-date` DATETIME,
    `order-status` VARCHAR(40),
    `fulfillment-channel` VARCHAR(40),
    `sales-channel` VARCHAR(40),
    `order-channel` VARCHAR(40),
    `ship-service-level` VARCHAR(40),
    `product-name` VARCHAR(250),
    `sku` VARCHAR(50),
    `asin` VARCHAR(40),
    `item-status` VARCHAR(40),
    `quantity` INT,
    `currency` VARCHAR(40),
    `item-price` FLOAT,
    `item-tax` FLOAT,
    `shipping-price` FLOAT,
    `shipping-tax` FLOAT,
    `gift-wrap-price` FLOAT,
    `gift-wrap-tax` FLOAT,
    `item-promotion-discount` FLOAT,
    `ship-promotion-discount` FLOAT,
    `ship-city` VARCHAR(40),
    `ship-state` VARCHAR(40),
    `ship-postal-code` VARCHAR(40),
    `ship-country` VARCHAR(40),
    `promotion-ids` VARCHAR(50),
    `cpf` VARCHAR(40),
    `is-business-order` BOOL,
    `purchase-order-number` VARCHAR(50),
    `price-designation` VARCHAR(40),
    `signature-confirmation-recommended` BOOL,
    `ReportDate` DATE not null,
    `timezone` VARCHAR(20) not null
);
"""
|