Spark SQL JOIN操作代码示例
title: Spark SQL JOIN操作
date: 2021-05-08 15:53:21
tags:
- Spark
本文主要介紹 Spark SQL 的多表連接,需要預先準備測試數據。分別創建員工和部門的 Datafame,并注冊為臨時視圖.
一、數據準備
本文主要介紹 Spark SQL 的多表連接,需要預先準備測試數據。分別創建員工和部門的 Datafame,并注冊為臨時視圖,代碼如下:
val spark = SparkSession.builder().appName("aggregations").master("local[2]").getOrCreate()val empDF = spark.read.json("/usr/file/json/emp.json") empDF.createOrReplaceTempView("emp")val deptDF = spark.read.json("/usr/file/json/dept.json") deptDF.createOrReplaceTempView("dept")兩表的主要字段如下:
emp 員工表|-- ENAME: 員工姓名|-- DEPTNO: 部門編號|-- EMPNO: 員工編號|-- HIREDATE: 入職時間|-- JOB: 職務|-- MGR: 上級編號|-- SAL: 薪資|-- COMM: 獎金 dept 部門表|-- DEPTNO: 部門編號|-- DNAME: 部門名稱|-- LOC: 部門所在城市注:emp.json,dept.json 可以在本倉庫的resources 目錄進行下載。
二、連接類型
Spark 中支持多種連接類型:
Inner Join : 內連接; Full Outer Join : 全外連接; Left Outer Join : 左外連接; Right Outer Join : 右外連接; Left Semi Join : 左半連接; Left Anti Join : 左反連接; Natural Join : 自然連接; Cross (or Cartesian) Join : 交叉 (或笛卡爾) 連接。其中內,外連接,笛卡爾積均與普通關系型數據庫中的相同,如下圖所示:
這里解釋一下左半連接和左反連接,這兩個連接等價于關系型數據庫中的 IN 和 NOT IN 字句:
-- LEFT SEMI JOIN SELECT * FROM emp LEFT SEMI JOIN dept ON emp.deptno = dept.deptno -- 等價于如下的 IN 語句 SELECT * FROM emp WHERE deptno IN (SELECT deptno FROM dept)-- LEFT ANTI JOIN SELECT * FROM emp LEFT ANTI JOIN dept ON emp.deptno = dept.deptno -- 等價于如下的 IN 語句 SELECT * FROM emp WHERE deptno NOT IN (SELECT deptno FROM dept)所有連接類型的示例代碼如下:
2.1 INNER JOIN
// 1.定義連接表達式 val joinExpression = empDF.col("deptno") === deptDF.col("deptno") // 2.連接查詢 empDF.join(deptDF,joinExpression).select("ename","dname").show()// 等價 SQL 如下: spark.sql("SELECT ename,dname FROM emp JOIN dept ON emp.deptno = dept.deptno").show()2.2 FULL OUTER JOIN
empDF.join(deptDF, joinExpression, "outer").show() spark.sql("SELECT * FROM emp FULL OUTER JOIN dept ON emp.deptno = dept.deptno").show()2.3 LEFT OUTER JOIN
empDF.join(deptDF, joinExpression, "left_outer").show() spark.sql("SELECT * FROM emp LEFT OUTER JOIN dept ON emp.deptno = dept.deptno").show()2.4 RIGHT OUTER JOIN
empDF.join(deptDF, joinExpression, "right_outer").show() spark.sql("SELECT * FROM emp RIGHT OUTER JOIN dept ON emp.deptno = dept.deptno").show()2.5 LEFT SEMI JOIN
empDF.join(deptDF, joinExpression, "left_semi").show() spark.sql("SELECT * FROM emp LEFT SEMI JOIN dept ON emp.deptno = dept.deptno").show()2.6 LEFT ANTI JOIN
empDF.join(deptDF, joinExpression, "left_anti").show() spark.sql("SELECT * FROM emp LEFT ANTI JOIN dept ON emp.deptno = dept.deptno").show()2.7 CROSS JOIN
empDF.join(deptDF, joinExpression, "cross").show() spark.sql("SELECT * FROM emp CROSS JOIN dept ON emp.deptno = dept.deptno").show()2.8 NATURAL JOIN
自然連接是在兩張表中尋找那些數據類型和列名都相同的字段,然后自動地將他們連接起來,并返回所有符合條件的結果。
spark.sql("SELECT * FROM emp NATURAL JOIN dept").show()以下是一個自然連接的查詢結果,程序自動推斷出使用兩張表都存在的 dept 列進行連接,其實際等價于:
spark.sql("SELECT * FROM emp JOIN dept ON emp.deptno = dept.deptno").show()由于自然連接常常會產生不可預期的結果,所以并不推薦使用。
三、連接的執行
在對大表與大表之間進行連接操作時,通常都會觸發 Shuffle Join,兩表的所有分區節點會進行 All-to-All 的通訊,這種查詢通常比較昂貴,會對網絡 IO 會造成比較大的負擔。
而對于大表和小表的連接操作,Spark 會在一定程度上進行優化,如果小表的數據量小于 Worker Node 的內存空間,Spark 會考慮將小表的數據廣播到每一個 Worker Node,在每個工作節點內部執行連接計算,這可以降低網絡的 IO,但會加大每個 Worker Node 的 CPU 負擔。
是否采用廣播方式進行 Join 取決于程序內部對小表的判斷,如果想明確使用廣播方式進行 Join,則可以在 DataFrame API 中使用 broadcast 方法指定需要廣播的小表:
empDF.join(broadcast(deptDF), joinExpression).show()參考鏈接:
https://blog.csdn.net/m0_37809146/article/details/91282446
總結
以上是生活随笔為你收集整理的Spark SQL JOIN操作代码示例的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: GIS基本知识学习PDF文档
- 下一篇: Spark弹性式数据集RDDs