PySpark Pandas_Udf()

Pyspark Pandas Udf



Transformacija PySpark DataFramea moguća je pomoću funkcije pandas_udf(). To je korisnički definirana funkcija koja se primjenjuje na PySpark DataFrame sa strelicom. Možemo izvesti vektorizirane operacije pomoću pandas_udf(). Može se implementirati prosljeđivanjem ove funkcije kao dekoratora. Uronimo u ovaj vodič kako bismo upoznali sintaksu, parametre i različite primjere.

Tema sadržaja:

Ako želite znati o PySpark DataFrameu i instalaciji modula, prođite kroz ovo članak .







Pyspark.sql.functions.pandas_udf()

Pandas_udf () dostupan je u modulu sql.functions u PySparku koji se može uvesti pomoću ključne riječi 'from'. Koristi se za izvođenje vektoriziranih operacija na našem PySpark DataFrameu. Ova funkcija je implementirana kao dekorater prosljeđivanjem tri parametra. Nakon toga možemo izraditi korisnički definiranu funkciju koja vraća podatke u vektorskom formatu (kao što za to koristimo series/NumPy) pomoću strelice. Unutar ove funkcije možemo vratiti rezultat.



Struktura i sintaksa:



Prvo, pogledajmo strukturu i sintaksu ove funkcije:

@pandas_udf(tip podataka)
def naziv_funkcije(operacija) -> format_konvertiranja:
povratna izjava

Ovdje je naziv_funkcije naziv naše definirane funkcije. Vrsta podataka specificira vrstu podataka koju ova funkcija vraća. Rezultat možemo vratiti pomoću ključne riječi 'return'. Sve operacije se izvode unutar funkcije s dodjelom strelice.





Pandas_udf (funkcija i vrsta povrata)

  1. Prvi parametar je korisnički definirana funkcija koja mu se prosljeđuje.
  2. Drugi parametar koristi se za određivanje povratnog tipa podataka iz funkcije.

Podaci:

U cijelom ovom vodiču koristimo samo jedan PySpark DataFrame za demonstraciju. Sve korisnički definirane funkcije koje definiramo primjenjuju se na ovaj PySpark DataFrame. Provjerite jeste li prvo stvorili ovaj DataFrame u svom okruženju nakon instalacije PySpark-a.



uvoz pyspark

iz pyspark.sql uvesti SparkSession

linuxhint_spark_app = SparkSession.builder.appName( 'Savjet za Linux' ).getOrCreate()

iz pyspark.sql.functions uvoz pandas_udf

iz pyspark.sql.types import *

uvezi pande kao pande

# detalji o povrću

povrće =[{ 'tip' : 'povrće' , 'Ime' : 'rajčica' , 'locate_country' : 'SAD' , 'količina' : 800 },

{ 'tip' : 'voće' , 'Ime' : 'banana' , 'locate_country' : 'KINA' , 'količina' : dvadeset },

{ 'tip' : 'povrće' , 'Ime' : 'rajčica' , 'locate_country' : 'SAD' , 'količina' : 800 },

{ 'tip' : 'povrće' , 'Ime' : 'Mango' , 'locate_country' : 'JAPAN' , 'količina' : 0 },

{ 'tip' : 'voće' , 'Ime' : 'limun' , 'locate_country' : 'INDIJA' , 'količina' : 1700 },

{ 'tip' : 'povrće' , 'Ime' : 'rajčica' , 'locate_country' : 'SAD' , 'količina' : 1200 },

{ 'tip' : 'povrće' , 'Ime' : 'Mango' , 'locate_country' : 'JAPAN' , 'količina' : 0 },

{ 'tip' : 'voće' , 'Ime' : 'limun' , 'locate_country' : 'INDIJA' , 'količina' : 0 }

]

# kreirajte tržišni podatkovni okvir iz gornjih podataka

market_df = linuxhint_spark_app.createDataFrame(vegetable)

market_df.show()

Izlaz:

Ovdje stvaramo ovaj DataFrame s 4 stupca i 8 redaka. Sada koristimo pandas_udf() za stvaranje korisnički definiranih funkcija i njihovu primjenu na ove stupce.

Pandas_udf() s različitim tipovima podataka

U ovom scenariju stvaramo neke korisnički definirane funkcije s pandas_udf() i primjenjujemo ih na stupce te prikazujemo rezultate pomoću metode select(). U svakom slučaju koristimo pandas.Series dok izvodimo vektorizirane operacije. Time se vrijednosti stupca smatraju jednodimenzionalnim nizom i operacija se primjenjuje na stupac. U samom dekoratoru specificiramo povratni tip funkcije.

Primjer 1: Pandas_udf() s vrstom niza

Ovdje stvaramo dvije korisnički definirane funkcije s tipom povrata niza za pretvaranje vrijednosti stupca tipa niza u velika i mala slova. Konačno, ove funkcije primjenjujemo na stupce 'type' i 'locate_country'.

# Pretvorite stupac vrste u velika slova pomoću pandas_udf

@pandas_udf(StringType())

def type_upper_case(i: panda.Series) -> panda.Series:

vrati i.str.upper()

# Pretvorite stupac locate_country u mala slova s ​​pandas_udf

@pandas_udf(StringType())

def country_lower_case(i: panda.Series) -> panda.Series:

vrati i.str.lower()

# Prikaži stupce koristeći select()

market_df.select( 'tip' ,upišite_velika_slova( 'tip' ), 'lociraj državu' ,
država_malim slovima( 'lociraj državu' )).pokazati()

Izlaz:

Obrazloženje:

Funkcija StringType() dostupna je u modulu pyspark.sql.types. Već smo uvezli ovaj modul dok smo stvarali PySpark DataFrame.

  1. Prvo, UDF (korisnički definirana funkcija) vraća nizove velikim slovima pomoću funkcije str.upper(). St.upper() dostupan je u strukturi podataka niza (jer pretvaramo u niz sa strelicom unutar funkcije) koji pretvara dati niz u velika slova. Konačno, ova se funkcija primjenjuje na stupac 'type' koji je naveden unutar metode select(). Prethodno su svi nizovi u stupcu tipa bili pisani malim slovima. Sada su promijenjena u velika slova.
  2. Drugo, UDF vraća nizove u velikim slovima pomoću funkcije str.lower(). Funkcija str.lower() dostupna je u strukturi podataka serije koja zadani niz pretvara u mala slova. Konačno, ova se funkcija primjenjuje na stupac 'type' koji je naveden unutar metode select(). Prethodno su svi nizovi u stupcu tipa bili pisani velikim slovima. Sada su promijenjena u mala slova.

Primjer 2: Pandas_udf() s tipom Integer

Kreirajmo UDF koji pretvara PySpark DataFrame stupac s cijelim brojem u Pandas niz i dodamo 100 svakoj vrijednosti. Proslijedite stupac 'količina' ovoj funkciji unutar metode select().

# Dodajte 100

@pandas_udf(IntegerType())

def add_100(i: panda.Series) -> panda.Series:

vratiti i+ 100

# Proslijedite stupac količine u gornju funkciju i prikaz.

market_df.select( 'količina' ,dodaj_100( 'količina' )).pokazati()

Izlaz:

Obrazloženje:

Unutar UDF-a ponavljamo sve vrijednosti i pretvaramo ih u serije. Nakon toga svakoj vrijednosti u seriji dodajemo 100. Na kraju, ovoj funkciji prosljeđujemo stupac 'količina' i možemo vidjeti da se 100 dodaje svim vrijednostima.

Pandas_udf() s različitim tipovima podataka koristeći Groupby() & Agg()

Pogledajmo primjere za prosljeđivanje UDF-a agregiranim stupcima. Ovdje se vrijednosti stupaca prvo grupiraju pomoću funkcije groupby(), a agregacija se vrši pomoću funkcije agg(). Naš UDF prosljeđujemo unutar ove agregatne funkcije.

Sintaksa:

pyspark_dataframe_object.groupby( 'grouping_column' ).agg(UDF
(pyspark_dataframe_object[ 'stupac' ]))

Ovdje se prvo grupiraju vrijednosti u stupcu za grupiranje. Zatim se vrši agregacija za svaki grupirani podatak s obzirom na naš UDF.

Primjer 1: Pandas_udf() s agregatnom sredinom()

Ovdje stvaramo korisnički definiranu funkciju s povratnom vrstom float. Unutar funkcije izračunavamo prosjek pomoću funkcije mean(). Ovaj UDF se prosljeđuje u stupac 'količina' kako bi se dobila prosječna količina za svaku vrstu.

# vrati srednju vrijednost/prosjek

@pandas_udf( 'plutati' )

def prosječna_funkcija(i: panda.Series) -> float:

vrati i.mean()

# Proslijedite stupac količine u funkciju grupiranjem stupca tipa.

market_df.groupby( 'tip' ).agg(prosječna_funkcija(market_df[ 'količina' ])).pokazati()

Izlaz:

Grupiramo na temelju elemenata u stupcu 'vrsta'. Formiraju se dvije skupine – “voće” i “povrće”. Za svaku grupu se izračunava i vraća srednja vrijednost.

Primjer 2: Pandas_udf() s Aggregate Max() i Min()

Ovdje stvaramo dvije korisnički definirane funkcije s povratnom vrstom cijelog broja (int). Prvi UDF vraća minimalnu vrijednost, a drugi UDF vraća maksimalnu vrijednost.

# pandas_udf koji vraća minimalnu vrijednost

@pandas_udf( 'int' )

def min_(i: panda.Series) -> int:

vrati i.min()

# pandas_udf koji vraća maksimalnu vrijednost

@pandas_udf( 'int' )

def max_(i: panda.Series) -> int:

vrati i.max()

# Proslijedite stupac količine u min_ pandas_udf grupiranjem locate_country.

market_df.groupby( 'lociraj državu' ).agg(min_(market_df[ 'količina' ])).pokazati()

# Proslijedite stupac količine u max_ pandas_udf grupiranjem locate_country.

market_df.groupby( 'lociraj državu' ).agg(max_(market_df[ 'količina' ])).pokazati()

Izlaz:

Za vraćanje minimalnih i maksimalnih vrijednosti koristimo funkcije min() i max() u tipu povrata UDF-ova. Sada grupiramo podatke u stupac 'locate_country'. Formiraju se četiri skupine (“KINA”, “INDIJA”, “JAPAN”, “SAD”). Za svaku grupu vraćamo maksimalnu količinu. Slično, vraćamo minimalnu količinu.

Zaključak

U osnovi, pandas_udf () se koristi za izvođenje vektoriziranih operacija na našem PySpark DataFrameu. Vidjeli smo kako stvoriti pandas_udf() i primijeniti ga na PySpark DataFrame. Radi boljeg razumijevanja, raspravljali smo o različitim primjerima uzimajući u obzir sve tipove podataka (string, float i integer). Moguće je koristiti pandas_udf() s groupby() putem funkcije agg().