`
aswang
  • 浏览: 837984 次
  • 性别: Icon_minigender_1
  • 来自: 南京
社区版块
存档分类
最新评论

基于Oracle Streams + Oracle AQ 捕获变更,发布变更(一)

阅读更多
要求:使用Oracle Streams捕获某个用户下部分表的DML操作变更,并通过Oracle的AQ(高级队列)对外发布,然后Java端通过JMS来获取变更,并执行后续同步操作。
数据库部分:
1、使用Streams要求Oracle以归档模式运行,归档日志默认存放在DB_RECOVERY_FILE_DEST指定的位置,由于该区域有大小限制,
所以,为了避免空间不足导致的后续问题,首先需要修改Oracle归档日志目录
 
alter system set log_archive_dest='/home/dev/app/dev/oradata/archivelog';
--其中archivelog目录为手动创建
--如果该语句执行失败,可以先尝试将DB_RECOVERY_FILE_DEST置空,再执行上述语句。
alter system set DB_RECOVERY_FILE_DEST='';
 
2、修改数据库为归档模式运行:
shutdown immediate;
startup nomount;
alter database mount;
alter database archivelog;
archive log list;
3、修改系统参数:
alter system set aq_tm_processes=2 scope=both;
alter system set global_names=true scope=both;
alter system set job_queue_processes=10 scope=both;
alter system set parallel_max_servers=20 scope=both;
alter system set undo_retention=3600 scope=both;
alter system set streams_pool_size=25M scope=spfile; 
 
4、更改数据设置,增加补充日志信息
alter database add supplemental log data;
--该语句是在数据库级别添加的补充日志,也可以单独为表设置
alter table user add supplemental log group g_user (id) always;
 
5、创建用于Oracle Streams的表空间和用户
create tablespace streams_tbs datafile '/home/dev/app/dev/oradata/stream_tbs.dbf'
  size 25M Reuse autoextend on maxsize unlimited;
 
create user stradmin
identified by stradmin
default tablespace streams_tbs quota unlimited on streams_tbs;  
6、授权
grant dba,select_catalog_role to strmadmin; 
 
--执行系统存储过程,分配权限
BEGIN
DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE(
 privilege => DBMS_RULE_ADM.CREATE_RULE_SET_OBJ,
 grantee => 'stradmin',
 grant_option => FALSE);
END;
/
 
BEGIN
DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE(
 privilege => DBMS_RULE_ADM.CREATE_RULE_OBJ,
 grantee => 'stradmin',
 grant_option => FALSE);
END;
/
 
7、创建用于捕获进程的队列:
exec DBMS_STREAMS_ADM.SET_UP_QUEUE(
  queue_table => 'streams_queue_table', 
  queue_name => 'streams_queue');
 
8、创建用于捕获数据变更的捕获进程:
begin
   dbms_streams_adm.add_schema_rules(
     schema_name => 'scott',
     streams_type => 'capture',
     streams_name => 'capture_jms',
     queue_name => 'strmadmin.streams_queue',
     include_dml => true,
     include_ddl => false,
  source_database => 'orcl',
     inclusion_rule => true,
  and_condition => ':lcr.get_compatible()<dbms_streams.max_compatible()'
  );
 end;
/
说明:其中include_dml和include_ddl用于指明是否希望捕获DML操作和DDL操作。
and_condition用于添加捕获规则。
该捕获进程会捕获scott用户下所有表的dml变更
 
9、创建用于Oracle AQ的消息队列
BEGIN
   DBMS_AQADM.CREATE_QUEUE_TABLE(
        Queue_table => 'jms_queue_table',
        Queue_payload_type => 'SYS.AQ$_jms_bytes_message',
        multiple_consumers => true);
   DBMS_AQADM.CREATE_QUEUE(
      Queue_name => 'jms_queue',
      Queue_table => 'jms_queue_table');
   DBMS_AQADM.START_QUEUE(queue_name=> 'jms_queue');
END;
/
说明:
    应用进程会将streams_queue队列中的消息出队,然后经过转换插入到该队列中。
    queue_payload_type用于指定该队列存放的用户数据的类型。
 
10、初始化SCN
DECLARE
iscn NUMBER; -- Variable to hold instantiation SCN value
BEGIN
iscn := DBMS_FLASHBACK.GET_SYSTEM_CHANGE_NUMBER();
DBMS_APPLY_ADM.SET_SCHEMA_INSTANTIATION_SCN(
 source_schema_name => 'scott',
    source_database_name => 'orcl',
    instantiation_scn => iscn,
    recursive => true);
END;
/
11、为AQ消息队列创建代理和订阅者
--6、为消息队列创建代理
exec DBMS_AQADM.CREATE_AQ_AGENT(agent_name => 'jms_agent');
exec DBMS_AQADM.ENABLE_DB_ACCESS(agent_name => 'jms_agent',
  db_username => 'strmadmin');
 
DECLARE
 subscriber SYS.AQ$_AGENT;
BEGIN
 subscriber := SYS.AQ$_AGENT('jms_agent', NULL, NULL);
 SYS.DBMS_AQADM.ADD_SUBSCRIBER(
  queue_name => 'strmadmin.jms_queue',
  subscriber => subscriber,
  rule => NULL,
  transformation => NULL);
END;
/
备注:这里的agent_name和jms_queue会在java的JMS中使用到。
 
12、创建应用进程
BEGIN
   dbms_streams_adm.add_schema_rules(
      schema_name => 'scott',
      streams_type => 'apply',
      streams_name => 'apply_jms',
      queue_name => 'strmadmin.streams_queue',
      include_dml => true,
      include_ddl => false,
      source_database => 'orcl',
      inclusion_rule => true);
END;
/
说明:默认情况下,应用进程会将streams_queue中的消息(LCR)出队,然后将消息对应的变更应用到指定的数据库中。
但在这里,我们希望通过应用进程进行消息转换,并插入到AQ的消息队列中,所以后面还需要针对目标表设置处理器。
 
13、创建用于应用进程的消息处理存储过程,对LCR进行转换,并插入到jms_queue队列
CREATE OR REPLACE PROCEDURE enq_jms_lcr(in_any IN SYS.ANYDATA) IS
  message SYS.AQ$_jms_bytes_message;
  enqueue_options dbms_aq.enqueue_options_t;
  message_properties dbms_aq.message_properties_t;
  msgid raw(16);
  lcr SYS.LCR$_ROW_RECORD;
  rc PLS_INTEGER;
  agent sys.aq$_agent := sys.aq$_agent('jms_agent', null, NULL);
 
  ID number;
  tablename varchar2(100);
  action varchar2(100);
  updatetime date;
BEGIN
 rc := in_any.GETOBJECT(lcr);
 tablename := lcr.get_object_name();
 action := lcr.get_command_type();
 updatetime := lcr.get_source_time();
 
 if action = 'UPDATE' or action = 'DELETE' then
  ID := lcr.get_value('OLD', 'ID').ACCESSnumber();
 end if;
 
 if action = 'INSERT' then
  ID := lcr.get_value('NEW', 'ID').ACCESSnumber();
 end if;
 
 message := SYS.AQ$_jms_bytes_message.construct;
    message.set_type('map');
    message.set_userid('strmadmin');
    message.set_appid('jms_sync');
    message.set_groupid('jms');
    message.set_groupseq(1);
 
 message.set_long_property('id',ID);
 message.set_long_property('areacode',areacode);
 message.set_string_property('tablename', tablename);
 message.set_string_property('action', action);
 
 dbms_aq.enqueue(queue_name => 'strmadmin.jms_queue',
                  enqueue_options => enqueue_options,
                  message_properties => message_properties,
                  payload => message,
                  msgid => msgid);
END;
/
说明:在这个存储过程中,我们仅仅抓取了数据变更的关键信息:主键ID,表名,DML动作以及时间。
然后将信息封装在SYS.AQ$_jms_bytes_message中,并放入队列jms_queue中。
 
14、创建一个空的消息处理存储过程
CREATE OR REPLACE PROCEDURE empty_jms_lcr(in_any IN SYS.ANYDATA) IS
i number;
BEGIN
i:= 1;
END;
/
说明:由于前面的捕获进程会捕获用户下所有受支持的表的dml变更,但有少数表,我们不希望对其处理,
这个时候我们就可以通过一个什么都不做的存储过程,来覆盖应用进程的默认行为。否则,应用进程会默认将
这些表的变更进行处理,这时就会报错ORA-25215 user_data type and queue type do not match.
 
15、在上面建立了两个消息处理存储过程以后,我们需要将其设置为对应表的处理器。
--工具函数
create or replace PROCEDURE set_dml_handler(tablename varchar2, action varchar2, handler varchar2) is
begin
  DBMS_APPLY_ADM.SET_DML_HANDLER(
    object_name => tablename ,
    object_type => 'TABLE',
    operation_name => action,
    error_handler => false,
    user_procedure => handler,
    apply_database_link => NULL);
end;
/
 
--为目标表配置处理器stradmin.enq_jms_lcr
declare
  here number;
  type array_type is table of varchar2(100);
  --定义需要同步的表
  tables array_type := array_type('scott.USER');
 
  cursor c_table is
    select T.OWNER||'.'||T.TABLE_NAME tablename
      from dba_tables t
     where t.owner = 'scott'
       and instr(t.table_name,'EST') = 1 ;
 
begin
  for t in c_table loop
  here := 0;
    for i IN 1..tables.count loop
      if tables(i) = t.tablename then
        here := 1;
      end if;
    end loop;
 
 if here = 1 then
  dbms_output.put_line('enq_jms_lcr : ' || t.tablename);
        set_dml_handler(t.tablename, 'UPDATE', 'strmadmin.enq_jms_lcr');
        set_dml_handler(t.tablename, 'INSERT', 'strmadmin.enq_jms_lcr');
        set_dml_handler(t.tablename, 'DELETE', 'strmadmin.enq_jms_lcr');
 end if;
 
 if here = 0 then
  dbms_output.put_line('empty_jms_lcr : ' || t.tablename);
        set_dml_handler(t.tablename, 'UPDATE', 'strmadmin.empty_jms_lcr');
        set_dml_handler(t.tablename, 'INSERT', 'strmadmin.empty_jms_lcr');
        set_dml_handler(t.tablename, 'DELETE', 'strmadmin.empty_jms_lcr');
 end if;
  end loop;
end;
/
说明:上述存储过程,对应希望同步的表会设置enq_jms_lcr,而对于其它表则设置empty_jms_lcr。
 
16、启动应用进程以及捕获进程
exec DBMS_APPLY_ADM.SET_PARAMETER(apply_name => 'apply_jms',parameter => 'disable_on_error',value => 'n');
exec DBMS_APPLY_ADM.START_APPLY(apply_name => 'apply_jms');
exec DBMS_CAPTURE_ADM.START_CAPTURE(capture_name => 'capture_jms');
17、在完成上述操作以后,我们可以尝试对USER表进行INSERT、UPDATE和DELTE操作,然后查看jms_queue_table表中是否包含对应的转换后的数据
select t.rowid, t.* from jms_queue_table t;
如果有对应的记录,则说明配置成功,后面可以编写java代码来接受消息并处理了。
 
过程中的坑:
        在为应用进程创建消息处理存储过程以后,应用进程运行一次或几次之后,总会报错,状态为ABORT,错误信息为:ORA-25215 user_data type and queue type do not match.
错误原因描述的很清楚了,但是在检查了前后的语句,并没有发现数据类型与队列的数据类型不一致,
存储过程中放入队列的数据类型与创建队列时的类型均为SYS.AQ$_jms_bytes_message。
在google了N多信息,大体的解释都一样,说是保证进入队列的数据类型与队列表中的数据类型一致。
      于是在这个地方卡了很久,最后突然意识到,上面的捕获进程默认捕获用户下所有的表的变更,
而应用进程会自动的从捕获进程的队列strems_queue中获取所有变更,并将其应用到jms_queue队列。
但在刚开始时,默认只给需要同步的表设置了消息处理器,那么其它的表的变更消息在应用进程获取以后,
就没有转换为SYS.AQ$_jms_bytes_message,而是直接尝试进入队列,于是就报错类型不匹配了。
      在知道这个原因后,就为其它不需要同步的表,建立了一个空的消息处理器,来覆盖应用进程默认的行为,
从而确保应用进程不再报错终止运行。
 
 
错误排查:
查看捕获进程状态:
SELECT t.capture_name, t.capture_user, t.capture_type, t.status FROM DBA_CAPTURE t Where t.capture_name = 'CAPTURE_JMS';
 
查看应用进程状态:
select apply_name, queue_name, status, t.error_number, t.error_message from dba_apply t;
 
查看应用进程错误信息:
select t.rowid, t.* from dba_apply_error t where t.apply_name = 'APPLY_JMS';
查看消息队列:
SELECT t.rowid, t.* from JMS_QUEUE_TABLE t;
查看相关规则:
select * from dba_streams_rules;
分享到:
评论

相关推荐

    高校学生选课系统项目源码资源

    项目名称: 高校学生选课系统 内容概要: 高校学生选课系统是为了方便高校学生进行选课管理而设计的系统。该系统提供了学生选课、查看课程信息、管理个人课程表等功能,同时也为教师提供了课程发布和管理功能,以及管理员对整个选课系统的管理功能。 适用人群: 学生: 高校本科生和研究生,用于选课、查看课程信息、管理个人课程表等。 教师: 高校教师,用于发布课程、管理课程信息和学生选课情况等。 管理员: 系统管理员,用于管理整个选课系统,包括用户管理、课程管理、权限管理等。 使用场景及目标: 学生选课场景: 学生登录系统后可以浏览课程列表,根据自己的专业和兴趣选择适合自己的课程,并进行选课操作。系统会实时更新学生的选课信息,并生成个人课程表。 教师发布课程场景: 教师登录系统后可以发布新的课程信息,包括课程名称、课程描述、上课时间、上课地点等。发布后的课程将出现在课程列表中供学生选择。 管理员管理场景: 管理员可以管理系统的用户信息,包括学生、教师和管理员账号的添加、删除和修改;管理课程信息,包括课程的添加、删除和修改;管理系统的权限控制,包括用户权限的分配和管理。 目标: 为高校学生提

    TC-125 230V 50HZ 圆锯

    TC-125 230V 50HZ 圆锯

    影音娱乐北雨影音系统 v1.0.1-bymov101.rar

    北雨影音系统 v1.0.1_bymov101.rar 是一个计算机专业的 JSP 源码资料包,它为用户提供了一个强大而灵活的在线影音娱乐平台。该系统集成了多种功能,包括视频上传、播放、分享和评论等,旨在为用户提供一个全面而便捷的在线视频观看体验。首先,北雨影音系统具有强大的视频上传功能。用户可以轻松地将本地的视频文件上传到系统中,并与其他人分享。系统支持多种视频格式,包括常见的 MP4、AVI、FLV 等,确保用户能够方便地上传和观看各种类型的视频。其次,该系统提供了丰富的视频播放功能。用户可以选择不同的视频进行观看,并且可以调整视频的清晰度、音量等参数,以适应不同的观看需求。系统还支持自动播放下一个视频的功能,让用户可以连续观看多个视频,无需手动切换。此外,北雨影音系统还提供了一个社交互动的平台。用户可以在视频下方发表评论,与其他观众进行交流和讨论。这为用户之间的互动提供了便利,增加了观看视频的乐趣和参与感。最后,该系统还具备良好的用户体验和界面设计。界面简洁明了,操作直观易用,让用户可以快速上手并使用各项功能。同时,系统还提供了个性化的推荐功能,根据用户的观看历史和兴趣,为用户推荐

    Tripp Trapp 儿童椅用户指南 STOKKE

    Tripp Trapp 儿童椅用户指南

    node-v8.13.0-linux-armv6l.tar.gz

    Node.js,简称Node,是一个开源且跨平台的JavaScript运行时环境,它允许在浏览器外运行JavaScript代码。Node.js于2009年由Ryan Dahl创立,旨在创建高性能的Web服务器和网络应用程序。它基于Google Chrome的V8 JavaScript引擎,可以在Windows、Linux、Unix、Mac OS X等操作系统上运行。 Node.js的特点之一是事件驱动和非阻塞I/O模型,这使得它非常适合处理大量并发连接,从而在构建实时应用程序如在线游戏、聊天应用以及实时通讯服务时表现卓越。此外,Node.js使用了模块化的架构,通过npm(Node package manager,Node包管理器),社区成员可以共享和复用代码,极大地促进了Node.js生态系统的发展和扩张。 Node.js不仅用于服务器端开发。随着技术的发展,它也被用于构建工具链、开发桌面应用程序、物联网设备等。Node.js能够处理文件系统、操作数据库、处理网络请求等,因此,开发者可以用JavaScript编写全栈应用程序,这一点大大提高了开发效率和便捷性。 在实践中,许多大型企业和组织已经采用Node.js作为其Web应用程序的开发平台,如Netflix、PayPal和Walmart等。它们利用Node.js提高了应用性能,简化了开发流程,并且能更快地响应市场需求。

    谷歌浏览器 64位-89.0.4389.128.exe

    Windows版本64位谷歌浏览器,是由Google谷歌公司开发的一款电脑版网络浏览器,可以运行在Windows 10/8.1/8/7 64位的操作系统上。该浏览器是基于其它开放原始码软件所撰写,包括WebKit和Mozilla,目标是提升稳定性、速度和安全性,并创造出简单且有效率的使用者界面。软件的特点是简洁、快速。并且支持多标签浏览,每个标签页面都在独立的“沙箱”内运行,在提高安全性的同时,一个标签页面的崩溃也不会导致其他标签页面被关闭。此外,谷歌浏览器(Google Chrome)基于更强大的JavaScript V8引擎,这是当前Web浏览器所无法实现的。

    适用于鲲鹏麒麟的OpenJDK1.8

    适用于鲲鹏麒麟的OpenJDK1.8

    毕业设计-基于SSH的任务调度系统的设计与实现

    任务调度试系统,基本功能包括:用户的注册、用户的登录、发起项目、项目详细及搜索等。本系统结构如下: (1)用户的注册登录: 注册模块:完成用户注册功能; 登录模块:完成用户登录功能; (2)发起项目: 发起项目模块:完成了项目及项目下一个或者多个任务的添加; 项目详细:点击项目名称,可以看到项目及任务详细信息; 搜索项目:完成对项目名称的模糊搜索功能 任务调度试系统,基本功能包括:用户的注册、用户的登录、发起项目、项目详细及搜索等。本系统结构如下: (1)用户的注册登录: 注册模块:完成用户注册功能; 登录模块:完成用户登录功能; (2)发起项目: 发起项目模块:完成了项目及项目下一个或者多个任务的添加; 项目详细:点击项目名称,可以看到项目及任务详细信息; 搜索项目:完成对项目名称的模糊搜索功能

    30个炫酷的数据可视化大屏(含源码)

    大屏数据可视化是以大屏为主要展示载体的数据可视化设计,30个可视化大屏包含源码,直接运行文件夹中的index.html,即可看到大屏。 内含:数据可视化页面设计;数据可视化演示系统;大数据可视化监管平台;智能看板;翼兴消防监控;南方软件视频平台;全国图书零售监测数据;晋城高速综合管控大数据;无线网络大数据平台;设备大数据;游戏数据大屏;厅店营业效能分析;车辆综合管控平台;政务大数据共享交换平台;智慧社区;物流云数据看板平台;风机可视化大屏等。

    基于yolov5识别算法实现的DNF自动脚本源码.zip

    优秀源码设计,详情请查看资源源码内容

    毕业设计:基于SSM的mysql-在线网上书店(源码 + 数据库 + 说明文档)

    毕业设计:基于SSM的mysql_在线网上书店(源码 + 数据库 + 说明文档) 2.系统分析与设计 3 2.1系统分析 3 2.1.1需求分析 3 2.1.2必要性分析 3 2.2系统概要设计 3 2.2.1 项目规划 3 2.2.2系统功能结构图 4 2.3开发及运行环境 4 2.4逻辑结构设计 5 2.4.1 数据库概要说明 5 2.4.2 主要数据表结构 6 2.5文件夹架构 9 2.6编写JAVA BEAN 9 3.网站前台主要功能模块设计 10 3.1前台首页架构设计 10 3.2网站前台首页设计 11 3.3新书上市模块设计 12 3.4特价书籍模块设计 13 3.5书籍分类模块设计 14 3.6会员管理模块设计 15 3.7购物车模块设计 17 3.8收银台设计模块 19 3.9畅销书籍模块设计 20 4.网站后台主要功能模块设计 21 4.1网站后台文件夹架构设计 21 4.2后台主页面设计 21 4.3书籍管理模块设计 22 4.4会员管理模块设计 25 4.5订单管理模块设计 26 4.6公告管理模块设计 28 4.7退出系统页面设计 29 5.网站制作中遇到的问

    python 开发 python爬虫数据可视化分析项目源码加课题报告,源码注解清晰一看就懂,适合新手.zip

    python 开发 python爬虫数据可视化分析项目源码加课题报告,源码注解清晰一看就懂,适合新手

    node-v8.0.0-linux-armv7l.tar.gz

    Node.js,简称Node,是一个开源且跨平台的JavaScript运行时环境,它允许在浏览器外运行JavaScript代码。Node.js于2009年由Ryan Dahl创立,旨在创建高性能的Web服务器和网络应用程序。它基于Google Chrome的V8 JavaScript引擎,可以在Windows、Linux、Unix、Mac OS X等操作系统上运行。 Node.js的特点之一是事件驱动和非阻塞I/O模型,这使得它非常适合处理大量并发连接,从而在构建实时应用程序如在线游戏、聊天应用以及实时通讯服务时表现卓越。此外,Node.js使用了模块化的架构,通过npm(Node package manager,Node包管理器),社区成员可以共享和复用代码,极大地促进了Node.js生态系统的发展和扩张。 Node.js不仅用于服务器端开发。随着技术的发展,它也被用于构建工具链、开发桌面应用程序、物联网设备等。Node.js能够处理文件系统、操作数据库、处理网络请求等,因此,开发者可以用JavaScript编写全栈应用程序,这一点大大提高了开发效率和便捷性。 在实践中,许多大型企业和组织已经采用Node.js作为其Web应用程序的开发平台,如Netflix、PayPal和Walmart等。它们利用Node.js提高了应用性能,简化了开发流程,并且能更快地响应市场需求。

    使用FPGA发送一个经过曼彻斯特编码的伪随机序列

    rtl中存放的是设计文件 sim中存放的是仿真文件

    基于Java的班级管理系统课程设计源码

    附件是基于 Java的班级管理系统课程设计源码,包含程序说明和运行环境要求,文件绿色安全,仅供学习交流使用,欢迎大家下载学习交流!

    最新获取QQ微信头像橘头像阁PHP源码下载.rar

    最新获取QQ微信头像橘头像阁PHP源码下载.rar最新获取QQ微信头像橘头像阁PHP源码下载.rar

    K-750 管道疏通机手册

    K-750 管道疏通机手册 Drain Cleaner Manual K-750 Drain Cleaning Machine

    基于哈希链表的简单人员信息管理系统

    实现基于哈希表的员工信息管理系统,该系统主要用于处理员工信息,主要包括员工个人信息的录入、删除、查找、修改等,同时支持数据的导入导出

    node-v6.16.0.tar.xz

    Node.js,简称Node,是一个开源且跨平台的JavaScript运行时环境,它允许在浏览器外运行JavaScript代码。Node.js于2009年由Ryan Dahl创立,旨在创建高性能的Web服务器和网络应用程序。它基于Google Chrome的V8 JavaScript引擎,可以在Windows、Linux、Unix、Mac OS X等操作系统上运行。 Node.js的特点之一是事件驱动和非阻塞I/O模型,这使得它非常适合处理大量并发连接,从而在构建实时应用程序如在线游戏、聊天应用以及实时通讯服务时表现卓越。此外,Node.js使用了模块化的架构,通过npm(Node package manager,Node包管理器),社区成员可以共享和复用代码,极大地促进了Node.js生态系统的发展和扩张。 Node.js不仅用于服务器端开发。随着技术的发展,它也被用于构建工具链、开发桌面应用程序、物联网设备等。Node.js能够处理文件系统、操作数据库、处理网络请求等,因此,开发者可以用JavaScript编写全栈应用程序,这一点大大提高了开发效率和便捷性。 在实践中,许多大型企业和组织已经采用Node.js作为其Web应用程序的开发平台,如Netflix、PayPal和Walmart等。它们利用Node.js提高了应用性能,简化了开发流程,并且能更快地响应市场需求。

    3D模型007,可用于建模、GIS、BIM、CIM学习

    3D模型007,可用于建模、GIS、BIM、CIM学习

Global site tag (gtag.js) - Google Analytics