BigQuery can handle huge amounts of data, yet you don't have to worry about the infrastructure at all (really, at all), and it's fast and cheap. So it is tempting to put all your data into BigQuery and process everything there.
That's how I ended up wanting to load an entire MySQL database into BigQuery, and here is a write-up of how I got it working.
Configuration and requirements
The MySQL database was several hundred gigabytes in size and had hundreds of tables.
Fortunately, all tables used the same column name for the record creation date and time, and I wanted to partition by that creation date for future convenience.
Loading into BigQuery is cumbersome
Partitioning is a pain in the ass.
BigQuery allows tables to be partitioned by date, with data loaded separately for each date.
For example, to load data into the 2019-01-01 partition, you would do the following:
bq load <data_set>.<table>$20190101 <data_source>
This is a hassle, though: the data cannot be loaded all at once and has to be processed one day at a time.
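Concretely, that means something like the following sketch, one bq invocation per day (the dataset, table, and per-day source paths are placeholders):

import subprocess
from datetime import date, timedelta

# Hypothetical sketch: one "bq load" per day, targeting the $YYYYMMDD partition decorator.
day, end = date(2019, 1, 1), date(2019, 12, 31)
while day <= end:
    suffix = day.strftime("%Y%m%d")
    subprocess.run([
        "bq", "load",
        "<data_set>.<table>${}".format(suffix),
        "<data_source_for_{}>".format(suffix),  # the source data must already be split by day
    ])
    day += timedelta(days=1)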
Fortunately, if the partition date is the same as a date column in the data, you can specify that column at load time.
With a date column specified as shown below, the data for the entire period is loaded in one go and partitioned based on the values in that column.
bq load --time_partitioning_field '<time_column_name>' <data_set>.<table> <data_source>
I used this approach this time, and it is very convenient!
Table creation and schema definition are tedious
I need to create tables on the BigQuery side that match the MySQL tables, but doing this by hand is super tedious....
Since the schema information already exists in MySQL, I would like the BigQuery tables to be created automatically by referencing it.
There were several transfer services from MySQL to BigQuery, but they worked table by table, and some table and schema definitions still had to be written by hand.
I am confident that I will make mistakes along the way....
Load into BigQuery with Parquet
There is a way to create a BigQuery schema from the MySQL schema using scripts or other heavy-handed techniques and then specify that schema when loading into BigQuery, but I wanted something easier if possible.
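For reference, that scripted route might look roughly like the sketch below, which derives a BigQuery schema JSON from MySQL's information_schema (pymysql and the type mapping here are my own illustrative choices, not what I actually ran):

import json
import pymysql

# Hypothetical sketch of the scripted approach: map MySQL column types to BigQuery types.
TYPE_MAP = {"int": "INTEGER", "bigint": "INTEGER", "varchar": "STRING",
            "text": "STRING", "datetime": "TIMESTAMP", "decimal": "NUMERIC"}

conn = pymysql.connect(host="<sv_name>", user="xxxx", password="xxxx",
                       database="information_schema")
with conn.cursor() as cur:
    cur.execute("SELECT column_name, data_type FROM columns "
                "WHERE table_schema=%s AND table_name=%s",
                ("<db_name>", "<table_name>"))
    schema = [{"name": name, "type": TYPE_MAP.get(dtype, "STRING")}
              for name, dtype in cur.fetchall()]

# The result could then be handed to "bq load --schema schema.json ..."
with open("schema.json", "w") as f:
    json.dump(schema, f)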
While reading BigQuery's documentation on loading data, I found that there is a data format called "Parquet" that carries the column data types as part of the file itself.
I tried it and it looked good, so I decided to use that one this time.
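As a quick sanity check that Parquet really does carry the column types, here is a tiny sketch using the pyarrow library with made-up data:

import pyarrow as pa
import pyarrow.parquet as pq

# Tiny sketch: the Parquet file stores the column names and types itself.
table = pa.table({"id": [1, 2],
                  "created_at": ["2019-01-01 00:00:00", "2019-01-02 00:00:00"]})
pq.write_table(table, "/tmp/parquet_check.parquet")
print(pq.read_schema("/tmp/parquet_check.parquet"))
# id: int64
# created_at: string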
Loading in "Parquet" offers the following advantages
No table creation required
- If there is no table when you load it, it will automatically create one for you.
No schema definition required
- Column names and types are retrieved from Parquet and created automatically
No schema updates required
- When new columns are added, they are automatically added when Parquet is loaded
In other words, if you specify only the table name and load the Parquet file, the system will create and load the table without difficulty.
You never have to touch the column definitions! And of course, you can still use the automatic partitioning feature by specifying the date column name!
We decided to export the MySQL tables to Google Cloud Storage as Parquet once and then load them into BigQuery.
Since the BigQuery side of the operation is simply to call the load command, we were freed from the most tedious and troublesome task of table creation and schema definition!
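If you ever want to double-check what BigQuery created automatically, a small sketch using the google-cloud-bigquery client library (dataset and table names are placeholders) looks like this:

from google.cloud import bigquery

client = bigquery.Client()
table = client.get_table("<dataset>.<table_name>")
for field in table.schema:          # schema derived automatically from the Parquet files
    print(field.name, field.field_type)
print(table.time_partitioning)      # partitioning picked up from the load settings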
Output in Parquet from MySQL
I decided to use Spark to export the data from MySQL in Parquet format. Spark is a bit difficult to get to grips with because it requires programming, but for a simple data flow that just passes data from one side to the other, like this one, it is easy to write.
For example, reading a MySQL table, writing it to Storage as Parquet, and loading it from Storage into BigQuery looks like the following.
import subprocess

options = {
    "url": "jdbc:mysql://<sv_name>:3306/<db_name>",
    "driver": "com.mysql.jdbc.Driver",
    "dbtable": "<table_name>",
    "user": "xxxx",
    "password": "xxxx"
}

# Read the table from MySQL over JDBC
df = spark.read.format("jdbc").options(**options).load()

# Write it out to Cloud Storage as Parquet
df.write.mode("overwrite").format("parquet").\
    save("gs://xxxx/yyyy/<table_name>")

# Load the Parquet files into BigQuery with the bq command
args = [
    "bq", "load",
    "--source_format", "PARQUET",
    "--time_partitioning_field", "<time_column_name>",
    "<dataset>.<table_name>",
    "gs://xxxx/yyyy/<table_name>/*"
]
subprocess.run(args=args)
There is no BigQuery connector for Python in Spark, so I load it directly with the bq command.... (as was the case in the Google sample)
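If you would rather stay inside Python than shell out, the plain google-cloud-bigquery client library (not a Spark connector; the names below are placeholders) can submit the same load job:

from google.cloud import bigquery

client = bigquery.Client()
job_config = bigquery.LoadJobConfig(
    source_format=bigquery.SourceFormat.PARQUET,
    time_partitioning=bigquery.TimePartitioning(field="<time_column_name>"),
)
job = client.load_table_from_uri(
    "gs://xxxx/yyyy/<table_name>/*",
    "<dataset>.<table_name>",
    job_config=job_config,
)
job.result()  # wait for the load to finish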
Run it for as many tables as there are.
All that remains is to retrieve the list of tables from "information_schema" and run the previous process for each table.
import subprocess

OPTIONS = {
    "url": "jdbc:mysql://<sv_name>:3306/{}",
    "driver": "com.mysql.jdbc.Driver",
    "user": "xxxx",
    "password": "xxxx"
}

def getTableList():
    # List the tables of the target database via information_schema
    options = OPTIONS.copy()
    options["url"] = options["url"].format("information_schema")
    options["query"] = "SELECT table_name FROM tables WHERE table_schema='<db_name>'"
    df = spark.read.format("jdbc").options(**options).load()
    return list(map(lambda x: x["table_name"], df.collect()))

def main():
    for table in getTableList():
        # Read each table from MySQL and write it to Cloud Storage as Parquet
        options = OPTIONS.copy()
        options["url"] = options["url"].format("<db_name>")
        options["dbtable"] = table
        df = spark.read.format("jdbc").options(**options).load()
        df.write.mode("overwrite").format("parquet").\
            save("gs://xxxx/yyyy/{}".format(table))

        # Load the Parquet files for this table into BigQuery
        args = [
            "bq", "load",
            "--source_format", "PARQUET",
            "--time_partitioning_field", "<time_column_name>",
            "<dataset>.{}".format(table),
            "gs://xxxx/yyyy/{}/*".format(table)
        ]
        subprocess.run(args=args)

main()
This will put the entire MySQL database into BigQuery without writing any schema definitions, no matter how many tables there are!
Impressions, etc.
BigQuery alone satisfies my needs for data aggregation, so I don't go out of my way to use Spark for that, but Spark is also handy when I want to move data from one service to another.
For example, Spark has connectors for AWS S3 and Google Cloud Storage, so you can easily move data from AWS S3 to Google Cloud Storage by simply doing load("s3a://~") and then save("gs://~").
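A minimal sketch of that copy (bucket paths are placeholders, and the S3 and GCS connectors with credentials must already be set up on the cluster):

# Read Parquet from S3 and write it straight to Google Cloud Storage.
df = spark.read.format("parquet").load("s3a://<source-bucket>/path/")
df.write.mode("overwrite").format("parquet").save("gs://<dest-bucket>/path/")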
It would be even better if it had a BigQuery connector.
When fetching data from a database such as MySQL, set "spark.executor.heartbeatInterval" and "spark.network.timeout" to large values, as reads may otherwise time out.
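For reference, here is a sketch of setting those values when building the session; the numbers are illustrative, and spark.network.timeout should stay larger than the heartbeat interval:

from pyspark.sql import SparkSession

# Increase the timeouts so long JDBC reads from MySQL don't get killed mid-fetch.
spark = SparkSession.builder \
    .appName("mysql-to-bigquery") \
    .config("spark.executor.heartbeatInterval", "600s") \
    .config("spark.network.timeout", "1200s") \
    .getOrCreate()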