Ugrás a fő tartalomhoz

DataFrame műveletek

A műveletek többnyire megegyeznek az SQL-ből ismert műveletekkel.

Tekintsük az előző fejezetben is használt dolgozo.csv adatfájlt.

from pyspark.sql import  *
from pyspark.sql.functions import *

spark = SparkSession.builder.getOrCreate()

df = spark.read.option('header', True).option('inferschema', True).csv('dolgozo.csv')

A műveletek használatához fontos megnézni, hogyan tudunk az oszlopokra hivatkozni. Erre három megoldás létezik:

  • string literállal: OSZLOP
  • adattagként: df.OSZLOP vagy df['OSZLOP'] (ahol df a dataframe amibe az oszlop van)
  • col függvénnyel: col('OSZLOP')
veszély

A három hivatkozási mód általában ekvivalens, de néha előfordul, hogy nem kikövetkeztethető, hogy egy oszlopnevet tartalmazó string literál tényleg oszlopra hivatkozik-e. Ekkor a másik módszerek egyikét kell használni.

Ilyen kifejezés például a 'JUTALEK' + 'FIZETES'.

Vetítés

select(*cols)

Oszlophivatkozásokat vár, és egy olyan dataframe-et ad vissza, mely a megadott oszlopokat tartalmazza.

df.select('DNEV', df.FIZETES, df['FOGLALKOZAS'], col('OAZON')).show(4)
DNEVFIZETESFOGLALKOZASOAZON
KING5000PRESIDENT10
BLAKE2850MANAGER30
CLARK2450MANAGER10
JONES2975MANAGER20

Ezzel akár új oszlopokat is bevezethetünk:

df.select(df.DNEV, (12 * df.FIZETES + 12 * df.JUTALEK).alias('KERESET')).show(4)
DNEVKERESET
KING60000
BLAKE34200
CLARK29400
JONES35700
megjegyzés

A Python támogatja az operátor túlterhelést így a (12 * df.FIZETES + 12 * df.JUTALEK) kifejezés eredménye egy új oszlop objektum, aminek az alias() metódussal adunk nevet.

Hasonlóan az SQL-hez, ha a vetítésben szerepel a * karakter minden oszlop bele kerül az új dataframe-be.

df.select('*').show(4)
DKODDNEVFOGLALKOZASFONOKEBELEPESFIZETESJUTALEKOAZON
7839KINGPRESIDENT081-NOV-175000010
7698BLAKEMANAGER783981-MAY-012850030
7782CLARKMANAGER783981-JUN-092450010
7566JONESMANAGER783981-APR-022975020

selectExpr(*expr)

Tudunk SQL kifejezés alapján is vetíteni. (Az SQL kifejezésekről a következő fejezetben lesz szó.)

df.selectExpr('DNEV', '(FIZETES + JUTALEK) as KERESET').show(2)
DNEVKERESET
KING5000
BLAKE2850

withColumn(colName, col)

Hozzáveszi a dataframe-hez a col oszlopot, melyet colName-ként nevez el.

df.withColumn('KERESET', df.FIZETES + df.JUTALEK).show(4)
DKODDNEVFOGLALKOZASFONOKEBELEPESFIZETESJUTALEKOAZONKERESET
7839KINGPRESIDENT081-NOV-1750000105000
7698BLAKEMANAGERNULL81-MAY-0128500302850
7782CLARKMANAGER783981-JUN-0924500102450
7566JONESMANAGER783981-APR-0229750202975

withColumnRenamed(existing, new)

Átnevezi a megadott oszlopot. Ha nem létezik az oszlop létrehozza.

df.withColumnRenamed('FIZETES', 'HAVI_FIZETES').show(4)
DKODDNEVFOGLALKOZASFONOKEBELEPESHAVI_FIZETESJUTALEKOAZON
7839KINGPRESIDENT081-NOV-175000010
7698BLAKEMANAGERNULL81-MAY-012850030
7782CLARKMANAGER783981-JUN-092450010
7566JONESMANAGER783981-APR-022975020

drop(*cols)

A drop() metódussal lehetőségünk van oszlopokat elhagyni.

df.drop(df.OAZON).show(4)
DKODDNEVFOGLALKOZASFONOKEBELEPESFIZETESJUTALEK
7839KINGPRESIDENT081-NOV-1750000
7698BLAKEMANAGER783981-MAY-0128500
7782CLARKMANAGER783981-JUN-0924500
7566JONESMANAGER783981-APR-0229750

Lekérdezések ekvivalenciája

A következő művelettípus előtt beszélnünk kell kicsit, arról, hogy mikor számít kép művelet azonosnak. A Spark (hasonlóan az adatbázis-kezelő rendszerekhez) egy végrehajtási tervet készít amit fizikai tervnek nevezünk.

Két lekérdezés ekvivalens ha a fizikai terveik megegyeznek, amit a sameSemantics(self, dataframe) metódussal tudunk ellenőrizni.

df.withColumn('KERESET', df.FIZETES + df.JUTALEK).sameSemantics(
df.withColumn('KERESET', expr('FIZETES + JUTALEK'))
)
> True

Az egyes lekérdezések fizikai tervei az explain() metódussal kérhetőek le:

df.withColumn('KERESET', df.FIZETES + df.JUTALEK).explain()
== Physical Plan ==
*(1) Project [DKOD#183, DNEV#184, FOGLALKOZAS#185, FONOKE#186, BELEPES#187, FIZETES#188, JUTALEK#189, OAZON#190, (FIZETES#188 + JUTALEK#189) AS KERESET#459]
+- FileScan csv [DKOD#183,DNEV#184,FOGLALKOZAS#185,FONOKE#186,BELEPES#187,FIZETES#188,JUTALEK#189,OAZON#190] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/C:/Users/bb200/Documents/elte-ik-bsc/5/bigdata/spark/07/dolgozo...., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DKOD:int,DNEV:string,FOGLALKOZAS:string,FONOKE:int,BELEPES:string,FIZETES:int,JUTALEK:int,...

Szűrés

Természetesen rendelkezésre áll a szűrés művelete is.

where(condition)

df.where(df.FIZETES > 3000).select(df.DNEV).show(5)
DNEV
KING
megjegyzés

A where() alias a filter() metódusra.

A különböző feltételeket az

  • &: és
  • |: vagy
  • ~: nem logikai műveletekkel tudjuk összekapcsolni.
df.filter((df.FIZETES > 2000) & ~(df.OAZON == 20)).show(4)
DKODDNEVFOGLALKOZASFONOKEBELEPESFIZETESJUTALEKOAZON
7839KINGPRESIDENT081-NOV-175000010
7698BLAKEMANAGER783981-MAY-012850030
7782CLARKMANAGER783981-JUN-092450010
veszély

A where() láncolás más végrehajtási tervet eredményez, mintha a feltételeket logikailag kapcsoltuk volna össze.

df.where(df.FIZETES > 2000).where(df.OAZON != 20).sameSemantics(
df.where((df.FIZETES > 2000) & (df.OAZON != 20))
)
> False

distinct()

Megszünteti az ismétléseket.

df.select(df.FOGLALKOZAS).distinct().count()
> 6

limit(n)

Olyan dataframe-et állít elő, mely az eredeti dataframe első n sorát tartalmazza.

Rendezés - orderBy(*cols)

A dataframe rendezhető. Ez, hogy az adott oszlop szerint növekvő vagy csökkenő sorrend legyen az asc() és desc() hívásokkal specifikálható.

df.orderBy(df.OAZON.asc(), df.FIZETES.desc()).show(5)
DKODDNEVFOGLALKOZASFONOKEBELEPESFIZETESJUTALEKOAZON
7839KINGPRESIDENT081-NOV-175000010
7782CLARKMANAGER783981-JUN-092450010
7934MILLERCLERK778282-JAN-231300010
7877LOLACLERK790281-JAN-12800010
7788SCOTTANALYST756682-DEC-093000020

Csoportosítás - groupBy()

Csoportosításra is lehetőségünk van.

df.groupBy(df.OAZON, df.FOGLALKOZAS).count().show()
OAZONFOGLALKOZAScount
20ANALYST2
20NULL1
20MANAGER1
30MANAGER1
30SALESMAN4
30CLERK1
10PRESIDENT1
20CLERK2
10CLERK2
10MANAGER1
-----------------

A csoportokon mint SQL-ben végezhetünk összegezést. Ehhez az SQL-ben megismert összegző függvények is adottak (avg, max, min, sum, count).

df.groupBy(df.OAZON).agg(sum(df.FIZETES)).alias('OSSZ').show()
OAZONsum(FIZETES)
2012675
109550
309400

Halmazműveletek

A halmazműveletek is elérhetők és az elvárt módon működnek:

  • substract(): különbség
  • intersect(): metszet
  • union(): unió

Összekapcsolás

Mint a táblák esetében lehetőségünk van a dataframe-ek összekapcsolására is. Ennek bemutatására először töltsünk be egy másik dataframe-et.

osztaly_df = spark.read.option('header', True).option('inferSchema', True).csv('osztaly.csv')
osztaly_df.show()
OAZONNEVTELEPHELY
10ACCOUNTINGNEW YORK
20RESEARCHDALLAS
30SALESCHICAGO
40OPERATIONSBOSTON

Keresztszorzat

Két dataframe keresztszorzata a crossJoin() metódussal képezhető.

osztaly_df.crossJoin(osztaly_df)

Théta összekapcsolás

Théta összekapcsolás a join(other, [conditions], type) metódus segítségével végezhető el.

Az összekapcsolási feltétel a második paraméterként feltételek tömbjeként adandó meg. Egy feltétel esetén a tömb elhagyható. Harmadik paraméter az összekapcsolás típusa ('inner' vagy 'outer').

df.join(osztaly_df, df.OAZON == osztaly_df.OAZON, 'inner').show(4)
DKODDNEVFOGLALKOZASFONOKEBELEPESFIZETESJUTALEKOAZONOAZONNEVTELEPHELY
7839KINGPRESIDENT081-NOV-17500001010ACCOUNTINGNEW YORK
7698BLAKEMANAGER783981-MAY-01285003030SALESCHICAGO
7782CLARKMANAGER783981-JUN-09245001010ACCOUNTINGNEW YORK
7566JONESMANAGER783981-APR-02297502020RESEARCHDALLAS

Természetes összekapcsolás

Ha természetes összekapcsolást szeretnénk végezni a feltételek helyett elég csak oszlopneveket megadni.

df.join(osztaly_df, 'OAZON').show(4)
OAZONDKODDNEVFOGLALKOZASFONOKEBELEPESFIZETESJUTALEKNEVTELEPHELY
107839KINGPRESIDENT081-NOV-1750000ACCOUNTINGNEW YORK
307698BLAKEMANAGER783981-MAY-0128500SALESCHICAGO
107782CLARKMANAGER783981-JUN-0924500ACCOUNTINGNEW YORK
207566JONESMANAGER783981-APR-0229750RESEARCHDALLAS