Pretvaranje PySpark DataFrame u CSV

Pretvaranje Pyspark Dataframe U Csv



Pogledajmo četiri različita scenarija pretvaranja PySpark DataFramea u CSV. Izravno koristimo metodu write.csv() za pretvaranje PySpark DataFramea u CSV. Pomoću funkcije to_csv() pretvaramo PySpark Pandas DataFrame u CSV. To također može biti moguće pretvaranjem u NumPy polje.

Tema sadržaja:

Ako želite znati o PySpark DataFrameu i instalaciji modula, prođite kroz ovo članak .







PySpark DataFrame u CSV pretvaranjem u Pandas DataFrame

To_csv() je metoda koja je dostupna u Pandas modulu koja pretvara Pandas DataFrame u CSV. Prvo, moramo pretvoriti naš PySpark DataFrame u Pandas DataFrame. Za to se koristi metoda toPandas(). Pogledajmo sintaksu to_csv() zajedno s njegovim parametrima.



Sintaksa:



pandas_dataframe_obj.to_csv(put/ 'ime_datoteke.csv' , Zaglavlje ,indeks,stupci,način...)
  1. Moramo navesti naziv CSV datoteke. Ako želite pohraniti preuzeti CSV na određeno mjesto na računalu, uz naziv datoteke možete navesti i put.
  2. Stupci su uključeni ako je zaglavlje postavljeno na 'Istina'. Ako ne trebate stupce, postavite zaglavlje na 'False'.
  3. Indeksi su navedeni ako je indeks postavljen na 'True'. Ako ne trebate indekse, postavite indeks na 'False'.
  4. Parametar Stupci uzima popis naziva stupaca u kojima možemo odrediti koji će stupci biti ekstrahirani u CSV datoteku.
  5. Možemo dodati zapise u CSV pomoću parametra mode. Dodavanje – za to se koristi 'a'.

Primjer 1: s parametrima zaglavlja i indeksa

Napravite “skills_df” PySpark DataFrame s 3 retka i 4 stupca. Pretvorite ovaj DataFrame u CSV tako da ga prvo pretvorite u Pandas DataFrame.





uvoz pyspark

iz pyspark.sql uvesti SparkSession

linuxhint_spark_app = SparkSession.builder.appName( 'Savjet za Linux' ).getOrCreate()

# podaci o vještinama s 3 retka i 4 stupca

vještine =[{ 'iskaznica' : 123 , 'osoba' : 'Med' , 'vještina' : 'slika' , 'nagrada' : 25000 },

{ 'iskaznica' : 112 , 'osoba' : 'Mouni' , 'vještina' : 'ples' , 'nagrada' : 2000. godine },

{ 'iskaznica' : 153 , 'osoba' : 'Tulasi' , 'vještina' : 'čitanje' , 'nagrada' : 1200 }

]

# kreirajte podatkovni okvir vještina iz gornjih podataka

skills_df = linuxhint_spark_app.createDataFrame(skills)

vještina_df.show()

# Pretvorite skills_df u pandas DataFrame

pandas_skills_df= vještina_df.toPandas()

ispis(pandas_skills_df)

# Pretvorite ovaj DataFrame u csv sa zaglavljem i indeksom

pandas_skills_df.to_csv( 'pandas_skills1.csv' , Zaglavlje =Istina, indeks=Istina)

Izlaz:



Vidimo da je PySpark DataFrame pretvoren u Pandas DataFrame. Pogledajmo je li pretvoren u CSV s nazivima stupaca i indeksima:

Primjer 2: dodavanje podataka u CSV

Napravite još jedan PySpark DataFrame s 1 zapisom i dodajte ga u CSV koji je stvoren kao dio našeg prvog primjera. Provjerite moramo li postaviti zaglavlje na 'False' zajedno s parametrom načina. U suprotnom, nazivi stupaca također se dodaju kao redak.

uvoz pyspark

iz pyspark.sql uvesti SparkSession

linuxhint_spark_app = SparkSession.builder.appName( 'Savjet za Linux' ).getOrCreate()

vještine =[{ 'iskaznica' : 90 , 'osoba' : 'Bhargav' , 'vještina' : 'čitanje' , 'nagrada' : 12000 }

]

# kreirajte podatkovni okvir vještina iz gornjih podataka

skills_df = linuxhint_spark_app.createDataFrame(skills)

# Pretvorite skills_df u pandas DataFrame

pandas_skills_df= vještina_df.toPandas()

# Dodajte ovaj DataFrame u datoteku pandas_skills1.csv

pandas_skills_df.to_csv( 'pandas_skills1.csv' , način rada= 'a' , Zaglavlje =Netočno)

CSV izlaz:

Vidimo da je novi red dodan u CSV datoteku.

Primjer 3: s parametrom Columns

Uzmimo isti DataFrame i pretvorimo ga u CSV s dva stupca: 'osoba' i 'nagrada'.

uvoz pyspark

iz pyspark.sql uvesti SparkSession

linuxhint_spark_app = SparkSession.builder.appName( 'Savjet za Linux' ).getOrCreate()

# podaci o vještinama s 3 retka i 4 stupca

vještine =[{ 'iskaznica' : 123 , 'osoba' : 'Med' , 'vještina' : 'slika' , 'nagrada' : 25000 },

{ 'iskaznica' : 112 , 'osoba' : 'Mouni' , 'vještina' : 'ples' , 'nagrada' : 2000. godine },

{ 'iskaznica' : 153 , 'osoba' : 'Tulasi' , 'vještina' : 'čitanje' , 'nagrada' : 1200 }

]

# kreirajte podatkovni okvir vještina iz gornjih podataka

skills_df = linuxhint_spark_app.createDataFrame(skills)

# Pretvorite skills_df u pandas DataFrame

pandas_skills_df= vještina_df.toPandas()

# Pretvorite ovaj DataFrame u csv s određenim stupcima

pandas_skills_df.to_csv( 'pandas_skills2.csv' , stupci=[ 'osoba' , 'nagrada' ])

CSV izlaz:

Vidimo da u CSV datoteci postoje samo stupci 'osoba' i 'nagrada'.

PySpark Pandas DataFrame u CSV pomoću metode To_Csv().

To_csv() je metoda koja je dostupna u Pandas modulu koja pretvara Pandas DataFrame u CSV. Prvo, moramo pretvoriti naš PySpark DataFrame u Pandas DataFrame. Za to se koristi metoda toPandas(). Pogledajmo sintaksu to_csv() zajedno s njegovim parametrima:

Sintaksa:

pyspark_pandas_dataframe_obj.to_csv(put/ 'ime_datoteke.csv' , Zaglavlje ,indeks,stupci,...)
  1. Moramo navesti naziv CSV datoteke. Ako želite pohraniti preuzeti CSV na određeno mjesto na računalu, uz naziv datoteke možete navesti i put.
  2. Stupci su uključeni ako je zaglavlje postavljeno na 'Istina'. Ako ne trebate stupce, postavite zaglavlje na 'False'.
  3. Indeksi su navedeni ako je indeks postavljen na 'True'. Ako ne trebate indekse, postavite indeks na 'False'.
  4. Parametar stupaca uzima popis naziva stupaca u kojima možemo odrediti koji će stupci biti ekstrahirani u CSV datoteku.

Primjer 1: s parametrom Columns

Napravite PySpark Pandas DataFrame s 3 stupca i pretvorite ga u CSV koristeći to_csv() sa stupcima 'osoba' i 'nagrada'.

iz pyspark uvozne pande

pyspark_pandas_dataframe=pandas.DataFrame({ 'iskaznica' :[ 90 , 78 , 90 , 57 ], 'osoba' :[ 'Med' , 'Mouni' , 'sam' , 'radha' ], 'nagrada' :[ 1 , 2 , 3 , 4 ]})

ispis(pyspark_pandas_dataframe)

# Pretvorite ovaj DataFrame u csv s određenim stupcima

pyspark_pandas_dataframe.to_csv( 'pyspark_pandas1' , stupci=[ 'osoba' , 'nagrada' ])

Izlaz:

Vidimo da je PySpark Pandas DataFrame pretvoren u CSV s dvije particije. Svaka particija sadrži 2 zapisa. Također, stupci u CSV-u su samo 'osoba' i 'nagrada'.

Particiona datoteka 1:

Particiona datoteka 2:

Primjer 2: s parametrom zaglavlja

Upotrijebite prethodni DataFrame i odredite parametar zaglavlja tako da ga postavite na 'True'.

iz pyspark uvozne pande

pyspark_pandas_dataframe=pandas.DataFrame({ 'iskaznica' :[ 90 , 78 , 90 , 57 ], 'osoba' :[ 'Med' , 'Mouni' , 'sam' , 'radha' ], 'nagrada' :[ 1 , 2 , 3 , 4 ]})

# Pretvorite ovaj DataFrame u csv sa zaglavljem.

pyspark_pandas_dataframe.to_csv( 'pyspark_pandas2' , Zaglavlje =Istina)

CSV izlaz:

Vidimo da je PySpark Pandas DataFrame pretvoren u CSV s dvije particije. Svaka particija sadrži 2 zapisa s nazivima stupaca.

Particiona datoteka 1:

Particiona datoteka 2:

PySpark Pandas DataFrame u CSV pretvaranjem u NumPy polje

Imamo opciju pretvoriti PySpark Pandas DataFrame u CSV pretvaranjem u polje Numpy. To_numpy() je metoda koja je dostupna u modulu PySpark Pandas koja pretvara PySpark Pandas DataFrame u polje NumPy.

Sintaksa:

pyspark_pandas_dataframe_obj.to_numpy()

Neće uzeti nikakve parametre.

Korištenje metode Tofile().

Nakon pretvorbe u polje NumPy, možemo koristiti metodu tofile() za pretvorbu NumPy u CSV. Ovdje pohranjuje svaki zapis u novu ćeliju u stupcu u CSV datoteci.

Sintaksa:

array_obj.to_numpy(ime datoteke/put,sep=’ ’)

Uzima naziv datoteke ili putanju CSV-a i separator.

Primjer:

Napravite PySpark Pandas DataFrame s 3 stupca i 4 zapisa i pretvorite ga u CSV tako da ga prvo pretvorite u NumPy polje.

iz pyspark uvozne pande

pyspark_pandas_dataframe=pandas.DataFrame({ 'iskaznica' :[ 90 , 78 , 90 , 57 ], 'osoba' :[ 'Med' , 'Mouni' , 'sam' , 'radha' ], 'nagrada' :[ 1 , 2 , 3 , 4 ]})

# Pretvorite gornji DataFrame u numpy polje

pretvoreno = pyspark_pandas_dataframe.to_numpy()

ispis (pretvoreno)

# Korištenje tofile()

pretvoreno.tofile( 'converted1.csv' , ruj = ',' )

Izlaz:

[[ 90 'Med' 1 ]

[ 78 'Mouni' 2 ]

[ 90 'sam' 3 ]

[ 57 'radha' 4 ]]

Vidimo da je PySpark Pandas DataFrame pretvoren u NumPy polje (12 vrijednosti). Ako možete vidjeti CSV podatke, pohranjuje svaku vrijednost ćelije u novi stupac.

PySpark DataFrame u CSV pomoću metode Write.Csv().

Metoda write.csv() kao parametar uzima naziv/put datoteke gdje trebamo spremiti CSV datoteku.

Sintaksa:

dataframe_object.coalesce( 1 ).write.csv( 'naziv datoteke' )

Zapravo, CSV se sprema kao particije (više od jedne). Kako bismo se toga riješili, spajamo sve particionirane CSV datoteke u jednu. U ovom scenariju koristimo funkciju coalesce(). Sada možemo vidjeti samo jednu CSV datoteku sa svim redovima iz PySpark DataFramea.

Primjer:

Razmotrite PySpark DataFrame s 4 zapisa koji imaju 4 stupca. Zapišite ovaj DataFrame u CSV s datotekom pod nazivom “market_details”.

uvoz pyspark

iz pyspark.sql uvesti SparkSession

linuxhint_spark_app = SparkSession.builder.appName( 'Savjet za Linux' ).getOrCreate()

# tržišni podaci s 4 retka i 4 stupca

tržište =[{ 'm_id' : 'mz-001' , 'moje_ime' : 'ABC' , 'm_city' : 'Delhi' , 'm_state' : 'Delhi' },

{ 'm_id' : 'mz-002' , 'moje_ime' : 'XYZ' , 'm_city' : 'patna' , 'm_state' : 'lucknow' },

{ 'm_id' : 'mz-003' , 'moje_ime' : 'PQR' , 'm_city' : 'Florida' , 'm_state' : 'jedan' },

{ 'm_id' : 'mz-004' , 'moje_ime' : 'ABC' , 'm_city' : 'Delhi' , 'm_state' : 'lucknow' }

]



# kreirajte tržišni podatkovni okvir iz gornjih podataka

market_df = linuxhint_spark_app.createDataFrame(market)

# Stvarni podaci o tržištu

market_df.show()

# write.csv()

market_df.coalesce( 1 ).write.csv( 'tržišni_detalji' )

Izlaz:

Provjerimo datoteku:

Otvorite posljednju datoteku da vidite zapise.

Zaključak

Naučili smo četiri različita scenarija koji pretvaraju PySpark DataFrame u CSV s primjerima uzimajući u obzir različite parametre. Kada radite s PySpark DataFrameom, imate dvije opcije za pretvaranje ovog DataFramea u CSV: jedan način je korištenje metode write(), a drugi je korištenje metode to_csv() pretvaranjem u Pandas DataFrame. Ako radite s PySpark Pandas DataFrameom, također možete koristiti to_csv() i tofile() pretvaranjem u NumPy polje.