JSON column parsing: a data engineering interview question asked at FAANG

Deepa Saw
2 min read · May 26, 2024

--

Question: Parse the JSON column to identify the last DialogId in each row's DialogTraces array.

from pyspark.sql import SparkSession
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType
from pyspark.sql.functions import col, from_json, get_json_object
from pyspark.sql.types import StructType, StructField, StringType, BooleanType, IntegerType, ArrayType



# DataFrame layout: two nullable string columns -- the student registration
# number and the raw JSON payload kept as unparsed text.
schema = (
    StructType()
    .add("studentregistrationno", StringType(), True)
    .add("json", StringType(), True)
)

# Define the data
# Each element is a (studentregistrationno, json) tuple matching `schema`.
# The json value is a nested document stored as a plain string; the part this
# exercise cares about is channelData.DialogTraceDetail.DialogTraces, an array
# of {DialogId, NodeId} objects.
# Row "1001" carries three traces (DialogId "1", "2", "3"), so its last
# DialogId is "3".
data = [
("1001", """{
"id":"1dzqlCDiOfLD25Ju88Dc4b-us|0000012",
"from":{
"id":"bot-vsa-eastus-dev@trpme8IbWCI",
"name":"bot-vsa-eastus-dev"
},
"conversation":{
"id":"1dzqlCDiOfLD25Ju88Dc4b-us"
},
"recipient":{
"id":"dl_27dfeb85aa8e4abf9ba1c99f15d62c94"
},
"channelData":{
"DialogTraceDetail":{
"DialogTraces":[
{
"DialogId":"1",
"NodeId":"c3e3eb0f-97b2-46ae-9f36-77f009b1152a"
},
{
"DialogId":"2",
"NodeId":"0bbb332e-08fa-4e16-885d-005d869fd509"
},
{
"DialogId":"3",
"NodeId":"8a32ebd1-bec3-4cde-a75a-1977a561b019"
}
]
}
},
"SequenceNumber":8,
"DialogTag":"StartOver|StartOver|Greeting",
"IsLastConversationActivity":false
}"""),
# Row "1002" carries a single trace, so first and last DialogId coincide.
("1002", """{
"id":"2dzqlCDiOfLD25Ju88Dc4b-us|0000013",
"from":{
"id":"bot-vsa-eastus-dev@trpme8IbWCI",
"name":"bot-vsa-eastus-dev"
},
"conversation":{
"id":"2dzqlCDiOfLD25Ju88Dc4b-us"
},
"recipient":{
"id":"dl_27dfeb85aa8e4abf9ba1c99f15d62c94"
},
"channelData":{
"DialogTraceDetail":{
"DialogTraces":[
{
"DialogId":"new_topic_8cac3ca38a6c4698a67cfd23e14eb3af_8a4964498f83424bb3a86295bf7d0952_startover",
"NodeId":"c3e3eb0f-97b2-46ae-9f36-77f009b1152a"
}
]
}
},
"SequenceNumber":8,
"DialogTag":"StartOver|StartOver|Greeting",
"IsLastConversationActivity":false
}""")
]

# Create DataFrame
# `spark` is never assigned in this file (presumably the notebook runtime
# injects it). getOrCreate() hands back the already-active session when one
# exists, so this is harmless there and prevents a NameError elsewhere.
spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(data, schema)

# NOTE(review): display() is not defined in this file either -- it looks like
# the notebook rendering helper; outside a notebook df.show() is the usual
# substitute. Confirm the target environment.
display(df)
Input DataFrame

Infer the schema of the JSON column


# Infer the JSON structure by handing every payload string to Spark's JSON
# reader. Despite its name, json_schema is the resulting DataFrame; the
# inferred StructType is its .schema attribute.
json_strings = df.select("json").rdd.map(lambda record: record[0])
json_schema = spark.read.json(json_strings)
display(json_schema.schema)

Use element_at to pick the last element of the DialogId array (index it by size, or simply by -1)


from pyspark.sql.functions import col, size, element_at

# Parse the raw JSON text into a struct column using the inferred schema.
df_parsed = df.withColumn("jsonStruct", from_json(col("json"), json_schema.schema))

# Selecting a field through an array of structs yields an array of that field,
# so this column is the ordered list of DialogId values for the row.
dialog_ids = col("jsonStruct.channelData.DialogTraceDetail.DialogTraces.DialogId")

display(
    df_parsed.select(
        "studentregistrationno",
        # A negative index counts from the end, so -1 is the last DialogId.
        # This replaces element_at(arr, size(arr)): for an empty array size()
        # is 0 and element_at rejects index 0 with an error, whereas -1 gives
        # NULL for an empty array (with ANSI mode off; under ANSI mode an
        # out-of-range index still raises).
        element_at(dialog_ids, -1).alias("last_element"),
        "jsonStruct.channelData.DialogTraceDetail.DialogTraces.DialogId",
        "jsonStruct.*",
    )
)

--

--