-
Notifications
You must be signed in to change notification settings - Fork 5
/
datax.txt
1028 lines (773 loc) · 42.3 KB
/
datax.txt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
DataX
DataX 是阿里巴巴集团内被广泛使用的离线数据同步工具/平台,实现包括 MySQL、Oracle、SqlServer、Postgre、HDFS、Hive、ADS、HBase、TableStore(OTS)、MaxCompute(ODPS)、DRDS 等各种异构数据源之间高效的数据同步功能。
Features
DataX本身作为数据同步框架,将不同数据源的同步抽象为从源头数据源读取数据的Reader插件,以及向目标端写入数据的Writer插件,理论上DataX框架可以支持任意数据源类型的数据同步工作。同时DataX插件体系作为一套生态系统, 每接入一套新数据源该新加入的数据源即可实现和现有的数据源互通。
DataX详细介绍
请参考:DataX-Introduction
Quick Start
Download DataX下载地址
请点击:Quick Start
Support Data Channels
DataX目前已经有了比较全面的插件体系,主流的RDBMS数据库、NOSQL、大数据计算系统都已经接入,目前支持数据如下图,详情请点击:DataX数据源参考指南
类型 数据源 Reader(读) Writer(写) 文档
RDBMS 关系型数据库 MySQL √ √ 读 、写
Oracle √ √ 读 、写
SQLServer √ √ 读 、写
PostgreSQL √ √ 读 、写
DRDS √ √ 读 、写
通用RDBMS(支持所有关系型数据库) √ √ 读 、写
阿里云数仓数据存储 ODPS √ √ 读 、写
ADS √ 写
OSS √ √ 读 、写
OCS √ √ 读 、写
NoSQL数据存储 OTS √ √ 读 、写
Hbase0.94 √ √ 读 、写
Hbase1.1 √ √ 读 、写
Phoenix4.x √ √ 读 、写
MongoDB √ √ 读 、写
Hive √ √ 读 、写
无结构化数据存储 TxtFile √ √ 读 、写
FTP √ √ 读 、写
HDFS √ √ 读 、写
Elasticsearch √ 写
我要开发新的插件
请点击:DataX插件开发宝典
项目成员
核心Contributions: 光戈、一斅、祁然、云时
感谢天烬、巴真、静行对DataX做出的贡献。
License
This software is free to use under the Apache License Apache license.
请及时提出issue给我们。请前往:DataxIssue
开源版DataX企业用户
Datax-logo
长期招聘 联系邮箱:hanfa.shf@alibaba-inc.com
【JAVA开发职位】
职位名称:JAVA资深开发工程师/专家/高级专家
工作年限 : 2年以上
学历要求 : 本科(如果能力靠谱,这些都不是条件)
期望层级 : P6/P7/P8
岗位描述:
1. 负责阿里云大数据平台(数加)的开发设计。
2. 负责面向政企客户的大数据相关产品开发;
3. 利用大规模机器学习算法挖掘数据之间的联系,探索数据挖掘技术在实际场景中的产品应用 ;
4. 一站式大数据开发平台
5. 大数据任务调度引擎
6. 任务执行引擎
7. 任务监控告警
8. 海量异构数据同步
岗位要求:
1. 拥有3年以上JAVA Web开发经验;
2. 熟悉Java的基础技术体系。包括JVM、类装载、线程、并发、IO资源管理、网络;
3. 熟练使用常用Java技术框架、对新技术框架有敏锐感知能力;深刻理解面向对象、设计原则、封装抽象;
4. 熟悉HTML/HTML5和JavaScript;熟悉SQL语言;
5. 执行力强,具有优秀的团队合作精神、敬业精神;
6. 深刻理解设计模式及应用场景者加分;
7. 具有较强的问题分析和处理能力、比较强的动手能力,对技术有强烈追求者优先考虑;
8. 对高并发、高稳定可用性、高性能、大数据处理有过实际项目及产品经验者优先考虑;
9. 有大数据产品、云产品、中间件技术解决方案者优先考虑。
阿里云开源离线同步工具DataX3.0介绍
一. DataX�3.0概览
DataX 是一个异构数据源离线同步工具,致力于实现包括关系型数据库(MySQL、Oracle等)、HDFS、Hive、ODPS、HBase、FTP等各种异构数据源之间稳定高效的数据同步功能。
datax_why_new
设计理念
为了解决异构数据源同步问题,DataX将复杂的网状的同步链路变成了星型数据链路,DataX作为中间传输载体负责连接各种数据源。当需要接入一个新的数据源的时候,只需要将此数据源对接到DataX,便能跟已有的数据源做到无缝数据同步。
当前使用现状
DataX在阿里巴巴集团内被广泛使用,承担了所有大数据的离线同步业务,并已持续稳定运行了6年之久。目前每天完成同步8w多道作业,每日传输数据量超过300TB。
此前已经开源DataX1.0版本,此次介绍为阿里云开源全新版本DataX3.0,有了更多更强大的功能和更好的使用体验。Github主页地址:https://github.com/alibaba/DataX
二、DataX3.0框架设计
datax_framework_new
DataX本身作为离线数据同步框架,采用Framework + plugin架构构建。将数据源读取和写入抽象成为Reader/Writer插件,纳入到整个同步框架中。
Reader:Reader�为数据采集模块,负责采集数据源的数据,将数据发送给Framework。
Writer: Writer为数据写入模块,负责不断向Framework取数据,并将数据写入到目的端。
Framework:Framework用于连接reader和writer,作为两者的数据传输通道,并处理缓冲,流控,并发,数据转换等核心技术问题。
三. DataX3.0插件体系
经过几年积累,DataX目前已经有了比较全面的插件体系,主流的RDBMS数据库、NOSQL、大数据计算系统都已经接入。DataX目前支持数据如下:
类型 数据源 Reader(读) Writer(写) 文档
RDBMS 关系型数据库 MySQL √ √ 读 、写
Oracle √ √ 读 、写
SQLServer √ √ 读 、写
PostgreSQL √ √ 读 、写
DRDS √ √ 读 、写
达梦 √ √ 读 、写
通用RDBMS(支持所有关系型数据库) √ √ 读 、写
阿里云数仓数据存储 ODPS √ √ 读 、写
ADS √ 写
OSS √ √ 读 、写
OCS √ √ 读 、写
NoSQL数据存储 OTS √ √ 读 、写
Hbase0.94 √ √ 读 、写
Hbase1.1 √ √ 读 、写
MongoDB √ √ 读 、写
Hive √ √ 读 、写
无结构化数据存储 TxtFile √ √ 读 、写
FTP √ √ 读 、写
HDFS √ √ 读 、写
Elasticsearch √ 写
DataX Framework提供了简单的接口与插件交互,提供简单的插件接入机制,只需要任意加上一种插件,就能无缝对接其他数据源。详情请看:DataX数据源指南
�四、DataX3.0核心架构
DataX 3.0 开源版本支持单机多线程模式完成同步作业运行,本小节按一个DataX作业生命周期的时序图,从整体架构设计非常简要说明DataX各个模块相互关系。
datax_arch
核心模块介绍:
DataX完成单个数据同步的作业,我们称之为Job,DataX接受到一个Job之后,将启动一个进程来完成整个作业同步过程。DataX Job模块是单个作业的中枢管理节点,承担了数据清理、子任务切分(将单一作业计算转化为多个子Task)、TaskGroup管理等功能。
DataXJob启动后,会根据不同的源端切分策略,将Job切分成多个小的Task(子任务),以便于并发执行。Task便是DataX作业的最小单元,每一个Task都会负责一部分数据的同步工作。
切分多个Task之后,DataX Job会调用Scheduler模块,根据配置的并发数据量,将拆分成的Task重新组合,组装成TaskGroup(任务组)。每一个TaskGroup负责以一定的并发运行完毕分配好的所有Task,默认单个任务组的并发数量为5。
每一个Task都由TaskGroup负责启动,Task启动后,会固定启动Reader—>Channel—>Writer的线程来完成任务同步工作。
DataX作业运行起来之后, Job监控并等待多个TaskGroup模块任务完成,等待所有TaskGroup任务完成后Job成功退出。否则,异常退出,进程退出值非0
DataX调度流程:
举例来说,用户提交了一个DataX作业,并且配置了20个并发,目的是将一个100张分表的mysql数据同步到odps里面。 DataX的调度决策思路是:
DataXJob根据分库分表切分成了100个Task。
根据20个并发,DataX计算共需要分配4个TaskGroup。
4个TaskGroup平分切分好的100个Task,每一个TaskGroup负责以5个并发共计运行25个Task。
五、DataX 3.0六大核心优势
可靠的数据质量监控
完美解决数据传输个别类型失真问题
DataX旧版对于部分数据类型(比如时间戳)传输一直存在毫秒阶段等数据失真情况,新版本DataX3.0已经做到支持所有的强数据类型,每一种插件都有自己的数据类型转换策略,让数据可以完整无损的传输到目的端。
提供作业全链路的流量、数据量�运行时监控
DataX3.0运行过程中可以将作业本身状态、数据流量、数据速度、执行进度等信息进行全面的展示,让用户可以实时了解作业状态。并可在作业执行过程中智能判断源端和目的端的速度对比情况,给予用户更多性能排查信息。
提供脏数据探测
在大量数据的传输过程中,必定会由于各种原因导致很多数据传输报错(比如类型转换错误),这种数据DataX认为就是脏数据。DataX目前可以实现脏数据精确过滤、识别、采集、展示,为用户提供多种的脏数据处理模式,让用户准确把控数据质量大关!
丰富的数据转换功能
DataX作为一个服务于大数据的ETL工具,除了提供数据快照搬迁功能之外,还提供了丰富数据转换的功能,让数据在传输过程中可以轻松完成数据脱敏,补全,过滤等数据转换功能,另外还提供了自动groovy函数,让用户自定义转换函数。详情请看DataX3的transformer详细介绍。
精准的速度控制
还在为同步过程对在线存储压力影响而担心吗?新版本DataX3.0提供了包括通道(并发)、记录流、字节流三种流控模式,可以随意控制你的作业速度,让你的作业在库可以承受的范围内达到最佳的同步速度。
"speed": {
"channel": 5,
"byte": 1048576,
"record": 10000
}
强劲的同步性能
DataX3.0每一种读插件都有一种或多种切分策略,都能将作业合理切分成多个Task并行执行,单机多线程执行模型可以让DataX速度随并发成线性增长。在源端和目的端性能都足够的情况下,单个作业一定可以打满网卡。另外,DataX团队对所有的已经接入的插件都做了极致的性能优化,并且做了完整的性能测试。性能测试相关详情可以参照每单个数据源的详细介绍:DataX数据源指南
健壮的容错机制
DataX作业是极易受外部因素的干扰,网络闪断、数据源不稳定等因素很容易让同步到一半的作业报错停止。因此稳定性是DataX的基本要求,在DataX 3.0的设计中,重点完善了框架和插件的稳定性。目前DataX3.0可以做到线程级别、进程级别(暂时未开放)、作业级别多层次局部/全局的重试,保证用户的作业稳定运行。
线程内部重试
DataX的核心插件都经过团队的全盘review,不同的网络交互方式都有不同的重试策略。
线程级别重试
目前DataX已经可以实现TaskFailover,针对于中间失败的Task,DataX框架可以做到整个Task级别的重新调度。
极简的使用体验
易用
下载即可用,支持linux和windows,只需要短短几步骤就可以完成数据的传输。请点击:Quick Start
详细
DataX在运行日志中打印了大量信息,其中包括传输速度,Reader、Writer性能,进程CPU,JVM和GC情况等等。
传输过程中打印传输速度、进度等
datax_run_speed
传输过程中会打印进程相关的CPU、JVM等
datax_run_cpu
在任务结束之后,打印总体运行情况
datax_end_info
-----------
使用Shell执行DataX任务最佳实践
更新时间:2019-04-08 16:46:10
编辑 ·
· 我的收藏
本页目录
前提条件
操作步骤
本文将为您介绍如何使用Shell执行DataX任务。
前提条件
使用Shell执行DataX任务前,您需要进行以下准备工作。
创建Shell节点任务,详情请参见Shell节点。
添加自定义资源组,详情请参见新增任务资源。
说明 请在 管理控制台 > 调度资源列表页面而不是数据集成页面添加自定义资源组。
操作步骤
双击创建的Shell节点,填写下述代码。
shell_datax_home='/home/admin/shell_datax'
mkdir -p ${shell_datax_home}
shell_datax_config=${shell_datax_home}/${ALISA_TASK_ID}
echo '''
{
"job": {
"setting": {
"speed": {
"channel": 1
},
"errorLimit": {
"record": 0,
"percentage": 0.02
}
},
"content": [
{
"reader": {
"name": "streamreader",
"parameter": {
"column": [
{
"value": "${bdp.system.bizdate}",
"type": "string"
},
{
"value": "${bdp.system.cyctime}",
"type": "string"
},
{
"value": "${params1}__${params2}",
"type": "string"
},
{
"value": 19890427,
"type": "long"
},
{
"value": "1989-06-04 00:00:00",
"type": "date"
},
{
"value": true,
"type": "bool"
},
{
"value": "test",
"type": "bytes"
}
],
"sliceRecordCount": 10
}
},
"writer": {
"name": "streamwriter",
"parameter": {
"print": true,
"encoding": "UTF-8"
}
}
}
]
}
}
''' > ${shell_datax_config}
params1=$1
params2=$2
datax_params='-p "-Dparams1=${params1} -Dparams2=${params2}"'
echo "`date '+%Y-%m-%d %T'` shell datax config: ${shell_datax_config}"
echo "`date '+%Y-%m-%d %T'` shell datax params: -p \"-Dparams1=${params1} -Dparams2=${params2}\""
/home/admin/datax3/bin/datax.py ${shell_datax_config} -p "-Dparams1=${params1} -Dparams2=${params2}"
shell_datax_run_result=$?
rm ${shell_datax_config}
if [ ${shell_datax_run_result} -ne 0 ]
then
echo "`date '+%Y-%m-%d %T'` shell datax ended failed :("
exit -1
fi
echo "`date '+%Y-%m-%d %T'` shell datax ended success~"
代码说明如下:
生成临时DataX配置文件(您只需修改配置文件内容即可,详情请参见DataX文档。
读取调度参数,分别为$1,$2。
不需配置${bdp.system.bizdate},${bdp.system.cyctime},参数详情请参见参数配置。
执行DataX任务,进行数据同步。
删除临时文件。
判断任务成功或失败,进行返回,0代表成功。
单击右侧的调度配置,进行系统参数的配置,详情请参见调度配置模块的文档。
配置完成后,提交并发布节点任务。
进入运维中心 > 周期任务页面,选择相应的节点修改自定义资源组。
单击相应节点后的测试,并查看执行结果。
======================================================================
DataX调优
DataX调优要分成几个部分,任务机指运行Datax任务所在的机器。
1. 网络本身的带宽等硬件因素造成的影响;
2. DataX本身的参数;
3. 从源端到任务机;
4. 从任务机到目的端;
即当觉得DataX传输速度慢时,需要从上述四个方面着手开始排查。
4.1 网络本身的带宽等硬件因素造成的影响
此部分主要需要了解网络本身的情况,即从源端到目的端的带宽是多少,平时使用量和繁忙程度的情况,从而分析是否是本部分造成的速度缓慢。一下提供几个思路。
1. 可使用从源端到目的端scp的方式观察速度;
2. 结合监控观察任务运行时间段时,网络整体的繁忙情况,来判断是否应将任务避开网络高峰运行;
3. 观察任务机的负载情况,尤其是网络和磁盘IO,观察其是否成为瓶颈,影响了速度;
DataX本身作为数据同步框架,将不同数据源的同步抽象为从源头数据源读取数据的Reader插件,以及向目标端写入数据的Writer插件,理论上DataX框架可以支持任意数据源类型的数据同步工作。同时DataX插件体系作为一套生态系统, 每接入一套新数据源该新加入的数据源即可实现和现有的数据源互通。
3.2 运行Datax任务
运行Datax任务很简单,只要执行python脚本即可。
python /home/admin/datax3/bin/datax.py ./json/table_1.json
建议真正跑任务时,可按照ODPS迁移指南中给出的批量工具的方式运行
即将所有的命令整理到一个sh文件中,最后再用nohup运行该文件。
cat /root/datax_tools/run_datax.sh
python /home/admin/datax3/bin/datax.py ./json/table_1.json > ./log/table_1.log
#实际运行
nohup /root/datax_tools/run_datax.sh > result.txt 2>&1 &
4.2 DataX本身的参数
1. 可通过增加如下的core参数,去除掉DataX默认对速度的限制;
{
"core":{
"transport":{
"channel":{
"speed":{
"record":-1,
"byte":-1
}
}
}
},
"job":{
...
}
}
2. 针对odpsreader有如下参数可以调节,注意并不是压缩速度就会提升,根据具体情况不同,速度还有可能下降,此项为试验项,需要具体情况具体分析。
...
“parameter”:{
"isCompress":"true",
...
}
3. 针对odpswrtier有如下参数可以调节,其中isCompress选项同reader,blockSizeInMB,为按块写入,后面的值为块的大小。该项值并不是越大越好,一般可以结合tunnel做综合考量。过分大的 blockSizeInMB 可能造成速度波动以及内存OOM。
...
“parameter”:{
"isCompress":"true",
"blockSizeInMB":128,
...
}
4. 针对任务本身,有如下参数可以调节,注意如果调整了tunnel的数量,可能会造成JVM虚拟机崩溃,故需修改相应的参数;
"job": {
"setting": {
"speed": {
"channel": 32
}
}
channel增大,为防止OOM,需要修改datax工具的datax.py文件。
如下所示,可根据任务机的实际配置,提升-Xms与-Xmx,来防止OOM。由此可以看出,tunnel并不是越大越好,过分大反而会影响宿主机的性能。
DEFAULT_JVM = "-Xms1g -Xmx1g -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=%s/log" % (DATAX_HOME)
4.3 从源端到任务机
1. 可使用dataX从源端传输分区信息到本机,来观察速度。并和初始任务的速度进行比较,从而判断是哪一部分的原因造成的速度缓慢;具体配置文件如下:
"writer": {
"name": "txtfilewriter",
"parameter": {
"path": "/root/log",
"fileName": "test_src_result.csv",
"writeMode": "truncate",
"dateFormat": "yyyy-MM-dd"
}
}
试验时,注意观察任务机本身的IO负载。
2. 如果判断结果是源端的速度慢,可将任务机迁移至源端所在的集群,并再次运行任务,观察结果;
3. 可尝试用odpscmd,直接从源端odps下载分区到本地,和上述结果作比较。如果odpscmd速度快,可尝试调整datax的相关参数;
odpscmd --config=odps_config.ini.src
> tunnel download -thread 20 project_name.table_name/dt='20150101' log.txt;
上述是通过tunnel使用20线程下载数据到本地。
4. 如果是在专有云环境可尝试指定源端的tunnel server的ip进行测试,从而排除从域名到负载均衡部分的网络造成的影响。源端的tunnel server的ip需要咨询云端管理员。
...
“parameter”:{
"tunnelServer":"http://src_tunnel_server_ip",
...
}
注意此步骤可选择负载较低的tunnel_server。
5. 观察源端tunnel server的负载情况,尤其是磁盘IO和网卡的负载,从而判断是否为tunnel sever负载过高造成了资源瓶颈。
6. 观察源端的表结果,当有多个分区键或列过多时,都有可能造成传输的性能下降,此时可考虑换一张表进行测试,以排除表结构等问题造成的影响。
4.4 从任务机到目的端
1. 可使用datax从任务机传输文件分区文件到目的端,来观察速度。并和初始任务的速度进行比较,从而判断是哪一部分的原因造成的速度缓慢;具体配置文件如下:
"reader": {
"name": "txtfilereader",
"parameter": {
"path": ["/home/haiwei.luo/case00/data"],
"encoding": "UTF-8",
"column": [
{
"index": 0,
"type": "long"
},
{
"index": 1,
"type": "boolean"
},
{
"index": 2,
"type": "double"
},
{
"index": 3,
"type": "string"
},
{
"index": 4,
"type": "date",
"format": "yyyy.MM.dd"
}
],
"fieldDelimiter": ","
}
},
2. 如果判断结果是源端的速度慢,可将任务机迁移至源端所在的集群,并再次运行任务,观察结果;
3. 可尝试用odpscmd,直接从本地上传分区到目的端,和上述结果作比较。如果odpscmd速度快,可尝试调整datax的相关参数;
odpscmd --config=odps_config.ini.src
> tunnel upload ./log.txt mingxuantest/pt='20150101';
上述log.txt可为上一步tunnel下载的文件,或自行编写。
4. 如果是在专有云环境可尝试指定指定端的tunnel server的ip进行测试,从而排除从域名到负载均衡部分的网络造成的影响。源端的tunnel server的ip需要咨询云端管理员。
...
“parameter”:{
"tunnelServer":"http://dst_tunnel_server_ip",
...
}
注意此步骤可选择负载较低的tunnel_server。
5. 观察源端tunnel server的负载情况,尤其是磁盘IO和网卡的负载,从而判断是否为tunnel sever负载过高造成了资源瓶颈。
4.5 综合
1. 通过对DataX本身参数,源端到任务机、任务机到目的端的网络、负载等情况综合考量,进行针对各个部分的优化;
2. 同时,可在多台机器上部署DataX,将任务平均分配到多台机器上并发运行,来提高速度;
===============
使用流程
利用datax批量配置工具来生成对应的脚本和json文件。
进行环境的准备,本步骤需要在迁移机上安装odpscmd与datax工具,
其中datax工具和datax批量工具需要python2.6及以上的运行环境;
在datax批量工具的config.ini中进行相关配置,包括源与目的ODPS的accessID与key、odps及tunnel的endpoint、odpscmd与datax的路径等信息;
在tables.ini中填写调试用到的表列表;
运行python datax_tools.py生成对应的脚本和json配置文件;
检查脚本与json配置文件;
运行run_datax.py脚本,批量顺序执行datax任务;
运行check_datax.py脚本,进行条数的检查;
3.2.2.1 批量配置工具
配置源与目的端的基础信息;
读取并校验源与目的端的表结构和分区信息;
根据校验结果,生成DataX所需的json文件;
生成顺序运行Datax迁移命令的脚本文件;
利用select count(*)的方式进行条数检查;
1. DataX是离线数据同步工具,当需要迁移增量时,建议使用DTS,而不是DataX;
2. 针对离线数据,当数据量很大或表非常多时,建议使用DataX。
此时配置文件可编写脚本批量生成,详见ODPS数据迁移指南
同时可以增大DataX本身的并发,并提高运行DataX的任务机数量,来达到高并发,从而实现快速迁移;
DataX 是一个异构数据源离线同步工具
复杂的网状的同步链路变成了星型数据链路,
DataX作为中间传输载体负责连接各种数据源
当需要接入一个新的数据源的时候,只需要将此数据源对接到DataX,便能跟已有的数据源做到无缝数据同步。
https://github.com/alibaba/DataX。
DataX3.0框架设计
DataX本身作为离线数据同步框架,采用Framework + plugin架构构建。
将数据源读取和写入抽象成为Reader/Writer插件,纳入到整个同步框架中。
● Reader:Reader为数据采集模块,将数据发送给Framework。
● Writer: Writer为数据写入模块,负责不断向Framework取数据,并将数据写入到目的端。
● Framework:Framework用于连接reader和writer,作为两者的数据传输通道,并处理缓冲,流控,并发,数据转换等核心技术问题。
三. DataX3.0插件体系
类型 数据源 Reader(读) Writer(写)
RDBMS 关系型数据库 MySQL √ √
Oracle √ √
SQL Server √ √
PostgreSQL √ √
达梦 √ √
通用RDBMS(支持所有关系型数据库) √ √
阿里云数仓数据存储 MaxCompute(原ODPS) √ √
Analytic DB(原ADS) √
OSS √ √
云数据库Memcache版(原OCS) √ √
NoSQL数据存储 Table Store(原OTS) √ √
Hbase0.94 √ √
Hbase1.1 √ √
MongoDB √ √
无结构化数据存储 TxtFile √ √
FTP √ √
HDFS √ √
DataX Framework提供了简单的接口与插件交互,提供简单的插件接入机制,只需要任意加上一种插件,就能无缝对接其他数据源
单机多线程模式完成同步作业运行
1. DataX完成单个数据同步的作业,我们称之为Job,DataX接受到一个Job之后,将启动一个进程来完成整个作业同步过程。
DataX Job模块是单个作业的中枢管理节点,承担了数据清理、子任务切分(将单一作业计算转化为多个子Task)、TaskGroup管理等功能。
2.
DataXJob启动后,会根据不同的源端切分策略,将Job切分成多个小的Task(子任务),以便于并发执行。
Task便是DataX作业的最小单元,每一个Task都会负责一部分数据的同步工作。
3. 切分多个Task之后,DataX Job会调用Scheduler模块,根据配置的并发数据量,将拆分成的Task重新组合,组装成TaskGroup(任务组)。每一个TaskGroup负责以一定的并发运行完毕分配好的所有Task,默认单个任务组的并发数量为5。
4. 每一个Task都由TaskGroup负责启动,Task启动后,会固定启动Reader—>Channel—>Writer的线程来完成任务同步工作。
5. DataX作业运行起来之后, Job监控并等待多个TaskGroup模块任务完成,等待所有TaskGroup任务完成后Job成功退出。否则,异常退出,进程退出值非0
DataX调度流程:
举例来说,用户提交了一个DataX作业,并且配置了20个并发,
目的是将一个100张分表的mysql数据同步到odps里面。 DataX的调度决策思路是:
1. DataXJob根据分库分表切分成了100个Task。
2. 根据20个并发,DataX计算共需要分配4个TaskGroup。
3. 4个TaskGroup平分切分好的100个Task,每一个TaskGroup负责以5个并发共计运行25个Task。
TaskGroup 25
核心优势
● 可靠的数据质量监控
○ 完美解决数据传输个别类型失真问题
DataX旧版对于部分数据类型(比如时间戳)传输一直存在毫秒阶段等数据失真情况,新版本DataX3.0已经做到支持所有的强数据类型,每一种插件都有自己的数据类型转换策略,让数据可以完整无损的传输到目的端。
○ 提供作业全链路的流量、数据量运行时监控
DataX3.0运行过程中可以将作业本身状态、数据流量、数据速度、执行进度等信息进行全面的展示,让用户可以实时了解作业状态。
并可在作业执行过程中智能判断源端和目的端的速度对比情况,给予用户更多性能排查信息。
○ 提供脏数据探测
在大量数据的传输过程中,必定会由于各种原因导致很多数据传输报错(比如类型转换错误),这种数据DataX认为就是脏数据。
DataX目前可以实现脏数据精确过滤、识别、采集、展示,为用户提供多种的脏数据处理模式,让用户准确把控数据质量大关!
● 丰富的数据转换功能
DataX作为一个服务于大数据的ETL工具,除了提供数据快照搬迁功能之外,还提供了丰富数据转换的功能,让数据在传输过程中可以轻松完成数据脱敏,补全,过滤等数据转换功能,另外还提供了自动groovy函数,让用户自定义转换函数。详情请看DataX3的transformer详细介绍。
● 精准的速度控制
还在为同步过程对在线存储压力影响而担心吗?新版本DataX3.0提供了包括通道(并发)、记录流、字节流三种流控模式,可以随意控制你的作业速度,让你的作业在库可以承受的范围内达到最佳的同步速度。
"speed": {
"channel": 5,
"byte": 1048576,
"record": 10000
}
● 强劲的同步性能
DataX3.0每一种读插件都有一种或多种切分策略,都能将作业合理切分成多个Task并行执行,单机多线程执行模型可以让DataX速度随并发成线性增长。在源端和目的端性能都足够的情况下,单个作业一定可以打满网卡。另外,DataX团队对所有的已经接入的插件都做了极致的性能优化,并且做了完整的性能测试。性能测试相关详情可以参照每单个数据源的详细介绍:DataX数据源指南
● 健壮的容错机制
DataX作业是极易受外部因素的干扰,网络闪断、数据源不稳定等因素很容易让同步到一半的作业报错停止。因此稳定性是DataX的基本要求,在DataX 3.0的设计中,重点完善了框架和插件的稳定性。目前DataX3.0可以做到线程级别、进程级别(暂时未开放)、作业级别多层次局部/全局的重试,保证用户的作业稳定运行。
○ 线程内部重试
DataX的核心插件都经过团队的全盘review,不同的网络交互方式都有不同的重试策略。
○ 线程级别重试
目前DataX已经可以实现TaskFailover,针对于中间失败的Task,DataX框架可以做到整个Task级别的重新调度。
● 极简的使用体验
○ 易用
下载即可用,支持linux和windows,只需要短短几步骤就可以完成数据的传输。请点击:Quick Start
○ 详细
DataX在运行日志中打印了大量信息,其中包括传输速度,Reader、Writer性能,进程CPU,JVM和GC情况等等。
■ 传输过程中打印传输速度、进度等
■ 传输过程中会打印进程相关的CPU、JVM等
■ 在任务结束之后,打印总体运行情况
配置
(1)、Job基本配置
Job基本配置定义了一个Job基础的、框架级别的配置信息,包括:
{
"job": {
"content": [
{
"reader": {
"name": "",
"parameter": {}
},
"writer": {
"name": "",
"parameter": {}
}
}
],
"setting": {
"speed": {},
"errorLimit": {}
}
}
}
(2) Job Setting配置
{
"job": {
"content": [
{
"reader": {
"name": "",
"parameter": {}
},
"writer": {
"name": "",
"parameter": {}
}
}
],
"setting": {
"speed": {
"channel": 1,
"byte": 104857600
},
"errorLimit": {
"record": 10,
"percentage": 0.05
}
}
}
}
job.setting.speed(流量控制)
Job支持用户对速度的自定义控制,channel的值可以控制同步时的并发数,byte的值可以控制同步时的速度
job.setting.errorLimit(脏数据控制)
Job支持用户对于脏数据的自定义监控和告警,包括对脏数据最大记录数阈值(record值)或者脏数据占比阈值(percentage值),当Job传输过程出现的脏数据大于用户指定的数量/百分比,DataX Job报错退出。
==============
{
"job": {
"content":[
{
"reader":{
"name":"odpsreader",
"parameter":{
"accessId":"<accessID>",
"accessKey":"******************************",
"column":[
"col_1",
"col_2"
],
"odpsServer":"http://service.odps.aliyun-inc.com/api",
"partition":[
"dt=20160524"
],
"project":"src_project_name",
"splitMode":"record",
"table":"table_name_1"
}
},
"writer":{
"name":"odpswriter",
"parameter":{
"accessId":"<accessId>",
"accessKey":"******************************",
"accountType":"aliyun",
"column":[
"ci_name",
"geohash"
],
"odpsServer":"http://service.odps.xxx.com/api",
"partition":"dt=20160524",
"project":"dst_project_name",
"table":"nb_tab_http"
}
}
}
],
"setting":{
"speed":{
"channel":20
}
}
}
}
1. 整个配置文件是一个job的描述;
2. job下面有两个配置项,content和setting,其中content用来描述该任务的源和目的端的信息,setting用来描述任务本身的信息;
3. content又分为两部分,reader和writer,分别用来描述源端和目的端的信息;
4. 本例中由于源和目的都是ODPS,所以类型为odpsreader和odpswriter。均包含accessId,accessKey与odpsServer的api地址。
5. 同时预迁移表的project,table以及列名和分区信息都要一一填写清楚。
6. setting中的speed项表示同时起20个并发去跑该任务。
============
从MySQL读取数据 写入ODPS
从MySQL读取数据 写入ODPS
第一步、创建作业的配置文件(json格式)
可以通过命令查看配置模板: python datax.py -r {YOUR_READER} -w {YOUR_WRITER}
$ cd {YOUR_DATAX_HOME}/bin
$ python datax.py -r mysqlreader -w odpswriter
DataX (DATAX-OPENSOURCE-1.0), From Alibaba !
Copyright (C) 2010-2015, Alibaba Group. All Rights Reserved.
Please refer to the mysqlreader document:
https://github.com/alibaba/DataX/blob/master/mysqlreader/doc/mysqlreader.md
Please refer to the odpswriter document:
https://github.com/alibaba/DataX/blob/master/odpswriter/doc/odpswriter.md
Please save the following configuration as a json file and use
python {DATAX_HOME}/bin/datax.py {JSON_FILE_NAME}.json
to run the job.
{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"column": [],
"connection": [
{
"jdbcUrl": [],
"table": []
}
],
"password": "",
"username": "",
"where": ""
}
},
"writer": {
"name": "odpswriter",
"parameter": {
"accessId": "",
"accessKey": "",
"column": [],
"odpsServer": "",
"partition": "",
"project": "",
"table": "",
"truncate": true
}
}
}
],
"setting": {
"speed": {
"channel": ""
}
}
}
}
第二步、根据配置文件模板填写相关选项
命令打印里面包含对应reader、writer的文档地址,以及配置json样例,根据json样例填空完成配置即可。
根据模板配置json文件(mysql2odps.json)如下:
{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "****",
"password": "****",
"column": ["id","age","name"],
"connection": [
{
"table": [
"test_table"
],
"jdbcUrl": [
"jdbc:mysql://127.0.0.1:3306/test"
]
}
]
}
},
"writer": {
"name": "odpswriter",
"parameter": {
"accessId": "****",
"accessKey": "****",
"column": ["id","age","name"],
"odpsServer": "http://service.odps.aliyun.com/api",
"partition": "pt='datax_test'",
"project": "datax_opensource",
"table": "datax_opensource_test",
"truncate": true
}
}
}
],
"setting": {
"speed": {
"channel": 1
}
}
}
}
第三步:启动DataX
$ cd {YOUR_DATAX_DIR_BIN}
$ python datax.py ./mysql2odps.json
同步结束,显示日志如下:
...
2015-12-17 11:20:25.263 [job-0] INFO JobContainer -
任务启动时刻 : 2015-12-17 11:20:15
任务结束时刻 : 2015-12-17 11:20:25
任务总计耗时 : 10s
任务平均流量 : 205B/s
记录写入速度 : 5rec/s
读出记录总数 : 50
读写失败总数 : 0
如果您在试用Windows发现datax打印乱码,请参考:Windows乱码问题解决
从MySQL读取数据 写入ODPS,通过crontab命令实现
配置定时任务(Linux环境):
从MySQL读取数据 写入ODPS,通过crontab命令实现
前置条件:安装crond服务,并已启动
#查看crond服务是否启动,出现以下日志表示已启动
$/sbin/service crond status
crond (pid 30742) is running...
第一步:创建作业的配置文件(json格式) 参考上节内容。这里假设已配置好MySQL到ODPS的配置文件mysql2odps.json
第二步:列出列出crontab文件,命令: crontab -l
(1)若出现以下日志,表示当前用户没有定时任务,用户需要新建crontab文件,并提交crontab文件,参考第三步。
$crontab -l
no crontab for xxx
(2)若出现以下日志,表示当前用户已经有正在运行的定时任务,用户只需用命令crontab -e 来编辑crontab文件,参考第四步。 shell $ crontab -l 0,10,20,35,44,50 * * * * python /home/admin/datax3/bin/datax.py /home/admin/mysql2odps.json >>/home/hanfa.shf/log.`date +\%Y\%m\%d\%H\%M\%S` 2>&1
第三步:若当前用户没有定时任务(之前从未创建过crontab任务)
(1)创建一个新的crontab文件,如取名crondatax
示例1:每天13点5分进行同步作业,并把运行日志输出到目录/home/hanfa.shf/下log.运行时间 文件中,如定时运行是在2016-3-26 13:10:13运行的,产生的日志文件名为:log.20160326131023
$ vim crondatax
#输入以下内容
5 13 * * * python /home/admin/datax3/bin/datax.py /home/admin/mysql2odps.json >>/home/hanfa.shf/log.`date +\%Y\%m\%d\%H\%M\%S` 2>&1
#/home/admin/datax3/bin/datax.py 表示你安装的DataX datax.py所在目录(请替换为您真实的绝对路径目录);
#/home/admin/mysql2odps.json 表示作业的配置文件目录(请替换为您真实的绝对路径目录);
#/home/hanfa.shf/log.`date +\%Y\%m\%d\%H\%M\%S` 表示日志输出位置,并以log.当前时间 命名(请替换为您真实的绝对路径目录)
(2)提交你刚刚创建的crontab文件 shell $ crontab crondatax #crondatax 你创建的crontab文件名
(3)重启crontab服务 shell $ sudo /etc/init.d/crond restart Stopping crond: [ OK ] Starting crond: [ OK ] (4)在13点5分过后,在日志目录会看到对应的日文件 shell $ ls -al /home/hanfa.shf/ -rw-r--r-- 1 hanfa.shf users 12062 Mar 26 13:05 log.20160326130501
第四步:若当前用户已有定时任务(想继续增加定时任务)
(1)编辑已有crontab文件
示例2:每10分钟运行一次同步任务,并把运行日志输出到目录/home/hanfa.shf/下log.运行时间 文件中,如定时运行是在2016-3-26 13:10:13运行的,产生的日志文件名为:log.20160326131023
$ crontab -e
#会进入已有crontab文件编辑界面,继续增加定时任务即可,
本示例增加以下内容,并保存
0,10,20,30,40,50 * * * * python /home/admin/datax3/bin/datax.py /home/admin/mysql2odps.json >>/home/hanfa.shf/log.`date +\%Y\%m\%d\%H\%M\%S` 2>&1
(2)重启crontab服务
```shell
$ sudo /etc/init.d/crond restart Stopping crond: [ OK ] Starting crond: [ OK ] ```