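Configure a SHELL task node
Data IDE supports several underlying engines, including Hive, Spark, and MapReduce. When creating a task flow, you can add SHELL task nodes to develop it. This article describes how to configure a SHELL task node.
Run a HiveSQL task
A SHELL task node can run HiveSQL tasks from the command line to perform batch computation.
Command format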
canaanhive [arguments]
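Parameters
| Parameter | Description | Example |
|---|---|---|
| -f <arg> | HQL file name | -f demo.sql |
| -d <arg> | Date parameter | -d 2018-01-01 |
| -E <paraN>=<valN> | Assigns a value to a user-defined parameter in the HQL file | -E para=abc |
| -str <sql> | SQL statement to execute | -str "show tables;" |
For the user-defined parameter -E, passing -E para=abc automatically replaces ${env.para} in the HQL file with abc.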
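To check locally what an HQL file looks like after `-E` substitution, the replacement can be approximated in plain POSIX shell. This is a minimal sketch that assumes canaanhive performs a literal text replacement of `${env.<name>}`; `render_env_params` is a hypothetical helper, not part of canaanhive.

```shell
#!/bin/sh
# Hypothetical helper (not part of canaanhive): approximates -E name=value by
# literally replacing every ${env.name} in an HQL file and printing the result.
render_env_params() {
  _file=$1
  shift
  _text=$(cat "$_file") || return 1
  for _pair in "$@"; do
    _name=${_pair%%=*}   # text before the first '='
    _value=${_pair#*=}   # text after the first '='
    # awk index()/substr() do a literal (non-regex) replacement; key and value
    # are passed through the environment so no escape processing happens.
    _text=$(printf '%s\n' "$_text" | KEY="\${env.$_name}" VAL="$_value" awk '
      {
        out = ""; rest = $0
        while ((i = index(rest, ENVIRON["KEY"])) > 0) {
          out = out substr(rest, 1, i - 1) ENVIRON["VAL"]
          rest = substr(rest, i + length(ENVIRON["KEY"]))
        }
        print out rest
      }')
  done
  printf '%s\n' "$_text"
}
```

For example, `render_env_params demo.sql DB=demo` on the demo.sql file shown in this article prints it with `use demo;` as the first line. Placeholders without a matching `-E` assignment are left untouched.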
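For the date parameter -d, taking -d 2018-01-01 as an example, each ${env.FORMAT} in the HQL file is replaced automatically, where FORMAT is one of the values in the table below:
Note
In the FORMAT values below, the letter P stands for Previous, so PnD/PnM/PnY mean n days/months/years before the -d date.
The letter N stands for Next, so NnD/NnM/NnY mean n days/months/years after it. In an actual placeholder, n is written as a number, for example ${env.YYYYMMDD_P1D}.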
| FORMAT | Range of n | Value for -d 2018-01-01 (smallest n ~ largest n) |
|---|---|---|
| YYYYMMDD | - | 2018-01-01 |
| YYYYMMDD_PnD | 1 <= n <= 30 | 2017-12-31 ~ 2017-12-02 |
| YYYYMMDD_PnM | 1 <= n <= 12 | 2017-12-01 ~ 2017-01-01 |
| YYYYMMDD_PnY | 1 <= n <= 2 | 2017-01-01 ~ 2016-01-01 |
| YYYYMMDD_NnD | 1 <= n <= 2 | 2018-01-02 ~ 2018-01-03 |
| YYYYMMDD_NnM | 1 <= n <= 2 | 2018-02-01 ~ 2018-03-01 |
| YYYYMMDD_NnY | 1 <= n <= 2 | 2019-01-01 ~ 2020-01-01 |
| YYYYMM | - | 2018-01 |
| YYYYMM_PnD | 1 <= n <= 2 | 2017-12 ~ 2017-12 |
| YYYYMM_PnM | 1 <= n <= 2 | 2017-12 ~ 2017-11 |
| YYYYMM_PnY | 1 <= n <= 2 | 2017-01 ~ 2016-01 |
| YYYYMM_NnD | 1 <= n <= 2 | 2018-01 ~ 2018-01 |
| YYYYMM_NnM | 1 <= n <= 2 | 2018-02 ~ 2018-03 |
| YYYYMM_NnY | 1 <= n <= 2 | 2019-01 ~ 2020-01 |
| YYYY | - | 2018 |
| YYYY_PnD | 1 <= n <= 2 | 2017 ~ 2017 |
| YYYY_PnM | 1 <= n <= 2 | 2017 ~ 2017 |
| YYYY_PnY | 1 <= n <= 2 | 2017 ~ 2016 |
| YYYY_NnD | 1 <= n <= 2 | 2018 ~ 2018 |
| YYYY_NnM | 1 <= n <= 2 | 2018 ~ 2018 |
| YYYY_NnY | 1 <= n <= 2 | 2019 ~ 2020 |
| MM | - | 01 |
| DD | - | 01 |
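The table can be reproduced locally to double-check which value a date placeholder resolves to. The sketch below is an assumption-based approximation, not the Data IDE implementation: `date_macro` is a hypothetical helper, it relies on GNU coreutils `date` (`-u`, and `-d` with relative items such as `1 day ago`), and it does not enforce the n ranges listed above. GNU date normalizes overflowing days (adding one month to January 31 rolls into March), and the platform's behavior for such end-of-month dates is not documented here.

```shell
#!/bin/sh
# Hypothetical helper: prints the value of ${env.<macro>} for a -d base date,
# following the FORMAT table. Requires GNU date; n ranges are not enforced.
date_macro() {
  _base=$1    # e.g. 2018-01-01
  _macro=$2   # e.g. YYYYMMDD, YYYYMM_P1M, YYYY_N2Y, MM, DD
  case $_macro in
    *_*) _fmt=${_macro%%_*}; _off=${_macro#*_} ;;
    *)   _fmt=$_macro; _off= ;;
  esac
  case $_fmt in
    YYYYMMDD) _out=+%Y-%m-%d ;;
    YYYYMM)   _out=+%Y-%m ;;
    YYYY)     _out=+%Y ;;
    MM)       _out=+%m ;;
    DD)       _out=+%d ;;
    *) echo "unknown FORMAT: $_macro" >&2; return 1 ;;
  esac
  _rel=
  if [ -n "$_off" ]; then
    _dir=${_off%"${_off#?}"}    # first character: P or N
    _unit=${_off#"${_off%?}"}   # last character: D, M or Y
    _n=${_off#?}; _n=${_n%?}    # digits in between
    case $_n in ''|*[!0-9]*) echo "bad offset: $_off" >&2; return 1 ;; esac
    case $_unit in
      D) _unit=day ;; M) _unit=month ;; Y) _unit=year ;;
      *) echo "bad unit: $_off" >&2; return 1 ;;
    esac
    case $_dir in
      P) _rel="$_n $_unit ago" ;;   # Previous: n units earlier
      N) _rel="$_n $_unit" ;;       # Next: n units later
      *) echo "bad direction: $_off" >&2; return 1 ;;
    esac
  fi
  # -u interprets the date in UTC, avoiding daylight-saving shifts
  date -u -d "$_base $_rel" "$_out"
}
```

For example, `date_macro 2018-01-01 YYYYMM_P1M` prints `2017-12`, which matches the YYYYMM_PnM row for n = 1.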
Example
Enter the following command in the SHELL node:
canaanhive -f demo.sql -d 2018-01-01 -E DB=demo
The HQL file demo.sql contains the following code:
use ${env.DB};
create table if not exists demo(id string);
insert into demo values('${env.YYYYMMDD}');
The statements actually executed are:
use demo;
create table if not exists demo(id string);
insert into demo values('2018-01-01');
Run a Spark task
A SHELL task node can run PySpark and Spark tasks from the command line.
Command format
Taking a PySpark job as an example, create a SHELL node and use a shell command to run the job's main script:
sh predict.sh
Submit a PySpark job
submit-pyspark-application [options] <python file> [app arguments]
Parameters
| Option | Description |
|---|---|
| --python 2.7/3.5 | Python version: 2.7 or 3.5. Default: 2.7. |
| --pythonEnvPath | VirtualEnv path in HDFS. If not set, the default Python environment is used. |
| --name NAME | Name of your application. |
| --queue QUEUE_NAME | The YARN queue to submit to (Default: "default"). |
| --num-executors NUM | Number of executors to launch (Default: 2). If dynamic allocation is enabled, the initial number of executors will be at least NUM. |
| --executor-cores NUM | Number of cores per executor (Default: 1 in YARN mode, or all available cores on the worker in standalone mode). |
| --driver-cores NUM | Number of cores used by the driver, only in cluster mode (Default: 1). |
| --conf PROP=VALUE | Arbitrary Spark configuration property. |
| --py-files PY_FILES | Comma-separated list of .zip, .egg, or .py files to place on the PYTHONPATH for Python apps. |
| --files FILES | Comma-separated list of files to be placed in the working directory of each executor. |
| --archives ARCHIVES | Comma-separated list of archives to be extracted into the working directory of each executor. |
| --driver-memory MEM | Memory for driver (e.g. 1000M, 2G) (Default: 2G). |
| --driver-java-options | Extra Java options to pass to the driver. |
| --driver-library-path | Extra library path entries to pass to the driver. |
| --driver-class-path | Extra class path entries to pass to the driver. Note that jars added with --jars are automatically included in the classpath. |
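Putting the submission syntax and a few of the options above together, a minimal PySpark submission might look like the sketch below. The application name, `main.py`, and `utils.py` are hypothetical placeholders, the `--python 3.5` value form is assumed from the table, and the `DRY_RUN` switch is only an illustration device that prints the command instead of running it.

```shell
#!/bin/sh
# Sketch of a minimal PySpark submission. main.py and utils.py are hypothetical;
# set DRY_RUN=1 to print the command instead of executing it.
submit_minimal_pyspark() {
  if [ $# -lt 1 ]; then
    echo "usage: submit_minimal_pyspark <queue> [app arguments...]" >&2
    return 1
  fi
  _queue=$1
  shift   # whatever remains is forwarded to main.py as [app arguments]
  set -- submit-pyspark-application \
    --python 3.5 \
    --name my_pyspark_job \
    --queue "$_queue" \
    --num-executors 2 \
    --executor-cores 1 \
    --driver-memory 2G \
    --py-files utils.py \
    main.py "$@"
  if [ "${DRY_RUN:-0}" = 1 ]; then
    echo "$*"
  else
    "$@"
  fi
}
```

Keeping the command in the positional parameters rather than in a single string means option values and app arguments containing spaces are passed through intact.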
Submit a Spark job
submit-spark-application [options] <app-jar> [app arguments]
Parameters
| Option | Description |
|---|---|
| --class CLASS_NAME | Your application's main class (for Java/Scala apps). |
| --name NAME | Name of your application. |
| --packages | Comma-separated list of Maven coordinates of jars to include on the driver and executor classpaths. Will search the local Maven repo, then Maven Central and any additional remote repositories given by the --repositories option. The format for the coordinates should be groupId:artifactId:version. |
| --jars JARS | Comma-separated list of local jars to include on the driver and executor classpaths. |
| --conf PROP=VALUE | Arbitrary Spark configuration property. |
| --files FILES | Comma-separated list of files to be placed in the working directory of each executor. |
| --archives ARCHIVES | Comma-separated list of archives to be extracted into the working directory of each executor. |
| --driver-memory MEM | Memory for driver (e.g. 1000M, 2G) (Default: 2G). |
| --driver-java-options | Extra Java options to pass to the driver. |
| --driver-library-path | Extra library path entries to pass to the driver. |
| --driver-class-path | Extra class path entries to pass to the driver. Note that jars added with --jars are automatically included in the classpath. |
| --executor-cores NUM | Number of cores per executor (Default: 1 in YARN mode, or all available cores on the worker in standalone mode). |
| --driver-cores NUM | Number of cores used by the driver, only in cluster mode (Default: 1). |
| --queue QUEUE_NAME | The YARN queue to submit to (Default: "default"). |
| --num-executors NUM | Number of executors to launch (Default: 2). If dynamic allocation is enabled, the initial number of executors will be at least NUM. |
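`--jars` expects a comma-separated list. When an application ships several dependency jars, a small helper can assemble that list from a directory. `jars_csv`, `lib`, `my-app.jar`, and `com.example.Main` below are hypothetical names used only for illustration.

```shell
#!/bin/sh
# Hypothetical helper: prints all *.jar files in a directory as a
# comma-separated list suitable for --jars (empty output if there are none).
jars_csv() {
  _list=
  for _jar in "$1"/*.jar; do
    [ -e "$_jar" ] || continue       # the glob matched nothing
    _list=${_list:+$_list,}$_jar     # add a comma only between entries
  done
  printf '%s\n' "$_list"
}

# Example invocation (placeholders, not executed here):
# submit-spark-application --class com.example.Main --name my_spark_job \
#   --queue default --jars "$(jars_csv lib)" my-app.jar arg1
```

If the directory holds no jars the helper prints an empty line, in which case `--jars` is better omitted than passed an empty value.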
Example
In the SHELL node, enter the following shell command to run the main script predict.sh:
sh predict.sh
Sample code of the main script:
submit_pyspark_application_func(){
    submit-pyspark-application \
    --deploy-mode cluster \
    --queue ${1} \
    --name pyspark_predict_test \
    --num-executors 10 \
    --driver-memory 16g \
    --executor-memory 12g \
    --driver-cores 2 \
    --executor-cores 3 \
    --conf spark.eventLog.enabled=true \
    --conf spark.network.timeout=240000 \
    --conf spark.executor.heartbeatInterval=24000 \
    --conf spark.yarn.executor.memoryOverhead=8192 \
    --archives hdfs://user/db_test/userPythonLib.zip#ANACONDA  \
    --conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=./ANACONDA/MINICONDA/bin/python \
    --conf spark.yarn.maxAppAttempts=1 \
    --conf spark.logger_table=wens_status_algo_running \
    --conf spark.hdfs_user=${2} \
    --conf spark.hdfs_path=hdfs://titan/user/${2} \
    --conf spark.start_date=${3} \
    --conf spark.end_date=${4} \
    --conf spark.site_ids=${5} \
    --conf spark.metric_save_path=/user/${2}/operaphm_temperature/metrics \
    --py-files anomaly.py,hadoop_common_functions.py,layout.py,utm.zip,rle.py,common_tools.py,steadystatefilter.py,math_utils.py \
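    predict.py
}
# Submit the job; positional arguments: queue, HDFS user, start date, end date, site IDs
submit_pyspark_application_func "$@"
Here, predict.py is the entry-point Python file and must be placed in the same directory as predict.sh. The function reads the YARN queue, HDFS user, start date, end date, and site IDs from the positional parameters ${1} to ${5}, so pass them when running the script, for example: sh predict.sh <queue> <hdfs_user> <start_date> <end_date> <site_ids>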
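Because `submit_pyspark_application_func` reads the queue, HDFS user, start date, end date, and site IDs from `${1}` to `${5}`, a missing argument silently produces an incomplete submission command. A minimal guard could look like the sketch below; `check_predict_args` is hypothetical, and the YYYY-MM-DD date check is an assumption, since the expected format of `spark.start_date` and `spark.end_date` is not documented here.

```shell
#!/bin/sh
# Hypothetical guard for predict.sh: verifies the five positional arguments
# used by submit_pyspark_application_func before anything is submitted.
check_predict_args() {
  if [ $# -ne 5 ]; then
    echo "usage: sh predict.sh <queue> <hdfs_user> <start_date> <end_date> <site_ids>" >&2
    return 1
  fi
  for _arg in "$@"; do
    [ -n "$_arg" ] || { echo "empty argument" >&2; return 1; }
  done
  # Assumed date format: YYYY-MM-DD (the same form as canaanhive -d)
  for _d in "$3" "$4"; do
    case $_d in
      [0-9][0-9][0-9][0-9]-[0-9][0-9]-[0-9][0-9]) ;;
      *) echo "invalid date: $_d (expected YYYY-MM-DD)" >&2; return 1 ;;
    esac
  done
}
```

In predict.sh it would be called as `check_predict_args "$@" || exit 1` right before `submit_pyspark_application_func "$@"`, so a misconfigured SHELL node fails fast instead of submitting a job with empty settings.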