DataFrame műveletek
A műveletek többnyire megegyeznek az SQL-ből ismert műveletekkel.
Tekintsük az előző fejezetben is használt dolgozo.csv
adatfájlt.
from pyspark.sql import *
from pyspark.sql.functions import *
spark = SparkSession.builder.getOrCreate()
df = spark.read.option('header', True).option('inferschema', True).csv('dolgozo.csv')
A műveletek használatához fontos megnézni, hogyan tudunk az oszlopokra hivatkozni. Erre három megoldás létezik:
- string literállal:
OSZLOP
- adattagként:
df.OSZLOP
vagydf['OSZLOP']
(ahol df a dataframe amibe az oszlop van) col
függvénnyel:col('OSZLOP')
A három hivatkozási mód általában ekvivalens, de néha előfordul, hogy nem kikövetkeztethető, hogy egy oszlopnevet tartalmazó string literál tényleg oszlopra hivatkozik-e. Ekkor a másik módszerek egyikét kell használni.
Ilyen kifejezés például a 'JUTALEK' + 'FIZETES'
.
Vetítés
select(*cols)
Oszlophivatkozásokat vár, és egy olyan dataframe-et ad vissza, mely a megadott oszlopokat tartalmazza.
df.select('DNEV', df.FIZETES, df['FOGLALKOZAS'], col('OAZON')).show(4)
DNEV | FIZETES | FOGLALKOZAS | OAZON |
---|---|---|---|
KING | 5000 | PRESIDENT | 10 |
BLAKE | 2850 | MANAGER | 30 |
CLARK | 2450 | MANAGER | 10 |
JONES | 2975 | MANAGER | 20 |
Ezzel akár új oszlopokat is bevezethetünk:
df.select(df.DNEV, (12 * df.FIZETES + 12 * df.JUTALEK).alias('KERESET')).show(4)
DNEV | KERESET |
---|---|
KING | 60000 |
BLAKE | 34200 |
CLARK | 29400 |
JONES | 35700 |
A Python támogatja az operátor túlterhelést így a (12 * df.FIZETES + 12 * df.JUTALEK)
kifejezés eredménye egy új
oszlop objektum, aminek az alias()
metódussal adunk nevet.
Hasonlóan az SQL-hez, ha a vetítésben szerepel a *
karakter minden oszlop bele kerül az új dataframe-be.
df.select('*').show(4)
DKOD | DNEV | FOGLALKOZAS | FONOKE | BELEPES | FIZETES | JUTALEK | OAZON |
---|---|---|---|---|---|---|---|
7839 | KING | PRESIDENT | 0 | 81-NOV-17 | 5000 | 0 | 10 |
7698 | BLAKE | MANAGER | 7839 | 81-MAY-01 | 2850 | 0 | 30 |
7782 | CLARK | MANAGER | 7839 | 81-JUN-09 | 2450 | 0 | 10 |
7566 | JONES | MANAGER | 7839 | 81-APR-02 | 2975 | 0 | 20 |
selectExpr(*expr)
Tudunk SQL kifejezés alapján is vetíteni. (Az SQL kifejezésekről a következő fejezetben lesz szó.)
df.selectExpr('DNEV', '(FIZETES + JUTALEK) as KERESET').show(2)
DNEV | KERESET |
---|---|
KING | 5000 |
BLAKE | 2850 |
withColumn(colName, col)
Hozzáveszi a dataframe-hez a col
oszlopot, melyet colName
-ként nevez el.
df.withColumn('KERESET', df.FIZETES + df.JUTALEK).show(4)
DKOD | DNEV | FOGLALKOZAS | FONOKE | BELEPES | FIZETES | JUTALEK | OAZON | KERESET |
---|---|---|---|---|---|---|---|---|
7839 | KING | PRESIDENT | 0 | 81-NOV-17 | 5000 | 0 | 10 | 5000 |
7698 | BLAKE | MANAGER | NULL | 81-MAY-01 | 2850 | 0 | 30 | 2850 |
7782 | CLARK | MANAGER | 7839 | 81-JUN-09 | 2450 | 0 | 10 | 2450 |
7566 | JONES | MANAGER | 7839 | 81-APR-02 | 2975 | 0 | 20 | 2975 |
withColumnRenamed(existing, new)
Átnevezi a megadott oszlopot. Ha nem létezik az oszlop létrehozza.
df.withColumnRenamed('FIZETES', 'HAVI_FIZETES').show(4)
DKOD | DNEV | FOGLALKOZAS | FONOKE | BELEPES | HAVI_FIZETES | JUTALEK | OAZON |
---|---|---|---|---|---|---|---|
7839 | KING | PRESIDENT | 0 | 81-NOV-17 | 5000 | 0 | 10 |
7698 | BLAKE | MANAGER | NULL | 81-MAY-01 | 2850 | 0 | 30 |
7782 | CLARK | MANAGER | 7839 | 81-JUN-09 | 2450 | 0 | 10 |
7566 | JONES | MANAGER | 7839 | 81-APR-02 | 2975 | 0 | 20 |
drop(*cols)
A drop()
metódussal lehetőségünk van oszlopokat elhagyni.
df.drop(df.OAZON).show(4)
DKOD | DNEV | FOGLALKOZAS | FONOKE | BELEPES | FIZETES | JUTALEK |
---|---|---|---|---|---|---|
7839 | KING | PRESIDENT | 0 | 81-NOV-17 | 5000 | 0 |
7698 | BLAKE | MANAGER | 7839 | 81-MAY-01 | 2850 | 0 |
7782 | CLARK | MANAGER | 7839 | 81-JUN-09 | 2450 | 0 |
7566 | JONES | MANAGER | 7839 | 81-APR-02 | 2975 | 0 |
Lekérdezések ekvivalenciája
A következő művelettípus előtt beszélnünk kell kicsit, arról, hogy mikor számít kép művelet azonosnak. A Spark (hasonlóan az adatbázis-kezelő rendszerekhez) egy végrehajtási tervet készít amit fizikai tervnek nevezünk.
Két lekérdezés ekvivalens ha a fizikai terveik megegyeznek, amit a sameSemantics(self, dataframe)
metódussal tudunk
ellenőrizni.
df.withColumn('KERESET', df.FIZETES + df.JUTALEK).sameSemantics(
df.withColumn('KERESET', expr('FIZETES + JUTALEK'))
)
> True
Az egyes lekérdezések fizikai tervei az explain()
metódussal kérhetőek le:
df.withColumn('KERESET', df.FIZETES + df.JUTALEK).explain()
== Physical Plan ==
*(1) Project [DKOD#183, DNEV#184, FOGLALKOZAS#185, FONOKE#186, BELEPES#187, FIZETES#188, JUTALEK#189, OAZON#190, (FIZETES#188 + JUTALEK#189) AS KERESET#459]
+- FileScan csv [DKOD#183,DNEV#184,FOGLALKOZAS#185,FONOKE#186,BELEPES#187,FIZETES#188,JUTALEK#189,OAZON#190] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/C:/Users/bb200/Documents/elte-ik-bsc/5/bigdata/spark/07/dolgozo...., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DKOD:int,DNEV:string,FOGLALKOZAS:string,FONOKE:int,BELEPES:string,FIZETES:int,JUTALEK:int,...
Szűrés
Természetesen rendelkezésre áll a szűrés művelete is.
where(condition)
df.where(df.FIZETES > 3000).select(df.DNEV).show(5)
DNEV |
---|
KING |
A where()
alias a filter()
metódusra.
A különböző feltételeket az
&
: és|
: vagy~
: nem logikai műveletekkel tudjuk összekapcsolni.
df.filter((df.FIZETES > 2000) & ~(df.OAZON == 20)).show(4)
DKOD | DNEV | FOGLALKOZAS | FONOKE | BELEPES | FIZETES | JUTALEK | OAZON |
---|---|---|---|---|---|---|---|
7839 | KING | PRESIDENT | 0 | 81-NOV-17 | 5000 | 0 | 10 |
7698 | BLAKE | MANAGER | 7839 | 81-MAY-01 | 2850 | 0 | 30 |
7782 | CLARK | MANAGER | 7839 | 81-JUN-09 | 2450 | 0 | 10 |
A where()
láncolás más végrehajtási tervet eredményez, mintha a feltételeket logikailag kapcsoltuk volna össze.
df.where(df.FIZETES > 2000).where(df.OAZON != 20).sameSemantics(
df.where((df.FIZETES > 2000) & (df.OAZON != 20))
)
> False