您现在的位置是:首页>博客详情

oracle 高级队列技术(1)

FreshMan2022年12月28日 10:10Oracle,AQ,队列,队列表276

简介Oracle AQ是Oracle数据库中集成的一种消息队列机制,可以用于不同应用程序间的消息交互,例如PL/SQL可以通过相应的Package访问队列、C#应该程序可以通过ODP.NET访问队列、Java应用程序则可以通过OJMS访问队列。

Oracle AQ是Oracle数据库中集成的一种消息队列机制,可以用于不同应用程序间的消息交互,例如PL/SQL可以通过相应的Package访问队列、C#应该程序可以通过ODP.NET访问队列、Java应用程序则可以通过OJMS访问队列。AQ内部是通过数据库表实现的(即消息实际上是存储在数据库表中)。

高级队列的应用范围非常广泛。 除了提供消息在oracle数据库和应用程序以及用户之间传送的功能之外,利用oracle Net Services消息还可以在oracle数据库的客户端和服务端之间或两个数据库之间以及一个oracle 队列到另一个队列之间传递。 而且,基于HTTP/HTTPS或SMTP等传输协议,我们还可以通过Internet执行高级队列操作。 此外ORACLE高级队列通过消息处理网关可以实现同现存的非oracle消息系统的无缝集成。

另外,由于oracle 高级队列集成于数据库,它便具有一些其它消息队列所不具备的特殊优势:

首先,它的操作继承了数据库的所有优点,例如可靠性’完整性’高可用性’安全性以及可伸缩性等。

其次,消息的管理大大方便了, 由于采用数据库表存储消息,因此用户可以利用标准的SQL 语句访问消息信息,包括消息的属性、历史消息、消息负载。同样可以对消息进行审计和跟踪,利用索引可以更好地优化消息管理。

第三,同其它数据库表一样,队列表还可以被导入、导出。

 高级队列消息传递机制
oracle高级队列的具体消息传递机制大致如下:消息“ 生产者” 把消息装入队列( 称为Enqueue,入列) ,消息“消费者”从队列中取消息( 称为Dequeue,出列) 。队列表以数据库表的形式存在,队列存储在队列表中。

下面展示了PL/SQL中使用AQ的基本用法,示例假设了一个场景:A是一个被频繁调用的存储过程,每次调用A之前需要调用过程B,B消耗大量的时间,假设A的执行并不依赖于B的执行结果,我们可以把调用B的上下文先存入AQ中,而后异步地进行处理,从而减小了B对应用程序性能的影响。

1.    创建AQ所需要的权限

   DBMS_AQ  user1;   DBMS_AQADM  user1;
  DBMS_AQADM.GRANT_SYSTEM_PRIVILEGE(, , FALSE);
  DBMS_AQADM.GRANT_SYSTEM_PRIVILEGE(, , FALSE);;

需要使用sys或system用户发起这些授权语句,欲创建及管理AQ,需要获得两个至关重要的包dbms_aq, dbms_aqadm的执行权限。后两个通过grant_system_privilege进行的授权是可选的。

2.    创建一个payload类型

   TYPE t_spl_queue_payload  OBJECT
(
  ID             (),
  EXEC_DATE     (),
  PARAMETER1       ,
  PARAMETER2       (),
  FLAG           ()
);

通常我们会定义一个对象,用于存储将来需要放置在AQ队列中的信息。

3.    创建AQ相关表

begin
  dbms_aqadm.create_queue_table(queue_table => 'spl_queue_table',
                                            multiple_consumers => true,
                                            queue_payload_type => 't_spl_queue_payload');
end;

执行的结果是生成了表Spl_queue_table,以及若干个aq$_ spl_queue_table_表。表Spl_queue_table中除了AQ队列自身需要的一些字段外,有一个类型为t_spl_queue_payload的USER_DATA字段,用于存储队列消息,这也印证了上面说的:AQ内部是通过数据库表实现的。

4.    创建及启动AQ

begin
  dbms_aqadm.create_queue(queue_name => 'spl_aq',
  queue_table=> 'spl_queue_table');
end;

如何停止及删除AQ以及自定义类型:

--begin
--  dbms_aqadm.stop_queue(queue_name => 'spl_aq');
--  dbms_aqadm.drop_queue(queue_name => 'spl_aq');
--  dbms_aqadm.drop_queue_table(queue_table => 'spl_queue_table');
--  --删除自定义类型,一般应先删除使用type的相关表,然后删除type
--  --drop type t_spl_queue_payload;
--  --或者进行强制删除type,也就是不删除使用type的相关表的情况下删除type,这样相关表里的对应type的列也被删除
--  --drop type t_spl_queue_payload force;
--end;

5.    消息的入队

declare
  enqueue_options    dbms_aq.enqueue_options_t;
  message_properties dbms_aq.message_properties_t;
  message_handle     raw(16);
  recipients         dbms_aq.aq$_recipient_list_t;
  p_payload          t_spl_queue_payload;
begin
  recipients(1) := sys.aq$_agent('someguy', 'spl_aq', null);
  message_properties.recipient_list := recipients;
  message_properties.priority := -5;
  message_properties.delay := dbms_aq.no_delay;
  message_properties.expiration := dbms_aq.never;
  --enqueue_options.visibility:=dbms_aq.on_commit;
  enqueue_options.visibility         := dbms_aq.immediate;
  enqueue_options.sequence_deviation := null;
  p_payload := t_spl_queue_payload(1,sysdate,1,'二','1');
  dbms_aq.enqueue(queue_name         => 'spl_aq',
                  enqueue_options    => enqueue_options,
                  message_properties => message_properties,
                  payload            => p_payload,
                  msgid              => message_handle);
  --commit;
end;

recipient,其中“someguy”指定的是消息的接收者,出队时你需要指定一样的名字才能接收到消息。(2)visibility,可以是on_commit或者immediate,如果使用on_commit,需要手工调用commit语句之后消息才进入队列(这种情况下,最好使用自治事务);如果使用immediate,则dbms_aq.enqueue完成时消息就进入队列,不需commit,并且默认使用自治事务。

6.    消息的出队

declare
  l_payload          t_spl_queue_payload;
  l_queue_record     NUMBER;
  dequeue_options    dbms_aq.dequeue_options_t;
  message_properties dbms_aq.message_properties_t;
  message_handle     RAW(16);
BEGIN
  dequeue_options.consumer_name := 'someguy';
  dequeue_options.dequeue_mode  := dbms_aq.remove;
  dequeue_options.navigation    := dbms_aq.next_message;
  dequeue_options.visibility    := dbms_aq.immediate;
  --dequeue_options.wait          := dbms_aq.forever;
  dequeue_options.wait  := dbms_aq.no_wait;
  dequeue_options.msgid := null;
  --
  SELECT COUNT(*)
    INTO l_queue_record
    FROM AQ$SPL_QUEUE_TABLE
   WHERE msg_state = 'READY';
  --
  FOR i IN 1 .. l_queue_record LOOP
    dbms_aq.dequeue(queue_name         => 'SPL_AQ',
                    dequeue_options    => dequeue_options,
                    message_properties => message_properties,
                    payload            => l_payload,
                    msgid              => message_handle);
    -- 
    dbms_output.put_line('--------------------------------------- ');
    dbms_output.put_line('id :' || l_payload.id);
    dbms_output.put_line('PARAMETER1 :' || l_payload.PARAMETER1);
    dbms_output.put_line('PARAMETER2 :' || l_payload.PARAMETER2);
    dbms_output.put_line('FLAG :' || l_payload.FLAG);
    dbms_output.put_line('EXEC_DATE :' ||
                         to_char(l_payload.EXEC_DATE,
                                 'yyyy-mm-dd hh24:mi:ss'));
    dbms_output.put_line('--------------------------------------- ');
  END LOOP;
END;

consumer_name需要和前面在入队时指定的recipient一致。(2)wait的两个值forever和no_wait是指如果当前队列中无消息时,是否进行等待,默认等待。(3) navigation的两个值first_message和next_message,一般出于性能考虑我们使用后者,或者在第一次出队时使用前者而在随后的出队中使用后者。