Using DLI to Submit a Flink SQL Job to Write Data to Kafka
Scenario
DLI Flink jobs can use other cloud services as data sources and sink streams for real-time compute. This section describes how to create and submit a Flink SQL job that uses DIS as the source stream and DMS Kafka as the sink stream.
Procedure
You need to create a Flink SQL job that has a source stream and a sink stream. The source stream is used to read data from DIS, and the sink stream is used to write data to Kafka. The procedure is as follows:
Procedure | Description
---|---
Step 1: Prepare Source and Sink Streams | In this example, use DIS as the data source and create a Kafka instance as the data destination.
Step 2: Create an OBS Bucket for Saving Outputs | Create an OBS bucket to store checkpoints, job logs, and debugging test data for the Flink SQL job.
Step 3: Create an Elastic Resource Pool and Add Queues to the Pool | Create the compute resources required for submitting the Flink SQL job.
Step 4: Create an Enhanced Datasource Connection | Create an enhanced datasource connection to connect the DLI elastic resource pool to the DIS stream and the Kafka instance.
Step 5: Create a Datasource Authentication | Use DLI's datasource authentication to save the access credentials of the data source.
Step 6: Configure Security Group Rules and Test Address Connectivity | Configure the security group of the data source to allow access from the queue's CIDR block, and test the connectivity.
Step 7: Create a Flink SQL Job | Submit a Flink SQL job to analyze the data.
Step 1: Prepare Source and Sink Streams
In this example, DIS is the source stream. Distributed Message Service (DMS) Kafka is the sink stream.
- Create a DIS stream as the job source stream.
- Log in to the DIS management console.
- In the upper left corner of the management console, select the target region and project.
- On the Overview page, click Buy Stream and configure stream parameters. The stream information is as follows:
- Region: Select the region where DLI is located.
- Stream Name: csinput
- Stream Type: Common
- Partitions: 1
- Data Retention (hours): 24
- Source Data Type: BLOB
- Auto Scaling: disabled
- Enterprise Project: default
- Advanced Settings: Skip it.
- Click Buy Now. The Details page is displayed.
- Click Submit.
- Create a Kafka premium instance for the job sink stream.
For details, see the instructions for buying a Kafka instance in the Distributed Message Service Kafka User Guide.
- Before creating a Kafka instance, ensure the availability of resources, including a virtual private cloud (VPC), subnet, security group, and security group rules.
The created VPC and the Kafka instance you will create must be in the same region. For more information, see "Kafka Network Connection Conditions" in the Distributed Message Service User Guide.
- Log in to the DMS for Kafka management console.
- Select a region in the upper left corner.
- Choose DMS for Kafka from the navigation pane on the left, click Buy Instance in the upper right corner, and set related parameters. The instance information is as follows:
- Region: Select the region where DLI is located.
- Project: Keep the default value.
- AZ: Keep the default value.
- Instance Name: kafka-dliflink
- Enterprise Project: default
- Version: Keep the default value.
- CPU Architecture: Keep the default value.
- Specifications: Select the specification as needed.
- Brokers: Keep the default value.
- Storage Space: Keep the default value.
- Capacity Threshold Policy: Keep the default value.
- VPC and Subnet: Select the VPC and subnet prepared before creating the Kafka instance.
- Security Group: Select the security group prepared before creating the Kafka instance.
- Manager Username: Enter dliflink (used to log in to the instance management page).
- Password: **** (The system cannot detect your password.)
- Confirm Password: ****
- Advanced Settings: Enable Kafka SASL_SSL and configure the username and password for SSL authentication as prompted. You do not need to set other parameters.
- Click Buy. The confirmation page is displayed.
- Click Submit.
- On the DMS for Kafka console, click Kafka Premium and click the name of the Kafka instance, for example, kafka-dliflink. The instance details page is displayed.
- Locate the SSL certificate in Basic Information > Advanced Settings, and click Download. Download the package to the local PC and decompress it to obtain the client certificate file client.truststore.jks.
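Non-Java clients usually require PEM certificates rather than a JKS truststore. If you want to verify the downloaded truststore or reuse its CA certificate outside Java (for example, in the verification sketch at the end of this guide), the following is a minimal Python sketch. It assumes the pyjks package is installed, and the truststore password is a placeholder that you must replace with the password documented for your instance's certificate package.

```python
# Minimal sketch: open the downloaded truststore and export its trusted CA
# certificate(s) as a PEM file for use by non-Java clients.
# Assumptions: pyjks is installed (pip install pyjks); "TRUSTSTORE_PASSWORD"
# is a placeholder for the password documented for the certificate package.
import base64
import textwrap

import jks

keystore = jks.KeyStore.load("client.truststore.jks", "TRUSTSTORE_PASSWORD")

with open("ca.pem", "w") as pem_file:
    for alias, entry in keystore.certs.items():
        # entry.cert holds the DER-encoded certificate bytes.
        b64 = base64.b64encode(entry.cert).decode("ascii")
        pem_file.write("-----BEGIN CERTIFICATE-----\n")
        pem_file.write("\n".join(textwrap.wrap(b64, 64)) + "\n")
        pem_file.write("-----END CERTIFICATE-----\n")
        print(f"Exported trusted certificate: {alias}")
```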
Step 2: Create an OBS Bucket for Saving Outputs
In this example, you need to enable OBS to provide the DLI Flink job with checkpointing, job log storage, and debugging test data.
For how to create a bucket, see "Creating a Bucket" in the Object Storage Service Console Operation Guide.
- In the navigation pane of OBS Console, choose Object Storage.
- In the upper right corner of the page, click Create Bucket and set bucket parameters.
- Region: Select the region where DLI is located.
- Bucket Name: Enter a bucket name. This example uses smoke-test.
- Default Storage Class: Standard
- Bucket Policy: Private
- Default Encryption: Do not enable
- Direct Reading: Do not enable
- Enterprise Project: default
- Click Create Now.
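If you prefer to create the bucket programmatically instead of through the console, the following is a minimal sketch using the OBS Python SDK (esdk-obs-python). The access key, secret key, endpoint, and region are placeholders; note that OBS bucket names must be globally unique, so the example name smoke-test may already be taken.

```python
# Minimal sketch: create the OBS bucket with the OBS Python SDK (esdk-obs-python).
# The AK/SK, endpoint, and region below are placeholders for your own values.
from obs import ObsClient

obs_client = ObsClient(
    access_key_id="YOUR_AK",
    secret_access_key="YOUR_SK",
    server="https://obs.<region>.myhuaweicloud.com",  # OBS endpoint of the DLI region
)

try:
    # Bucket names must be globally unique; "smoke-test" is the example name used in this guide.
    resp = obs_client.createBucket("smoke-test", location="<region>")
    if resp.status < 300:
        print("Bucket created")
    else:
        print("Failed to create bucket:", resp.errorCode, resp.errorMessage)
finally:
    obs_client.close()
```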
Step 3: Create an Elastic Resource Pool and Add Queues to the Pool
To create a Flink SQL job, you must use your own queue, as the default queue cannot be used. In this example, create an elastic resource pool named dli_resource_pool and a queue named dli_queue_01.
- Log in to the DLI management console.
- In the navigation pane on the left, choose Resources > Resource Pool.
- On the displayed page, click Buy Resource Pool in the upper right corner.
- On the displayed page, set the parameters.
Table 2 describes the parameters.
Table 2 Parameters

Parameter | Description | Example Value
---|---|---
Region | Select a region where you want to buy the elastic resource pool. | -
Project | Project uniquely preset by the system for each region | Default
Name | Name of the elastic resource pool | dli_resource_pool
Specifications | Specifications of the elastic resource pool | Standard
CU Range | The maximum and minimum CUs allowed for the elastic resource pool | 64-64
CIDR Block | CIDR block the elastic resource pool belongs to. If you use an enhanced datasource connection, this CIDR block cannot overlap that of the data source. Once set, this CIDR block cannot be changed. | 172.16.0.0/19
Enterprise Project | Select an enterprise project for the elastic resource pool. | default
- Click Buy.
- Click Submit.
- In the elastic resource pool list, locate the pool you just created and click Add Queue in the Operation column.
- Set the basic parameters listed below.
Table 3 Basic parameters for adding a queue

Parameter | Description | Example Value
---|---|---
Name | Name of the queue to add | dli_queue_01
Type | Type of the queue. To execute SQL jobs, select For SQL; to execute Flink or Spark jobs, select For general purpose. | -
Enterprise Project | Select an enterprise project. | default
- Click Next and configure scaling policies for the queue.
Click Create to add a scaling policy with varying priority, period, minimum CUs, and maximum CUs.
Table 4 Scaling policy parameters

Parameter | Description | Example Value
---|---|---
Priority | Priority of the scaling policy in the current elastic resource pool. A larger value indicates a higher priority. In this example, only one scaling policy is configured, so its priority is set to 1 by default. | 1
Period | The first scaling policy is the default policy, and its Period setting cannot be deleted or modified. The period for the scaling policy is from 00 to 24. | 00–24
Min CU | Minimum number of CUs allowed by the scaling policy | 16
Max CU | Maximum number of CUs allowed by the scaling policy | 64
- Click OK.
Step 4: Create an Enhanced Datasource Connection
You need to create an enhanced datasource connection for the Flink job. For details, see "Datasource Connections" > "Creating an Enhanced Datasource Connection" in the Data Lake Insight User Guide.
- The CIDR block of the DLI queue bound with a datasource connection cannot overlap with the CIDR block of the data source.
- Datasource connections cannot be created for the default queue.
- To access a table across data sources, you need to use a queue bound to a datasource connection.
- Go to the DLI management console and choose Datasource Connections in the navigation pane on the left.
- On the displayed Enhanced tab, click Create. Set the following parameters:
- Connection Name: diskafka
- Bind Queue: dli_queue_01 (the queue created in Step 3)
- VPC: vpc-dli
- Subnet: dli-subnet
Note: The VPC and subnet must be the same as those of the Kafka instance.
- Click OK.
- On the Enhanced tab page, click the created connection diskafka to view its VPC Peering ID and Connection Status. If the status is Active, the connection is successful.
Step 5: Create a Datasource Authentication
For how to create a datasource authentication, see "Datasource Connections" > "Datasource Authentication" in the Data Lake Insight User Guide.
- Upload the Kafka authentication file client.truststore.jks obtained in Step 1: Prepare Source and Sink Streams to the OBS bucket smoke-test created in Step 2: Create an OBS Bucket for Saving Outputs. (You can also upload it programmatically; see the sketch after this list.)
- On the DLI management console, click Datasource Connections.
- On the Datasource Authentication tab page, click Create to add authentication information. Set the following parameters:
- Authentication Certificate: Flink
- Type: Kafka_SSL
- Truststore Path: obs://smoke-test/client.truststore.jks
- Truststore Password: ******
You do not need to set other parameters.
- Click OK.
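As an alternative to uploading client.truststore.jks through the OBS console, the following is a minimal sketch using the OBS Python SDK (esdk-obs-python), mirroring the upload step at the top of this list. The AK/SK and endpoint are placeholders, and smoke-test is the example bucket name used in this guide.

```python
# Minimal sketch: upload the Kafka truststore to the OBS bucket with the
# OBS Python SDK (esdk-obs-python). The AK/SK and endpoint are placeholders.
from obs import ObsClient

obs_client = ObsClient(
    access_key_id="YOUR_AK",
    secret_access_key="YOUR_SK",
    server="https://obs.<region>.myhuaweicloud.com",  # OBS endpoint of the DLI region
)

try:
    # Uploads the local file as object "client.truststore.jks" in bucket "smoke-test",
    # matching the Truststore Path obs://smoke-test/client.truststore.jks used above.
    resp = obs_client.putFile("smoke-test", "client.truststore.jks",
                              file_path="client.truststore.jks")
    if resp.status < 300:
        print("Uploaded client.truststore.jks")
    else:
        print("Upload failed:", resp.errorCode, resp.errorMessage)
finally:
    obs_client.close()
```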
Step 6: Configure Security Group Rules and Test Address Connectivity
- On the DLI management console, choose Resources > Queue Management, select the bound queue, and click the arrow next to the queue name to view the CIDR block of the queue.
- Log in to the DMS for Kafka console, click Kafka Premium, and click the name of the created Kafka instance, for example, kafka-dliflink. The instance information page is displayed.
- On the Basic Information page, obtain the Kafka connection address and port number in the Connection Address area.
- On the Basic Information page, click the security group name in the Security Group area.
- On the security group configuration page of the Kafka instance, choose Inbound Rules > Add Rule. Set Protocol to TCP, Port to 9093, and Source to the CIDR block of the DLI queue. Click OK.
- Log in to the DLI management console and choose Resources > Queue Management. In the row of the Flink queue, choose More > Test Address Connectivity. In the Address text box, enter the Kafka connection address and port number in the format of IP address:port number, and click Test. The subsequent operations can be performed only when the address is reachable. Note that multiple addresses must be tested separately.
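If the console connectivity test fails and you want to narrow down the cause, the following is a minimal Python sketch that checks TCP reachability of each broker address. It only verifies that the port is open, and it must run from a host that can reach the Kafka instance's network; the console's Test Address Connectivity remains the authoritative check from the DLI queue's side. The broker addresses are placeholders.

```python
# Minimal sketch: TCP reachability check for the Kafka broker addresses.
# Replace the placeholders with the connection addresses obtained in this step.
import socket

BROKERS = ["192.x.x.x:9093", "192.x.x.x:9093", "192.x.x.x:9093"]

for broker in BROKERS:
    host, port = broker.rsplit(":", 1)
    try:
        # Attempt a plain TCP connection; success means the port is reachable.
        with socket.create_connection((host, int(port)), timeout=5):
            print(f"{broker}: reachable")
    except OSError as exc:
        print(f"{broker}: NOT reachable ({exc})")
```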
Step 7: Create a Flink SQL Job
After the source and sink streams are prepared, you can create a Flink SQL job.
- In the left navigation pane of the DLI management console, choose Job Management > Flink Jobs. The Flink Jobs page is displayed.
- In the upper right corner of the Flink Jobs page, click Create Job. Set the following parameters:
- Type: Flink SQL
- Name: DIS-Flink-Kafka
- Description: Leave it blank.
- Template Name: Do not select any template.
- Click OK to enter the editing page.
- Edit the Flink SQL job.
Enter detailed SQL statements in the statement editing area. The example statements are as follows. Note that the values flagged in the comments must be changed to match your environment.
CREATE SOURCE STREAM car_info (a1 string, a2 string, a3 string, a4 INT)
WITH (
  type = "dis",
  region = "xxx", // Region where the current DLI queue is located
  channel = "csinput",
  encode = "csv",
  FIELD_DELIMITER = ";"
);

CREATE SINK STREAM kafka_sink (a1 string, a2 string, a3 string, a4 INT) // Output fields
WITH (
  type = "kafka",
  // Connection address of the Kafka instance. Change this to the address obtained in Step 6.
  kafka_bootstrap_servers = "192.x.x.x:9093,192.x.x.x:9093,192.x.x.x:9093",
  // Topic to be written to Kafka. Log in to the Kafka console, click the name of the created Kafka instance, and view the topic name on the Topic Management page.
  kafka_topic = "testflink",
  encode = "csv", // Encoding format, which can be JSON or CSV
  kafka_certificate_name = "Flink", // Name of the Kafka datasource authentication created in Step 5
  kafka_properties_delimiter = ",",
  // Replace xxx in username and password with the username and password set when enabling Kafka SASL_SSL in Step 1.
  kafka_properties = "sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username=\"xxx\" password=\"xxx\";,sasl.mechanism=PLAIN,security.protocol=SASL_SSL"
);

INSERT INTO kafka_sink
SELECT * FROM car_info;

CREATE SINK STREAM car_info1 (a1 string, a2 string, a3 string, a4 INT)
WITH (
  type = "dis",
  region = "xxx", // Region where the current DLI queue is located
  channel = "csinput",
  encode = "csv",
  FIELD_DELIMITER = ";"
);

INSERT INTO car_info1 SELECT 'id','owner','brand',1;
INSERT INTO car_info1 SELECT 'id','owner','brand',2;
INSERT INTO car_info1 SELECT 'id','owner','brand',3;
INSERT INTO car_info1 SELECT 'id','owner','brand',4;
INSERT INTO car_info1 SELECT 'id','owner','brand',5;
INSERT INTO car_info1 SELECT 'id','owner','brand',6;
INSERT INTO car_info1 SELECT 'id','owner','brand',7;
INSERT INTO car_info1 SELECT 'id','owner','brand',8;
INSERT INTO car_info1 SELECT 'id','owner','brand',9;
INSERT INTO car_info1 SELECT 'id','owner','brand',10;

- Click Check Semantics.
- Set job running parameters. The mandatory parameters are as follows:
- Queue: dli_queue_01
- CUs: 2
- Job Manager CUs: 1
- Parallelism: 1
- Save Job Log: selected
- OBS Bucket: Select the OBS bucket for storing job logs. You need the permissions to access this bucket.
You do not need to set other parameters.
- Click Save.
- Click Start. On the displayed Start Flink Job page, confirm the job specifications and the price, and click Start Now to start the job.
After the job is started, the system automatically switches to the Flink Jobs page, and the created job is displayed in the job list. You can view the job status in the Status column. After a job is successfully submitted, its status changes from Submitting to Running.
If the status of a job is Submission failed or Running exception, the job fails to be submitted or fails to run. In this case, you can hover over the status icon to view the error details and click the copy button to copy these details. Rectify the fault based on the error information and resubmit the job.
- After the job is complete, you can log in to the management console of Distributed Message Service for Kafka to view the corresponding Kafka premium instance. Click the instance name, click the Message Query tab, select the Kafka topic written in the Flink SQL job, and click Search. In the Operation column, click View Message Body to view the written message content.
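If you prefer to verify the output outside the console, the following is a minimal Python sketch using the kafka-python package. It assumes you have converted the CA certificate in client.truststore.jks to a PEM file (for example, with the pyjks sketch in Step 1), and the broker addresses, SASL username, and password are placeholders to replace with your own values. Like the connectivity check, it must run from a host that can reach the Kafka instance's network.

```python
# Minimal sketch: consume the messages written to the testflink topic using
# the kafka-python package. Broker addresses, username, password, and the
# PEM CA file are placeholders/assumptions to replace with your own values.
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "testflink",                           # Topic written by the Flink SQL job
    bootstrap_servers=["192.x.x.x:9093"],  # Kafka instance connection address(es)
    security_protocol="SASL_SSL",
    sasl_mechanism="PLAIN",
    sasl_plain_username="xxx",             # SASL_SSL username set in Step 1
    sasl_plain_password="xxx",             # SASL_SSL password set in Step 1
    ssl_cafile="ca.pem",                   # CA certificate extracted from client.truststore.jks
    ssl_check_hostname=False,              # May be needed when connecting by IP address
    auto_offset_reset="earliest",
    consumer_timeout_ms=10000,             # Stop iterating after 10 s without new messages
)

for message in consumer:
    print(message.value.decode("utf-8"))   # CSV rows produced by the Flink job

consumer.close()
```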