温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

如何进行delta lake 的curd操作

发布时间:2021-12-23 16:48:25 来源:亿速云 阅读:139 作者:柒染 栏目:大数据

这篇文章给大家介绍如何进行delta lake 的curd操作,内容非常详细,感兴趣的小伙伴们可以参考借鉴,希望对大家能有所帮助。

delta lake 的表支持删除和更新数据的语法,下面主要是从sql和scala两个语法说起吧。

1. 删除delta 表数据

可以根据查询条件,从delta表中删除数据,比如删除日期在2017年之前的数据,sql和scala的表达语法如下。

sql

DELETE FROM events WHERE date < '2017-01-01'
DELETE FROM delta.`/data/events/` WHERE date < '2017-01-01'

scala

import io.delta.tables._
val deltaTable = DeltaTable.forPath(spark, "/data/events/")
deltaTable.delete("date < '2017-01-01'")        // predicate using SQL formatted string
import org.apache.spark.sql.functions._import spark.implicits._
deltaTable.delete(col("date") < "2017-01-01")       // predicate using Spark SQL functions and implicits

请注意,delete操作会将数据从delta 表的最新版本中删除,但其实只有到历史版本直接被vacuum清空的时候,才会从物理存储中删除数据。

2. 更新表

可以更新满足条件的表。比如想更新eventType的字段字符串的编写失误,可以使用下面的表达,sql和scala的表达分别如下:

sql

UPDATE events SET eventType = 'click' WHERE eventType = 'clck'UPDATE delta.`/data/events/` SET eventType = 'click' WHERE eventType = 'clck'

scala

import io.delta.tables._
val deltaTable = DeltaTable.forPath(spark, "/data/events/")
deltaTable.updateExpr(            // predicate and update expressions using SQL formatted string  "eventType = 'clck'",  Map("eventType" -> "'click'")
import org.apache.spark.sql.functions._import spark.implicits._
deltaTable.update(                // predicate using Spark SQL functions and implicits  col("eventType") === "clck",  Map("eventType" -> lit("click")));

3.merge算子实现upsert操作

使用merge操作可以将source表,view,dataframe中的数据upsert到目标的delta lake表中。该操作很像传统数据库的merge into操作,但是额外的支持删除操作,和更新,插入和删除的额外条件。

假设你计算过程中生成了一个dataframe,元素是events,包含eventId。而且该dataframe中数据部分数据的eventId已经在events表中存在了。这个时候就可以使用merge into实现,eventId存在的话就更新其对应的值,不存在就插入其对应的值。实现表达式如下:

sql

MERGE INTO eventsUSING updatesON events.eventId = updates.eventIdWHEN MATCHED THEN  UPDATE SET events.data = updates.dataWHEN NOT MATCHED  THEN INSERT (date, eventId, data) VALUES (date, eventId, data)

scala

import io.delta.tables._import org.apache.spark.sql.functions._
val updatesDF = ...  // define the updates DataFrame[date, eventId, data]
DeltaTable.forPath(spark, "/data/events/")  .as("events")  .merge(    updatesDF.as("updates"),    "events.eventId = updates.eventId")  .whenMatched  .updateExpr(    Map("data" -> "updates.data"))  .whenNotMatched  .insertExpr(    Map(      "date" -> "updates.date",      "eventId" -> "updates.eventId",      "data" -> "updates.data"))  .execute()

关于如何进行delta lake 的curd操作就分享到这里了,希望以上内容可以对大家有一定的帮助,可以学到更多知识。如果觉得文章不错,可以把它分享出去让更多的人看到。

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI