您现在的位置是:网站首页> 编程资料编程资料

Oracle同步数据到kafka的方法_oracle_

2023-05-27 540人已围观

简介 Oracle同步数据到kafka的方法_oracle_

环境准备

软件准备

  • CentOS Linux 7.6.1810 (2台,A主机,B主机)
  • Oracle 11.2.0.4(A主机安装)
  • Kafka 2.13-2.6.0 (B主机安装)
  • kafka-connect-oracle-master (B主机安装,开源程序,用于同步Oracle数据到kafka)
  • apache-maven 3.6.3 (B主机安装,kafka-connect-oracle-master 的打包工具)
  • jdk-8u261-linux-x64.rpm (B主机安装)

下载地址

实施过程

Oracle主机(A)配置

Oracle实例配置项:

  • 开启归档日志
  • 开启附加日志
  • 创建kafka-connect-oracle-master连接用户
  • 创建测试数据生成用户及测试表
--开启归档日志 sqlplus / as sysdba SQL>shutdown immediate SQL>startup mount SQL>alter database archivelog; SQL>alter database open; --开启附加日志 SQL>alter database add supplemental log data (all) columns; --创建kafka-connect-oracle-master连接用户 create role logmnr_role; grant create session to logmnr_role; grant execute_catalog_role,select any transaction ,select any dictionary to logmnr_role; create user kminer identified by kminerpass; grant logmnr_role to kminer; alter user kminer quota unlimited on users; --创建测试数据生成用户及测试表 create tablespace test_date datafile '/u01/app/oracle/oradata/zzsrc/test_date01.dbf' size 100M autoextend on next 10M; create user whtest identified by whtest default tablespace test_date; grant connect,resource to whtest; grant execute on dbms_scheduler to whtest; grant execute on dbms_random to whtest; grant create job to whtest; create table t1 ( id int , name char(10), createtime date default sysdate ); alter table WHTEST.T1 add constraint PK_ID_T1 primary key (ID) using index tablespace TEST_DATE; create table t2 ( id int , name char(10), createtime date default sysdate ); alter table WHTEST.T2 add constraint PK_ID_T2 primary key (ID) using index tablespace TEST_DATE; create table t3 ( id int , name char(10), createtime date default sysdate ); alter table WHTEST.T3 add constraint PK_ID_T3 primary key (ID) using index tablespace TEST_DATE; begin dbms_scheduler.create_job( job_name=> 't1_job', job_type=> 'PLSQL_BLOCK', job_action =>'declare v_id int; v_name char(10); begin for i in 1..10 loop v_id := round(dbms_random.value(1,1000000000)); v_name :=round(dbms_random.value(1,1000000000)); insert into whtest.t1 (id,name)values(v_id,v_name); end loop; end;', enabled=>true, repeat_interval=>'sysdate + 5/86400', comments=>'insert into t1 every 5 sec'); end; / begin dbms_scheduler.create_job( job_name=> 't2_job', job_type=> 'PLSQL_BLOCK', job_action =>'declare v_id int; v_name char(10); begin for i in 1..10 loop v_id := round(dbms_random.value(1,1000000000)); v_name :=round(dbms_random.value(1,1000000000)); insert into whtest.t2 (id,name)values(v_id,v_name); end loop; end;', enabled=>true, repeat_interval=>'sysdate + 5/86400', comments=>'insert into t1 every 5 sec'); end; / begin dbms_scheduler.create_job( job_name=> 't3_job', job_type=> 'PLSQL_BLOCK', job_action =>'declare v_id int; v_name char(10); begin for i in 1..10 loop v_id := round(dbms_random.value(1,1000000000)); v_name :=round(dbms_random.value(1,1000000000)); insert into whtest.t3 (id,name)values(v_id,v_name); end loop; end;', enabled=>true, repeat_interval=>'sysdate + 5/86400', comments=>'insert into t3 every 5 sec'); end; / --JOB创建之后,暂时先diable,待kafka配置完成之后再enable exec DBMS_SCHEDULER.DISABLE('T1_JOB'); exec DBMS_SCHEDULER.DISABLE('T2_JOB'); exec DBMS_SCHEDULER.DISABLE('T3_JOB'); exec DBMS_SCHEDULER.ENABLE('T1_JOB'); exec DBMS_SCHEDULER.ENABLE('T2_JOB'); exec DBMS_SCHEDULER.ENABLE('T3_JOB');

Kafka主机(B)配置

​ 将下载好的Kafka 2.13-2.6.0 、kafka-connect-oracle-master、apache-maven 3.6.3、JDK 1.8.0上传至B主机/soft目录待使用。

主机hosts文件添加解析

[root@softdelvily ~]# cat /etc/hosts 127.0.0.1 localhost localhost.localdomain localhost4 localhost4.localdomain4 ::1 localhost localhost.localdomain localhost6 localhost6.localdomain6 192.168.20.44 softdelvily localhost

安装JDK

[root@softdelvily soft]# rpm -ivh jdk-8u261-linux-x64.rpm warning: jdk-8u261-linux-x64.rpm: Header V3 RSA/SHA256 Signature, key ID ec551f03: NOKEY Preparing... ################################# [100%] Updating / installing... 1:jdk1.8-2000:1.8.0_261-fcs ################################# [100%] Unpacking JAR files... tools.jar... plugin.jar... javaws.jar... deploy.jar... rt.jar... jsse.jar... charsets.jar... localedata.jar...

配置apache-maven工具

​ 将apache-maven-3.6.3-bin.tar.gz解压至/usr/local目录,并设置相应的/etc/profile环境变量。

[root@softdelvily soft]# tar xvf apache-maven-3.6.3-bin.tar.gz -C /usr/local/ apache-maven-3.6.3/README.txt apache-maven-3.6.3/LICENSE ..... [root@softdelvily soft]# cd /usr/local/ [root@softdelvily local]# ll total 0 drwxr-xr-x. 6 root root 99 Sep 23 09:56 apache-maven-3.6.3 drwxr-xr-x. 2 root root 6 Apr 11 2018 bin ..... [root@softdelvily local]# vi /etc/profile ....... ##添加如下环境变量 MAVEN_HOME=/usr/local/apache-maven-3.6.3 export MAVEN_HOME export PATH=${PATH}:${MAVEN_HOME}/bin [root@softdelvily local]# source /etc/profile [root@softdelvily local]# mvn -v Apache Maven 3.6.3 (cecedd343002696d0abb50b32b541b8a6ba2883f) Maven home: /usr/local/apache-maven-3.6.3 Java version: 1.8.0_262, vendor: Oracle Corporation, runtime: /usr/lib/jvm/java-1.8.0-openjdk-1.8.0.262.b10-0.el7_8.x86_64/jre Default locale: en_US, platform encoding: UTF-8 OS name: "linux", version: "3.10.0-957.el7.x86_64", arch: "amd64", family: "unix"

配置Kafka 2.13-2.6.0

​ 解压Kafka 2.13-2.6.0 至/usr/local目录。

[root@softdelvily soft]# tar xvf kafka_2.13-2.6.0.tgz -C /usr/local/ kafka_2.13-2.6.0/ kafka_2.13-2.6.0/LICENSE ...... [root@softdelvily soft]# cd /usr/local/ [root@softdelvily local]# ll total 0 drwxr-xr-x. 6 root root 99 Sep 23 09:56 apache-maven-3.6.3 drwxr-xr-x. 6 root root 89 Jul 29 02:23 kafka_2.13-2.6.0 .....

​ 开启kafka,并创建对应同步数据库过的topic

--1、session1 启动ZK [root@softdelvily kafka_2.13-2.6.0]# cd /usr/local/kafka_2.13-2.6.0/bin/ [root@softdelvily bin]# ./zookeeper-server-start.sh ../config/zookeeper.properties [2020-09-23 10:06:49,158] INFO Reading configuration from: ../config/zookeeper.properties ....... [2020-09-23 10:06:49,311] INFO Using checkIntervalMs=60000 maxPerMinute=10000 (org.apache.zookeeper.server.ContainerManager) --2、session2 启动kafka [root@softdelvily kafka_2.13-2.6.0]# cd /usr/local/kafka_2.13-2.6.0/bin/ [root@softdelvily bin]# ./kafka-server-start.sh ../config/server.properties --3、session3 创建cdczztar [root@softdelvily kafka_2.13-2.6.0]# cd /usr/local/kafka_2.13-2.6.0/bin/ [root@softdelvily bin]# ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic cdczztar Created topic cdczztar. [root@softdelvily bin]# ./kafka-topics.sh --zookeeper localhost:2181 --list __consumer_offsets cdczztar

配置kafka-connect-oracle-maste

解压kafka-connect-oracle-master至/soft目录,并配置相应config文件,然后使用maven工具编译程序。

--解压zip包 [root@softdelvily soft]# unzip kafka-connect-oracle-master.zip [root@softdelvily soft]# ll total 201180 -rw-r--r--. 1 root root 9506321 Sep 22 16:05 apache-maven-3.6.3-bin.tar.gz -rw-r--r--. 1 root root 127431820 Sep 8 10:43 jdk-8u261-linux-x64.rpm -rw-r--r--. 1 root root 65537909 Sep 22 15:59 kafka_2.13-2.6.0.tgz drwxr-xr-x. 5 root root 107 Sep 8 15:48 kafka-connect-oracle-master -rw-r--r--. 1 root root 3522729 Sep 22 14:14 kafka-connect-oracle-master.zip [root@softdelvily soft]# cd kafka-connect-oracle-master/config/ [root@softdelvily config]# ll total 4 -rw-r--r--. 1 root root 1135 Sep 8 15:48 OracleSourceConnector.properties --调整properties配置文件 --需要调整项db.name.alias、topic、db.name、db.hostname、db.user、db.user.password、table.whitelist、table.blacklist信息,具体说明参考README.md [root@softdelvily config]# vi OracleSourceConnector.properties name=oracle-logminer-connector connector.class=com.ecer.kafka.connect.oracle.OracleSourceConnector db.name.alias=zztar tasks.max=1 topic=cdczztar db.name=zztar db.hostname=192.168.xx.xx db.
                
                

-六神源码网