Using a DLI Flink Job to Synchronize MRS Kafka Data to a CloudTable HBase Cluster in Real Time
This section describes a best practice for real-time data synchronization: using a DLI Flink job to synchronize MRS Kafka data to a CloudTable HBase cluster in real time.
- For details about DLI, see "DLI Introduction" in the Data Lake Insight User Guide.
- For details about Kafka, see "Service Overview" in the MapReduce Service User Guide.
Figure 1 Data synchronization process
Constraints
- Kerberos authentication is not enabled for the MRS cluster.
- To ensure network connectivity, the security group, region, VPC, and subnet of the MRS cluster must be the same as those of the CloudTable cluster.
- Add the subnet CIDR block of the elastic resource pool associated with the DLI queue to the inbound rules of both the MRS and CloudTable security groups so that a datasource connection can be established. For details, see "Data Migration and Data Transmission > Configuring DLI to Read and Write Data from and to External Data Sources > Configuring the Network Connection Between DLI and Data Sources (Enhanced Datasource Connection) > Creating an Enhanced Datasource Connection" in the Data Lake Insight User Guide.
- Network connectivity must be enabled between DLI and both its upstream (MRS Kafka) and downstream (CloudTable HBase) data sources. For details, see "Creating an Elastic Resource Pool and Queues Within It > Managing Queues > Testing Address Connectivity" in the Data Lake Insight User Guide.
Procedure
The general procedure is as follows:
Step 1: Creating a CloudTable HBase Cluster
- Log in to the CloudTable console and create a CloudTable HBase cluster.
- Create an ECS. For details, see "Getting Started" > "Creating an ECS" in the Elastic Cloud Server User Guide.
- Install the client. For details, see "Using HBase" > "Connecting to an HBase Cluster" > "Using HBase Shell to Access a Cluster" in the CloudTable Service User Guide.
- Run the bin/hbase shell command to start the HBase shell and access the cluster.
- Create the order table:
create 'order', {NAME => 'detail'}
Step 2: Creating a Flink Job in the MRS Cluster to Generate Data
- Create an MRS cluster. For details, see "Getting Started" > "Purchasing and Using an MRS Cluster" in the MapReduce Service User Guide.
- Log in to Manager and choose Cluster > Flink > Dashboard.
- Click the link on the right of Flink WebUI to access the Flink web UI.
- On the Flink web UI, create a Flink task to generate data.
- Click Create Job on the Job Management page. The Create Job page is displayed.
- Set parameters and click OK to create a Flink SQL job. To modify the SQL statement, click Develop in the Operation column and add the following statements on the SQL page:
Note
ip:port: the IP address and port number of a Kafka broker
- To obtain the IP address, log in to FusionInsight Manager and choose Cluster > Kafka > Instance. On the displayed page, view the Management IP Address of Broker.
- To obtain the port number, click Configurations. On the configuration page that is displayed, search for port and obtain the port number (the PLAINTEXT port that the Broker service listens on).
- You are advised to list multiple broker IP addresses and port numbers in the properties.bootstrap.servers parameter to prevent job failures caused by network instability or other issues (see the sketch after this step).
CREATE TABLE IF NOT EXISTS `lineorder_hbase` (
  `order_id` string,
  `order_channel` string,
  `order_time` string,
  `pay_amount` double,
  `real_pay` double,
  `pay_time` string,
  `user_id` string,
  `user_name` string,
  `area_id` string
) WITH (
  'connector' = 'kafka',
  'topic' = 'test_flink',
  'properties.bootstrap.servers' = 'ip:port',
  'value.format' = 'json',
  'properties.sasl.kerberos.service.name' = 'kafka'
);

CREATE TABLE lineorder_datagen (
  `order_id` string,
  `order_channel` string,
  `order_time` string,
  `pay_amount` double,
  `real_pay` double,
  `pay_time` string,
  `user_id` string,
  `user_name` string,
  `area_id` string
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '1000'
);

INSERT INTO lineorder_hbase SELECT * FROM lineorder_datagen;
- Return to the Job Management page and click Start in the Operation column. If the job status is Running, the job is successfully executed.
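For reference, the following is a minimal sketch of the Kafka connector options with several brokers listed in properties.bootstrap.servers, as recommended in the note above. The table name, broker addresses, and shortened column list are illustrative placeholders, not values from an actual cluster; replace the addresses with the management IP addresses and PLAINTEXT ports obtained from FusionInsight Manager.
-- Sketch only: placeholder broker addresses; column list shortened for brevity.
CREATE TABLE IF NOT EXISTS `lineorder_hbase_multi_broker` (
  `order_id` string,
  `order_channel` string,
  `order_time` string
) WITH (
  'connector' = 'kafka',
  'topic' = 'test_flink',
  -- Listing several brokers lets the job keep running if one broker is unreachable.
  'properties.bootstrap.servers' = '192.168.0.11:9092,192.168.0.12:9092,192.168.0.13:9092',
  'value.format' = 'json'
);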
Step 3: Creating a DLI Flink Job to Synchronize Data
- Create an elastic resource pool and queues within it. For details, see "Creating an Elastic Resource Pool and Queues Within It > Creating an Elastic Resource Pool and Creating Queues Within It" in the Data Lake Insight User Guide.
- Create a data source connection. For details, see "Data Migration and Data Transmission > Configuring DLI to Read and Write Data from and to External Data Sources > Configuring the Network Connection Between DLI and Data Sources (Enhanced Datasource Connection) > Creating an Enhanced Datasource Connection" in the Data Lake Insight User Guide.
- Test the connectivity between DLI and the upstream MRS Kafka and between DLI and the downstream CloudTable HBase.
- After the elastic resources and queues are created, choose Resources > Queue Management. On the displayed page, test the address connectivity. For details, see "Creating an Elastic Resource Pool and Queues Within It > Managing Queues > Testing Address Connectivity" in the Data Lake Insight User Guide.
- To obtain the upstream IP address and port number, log in to FusionInsight Manager of the MRS cluster and choose Cluster > Kafka > Instance. On the displayed page, view the Management IP Address of Broker. Then click Configurations, search for port, and obtain the port number (the PLAINTEXT port that the Broker service listens on).
- Obtain the downstream IP address and port number.
- To obtain the IP address, go to the Details page. Obtain the domain name from ZK Link (Intranet) under Cluster Information. Run the following command to resolve the IP address:
ping Access domain name
- To obtain the port number, go to the Details page. Obtain the port from ZK Link (Intranet) under Cluster Information.
- Create a Flink job. For details, see "Submitting a Flink Job Using DLI" > "Creating a Flink OpenSource SQL Job" in the Data Lake Insight User Guide.
- Select the Flink job created in the previous step, click Edit in the Operation column, and add the following SQL statements for data synchronization (a sketch of alternative sink buffering options follows this step):
CREATE TABLE orders (
  order_id string,
  order_channel string,
  order_time string,
  pay_amount double,
  real_pay double,
  pay_time string,
  user_id string,
  user_name string,
  area_id string
) WITH (
  'connector' = 'kafka',
  'topic' = 'test_flink',
  'properties.bootstrap.servers' = 'ip:port',
  'properties.group.id' = 'testGroup_1',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'json'
);

CREATE TABLE hbaseSink (
  order_id string,
  detail Row(
    order_channel string,
    order_time string,
    pay_amount double,
    real_pay double,
    pay_time string,
    user_id string,
    user_name string,
    area_id string
  )
) WITH (
  'connector' = 'hbase-2.2',
  'table-name' = 'order',
  'zookeeper.quorum' = 'ip:port',
  'sink.buffer-flush.max-rows' = '1'
);

INSERT INTO hbaseSink
SELECT
  order_id,
  Row(order_channel, order_time, pay_amount, real_pay, pay_time, user_id, user_name, area_id)
FROM orders;
- Click Format and then click Save.
Notice
Click Format to format the SQL code. Otherwise, null characters may be introduced when the code is copied and pasted, causing job execution failures.
- On the DLI management console, choose Job Management > Flink Jobs.
- Click Start in the Operation column to start the job. If the job status is Running, the job is successfully executed.
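The SQL in the previous step flushes every row to HBase ('sink.buffer-flush.max-rows' = '1'), which keeps latency low but limits throughput. As a sketch only, assuming the buffering options of the Flink hbase-2.2 connector and using illustrative values (the table name and shortened column list are also placeholders for brevity), the sink definition could batch writes as follows:
-- Sketch only: illustrative buffering values; tune them for your workload.
CREATE TABLE hbaseSinkBatched (
  order_id string,
  detail Row(order_channel string, order_time string)
) WITH (
  'connector' = 'hbase-2.2',
  'table-name' = 'order',
  'zookeeper.quorum' = 'ip:port',
  'sink.buffer-flush.max-rows' = '1000',  -- buffer up to 1,000 rows before writing
  'sink.buffer-flush.interval' = '2s'     -- or flush at least every 2 seconds
);
Larger buffers improve write throughput at the cost of slightly higher end-to-end latency, whereas the original setting of '1' makes each record visible in HBase immediately.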
Step 4: Verifying the Result
- After the MRS Flink job and the DLI job are running successfully, return to the command window of the HBase cluster, start the downstream HBase shell client, and run the following command:
scan 'order'
- Check whether new data is continuously written to the order table.
Parent topic: Importing Data