用Apache Spark 2.2中的结构化流处理API处理Kafka数据

编者按

本文译自Processing Data in Apache Kafka with Structured Streaming in Apache Spark 2.2, 类似编者翻译的另一篇文章,本文用实际的例子演示了Spark Structured Streaming和Kafka整合的方法

文中涉及到的一些开源项目源代码学习:

我们会在本文中演示如何利用Spark的SQL API接收和处理从Apache Kafka得到的复杂数据流。通过使用一些简单的API你就可以做一些复杂的操作,比如事件触发的一次性数据聚合,并把输出写到不同系统中。同时使用Apache Spark和Apache Spark可以用来:

  • 使用和批处理一样的API对Apache Kafka中的试试数据做变换和清洗
  • 将存储在其他诸如S3,HDFS,MySQL这些系统中的信息和Apache Kafka中的数据整合起来
  • 自动得益于Catalyst优化器带来的增量执行,以及后面步骤中Tungsten的高效代码生成

我们会先过一下Kafka的一些术语,然后演示结构化流处理查询如何从Apache Kafka读写数据。最后,我们实现一个现实中的端到端的例子。

Apache Kafka

Apache Kafka是一种分布式的发布-订阅消息系统,它的流行主要因为它能事实处理数据流并且能同时让下游消费者得到数据,并且容错性好。这也使得Kafka非常合适用来构造实时流数据处理系统使数据在不同处理系统中流动。在我们深究结构化流处理API对Kafka的支持之前,我们想来重温一些基本概念和术语。

数据在Kafka中被组织到不同的topic中去,topic又被分成不同的分区以便并行处理。每个分区都是一些有序的,不可变的连续数据记录。你可以将其想成是结构化的提交日志。生产者把数据记录附加到日志的尾部,消费者则以他们自己的节奏读取这些日志。多个消费者可以订阅同一个topic来获取新进来的数据记录。当新数据记录到达Kafka topic的一个分区中时,它们会被分配一个被叫做偏移的Id。Kafka集群保存所有的发布过的数据记录,无论其被消费过没。保留的时间期限可以被配置,超过保留期限的记录会被删除。

指定从Kafka中读哪种数据


一个Kafka topic可以被看做无限的数据流,其中数据会被保留一段时间,时间长短可以被设置。其无限的属性决定了当我们要做一个查询的时候,我们要先确定读取那种数据以及从何时开始。大致上说有三种选项:

earlist - 从流数据的开头开始处理。这种情况不会包括已经被Kafka删掉的数据

latest - 从现在开始,只处理查询开始后进来的数据
per-partition assignment - 对于每个分区都确切的指定从哪个偏移量开始读取,从而可以精确控制每个分区从哪里开始处理。比如,如果我们想从其他的系统或者查询终止的地方继续,就可以用这个选项

接下来你会看到,startinfOffsets设置接受上述三个选项,而且只有当我们从一个全新的存盘点开始一次查询的时候才会被用。如果你从一个现有的存盘点重启一个查询,则会从之前中断的地方继续,除非中断的地方已经被清理了。如果有未被处理的数据被清理的话,查询的行为就取决于failOnDataLoss的设置了,这个问题在Kafka整合指南中有详细讲解。

已经在使用KafkaConsumer的用户会注意到结构化流处理API里把auto.offset.reset这个设置做的更精细了。我们实际上把这个选项拆成了两个选项,一个表明数据流从何处开始(startingOffsets),另一个表明如果如果有数据已经被清理,导致查询无法从上次中断的地方开始的情况下,该如何处理这种情况(failOnDataLoss)。

结构化流处理中对Apache Kafka的支持

结构化流处理中对Apache Kafka的支持提供了统一的批处理和流处理API,这使我们能把发布到Kafka中的数据看成是一个DataFrame。当将无界的数据以数据流方式处理的时候,我们和批处理的时候用一样的API并且得到一样的数据一致性保证。系统保证了端到端,有且仅一次,容错的特性,这样用户不需要深究流数据的底层原理。

我们下面来探究下从Kafka读写的实例,再看一个端到端的应用。

从Kafka的topic中读数据记录

第一步是确定我们kafka的集群地址以及我们要从那个topic中读取。Spark允许你读单个topic,指定的某些topic,匹配正则表达式的所有topic,甚至是某些topic中的某些指定分区。我们的例子中会从单个topic读取,其他的可能在Kafka整合指南中涉及了。

# Construct a streaming DataFrame that reads from topic1
df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribe", "topic1") \
  .option("startingOffsets", "earliest") \
  .load()

上面这段代码中的DataFrame是一个订阅了"topic1"流数据DataFrame。这种设置通过向DataStreamReader提供选项实现,其中我们至少需要kafka.bootstrap.servers(比如 host:port)和我们要订阅的topic名字这两个选项。上述例子中我们还将startingOffsets设成了“earliest”,所以我们会处理topic里面所有存在的数据。如果我们不设这个选项,默认值是“latest”,只有查询开始后进入的数据才会被处理。

df.printSchema() 展示了DataFrame的结构

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)

返回的DataFrame中装了Kafka数据记录中常见的域和相关的元数据。我们现在可以用我们熟悉的DataFrame或者数据集的操作来转换结果了。不过通常来说,我们都是从解析键和值两个列中的二进制数据开始的。如何解析这些数据是每个应用特定的逻辑。幸运的是,Spark SQL内置了一些常用的序列化变换。

UTF8字符串形式的数据

如果Kafka数据记录中的字符存的是UTF8字符串,我们可以用cast将二进制转换成正确类型

df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
JSON数据

JSON是另一种常用的数据格式,我们可以用内置的from_json函数和我们期望的结构把二进制值变成Spark SQL结构。
JSON is another common format for data that is written to Kafka. In this case, we can use the built-in from_json function along with the expected schema to convert a binary value into a Spark SQL struct.

# value schema: { "a": 1, "b": "string" }
schema = StructType().add("a", IntegerType()).add("b", StringType())
df.select( \
  col("key").cast("string"),
  from_json(col("value").cast("string"), schema))
用户自定义的序列化和反序列化

有时,我们可能已经有代码实现了Kafka的Deserializer接口。你可以重用这些代码,把它包成用户定义函数(UDF),像下面的Scala代码这样:

object MyDeserializerWrapper {
  val deser = new MyDeserializer
}
spark.udf.register("deserialize", (topic: String, bytes: Array[Byte]) => 
  MyDeserializerWrapper.deser.deserialize(topic, bytes)
)

df.selectExpr("""deserialize("topic1", value) AS message""")

注意上面代码里面的DataFrame和用标准Kafka consumer的时候指定value.deserializer是类似的。

把Spark作为Kafka Producer使用

从Spark支持的数据源往Kafka里写数据简单到就是在任意含有”value“列的DataFrame上调用writeStream方法,可选的还有一个列是“key”。如果key列没有制定,那么空值的key列会被自动加上。不过空值的key列有时候可能会导致数据分区的不均衡,所以要小心对待。

DataFrame的数据记录的目的地topic既可以静态的在DataStreamWriter制定,也可以在DataFrame中的topic列对每一个数据记录指定。

# Write key-value data from a DataFrame to a Kafka topic specified in an option
query = df \
  .selectExpr("CAST(userId AS STRING) AS key", "to_json(struct(*)) AS value") \
  .writeStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("topic", "topic1") \
  .option("checkpointLocation", "/path/to/HDFS/dir") \
  .start()

上述的查询拿一个包含用户信息的DataFrame写到Kafka。userId被序列化成字符串并且当做key来用。我们DataFrame的所有列一起序列化成一个JSON字符串,作为value放到结果中。

写Kafka时需要的两个选项是kafka.bootstrap.servers和checkpointLocation。上述的例子中,额外的“topic”选项可以指定一个写入的topic,这个选项会覆盖DataFrame中的topic列。

用Nest设备的端到端例子

这一节中,我们会探索一套端到端的数据处理管道,其中涉及Kakfa和其他的一些数据输入输出系统。我们处理一些Nest摄像头设备的日志文件的数据,数据的JSON结构如下

"devices": {
  "cameras": {
    "device_id": "awJo6rH...",
    "last_event": {
      "has_sound": true,
      "has_motion": true,
      "has_person": true,
      "start_time": "2016-12-29T00:00:00.000Z",
      "end_time": "2016-12-29T18:42:00.000Z"
    }
  }
}

我们会把这些数据和一些静态数据做join,静态数据中包含了从device_id到设备注册地的zip_code的映射

概括的说,我们想要的工作流如上图。给我们Nest摄像头的数据流,我们需要Spark处理下面几个任务:

  • 用比如Parquet列存储格式,建立高效且可查询的所有事件的历史归档
  • 实现低延时的事件聚合逻辑并且把结果写回Kafka供其他consumer使用
  • 实现对存储在Kafka中的精简数据实现批量汇报
Read Nest Device Logs From Kafka

第一步我们从KAfka中读取原始Nest数据流并且把我们感兴趣的摄像头数据提取出来。我们先从Kafka数据中解析Nest的JSON,我们调用from_json方法并且提供JSON结构和时间戳结构。然后我们我们在对和摄像头相关的列上做一系列变换来简化后续的处理步骤。

JSON结构

schema = StructType() \
  .add("metadata", StructType() \
    .add("access_token", StringType()) \
    .add("client_version", IntegerType())) \
  .add("devices", StructType() \
    .add("thermostats", MapType(StringType(), StructType().add(...))) \
    .add("smoke_co_alarms", MapType(StringType(), StructType().add(...))) \
    .add("cameras", MapType(StringType(), StructType().add(...))) \
    .add("companyName", StructType().add(...))) \
  .add("structures", MapType(StringType(), StructType().add(...)))

nestTimestampFormat = "yyyy-MM-dd'T'HH:mm:ss.sss'Z'"

解析原始JSON数据

jsonOptions = { "timestampFormat": nestTimestampFormat }
parsed = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "localhost:9092") \
  .option("subscribe", "nest-logs") \
  .load() \
  .select(from_json(col("value").cast("string"), schema, jsonOptions).alias("parsed_value"))
Project Relevant Columns
camera = parsed \
  .select(explode("parsed_value.devices.cameras")) \
  .select("value.*")

sightings = camera \
  .select("device_id", "last_event.has_person", "last_event.start_time") \
  .where(col("has_person") == True)

生成cameraDataFrame,我们先讲“cameras”的域放到顶层。因为“cameras”是个MapType,每个生成的行都包含一个key-value对。我们用explode函数来为每一个key-value对生成新的一行。最后我们用star()将value列去嵌套化。下面是调用camera.printSchema()的结果

root
 |-- device_id: string (nullable = true)
 |-- software_version: string (nullable = true)
 |-- structure_id: string (nullable = true)
 |-- where_id: string (nullable = true)
 |-- where_name: string (nullable = true)
 |-- name: string (nullable = true)
 |-- name_long: string (nullable = true)
 |-- is_online: boolean (nullable = true)
 |-- is_streaming: boolean (nullable = true)
 |-- is_audio_input_enable: boolean (nullable = true)
 |-- last_is_online_change: timestamp (nullable = true)
 |-- is_video_history_enabled: boolean (nullable = true)
 |-- web_url: string (nullable = true)
 |-- app_url: string (nullable = true)
 |-- is_public_share_enabled: boolean (nullable = true)
 |-- activity_zones: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- id: string (nullable = true)
 |-- public_share_url: string (nullable = true)
 |-- snapshot_url: string (nullable = true)
 |-- last_event: struct (nullable = true)
 |    |-- has_sound: boolean (nullable = true)
 |    |-- has_motion: boolean (nullable = true)
 |    |-- has_person: boolean (nullable = true)
 |    |-- start_time: timestamp (nullable = true)
 |    |-- end_time: timestamp (nullable = true)
 |    |-- urls_expire_time: timestamp (nullable = true)
 |    |-- web_url: string (nullable = true)
 |    |-- app_url: string (nullable = true)
 |    |-- image_url: string (nullable = true)
 |    |-- animated_image_url: string (nullable = true)
 |    |-- activity_zone_ids: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
聚合并写回Kafka

Aggregate and Write Back to Kafka

下面我们处理sightingsDataFrame来将每个sighting都加上相应的位置。别忘了我们还需要一些位置数据来通过设备Id来查找该设备的邮编。我们先为位置信息创建一个DataFrame然后和sightings的DataFrame在设备Id上做join。也就是说我们是在吧sightings的流数据和位置的静态数据做join。

加入位置信息

locationDF = spark.table("device_locations").select("device_id", "zip_code")
sightingLoc = sightings.join(locationDF, "device_id")
聚合统计结果并写到Kafka中

现在我们来生成每个邮编每小时的sightings总数,并把将其写到压缩的Kafka topic中,topic名字叫做“nest-camera-stats”

sightingLoc \
  .groupBy("zip_code", window("start_time", "1 hour")) \
  .count() \
  .select( \
    to_json(struct("zip_code", "window")).alias("key"),
    col("count").cast("string").alias("value")) \
  .writeStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "localhost:9092") \
  .option("topic", "nest-camera-stats") \
  .option("checkpointLocation", "/path/to/HDFS/dir") \
  .outputMode("complete") \
  .start()

上述的查询会实时处理发生的sighting信息,然后按邮编和小时窗口为key写到Kafka中。随着相同key的一直收到更新,同一个key的下的数据记录会有很多,而Kafka的topic压缩会把保证看到相同key的新数据到达的时候把老的删掉。这样,压缩性会保证每个key最终只有最新的值被保留。

将归档结果放入持久存储中

除了将聚合结果写到kafka,我们还可能想把原始摄像头数据存一份到持久存储以便之后用。下面的例子讲摄像头的DataFrame用Parquet格式写到S3上。我们选择了Parquet因为其压缩和列存储性质,不过还有很多其他格式可以选择,诸如ORC,Avro,CSV等等格式都可以支持。

camera.writeStream \
  .format("parquet") \
  .option("startingOffsets", "earliest") \
  .option("path", "s3://nest-logs") \
  .option("checkpointLocation", "/path/to/HDFS/dir") \
  .start()

注意我们可以重用同一个摄像头DataFrame来开始多个流处理查询。比如,我们可以查询所有离线的camera,然后给运维中心发个提醒来调查。

批查询生成报告

下个例子我们要在“nest-camera-stats”压缩topic上运行一个批查询来生成哪些邮编有大量sightings的报告。

写批查询和流处理查询非常类似,同的是我们用read方法而不是readStream方法,write方法而不是writeStream方法。

批量读取和处理数据
report = spark \
  .read \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "localhost:9092") \
  .option("subscribe", "nest-camera-stats") \
  .load() \
  .select( \
    json_tuple(col("key").cast("string"), "zip_code", "window").alias("zip_code", "window"),
    col("value").cast("string").cast("integer").alias("count")) \
  .where("count > 1000") \
  .select("zip_code", "window") \
  .distinct()

这个报告DataFrame可被用于生成报告或者生成实时的数据仪表盘。

结论

本文中我们展示了如何实时处理和变换Kafka的数据。我们实现了端到端的连续应用的例子,演示了用结构化流处理API的思路和容易性,以及如何利用强大的有且仅一次语法。