Using a DLI Flink Job to Synchronize MRS Kafka Data to a CloudTable ClickHouse Cluster in Real Time

This section describes a best practice for real-time data synchronization: an MRS Flink job generates data and writes it to a Kafka topic, and a DLI Flink job synchronizes the Kafka data to a CloudTable ClickHouse cluster in real time.

  • For details about DLI, see "DLI Introduction" in the Data Lake Insight User Guide.
  • For details about Kafka, see "Service Overview" in the MapReduce Service User Guide.

    Figure 1 Data synchronization process


Constraints

  • Kerberos authentication is not enabled for the MRS cluster.
  • To ensure network connectivity, the security group, region, VPC, and subnet of the MRS cluster must be the same as those of the CloudTable cluster.
  • Add the CIDR block of the elastic resource pool associated with the DLI queue to the inbound rules of both the MRS and CloudTable security groups so that a datasource connection can be established. For details, see "Data Migration and Data Transmission > Configuring DLI to Read and Write Data from and to External Data Sources > Configuring the Network Connection Between DLI and Data Sources (Enhanced Datasource Connection) > Creating an Enhanced Datasource Connection" in the Data Lake Insight User Guide.
  • Ensure network connectivity between DLI and both its upstream (MRS Kafka) and downstream (CloudTable ClickHouse) data sources. For details, see "Creating an Elastic Resource Pool and Queues Within It > Managing Queues > Testing Address Connectivity" in the Data Lake Insight User Guide.

Procedure

The general procedure is as follows:

  1. Step 1: Creating a CloudTable ClickHouse Cluster
  2. Step 2: Creating a Flink Job in the MRS Cluster to Generate Data
  3. Step 3: Creating a DLI Flink Job to Synchronize Data
  4. Step 4: Verifying the Result

Step 1: Creating a CloudTable ClickHouse Cluster

  1. Log in to the CloudTable console and create a ClickHouse cluster. For details, see "Using ClickHouse > Creating a ClickHouse Cluster" in the CloudTable Service User Guide.
  2. Download the client and verification file. For details, see "Using ClickHouse > Connecting to a ClickHouse Cluster > Using a Client to Access a ClickHouse Cluster" in the CloudTable Service User Guide.
  3. Create an ECS. For details, see "Getting Started" > "Creating an ECS" in the Elastic Cloud Server User Guide.
  4. Install and verify the client. For details, see "Using ClickHouse" > "Connecting to a ClickHouse Cluster" > "Using a Client to Access a ClickHouse Cluster" in the CloudTable Service User Guide.
  5. Create a database named flink.
    create database flink;

    Use the flink database.

    use flink;
  6. Create the flink.order table.
    create table flink.order (
      order_id String,
      order_channel String,
      order_time String,
      pay_amount Float64,
      real_pay Float64,
      pay_time String,
      user_id String,
      user_name String,
      area_id String
    ) ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/flink/order', '{replica}')
    ORDER BY order_id;
  7. Check whether the table is successfully created:
    select * from flink.order;
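
    Optionally, you can also confirm the table definition with standard ClickHouse metadata queries. The following is a minimal sketch, assuming only the flink database and flink.order table created above:

    -- list the columns of the new table
    describe table flink.order;
    -- confirm the table exists and uses the expected engine
    select name, engine from system.tables where database = 'flink';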

Step 2: Creating a Flink Job in the MRS Cluster to Generate Data

  1. Create an MRS cluster. For details, see "Getting Started" > "Purchasing and Using an MRS Cluster" in the MapReduce Service User Guide.
  2. Log in to Manager and choose Cluster > Flink > Dashboard.
  3. Click the link on the right of Flink WebUI to access the Flink web UI.
  4. On the Flink web UI, create a Flink task to generate data.
    1. Click Create Job on the Job Management page. The Create Job page is displayed.
    2. Set parameters and click OK to create a Flink SQL job. To modify a SQL statement, click Develop in the Operation column and add the following command on the SQL page:
      Note

      ip:port: IP address and port number of the Kafka Broker

      • To obtain the IP address, log in to FusionInsight Manager and choose Cluster > Kafka > Instance. On the displayed page, view the Management IP Address of the Broker instance.
      • To obtain the port number, click Configurations, search for port, and use the port on which the Broker service listens for the PLAINTEXT protocol.
      • You are advised to list multiple IP address and port pairs in the properties.bootstrap.servers parameter (see the example after the SQL statements below) to prevent job failures caused by an unstable network or a single unreachable Broker.

      CREATE TABLE IF NOT EXISTS `lineorder_ck` (
      `order_id` string,
      `order_channel` string,
      `order_time` string,
      `pay_amount` double,
      `real_pay` double,
      `pay_time` string,
      `user_id` string,
      `user_name` string,
      `area_id` string
      ) WITH (
      'connector' = 'kafka',
      'topic' = 'test_flink',
      'properties.bootstrap.servers' = 'ip:port',
      'value.format' = 'json',
      'properties.sasl.kerberos.service.name' = 'kafka'
      );
      CREATE TABLE lineorder_datagen (
      `order_id` string,
      `order_channel` string,
      `order_time` string,
      `pay_amount` double,
      `real_pay` double,
      `pay_time` string,
      `user_id` string,
      `user_name` string,
      `area_id` string
      ) WITH (
      'connector' = 'datagen',
      'rows-per-second' = '1000'
      );
      INSERT INTO
      lineorder_ck
      SELECT
      *
      FROM
      lineorder_datagen;
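
      For reference, when multiple Broker instances are available, the properties.bootstrap.servers value is written as a comma-separated list of address:port pairs. The addresses below are hypothetical placeholders; replace them with the management IP addresses and the PLAINTEXT port obtained above:

      'properties.bootstrap.servers' = '192.168.0.11:9092,192.168.0.12:9092,192.168.0.13:9092',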
    3. Return to the Job Management page and click Start in the Operation column. If the job status is Running, the job is successfully executed.

Step 3: Creating a DLI Flink Job to Synchronize Data

  1. Create an elastic resource pool and queues within it. For details, see "Creating an Elastic Resource Pool and Queues Within It > Creating an Elastic Resource Pool and Creating Queues Within It" in the Data Lake Insight User Guide.
  2. Create a data source connection. For details, see "Data Migration and Data Transmission > Configuring DLI to Read and Write Data from and to External Data Sources > Configuring the Network Connection Between DLI and Data Sources (Enhanced Datasource Connection) > Creating an Enhanced Datasource Connection" in the Data Lake Insight User Guide.
  3. Test the connectivity between DLI and the upstream MRS Kafka and between DLI and the downstream CloudTable ClickHouse.
    1. After the elastic resource pool and queue are created, choose Resources > Queue Management. On the displayed page, test the address connectivity. For details, see "Creating an Elastic Resource Pool and Queues Within It" > "Managing Queues" > "Testing Address Connectivity" in the Data Lake Insight User Guide.
    2. To obtain the upstream IP address and port number, go to Manager of the MRS cluster and choose Cluster > Kafka > Instance. On the displayed page, view the Management IP Address of the Broker instance. Then click Configurations, search for port, and use the port on which the Broker service listens for the PLAINTEXT protocol.
    3. To obtain the downstream IP address and port number, go to the details page of the CloudTable ClickHouse cluster and view its access address.
  4. Create a Flink job. For details, see "Submitting a Flink Job Using DLI" > "Creating a Flink OpenSource SQL Job" in the Data Lake Insight User Guide.
  5. Select the Flink job created in 4, click Edit in the Operation column, and add the following SQL statements for data synchronization:
    create table orders (
    order_id string,
    order_channel string,
    order_time string,
    pay_amount double,
    real_pay double,
    pay_time string,
    user_id string,
    user_name string,
    area_id string
    ) WITH (
    'connector' = 'kafka',
    'topic' = 'test_flink',
    'properties.bootstrap.servers' = 'ip:port',
    'properties.group.id' = 'testGroup_1',
    'scan.startup.mode' = 'latest-offset',
    'format' = 'json'
    );
    create table clickhouseSink(
    order_id string,
    order_channel string,
    order_time string,
    pay_amount double,
    real_pay double,
    pay_time string,
    user_id string,
    user_name string,
    area_id string
    ) with (
    'connector' = 'clickhouse',
    'url' = 'jdbc:clickhouse://ip:port/flink',
    'username' = 'admin',
    'password' = '****',
    'table-name' = 'order',
    'sink.buffer-flush.max-rows' = '10',
    'sink.buffer-flush.interval' = '3s'
    );
    insert into clickhouseSink select * from orders;
  6. Click Format and click Save.
    Notice

    Click Format to format the SQL code. Otherwise, invisible characters may be introduced when the code is copied and pasted, causing the job to fail.

  7. On the DLI management console, choose Job Management > Flink Jobs.
  8. Click Start in the Operation column to start the job created in 4. If the job status is Running, the job is successfully executed.

Step 4: Verifying the Result

  1. After the MRS Flink and DLI jobs are successfully executed, return to the ClickHouse cluster command window and access the cluster client.
  2. View databases.
    show databases;
  3. Use the flink database.
    use flink;
  4. View tables.
    show tables;
  5. View the synchronized data.
    select * from order limit 10;

    Figure 2 Viewing synchronization data
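
    Because the MRS Flink job keeps producing records, the row count in the table should keep growing while both jobs are running. The following check is a minimal sketch, assuming the flink.order table created in Step 1:

    -- run this query several seconds apart; the count should increase between runs
    select count() from flink.order;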