您现在的位置是:首页>博客详情

oracle 高级队列技术入队,出队,回调(2)

FreshMan2022年12月28日 14:02oracle ,入队,出队,回调1436

简介oracle 高级队列的最基本入队、出队操作以及回调

1、定义发送的对象类型(aq_msg_type)

create or replace type aq_msg_type as object

(

  msg_seq         number(10),

  msg_sender      varchar2(10),

  msg_receiver    varchar2(10),

  msg_level       varchar2(10),

  msg_content     varchar2(1000),

  msg_create_time date

);

2、创建队列及队列表,并启动

begin

  dbms_aqadm.create_queue_table(queue_table        => 'aq_msg_qtab',

                                queue_payload_type => 'aq_msg_type');

  dbms_aqadm.create_queue(queue_name  => 'aq_msg_queue1',

                          queue_table => 'aq_msg_qtab');

  dbms_aqadm.start_queue(queue_name => 'aq_msg_queue1');

end;

3、消息入队


declare

  v_queue_name         varchar2(50);

  v_queue_options      dbms_aq.enqueue_options_t;

  v_message_properties dbms_aq.message_properties_t;

  v_message_handle     raw(16);

  v_aq_msg             aq_msg_type;

begin

  v_queue_name := 'aq_msg_queue1';

  v_aq_msg     := aq_msg_type(1, 'FreshMan', 'BB', 'a', '内容1', sysdate);

  dbms_aq.enqueue(queue_name         => v_queue_name,

                  enqueue_options    => v_queue_options,

                  message_properties => v_message_properties,

                  payload            => v_aq_msg,

                  msgid              => v_message_handle);

  commit;

end;

4、手工-浏览/出队

declare

  v_queue_name         varchar2(50);

  v_queue_options      dbms_aq.dequeue_options_t;

  v_message_properties dbms_aq.message_properties_t;

  v_message_handle     raw(16);

  v_aq_msg             aq_msg_type;

begin

  v_queue_name := 'aq_msg_queue1';

  --如下 dbms_aq.browse 表示'浏览',无下行配置则出队列

  v_queue_options.dequeue_mode := dbms_aq.browse;

  dbms_aq.dequeue(queue_name         => v_queue_name,

                  dequeue_options    => v_queue_options,

                  message_properties => v_message_properties,

                  payload            => v_aq_msg,

                  msgid              => v_message_handle);

  dbms_output.put_line('--------------------------------------- ');

  dbms_output.put_line('msg_seq :' || v_aq_msg.msg_seq);

  dbms_output.put_line('msg_sender :' || v_aq_msg.msg_sender);

  dbms_output.put_line('msg_receiver :' || v_aq_msg.msg_receiver);

  dbms_output.put_line('msg_level :' || v_aq_msg.msg_level);

  dbms_output.put_line('msg_content :' || v_aq_msg.msg_content);

  dbms_output.put_line('v_message_handle :' || v_message_handle);

  dbms_output.put_line('msg_create_time :' ||to_char(v_aq_msg.msg_create_time,'yyyy-mm-dd hh24:mi:ss'));

  dbms_output.put_line('current_time :' ||to_char(sysdate, 'yyyy-mm-dd hh24:mi:ss'));

  commit;

end;

5、自动--通知出队

--创建一个序列

create sequence aq_msg_seq;

--创建一个接受表

create table aq_msg_received

(

aq_msg_seq number,

queue_name varchar2(100),

reeived_time date,

content aq_msg_type,

consumer_name varchar2(512)

);

--创建回调的包

create or replace package call_back_pck as

  procedure aq_msg_queue1(context  raw,

                          reginfo  sys.aq$_reg_info,

                          descr    sys.aq$_descriptor,

                          payload  raw,

                          payload1 number);

end call_back_pck;

/

create or replace package body call_back_pck as

  procedure aq_msg_queue1(context  raw,

                          reginfo  sys.aq$_reg_info,

                          descr    sys.aq$_descriptor,

                          payload  raw,

                          payload1 number) is

    r_dequeue_options    dbms_aq.dequeue_options_t;

    r_message_properties dbms_aq.message_properties_t;

    v_message_handle     raw(26);

    o_payload            aq_msg_type;

  begin

    r_dequeue_options.msgid         := descr.msg_id;

    r_dequeue_options.consumer_name := descr.consumer_name;

    dbms_aq.dequeue(queue_name         => descr.queue_name,

                    dequeue_options    => r_dequeue_options,

                    message_properties => r_message_properties,

                    payload            => o_payload,

                    msgid              => v_message_handle);

    insert into aq_msg_received

      (aq_msg_seq, queue_name, reeived_time, content, consumer_name)

    values

      (aq_msg_seq.nextval,

       descr.queue_name,

       sysdate,

       o_payload,

       descr.consumer_name);

    commit exception when others then rollback;

  end aq_msg_queue1;

end call_back_pck;

/

需要注意:我们应该授予存储过程的所有者适当的权限,通常是对对象的EXECUTE 。在这种情况下,我们知道我们应该再次将这些包上的EXECUTE授予PUBLIC

SQL> conn / as sysdba
Connected.
SQL> GRANT EXECUTE ON SYS.DBMS_SQL TO PUBLIC;

Grant succeeded.

6、增加回调注册

begin

  dbms_aq.register(sys.aq$_reg_info_list(sys.aq$_reg_info('AQ_MSG_QUEUE1',

                                                          dbms_aq.namespace_aq,

                                                          'plsql://call_back_pck.aq_msg_queue1',

                                                          hextoraw('FF'))),

                   1);

end;

7、入队测试

declare

  v_queue_name         varchar2(50);

  v_queue_options      dbms_aq.enqueue_options_t;

  v_message_properties dbms_aq.message_properties_t;

  v_message_handle     raw(16);

  v_aq_msg             aq_msg_type;

begin

  v_queue_name := 'aq_msg_queue1';

  v_aq_msg     := aq_msg_type(2, 'BOSS', 'CC', 'b', '挂吊水', sysdate);

  dbms_aq.enqueue(queue_name         => v_queue_name,

                  enqueue_options    => v_queue_options,

                  message_properties => v_message_properties,

                  payload            => v_aq_msg,

                  msgid              => v_message_handle);

  commit;

end;