This is one option for migrating Cassandra data using the spark-cassandra-connector library. It is helpful when you want to move data from an existing Cassandra cluster to a new one.
To complete this, you need:
- Spark 3.4.0
- Cassandra 3.11.X
- Openshift 4
- JDK 11+
- S3 instance (if you want to use shared storage)
Build the Spark master and worker images (refer to the Spark documentation for details).
docker build -t quay.io/sswguo/spark-master:3.4.0-jdk11-1.2 .
docker push quay.io/sswguo/spark-master:3.4.0-jdk11-1.2
docker build -t quay.io/sswguo/spark-worker:3.4.0-jdk11-1.2 .
docker push quay.io/sswguo/spark-worker:3.4.0-jdk11-1.2
Deploy Spark
oc login --token=<TOKEN> --server=<CLUSTER>
oc apply -f spark-master-deployment.yaml
oc apply -f spark-master-service.yaml
oc apply -f spark-worker-persistent-volume-claim.yaml
oc apply -f spark-worker-deployment.yaml
NOTE: Request storage suitable for your workload (the default size here is 5Gi), and adjust the worker resources accordingly.
Build the migration application, and confirm the app is configured in read mode.
mvn clean package
Sync the jar into the Spark worker
mkdir app
cp target/migration-1.0-SNAPSHOT.jar app
oc rsync app/ <spark-worker POD_NAME>:/tmp/
Configure the Cassandra settings in /opt/spark/conf/config.yaml
host: cassandra-cluster
port: 9042
user: cassandra
password: cassandra
tables:
  - keyspace: <KEYSPACE>
    table: <TABLE>
    tempView: <View>
    filter: "creation > '2023-08-08'"
    id: <ID>
tempView: a custom name for the table's temporary view
filter: an optional filter, useful if you want to migrate in slices, especially when the data is large
id: the identifier of the directory that stores the exported CSV files
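To make these keys concrete, here is a rough sketch of how read mode might use them. This is an illustration based on the config above, not the app's actual code; folo/records2 and the output path are example values borrowed from later in this doc.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class ReadModeSketch {
    public static void main(String[] args) {
        // Connection values mirror /opt/spark/conf/config.yaml (host, port, user, password).
        SparkSession spark = SparkSession.builder()
                .appName("cassandra-read-sketch")
                .config("spark.cassandra.connection.host", "cassandra-cluster")
                .config("spark.cassandra.connection.port", "9042")
                .config("spark.cassandra.auth.username", "cassandra")
                .config("spark.cassandra.auth.password", "cassandra")
                .getOrCreate();

        // keyspace/table: load the source table through the spark-cassandra-connector.
        Dataset<Row> table = spark.read()
                .format("org.apache.spark.sql.cassandra")
                .option("keyspace", "folo")
                .option("table", "records2")
                .load();

        // tempView: register the table so the filter can run as plain SQL.
        table.createOrReplaceTempView("records2");
        Dataset<Row> slice = spark.sql("SELECT * FROM records2 WHERE creation > '2023-08-08'");

        // id: assumed here to name the output directory for the exported files.
        slice.write().parquet("/opt/spark/storage/records2");

        spark.stop();
    }
}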
Submit the job on the worker node
spark-submit \
--class org.commonjava.migration.App \
--master spark://spark-master:7077 \
/tmp/migration-1.0-SNAPSHOT.jar 2>&1 | tee debug.log
If you have a large dataset in prod, try using filters to do the migration in slices; for example, you can split the data by date:
Dataset<Row> sqlDF = spark.sql("SELECT * FROM <tablename> where <date_column> > '2023-08-08'");
Or apply filters when loading the data:
.filter("some_column > some_value")
.select("relevant_column1", "relevant_column2");
Another point: we can increase the resources (CPU & memory) to speed up the process.
- worker resource
- resources:
    limits:
      cpu: '4'
      memory: 8Gi
    requests:
      cpu: 300m
      memory: 400Mi
- executor resource
.config("spark.executor.memory", "6g")
.config("spark.executor.cores", "4")
NOTE: an executor with 4 cores will run 4 tasks in parallel.
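For orientation, a minimal sketch of where those two settings live, assuming the session is built inside the app and the master URL is supplied by spark-submit:

import org.apache.spark.sql.SparkSession;

public class TunedSessionSketch {
    public static SparkSession build() {
        return SparkSession.builder()
                .appName("cassandra-migration")
                // Keep executor memory below the worker's 8Gi limit to leave headroom.
                .config("spark.executor.memory", "6g")
                // 4 cores => up to 4 tasks run in parallel per executor.
                .config("spark.executor.cores", "4")
                .getOrCreate();
    }
}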
This application supports two file formats, Parquet and CSV; see the config:
fileFormat: parquet
Parquet is designed to be highly efficient for both reading and writing large datasets.
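As an illustration of the fileFormat switch, here is a hedged sketch of how an export step might branch on it (the method and parameter names are hypothetical, not the app's actual API):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;

public class ExportSketch {
    // Writes the extracted rows as Parquet or CSV, mirroring the fileFormat config value.
    public static void export(Dataset<Row> df, String fileFormat, String outputDir) {
        if ("parquet".equalsIgnoreCase(fileFormat)) {
            df.write().mode(SaveMode.Overwrite).parquet(outputDir);
        } else {
            df.write().mode(SaveMode.Overwrite).option("header", "true").csv(outputDir);
        }
    }
}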
If there is no shared storage, you need to move the data into the target worker first.
oc rsync stage_pathmap_migration/ <spark-worker POD_NAME>:/opt/spark/storage/indy_pathmap_0729/
Update the migration app to write mode, sync the jar into the Spark worker, and submit the job on the worker node as above.
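As a rough picture of write mode (a sketch under the assumption that the app reads the exported files back from the storage path and appends them into the target table; keyspace, table, and path are placeholders):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

public class WriteModeSketch {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("cassandra-write-sketch")
                // Target cluster connection; output settings mirror the write tuning section later in this doc.
                .config("spark.cassandra.connection.host", "cassandra-cluster")
                .config("spark.cassandra.output.consistency.level", "ONE")
                .config("spark.cassandra.output.concurrent.writes", "5")
                .getOrCreate();

        // Read the exported files back from the mounted storage (path is a placeholder).
        Dataset<Row> df = spark.read().parquet("/opt/spark/storage/indy_pathmap_0729/");

        // Append into the target keyspace/table through the connector.
        df.write()
                .format("org.apache.spark.sql.cassandra")
                .option("keyspace", "<KEYSPACE>")
                .option("table", "<TABLE>")
                .mode(SaveMode.Append)
                .save();

        spark.stop();
    }
}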
NOTE: If you migrate the data into a new Cassandra cluster, refer to the steps above to deploy the Spark master & worker in the new cluster as well.
If you have shared storage, you do not need to transfer the CSV files between OpenShift clusters; you only need to configure the S3 info below (one way to wire it into Spark is sketched after the list):
AWS_ACCESS_KEY_ID
AWS_SECRET_ACCESS_KEY
BUCKET_REGION
BUCKET_NAME
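How these values reach Spark depends on how the app is wired; a common approach (an assumption here, not necessarily what the app does) is to hand them to Hadoop's s3a filesystem so the job can read and write s3a:// paths directly. The path below reuses the example bucket prefix from the aws commands further down.

import org.apache.spark.sql.SparkSession;

public class S3Sketch {
    public static void main(String[] args) {
        // Pick up the same environment variables the migration app expects.
        String accessKey = System.getenv("AWS_ACCESS_KEY_ID");
        String secretKey = System.getenv("AWS_SECRET_ACCESS_KEY");
        String region = System.getenv("BUCKET_REGION");
        String bucket = System.getenv("BUCKET_NAME");

        SparkSession spark = SparkSession.builder()
                .appName("s3-shared-storage-sketch")
                // Standard s3a settings; assumes hadoop-aws is on the classpath.
                .config("spark.hadoop.fs.s3a.access.key", accessKey)
                .config("spark.hadoop.fs.s3a.secret.key", secretKey)
                // Region setting supported by recent hadoop-aws versions.
                .config("spark.hadoop.fs.s3a.endpoint.region", region)
                .getOrCreate();

        // Example: read exported files straight from the bucket (path is illustrative).
        spark.read().parquet("s3a://" + bucket + "/indy_migration_test/indystorage/pathmap/").show(5);

        spark.stop();
    }
}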
You can use the aws CLI to check the files in S3 and remove them after migration.
Install the aws cli in the cluster.
oc apply -f s3/k8s/aws_cli.yaml
NOTE: I mount the Spark worker's volume here so that I can download the files before importing.
volumes:
  - name: vol-migration-data
    persistentVolumeClaim:
      claimName: vol-migration-data
The command to download the files from S3 to local storage:
aws s3 cp s3://<BUCKET_NAME>/indy_migration_test/indystorage/pathmap/ ./ --recursive
Log into the CLI container and run the aws s3 commands with the following environment variables set:
export AWS_ACCESS_KEY_ID=<>
export AWS_SECRET_ACCESS_KEY=<>
export AWS_DEFAULT_REGION=<>
aws s3 ls --human-readable --summarize s3://<BUCKET_NAME>/indy_migration_test/indystorage/pathmap/
aws s3 rm s3://<BUCKET_NAME>/indy_migration_test/ --recursive
During tests, I found no big issues with reading from Cassandra; it is the write operation that needs attention. Try decreasing each of the following values if your Cassandra cluster has limited resources.
.config("spark.cassandra.output.consistency.level", "ONE") // QUORUM
.config("spark.cassandra.output.batch.size.rows", "500") // 2000 for pathmap
.config("spark.cassandra.output.batch.size.bytes", "1048576") // 10485760 (10M) for pathmap
.config("spark.cassandra.output.concurrent.writes", "5") // 50 for pathmap
For example, the following is the config for folo.records.
- keyspace: folo
  table: records2
  tempView: records2
  id: records2
  # Write Tuning Parameters
  outputConsistencyLevel: ONE
  outputBatchSizeRows: 500
  outputBatchSizeBytes: 524288
  outputConcurrentWrites: 5