
Flink job cannot run when the pipeline has many stages #225

Open
ONI-MIKO opened this issue Dec 26, 2022 · 1 comment

ONI-MIKO commented Dec 26, 2022

Development environment:
Windows 11
OpenJDK 1.8
Alink 1.6
Flink 1.13.6/1.13.0

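I am testing feature processing on a CSV file in a single-machine (local) environment.
The code logic is as follows:
create a Pipeline and add the processing stages, call the pipeline's fit method to produce a PipelineModel, then call save to write it to a local .ak file.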
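Alink's batch operators run on Flink's DataSet API, which is largely lazy: fit and save mostly build a plan, and the work runs when BatchOperator.execute() is called. The toy model below is a sketch under that assumption, not Alink or Flink code (Node, add and fitLazily are hypothetical names). It assumes each fitted stage (e.g. StringIndexer, MultiHotEncoder) adds one "fit" node that reads the current data and one "transform" node that reads both the fitted model and that same data.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class LazyPipelinePlan {

    // Hypothetical plan node: an operator name plus the nodes it reads from.
    static final class Node {
        final String op;
        final List<Node> inputs;

        Node(String op, Node... inputs) {
            this.op = op;
            this.inputs = Arrays.asList(inputs);
        }
    }

    // Every node created so far, in creation order; nothing is ever executed.
    static final List<Node> PLAN = new ArrayList<>();

    static Node add(String op, Node... inputs) {
        Node node = new Node(op, inputs);
        PLAN.add(node);
        return node;
    }

    // "Fits" numStages estimator stages lazily: only the plan grows.
    static Node fitLazily(Node source, int numStages) {
        Node data = source;
        for (int i = 0; i < numStages; i++) {
            Node model = add("fit_" + i, data);          // model_i = fit(data_i)
            data = add("transform_" + i, model, data);   // data_{i+1} = transform(model_i, data_i)
        }
        return data;
    }

    public static void main(String[] args) {
        Node sink = fitLazily(add("csv_source"), 70);
        System.out.println("plan nodes: " + PLAN.size());           // 1 + 2 * 70 = 141
        System.out.println("sink reads: " + sink.inputs.size());    // 2 (model and data)
    }
}
```

In this model the node count grows only linearly (1 + 2n), but every transform node reaches the data before it along two routes, directly and through its model, so the plan is a chain of n diamond-shaped sub-graphs.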

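The test data is 1 GB. With 50 stages the job finishes in about 20 minutes on my laptop, but after increasing to 70-odd stages it ran all night without producing a result, stuck the whole time in the compile method of Flink's Optimizer class. This does not depend on the data volume: the job never actually starts executing, and CPU usage stays low.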
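A rough back-of-the-envelope check of what kind of growth would match these timings. It assumes, without confirmation, that compile time grows like $T(n) = c \cdot 2^n$ in the number of stages $n$, and treats the 20-minute total for 50 stages as an upper bound on the 50-stage compile time (that total also includes execution):

```latex
\frac{T(70)}{T(50)} = \frac{c \cdot 2^{70}}{c \cdot 2^{50}} = 2^{20} = 1\,048\,576,
\qquad
T(70) \le 20\ \text{min} \times 2^{20} \approx 2.1 \times 10^{7}\ \text{min} \approx 40\ \text{years}.
```

By contrast, quadratic growth $T(n) = c\,n^2$ gives $T(70)/T(50) = (70/50)^2 = 1.96$, i.e. at most about 40 minutes, which would have finished overnight. The symptom therefore looks more like superpolynomial growth than a low-degree polynomial one.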

The thread is stuck in the compile method of the org.apache.flink.optimizer.Optimizer class:
[Screenshot: QQ截图20221226170800]

More specifically, it is stuck here:
[Screenshot: QQ截图20221226170734]

UnionParallelismAndForwardEnforcer traverses the entire execution plan; after running all night, the job was still stuck traversing the plan at this point.
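The screenshots do not show whether UnionParallelismAndForwardEnforcer revisits shared sub-plans, so the following is only a hypothetical illustration. It shows how a plan walk that keeps no visited set would behave on a chain of diamond-shaped sub-graphs (each fitted stage's output reading both its model and the same upstream data), compared with a walk that expands each node once. The class and method names are made up for this sketch and are not Flink's optimizer classes.

```java
import java.util.Arrays;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;

public class PlanTraversalCost {

    // Hypothetical plan node that only knows its inputs.
    static final class Node {
        final List<Node> inputs;

        Node(Node... inputs) {
            this.inputs = Arrays.asList(inputs);
        }
    }

    // Builds n diamonds: model_i reads data_i, data_{i+1} reads model_i and data_i.
    static Node diamondChain(int n) {
        Node data = new Node(); // source
        for (int i = 0; i < n; i++) {
            Node model = new Node(data);
            data = new Node(model, data);
        }
        return data;
    }

    // Naive recursive walk: a shared input is re-walked once per path reaching it.
    static long naiveVisits(Node node) {
        long visits = 1;
        for (Node in : node.inputs) {
            visits += naiveVisits(in);
        }
        return visits;
    }

    // Walk with a visited set: each node is expanded exactly once.
    static long memoVisits(Node node, Map<Node, Boolean> seen) {
        if (seen.put(node, Boolean.TRUE) != null) {
            return 0; // already expanded via another path
        }
        long visits = 1;
        for (Node in : node.inputs) {
            visits += memoVisits(in, seen);
        }
        return visits;
    }

    public static void main(String[] args) {
        for (int n : new int[] {10, 15, 20}) {
            Node sink = diamondChain(n);
            System.out.printf("diamonds=%d naive=%d memoized=%d%n",
                    n, naiveVisits(sink), memoVisits(sink, new IdentityHashMap<>()));
        }
    }
}
```

For n diamonds the naive walk makes 3·2^n − 2 visits (3,145,726 at n = 20) while the memoized walk makes 2n + 1, so in this model each extra fitted stage doubles the naive cost. Only stages that are fitted would form a diamond here, so the exponent would be the number of estimator stages rather than all stages.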

My sample code:

public class XGBTest2 {

    private static final String ROOT = "D:\\dev\\work\\workspaces\\deepq\\feature-engineering\\feature-server\\src\\test\\resources\\";
    //    private static final String INPUT_FILE = ROOT + "feature_full.csv";
    private static final String INPUT_FILE = ROOT + "feature-70.csv";
    private static final String MODEL_FILE = ROOT + "xgb.model";

    public static void main(String[] args) throws Exception {
        trainModel();
//        loadModel();
    }

    private static void trainModel() throws Exception {
        BatchOperator<?> input = loadCsvData();

        Pipeline pipeline = new Pipeline(
                // Age
                new UDFTransformer()
                        .setUdf(new DurationUDF())
                        .setSelectedCols("birthday")
                        .setOutputCol("age"),
                new EqualWidthDiscretizer()
                        .setSelectedCols("age")
                        .setOutputCols("age")
                        .setNumBuckets(5),
                // Gender
                new UDFTransformer()
                        .setUdf(StringMappingUDF.build("男", 1, "女", 2))
                        .setSelectedCols("gender")
                        .setOutputCol("gender"),
                // Education level
                new UDFTransformer()
                        .setUdf(StringMappingUDF.build("小学", 1, "中学", 2, "大学", 3, "研究生", 4))
                        .setSelectedCols("education")
                        .setOutputCol("education"),
                // Marital status
                new UDFTransformer()
                        .setUdf(StringMappingUDF.build("未婚", 1, "已婚", 2))
                        .setSelectedCols("marriage_status")
                        .setOutputCol("marriage_status"),
                // Branch office code
                new StringIndexer()
                        .setSelectedCol("branch_code")
                        .setOutputCol("branch_code")
                        .setStringOrderType(HasStringOrderTypeDefaultAsRandom.StringOrderType.RANDOM),
                // Home region code
                new StringIndexer()
                        .setSelectedCol("zone_code")
                        .setOutputCol("zone_code")
                        .setStringOrderType(HasStringOrderTypeDefaultAsRandom.StringOrderType.RANDOM),
                // Assets
                new Bucketizer()
                        .setCutsArray(new double[]{1000, 10000, 100000, 500000, 1000000, 5000000})
                        .setSelectedCols("assets")
                        .setOutputCols("assets"),
                // Individual annual income
                new Bucketizer()
                        .setCutsArray(new double[]{1000, 10000, 100000, 500000, 1000000, 5000000})
                        .setSelectedCols("indv_annual_income")
                        .setOutputCols("indv_annual_income"),
                // Trading tenure (years since the account was opened)
                new UDFTransformer()
                        .setUdf(new DurationUDF())
                        .setSelectedCols("account_time")
                        .setOutputCol("account_age"),
                new Bucketizer()
                        .setCutsArray(new double[]{1, 3, 10})
                        .setSelectedCols("account_age")
                        .setOutputCols("account_age"),
                // Customer permission flags
                new MultiHotEncoder()
                        .setSelectedCols("account_info")
                        .setOutputCols("account_info"),
                // Customer segment
                new MultiHotEncoder()
                        .setSelectedCols("group")
                        .setOutputCols("group"),
                // Profitability
                new EqualWidthDiscretizer()
                        .setSelectedCols("profitability")
                        .setOutputCols("profitability")
                        .setNumBuckets(5),
                // Followed wealth-management products
                new MultiHotEncoder()
                        .setSelectedCols("follow_finance")
                        .setOutputCols("follow_finance"),
                // Followed investment-advisory products
                new MultiHotEncoder()
                        .setSelectedCols("follow_invest")
                        .setOutputCols("follow_invest"),
                // Followed value-added products
                new MultiHotEncoder()
                        .setSelectedCols("follow_value_add")
                        .setOutputCols("follow_value_add"),
                // Followed insurance
                new MultiHotEncoder()
                        .setSelectedCols("follow_insurance")
                        .setOutputCols("follow_insurance"),
                // Followed investment tools
                new MultiHotEncoder()
                        .setSelectedCols("follow_tool")
                        .setOutputCols("follow_tool"),
                // Followed topics
                new MultiHotEncoder()
                        .setSelectedCols("follow_topic")
                        .setOutputCols("follow_topic"),
                // Followed columns
                new MultiHotEncoder()
                        .setSelectedCols("follow_column")
                        .setOutputCols("follow_column"),
                // Funds held
                new MultiHotEncoder()
                        .setSelectedCols("own_fund")
                        .setOutputCols("own_fund"),
                // Wealth-management products held
                new MultiHotEncoder()
                        .setSelectedCols("own_finance")
                        .setOutputCols("own_finance"),
                // Investment-advisory products held
                new MultiHotEncoder()
                        .setSelectedCols("own_invest")
                        .setOutputCols("own_invest"),
                // Value-added products held
                new MultiHotEncoder()
                        .setSelectedCols("own_value_add")
                        .setOutputCols("own_value_add"),
                // Insurance held
                new MultiHotEncoder()
                        .setSelectedCols("own_insurance")
                        .setOutputCols("own_insurance"),
                // Investment tools held
                new MultiHotEncoder()
                        .setSelectedCols("own_tool")
                        .setOutputCols("own_tool"),
                // Watchlist stocks
                new Imputer()
                        .setSelectedCols("optional_stock")
                        .setOutputCols("optional_stock")
                        .setStrategy(HasStrategy.Strategy.MEAN),
                new MultiHotEncoder()
                        .setSelectedCols("optional_stock")
                        .setOutputCols("optional_stock"),
                // Watchlist funds
                new Imputer()
                        .setSelectedCols("optional_fund")
                        .setOutputCols("optional_fund")
                        .setStrategy(HasStrategy.Strategy.MEAN),
                // Preferred columns
                new MultiHotEncoder()
                        .setSelectedCols("preference_column")
                        .setOutputCols("preference_column"),
                // Preferred stocks
                new MultiHotEncoder()
                        .setSelectedCols("preference_stock")
                        .setOutputCols("preference_stock"),
                // Preferred industries
                new MultiHotEncoder()
                        .setSelectedCols("preference_industry")
                        .setOutputCols("preference_industry"),
                // Preferred concepts
                new MultiHotEncoder()
                        .setSelectedCols("preference_concept")
                        .setOutputCols("preference_concept"),
                // Preferred news
                new MultiHotEncoder()
                        .setSelectedCols("preference_news")
                        .setOutputCols("preference_news"),
                // Preferred funds
                new MultiHotEncoder()
                        .setSelectedCols("preference_fund")
                        .setOutputCols("preference_fund"),
                // Preferred banking tags
                new MultiHotEncoder()
                        .setSelectedCols("preference_bank")
                        .setOutputCols("preference_bank"),
                // Preferred finance tags
                new MultiHotEncoder()
                        .setSelectedCols("preference_financal")
                        .setOutputCols("preference_financal"),
                // Preferred lifestyle tags
                new MultiHotEncoder()
                        .setSelectedCols("preference_life")
                        .setOutputCols("preference_life"),
                // Preferred stock sectors
                new MultiHotEncoder()
                        .setSelectedCols("preference_plate")
                        .setOutputCols("preference_plate"),

                // Item type
                new StringIndexer()
                        .setSelectedCol("item_type")
                        .setOutputCol("item_type")
                        .setStringOrderType(HasStringOrderTypeDefaultAsRandom.StringOrderType.RANDOM),
                // Source
                new StringIndexer()
                        .setSelectedCol("source_code")
                        .setOutputCol("source_code")
                        .setStringOrderType(HasStringOrderTypeDefaultAsRandom.StringOrderType.RANDOM),
                // News sentiment
                new UDFTransformer()
                        .setUdf(StringMappingUDF.build("POSITIVE", 1, "NEGATIVE", 2, "NEUTRAL", 0))
                        .setSelectedCols("nature_code")
                        .setOutputCol("nature_code"),
                // Column code
                new Imputer()
                        .setSelectedCols("columns_code")
                        .setOutputCols("columns_code")
                        .setStrategy(HasStrategy.Strategy.MEAN),
                new StringIndexer()
                        .setSelectedCol("columns_code")
                        .setOutputCol("columns_code")
                        .setStringOrderType(HasStringOrderTypeDefaultAsRandom.StringOrderType.RANDOM),
                // Concept (theme) attributes
                new JsonValue()
                        .setJsonPath("$[*].tagCode")
                        .setSelectedCol("item_concept")
                        .setOutputCols("item_concept")
                        .setOutputColTypes("OBJECT_ARRAY<STRING>"),
                new UDFTransformer()
                        .setUdf(new ArrayJoinUDF(" ", 5))
                        .setSelectedCols("item_concept")
                        .setOutputCol("item_concept"),
                new MultiHotEncoder()
                        .setSelectedCols("item_concept")
                        .setOutputCols("item_concept"),
                // News attributes
                new JsonValue()
                        .setJsonPath("$[*].tagCode")
                        .setSelectedCol("item_news")
                        .setOutputCols("item_news")
                        .setOutputColTypes("OBJECT_ARRAY<STRING>"),
                new UDFTransformer()
                        .setUdf(new ArrayJoinUDF(" ", 5))
                        .setSelectedCols("item_news")
                        .setOutputCol("item_news"),
                new MultiHotEncoder()
                        .setSelectedCols("item_news")
                        .setOutputCols("item_news"),
                // Industry attributes
                new JsonValue()
                        .setJsonPath("$[*].tagCode")
                        .setSelectedCol("item_industry")
                        .setOutputCols("item_industry")
                        .setOutputColTypes("OBJECT_ARRAY<STRING>"),
                new UDFTransformer()
                        .setUdf(new ArrayJoinUDF(" ", 5))
                        .setSelectedCols("item_industry")
                        .setOutputCol("item_industry"),
                new MultiHotEncoder()
                        .setSelectedCols("item_industry")
                        .setOutputCols("item_industry"),
                // Fund attributes
                new JsonValue()
                        .setJsonPath("$[*].tagCode")
                        .setSelectedCol("item_fund")
                        .setOutputCols("item_fund")
                        .setOutputColTypes("OBJECT_ARRAY<STRING>"),
                new UDFTransformer()
                        .setUdf(new ArrayJoinUDF(" ", 5))
                        .setSelectedCols("item_fund")
                        .setOutputCol("item_fund"),
                new MultiHotEncoder()
                        .setSelectedCols("item_fund")
                        .setOutputCols("item_fund"),
                // Stock attributes
                new JsonValue()
                        .setJsonPath("$[*].tagCode")
                        .setSelectedCol("item_stock")
                        .setOutputCols("item_stock")
                        .setOutputColTypes("OBJECT_ARRAY<STRING>"),
                new UDFTransformer()
                        .setUdf(new ArrayJoinUDF(" ", 5))
                        .setSelectedCols("item_stock")
                        .setOutputCol("item_stock"),
                new MultiHotEncoder()
                        .setSelectedCols("item_stock")
                        .setOutputCols("item_stock"),
                // Lifestyle attributes
                new JsonValue()
                        .setJsonPath("$[*].tagCode")
                        .setSelectedCol("item_life")
                        .setOutputCols("item_life")
                        .setOutputColTypes("OBJECT_ARRAY<STRING>"),
                new UDFTransformer()
                        .setUdf(new ArrayJoinUDF(" ", 5))
                        .setSelectedCols("item_life")
                        .setOutputCol("item_life"),
                new MultiHotEncoder()
                        .setSelectedCols("item_life")
                        .setOutputCols("item_life"),
                // Finance attributes
                new JsonValue()
                        .setJsonPath("$[*].tagCode")
                        .setSelectedCol("item_financial")
                        .setOutputCols("item_financial")
                        .setOutputColTypes("OBJECT_ARRAY<STRING>"),
                new UDFTransformer()
                        .setUdf(new ArrayJoinUDF(" ", 5))
                        .setSelectedCols("item_financial")
                        .setOutputCol("item_financial"),
                new MultiHotEncoder()
                        .setSelectedCols("item_financial")
                        .setOutputCols("item_financial"),
                // Banking attributes
                new JsonValue()
                        .setJsonPath("$[*].tagCode")
                        .setSelectedCol("item_bank")
                        .setOutputCols("item_bank")
                        .setOutputColTypes("OBJECT_ARRAY<STRING>"),
                new UDFTransformer()
                        .setUdf(new ArrayJoinUDF(" ", 5))
                        .setSelectedCols("item_bank")
                        .setOutputCol("item_bank"),
                new MultiHotEncoder()
                        .setSelectedCols("item_bank")
                        .setOutputCols("item_bank"),
                // User action
                new UDFTransformer()
                        .setUdf(StringMappingUDF.build(1, CollectionUtil.newHashMap("click", 1)))
                        .setSelectedCols("action")
                        .setOutputCol("action")
        );

        PipelineModel pipelineModel = pipeline.fit(input);

        pipelineModel.save(MODEL_FILE, true);

        BatchOperator.execute();

        System.out.println("train finished");
    }

    private static BatchOperator<?> loadCsvData() {
        String schemaStr = "user_id STRING, birthday STRING, gender STRING, education STRING, marriage_status STRING, branch_code STRING, zone_code STRING, assets DOUBLE, indv_annual_income DOUBLE, account_time STRING, risk_appetite INT, account_info STRING, user_level INT, group STRING, profitability DOUBLE, follow_finance STRING, follow_invest STRING, follow_value_add STRING, follow_insurance STRING, follow_tool STRING, follow_topic STRING, follow_column STRING, own_fund STRING, own_finance STRING, own_invest STRING, own_value_add STRING, own_insurance STRING, own_tool STRING, optional_stock STRING, optional_fund STRING, sign INT, preference_reason INT, preference_itemtype INT, preference_column STRING, preference_stock STRING, preference_industry STRING, preference_concept STRING, preference_news STRING, preference_fund STRING, preference_bank STRING, preference_financal STRING, preference_life STRING, preference_plate STRING, item_id STRING, item_type STRING, source_code STRING, quality DOUBLE, nature_code STRING, columns_code STRING, expose_duration INT, click_duration INT, item_concept STRING, item_news STRING, item_industry STRING, item_fund STRING, item_stock STRING, item_life STRING, item_financial STRING, item_bank STRING, szzs INT, szcz INT, cybz INT, hszs INT, ixic INT, location INT, trade_day INT, month INT, day INT, hour INT, action STRING";
//        String schemaStr2 = "user_id STRING, birthday STRING, prefer_stock STRING, action STRING, item_id STRING, item_type STRING, supplier_code STRING, source_code STRING, gender STRING, education STRING, marriage_status STRING, branch_code STRING, zone_code STRING, assets DOUBLE, indv_annual_income DOUBLE, account_time STRING, risk_appetite INT, account_info STRING, user_level INT, group STRING, profitability DOUBLE, follow_finance STRING, follow_invest STRING, follow_tool STRING, follow_topic STRING, follow_column STRING, own_fund STRING, own_finance STRING, optional_stock STRING, optional_fund STRING, sign INT, quality DOUBLE, columns_code STRING, month INT, day INT, hour INT";
        return new CsvSourceBatchOp()
                .setFilePath(INPUT_FILE)
                .setSchemaStr(schemaStr)
                .setIgnoreFirstLine(true)
                .setFieldDelimiter(",")
                .setQuoteChar('"');
    }

}
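Since the same job completed with 50 stages, one workaround that may be worth trying (not verified in this thread) is to split the 70-odd stages into several smaller pipelines, fit each one, and materialize its transformed output before fitting the next, so that no single plan contains the whole chain. The Alink-specific wiring is left out here; the plain-Java helper below (StageChunker and chunk are hypothetical names) only sketches the order-preserving split.

```java
import java.util.ArrayList;
import java.util.List;

public class StageChunker {

    // Splits an ordered stage list into consecutive chunks of at most maxPerChunk
    // elements, keeping order so that chunk k+1 can consume chunk k's output.
    static <T> List<List<T>> chunk(List<T> stages, int maxPerChunk) {
        if (maxPerChunk <= 0) {
            throw new IllegalArgumentException("maxPerChunk must be positive");
        }
        List<List<T>> chunks = new ArrayList<>();
        for (int from = 0; from < stages.size(); from += maxPerChunk) {
            int to = Math.min(from + maxPerChunk, stages.size());
            chunks.add(new ArrayList<>(stages.subList(from, to)));
        }
        return chunks;
    }

    public static void main(String[] args) {
        // Placeholder names standing in for the 70 pipeline stages.
        List<String> stages = new ArrayList<>();
        for (int i = 0; i < 70; i++) {
            stages.add("stage_" + i);
        }
        for (List<String> c : chunk(stages, 35)) {
            System.out.println(c.size() + " stages: " + c.get(0) + " .. " + c.get(c.size() - 1));
        }
    }
}
```

Keeping the chunks in their original order matters because later stages read columns produced by earlier ones (for example, each MultiHotEncoder on an item_* column reads the output of the preceding JsonValue and UDFTransformer), and the fitted chunk models would have to be applied in that same order at inference time.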

pom.xml dependencies:

	<properties>
		<alink.version>1.6.0</alink.version>
		<alink.flink.major.version>1.13</alink.flink.major.version>
		<alink.scala.major.version>2.11</alink.scala.major.version>
		<flink.version>1.13.6</flink.version>
	</properties>

	<dependencies>
		<!-- alink -->
		<dependency>
			<groupId>com.alibaba.alink</groupId>
			<artifactId>alink_core_flink-${alink.flink.major.version}_${alink.scala.major.version}</artifactId>
			<version>${alink.version}</version>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-streaming-scala_${alink.scala.major.version}</artifactId>
			<version>${flink.version}</version>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-table-planner_${alink.scala.major.version}</artifactId>
			<version>${flink.version}</version>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-clients_${alink.scala.major.version}</artifactId>
			<version>${flink.version}</version>
		</dependency>
	</dependencies>

ONI-MIKO commented Dec 26, 2022

Here is a project that reproduces the issue, including the test data:
feature-demo.zip
