
Using DLI to Submit a Flink SQL Job to Write Data to Kafka

Scenario

DLI Flink jobs can use other cloud services as data sources and sink streams for real-time computing. This section describes how to create and submit a Flink SQL job that uses DIS as the source stream and DMS for Kafka as the sink stream.

Procedure

You need to create a Flink SQL job that has a source stream and a sink stream. The source stream is used to read data from DIS, and the sink stream is used to write data to Kafka. The procedure is as follows:

Table 1 Procedure for submitting a Flink SQL job using DLI

  • Step 1: Prepare Source and Sink Streams
    In this example, use DIS as the data source and create a Kafka instance as the data destination.
  • Step 2: Create an OBS Bucket for Saving Outputs
    Create an OBS bucket to store checkpoints, job logs, and debugging test data for the Flink SQL job.
  • Step 3: Create an Elastic Resource Pool and Add Queues to the Pool
    Create the compute resources required for submitting the Flink SQL job.
  • Step 4: Create an Enhanced Datasource Connection
    Create an enhanced datasource connection to connect the DLI elastic resource pool to DIS and the Kafka instance.
  • Step 5: Create a Datasource Authentication
    Use DLI's datasource authentication to save the access credentials of the data source.
  • Step 6: Configure Security Group Rules and Test Address Connectivity
    Configure the security group of the data source to allow access from the queue's CIDR block, and test address connectivity.
  • Step 7: Create a Flink SQL Job
    Submit a Flink SQL job to analyze data.

Step 1: Prepare Source and Sink Streams

In this example, DIS is the source stream. Distributed Message Service (DMS) Kafka is the sink stream.

  • Create a DIS stream as the job source stream.
    1. Log in to the DIS management console.
    2. In the upper left corner of the management console, select the target region and project.
    3. On the Overview page, click Buy Stream and configure stream parameters. The stream information is as follows:
      • Region: Select the region where DLI is located.
      • Stream Name: csinput
      • Stream Type: Common
      • Partitions: 1
      • Data Retention (hours): 24
      • Source Data Type: BLOB
      • Auto Scaling: disabled
      • Enterprise Project: default
      • Advanced Settings: Skip it.
    4. Click Buy Now. The Details page is displayed.
    5. Click Submit.
  • Create a Kafka premium instance for the job sink stream.

    For details about how to create a Kafka instance, see the Distributed Message Service for Kafka User Guide.

    1. Before creating a Kafka instance, ensure the availability of resources, including a virtual private cloud (VPC), subnet, security group, and security group rules.

      The created VPC and the Kafka instance you will create must be in the same region. For more information, see "Kafka Network Connection Conditions" in the Distributed Message Service User Guide.

    2. Log in to the DMS for Kafka management console.
    3. Select a region in the upper left corner.
    4. Choose DMS for Kafka from the navigation pane on the left, click Buy Instance in the upper right corner, and set related parameters. The instance information is as follows:
      • Region: Select the region where DLI is located.
      • Project: Keep the default value.
      • AZ: Keep the default value.
      • Instance Name: kafka-dliflink
      • Enterprise Project: default
      • Version: Keep the default value.
      • CPU Architecture: Keep the default value.
      • Specifications: Select the specification as needed.
      • Brokers: Keep the default value.
      • Storage Space: Keep the default value.
      • Capacity Threshold Policy: Keep the default value.
      • VPC and Subnet: Select the VPC and subnet created in 1.
      • Security Group: Select the security group created in 1.
      • Manager Username: Enter dliflink (used to log in to the instance management page).
      • Password: **** (The system cannot detect your password.)
      • Confirm Password: ****
      • Advanced Settings: Enable Kafka SASL_SSL and configure the username and password for SSL authentication as prompted. You do not need to set other parameters.
    5. Click Buy. The confirmation page is displayed.
    6. Click Submit.
    7. On the DMS for Kafka console, click Kafka Premium and click the name of the Kafka instance, for example, kafka-dliflink. The instance details page is displayed.
    8. Locate SSL Certificate under Basic Information > Advanced Settings and click Download. Save the package to your local PC and decompress it to obtain the client certificate file client.truststore.jks.
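
The client.truststore.jks file is a Java keystore, while most non-Java Kafka clients expect the CA certificate in PEM format. If you later want to verify the written messages outside the console (see Step 7), the following optional sketch extracts the certificates to a PEM file. It assumes the pyjks package is installed and uses a placeholder for the truststore password that is provided with the downloaded certificate package.

    # Optional helper: export the CA certificates in client.truststore.jks to PEM
    # for use with non-Java Kafka clients.
    # Assumptions: the pyjks package is installed (pip install pyjks) and
    # TRUSTSTORE_PASSWORD is replaced with the password provided for the
    # downloaded certificate package.
    import base64
    import textwrap

    import jks  # provided by the pyjks package

    TRUSTSTORE_PASSWORD = "xxx"  # placeholder

    keystore = jks.KeyStore.load("client.truststore.jks", TRUSTSTORE_PASSWORD)

    with open("kafka_ca.pem", "w") as pem_file:
        for alias, entry in keystore.certs.items():
            encoded = base64.b64encode(entry.cert).decode("ascii")  # entry.cert is DER bytes
            pem_file.write("-----BEGIN CERTIFICATE-----\n")
            pem_file.write("\n".join(textwrap.wrap(encoded, 64)) + "\n")
            pem_file.write("-----END CERTIFICATE-----\n")

    print("Wrote kafka_ca.pem")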

Step 2: Create an OBS Bucket for Saving Outputs

In this example, you need to enable OBS for the Flink SQL job so that DLI can store checkpoints, job logs, and debugging test data for the job.

For how to create a bucket, see "Creating a Bucket" in the Object Storage Service Console Operation Guide.

  1. In the navigation pane of OBS Console, choose Object Storage.
  2. In the upper right corner of the page, click Create Bucket and set bucket parameters.
    • Region: Select the region where DLI is located.
    • Bucket Name: Enter a bucket name. In this example, smoke-test is used.
    • Default Storage Class: Standard
    • Bucket Policy: Private
    • Default Encryption: Do not enable
    • Direct Reading: Do not enable
    • Enterprise Project: default
  3. Click Create Now.
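
Console uploads are enough for this walkthrough, but you can also put objects into the bucket programmatically, for example the client.truststore.jks file referenced in Step 5. This is a minimal sketch assuming the esdk-obs-python package is installed; the access key, secret key, endpoint, and bucket name are placeholders to replace with your own values.

    # Minimal sketch: upload a local file to the OBS bucket created above.
    # Assumptions: esdk-obs-python is installed; AK/SK, endpoint, and bucket
    # name are placeholders.
    from obs import ObsClient

    obs_client = ObsClient(
        access_key_id="your-ak",                          # placeholder
        secret_access_key="your-sk",                      # placeholder
        server="https://obs.<region>.myhuaweicloud.com",  # OBS endpoint of your region
    )

    try:
        # Upload client.truststore.jks so it can later be referenced as
        # obs://smoke-test/client.truststore.jks in the datasource authentication.
        resp = obs_client.putFile("smoke-test", "client.truststore.jks",
                                  file_path="client.truststore.jks")
        if resp.status < 300:
            print("Upload succeeded")
        else:
            print("Upload failed:", resp.errorCode, resp.errorMessage)
    finally:
        obs_client.close()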

Step 3: Create an Elastic Resource Pool and Add Queues to the Pool

To create a Flink SQL job, you must use your own queue because the existing default queue cannot be used. In this example, create an elastic resource pool named dli_resource_pool and a queue named dli_queue_01.

  1. Log in to the DLI management console.
  2. In the navigation pane on the left, choose Resources > Resource Pool.
  3. On the displayed page, click Buy Resource Pool in the upper right corner.
  4. On the displayed page, set the parameters.

    Table 2 describes the parameters.

    Table 2 Parameters

    • Region: Select a region where you want to buy the elastic resource pool.
    • Project: Project uniquely preset by the system for each region. Example value: Default
    • Name: Name of the elastic resource pool. Example value: dli_resource_pool
    • Specifications: Specifications of the elastic resource pool. Example value: Standard
    • CU Range: The maximum and minimum CUs allowed for the elastic resource pool. Example value: 64-64
    • CIDR Block: CIDR block the elastic resource pool belongs to. If you use an enhanced datasource connection, this CIDR block cannot overlap that of the data source. Once set, this CIDR block cannot be changed. Example value: 172.16.0.0/19
    • Enterprise Project: Select an enterprise project for the elastic resource pool. Example value: default

  5. Click Buy.
  6. Click Submit.
  7. In the elastic resource pool list, locate the pool you just created and click Add Queue in the Operation column.
  8. Set the basic parameters listed below.
    Table 3 Basic parameters for adding a queue

    • Name: Name of the queue to add. Example value: dli_queue_01
    • Type: Type of the queue. To execute SQL jobs, select For SQL. To execute Flink or Spark jobs, select For general purpose. In this example, select For general purpose.
    • Enterprise Project: Select an enterprise project. Example value: default

  9. Click Next and configure scaling policies for the queue.

    Click Create to add a scaling policy with varying priority, period, minimum CUs, and maximum CUs.

    Table 4 Scaling policy parameters

    • Priority: Priority of the scaling policy in the current elastic resource pool. A larger value indicates a higher priority. In this example, only one scaling policy is configured, so its priority is set to 1 by default. Example value: 1
    • Period: The first scaling policy is the default policy, and its Period setting cannot be deleted or modified. The period for the scaling policy is from 00 to 24. Example value: 00–24
    • Min CU: Minimum number of CUs allowed by the scaling policy. Example value: 16
    • Max CU: Maximum number of CUs allowed by the scaling policy. Example value: 64

  10. Click OK.

Step 4: Create an Enhanced Datasource Connection

You need to create an enhanced datasource connection for the Flink job. For details, see "Datasource Connections" > "Creating an Enhanced Datasource Connection" in the Data Lake Insight User Guide.

Note
  • The CIDR block of the DLI queue bound with a datasource connection cannot overlap with the CIDR block of the data source.
  • Datasource connections cannot be created for the default queue.
  • To access a table across data sources, you need to use a queue bound to a datasource connection.
  1. Go to the DLI management console and choose Datasource Connections in the navigation pane on the left.
  2. On the displayed Enhanced tab, click Create. Set the following parameters:

    • Connection Name: diskafka
    • Bind Queue: dli_queue_01 (the queue added in Step 3)
    • VPC: vpc-dli
    • Subnet: dli-subnet
      Note

      The VPC and subnet must be the same as those of the Kafka instance.

  3. Click OK.
  4. On the Enhanced tab page, click the created connection diskafka to view its VPC Peering ID and Connection Status. If the status is Active, the connection is successful.

Step 5: Create a Datasource Authentication

For how to create a datasource authentication, see "Datasource Connections" > "Datasource Authentication" in the Data Lake Insight User Guide.

  1. Upload the Kafka authentication file client.truststore.jks obtained in Step 1: Prepare Source and Sink Streams to the OBS bucket smoke-test created in Step 2: Create an OBS Bucket for Saving Outputs.
  2. On the DLI management console, click Datasource Connections.
  3. On the Datasource Authentication tab page, click Create to add authentication information. Set the following parameters:
    • Authentication Certificate: Flink
    • Type: Kafka_SSL
    • Truststore Path: obs://smoke-test/client.truststore.jks
    • Truststore Password: ******

    You do not need to set other parameters.

  4. Click OK.

Step 6: Configure Security Group Rules and Test Address Connectivity

  1. On the DLI management console, choose Resources > Queue Management, select the bound queue, and click the arrow next to the queue name to view the CIDR block of the queue.
  2. Log in to the DMS for Kafka console, click Kafka Premium, and click the name of the created Kafka instance, for example, kafka-dliflink. The instance information page is displayed.
  3. On the Basic Information page, obtain the Kafka connection address and port number in the Connection Address area.

  4. On the Basic Information page, click the security group name in the Security Group area.
  5. On the security group configuration page of the Kafka instance, choose Inbound Rules > Add Rule. Set Protocol to TCP, Port to 9093, and Source to the CIDR block of the DLI queue. Click OK.
  6. Log in to the DLI management console and choose Resources > Queue Management. In the row of the Flink queue, choose More > Test Address Connectivity. In the Address text box, enter the Kafka connection address and port number in the format of IP address:port number, and click Test. The subsequent operations can be performed only when the address is reachable. Note that multiple addresses must be tested separately.
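
The console test above checks connectivity from the DLI queue itself. If you also want a rough reachability check from an ECS in the same VPC and security group, the following sketch uses only the Python standard library; the broker addresses are placeholders for the connection addresses from the Connection Address area.

    # Rough reachability check for the Kafka broker addresses on port 9093,
    # run from a host in the same VPC. It does not replace the console's
    # Test Address Connectivity check from the DLI queue.
    import socket

    BROKERS = ["192.x.x.x:9093", "192.x.x.x:9093", "192.x.x.x:9093"]  # placeholders

    for broker in BROKERS:
        host, port = broker.rsplit(":", 1)
        try:
            with socket.create_connection((host, int(port)), timeout=5):
                print(broker, "is reachable")
        except OSError as err:
            print(broker, "is NOT reachable:", err)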

Step 7: Create a Flink SQL Job

After the source and sink streams are prepared, you can create a Flink SQL job.

  1. In the left navigation pane of the DLI management console, choose Job Management > Flink Jobs. The Flink Jobs page is displayed.
  2. In the upper right corner of the Flink Jobs page, click Create Job. Set the following parameters:
    • Type: Flink SQL
    • Name: DIS-Flink-Kafka
    • Description: Leave it blank.
    • Template Name: Do not select any template.
  3. Click OK to enter the editing page.
  4. Edit the Flink SQL job.

    Enter detailed SQL statements in the statement editing area. The example statements are as follows. Note that the parameter values flagged in the comments must be changed to match your environment.

    CREATE SOURCE STREAM car_info (
      a1 string,
      a2 string,
      a3 string,
      a4 INT
    )
    WITH (
      type = "dis",
      region = "xxx", -- Region where the current DLI queue is located
      channel = "csinput",
      encode = "csv",
      FIELD_DELIMITER = ";"
    );

    CREATE SINK STREAM kafka_sink ( -- Output fields
      a1 string,
      a2 string,
      a3 string,
      a4 INT
    )
    WITH (
      type = "kafka",
      kafka_bootstrap_servers = "192.x.x.x:9093,192.x.x.x:9093,192.x.x.x:9093", -- Connection addresses of the Kafka instance
      kafka_topic = "testflink", -- Topic to write to. Log in to the Kafka console, click the name of the created Kafka instance, and view the topic name on the Topic Management page.
      encode = "csv", -- Encoding format, which can be JSON or CSV
      kafka_certificate_name = "Flink", -- Name of the Kafka datasource authentication created in Step 5
      kafka_properties_delimiter = ",",
      -- Replace the xxx placeholders for username and password in kafka_properties with the SASL_SSL username and password configured when the Kafka instance was created in Step 1.
      kafka_properties = "sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username=\"xxx\" password=\"xxx\";,sasl.mechanism=PLAIN,security.protocol=SASL_SSL"
    );

    INSERT INTO kafka_sink
    SELECT * FROM car_info;

    -- Create a DIS sink stream that writes test data into the csinput stream, which in turn feeds the source stream car_info above.
    CREATE SINK STREAM car_info1 (
      a1 string,
      a2 string,
      a3 string,
      a4 INT
    )
    WITH (
      type = "dis",
      region = "xxx", -- Region where the current DLI queue is located
      channel = "csinput",
      encode = "csv",
      FIELD_DELIMITER = ";"
    );

    INSERT INTO car_info1 SELECT 'id','owner','brand',1;
    INSERT INTO car_info1 SELECT 'id','owner','brand',2;
    INSERT INTO car_info1 SELECT 'id','owner','brand',3;
    INSERT INTO car_info1 SELECT 'id','owner','brand',4;
    INSERT INTO car_info1 SELECT 'id','owner','brand',5;
    INSERT INTO car_info1 SELECT 'id','owner','brand',6;
    INSERT INTO car_info1 SELECT 'id','owner','brand',7;
    INSERT INTO car_info1 SELECT 'id','owner','brand',8;
    INSERT INTO car_info1 SELECT 'id','owner','brand',9;
    INSERT INTO car_info1 SELECT 'id','owner','brand',10;
  5. Click Check Semantics.
  6. Set job running parameters. The mandatory parameters are as follows:
    • Queue: dli_queue_01 (the queue added in Step 3)
    • CUs: 2
    • Job Manager CUs: 1
    • Parallelism: 1
    • Save Job Log: selected
    • OBS Bucket: Select the OBS bucket for storing job logs. You need the permissions to access this bucket.

    You do not need to set other parameters.

  7. Click Save.
  8. Click Start. On the displayed Start Flink Job page, confirm the job specifications and the price, and click Start Now to start the job.

    After the job is started, the system automatically switches to the Flink Jobs page, and the created job is displayed in the job list. You can view the job status in the Status column. After a job is successfully submitted, Status of the job will change from Submitting to Running.

    If Status of a job is Submission failed or Running exception, the job fails to be submitted or fails to run. In this case, hover over the status icon to view the error details and copy them if needed. Rectify the fault based on the error information and resubmit the job.

  9. After the job is complete, you can log in to the management console of Distributed Message Service for Kafka to view the corresponding Kafka premium instance. Click the instance name, click the Message Query tab, select the Kafka topic written in the Flink SQL job, and click Search. In the Operation column, click View Message Body to view the written message content.
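
If you prefer to check the output outside the console, a Kafka client can consume the testflink topic over SASL_SSL. This is a minimal sketch assuming the kafka-python package is installed, that the CA certificate has been exported from client.truststore.jks to kafka_ca.pem (for example, with the optional helper at the end of Step 1), and that the broker addresses and SASL credentials are placeholders for your own values.

    # Minimal sketch: consume the messages written to Kafka by the Flink SQL job.
    # Assumptions: kafka-python is installed; addresses, credentials, and the
    # kafka_ca.pem path are placeholders.
    from kafka import KafkaConsumer

    consumer = KafkaConsumer(
        "testflink",                           # topic written by the job
        bootstrap_servers=["192.x.x.x:9093"],  # placeholder broker address(es)
        security_protocol="SASL_SSL",
        sasl_mechanism="PLAIN",
        sasl_plain_username="xxx",             # SASL_SSL username set in Step 1
        sasl_plain_password="xxx",             # SASL_SSL password set in Step 1
        ssl_cafile="kafka_ca.pem",             # CA certificate in PEM format
        auto_offset_reset="earliest",
        consumer_timeout_ms=10000,             # stop after 10 s without new messages
    )

    for message in consumer:
        print(message.value.decode("utf-8"))

    consumer.close()

Each printed message should contain one CSV row written by the job.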