12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
7127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619162016211622162316241625162616271628162916301631163216331634163516361637163816391640164116421643164416451646164716481649165016511652165316541655165616571658165916601661166216631664166516661667166816691670167116721673167416751676167716781679168016811682168316841685 |
- import clickhouse_connect
- import time
- import warnings
- warnings.filterwarnings('ignore')
- import numpy as np
- from pymysql import Timestamp
- from sp_api.util import throttle_retry, load_all_pages
- from sp_api.api import Orders,ListingsItems,Inventories,Reports,CatalogItems,Products
- from sp_api.base import Marketplaces,ReportType,ProcessingStatus
- import pandas as pd
- import gzip
- from io import BytesIO,StringIO
- from datetime import datetime, timedelta,timezone
- import pytz
- import time
- from dateutil.parser import parse
- import pymysql
- from typing import List, Literal
- from random import shuffle
- from retry import retry
- import requests
- from urllib import request
- import json
- from clickhouse_connect import get_client
- try:
- from ..settings import MYSQL_AUTH_CONF, MYSQL_DATA_CONF
- except:
- from sync_amz_data.sync_amz_data.settings import MYSQL_AUTH_CONF, MYSQL_DATA_CONF
- class SpApiRequest:
def __init__(self, credentials, marketplace):
    """Remember the credentials and marketplace every report call will use.

    Args:
        credentials: Credential mapping handed unchanged to the sp_api clients
            (``Reports``, ``ListingsItems``) created by the other methods.
        marketplace: Target marketplace; other methods read its
            ``marketplace_id`` attribute and ``str(...)[-2:]`` country suffix
            (presumably an sp_api ``Marketplaces`` member — confirm at callers).
    """
    self.marketplace = marketplace
    self.credentials = credentials
@classmethod
@retry(tries=3, delay=5, backoff=2)
def Token_auth(cls):  # AUTH - mysql_connect_auth
    """Open a fresh pymysql connection using ``MYSQL_AUTH_CONF``.

    The ``retry`` decorator makes up to 3 attempts (5 s initial delay,
    backoff factor 2). The caller owns the returned connection and is
    responsible for closing it.
    """
    return pymysql.connect(**MYSQL_AUTH_CONF)
@classmethod
@retry(tries=3, delay=5, backoff=2)
def Data_auth(cls):  # DATA - mysql_connect_auth_lst
    """Open a fresh pymysql connection using ``MYSQL_DATA_CONF``.

    The ``retry`` decorator makes up to 3 attempts (5 s initial delay,
    backoff factor 2). The caller owns the returned connection and is
    responsible for closing it.
    """
    return pymysql.connect(**MYSQL_DATA_CONF)
@classmethod
@retry(tries=3, delay=5, backoff=2)
def LocalHost_Auth(cls):  # local database
    """Open a pymysql connection to the local ``amz_sp_api`` database.

    The ``retry`` decorator makes up to 3 attempts (5 s initial delay,
    backoff factor 2). The caller owns the returned connection.
    """
    # NOTE(review): these credentials are hard-coded; consider moving them into
    # settings next to MYSQL_AUTH_CONF / MYSQL_DATA_CONF.
    local_conf = dict(
        user="huangyifan",
        password="123456",
        host="127.0.0.1",
        database="amz_sp_api",
        port=3306,
    )
    return pymysql.connect(**local_conf)
@staticmethod
@retry(tries=3, delay=5, backoff=2)
def auth_info():  # get Auth-data from all of shops - provides refresh tokens for account authorization
    """Return every row of ``amazon_sp_report.amazon_sp_auth_info`` as a DataFrame.

    The connection comes from ``SpApiRequest.Token_auth`` and is always closed
    before returning (also when the query fails and ``retry`` re-invokes the
    call), so repeated calls no longer leak connections.

    Returns:
        DataFrame whose columns are the table's column names, as reported by
        ``cursor.description``.
    """
    auth_conn = SpApiRequest.Token_auth()
    try:
        with auth_conn.cursor() as cursor:
            cursor.execute("select * from amazon_sp_report.amazon_sp_auth_info;")
            columns_name = [col[0] for col in cursor.description]
            rows = cursor.fetchall()
    finally:
        auth_conn.close()
    return pd.DataFrame(rows, columns=columns_name)
@classmethod
@retry(tries=3, delay=5, backoff=2)
def get_refreshtoken(cls):  # one refresh token per authorized shop
    """Return the ``refresh_token`` column of the auth table as a Python list.

    Rows come from ``auth_info``; the ``retry`` decorator makes up to 3 attempts.
    """
    token_column = cls.auth_info()['refresh_token']
    return token_column.to_numpy().tolist()
@classmethod
def get_catelog(cls, account_name, country=Marketplaces.US, asin=None):  # active - works but not currently used
    """Fetch and print the ``salesRanks`` catalog data for one ASIN.

    The region used to pick the refresh token is 'NA' for US/BR/CA/MX, 'EU'
    for the European/Middle-East/India marketplaces listed below, and the last
    two characters of ``str(country)`` otherwise.

    Args:
        account_name: Matched against the auth table's ``account_name`` column.
        country: Marketplace to query; defaults to ``Marketplaces.US``.
        asin: ASIN passed to ``CatalogItems.get_catalog_item`` (API 2022-04-01).

    Returns:
        '获取失败' when no refresh token matches (account_name, region);
        otherwise None — the catalog response is only printed.
    """
    import os

    na_markets = [Marketplaces.US, Marketplaces.BR, Marketplaces.CA, Marketplaces.MX]
    eu_markets = [
        Marketplaces.DE, Marketplaces.AE, Marketplaces.BE, Marketplaces.PL,
        Marketplaces.EG, Marketplaces.ES, Marketplaces.GB, Marketplaces.IN, Marketplaces.IT,
        Marketplaces.NL, Marketplaces.SA, Marketplaces.SE, Marketplaces.TR, Marketplaces.UK, Marketplaces.FR,
    ]
    if country in na_markets:
        region = 'NA'
    elif country in eu_markets:
        region = 'EU'
    else:
        region = str(country)[-2:]
    df = cls.auth_info()
    try:
        # `account_name` and `region` are resolved from this frame by DataFrame.query.
        refresh_token = df.query("account_name==@account_name and region==@region")['refresh_token'].values[0]
    except Exception:  # typically IndexError: no matching row
        print("请输入正确的account name与Marketplace")
        return '获取失败'
    # SECURITY(review): the LWA/AWS secrets below are committed in source and
    # should be rotated. Values are now taken from the environment first (same
    # variable names python-amazon-sp-api reads); the literals remain only as a
    # backward-compatible fallback and should be removed once deployments set them.
    env = os.environ.get
    cred = {
        'refresh_token': refresh_token,
        # LWA credentials from the Seller Central developer profile
        'lwa_app_id': env('LWA_APP_ID', 'amzn1.application-oa2-client.1f9d3d4747e14b22b4b598e54e6b922e'),
        'lwa_client_secret': env('LWA_CLIENT_SECRET', 'amzn1.oa2-cs.v1.3af0f5649f5b8e151cd5bd25c10f2bf3113172485cd6ffc52ccc6a5e8512b490'),
        'aws_access_key': env('SP_API_ACCESS_KEY', 'AKIARBAGHTGOZC7544GN'),
        'aws_secret_key': env('SP_API_SECRET_KEY', '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'),
        'role_arn': env('SP_API_ROLE_ARN', 'arn:aws:iam::070880041373:role/Amazon_SP_API_ROLE'),
    }
    cate_item = CatalogItems(credentials=cred, marketplace=country, version='2022-04-01')
    test_bundle = cate_item.get_catalog_item(asin=asin, **{"includedData": ['salesRanks']})
    print(test_bundle)
@retry(tries=3, delay=5, backoff=2)
def create_report(self, **kwargs):  # Main-CreateReport-Function - submit a create-report request
    """Submit a createReport request and return the new report id.

    Keyword Args:
        reportType: Required; forwarded to ``Reports.create_report``.
        reportOptions: Optional; forwarded unchanged (None when absent).
        dataStartTime: Optional 'YYYY-MM-DD'; 'T00:00:00' is appended.
            When absent, the current local time (``datetime.now()``) is used.
        dataEndTime: Optional 'YYYY-MM-DD'; 'T23:59:59' is appended.
            When absent, the current local time is used.
        marketpalceids: Optional (note the spelling) single marketplace id that
            replaces ``self.marketplace.marketplace_id``.

    Returns:
        ``payload['reportId']`` from the response, or None if missing.

    The ``retry`` decorator re-runs the whole call, up to 3 attempts.
    """
    report_type = kwargs['reportType']
    report_options = kwargs.get("reportOptions")
    stamp_fmt = "%Y-%m-%dT%H:%M:%S"
    if kwargs.get("dataStartTime") is None:
        data_start = datetime.now().strftime(stamp_fmt)
    else:
        data_start = kwargs.get("dataStartTime") + "T00:00:00"
    if kwargs.get("dataEndTime") is None:
        data_end = datetime.now().strftime(stamp_fmt)
    else:
        data_end = kwargs.get("dataEndTime") + "T23:59:59"
    client = Reports(credentials=self.credentials, marketplace=self.marketplace)
    override_id = kwargs.get('marketpalceids')
    target_id = override_id if override_id is not None else self.marketplace.marketplace_id
    response = client.create_report(
        reportType=report_type,
        marketplaceIds=[target_id],
        reportOptions=report_options,
        dataStartTime=data_start,
        dataEndTime=data_end,
    )
    return response.payload.get("reportId")
# @retry(tries=2, delay=3, backoff=2,)
def decompression(self, reportId, max_wait=None):  # After-CreateReportFunc-simpleDeal - download and decode a finished report
    """Poll a report until it finishes, then load its document into a DataFrame.

    ``Reports.get_report`` is polled every 15 seconds. Once the status is DONE,
    the document URL is read directly with pandas:

    * gzip-compressed, non-Japan: ``read_table`` (ISO-8859-1), falling back to
      ``read_csv`` if that fails;
    * gzip-compressed, Japan (A1VC38T7YXB528): ``read_table`` (Shift-JIS);
    * uncompressed, non-Japan: ``read_table`` (ISO-8859-1);
    * uncompressed, Japan: Shift-JIS, then ISO-8859-1, then UTF-8.

    Args:
        reportId: Id returned by ``create_report``.
        max_wait: Optional bound, in seconds, on total polling time. ``None``
            (default) keeps polling indefinitely.

    Returns:
        The report DataFrame, or an empty DataFrame when the report is
        CANCELLED/FATAL, none of the Japan encodings parse, or ``max_wait``
        elapses.
    """
    jp_marketplace = 'A1VC38T7YXB528'
    report = Reports(credentials=self.credentials, marketplace=self.marketplace)
    deadline = None if max_wait is None else time.monotonic() + max_wait
    while True:
        reportId_info = report.get_report(reportId=reportId)
        print("please wait...")
        status = reportId_info.payload.get("processingStatus")
        if status == ProcessingStatus.DONE:
            reportDocumentId = reportId_info.payload.get("reportDocumentId")
            rp_table = report.get_report_document(reportDocumentId=reportDocumentId, download=False)
            url = rp_table.payload.get("url")
            print(rp_table, url)
            compressed = rp_table.payload.get('compressionAlgorithm') is not None
            is_jp = self.marketplace.marketplace_id in [jp_marketplace]
            if compressed and not is_jp:
                try:
                    df = pd.read_table(filepath_or_buffer=url, compression={"method": 'gzip'}, encoding='iso-8859-1')
                except Exception:
                    df = pd.read_csv(filepath_or_buffer=url, compression={"method": 'gzip'}, encoding='iso-8859-1')
                return df
            if compressed:  # Japan marketplace, gzip
                return pd.read_table(filepath_or_buffer=url, compression={"method": 'gzip'}, encoding='Shift-JIS')
            if not is_jp:
                df = pd.read_table(url, encoding='iso-8859-1')
                print(df.columns)
                return df
            # Japan marketplace, uncompressed: try encodings in order.
            try:
                df = pd.read_table(url, encoding='Shift-JIS')
                print(df.columns)
                return df
            except Exception:
                print("==========!!!!!!!!")
            try:
                df = pd.read_table(url, encoding='iso-8859-1')
                print(df.columns)
                return df
            except Exception:
                print("!!!!!!!!!!!!!!!!!==========")
            try:
                return pd.read_table(url, encoding='utf-8')
            except Exception:
                print("失败")
                return pd.DataFrame()
        if status in [ProcessingStatus.CANCELLED, ProcessingStatus.FATAL]:
            try:
                print(reportId_info)
                reportDocumentId = reportId_info.payload.get("reportDocumentId")
                rp_table = report.get_report_document(reportDocumentId=reportDocumentId, download=True)
                print("取消或失败", rp_table)
            except Exception as e:
                print(e)
            return pd.DataFrame()
        if deadline is not None and time.monotonic() >= deadline:
            print(f"report {reportId} not finished after {max_wait}s, giving up")
            return pd.DataFrame()
        time.sleep(15)
        print("please wait...")
def decompression2(self, reportId, max_wait=None):  # After-CreateReportFunc-simpleDeal - download and decode a finished JSON report
    """Poll a report until it finishes, then load its JSON document.

    ``Reports.get_report`` is polled every 15 seconds. Once DONE:

    * gzip-compressed, non-Japan: the body is downloaded with ``urlopen``,
      gunzipped when it carries the gzip magic bytes, decoded as UTF-8 and
      parsed with ``json.loads`` (so the result is a dict/list, not a DataFrame);
    * every other combination is read with ``pd.read_json`` (Shift-JIS for
      Japan A1VC38T7YXB528, ISO-8859-1 otherwise).

    Args:
        reportId: Id returned by ``create_report``.
        max_wait: Optional bound, in seconds, on total polling time. ``None``
            (default) keeps polling indefinitely.

    Returns:
        Parsed JSON or a DataFrame as described above; an empty DataFrame when
        the report is CANCELLED/FATAL or ``max_wait`` elapses.
    """
    jp_marketplace = 'A1VC38T7YXB528'
    report = Reports(credentials=self.credentials, marketplace=self.marketplace)
    deadline = None if max_wait is None else time.monotonic() + max_wait
    while True:
        reportId_info = report.get_report(reportId=reportId)
        print("please wait...")
        status = reportId_info.payload.get("processingStatus")
        if status == ProcessingStatus.DONE:
            reportDocumentId = reportId_info.payload.get("reportDocumentId")
            rp_table = report.get_report_document(reportDocumentId=reportDocumentId, download=False)
            print(rp_table)
            url = rp_table.payload.get("url")
            compressed = rp_table.payload.get('compressionAlgorithm') is not None
            is_jp = self.marketplace.marketplace_id in [jp_marketplace]
            if compressed and not is_jp:
                # urlopen neither decompresses nor honours an `encoding`
                # attribute; the document body is raw gzip, which must be
                # inflated before decoding or .decode() fails.
                with request.urlopen(url) as response:
                    raw = response.read()
                if raw[:2] == b'\x1f\x8b':
                    raw = gzip.decompress(raw)
                return json.loads(raw.decode())
            if compressed:  # Japan marketplace, gzip
                return pd.read_json(path_or_buf=url, compression={"method": 'gzip'}, encoding='Shift-JIS')
            if not is_jp:
                return pd.read_json(path_or_buf=url, encoding='iso-8859-1')
            return pd.read_json(path_or_buf=url, encoding='Shift-JIS')
        if status in [ProcessingStatus.CANCELLED, ProcessingStatus.FATAL]:
            # Same best-effort handling as `decompression`: a failed document
            # lookup must not turn a cancelled report into an exception.
            try:
                print(reportId_info)
                reportDocumentId = reportId_info.payload.get("reportDocumentId")
                rp_table = report.get_report_document(reportDocumentId=reportDocumentId, download=True)
                print("取消或失败", rp_table)
            except Exception as e:
                print(e)
            return pd.DataFrame()
        if deadline is not None and time.monotonic() >= deadline:
            print(f"report {reportId} not finished after {max_wait}s, giving up")
            return pd.DataFrame()
        time.sleep(15)
        print("please wait...")
- # def GET_MERCHANT_LISTINGS_ALL_DATA(self,limit=None): # Not be used
- # start = time.time()
- # para = {"reportType":ReportType.GET_MERCHANT_LISTINGS_ALL_DATA}
- # reportid = self.create_report(**para)
- # decom_df = self.decompression(reportid)
- # print("连接数据库")
- # conn = self.LocalHost_Auth()
- # print("连接成功")
- # cursor = conn.cursor()
- # timezone = "UTC" #pytz.timezone(self.timezone)
- # bondary_date = (datetime.now()).strftime("%Y-%m-%d") #+ timedelta(days=-28)
- # cursor.execute(f"""select * from amz_sp_api.productInfo where (mainImageUrl is not null and mainImageUrl not in ('', ' ')) and
- # (`seller-sku` not in ('',' ') and `seller-sku` is not null) and
- # `updateTime`>='{bondary_date}'""") #`seller-sku`,`updateTime`,`mainImageUrl`
- # col = [i[0] for i in cursor.description]
- # query_rel = cursor.fetchall()
- #
- # if len(query_rel)!=0:
- # print(query_rel[0])
- # df = pd.DataFrame(query_rel,columns=col)
- # listingid = df['listing-id'].to_numpy().tolist()
- # decom_df = decom_df.query("`listing-id` not in @listingid")
- # print("数据条数: ",len(decom_df))
- # # print(f"delete * from amz_sp_api.productInfo where `listing-id` not in {tuple(listingid)}")
- #
- # # conn.commit()
- #
- # if len(decom_df)==0:
- # return "Done"
- #
- # if limit != None:
- # decom_df = decom_df.iloc[:limit,:]
- # print("getting mainImageInfo...")
- # rowcount = 0
- # while rowcount < len(decom_df):
- # df_insert = decom_df.copy()
- # df_insert = df_insert.iloc[rowcount:rowcount + 200, :]
- #
- # df_insert = self.data_deal(df_insert)
- # list_df = df_insert.to_numpy().tolist()
- #
- # # print(list(conn.query("select * from amz_sp_api.orderReport")))
- # sql = f"""
- # insert into amz_sp_api.productInfo
- # values (%s,%s,%s,%s,%s,%s,%s, %s,%s,%s,%s,%s,%s,%s, %s,%s,%s,%s,%s,%s,%s, %s,%s,%s,%s,%s,%s,%s,%s)
- # """ #ok
- # # print(sql)
- # conn = self.LocalHost_Auth()
- # cursor = conn.cursor()
- # try:
- # conn.begin()
- # cursor.executemany(sql, list_df)
- # conn.commit()
- # print("插入中...")
- # insert_listingid = df_insert['listing-id'].to_numpy().tolist()
- # cursor.execute(f"delete from amz_sp_api.productInfo where `listing-id` not in {tuple(insert_listingid)} and `updateTime`<'{bondary_date}'")
- # conn.commit()
- # rowcount += 200
- # except Exception as e:
- # conn.rollback()
- # print(e)
- # try:
- # conn = self.LocalHost_Auth()
- # cursor = conn.cursor()
- # conn.begin()
- # cursor.executemany(sql, list_df)
- # conn.commit()
- # insert_listingid = df_insert['listing-id'].to_numpy().tolist()
- # cursor.execute(f"delete from amz_sp_api.productInfo where `listing-id` not in {tuple(insert_listingid)} and `updateTime`<'{bondary_date}'")
- # conn.commit()
- # except Exception as e:
- # conn.rollback()
- # print(e)
- # break
- # # break
- # conn.close()
- # print("全部完成")
- # end =time.time()
- # print("duration:",end-start)
- # return decom_df
- # def data_deal(self, decom_df, seller_id): # desprecated
- # decom_df['mainImageUrl'] = decom_df['seller-sku'].map(lambda x: self.get_mainImage_url(x))
- # url_columns = [i for i in decom_df.columns if "url" in i.lower()]
- # if len(url_columns) > 0:
- # decom_df[url_columns] = decom_df[url_columns].astype("string")
- # asin_columns = [i for i in decom_df.columns if 'asin' in i.lower()]
- # if len(asin_columns) > 0:
- # decom_df[asin_columns] = decom_df[asin_columns].astype("string")
- # if 'pending-quantity' in decom_df.columns:
- # decom_df['pending-quantity'] = decom_df['pending-quantity'].map(
- # lambda x: 0 if pd.isna(x) or np.isinf(x) else x).astype("int32")
- # deletecolumns = [i for i in decom_df.columns if 'zshop' in i.lower()]
- # decom_df.drop(columns=deletecolumns, inplace=True)
- # if 'quantity' in decom_df.columns:
- # decom_df['quantity'] = decom_df['quantity'].map(lambda x: 0 if pd.isna(x) or np.isinf(x) else x).astype(
- # "int32")
- # decom_df['opendate_date'] = decom_df['open-date'].map(lambda x: self.datetime_deal(x))
- # if 'add-delete' in decom_df.columns:
- # decom_df['add-delete'] = decom_df['add-delete'].astype('string', errors='ignore')
- # if 'will-ship-internationally' in decom_df.columns:
- # decom_df['will-ship-internationally'] = decom_df['will-ship-internationally'].astype('string',
- # errors='ignore')
- # if 'expedited-shipping' in decom_df.columns:
- # decom_df['expedited-shipping'] = decom_df['expedited-shipping'].astype('string', errors='ignore')
- # decom_df['updateTime'] = datetime.now()
- # decom_df['timezone'] = "UTC"
- # decom_df['seller_id'] = seller_id
- # #
- # decom_df['item-description'] = decom_df['item-description'].str.slice(0, 500)
- # decom_df[decom_df.select_dtypes(float).columns] = decom_df[decom_df.select_dtypes(float).columns].fillna(0.0)
- # decom_df[decom_df.select_dtypes(int).columns] = decom_df[decom_df.select_dtypes(int).columns].fillna(0)
- # decom_df[decom_df.select_dtypes(datetime).columns] = decom_df[decom_df.select_dtypes(datetime).columns].astype(
- # 'string')
- # decom_df.fillna('', inplace=True)
- # # print(decom_df.info())
- # return decom_df
def fba_inventorySQL(self, conn, seller_id):
    """Read the latest FBA quantities for one seller/marketplace from the database.

    Used as the fallback when the FBA inventory report comes back empty. For each
    (asin, sku, seller_id, marketplace_id, country_code) the most recent row by
    ``update_datetime`` in ``asj_ads.seller_listings`` is taken, then filtered to
    this seller, ``self.marketplace.marketplace_id`` and channel 'FBA'.

    Args:
        conn: Open pymysql connection (``%s`` placeholders); not closed here.
        seller_id: Seller whose listings are returned.

    Returns:
        DataFrame with columns asin_, sku_, seller_id_, marketplace_id_,
        country_code_, afn-fulfillable-quantity (same shape as the report-based
        path in ``get_fba_neccessary_segment``).
    """
    # The outer select yields exactly the six columns named below; selecting
    # fulfillment_channel as well made the rename fail with a length mismatch.
    # Values are bound as parameters instead of being formatted into the SQL.
    sql = """select asin,sku,seller_id,marketplace_id,country_code,quantity
        from
        (select asin,sku,seller_id,marketplace_id,country_code,quantity,fulfillment_channel,
        ROW_NUMBER() over(partition by asin,sku,seller_id,marketplace_id,country_code order by update_datetime desc) as row_
        from asj_ads.seller_listings) as t
        where row_=1 and seller_id=%s and marketplace_id=%s
        and fulfillment_channel='FBA'
        """
    with conn.cursor() as cursor:
        cursor.execute(sql, (seller_id, self.marketplace.marketplace_id))
        rows = cursor.fetchall()
    return pd.DataFrame(
        list(rows),
        columns=['asin_', 'sku_', 'seller_id_', 'marketplace_id_', 'country_code_', 'afn-fulfillable-quantity'],
    )
def get_fba_neccessary_segment(self, conn, seller_id):
    """Build per-listing fulfillable FBA quantities from the FBA inventory report.

    Requests GET_FBA_MYI_UNSUPPRESSED_INVENTORY_DATA. If the report is empty,
    the quantities come from the database via ``fba_inventorySQL`` instead.
    Otherwise only ``condition == 'New'`` rows are kept, and
    ``afn-fulfillable-quantity`` is summed per
    (asin, sku, seller_id, marketplace_id, country_code).

    Args:
        conn: Connection forwarded to ``fba_inventorySQL`` for the fallback.
        seller_id: Seller id stamped on every row.

    Returns:
        DataFrame with columns asin_, sku_, seller_id_, marketplace_id_,
        country_code_, afn-fulfillable-quantity.
    """
    para = {"reportType": ReportType.GET_FBA_MYI_UNSUPPRESSED_INVENTORY_DATA}
    reportid = self.create_report(**para)
    df = self.decompression(reportid)
    if len(df) == 0:
        return self.fba_inventorySQL(conn, seller_id)
    country_code = str(self.marketplace)[-2:]
    df['seller_id'] = seller_id
    df['marketplace_id'] = self.marketplace.marketplace_id
    df['country_code'] = country_code
    df_rel = df.query("condition=='New'")
    # The string 'sum' selects pandas' groupby sum explicitly; passing the
    # builtin `sum` is deprecated since pandas 2.1 (and the warning is hidden
    # by this module's warnings filter).
    df_rel = (
        df_rel.groupby(['asin', 'sku', 'seller_id', 'marketplace_id', 'country_code'])
        .agg({'afn-fulfillable-quantity': 'sum'})
        .reset_index()
    )
    df_rel.columns = ['asin_', 'sku_', 'seller_id_', 'marketplace_id_', 'country_code_', 'afn-fulfillable-quantity']
    print(f"{seller_id}_{country_code}_FBA_Inventory_OK")
    return df_rel
def GET_FBA_MYI_UNSUPPRESSED_INVENTORY_DATA(self, refresh_token, conn=None, seller_id=None, days=-1, **kwargs):  # FBA inventory info
    """Return FBA fulfillable quantities, trying ``get_fba_neccessary_segment`` twice.

    The second attempt runs 15 seconds after the first failure. Each exception
    is printed. If both attempts fail, an empty DataFrame with the expected
    columns is returned. ``refresh_token``, ``days`` and ``kwargs`` are
    accepted for interface compatibility but not used here.
    """
    for attempt in range(2):
        if attempt:
            time.sleep(15)
        try:
            return self.get_fba_neccessary_segment(conn, seller_id)
        except Exception as e:
            print(e)
    empty_columns = ['asin_', 'sku_', 'seller_id_', 'marketplace_id_', 'country_code_', 'afn-fulfillable-quantity']
    return pd.DataFrame(columns=empty_columns)
def GET_FLAT_FILE_OPEN_LISTINGS_DATA(self, refresh_token, conn=None, seller_id=None, days=-1):  # To datatable asj_ads.seller_listings - listing info, including FBM inventory
    """Download the merchant listings report and sync changed rows into asj_ads.seller_listings.

    Steps:
      1. Request GET_MERCHANT_LISTINGS_ALL_DATA and load it via ``decompression``.
      2. Normalise columns (the Japan marketplace has its own header), derive
         ``fulfillment_channel`` (FBA/FBM), quantity, price and opendate.
      3. Resolve each row's ASIN from product-id, the ASIN already stored for
         that product_id in the database, asin1, or the Listings Items API.
      4. Overlay quantities from the FBA inventory report.
      5. Let ``update_data`` pick the rows to write, insert them, then delete
         rows whose update_datetime is older than 3 days.

    Args:
        refresh_token: Stored temporarily in the ``title`` column.
        conn: Not used — connections are opened (and closed) internally; kept
            for interface compatibility.
        seller_id: Seller whose listings are synced.
        days: Forwarded to ``GET_FBA_MYI_UNSUPPRESSED_INVENTORY_DATA``.

    Returns:
        '' when the report is empty, '无更新数据插入' when nothing changed,
        '插入完成' on success, '出错回滚' when the insert failed and was rolled back.
    """
    para = {"reportType": ReportType.GET_MERCHANT_LISTINGS_ALL_DATA}
    reportid = self.create_report(**para)
    df = self.decompression(reportid)
    if len(df) == 0:
        return ''
    if self.marketplace.marketplace_id == 'A1VC38T7YXB528':  # this marketplace's report has different column headers
        df.columns = ['item-name', 'listing-id', 'seller-sku', 'price', 'quantity', 'open-date', 'product-id-type', 'item-description',
                      'item-condition', 'overseas shipping', 'fast shipping', 'asin1', 'stock_number', 'fulfillment-channel', 'merchant-shipping-group', 'status']
    df['seller_id'] = seller_id
    df['marketplace_id'] = self.marketplace.marketplace_id
    df['country_code'] = str(self.marketplace)[-2:]
    # Some reports spell the column 'fulfilment-channel' (one "l"); copy it to
    # the 'fulfillment-channel' name used below.
    if 'fulfilment-channel' in df.columns:
        df['fulfillment-channel'] = df['fulfilment-channel'].copy()
    # Amazon channels become 'FBA', DEFAULT becomes 'FBM'. Characters 1-3 of the
    # value are tested as a substring of the keyword.
    # NOTE(review): a 1-character value yields '' which matches any keyword.
    df['fulfillment_channel'] = df['fulfillment-channel'].map(lambda x: "FBA" if not pd.isna(x) and len(x) > 0 and str(x)[1:4] in "AMAZON" else x)
    df['fulfillment_channel'] = df['fulfillment_channel'].map(lambda x: "FBM" if not pd.isna(x) and len(x) > 0 and str(x)[1:4] in "DEFAULT" else x)
    if 'asin1' not in df.columns:
        df['asin1'] = ''
    if 'product-id' not in df.columns:
        df['product-id'] = ''
    # Missing quantities become 0, everything else is cast to int.
    df['quantity'] = df['quantity'].map(lambda x: 0 if pd.isna(x) else int(x))
    df['quantity'] = df['quantity'].astype('int64')
    str_columns = ['listing-id', 'seller_id', 'asin1', 'seller-sku', 'country_code', 'marketplace_id', 'fulfillment_channel', 'status', 'product-id']
    df[str_columns] = df[str_columns].fillna('').astype('string', errors='ignore')
    df['price'] = df['price'].fillna(0.0).astype('float64', errors='ignore')
    df.fillna('', inplace=True)
    df['opendate'] = df['open-date'].map(lambda x: self.datetime_deal(x))
    df['update_datetime'] = datetime.now(pytz.UTC).date()
    # Columns written to asj_ads.seller_listings, in insert order.
    origin_columns = ['listing-id', 'seller_id',
                      'asin1', 'seller-sku', 'title', 'image_link', 'country_code',
                      'marketplace_id', 'quantity', 'fulfillment_channel',
                      'price', 'opendate', 'status', 'update_datetime', 'product-id', 'product-id-type', 'modifier'
                      ]

    def func_(asin, asin1, product_id, cred, market_p, seller_id, sku):
        """Return the ASIN for one listing row."""
        # product-id that already looks like an ASIN wins.
        if 'B0' in str(product_id)[:3]:
            return str(product_id)
        # asin1 empty: use the ASIN stored in the database for this product_id.
        if (pd.isna(asin1) or asin1 == '') and (pd.isna(asin) == False and asin != ''):
            if 'B0' in asin[:3]:
                return asin
        elif (pd.isna(asin1) == False and asin1 != ''):
            if 'B0' in asin1[:3]:
                return asin1
        # Last resort: ask the Listings Items API (one retry after 3 s).
        listingClient = ListingsItems(credentials=cred, marketplace=market_p)
        try:
            r1 = listingClient.get_listings_item(sellerId=seller_id, sku=sku)
            print(r1.payload)
            asin = r1.payload.get("summaries")[0].get("asin")
            return asin
        except Exception as e:
            print("获取图片url过程错误重试, 错误message: ", e)
            time.sleep(3)
            r1 = listingClient.get_listings_item(sellerId=seller_id, sku=sku)
            print(r1.payload)
            asin = r1.payload.get("summaries")[0].get("asin")
            return asin

    conn = SpApiRequest.Data_auth()
    try:
        # Distinct (product_id, asin) pairs with both values present.
        with conn.cursor() as cursor:
            cursor.execute("""select product_id,asin from (select * from asj_ads.seller_listings where asin is not null
            and asin<>'' and product_id is not null and product_id <>'') t1 group by product_id,asin""")
            query_ = cursor.fetchall()
            col_name = [i[0] for i in cursor.description]
        df_datatable = pd.DataFrame(query_, columns=col_name)
        # Left: freshly downloaded report; right: ASINs already in the database.
        merged_df = df.merge(df_datatable[['product_id', 'asin']], how='left', left_on='product-id', right_on='product_id')
        merged_df['asin1'] = merged_df.apply(lambda x: func_(x['asin'], x['asin1'], x['product-id'], self.credentials, self.marketplace, seller_id, x['seller-sku']), axis=1)
        merged_df['image_link'] = ''
        # Temporarily keep refresh_token in the title column; it is needed later
        # when fetching ASIN details.
        merged_df['title'] = refresh_token
        merged_df['modifier'] = ''
        merged_df.fillna('', inplace=True)
        df1 = merged_df.copy()
        df_fbaInventory = self.GET_FBA_MYI_UNSUPPRESSED_INVENTORY_DATA(refresh_token, conn, seller_id, days)
        if len(df_fbaInventory) > 0:
            df1 = df1.merge(df_fbaInventory, how='left', left_on=['asin1', 'seller-sku', 'seller_id', 'marketplace_id', 'country_code'], right_on=['asin_', 'sku_', 'seller_id_', 'marketplace_id_', 'country_code_'])
            # Use the FBA-report quantity for FBA listings or wherever the report
            # has a value; otherwise keep the listing's own quantity. (This must
            # test the row's value — testing a literal list was always truthy
            # and zeroed every FBM quantity.)
            df1['quantity'] = df1.apply(lambda x: x['afn-fulfillable-quantity'] if x['fulfillment_channel'] == 'FBA' or pd.isna(x['afn-fulfillable-quantity']) == False else x['quantity'], axis=1)
            df1['quantity'] = df1['quantity'].map(lambda x: 0 if pd.isna(x) else int(x))
            df1['quantity'] = df1['quantity'].fillna(0)
        update_df = self.update_data(df1, seller_id, str(self.marketplace)[-2:], conn)
    finally:
        conn.close()
    if len(update_df) == 0:
        return '无更新数据插入'
    # A fresh connection: the lookup above may have run for a long time.
    conn = SpApiRequest.Data_auth()
    try:
        cursor = conn.cursor()
        try:
            insertsql = """insert into
            asj_ads.seller_listings(listing_id,seller_id,asin,sku,title,image_link,country_code,marketplace_id,quantity,
            fulfillment_channel,price,launch_datetime,status,update_datetime,product_id,product_id_type,modifier)
            values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)"""
            conn.begin()
            cursor.executemany(insertsql, tuple(update_df[origin_columns].to_numpy().tolist()))
            conn.commit()
            print("插入完成")
            delete_sql = f"""delete from asj_ads.seller_listings where update_datetime<'{(datetime.today()+timedelta(days=-3)).date().isoformat()}'"""
            cursor.execute(delete_sql)
            conn.commit()
            return '插入完成'
        except Exception as e:
            print("插入错误:", e)
            conn.rollback()
            return '出错回滚'
    finally:
        conn.close()
- # def get_listing_info(self, sku,seller_id): # desprecated
- # listingClient = ListingsItems(credentials=self.credentials, marketplace=self.marketplace)
- # try:
- # r1 = listingClient.get_listings_item(sellerId=seller_id, sku=sku)
- # # print(r1.payload)
- # json_content = r1.payload.get("summaries")[0]
- # item_name = json_content.get("itemName")
- # item_name ='###' if item_name==None else item_name
- # img = json_content.get("mainImage")
- # img_url = '###' if img is None else img.get("link")
- # return str(img_url)+"-----"+ str(item_name)
- # except Exception as e:
- # try:
- # print("获取图片url过程错误重试, 错误message: ",e)
- # time.sleep(3)
- # r1 = listingClient.get_listings_item(sellerId=seller_id, sku=sku)
- # print(r1.payload)
- # json_content = r1.payload.get("summaries")[0]
- #
- # item_name = json_content.get("itemName")
- # item_name = '###' if item_name == None else item_name
- # img = json_content.get("mainImage")
- # img_url = '###' if img is None else img.get("link")
- # return str(img_url)+"-----"+ str(item_name)
- # except Exception as e:
- # print(e)
- # return "###-----###"
def datetime_deal(self, timestring):  # used in GET_FLAT_FILE_OPEN_LISTINGS_DATA - date/time parsing
    """Parse a listings-report date string.

    * Fewer than three space-separated parts (e.g. '2023-01-05'): only the
      first part is parsed, trying %Y-%m-%d, %Y/%m/%d and %d/%m/%Y in that
      order; a naive ``datetime`` is returned.
    * '<date> <time> <TZ>' (e.g. '2023-01-05 10:00:00 PST'): date and time
      are parsed trying %Y-%m-%d, %d/%m/%Y and %Y/%m/%d layouts, localized via
      the abbreviation table below, and returned as an aware UTC ``datetime``.

    Anything unparseable — including an unknown timezone abbreviation or a
    non-string value such as NaN — prints the last error and returns the
    sentinel ``datetime(1999, 12, 31, 0, 0, 0)``.
    """
    timezone_ = {"AEST": "Australia/Sydney", "AEDT": "Australia/Sydney", "PST": "America/Los_Angeles",
                 "PDT": "America/Los_Angeles", "CST": "America/Chicago", "CDT": "America/Chicago",
                 "MET": "MET", "MEST": "MET", "BST": "Europe/London", "GMT": "GMT", "CET": "CET",
                 "CEST": "CET", "JST": "Asia/Tokyo", "BRT": "America/Sao_Paulo"}
    date_list = str(timestring).split(' ')
    last_error = None
    if len(date_list) < 3:
        for fmt in ("%Y-%m-%d", "%Y/%m/%d", "%d/%m/%Y"):
            try:
                return datetime.strptime(date_list[0], fmt)
            except ValueError as e:
                last_error = e
        print(last_error)
        return datetime(1999, 12, 31, 0, 0, 0)
    for fmt in ("%Y-%m-%d%H:%M:%S", "%d/%m/%Y%H:%M:%S", "%Y/%m/%d%H:%M:%S"):
        try:
            time_date = datetime.strptime(date_list[0] + date_list[1], fmt)
            # KeyError: abbreviation missing from the table (pytz's
            # UnknownTimeZoneError is also a KeyError subclass).
            tz = pytz.timezone(timezone_[date_list[2]])
        except (ValueError, KeyError) as e:
            last_error = e
            continue
        return tz.localize(time_date).astimezone(pytz.UTC)
    print(last_error)
    return datetime(1999, 12, 31, 0, 0, 0)
def update_data(self, df, seller_id, country_code, conn):  # used in GET_FLAT_FILE_OPEN_LISTINGS_DATA, data compare
    """Diff freshly downloaded open listings against ``asj_ads.seller_listings``.

    Every row of ``df`` is compared with the rows stored for this seller and
    marketplace on listing id, ASIN, SKU, quantity, fulfillment channel,
    price, product id, country code, seller id, title, image link and
    modifier. A row with exactly one identical stored row is treated as
    unchanged. A row with zero or several matches is returned for
    re-insertion, and its key is deleted from the table.

    Args:
        df: New listing rows. Mutated in place: ``country_code`` is set to
            ``"UK"`` for GB; ``update_datetime``, ``quantity`` and ``price``
            are cast to datetime64 / int64 / float64.
        seller_id: Seller whose stored listings are compared.
        country_code: Marketplace country code; ``"GB"`` is stored as ``"UK"``.
        conn: Ignored. A fresh connection from ``SpApiRequest.Data_auth()``
            is always used; the parameter is kept for existing callers.

    Returns:
        A DataFrame of rows to (re-)insert, or ``df`` itself if reading or
        normalising the stored data failed.
    """
    # NOTE(review): the ``conn`` argument is overwritten, as it always was.
    conn = SpApiRequest.Data_auth()
    cursor = conn.cursor()
    columns = ['listing-id', 'seller_id',
               'asin1', 'seller-sku', 'title', 'image_link', 'country_code',
               'marketplace_id', 'quantity', 'fulfillment_channel',
               'price', 'opendate', 'status', 'update_datetime', 'product-id', 'product-id-type', 'modifier'
               ]
    if country_code == 'GB':
        country_code = "UK"
        df['country_code'] = "UK"
    marketplace_id = self.marketplace.marketplace_id
    try:
        # Parameterised query instead of f-string interpolation (quoting / injection).
        cursor.execute(
            "select * from asj_ads.seller_listings where seller_id=%s and marketplace_id=%s",
            (seller_id, marketplace_id),
        )
        col = [i[0] for i in cursor.description]
        df_rel = pd.DataFrame(cursor.fetchall(), columns=col)
        # Stored rows: align dtypes with the new data so the == comparisons line up.
        df_rel['quantity'] = df_rel['quantity'].fillna(0).astype('int64')
        df_rel['price'] = df_rel['price'].fillna(0.0).astype('float64')
        df_rel['product_id_type'] = df_rel['product_id_type'].astype('int64')
        # New rows.
        df['update_datetime'] = df['update_datetime'].astype('datetime64[ns]')
        df['quantity'] = df['quantity'].fillna(0).astype('int64')
        df['price'] = df['price'].fillna(0.0).astype('float64')

        changed_positions = []
        delete_list = []
        for position in range(len(df)):
            # Positional row access keeps the scalar types used in the
            # comparisons and in delete_list exactly as before.
            record = df.iloc[position]
            listing_id = record['listing-id']
            asin = record['asin1']
            sku = record['seller-sku']
            quantity = record['quantity']
            fulfillment_channel = record['fulfillment_channel']
            price = record['price']
            product_id = record['product-id']
            title = record['title']
            imageurl = record['image_link']
            modifier = record['modifier']
            # The @names are resolved from this function's local variables.
            matches = df_rel.query(
                "listing_id==@listing_id and asin==@asin and sku==@sku and quantity==@quantity"
                " and fulfillment_channel==@fulfillment_channel and price==@price"
                " and product_id==@product_id and country_code==@country_code"
                " and seller_id==@seller_id and title==@title and image_link==@imageurl"
                " and modifier==@modifier"
            )
            # Exactly one identical stored row means "unchanged"; zero or
            # several mean the listing must be deleted and re-inserted.
            if len(matches) != 1:
                changed_positions.append(position)
                delete_list.append((seller_id, marketplace_id, sku, listing_id, product_id))

        df_data = pd.DataFrame(columns=columns)
        if changed_positions:
            # One concat instead of one per changed row (the old loop was quadratic).
            df_data = pd.concat((df_data, df.iloc[changed_positions]), ignore_index=True)
        print("判断不同数据条数", len(delete_list))
        print("预计更新数据条数", len(df_data))
        try:
            if len(delete_list) > 0:
                query = """delete from asj_ads.seller_listings
                where (seller_id,marketplace_id,sku,listing_id,product_id) in %s"""
                # The driver expands a list of tuples into a row-constructor IN list.
                cursor.execute(query, (delete_list,))
                conn.commit()
            print("进行中...")
        except Exception as e:
            print(e)
            conn.rollback()
        return df_data
    except Exception as e:
        print("错误:", e)
        return df
    finally:
        # Previously the connection was never closed.
        conn.close()
- # def GET_FLAT_FILE_RETURNS_DATA_BY_RETURN_DATE(self,seller_id,days=-2): # not be used,退货报告,空数据
- # shopReportday = (datetime.now() + timedelta(days=days)).strftime("%Y-%m-%d")
- # # print(shopReportday)
- # para = {"reportType": ReportType.GET_SELLER_FEEDBACK_DATA,
- # "dataStartTime": shopReportday, "dataEndTime": shopReportday,
- # }
- # reportid = self.create_report(**para) # {"ShowSalesChannel":"true"}
- # decom_df = self.decompression(reportid)
- # print(decom_df)
- # print(decom_df.columns)
def GET_SALES_AND_TRAFFIC_REPORT(self, refresh_token, seller_id, days=-2, **kwargs):  # To datatable asj_ads.SalesAndTrafficByAsin, sales & traffic table
    """Fetch one day's Sales & Traffic report and insert it into ``asj_ads.SalesAndTrafficByAsin``.

    Args:
        refresh_token: Unused here (kept for the common report-method signature).
        seller_id: Seller the rows are stored under.
        days: Offset from today of the report day (default: two days ago).
        **kwargs: ``level`` -- ``"PARENT"`` (default), ``"CHILD"`` or ``"SKU"``
            (the report's ``asinGranularity``); ``countryCode`` -- stored with
            each row and used in the "already loaded" check.

    Returns:
        ``''`` for an unsupported level, ``'已存在'`` if rows for that
        day/country/seller/level already exist, otherwise ``None``.
    """
    # ``or`` also covers a missing level (the old len(None) raised TypeError).
    level = kwargs.get("level") or "PARENT"
    countryCode = kwargs.get("countryCode")
    shopReportday = (datetime.now() + timedelta(days=days)).strftime("%Y-%m-%d")
    print(shopReportday, countryCode, seller_id)

    # Which rows count as "already loaded" for each granularity.
    level_filters = {
        'SKU': "childAsin is not Null and sku is not Null",
        'CHILD': "sku is null and childAsin is not null",
        'PARENT': "sku is null and childAsin is null",
    }
    if level not in level_filters:
        return ''

    def connect():
        # Open a connection + cursor, retrying once after a 5 s pause.
        try:
            conn = self.Data_auth()
            return conn, conn.cursor()
        except Exception:
            time.sleep(5)
            conn = self.Data_auth()
            return conn, conn.cursor()

    conn, cursor = connect()
    try:
        query_judge = ("select count(*) from asj_ads.SalesAndTrafficByAsin "
                       "where seller_id=%s and data_date=%s and countryCode=%s and "
                       + level_filters[level])
        judge_params = (seller_id, shopReportday, countryCode)
        print(query_judge, judge_params)
        cursor.execute(query_judge, judge_params)
        rel = cursor.fetchall()
    finally:
        # Report generation can take a long time; don't hold this connection.
        conn.close()
    if rel[0][0] != 0:
        return '已存在'

    para = {"reportType": ReportType.GET_SALES_AND_TRAFFIC_REPORT,
            "dataStartTime": shopReportday, "dataEndTime": shopReportday,
            "reportOptions": {"dateGranularity": "DAY", "asinGranularity": level}
            }
    reportid = self.create_report(**para)
    decom_df = self.decompression(reportid)
    # The JSON report body ends up as the first column label of the decompressed frame.
    data_rel = self.sales_traffic_datadeal(decom_df.columns[0], seller_id, countryCode)

    sql = """
    insert into asj_ads.SalesAndTrafficByAsin(data_date,data_marketpalceId,parent_asin,
    childAsin,sku,sBA_unitsOrdered,sBA_unitsOrderedB2B,sBA_amount,
    currencyCode,totalOrderItems,totalOrderItemsB2B,tBA_browserSessions,
    tBA_browserSessionsB2B,tBA_mobileAppSessions,tBA_mobileAppSessionsB2B,
    tBA_sessions,tBA_sessionsB2B,tBA_browserSessionPercentage,
    tBA_browserSessionPercentageB2B,tBA_mobileAppSessionPercentage,
    tBA_mobileAppSessionPercentageB2B,tBA_sessionPercentage,
    tBA_sessionPercentageB2B,tBA_browserPageViews,tBA_browserPageViewsB2B,
    tBA_mobileAppPageViews,tBA_mobileAppPageViewsB2B,tBA_pageViews,
    tBA_pageViewsB2B,tBA_browserPageViewsPercentage,tBA_browserPageViewsPercentageB2B,
    tBA_mobileAppPageViewsPercentage,tBA_mobileAppPageViewsPercentageB2B,
    tBA_pageViewsPercentage,tBA_pageViewsPercentageB2B,tBA_buyBoxPercentage,
    tBA_buyBoxPercentageB2B,tBA_unitSessionPercentage,tBA_unitSessionPercentageB2B,seller_id,countryCode)
    values (%s,%s,%s,%s,%s,%s,%s, %s,%s,%s,%s,%s,%s,%s, %s,%s,%s,%s,%s,%s,%s, %s,%s,%s,%s,%s,%s,%s, %s,%s,%s,%s,%s,%s,%s, %s,%s,%s,%s,%s,%s)
    """
    conn, cursor = connect()
    try:
        conn.begin()
        cursor.executemany(sql, data_rel)
        conn.commit()
        print("插入完成")
        time.sleep(1)
    except Exception as e:
        conn.rollback()
        print(e)
    finally:
        # Previously leaked when the insert failed.
        conn.close()
def sales_traffic_datadeal(self, data, seller_id, countryCode):  # used in GET_SALES_AND_TRAFFIC_REPORT
    """Flatten a Sales & Traffic report into rows for ``asj_ads.SalesAndTrafficByAsin``.

    Args:
        data: The report body -- a JSON (or Python-literal) string, or an
            already-parsed dict, with ``reportSpecification`` and
            ``salesAndTrafficByAsin`` keys.
        seller_id: Appended to every row.
        countryCode: Appended to every row.

    Returns:
        A list of 41-element lists in the insert column order. A value is
        ``''`` when its whole ``salesByAsin`` / ``orderedProductSales`` /
        ``trafficByAsin`` section is absent, and ``None`` when the section
        exists but lacks that key. Returns ``[]`` when there are no items.
    """
    import ast
    import json

    if isinstance(data, (str, bytes, bytearray)):
        # Never eval() downloaded report text. JSON is tried first; the
        # literal_eval fallback keeps accepting Python-literal text that the
        # previous eval()-based parsing handled.
        try:
            data = json.loads(data)
        except ValueError:
            data = ast.literal_eval(data if isinstance(data, str) else data.decode())

    items = data.get('salesAndTrafficByAsin') or []
    if len(items) == 0:
        return []
    spec = data["reportSpecification"]
    data_date = spec["dataEndTime"]
    data_marketpalceId = spec["marketplaceIds"][0]

    # trafficByAsin fields, in insert-column order.
    traffic_keys = (
        "browserSessions", "browserSessionsB2B",
        "mobileAppSessions", "mobileAppSessionsB2B",
        "sessions", "sessionsB2B",
        "browserSessionPercentage", "browserSessionPercentageB2B",
        "mobileAppSessionPercentage", "mobileAppSessionPercentageB2B",
        "sessionPercentage", "sessionPercentageB2B",
        "browserPageViews", "browserPageViewsB2B",
        "mobileAppPageViews", "mobileAppPageViewsB2B",
        "pageViews", "pageViewsB2B",
        "browserPageViewsPercentage", "browserPageViewsPercentageB2B",
        "mobileAppPageViewsPercentage", "mobileAppPageViewsPercentageB2B",
        "pageViewsPercentage", "pageViewsPercentageB2B",
        "buyBoxPercentage", "buyBoxPercentageB2B",
        "unitSessionPercentage", "unitSessionPercentageB2B",
    )

    def pick(section, key):
        # '' for a missing section, None for a missing key within it.
        return section.get(key) if section is not None else ''

    data_list = []
    for single_item in items:
        salesByAsin = single_item.get("salesByAsin")
        # Guarded: previously raised AttributeError when salesByAsin was None.
        orderedProductSales = salesByAsin.get("orderedProductSales") if salesByAsin is not None else None
        trafficByAsin = single_item.get("trafficByAsin")
        row = [
            data_date, data_marketpalceId,
            single_item.get("parentAsin"), single_item.get("childAsin"), single_item.get("sku"),
            pick(salesByAsin, "unitsOrdered"),
            pick(salesByAsin, "unitsOrderedB2B"),
            pick(orderedProductSales, "amount"),
            pick(orderedProductSales, "currencyCode"),
            pick(salesByAsin, "totalOrderItems"),
            pick(salesByAsin, "totalOrderItemsB2B"),
        ]
        row.extend(pick(trafficByAsin, key) for key in traffic_keys)
        row.extend([seller_id, countryCode])
        data_list.append(row)
    return data_list
def GET_FLAT_FILE_ALL_ORDERS_DATA_BY_ORDER_DATE_GENERAL(self, refresh_token, seller_id, days=-1, **kwargs):  # To datatable asj_ads.orderReport_ - fetch the order report
    """Download one day's all-orders flat-file report and insert it into ``asj_ads.orderReport_``.

    Args:
        refresh_token: Unused here (kept for the common report-method signature).
        seller_id: Seller the rows are stored under.
        days: Offset from today of the report day (default: yesterday).
        **kwargs: ``countryCode`` is used only for the "already loaded" check.

    Returns:
        ``'已存在'`` if rows for that day/country/seller already exist, ``""``
        if the report is empty, otherwise ``None`` after the insert attempt.
    """
    countryCode = kwargs.get("countryCode")
    shopReportday = (datetime.now() + timedelta(days=days)).strftime("%Y-%m-%d")
    print(shopReportday)

    def connect():
        # Open a connection + cursor, retrying once after a 5 s pause.
        try:
            conn = self.Data_auth()
            return conn, conn.cursor()
        except Exception as e:
            print(e)
            time.sleep(5)
            conn = self.Data_auth()
            return conn, conn.cursor()

    conn, cursor = connect()
    try:
        query_judge = ("select count(*) from asj_ads.orderReport_ "
                       "where ReportDate=%s and country_code=%s and seller_id=%s")
        judge_params = (shopReportday, countryCode, seller_id)
        print(query_judge, judge_params)
        cursor.execute(query_judge, judge_params)
        rel = cursor.fetchall()
    finally:
        # Report generation can take a long time; don't hold (or leak) this connection.
        conn.close()
    if rel[0][0] != 0:
        print("已存在")
        return '已存在'
    # NOTE(review): the check above uses kwargs["countryCode"], while rows are
    # inserted with the code derived from self.marketplace below -- confirm they agree.

    para = {"reportType": ReportType.GET_FLAT_FILE_ALL_ORDERS_DATA_BY_ORDER_DATE_GENERAL,
            "dataStartTime": shopReportday, "dataEndTime": shopReportday,
            "reportOptions": {"ShowSalesChannel": "true"}}
    reportid = self.create_report(**para)
    decom_df = self.decompression(reportid)
    float_cols = decom_df.select_dtypes(float).columns
    decom_df[float_cols] = decom_df[float_cols].fillna(0.0)
    int_cols = decom_df.select_dtypes(int).columns
    decom_df[int_cols] = decom_df[int_cols].fillna(0)
    dt_cols = decom_df.select_dtypes(datetime).columns
    decom_df[dt_cols] = decom_df[dt_cols].astype('string')
    if "purchase-order-number" in decom_df.columns:
        decom_df['purchase-order-number'] = decom_df['purchase-order-number'].astype("string")
    decom_df.fillna('', inplace=True)
    decom_df["ReportDate"] = parse(shopReportday)
    decom_df['timezone'] = "UTC"
    print("==========================================================")
    if len(decom_df) == 0:
        return ""

    def to_utc(value):
        # Blank/missing timestamps become NULL. The old condition
        # (`not isna(x) or x != None`) sent them -- e.g. '' after fillna -- to
        # timeDeal, where parsing raised.
        if isinstance(value, str):
            return self.timeDeal(value) if value.strip() else None
        if value is None or pd.isna(value):
            return None
        return self.timeDeal(value)

    time_cols = ["purchase-date", "last-updated-date"]
    decom_df[time_cols] = decom_df[time_cols].applymap(to_utc)

    # Optional report columns that the table always expects.
    optional_defaults = {'is-business-order': None, 'purchase-order-number': '-', 'price-designation': '-'}
    for column, default in optional_defaults.items():
        if column not in decom_df.columns:
            decom_df[column] = default
    decom_df['seller_id'] = seller_id
    country_code = str(self.marketplace)[-2:]
    if country_code == 'GB':
        country_code = "UK"
    decom_df['country_code'] = country_code
    decom_df['insert_time'] = datetime.now()
    reserve_columns = ["amazon-order-id", "merchant-order-id", "purchase-date", "last-updated-date", "order-status",
                       "fulfillment-channel", "sales-channel", "order-channel", "ship-service-level",
                       "product-name",
                       "sku", "asin", "item-status", "quantity", "currency", "item-price", "item-tax",
                       "shipping-price",
                       "shipping-tax", "gift-wrap-price", "gift-wrap-tax", "item-promotion-discount",
                       "ship-promotion-discount", "ship-city", "ship-state", "ship-postal-code", "ship-country",
                       "promotion-ids", "is-business-order", "purchase-order-number", "price-designation",
                       "ReportDate",
                       "timezone", "seller_id", "country_code", 'insert_time'
                       ]
    list_df = decom_df[reserve_columns].to_numpy().tolist()
    sql = """
    insert into asj_ads.orderReport_(`amazon_order_id`, `merchant_order_id`, `purchase_date`, `last_updated_date`, `order_status`,
    `fulfillment_channel`, `sales_channel`, `order_channel`, `ship_service_level`,
    `product_name`,
    `sku`, `asin`, `item_status`, `quantity`, `currency`, `item_price`, `item_tax`,
    `shipping_price`,
    `shipping_tax`, `gift_wrap_price`, `gift_wrap_tax`, `item_promotion_discount`,
    `ship_promotion_discount`, `ship_city`, `ship_state`, `ship_postal_code`, `ship_country`,
    `promotion_ids`, `is_business_order`, `purchase_order_number`, `price_designation`,
    `ReportDate`,
    `timezone`, `seller_id`, `country_code`,`insert_time`)
    values (%s,%s,%s,%s,%s,%s,%s, %s,%s,%s,%s,%s,%s,%s, %s,%s,%s,%s,%s,%s,%s, %s,%s,%s,%s,%s,%s,%s, %s,%s,%s,%s,%s,%s,%s,%s)
    """
    conn, cursor = connect()
    try:
        conn.begin()
        cursor.executemany(sql, list_df)
        conn.commit()
        print("插入完成")
        time.sleep(1)
    except Exception as e:
        conn.rollback()
        print(e)
    finally:
        conn.close()
def timeDeal(self, orgTime):  # time-deal
    """Parse a timestamp string and return it as a naive UTC datetime (second precision).

    ``parse`` is the file-level parser (presumably ``dateutil.parser.parse``).
    Values carrying an offset are converted to UTC. Values without one are
    taken to already be UTC, instead of being read in the host's local
    timezone as ``astimezone()`` would otherwise do.
    Microseconds and tzinfo are dropped.
    """
    parsed = parse(orgTime)
    if parsed.tzinfo is None:
        # Without this, astimezone() interprets a naive value as *server-local*
        # time, so results depended on the machine's TZ setting.
        parsed = parsed.replace(tzinfo=pytz.UTC)
    utc_time = parsed.astimezone(pytz.UTC)
    return utc_time.replace(tzinfo=None, microsecond=0)
@classmethod
def listing_infoTable(cls):  # To datatable asj_ads.Goods - Goods table, incl. ranks, images, parent/child relationship
    """Refresh ``asj_ads.Goods`` for listings not yet refreshed today, in chunks of 50.

    For each chunk, catalog details are fetched with ``get_listing_info01``
    and inserted via ``Goods_insert``. Then older Goods rows are deleted for
    every key that now has a row with today's ``update_time``.
    """
    conn = SpApiRequest.Data_auth()
    try:
        cursor = conn.cursor()
        cursor.execute(
            "select seller_id,country_code,asin,title from asj_ads.seller_listings "
            "where title is not null and title <>'' and (seller_id,country_code,asin) not in "
            "(select seller_id,countryCode,asin from asj_ads.Goods where update_time>=%s) "
            "group by seller_id,title,country_code,asin order by country_code desc",
            (datetime.today().date(),),
        )
        col_name = [i[0] for i in cursor.description]
        df_datatable = pd.DataFrame(cursor.fetchall(), columns=col_name)
    finally:
        conn.close()

    distance = 50
    total = len(df_datatable)
    print(total)
    for start in range(0, total, distance):
        print(f"进度:{round(start / total * 100, 2)}%")
        # .copy(): we add a column, so work on an independent frame, not a view.
        df = df_datatable.iloc[start:start + distance, :].copy()
        count = start + distance
        # NOTE(review): the listing *title* is passed as get_listing_info01's
        # refresh_token argument -- confirm this column really holds the token.
        df['detail_info'] = df.apply(
            lambda x: cls.get_listing_info01(x['title'], x['country_code'], x['asin'], x['seller_id']), axis=1)
        detail_info_k = df['detail_info'].map(lambda x: list(x.keys())).to_numpy().tolist()
        detail_info = df['detail_info'].to_numpy().tolist()
        conn = SpApiRequest.Data_auth()
        try:
            print(count)
            SpApiRequest.Goods_insert(conn, detail_info, detail_info_k)
            cursor = conn.cursor()
            today = datetime.today().date()
            cursor.execute(
                "select seller_id,countryCode,asin from asj_ads.Goods where update_time>=%s", (today,))
            query_d = cursor.fetchall()
            # Parameterised IN list. The old f-string used Python's tuple repr,
            # which breaks SQL for 0 rows "()" or 1 row (trailing comma).
            if query_d:
                try:
                    cursor.execute(
                        "delete from asj_ads.Goods where update_time<%s "
                        "and (seller_id,countryCode,asin) in %s",
                        (today, list(query_d)),
                    )
                    conn.commit()
                except Exception as e:
                    print(e)
                    conn.rollback()
        finally:
            # One connection per chunk; previously all but the last leaked.
            conn.close()
    print("Success")
@classmethod
def Goods_drop_duplicates(cls):
    """Collapse duplicated ``asj_ads.Goods`` rows per (seller_id, countryCode, asin).

    For every key with two or more rows, the ``select distinct`` set of rows
    is read, all rows for those keys are deleted, and the distinct set is
    re-inserted. Delete and insert run in ONE transaction, so a failure
    leaves the table untouched. Previously a failed delete was still
    followed by the re-insert (adding duplicates), and a failed insert
    after a committed delete lost data.
    """
    conn = SpApiRequest.Data_auth()
    try:
        cursor = conn.cursor()
        cursor.execute(
            "select seller_id,countryCode,asin as count_ from asj_ads.Goods "
            "group by seller_id,countryCode,asin having count(asin)>=2")
        query_ = cursor.fetchall()
        print(len(query_))
        if len(query_) == 0:
            return
        # Parameterised IN list (the old f-string tuple repr breaks for one key).
        keys = list(query_)
        cursor.execute(
            "select distinct main_image, productTypes, BigCat_rank, BigCat_title, SmallCat_rank, "
            "SmallCat_title, brandName, browseNode, itemName, IsParent, parent_asin, asin, countryCode, "
            "marketplace_id, seller_id, update_time from asj_ads.Goods "
            "where (seller_id,countryCode,asin) in %s",
            (keys,),
        )
        query_2 = cursor.fetchall()
        sql = """insert into
        asj_ads.Goods(main_image, productTypes, BigCat_rank, BigCat_title, SmallCat_rank, SmallCat_title, brandName, browseNode, itemName, IsParent, parent_asin, asin, countryCode, marketplace_id, seller_id, update_time)
        values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)"""
        stage = "错误过程1"  # label printed if the delete step fails
        try:
            conn.begin()
            cursor.execute(
                "delete from asj_ads.Goods where (seller_id,countryCode,asin) in %s", (keys,))
            stage = "错误过程2"  # label printed if the insert step fails
            cursor.executemany(sql, query_2)
            conn.commit()
            print("插入完成")
            time.sleep(1)
        except Exception as e:
            conn.rollback()
            print(stage, e)
    finally:
        # Previously only closed on full success.
        conn.close()
@staticmethod
def get_listing_info01(refresh_token, countryCode, asin, seller_id):  # used in listing_infoTable
    """Build the Goods-table record for one ASIN from the Catalog Items API.

    Merges catalog details (2020-12-01 API, via ``get_detail_cat``) with the
    parent/child verdict (2022-04-01 API, via ``variations_judge``). The
    record is then stamped with its keys and the current local time.

    Args:
        refresh_token: LWA refresh token for the API calls.
        countryCode: Two-letter marketplace code; must be one of the codes below.
        asin: ASIN to look up.
        seller_id: Seller the record belongs to.

    Returns:
        dict of detail fields plus ``IsParent``, ``parent_asin``, ``asin``,
        ``countryCode``, ``marketplace_id``, ``seller_id`` and ``update_time``.
    """
    # SECURITY NOTE(review): the LWA client secret and AWS key pair are
    # hard-coded in source; move them to configuration/secret storage and rotate them.
    aws_credentials = {
        'refresh_token': refresh_token,
        'lwa_app_id': 'amzn1.application-oa2-client.1f9d3d4747e14b22b4b598e54e6b922e',  # LWA credentials from the Seller Central developer profile
        'lwa_client_secret': 'amzn1.oa2-cs.v1.13ebfa8ea7723b478215a2b61f4dbf5ca6f6927fa097c2dd0941a66d510aca7f',
        'aws_access_key': 'AKIARBAGHTGOZC7544GN',
        'aws_secret_key': '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',
        'role_arn': 'arn:aws:iam::070880041373:role/Amazon_SP_API_ROLE'
    }
    marketplace_codes = ('AE', 'BE', 'DE', 'PL', 'EG', 'ES', 'FR', 'GB', 'IN', 'IT', 'NL', 'SA',
                         'SE', 'TR', 'UK', 'AU', 'JP', 'SG', 'US', 'BR', 'CA', 'MX')
    mak = {code: getattr(Marketplaces, code) for code in marketplace_codes}
    marketplace = mak[countryCode]

    def with_one_retry(action):
        # On any failure, wait 2.5 s and try exactly once more (errors then propagate).
        try:
            return action()
        except BaseException:
            time.sleep(2.5)
            return action()

    catalog_v2022 = CatalogItems(credentials=aws_credentials, marketplace=marketplace, version='2022-04-01')
    variations_info = with_one_retry(lambda: SpApiRequest.variations_judge(catalog_v2022, asin))

    def fetch_details():
        catalog_v2020 = CatalogItems(credentials=aws_credentials, marketplace=marketplace, version='2020-12-01')
        return SpApiRequest.get_detail_cat(catalog_v2020, asin, mak, countryCode)

    detail_info = with_one_retry(fetch_details)
    detail_info.update(variations_info)
    detail_info.update(
        asin=asin,
        countryCode=countryCode,
        marketplace_id=marketplace.marketplace_id,
        seller_id=seller_id,
        update_time=datetime.now(),
    )
    return detail_info
@staticmethod
def variations_judge(cate_item, asin):  # used in listing_infoTable, determines parent/child relationship
    """Classify ``asin`` as parent, child or standalone using catalog relationships.

    Args:
        cate_item: Catalog client exposing ``get_catalog_item(asin=..., includedData=[...])``,
            whose result has a ``payload`` mapping.
        asin: ASIN to classify.

    Returns:
        ``{"IsParent": ..., "parent_asin": ...}``:
        ``'N'`` + the first parent ASIN for a child;
        ``'Y'`` + ``asin`` for a parent with children;
        ``'Y'`` + ``'Erro_01'`` when the first relationship has neither list;
        ``'SG'`` + ``asin`` when there are no relationships;
        ``'Erro'`` for both fields if the lookup fails twice (12 s apart).
    """
    def temp_func(cate_item, asin):
        variations = cate_item.get_catalog_item(asin=asin, **{"includedData": ['relationships']})
        var_info = variations.payload
        try:
            relationships = var_info.get("relationships")[0]['relationships']
        except (AttributeError, IndexError, KeyError, TypeError):
            # No usable relationship block -> treat as standalone.
            relationships = []
        if len(relationships) > 0:
            first = relationships[0]
            # (The old unused lookup of first['type'] raised KeyError when the
            # key was absent, forcing a 12 s retry and an 'Erro' result.)
            if first.get('parentAsins') is not None:
                IsParent, parent_asin = 'N', first['parentAsins'][0]
            elif first.get('childAsins') is not None:
                IsParent, parent_asin = 'Y', asin
            else:
                IsParent, parent_asin = 'Y', 'Erro_01'
        else:
            IsParent, parent_asin = 'SG', asin
        print(IsParent, '父asin:', parent_asin, '子asin', asin)
        return {"IsParent": IsParent, "parent_asin": parent_asin}

    # `except Exception` (not bare except) so Ctrl-C / SystemExit still propagate.
    try:
        return temp_func(cate_item, asin)
    except Exception:
        try:
            time.sleep(12)
            return temp_func(cate_item, asin)
        except Exception as e:
            print("判断是否为父子asin时出错:", e)
            return {"IsParent": 'Erro', "parent_asin": 'Erro'}
@staticmethod
def Goods_insert(conn, detail_info, detail_info_k):  # To datatable asj.Goods
    """Insert catalog records into ``asj_ads.Goods`` and a rank snapshot into ``asj_ads.ProductRank``.

    Records whose parent/child lookup failed (``'Erro'``) fall back to the
    newest stored values for the same asin/countryCode/marketplace_id/seller_id.
    Records with no usable fallback are skipped rather than inserted as NaN.
    A NaN would previously fail the whole executemany batch.

    Args:
        conn: Open DB connection; replaced by a fresh one if ``cursor()`` fails.
        detail_info: List of record dicts as built by ``get_listing_info01``.
        detail_info_k: Unused; kept for backward compatibility.
    """
    if not detail_info:
        # Nothing to insert (and an empty frame would lack the merge keys).
        return
    df = pd.DataFrame(detail_info)
    try:
        cursor = conn.cursor()
    except Exception:
        time.sleep(2.5)
        conn = SpApiRequest.Data_auth()
        cursor = conn.cursor()
    key_cols = ['asin', 'countryCode', 'marketplace_id', 'seller_id']
    # Only fetch stored rows for this batch instead of the whole table.
    lookup_keys = list(df[['seller_id', 'countryCode', 'asin']].itertuples(index=False, name=None))
    cursor.execute(
        "select * from asj_ads.Goods where (seller_id,countryCode,asin) in %s",
        (lookup_keys,),
    )
    col = [i[0] for i in cursor.description]
    df_query = pd.DataFrame(cursor.fetchall(), columns=col)
    # Keep only the newest stored row per key. Duplicated Goods rows would
    # otherwise fan out the left merge and duplicate every insert below.
    df_query = (df_query.sort_values('update_time', ascending=False)
                .drop_duplicates(subset=key_cols, keep='first'))
    merge_df = df.merge(df_query, how='left', on=key_cols, suffixes=['', '_right'])
    merge_df['IsParent'] = merge_df['IsParent'].where(
        merge_df['IsParent'] != 'Erro', merge_df['IsParent_right'])
    merge_df['parent_asin'] = merge_df['parent_asin'].where(
        merge_df['parent_asin'] != 'Erro', merge_df['parent_asin_right'])
    usable = (merge_df['IsParent'].notna() & merge_df['parent_asin'].notna()
              & (merge_df['IsParent'] != 'Erro') & (merge_df['parent_asin'] != 'Erro'))
    merge_df = merge_df[usable]

    goods_cols = ['main_image', 'productTypes', 'BigCat_rank', 'BigCat_title', 'SmallCat_rank',
                  'SmallCat_title', 'brandName', 'browseNode', 'itemName', 'IsParent', 'parent_asin',
                  'asin', 'countryCode', 'marketplace_id', 'seller_id', 'update_time']
    detail_info_value = merge_df[goods_cols].to_numpy().tolist()
    try:
        insertsql = """insert into
        asj_ads.Goods(main_image, productTypes, BigCat_rank, BigCat_title, SmallCat_rank, SmallCat_title, brandName, browseNode, itemName, IsParent, parent_asin, asin, countryCode, marketplace_id, seller_id, update_time)
        values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)"""
        conn.begin()
        cursor.executemany(insertsql, tuple(detail_info_value))
        conn.commit()
        print("插入完成Goods")
    except Exception as e:
        print("插入错误Goods:", e)
        conn.rollback()

    # Rank snapshot: the listed Goods columns (by name rather than magic
    # indices) plus local and UTC timestamps.
    rank_cols = ['BigCat_rank', 'BigCat_title', 'SmallCat_rank', 'SmallCat_title', 'brandName',
                 'itemName', 'asin', 'countryCode', 'marketplace_id', 'seller_id']
    rank_positions = [goods_cols.index(name) for name in rank_cols]
    sales_rankData = [[row[i] for i in rank_positions] + [datetime.now(), datetime.utcnow()]
                      for row in detail_info_value]
    try:
        insertsql = """insert into
        asj_ads.ProductRank(BigCat_rank, BigCat_title, SmallCat_rank, SmallCat_title, brandName, itemName, asin, countryCode, marketplace_id, seller_id, update_time,time_UTC)
        values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)"""
        conn.begin()
        cursor.executemany(insertsql, tuple(sales_rankData))
        conn.commit()
        print("插入完成rank")
    except Exception as e:
        print("插入错误rank:", e)
        conn.rollback()
@staticmethod
def get_detail_cat(cate_item, asin, mak, countryCode):  # used in listing_infoTable, Catalog Items SP-API - fetch listing details
    """Fetch image, product type, sales ranks and summary fields for one ASIN.

    Args:
        cate_item: Catalog client exposing ``get_catalog_item(asin=..., includedData=..., marketplaceIds=...)``,
            whose result has a ``payload`` mapping.
        asin: ASIN to look up.
        mak: Mapping of country code -> marketplace object with ``marketplace_id``.
        countryCode: Key into ``mak``.

    Returns:
        dict with keys main_image, productTypes, BigCat_rank, BigCat_title,
        SmallCat_rank, SmallCat_title, brandName, browseNode, itemName.
        Per field group: ``'#'`` / ``0`` when the section is present but empty,
        ``'#-'`` / ``0`` when it is missing or malformed. All ``''`` / ``0`` when
        the API call itself fails.

    Only ``Exception`` is caught, so KeyboardInterrupt/SystemExit now
    propagate instead of being swallowed by bare ``except:`` clauses.
    """
    try:
        response = cate_item.get_catalog_item(asin=asin, **{
            "includedData": ["images,productTypes,salesRanks,summaries"],
            "marketplaceIds": [str(mak[countryCode].marketplace_id)]})
        payload = response.payload
    except Exception:
        return {'main_image': '', 'productTypes': '', 'BigCat_rank': 0,
                'BigCat_title': '', 'SmallCat_rank': 0, 'SmallCat_title': '',
                'brandName': '', 'browseNode': '', 'itemName': ''}
    try:
        main_image = payload['images'][0]['images'][0]['link'] if len(payload['images']) > 0 else "#"
    except Exception:
        main_image = '#-'
    try:
        productTypes = payload['productTypes'][0]['productType'] if len(payload['productTypes'][0]) > 0 else "#"
    except Exception:
        productTypes = '#-'
    try:
        if len(payload['ranks'][0]) > 0:
            # NOTE(review): a listing with a single rank entry raises on [1]
            # and loses its first rank too (all fall back to '#-').
            rank_entries = payload['ranks'][0]['ranks']
            BigCat_rank = rank_entries[0]['rank']
            BigCat_title = rank_entries[0]['title']
            SmallCat_rank = rank_entries[1]['rank']
            SmallCat_title = rank_entries[1]['title']
        else:
            BigCat_rank, BigCat_title, SmallCat_rank, SmallCat_title = 0, '#', 0, '#'
    except Exception:
        BigCat_rank, BigCat_title, SmallCat_rank, SmallCat_title = 0, '#-', 0, '#-'
    try:
        if len(payload['summaries'][0]) > 0:
            summary = payload['summaries'][0]
            brandName = summary['brandName']
            browseNode = summary['browseNode']
            itemName = summary['itemName']
        else:
            brandName, browseNode, itemName = '#', '#', '#'
    except Exception:
        brandName, browseNode, itemName = '#-', '#-', '#-'
    return {'main_image': main_image, 'productTypes': productTypes, 'BigCat_rank': BigCat_rank,
            'BigCat_title': BigCat_title, 'SmallCat_rank': SmallCat_rank, 'SmallCat_title': SmallCat_title,
            'brandName': brandName, 'browseNode': browseNode, 'itemName': itemName}
def GET_BRAND_ANALYTICS_SEARCH_TERMS_REPORT(self, refresh_token, seller_id, days=-3, **kwargs):
    """Request a weekly Brand Analytics search-terms report and return its report id.

    The period defaults to the hard-coded 21-27 April of the year of
    ``now + days`` (as before). Pass ``dataStartTime`` / ``dataEndTime``
    (``YYYY-MM-DD``) in kwargs to request a different week.
    NOTE(review): the hard-coded default only matches a Sunday-Saturday week
    in some years -- confirm the period rules Brand Analytics expects.

    Args:
        refresh_token: Unused here (kept for the common report-method signature).
        seller_id: Unused here.
        days: Offset from today used only to choose the default year.
    """
    base_day = datetime.now() + timedelta(days=days)
    shopReportday = kwargs.get("dataStartTime") or base_day.strftime("%Y-04-21")
    shopReportday_E = kwargs.get("dataEndTime") or base_day.strftime("%Y-04-27")
    print(shopReportday)
    para = {"reportType": ReportType.GET_BRAND_ANALYTICS_SEARCH_TERMS_REPORT, "reportOptions": {"reportPeriod": "WEEK"},
            "dataStartTime": shopReportday, "dataEndTime": shopReportday_E, }
    # Previously the id was computed and discarded.
    return self.create_report(**para)
@staticmethod
def GET_BRAND_ANALYTICS_SEARCH_TERMS_REPORT_DAY(self, **kwargs):
    """Request a daily Brand Analytics search-terms report and return its report id.

    The day defaults to the previously hard-coded ``"2024-09-04"``. Pass
    ``dataStartTime`` (and optionally ``dataEndTime``, which defaults to the
    start day) in kwargs to request another day.

    NOTE(review): declared ``@staticmethod`` yet takes ``self``, so callers
    must pass the instance explicitly (``Cls.GET_..._DAY(inst)``). Kept as is
    for compatibility with existing call sites.
    """
    shopReportday = kwargs.get("dataStartTime") or "2024-09-04"
    shopReportday_E = kwargs.get("dataEndTime") or shopReportday
    print(shopReportday)
    para = {"reportType": ReportType.GET_BRAND_ANALYTICS_SEARCH_TERMS_REPORT, "reportOptions": {"reportPeriod": "DAY"},
            "dataStartTime": shopReportday, "dataEndTime": shopReportday_E, }
    # Previously the id was computed and discarded.
    return self.create_report(**para)
def BRAND_ANALYTICS_TEXT_deal(self, text):
    """Placeholder for parsing Brand Analytics report text; not implemented.

    Args:
        text: Report text (currently ignored).

    Returns:
        Always ``None``.
    """
    return None
def GET_SELLER_FEEDBACK_DATA(self, refresh_token, seller_id, days, **a_kw):
    """Load one day's seller-feedback report into ClickHouse ``ams.performance``.

    Args:
        refresh_token: Unused here (kept for the common report-method signature).
        seller_id: Seller the rows are stored under.
        days: Offset from today of the report day.
        **a_kw: Accepted for signature compatibility; unused.

    Returns:
        ``'Done'`` if feedback rows for that day already exist, an empty
        DataFrame if creating/downloading the report fails, otherwise the
        report id.

    Rows are inserted with ``isFeedback=1``.
    """
    country_code = str(self.marketplace)[-2:]
    if country_code == 'GB':
        country_code = "UK"
    shopReportday = (datetime.now().date() + timedelta(days=days)).strftime("%Y-%m-%d")
    shopReportday_E = shopReportday  # single-day window
    # SECURITY NOTE(review): ClickHouse host and credentials are hard-coded;
    # move them to configuration/secret storage.
    client = get_client(host='3.93.43.158', port=8123, username='root',
                        password='6f0eyLuiVn3slzbGWpzI')
    try:
        tmp_df = client.query_df(
            """select * from ams.performance where seller_id='%s' and country_code='%s' and date='%s' and isFeedback=1""" % (
                seller_id, country_code, shopReportday))
        if len(tmp_df) > 0:
            print("数据已存在")
            return 'Done'
        print(shopReportday)
        para = {"reportType": ReportType.GET_SELLER_FEEDBACK_DATA,
                "dataStartTime": shopReportday,
                "dataEndTime": shopReportday_E, "reportOptions": {"reportPeriod": "DAY"}}
        try:
            reportid = self.create_report(**para)
            decom_df = self.decompression(reportid)
        except Exception:
            return pd.DataFrame()
        if len(decom_df) == 0:
            return reportid
        # Stringify float, int and (then) datetime-selected columns; NaN -> ''.
        for dtype_kind in (float, int, datetime):
            cols = decom_df.select_dtypes(dtype_kind).columns
            decom_df[cols] = decom_df[cols].applymap(lambda x: str(x) if not pd.isna(x) else '')
        decom_df.fillna('', inplace=True)
        decom_df['seller_id'] = seller_id
        decom_df['country_code'] = country_code
        decom_df['insert_time'] = datetime.now()
        df_feedback = decom_df.copy()
        # Map the report's column names onto the table's.
        df_feedback[["date", "rating", "comment", "order_id"]] = df_feedback[["Date", "Rating", "Comments", "Order ID"]]
        df_feedback['isFeedback'] = 1
        df_feedback['date'] = df_feedback['date'].map(lambda x: pd.to_datetime(x).date())
        insert_columns = ["date", "rating", "comment", "order_id", "isFeedback", 'seller_id',
                          'country_code', 'insert_time']
        try:
            client.insert_df(table='ams.performance', df=df_feedback[insert_columns],
                             column_names=insert_columns)
        except Exception as e:
            print(e)
        return reportid
    finally:
        # Previously leaked on every early return.
        client.close()
def GET_FBA_FULFILLMENT_CUSTOMER_RETURNS_DATA(self, refresh_token, seller_id, days, **a_kw):
    """Download one day of the FBA customer-returns report into ``ams.performance``.

    The report day is ``today + days``; callers pass negative offsets. Rows are
    stored with ``isFeedback = 0``. The seller-feedback loader writes to the
    same table with ``isFeedback = 1``.

    Args:
        refresh_token: Not used here. It is part of the common signature that
            ``data_judge`` calls every report method with.
        seller_id: Selling-partner id. Used in the duplicate check and written
            to every inserted row.
        days: Day offset relative to today.
        **a_kw: Not used here (``data_judge`` forwards e.g. ``countryCode``).

    Returns:
        ``'Done'`` if rows for this seller/country/day already exist.
        An empty ``pd.DataFrame`` if creating or downloading the report failed.
        Otherwise the report id, including when the report had no rows.
        Insert errors are printed, not raised.
    """
    country_code = str(self.marketplace)[-2:]
    if country_code == 'GB':
        country_code = "UK"
    # The report covers one calendar day: start and end are the same date,
    # computed once so the two cannot straddle midnight.
    shopReportday = (datetime.now().date() + timedelta(days=days)).strftime("%Y-%m-%d")
    shopReportday_E = shopReportday
    # NOTE(review): ClickHouse host and credentials are hard-coded; consider
    # moving them to configuration / environment variables.
    client = get_client(host='3.93.43.158', port=8123, username='root',
                        password='6f0eyLuiVn3slzbGWpzI')
    # try/finally so the connection is closed on every path, including the
    # early returns below and any unexpected exception.
    try:
        # Skip the SP-API round-trip if this day was already loaded.
        tmp_df = client.query_df(
            """select * from ams.performance where seller_id='%s' and country_code='%s' and date='%s' and isFeedback=0""" % (
                seller_id, country_code, shopReportday))
        if len(tmp_df) > 0:
            print("数据已存在")  # "data already exists"
            return 'Done'
        para = {"reportType": ReportType.GET_FBA_FULFILLMENT_CUSTOMER_RETURNS_DATA,
                "dataStartTime": shopReportday,
                "dataEndTime": shopReportday_E,
                "reportOptions": {"reportPeriod": "DAY"}}
        try:
            reportid = self.create_report(**para)
            decom_df = self.decompression(reportid)
        except Exception as e:
            print(e)
            return pd.DataFrame()
        if len(decom_df) == 0:
            return reportid

        def _to_text(value):
            # Missing values become '' so converted columns hold only strings.
            return str(value) if not pd.isna(value) else ''

        # Stringify float, int and datetime columns in that order; the dtype
        # selection is recomputed before each pass.
        for kind in (float, int, datetime):
            cols = decom_df.select_dtypes(kind).columns
            decom_df[cols] = decom_df[cols].applymap(_to_text)
        decom_df.fillna('', inplace=True)
        decom_df["ReportDate"] = parse(shopReportday)
        decom_df['timezone'] = "UTC"
        print("==========================================================")
        decom_df['seller_id'] = seller_id
        decom_df['country_code'] = country_code
        decom_df['insert_time'] = datetime.now()

        # Map report columns onto the ams.performance column names.
        df_fbaR = decom_df.copy()
        df_fbaR[['date', 'order_id', 'product_name', 'detailed_disposition', 'comment']] = df_fbaR[
            ['return-date', 'order-id', 'product-name', 'detailed-disposition', 'customer-comments']]
        df_fbaR['isFeedback'] = 0
        df_fbaR['date'] = df_fbaR['date'].map(lambda x: pd.to_datetime(x).date())
        # Assign back rather than an in-place fillna on a column selection,
        # which is not guaranteed to write through to the frame.
        df_fbaR['comment'] = df_fbaR['comment'].fillna("")
        columns = ['order_id', 'sku', 'asin', 'fnsku', 'detailed_disposition', 'product_name',
                   'isFeedback', 'date', 'comment', 'seller_id', 'country_code', 'insert_time']
        try:
            client.insert_df(table='ams.performance', df=df_fbaR[columns], column_names=columns)
        except Exception as e:
            print(e)
        return reportid
    finally:
        client.close()
def GET_FLAT_FILE_RETURNS_DATA_BY_RETURN_DATE(self, refresh_token, seller_id, days, **a_kw):
    """Download one day of the flat-file returns report into ``ams.return_record``.

    The report day is ``today + days``; callers pass negative offsets. The
    stored ``ReportDate`` is the parsed "Return request date" of each row.

    Args:
        refresh_token: Not used here. It is part of the common signature that
            ``data_judge`` calls every report method with.
        seller_id: Selling-partner id. Used in the duplicate check and written
            to every inserted row.
        days: Day offset relative to today.
        **a_kw: Not used here (``data_judge`` forwards e.g. ``countryCode``).

    Returns:
        ``'Done'`` if rows with this seller/country/ReportDate already exist.
        An empty ``pd.DataFrame`` if creating or downloading the report failed.
        Otherwise the report id, including when the report had no rows.
        Insert errors are printed, not raised.
    """
    country_code = str(self.marketplace)[-2:]
    if country_code == 'GB':
        country_code = "UK"
    # Single-day window; computed once so start and end always agree.
    shopReportday = (datetime.now() + timedelta(days=days)).strftime("%Y-%m-%d")
    shopReportday_E = shopReportday
    # NOTE(review): ClickHouse host and credentials are hard-coded; consider
    # moving them to configuration / environment variables.
    client = get_client(host='3.93.43.158', port=8123, username='root',
                        password='6f0eyLuiVn3slzbGWpzI')
    # try/finally so the connection is closed on every path, including the
    # early returns below and any unexpected exception.
    try:
        tmp_df = client.query_df("""select * from ams.return_record where seller_id='%s' and country_code='%s' and ReportDate='%s'""" % (seller_id, country_code, shopReportday))
        if len(tmp_df) > 0:
            print("数据已存在")  # "data already exists"
            return 'Done'
        para = {"reportType": ReportType.GET_FLAT_FILE_RETURNS_DATA_BY_RETURN_DATE,
                "dataStartTime": shopReportday,
                "dataEndTime": shopReportday_E,
                "reportOptions": {"reportPeriod": "DAY"}}
        try:
            reportid = self.create_report(**para)
            decom_df = self.decompression(reportid)
        except Exception as e:
            print(e)
            return pd.DataFrame()
        if len(decom_df) == 0:
            return reportid
        print("执行插入")  # "inserting"

        def _to_text(value):
            # Missing values become '' so converted columns hold only strings.
            return str(value) if not pd.isna(value) else ''

        def _parse_or_blank(value):
            # Blank/missing dates stay '', everything else is parsed to a datetime.
            return '' if pd.isna(value) or value == '' else parse(value)

        # Stringify float, int and datetime columns in that order; the dtype
        # selection is recomputed before each pass.
        for kind in (float, int, datetime):
            cols = decom_df.select_dtypes(kind).columns
            decom_df[cols] = decom_df[cols].applymap(_to_text)
        decom_df.fillna('', inplace=True)
        decom_df['timezone'] = "UTC"
        decom_df['seller_id'] = seller_id
        decom_df['country_code'] = country_code
        decom_df['insert_time'] = datetime.now()
        # NOTE(review): these columns mix '' (blank) with datetime values;
        # confirm the target ClickHouse column types accept that.
        decom_df['Order date'] = decom_df['Order date'].map(_parse_or_blank)
        decom_df['Return request date'] = decom_df['Return request date'].map(_parse_or_blank)
        # ReportDate (checked by the duplicate query above) is the return-request date.
        decom_df["ReportDate"] = decom_df["Return request date"]
        print("=" * 100)
        # Report column names; the ClickHouse column names are the same with
        # spaces replaced by underscores.
        report_columns = ['Order ID', 'Order date', 'Return request date',
                          'Return request status', 'Amazon RMA ID', 'Merchant RMA ID',
                          'Label type', 'Label cost', 'Currency code', 'Return carrier',
                          'Tracking ID', 'Label to be paid by', 'A-to-Z Claim', 'Is prime',
                          'ASIN', 'Merchant SKU', 'Item Name', 'Return quantity', 'Return Reason',
                          'In policy', 'Return type', 'Resolution', 'Invoice number',
                          'Return delivery date', 'Order Amount', 'Order quantity',
                          'SafeT Action reason', 'SafeT claim id', 'SafeT claim state',
                          'SafeT claim creation time', 'SafeT claim reimbursement amount',
                          'Refunded Amount', 'ReportDate', 'timezone', 'seller_id',
                          'country_code', 'insert_time']
        try:
            client.insert_df('ams.return_record', df=decom_df[report_columns],
                             column_names=[name.replace(' ', '_') for name in report_columns])
            print("完成插入")  # "insert finished"
        except Exception as e:
            print(e)
        return reportid
    finally:
        client.close()
- # def GET_FLAT_FILE_RETURNS_DATA_BY_RETURN_DATE(self, refresh_token,seller_id,days=-1,**kwargs):
- # countryCode = None if kwargs.get("countryCode") == None else kwargs.get("countryCode")
- # shopReportday = (datetime.now() + timedelta(days=days)).strftime("%Y-%m-%d")
- # print(shopReportday)
- # para = {"reportType": ReportType.GET_FLAT_FILE_ALL_ORDERS_DATA_BY_ORDER_DATE_GENERAL,
- # "dataStartTime": shopReportday, "dataEndTime": shopReportday,
- # "reportOptions": {"ShowSalesChannel": "true"}}
- # reportid = self.create_report(**para) # {"ShowSalesChannel":"true"}
- # decom_df = self.decompression(reportid)
- # decom_df['seller_id'] = seller_id
- # country_code = str(self.marketplace)[-2:]
- # if country_code == 'GB':
- # country_code = "UK"
- # # decom_df['country_code'] = "UK"
- # decom_df['country_code'] = country_code
- # decom_df['insert_time'] = datetime.now()
- # shopReportday = "2024-11-04"
- # shopReportday_E = "2024-11-04"
- # print(shopReportday)
- # para = {"reportType": ReportType.GET_FLAT_FILE_RETURNS_DATA_BY_RETURN_DATE,
- # # GET_FLAT_FILE_RETURNS_DATA_BY_RETURN_DATE |GET_FBA_FULFILLMENT_CUSTOMER_RETURNS_DATA
- # "dataStartTime": shopReportday,
- # "dataEndTime": shopReportday_E, "reportOptions": {"reportPeriod": "DAY"}}
- # reportid = self.create_report(**para)
- # print(reportid)
- # return reportid
@staticmethod
def data_judge_secondTry(refresh_token, sp_api, data_type, seller_id, auth_conn, days=-1, **kwargs):  # Main-retry: retries up to 2 times
    """Call ``SpApiRequest.data_judge``, retrying up to twice on failure.

    Attempt 1 runs immediately. If it fails with an SP-API error whose code is
    ``'InvalidInput'``, the request is not retried and ``''`` is returned.
    Otherwise attempt 2 runs after 15 s. If that fails too, its error is
    printed and attempt 3 runs after 20 s. An exception from attempt 3
    propagates to the caller.

    Returns:
        The value returned by the successful ``data_judge`` call, or ``''`` for
        an ``InvalidInput`` error on the first attempt.
    """
    a_kw = kwargs

    def _error_code(exc):
        # Reads the code as exc.args[0][0]['code'] (presumably a list of SP-API
        # error dicts). Any other exception shape yields None instead of
        # raising, so ordinary failures still get the full retry sequence.
        try:
            return exc.args[0][0]['code']
        except (IndexError, KeyError, TypeError):
            return None

    try:
        return SpApiRequest.data_judge(refresh_token, sp_api, data_type, seller_id, auth_conn, days=days, **a_kw)
    except Exception as e:
        if _error_code(e) == 'InvalidInput':
            return ''
    try:
        print('first time to try...')
        time.sleep(15)
        return SpApiRequest.data_judge(refresh_token, sp_api, data_type, seller_id, auth_conn, days=days, **a_kw)
    except Exception as e:
        print(e)
    print('twice time to try...')
    time.sleep(20)
    return SpApiRequest.data_judge(refresh_token, sp_api, data_type, seller_id, auth_conn, days=days, **a_kw)
@staticmethod
def data_judge(refresh_token, sp_api, data_type, seller_id, auth_conn, days=-1, **kwargs):  # select report type to fetch
    """Dispatch ``data_type`` to the report method of the same name on ``sp_api``.

    Call shapes:
      * ``GET_FLAT_FILE_OPEN_LISTINGS_DATA``: ``(refresh_token, auth_conn, seller_id, days)``,
        without the extra keyword arguments.
      * ``GET_FBA_MYI_UNSUPPRESSED_INVENTORY_DATA``: ``(refresh_token, auth_conn, seller_id, days, **kwargs)``.
      * every other supported report: ``(refresh_token, seller_id, days, **kwargs)``.

    Returns:
        Whatever the selected report method returns, or ``""`` when
        ``data_type`` is not a supported report name.
    """
    # Lambdas keep the lookup lazy: only the selected method is touched/called.
    handlers = {
        "GET_FLAT_FILE_OPEN_LISTINGS_DATA":
            lambda: sp_api.GET_FLAT_FILE_OPEN_LISTINGS_DATA(refresh_token, auth_conn, seller_id, days),
        "GET_FLAT_FILE_ALL_ORDERS_DATA_BY_ORDER_DATE_GENERAL":
            lambda: sp_api.GET_FLAT_FILE_ALL_ORDERS_DATA_BY_ORDER_DATE_GENERAL(refresh_token, seller_id, days, **kwargs),
        "GET_SALES_AND_TRAFFIC_REPORT":
            lambda: sp_api.GET_SALES_AND_TRAFFIC_REPORT(refresh_token, seller_id, days, **kwargs),
        "GET_FBA_MYI_UNSUPPRESSED_INVENTORY_DATA":
            lambda: sp_api.GET_FBA_MYI_UNSUPPRESSED_INVENTORY_DATA(refresh_token, auth_conn, seller_id, days, **kwargs),
        "GET_BRAND_ANALYTICS_SEARCH_TERMS_REPORT":
            lambda: sp_api.GET_BRAND_ANALYTICS_SEARCH_TERMS_REPORT(refresh_token, seller_id, days, **kwargs),
        "GET_SELLER_FEEDBACK_DATA":
            lambda: sp_api.GET_SELLER_FEEDBACK_DATA(refresh_token, seller_id, days, **kwargs),
        "GET_FBA_FULFILLMENT_CUSTOMER_RETURNS_DATA":
            lambda: sp_api.GET_FBA_FULFILLMENT_CUSTOMER_RETURNS_DATA(refresh_token, seller_id, days, **kwargs),
        "GET_FLAT_FILE_RETURNS_DATA_BY_RETURN_DATE":
            lambda: sp_api.GET_FLAT_FILE_RETURNS_DATA_BY_RETURN_DATE(refresh_token, seller_id, days, **kwargs),
    }
    handler = handlers.get(data_type)
    if handler is None:
        return ""
    return handler()
@classmethod
def get_allShops(cls, data_type=Literal["GET_FLAT_FILE_OPEN_LISTINGS_DATA", "GET_FLAT_FILE_ALL_ORDERS_DATA_BY_ORDER_DATE_GENERAL"], days=-1, **kwargs):  # Main entry: fetch a report for every authorised shop
    """Fetch report ``data_type`` for every authorised shop and its marketplaces.

    Shops (refresh tokens) come from ``cls.auth_info()``, and the
    ``AM-ZOSI-US`` account is processed first. Each shop's ``region`` selects
    the marketplaces:
      * NA -> US/BR/CA/MX.
      * EU -> the European/Middle-East/India list below.
      * anything else -> the single ``Marketplaces`` member with that name.

    Each marketplace is requested through ``data_judge_secondTry``. Errors are
    printed and processing continues with the next marketplace/shop.

    NOTE(review): the default for ``data_type`` is a ``typing.Literal`` object,
    not a report name. Calling without ``data_type`` therefore requests nothing,
    because ``data_judge`` returns ``""``. The default is kept for backward
    compatibility.

    Args:
        data_type: Report type name understood by ``data_judge``.
        days: Day offset relative to today, forwarded to the report method.
        **kwargs: Extra keyword arguments forwarded to the report method.
            ``countryCode`` is overwritten per marketplace.
    """
    df = cls.auth_info()
    zosi = df.query("account_name=='AM-ZOSI-US'")['refresh_token'].to_numpy().tolist()
    others = df.query("account_name!='AM-ZOSI-US'")['refresh_token'].to_numpy().tolist()
    refreshtoken_list = zosi + others
    print(len(refreshtoken_list))
    a_kw = kwargs
    # NOTE(review): LWA/AWS secrets are hard-coded in source; move them to a
    # secrets store or environment variables and rotate them.
    app_credentials = {
        'lwa_app_id': 'amzn1.application-oa2-client.1f9d3d4747e14b22b4b598e54e6b922e',  # LWA credentials from the Seller Central developer profile
        'lwa_client_secret': 'amzn1.oa2-cs.v1.13ebfa8ea7723b478215a2b61f4dbf5ca6f6927fa097c2dd0941a66d510aca7f',
        'aws_access_key': 'AKIARBAGHTGOZC7544GN',
        'aws_secret_key': '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',
        'role_arn': 'arn:aws:iam::070880041373:role/Amazon_SP_API_ROLE'
    }
    for refresh_token in refreshtoken_list:
        aws_credentials = {'refresh_token': refresh_token, **app_credentials}
        single_info = df.query("refresh_token==@refresh_token")
        region_circle = single_info['region'].values[0]
        seller_id = single_info['selling_partner_id'].values[0]
        if region_circle == 'NA':
            for marketplace in [Marketplaces.US, Marketplaces.BR, Marketplaces.CA, Marketplaces.MX]:
                sp_api = SpApiRequest(aws_credentials, marketplace)
                a_kw['countryCode'] = str(marketplace)[-2:]
                try:
                    # Print only the tail of the refresh token so the secret
                    # does not end up in logs.
                    print("refresh_token:", '...' + str(refresh_token)[-6:], 'data_type:', data_type,
                          'seller_id:', seller_id, 'marketplace:', marketplace.marketplace_id,
                          'country_code:', a_kw['countryCode'], sep='\n')
                    auth_conn = SpApiRequest.Token_auth()
                    print("CHECK>>>>>>>>>>>>>>>>>>>>>>>>>")
                    cls.data_judge_secondTry(refresh_token, sp_api, data_type, seller_id, auth_conn, days, **a_kw)
                except Exception as e:
                    print(e)
                time.sleep(3.5)
        elif region_circle == 'EU':
            # dict.fromkeys drops repeated members while keeping order. If the
            # installed sp_api defines UK as an Enum alias of GB, that
            # marketplace is then processed once instead of twice.
            eu_marketplaces = list(dict.fromkeys([
                Marketplaces.DE, Marketplaces.AE, Marketplaces.BE, Marketplaces.PL,
                Marketplaces.EG, Marketplaces.ES, Marketplaces.GB, Marketplaces.IN, Marketplaces.IT,
                Marketplaces.NL, Marketplaces.SA, Marketplaces.SE, Marketplaces.TR, Marketplaces.UK, Marketplaces.FR,
            ]))
            for marketplace in eu_marketplaces:
                sp_api = SpApiRequest(aws_credentials, marketplace)
                a_kw['countryCode'] = str(marketplace)[-2:]
                try:
                    auth_conn = SpApiRequest.Token_auth()
                    cls.data_judge_secondTry(refresh_token, sp_api, data_type, seller_id, auth_conn, days, **a_kw)
                except Exception as e:
                    print(e)
                time.sleep(3.5)
        else:
            # Single-marketplace regions: the region name is the Marketplaces member name.
            # Errors are handled like the NA/EU branches so one bad shop does
            # not abort the remaining shops.
            try:
                auth_conn = SpApiRequest.Token_auth()
                print(region_circle)
                # getattr instead of eval: region_circle comes from the auth
                # table and must not be executed as code.
                marketplace = getattr(Marketplaces, region_circle)
                sp_api = SpApiRequest(aws_credentials, marketplace)
                a_kw['countryCode'] = str(marketplace)[-2:]
                cls.data_judge_secondTry(refresh_token, sp_api, data_type, seller_id, auth_conn, days, **a_kw)
            except Exception as e:
                print(e)
def price_tracker(self, asin, marketplace=None):
    """Return the Products API pricing response for ``asin``.

    Args:
        asin: ASIN(s) passed unchanged to
            ``Products.get_product_pricing_for_asins``. For example, a list such
            as ``['B00L3W2QJ2']``.
        marketplace: Marketplace to query. Defaults to ``Marketplaces.US``,
            which was previously the only (hard-coded) choice, so existing
            callers are unaffected.

    Returns:
        Whatever ``get_product_pricing_for_asins`` returns.
    """
    if marketplace is None:
        marketplace = Marketplaces.US
    base_product = Products(credentials=self.credentials, marketplace=marketplace)
    return base_product.get_product_pricing_for_asins(asin)
if __name__ == '__main__':
    # Backfill the FBA customer-returns report for every shop, one day at a
    # time, from 5 days ago back to 59 days ago.
    # Other report types are backfilled the same way, e.g.
    #   SpApiRequest.get_allShops("GET_FLAT_FILE_RETURNS_DATA_BY_RETURN_DATE", days=days)
    for days in range(-5, -60, -1):
        SpApiRequest.get_allShops("GET_FBA_FULFILLMENT_CUSTOMER_RETURNS_DATA", days=days)
|