Kako čitati i pisati tablične podatke u PySparku

Kako Citati I Pisati Tablicne Podatke U Pysparku



Obrada podataka u PySparku je brža ako se podaci učitavaju u obliku tablice. Uz ovo, korištenjem SQL izraza, obrada će biti brza. Dakle, pretvaranje PySpark DataFrame/RDD-a u tablicu prije slanja na obradu je bolji pristup. Danas ćemo vidjeti kako pročitati podatke tablice u PySpark DataFrame, napisati PySpark DataFrame u tablicu i umetnuti novi DataFrame u postojeću tablicu pomoću ugrađenih funkcija. Idemo!

Pyspark.sql.DataFrameWriter.saveAsTable()

Prvo ćemo vidjeti kako napisati postojeći PySpark DataFrame u tablicu pomoću funkcije write.saveAsTable(). Potreban je naziv tablice i drugi izborni parametri kao što su modovi, partionBy, itd., za pisanje DataFramea u tablicu. Čuva se kao parket turpija.

Sintaksa:







dataframe_obj.write.saveAsTable(path/Table_name,mode,partitionBy,…)
  1. Table_name je naziv tablice koja je stvorena iz dataframe_obj.
  2. Možemo dodati/brisati podatke u tablici pomoću parametra mode.
  3. PartitionBy uzima jedan/više stupaca za stvaranje particija na temelju vrijednosti u tim stupcima.

Primjer 1:

Napravite PySpark DataFrame s 5 redaka i 4 stupca. Zapišite ovaj podatkovni okvir u tablicu pod nazivom “Agri_Table1”.



uvoz pyspark

iz pyspark.sql uvesti SparkSession

linuxhint_spark_app = SparkSession.builder.appName( 'Savjet za Linux' ).getOrCreate()

# poljoprivredni podaci s 5 redaka i 5 stupaca

agri =[{ 'Vrsta_tla' : 'Crno' , 'dostupnost_navodnjavanja' : 'Ne' , 'Akri' : 2500 , 'Stanje_tla' : 'suho' ,
'Zemlja' : 'SAD' },

{ 'Vrsta_tla' : 'Crno' , 'dostupnost_navodnjavanja' : 'Da' , 'Akri' : 3500 , 'Stanje_tla' : 'Mokro' ,
'Zemlja' : 'Indija' },

{ 'Vrsta_tla' : 'Crvena' , 'dostupnost_navodnjavanja' : 'Da' , 'Akri' : 210 , 'Stanje_tla' : 'suho' ,
'Zemlja' : 'UK' },

{ 'Vrsta_tla' : 'Ostalo' , 'dostupnost_navodnjavanja' : 'Ne' , 'Akri' : 1000 , 'Stanje_tla' : 'Mokro' ,
'Zemlja' : 'SAD' },

{ 'Vrsta_tla' : 'Pijesak' , 'dostupnost_navodnjavanja' : 'Ne' , 'Akri' : 500 , 'Stanje_tla' : 'suho' ,
'Zemlja' : 'Indija' }]



# kreirajte podatkovni okvir iz gornjih podataka

agri_df = linuxhint_spark_app.createDataFrame(agri)

agri_df.show()

# Napišite gornji DataFrame u tablicu.

agri_df.coalesce( 1 ).write.saveAsTable( 'Agri_Tablica1' )

Izlaz:







Vidimo da je jedna datoteka parketa kreirana s prethodnim PySpark podacima.



Primjer 2:

Razmotrite prethodni DataFrame i zapišite 'Agri_Table2' u tablicu particioniranjem zapisa na temelju vrijednosti u stupcu 'Država'.

# Zapišite gornji DataFrame u tablicu s partitionBy parametrom

agri_df.write.saveAsTable( 'Agri_Tablica2' ,partitionBy=[ 'Zemlja' ])

Izlaz:

Postoje tri jedinstvene vrijednosti u stupcu 'Država' - 'Indija', 'UK' i 'SAD'. Dakle, stvorene su tri particije. Svaka pregrada drži turpije za parket.

Pyspark.sql.DataFrameReader.table()

Učitajmo tablicu u PySpark DataFrame pomoću funkcije spark.read.table(). Potreban je samo jedan parametar koji je naziv staze/tablice. Izravno učitava tablicu u PySpark DataFrame i sve SQL funkcije koje se primjenjuju na PySpark DataFrame također se mogu primijeniti na ovaj učitani DataFrame.

Sintaksa:

spark_app.read.table(path/'Table_name')

U ovom scenariju koristimo prethodnu tablicu koja je stvorena iz PySpark DataFramea. Provjerite trebate li implementirati prethodne isječke koda scenarija u svom okruženju.

Primjer:

Učitajte tablicu “Agri_Table1” u DataFrame pod nazivom “loaded_data”.

loaded_data = linuxhint_spark_app.read.table( 'Agri_Tablica1' )

loaded_data.show()

Izlaz:

Vidimo da je tablica učitana u PySpark DataFrame.

Izvršavanje SQL upita

Sada izvršavamo neke SQL upite na učitanom DataFrameu pomoću funkcije spark.sql().

# Koristite naredbu SELECT za prikaz svih stupaca iz gornje tablice.

linuxhint_spark_app.sql( 'SELECT * iz Agri_Table1' ).pokazati()

# WHERE klauzula

linuxhint_spark_app.sql( 'SELECT * iz Agri_Table1 WHERE Soil_status='Dry' ' ).pokazati()

linuxhint_spark_app.sql( 'SELECT * from Agri_Table1 WHERE Acres > 2000 ' ).pokazati()

Izlaz:

  1. Prvi upit prikazuje sve stupce i zapise iz DataFramea.
  2. Drugi upit prikazuje zapise na temelju stupca 'Soil_status'. Postoje samo tri zapisa s elementom 'Dry'.
  3. Posljednji upit vraća dva zapisa s 'Acres' koji su veći od 2000.

Pyspark.sql.DataFrameWriter.insertInto()

Pomoću funkcije insertInto() možemo dodati DataFrame u postojeću tablicu. Ovu funkciju možemo koristiti zajedno s selectExpr() za definiranje imena stupaca i zatim je umetnuti u tablicu. Ova funkcija također uzima TableName kao parametar.

Sintaksa:

DataFrame_obj.write.insertInto('Ime_tablice')

U ovom scenariju koristimo prethodnu tablicu koja je stvorena iz PySpark DataFramea. Provjerite trebate li implementirati prethodne isječke koda scenarija u svom okruženju.

Primjer:

Napravite novi DataFrame s dva zapisa i umetnite ih u tablicu 'Agri_Table1'.

uvoz pyspark

iz pyspark.sql uvesti SparkSession

linuxhint_spark_app = SparkSession.builder.appName( 'Savjet za Linux' ).getOrCreate()

# poljoprivredni podaci s 2 retka

agri =[{ 'Vrsta_tla' : 'Pijesak' , 'dostupnost_navodnjavanja' : 'Ne' , 'Akri' : 2500 , 'Stanje_tla' : 'suho' ,
'Zemlja' : 'SAD' },

{ 'Vrsta_tla' : 'Pijesak' , 'dostupnost_navodnjavanja' : 'Ne' , 'Akri' : 1200 , 'Stanje_tla' : 'Mokro' ,
'Zemlja' : 'Japan' }]

# kreirajte podatkovni okvir iz gornjih podataka

agri_df2 = linuxhint_spark_app.createDataFrame(agri)

agri_df2.show()

# write.insertInto()

agri_df2.selectExpr( 'Akri' , 'Zemlja' , 'dostupnost_navodnjavanja' , 'Vrsta_tla' ,
'Stanje_tla' ).write.insertInto( 'Agri_Tablica1' )

# Prikaz konačne Agri_Table1

linuxhint_spark_app.sql( 'SELECT * iz Agri_Table1' ).pokazati()

Izlaz:

Sada je ukupan broj redaka koji su prisutni u DataFrameu 7.

Zaključak

Sada razumijete kako napisati PySpark DataFrame u tablicu pomoću funkcije write.saveAsTable(). Uzima naziv tablice i druge izborne parametre. Zatim smo ovu tablicu učitali u PySpark DataFrame pomoću funkcije spark.read.table(). Potreban je samo jedan parametar koji je naziv staze/tablice. Ako želite dodati novi DataFrame u postojeću tablicu, koristite funkciju insertInto().