import pandas as pd

# Example call sites (DbService / MetaDataDao are project-specific DAOs):
# res = DbService.getTrainData("F20000", 3)    # SQLAlchemy query result  -> RowProxy
# res = DbService.getTestMysqlDb("F20000", 3)  # MySQLdb query result     -> tuple
# res = MetaDataDao.findByDrgId2("F20000")     # PymysqlPool wrapper      -> dict
# row2Dataframe(res)
# convert2Dataframe(res)
# dict2Dataframe(res)

# tuple -> DataFrame (MySQLdb query result)
def convert2Dataframe(data):
    df = pd.DataFrame(list(data), columns=["akc190", "yka055", "yp_sum", "ypzf_yp", "hl_sum"])
    ss = df[["yka055", "akc190"]]  # select a subset of columns from the DataFrame
    print(df)

# dict -> DataFrame
def dict2Dataframe(data):
    df = pd.DataFrame(data)
    df2 = pd.DataFrame.from_dict(data)  # equivalent constructor for dict input
    print(df)

# RowProxy -> DataFrame, first approach: turn each row into a tuple, then reuse convert2Dataframe
def row2Dataframe(data):
    arrs = []
    for d in data:
        flag1 = type(d)  # inspect the row type when debugging
        res = tuple(d)   # convert the RowProxy/Row to a plain tuple
        arrs.append(res)
    # list[tuple] -> DataFrame
    convert2Dataframe(arrs)

# RowProxy -> DataFrame, second approach: pass the rows to pd.DataFrame directly.
# Note: this definition shadows the one above if both are kept in the same module.
def row2Dataframe(data):
    df = pd.DataFrame(data, columns=["akc190", "yka055", "yp_sum", "ypzf_yp", "hl_sum"])
    ss = df[["yka055", "akc190"]]  # select a subset of columns from the DataFrame
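The converters above can be exercised without a live database connection. Below is a minimal sketch with made-up rows (the values are placeholders, not real data), assuming the functions above are in scope:

tuple_rows = [
    ("ID001", "A01", 120.5, 30.0, 15.0),
    ("ID002", "B02", 98.0, 22.5, 10.0),
]
dict_rows = [
    {"akc190": "ID001", "yka055": "A01", "yp_sum": 120.5, "ypzf_yp": 30.0, "hl_sum": 15.0},
    {"akc190": "ID002", "yka055": "B02", "yp_sum": 98.0, "ypzf_yp": 22.5, "hl_sum": 10.0},
]

convert2Dataframe(tuple_rows)  # list of tuples, like a MySQLdb result set
dict2Dataframe(dict_rows)      # list of dicts, like the PymysqlPool wrapper result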
Converting between pandas DataFrames and Spark DataFrames
import pandas as pd
from pyspark import SQLContext
from pyspark.sql import SparkSession
import pyspark.sql.types as typ

'''Convert a pandas DataFrame to a Spark DataFrame'''

# Approach 1: call the API directly and let Spark infer the schema
def createSpark():
    spark = SparkSession.builder.getOrCreate()
    df = pd.read_excel('C:\\Users\\shea\\Desktop\\测试.xlsx')
    print(df)
    # convert via SQLContext; on newer PySpark versions spark.createDataFrame(df) does the same
    sqlContext = SQLContext(spark.sparkContext)
    spark_df = sqlContext.createDataFrame(df)
    spark_df.select("bah").show()

# Approach 2: convert with an explicit schema, for columns whose values
# cannot be handled by automatic type inference
def convert2Spark():
    columns = [
        ('bah', typ.StringType()),
        ('name', typ.StringType()),
        ('sex', typ.IntegerType()),
        ('akc194', typ.StringType()),
        ('zzfs', typ.StringType())
    ]
    schema = typ.StructType([
        typ.StructField(index[0], index[1], False)
        for index in columns
    ])
    spark = SparkSession.builder.getOrCreate()
    df = pd.read_excel('C:\\Users\\shea\\Desktop\\测试.xlsx')
    df['akc194'] = df['akc194'].map(lambda x: str(x))  # cast the timestamp column to string
    # DataFrame -> list of tuples, then build the Spark DataFrame against the schema
    ss = df.values.tolist()
    ss_tuple = list(map(lambda x: tuple(x), ss))
    spark_df = spark.createDataFrame(ss_tuple, schema=schema)
    spark_df.show()

if __name__ == '__main__':
    # createSpark()
    convert2Spark()
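The heading above promises conversion in both directions, but the code only covers pandas to Spark. For the reverse direction, Spark DataFrames expose toPandas(), which collects all rows to the driver and therefore only suits data that fits in driver memory. A minimal sketch follows; the bah/name/sex columns mirror the schema above and the two rows are made-up placeholders:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# build a small Spark DataFrame in memory from a list of tuples and column names
spark_df = spark.createDataFrame(
    [("ID001", "zhangsan", 1), ("ID002", "lisi", 0)],
    ["bah", "name", "sex"],
)

# Spark DataFrame -> pandas DataFrame
pandas_df = spark_df.toPandas()
print(pandas_df)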