温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

如何使用ogg将Oracle数据传输到flume刷到kafka

发布时间:2021-09-01 14:35:14 来源:亿速云 阅读:169 作者:chen 栏目:网络安全

本篇内容主要讲解“如何使用ogg将Oracle数据传输到flume刷到kafka”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“如何使用ogg将Oracle数据传输到flume刷到kafka”吧!

源端测试服务器

服务器环境部署:

命令步骤如下:

[root@test ~]# groupadd oinstall

[root@test ~]# groupadd dba

[root@test ~]# useradd -g oinstall -G dba oracle

[root@test ~]#

修改权限:

[root@test ~]#  chown -R oracle:oinstall /data

[root@test ~]# 

2. 设置全局java环境变量

[root@test ~]# cat /etc/redhat-release 

CentOS release 6.4 (Final)

[root@test ~]#

[oracle@test data]$ tar -zxvf jdk-8u60-linux-x64.tar.gz

在root下执行配置:

设置java环境变量:

vi /etc/profile

###jdk

export JAVA_HOME=/data/jdk1.8.0_60

export JAVA_BIN=/data/jdk1.8.0_60/bin

export PATH=$PATH:$JAVA_HOME/bin

export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar

export JAVA_HOME JAVA_BIN PATH CLASSPATH

export LD_LIBRARY_PATH=/data/jdk1.8.0_60/jre/lib/amd64/server:$LD_LIBRARY_PATH

切换Oracle用户核对:

[root@test ~]# su - oracle

[oracle@test ~]$ java -version

java version "1.8.0_60"

Java(TM) SE Runtime Environment (build 1.8.0_60-b27)

Java HotSpot(TM) 64-Bit Server VM (build 25.60-b23, mixed mode)

[oracle@test ~]$

如果不生效:

修改java环境变量:

alternatives --install /usr/bin/java java /data/jdk1.8.0_60/bin/java 100 

alternatives --install /usr/bin/jar jar /data/jdk1.8.0_60/bin/jar 100 

alternatives --install /usr/bin/javac javac /data/jdk1.8.0_60/bin/javac 100 

update-alternatives --install /usr/bin/javac javac /data/jdk1.8.0_60/bin/javac 100

#  /usr/sbin/alternatives --config java

[root@test1 data]#   /usr/sbin/alternatives --config java

There are 4 programs which provide 'java'.

  Selection    Command

-----------------------------------------------

   1           /usr/lib/jvm/jre-1.6.0-openjdk.x86_64/bin/java

*+ 2           /usr/lib/jvm/jre-1.7.0-openjdk.x86_64/bin/java

   3           /usr/lib/jvm/jre-1.5.0-gcj/bin/java

   4           /data/jdk1.8.0_60/bin/java

Enter to keep the current selection[+], or type selection number: 4

[root@test1 data]# /usr/sbin/alternatives --config java

There are 4 programs which provide 'java'.

  Selection    Command

-----------------------------------------------

   1           /usr/lib/jvm/jre-1.6.0-openjdk.x86_64/bin/java

*  2           /usr/lib/jvm/jre-1.7.0-openjdk.x86_64/bin/java

   3           /usr/lib/jvm/jre-1.5.0-gcj/bin/java

 + 4           /data/jdk1.8.0_60/bin/java

Enter to keep the current selection[+], or type selection number: 

[root@test1 data]# 

[root@test1 data]# java -version

java version "1.8.0_60"

Java(TM) SE Runtime Environment (build 1.8.0_60-b27)

Java HotSpot(TM) 64-Bit Server VM (build 25.60-b23, mixed mode)

[root@test1 data]# 

修改flume 参数配置:

[oracle@test1 conf]$ cat flume-conf.properties

# Licensed to the Apache Software Foundation (ASF) under one

# or more contributor license agreements.  See the NOTICE file

# distributed with this work for additional information

# regarding copyright ownership.  The ASF licenses this file

# to you under the Apache License, Version 2.0 (the

# "License"); you may not use this file except in compliance

# with the License.  You may obtain a copy of the License at

#

#  http://www.apache.org/licenses/LICENSE-2.0

#

# Unless required by applicable law or agreed to in writing,

# software distributed under the License is distributed on an

# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY

# KIND, either express or implied.  See the License for the

# specific language governing permissions and limitations

# under the License.

# The configuration file needs to define the sources, 

# the channels and the sinks.

# Sources, channels and sinks are defined per agent, 

# in this case called 'agent'

agent.sources = r1

agent.channels = fileChannel

agent.sinks = kafkaSink

# For each one of the sources, the type is defined

agent.sources.seqGenSrc.type = seq

# The channel can be defined as follows.

agent.sources.seqGenSrc.channels = fileChannel

#

agent.sources.r1.type = avro

agent.sources.r1.port = 14141

agent.sources.r1.bind = 192.168.88.66

agent.sources.r1.channels = fileChannel

# Each sink's type must be defined

agent.sinks.loggerSink.type = logger

#Specify the channel the sink should use

agent.sinks.loggerSink.channel = memoryChannel

#kafka sink

agent.sinks.kafkaSink.type = org.apache.flume.sink.kafka.KafkaSink

agent.sinks.kafkaSink.topic = my_schema 

agent.sinks.kafkaSink.brokerList = 192.168.88.1:9092,192.168.88.2:9092,192.168.88.3:9092,192.168.88.4:9092

agent.sinks.kafkaSink.requiredAcks = 1

agent.sinks.kafkaSink.batchSize = 20

agent.sinks.kafkaSink.channel = fileChannel

# Each channel's type is defined.

agent.channels.memoryChannel.type = memory

# Other config values specific to each type of channel(sink or source)

# can be defined as well

# In this case, it specifies the capacity of the memory channel

agent.channels.memoryChannel.capacity = 100

#File Channel

agent.channels.fileChannel.type = file

agent.channels.fileChannel.transactionCapacity = 20000000

agent.channels.fileChannel.capacity = 50000000

agent.channels.fileChannel.maxFileSize = 2147483648

agent.channels.fileChannel.minimumRequiredSpace = 52428800

agent.channels.fileChannel.keep-alive = 3

agent.channels.fileChannel.checkpointInterval = 20000

agent.channels.fileChannel.checkpointDir = /data/apache-flume-1.6.0-bin/CheckpointDir

agent.channels.fileChannel.dataDirs = /data/apache-flume-1.6.0-bin/DataDir

[oracle@test1 conf]$ 

############配置OGG

主库在

源库创建新的抽取进程:

dblogin userid goldengate, password goldengate

add extract EXTJMS,tranlog, threads 2,begin now

add exttrail /data/goldengate/dirdat/kf, extract EXTJMS megabytes 200

add schematrandata my_schema

add trandata my_schema.*

原抽取进程:

extract EXTJMS

setenv (ORACLE_SID="testdb")

setenv (NLS_LANG="AMERICAN_AMERICA.AL32UTF8") 

userid goldengate, password goldengate

TRANLOGOPTIONS DBLOGREADER

exttrail /data/goldengate/dirdat/kf

discardfile  /data/goldengate/dirrpt/EXTJMS.dsc,append

THREADOPTIONS MAXCOMMITPROPAGATIONDELAY 90000

numfiles 3000

CHECKPOINTSECS 20

DISCARDROLLOVER AT 05:30

dynamicresolution

GETUPDATEBEFORES

NOCOMPRESSUPDATES

NOCOMPRESSDELETES

RecoveryOptions OverwriteMode

ddl &

include mapped &

exclude objtype 'TRIGGER' &

exclude objtype 'PROCEDURE' &

exclude objtype 'FUNCTION' &

exclude objtype 'PACKAGE' &

exclude objtype 'PACKAGE BODY' &

exclude objtype 'TYPE' &

exclude objtype 'GRANT' &

exclude instr 'GRANT' &

exclude objtype 'DATABASE LINK' &

exclude objtype 'CONSTRAINT' &

exclude objtype 'JOB' &

exclude instr 'ALTER SESSION' &

exclude INSTR 'AS SELECT' &

exclude INSTR 'REPLACE SYNONYM' &

EXCLUDE OBJNAME "my_schema.DBMS_TABCOMP_TEMP_CMP" &

EXCLUDE OBJNAME "my_schema.DBMS_TABCOMP_TEMP_UNCMP"

FETCHOPTIONS  NOUSESNAPSHOT, USELATESTVERSION, MISSINGROW REPORT

TABLEEXCLUDE *.DBMS_TABCOMP_TEMP*;

--extract table user

TABLE my_schema.*;

SQL>  ALTER DATABASE ADD SUPPLEMENTAL LOG DATA (PRIMARY KEY, UNIQUE INDEX) COLUMNS;

Database altered.

SQL>  ALTER DATABASE ADD SUPPLEMENTAL LOG DATA (all) COLUMNS;

Database altered.

SQL> select SUPPLEMENTAL_LOG_DATA_MIN,SUPPLEMENTAL_LOG_DATA_PK,SUPPLEMENTAL_LOG_DATA_UI ,FORCE_LOGGING from v$database;

SUPPLEME SUP SUP FOR

-------- --- --- ---

YES      YES YES YES

SQL> 

源端添加新的pump进程:

在my_schema源库测试添加pump进程:

添加pump进程:

添加新的pump:

add extract EDPKF,exttrailsource /data/goldengate/dirdat/kf, begin now

edit param EDPKF

EXTRACT EDPKF

setenv (NLS_LANG = AMERICAN_AMERICA.AL32UTF8)

PASSTHRU

GETUPDATEBEFORES

NOCOMPRESSUPDATES

NOCOMPRESSDELETES

RecoveryOptions OverwriteMode

RMTHOST 192.168.88.66, MGRPORT 7839

RMTTRAIL /data/ogg_for_bigdata/dirdat/kp

DISCARDFILE ./dirrpt/EDPKF.dsc,APPEND,MEGABYTES 5

TABLE my_schema.* ;

add rmttrail /data/ogg_for_bigdata/dirdat/kp, extract EDPKF megabytes 200

edit  param defgen

userid goldengate, password goldengate

defsfile dirdef/my_schema.def

TABLE my_schema.*;

传递定义文件:

./defgen paramfile ./dirprm/defgen.prm

目标端直接端

mgr:

PORT 7839

DYNAMICPORTLIST 7840-7850

--AUTOSTART replicat *

--AUTORESTART replicat *,RETRIES 5,WAITMINUTES 2

AUTORESTART ER *, RETRIES 3, WAITMINUTES 5, RESETMINUTES 10

PURGEOLDEXTRACTS /data/ogg_for_bigdata/dirdat/*, USECHECKPOINTS, MINKEEPHOURS 2

LAGREPORTHOURS 1

LAGINFOMINUTES 30

LAGCRITICALMINUTES 45

添加 UE DATA PUMP:

 使用版本:

Version 12.1.2.1.4 20470586 OGGCORE_12.1.2.1.0OGGBP_PLATFORMS_150303.1209

ADD EXTRACT LOANFLM, EXTTRAILSOURCE /data/ogg_for_bigdata/dirdat/kp

edit param JMSFLM

GGSCI (localhost.localdomain) 18> view param JMSFLM

EXTRACT JMSFLM

SETENV (GGS_USEREXIT_CONF ="dirprm/JMSFLM.props")

GetEnv (JAVA_HOME)

GetEnv (PATH)

GetEnv (LD_LIBRARY_PATH)

SourceDefs dirdef/my_schema.def

CUserExit libggjava_ue.so CUSEREXIT PassThru IncludeUpdateBefores

GetUpdateBefores

NoCompressDeletes

NoCompressUpdates

NoTcpSourceTimer

TABLEEXCLUDE my_schema.MV*;

TABLE my_schema.*;

--alter prodjms extseqno 736, extrba 0

注释: 在目标端完全可以不安装Oracle数据库,可以和flume环境放在一起,最终刷数据到kafka的服务器接收消息。

本案例是 通过flume中转实现的,完全没有问题。

当然也可以直接将数据传输到kafka处理消息,原理都是一样的。

未来更多的大数据融合也是一个不错的方案,无论是mysqlmongodb,hdfs等都可以完美结合。

 参数文件:

$ cat JMSFLM.props

gg.handlerlist=flumehandler

gg.handler.flumehandler.type=com.goldengate.delivery.handler.flume.FlumeHandler

gg.handler.flumehandler.host=192.168.88.66

gg.handler.flumehandler.port=14141

gg.handler.flumehandler.rpcType=avro

gg.handler.flumehandler.delimiter=\u0001

gg.handler.flumehandler.mode=op

gg.handler.flumehandler.includeOpType=true

# Indicates if the operation timestamp should be included as part of output in the delimited separated values

# true - Operation timestamp will be included in the output

# false - Operation timestamp will not be included in the output

# Default :- true

#gg.handler.flumehandler.includeOpTimestamp=true

#gg.handler.name.deleteOpKey=D

#gg.handler.name.updateOpKey=U

#gg.handler.name.insertOpKey=I

#gg.handler.name.pKUpdateOpKey=P

#gg.handler.name.includeOpType=true

# Optional properties to use the transaction grouping functionality

#gg.handler.flumehandler.maxGroupSize=1000

#gg.handler.flumehandler.minGroupSize=1000

### native library config ###

goldengate.userexit.nochkpt=TRUE

goldengate.userexit.timestamp=utc

goldengate.log.logname=cuserexit

goldengate.log.level=DEBUG

goldengate.log.tofile=true

goldengate.userexit.writers=javawriter

goldengate.log.level.JAVAUSEREXIT=DEBUG

#gg.brokentrail=true

gg.report.time=30sec

gg.classpath=/data/ogg_for_bigdata/dirprm/flumejar/*:/data/apache-flume-1.6.0-bin/lib/*

javawriter.stats.full=TRUE

javawriter.stats.display=TRUE

javawriter.bootoptions=-Xmx81920m -Xms20480m -Djava.class.path=/data/ogg_for_bigdata/ggjava/ggjava.jar -Dlog4j.configuration=/data/ogg_for_bigdata/cfg/log4j.properties

到此,相信大家对“如何使用ogg将Oracle数据传输到flume刷到kafka”有了更深的了解,不妨来实际操作一番吧!这里是亿速云网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI