4 Monitoring
4.0.1 Relevant Job to analyze
df_to_analyze = spark.read.parquet(os.path.join("computed", "travels", "*.parquet"))
df_to_analyze.show()
df_star.show()
We’ll also analyze some of the toPandas calls made to build the visualizations in the notebook.
4.0.2 Monitor Travels
df.explain(True)
== Parsed Logical Plan ==
Relation [birth_year#3175,end_lat#3176,end_lng#3177,end_station_id#3178,end_station_name#3179,ended_at#3180,gender#3181,member_casual#3182,old_format#3183,ride_id#3184,rideable_type#3185,start_lat#3186,start_lng#3187,start_station_id#3188,start_station_name#3189,started_at#3190,trip_duration#3191,year#3192] parquet
== Analyzed Logical Plan ==
birth_year: int, end_lat: double, end_lng: double, end_station_id: string, end_station_name: string, ended_at: timestamp, gender: string, member_casual: string, old_format: boolean, ride_id: string, rideable_type: string, start_lat: double, start_lng: double, start_station_id: string, start_station_name: string, started_at: timestamp, trip_duration: int, year: int
Relation [birth_year#3175,end_lat#3176,end_lng#3177,end_station_id#3178,end_station_name#3179,ended_at#3180,gender#3181,member_casual#3182,old_format#3183,ride_id#3184,rideable_type#3185,start_lat#3186,start_lng#3187,start_station_id#3188,start_station_name#3189,started_at#3190,trip_duration#3191,year#3192] parquet
== Optimized Logical Plan ==
Relation [birth_year#3175,end_lat#3176,end_lng#3177,end_station_id#3178,end_station_name#3179,ended_at#3180,gender#3181,member_casual#3182,old_format#3183,ride_id#3184,rideable_type#3185,start_lat#3186,start_lng#3187,start_station_id#3188,start_station_name#3189,started_at#3190,trip_duration#3191,year#3192] parquet
== Physical Plan ==
*(1) ColumnarToRow
+- FileScan parquet [birth_year#3175,end_lat#3176,end_lng#3177,end_station_id#3178,end_station_name#3179,ended_at#3180,gender#3181,member_casual#3182,old_format#3183,ride_id#3184,rideable_type#3185,start_lat#3186,start_lng#3187,start_station_id#3188,start_station_name#3189,started_at#3190,trip_duration#3191,year#3192] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/home/astragaliton/Documents/documentation/Big-Data-Project-II-IF..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<birth_year:int,end_lat:double,end_lng:double,end_station_id:string,end_station_name:string...
The Relation is identical in the Analyzed and Optimized Logical Plans. Spark's query optimizer (Catalyst) found nothing to optimize after the analysis phase, which is expected here: the query is a plain scan with no filter, projection, or join.
The Physical Plan differs from the Optimized Logical Plan in that it includes the ColumnarToRow operator, a Spark-specific operator that converts the columnar batches produced by the Parquet reader (Batched: true) into the rows consumed by downstream operators. The FileScan parquet operator is also specific to Spark and scans the Parquet data files.
- It requires:
  - One stage to read the travels parquet file.
  - One stage to show it.
  - Two stages (over two jobs) to show df_star.
- HashAggregate efficiently performs aggregations by grouping data using hash tables, optimizing memory usage and processing speed.
- Exchange hashpartitioning redistributes data across nodes based on hashed keys, enabling parallel processing and necessary data colocation for operations like joins, albeit with the cost of shuffle operations.
The operations above do not shuffle any data. Nevertheless, the jobs triggered by the .toPandas operations do perform a shuffle.
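To make the HashAggregate and Exchange hashpartitioning steps more concrete, here is a minimal pure-Python sketch of the idea: a partial aggregation per partition, a hash-based redistribution of the keys, then a final aggregation. It is a conceptual illustration only, not Spark's implementation. The sample rows, the helper names (`partition_for`, `hash_aggregate`, `exchange`), and the use of `crc32` as the hash function are all assumptions made for the example.

```python
import zlib
from collections import defaultdict


def partition_for(key, num_partitions):
    """Pick a target partition from a deterministic hash of the key.
    crc32 is a stand-in here; Spark uses its own hash function."""
    return zlib.crc32(str(key).encode("utf-8")) % num_partitions


def hash_aggregate(rows):
    """Sum the counts per key using a hash table (a dict)."""
    counts = defaultdict(int)
    for key, n in rows:
        counts[key] += n
    return dict(counts)


def exchange(partial_results, num_partitions):
    """Shuffle step: route every (key, count) pair to the partition
    selected by the hash of its key, so equal keys end up together."""
    shuffled = [[] for _ in range(num_partitions)]
    for partial in partial_results:
        for key, n in partial.items():
            shuffled[partition_for(key, num_partitions)].append((key, n))
    return shuffled


# Hypothetical input: two partitions of (member_casual, 1) pairs.
input_partitions = [
    [("member", 1), ("casual", 1), ("member", 1)],
    [("casual", 1), ("member", 1)],
]

partial = [hash_aggregate(p) for p in input_partitions]  # partial aggregation, no data movement
shuffled = exchange(partial, num_partitions=2)           # data crosses partition boundaries here
final = [hash_aggregate(p) for p in shuffled]            # final aggregation per target partition

totals = {key: n for part in final for key, n in part.items()}
print(totals)
```

The only step that moves data between partitions is `exchange`, which is why a plan containing an Exchange operator implies a shuffle and a stage boundary, while the plain scan shown earlier does not.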
- Tasks are the smallest units of work within a stage, each processing a single data partition.
- Again, for the 3 operations shown in this part, there is one task (except for df_star, which uses 2 stages). Nevertheless, most toPandas stages run 8 tasks, because travels.parquet is partitioned by the 8 years present in the dataset.
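The relation between partitions and tasks described above can be sketched in plain Python: rows are grouped by year, and one task is run per resulting partition. This is only an illustration of the "one task per partition" rule, not how Spark schedules work internally. The rows, the year values, and the `run_task` helper are hypothetical; the actual years in the dataset are not listed in this section.

```python
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor

# Hypothetical rows: (year, trip_duration). Eight distinct years are assumed
# to mirror the 8 partitions mentioned in the text.
rows = [(2014 + i % 8, 60 * (i + 1)) for i in range(24)]

# Partition the rows by year, as a year-partitioned parquet layout would.
partitions = defaultdict(list)
for year, duration in rows:
    partitions[year].append(duration)


def run_task(item):
    """One task processes exactly one partition."""
    year, durations = item
    return year, len(durations), sum(durations)


# One task per partition; tasks are independent and can run in parallel.
with ThreadPoolExecutor() as pool:
    results = list(pool.map(run_task, sorted(partitions.items())))

num_tasks = len(results)
print(num_tasks)
```

With eight distinct years, the sketch runs eight tasks, which matches the task count observed for most toPandas stages.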