I am reading a text file into a DataFrame and writing that data out as a Parquet file. When I then read the created Parquet file with parquet-tools, I get a scientific-notation value instead of the actual value from the text file.
Sample data in the text file:
111173245.136459|131856.12
I write the Parquet file using the code below:
df.write.format("parquet").save("test.parquet")
When I read it back with parquet-tools, Column1 comes out in exponential form but Column2 does not. I supplied the schema with StringType for both columns, so I expected the values to be read as-is, with no changes made to them. My code does not manipulate these columns at all, so I cannot understand why the value is converted to scientific notation while writing to Parquet.
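For comparison, here is a minimal sketch of what I expected to happen: a hard-coded row written with an explicit StringType schema should round-trip through Parquet unchanged (the /tmp/roundtrip.parquet path is just for illustration):

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType

spark = SparkSession.builder.master('local').getOrCreate()

schema = StructType([StructField('Column1', StringType(), True),
                     StructField('Column2', StringType(), True)])

# hard-coded row matching the sample data above
df = spark.createDataFrame([('111173245.136459', '131856.12')], schema)
df.write.mode('overwrite').parquet('/tmp/roundtrip.parquet')

# since both columns are strings, I expect them back exactly as written
spark.read.parquet('/tmp/roundtrip.parquet').show(truncate=False)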
parquet-tools show --columns col1,col2 ./test.parquet
Output:
| Column1 | Column2 |
|---|---|
| 1.11073e+08 | 130850 |
Please help me understand this, or let me know whether I need to set any property while writing the Parquet file so that the values are not converted to scientific notation.
Sample code:
import io, csv
import pyspark.sql.types
from pyspark.sql import SparkSession
from pyspark.sql.types import *
###################################################
spark = SparkSession.builder.master('local').config('spark.sql.session.timeZone', 'UTC').getOrCreate()
sc = spark.sparkContext
###################################################
rDelim = '\r\n'    # record delimiter
cDelim = '|'       # column delimiter
encoding = 'UTF-8'
###################################################
fileName = 'test.bcp'
col = ['Column1|StringType', 'Column2|StringType']
###################################################
def typeName(dType):
    # look up the PySpark type class by name, e.g. typeName('StringType') -> StringType()
    return getattr(pyspark.sql.types, dType)()
###################################################
def getType():
    # build a StructType from the 'name|type' pairs in col
    cFields = []
    for x in col:
        colList = x.split("|")
        cFields.append(StructField(colList[0], typeName(colList[1]), True))
    schema = StructType(cFields)
    return schema
###################################################
def csvStr(x):
    # serialise a list of fields into a single CSV line, e.g. csvStr(['a', 'b']) -> 'a,b'
    output = io.StringIO("")
    csv.writer(output).writerow(x)
    return output.getvalue().strip()
###################################################
def createParquet(schema):
    # read the raw file with an explicit record delimiter, then split each record on the column delimiter
    rdd = sc.newAPIHadoopFile(
        fileName,
        "org.apache.hadoop.mapreduce.lib.input.TextInputFormat",
        "org.apache.hadoop.io.LongWritable",
        "org.apache.hadoop.io.Text",
        conf={"textinputformat.record.delimiter": rDelim},
    ).map(lambda line: line[1].split(cDelim))

    # re-serialise each record as a CSV row and parse it with the supplied schema
    df = (spark.read.format("com.databricks.spark.csv")
          .schema(schema)
          .option("escape", '"')
          .option("quote", '"')
          .option("header", "false")
          .option("encoding", encoding)
          .csv(rdd.map(csvStr)))

    df.printSchema()
    df.show(2000, truncate=False)
    df.write.format("parquet").save("/tmp/test.parquet")
schema = getType()
createParquet(schema)
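As a cross-check (a sketch of what I would look at, not output I have captured here), the written file can also be read back with Spark itself, to tell whether the value is actually stored in scientific notation or whether parquet-tools is only displaying it that way:

df2 = spark.read.parquet('/tmp/test.parquet')
df2.printSchema()               # if Column1 is really StringType, no numeric formatting should apply
df2.show(truncate=False)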