Pyspark.sql.DataFrameWriter.saveAsTable()
Prvo ćemo vidjeti kako napisati postojeći PySpark DataFrame u tablicu pomoću funkcije write.saveAsTable(). Potreban je naziv tablice i drugi izborni parametri kao što su modovi, partionBy, itd., za pisanje DataFramea u tablicu. Čuva se kao parket turpija.
Sintaksa:
dataframe_obj.write.saveAsTable(path/Table_name,mode,partitionBy,…)
- Table_name je naziv tablice koja je stvorena iz dataframe_obj.
- Možemo dodati/brisati podatke u tablici pomoću parametra mode.
- PartitionBy uzima jedan/više stupaca za stvaranje particija na temelju vrijednosti u tim stupcima.
Primjer 1:
Napravite PySpark DataFrame s 5 redaka i 4 stupca. Zapišite ovaj podatkovni okvir u tablicu pod nazivom “Agri_Table1”.
uvoz pyspark
iz pyspark.sql uvesti SparkSession
linuxhint_spark_app = SparkSession.builder.appName( 'Savjet za Linux' ).getOrCreate()
# poljoprivredni podaci s 5 redaka i 5 stupaca
agri =[{ 'Vrsta_tla' : 'Crno' , 'dostupnost_navodnjavanja' : 'Ne' , 'Akri' : 2500 , 'Stanje_tla' : 'suho' ,
'Zemlja' : 'SAD' },
{ 'Vrsta_tla' : 'Crno' , 'dostupnost_navodnjavanja' : 'Da' , 'Akri' : 3500 , 'Stanje_tla' : 'Mokro' ,
'Zemlja' : 'Indija' },
{ 'Vrsta_tla' : 'Crvena' , 'dostupnost_navodnjavanja' : 'Da' , 'Akri' : 210 , 'Stanje_tla' : 'suho' ,
'Zemlja' : 'UK' },
{ 'Vrsta_tla' : 'Ostalo' , 'dostupnost_navodnjavanja' : 'Ne' , 'Akri' : 1000 , 'Stanje_tla' : 'Mokro' ,
'Zemlja' : 'SAD' },
{ 'Vrsta_tla' : 'Pijesak' , 'dostupnost_navodnjavanja' : 'Ne' , 'Akri' : 500 , 'Stanje_tla' : 'suho' ,
'Zemlja' : 'Indija' }]
# kreirajte podatkovni okvir iz gornjih podataka
agri_df = linuxhint_spark_app.createDataFrame(agri)
agri_df.show()
# Napišite gornji DataFrame u tablicu.
agri_df.coalesce( 1 ).write.saveAsTable( 'Agri_Tablica1' )
Izlaz:
Vidimo da je jedna datoteka parketa kreirana s prethodnim PySpark podacima.
Primjer 2:
Razmotrite prethodni DataFrame i zapišite 'Agri_Table2' u tablicu particioniranjem zapisa na temelju vrijednosti u stupcu 'Država'.
# Zapišite gornji DataFrame u tablicu s partitionBy parametromagri_df.write.saveAsTable( 'Agri_Tablica2' ,partitionBy=[ 'Zemlja' ])
Izlaz:
Postoje tri jedinstvene vrijednosti u stupcu 'Država' - 'Indija', 'UK' i 'SAD'. Dakle, stvorene su tri particije. Svaka pregrada drži turpije za parket.
Pyspark.sql.DataFrameReader.table()
Učitajmo tablicu u PySpark DataFrame pomoću funkcije spark.read.table(). Potreban je samo jedan parametar koji je naziv staze/tablice. Izravno učitava tablicu u PySpark DataFrame i sve SQL funkcije koje se primjenjuju na PySpark DataFrame također se mogu primijeniti na ovaj učitani DataFrame.
Sintaksa:
spark_app.read.table(path/'Table_name')U ovom scenariju koristimo prethodnu tablicu koja je stvorena iz PySpark DataFramea. Provjerite trebate li implementirati prethodne isječke koda scenarija u svom okruženju.
Primjer:
Učitajte tablicu “Agri_Table1” u DataFrame pod nazivom “loaded_data”.
loaded_data = linuxhint_spark_app.read.table( 'Agri_Tablica1' )loaded_data.show()
Izlaz:
Vidimo da je tablica učitana u PySpark DataFrame.
Izvršavanje SQL upita
Sada izvršavamo neke SQL upite na učitanom DataFrameu pomoću funkcije spark.sql().
# Koristite naredbu SELECT za prikaz svih stupaca iz gornje tablice.linuxhint_spark_app.sql( 'SELECT * iz Agri_Table1' ).pokazati()
# WHERE klauzula
linuxhint_spark_app.sql( 'SELECT * iz Agri_Table1 WHERE Soil_status='Dry' ' ).pokazati()
linuxhint_spark_app.sql( 'SELECT * from Agri_Table1 WHERE Acres > 2000 ' ).pokazati()
Izlaz:
- Prvi upit prikazuje sve stupce i zapise iz DataFramea.
- Drugi upit prikazuje zapise na temelju stupca 'Soil_status'. Postoje samo tri zapisa s elementom 'Dry'.
- Posljednji upit vraća dva zapisa s 'Acres' koji su veći od 2000.
Pyspark.sql.DataFrameWriter.insertInto()
Pomoću funkcije insertInto() možemo dodati DataFrame u postojeću tablicu. Ovu funkciju možemo koristiti zajedno s selectExpr() za definiranje imena stupaca i zatim je umetnuti u tablicu. Ova funkcija također uzima TableName kao parametar.
Sintaksa:
DataFrame_obj.write.insertInto('Ime_tablice')U ovom scenariju koristimo prethodnu tablicu koja je stvorena iz PySpark DataFramea. Provjerite trebate li implementirati prethodne isječke koda scenarija u svom okruženju.
Primjer:
Napravite novi DataFrame s dva zapisa i umetnite ih u tablicu 'Agri_Table1'.
uvoz pysparkiz pyspark.sql uvesti SparkSession
linuxhint_spark_app = SparkSession.builder.appName( 'Savjet za Linux' ).getOrCreate()
# poljoprivredni podaci s 2 retka
agri =[{ 'Vrsta_tla' : 'Pijesak' , 'dostupnost_navodnjavanja' : 'Ne' , 'Akri' : 2500 , 'Stanje_tla' : 'suho' ,
'Zemlja' : 'SAD' },
{ 'Vrsta_tla' : 'Pijesak' , 'dostupnost_navodnjavanja' : 'Ne' , 'Akri' : 1200 , 'Stanje_tla' : 'Mokro' ,
'Zemlja' : 'Japan' }]
# kreirajte podatkovni okvir iz gornjih podataka
agri_df2 = linuxhint_spark_app.createDataFrame(agri)
agri_df2.show()
# write.insertInto()
agri_df2.selectExpr( 'Akri' , 'Zemlja' , 'dostupnost_navodnjavanja' , 'Vrsta_tla' ,
'Stanje_tla' ).write.insertInto( 'Agri_Tablica1' )
# Prikaz konačne Agri_Table1
linuxhint_spark_app.sql( 'SELECT * iz Agri_Table1' ).pokazati()
Izlaz:
Sada je ukupan broj redaka koji su prisutni u DataFrameu 7.
Zaključak
Sada razumijete kako napisati PySpark DataFrame u tablicu pomoću funkcije write.saveAsTable(). Uzima naziv tablice i druge izborne parametre. Zatim smo ovu tablicu učitali u PySpark DataFrame pomoću funkcije spark.read.table(). Potreban je samo jedan parametar koji je naziv staze/tablice. Ako želite dodati novi DataFrame u postojeću tablicu, koristite funkciju insertInto().