# At the first execution, those options should all be set to True.
# Once those operations have been performed, you can rely on the baked version and disable them.
#
# If you already have the https://s3.amazonaws.com/tripdata/{year}-citibike-tripdata.zip archives,
# you can disable ENABLE_DOWNLOAD and put the zip files directly into data.
#
# For the graphics, we do not use the star schema.
# Thus if you disable "DO_NOT_IGNORE_STAR_SCHEMA",
# then "ENABLE_COMPUTE_DIMENSION" and "ENABLE_COMPUTE_EVENT" will be ignored.

# EXTRACT
ENABLE_DOWNLOAD = False
ENABLE_UNZIP = False

# TRANSFORM
ENABLE_COMPUTE_ZIP_TO_PARQUET = False
DO_NOT_IGNORE_STAR_SCHEMA = False
ENABLE_COMPUTE_DIMENSION = False
ENABLE_COMPUTE_EVENT = False

# AGGREGATE
ENABLE_COMPUTE_GRAPHICS_DATA = False
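As an aside, the flag interaction described in the comments above can be made explicit as follows. This is only an illustration; the RUN_* names are not part of the notebook.

Code

# Illustration of the flag logic described above (the RUN_* names are hypothetical):
# the star-schema steps only run when DO_NOT_IGNORE_STAR_SCHEMA is True
# and their own flag is True as well.
RUN_COMPUTE_DIMENSION = DO_NOT_IGNORE_STAR_SCHEMA and ENABLE_COMPUTE_DIMENSION
RUN_COMPUTE_EVENT = DO_NOT_IGNORE_STAR_SCHEMA and ENABLE_COMPUTE_EVENT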
We download the data from https://s3.amazonaws.com/tripdata/{year}-citibike-tripdata.zip
Code
import os

import requests

# from 2014 to 2023
zip_files = []

def download_data():
    trip_urls = [
        (year, f"https://s3.amazonaws.com/tripdata/{year}-citibike-tripdata.zip")
        for year in range(2014, 2023 + 1)
    ]
    if not os.path.exists(os.path.join('data')):
        os.makedirs('data')
    for year, url in trip_urls:
        basename = os.path.join('data', str(year) + "_" + 'citibike_tripdata')
        zip_filename = basename + ".zip"
        csv_filename = basename + ".csv"
        zip_files.append((year, zip_filename, csv_filename))
        if not ENABLE_DOWNLOAD:
            continue
        print(f'Check {basename} ...')
        response = requests.get(url, stream=True)
        total_size = int(response.headers.get('content-length', 0))
        block_size = 1024
        progress = 0
        # only download archives that are not already present (as .zip or .csv)
        if not os.path.exists(zip_filename) and not os.path.exists(csv_filename):
            with open(zip_filename, 'wb') as f:
                for data in response.iter_content(block_size):
                    if data:
                        f.write(data)
                        progress += len(data)
                        print(f'\rDownloaded {progress}/{total_size} bytes', end='')
            print(f'\nDownload complete: {zip_filename}')
    print("Finished")

if ENABLE_UNZIP or ENABLE_DOWNLOAD:
    download_data()
We then unzip the data.
Code
import shutil
import zipfile

def unzip_data():
    for (year, zip_filename, csv_filename) in zip_files:
        # if year < 2018: continue  # WARNING : DISABLE THIS LINE
        if not zipfile.is_zipfile(zip_filename):
            print("Corrupted zip file.")
            break
        if os.path.exists("tmp"):
            shutil.rmtree("tmp")
        print("Unzip : ", zip_filename)
        with zipfile.ZipFile(zip_filename, 'r') as zip_ref:
            zip_ref.extractall("tmp")
        print("Process ..")
        # find the folders in tmp
        items = os.listdir("tmp")
        for folder in items:
            if not os.path.isdir(os.path.join("tmp", folder)) or \
               folder.startswith("__"):
                continue
            # find all the folders in this folder
            sub_folders = os.listdir(os.path.join("tmp", folder))
            for sub_folder in sub_folders:
                if not os.path.isdir(os.path.join("tmp", folder, sub_folder)) or \
                   sub_folder.startswith("."):
                    continue
                sub_item = os.listdir(os.path.join("tmp", folder, sub_folder))
                for leaf in sub_item:
                    # move the csv inside to data
                    from_path = os.path.join("tmp", folder, sub_folder, leaf)
                    dest_path = os.path.join("data", leaf)
                    if os.path.exists(dest_path):
                        os.remove(dest_path)
                    shutil.move(from_path, "data")
    # clean up the temporary extraction folder
    if os.path.exists("tmp"):
        shutil.rmtree("tmp")

if ENABLE_UNZIP:
    unzip_data()
To optimize disk usage, we could have unzipped one archive at a time and converted its contents to .parquet immediately, as sketched below.
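The following is only a sketch of that space-saving variant, not the pipeline actually used here. It assumes pandas with a parquet engine (e.g. pyarrow) is installed, and the helper name zip_to_parquet is hypothetical.

Code

import os
import shutil
import zipfile

import pandas as pd  # assumption: pandas + pyarrow available for .to_parquet

def zip_to_parquet(zip_filename, out_dir="data"):
    # extract a single archive into a scratch folder
    with zipfile.ZipFile(zip_filename, "r") as zip_ref:
        zip_ref.extractall("tmp")
    # convert every CSV found to parquet, deleting each CSV right away
    for root, _, files in os.walk("tmp"):
        for name in files:
            if not name.endswith(".csv") or name.startswith("."):
                continue
            csv_path = os.path.join(root, name)
            parquet_path = os.path.join(out_dir, name[:-4] + ".parquet")
            pd.read_csv(csv_path, low_memory=False).to_parquet(parquet_path, index=False)
            os.remove(csv_path)  # free the disk space immediately
    shutil.rmtree("tmp")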
By looking at the previous code output (cached in column_names.txt; a sketch of such a header scan is shown after the list below),
we notice the following columns from 2014-01 \(\to\) 2021-01 (inclusive):
['tripduration', 'starttime', 'stoptime', 'start station id', 'start station name', 'start station latitude', 'start station longitude', 'end station id', 'end station name', 'end station latitude', 'end station longitude', 'bikeid', 'usertype', 'birth year', 'gender']
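The cell that produced column_names.txt appears earlier in the notebook and is not reproduced here; a minimal sketch of such a header scan could look like this (the file layout in data/ and the output format are assumptions).

Code

import glob
import os

# Scan the header line of every monthly CSV in data/ and cache the result,
# so the slow pass over ~10 years of files only has to run once.
with open("column_names.txt", "w") as out:
    for csv_path in sorted(glob.glob(os.path.join("data", "*.csv"))):
        with open(csv_path, "r", encoding="utf-8") as f:
            header = f.readline().strip()
        out.write(f"{os.path.basename(csv_path)}: {header}\n")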
The naming convention is not exactly the same from 201610-citibike-tripdata_1.csv \(\to\) 201703-citibike-tripdata.csv_1.csv, where the headers are capitalized instead: ['Trip Duration', 'Start Time', 'Stop Time', 'Start Station ID', 'Start Station Name', 'Start Station Latitude', 'Start Station Longitude', 'End Station ID', 'End Station Name', 'End Station Latitude', 'End Station Longitude', 'Bike ID', 'User Type', 'Birth Year', 'Gender']
The columns change between 2021-02 \(\to\) 2023-12 (inclusive).
We will also add a binary column old_format to indicate whether a row comes from the old format O or the new format N defined above.
Code
# map the legacy (2014-01 to 2021-01) column names to the 2021-02+ schema
col_mapping_1 = {
    'tripduration': 'trip_duration',
    'usertype': 'member_casual',
    'birth year': 'birth_year',
    'starttime': 'started_at',
    'stoptime': 'ended_at',
    'start station id': 'start_station_id',
    'start station name': 'start_station_name',
    'start station latitude': 'start_lat',
    'start station longitude': 'start_lng',
    'end station id': 'end_station_id',
    'end station name': 'end_station_name',
    'end station latitude': 'end_lat',
    'end station longitude': 'end_lng',
    'bikeid': 'ride_id',
}

# map the capitalized 2016-10 to 2017-03 headers back to the legacy lowercase names
col_mapping_2 = {
    'Trip Duration': 'tripduration',
    'Start Time': 'starttime',
    'Stop Time': 'stoptime',
    'Start Station ID': 'start station id',
    'Start Station Name': 'start station name',
    'Start Station Latitude': 'start station latitude',
    'Start Station Longitude': 'start station longitude',
    'End Station ID': 'end station id',
    'End Station Name': 'end station name',
    'End Station Latitude': 'end station latitude',
    'End Station Longitude': 'end station longitude',
    'Bike ID': 'bikeid',
    'User Type': 'usertype',
    'Birth Year': 'birth year',
    'Gender': 'gender',
}
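As an illustration only (not the notebook's actual transformation code), here is how the two mappings and the old_format flag could be chained on a loaded dataframe; the helper name normalize_columns and the use of pandas are assumptions.

Code

import pandas as pd

# Hypothetical helper: rename columns to the 2021-02+ schema and tag the origin.
def normalize_columns(df: pd.DataFrame) -> pd.DataFrame:
    if 'Trip Duration' in df.columns:
        # capitalized 2016-10 to 2017-03 headers -> legacy lowercase names
        df = df.rename(columns=col_mapping_2)
    if 'tripduration' in df.columns:
        # legacy schema -> 2021-02+ schema, and remember that this row is old format
        df = df.rename(columns=col_mapping_1)
        df['old_format'] = True
    else:
        df['old_format'] = False
    return df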