1、定义发送的对象类型(aq_msg_type)
create or replace type aq_msg_type as object
(
msg_seq number(10),
msg_sender varchar2(10),
msg_receiver varchar2(10),
msg_level varchar2(10),
msg_content varchar2(1000),
msg_create_time date
);
2、创建队列及队列表,并启动
begin
dbms_aqadm.create_queue_table(queue_table => 'aq_msg_qtab',
queue_payload_type => 'aq_msg_type');
dbms_aqadm.create_queue(queue_name => 'aq_msg_queue1',
queue_table => 'aq_msg_qtab');
dbms_aqadm.start_queue(queue_name => 'aq_msg_queue1');
end;
3、消息入队
declare
v_queue_name varchar2(50);
v_queue_options dbms_aq.enqueue_options_t;
v_message_properties dbms_aq.message_properties_t;
v_message_handle raw(16);
v_aq_msg aq_msg_type;
begin
v_queue_name := 'aq_msg_queue1';
v_aq_msg := aq_msg_type(1, 'FreshMan', 'BB', 'a', '内容1', sysdate);
dbms_aq.enqueue(queue_name => v_queue_name,
enqueue_options => v_queue_options,
message_properties => v_message_properties,
payload => v_aq_msg,
msgid => v_message_handle);
commit;
end;
4、手工-浏览/出队
declare
v_queue_name varchar2(50);
v_queue_options dbms_aq.dequeue_options_t;
v_message_properties dbms_aq.message_properties_t;
v_message_handle raw(16);
v_aq_msg aq_msg_type;
begin
v_queue_name := 'aq_msg_queue1';
--如下 dbms_aq.browse 表示'浏览',无下行配置则出队列
v_queue_options.dequeue_mode := dbms_aq.browse;
dbms_aq.dequeue(queue_name => v_queue_name,
dequeue_options => v_queue_options,
message_properties => v_message_properties,
payload => v_aq_msg,
msgid => v_message_handle);
dbms_output.put_line('--------------------------------------- ');
dbms_output.put_line('msg_seq :' || v_aq_msg.msg_seq);
dbms_output.put_line('msg_sender :' || v_aq_msg.msg_sender);
dbms_output.put_line('msg_receiver :' || v_aq_msg.msg_receiver);
dbms_output.put_line('msg_level :' || v_aq_msg.msg_level);
dbms_output.put_line('msg_content :' || v_aq_msg.msg_content);
dbms_output.put_line('v_message_handle :' || v_message_handle);
dbms_output.put_line('msg_create_time :' ||to_char(v_aq_msg.msg_create_time,'yyyy-mm-dd hh24:mi:ss'));
dbms_output.put_line('current_time :' ||to_char(sysdate, 'yyyy-mm-dd hh24:mi:ss'));
commit;
end;
5、自动--通知出队
--创建一个序列
create sequence aq_msg_seq;
--创建一个接受表
create table aq_msg_received
(
aq_msg_seq number,
queue_name varchar2(100),
reeived_time date,
content aq_msg_type,
consumer_name varchar2(512)
);
--创建回调的包
create or replace package call_back_pck as
procedure aq_msg_queue1(context raw,
reginfo sys.aq$_reg_info,
descr sys.aq$_descriptor,
payload raw,
payload1 number);
end call_back_pck;
/
create or replace package body call_back_pck as
procedure aq_msg_queue1(context raw,
reginfo sys.aq$_reg_info,
descr sys.aq$_descriptor,
payload raw,
payload1 number) is
r_dequeue_options dbms_aq.dequeue_options_t;
r_message_properties dbms_aq.message_properties_t;
v_message_handle raw(26);
o_payload aq_msg_type;
begin
r_dequeue_options.msgid := descr.msg_id;
r_dequeue_options.consumer_name := descr.consumer_name;
dbms_aq.dequeue(queue_name => descr.queue_name,
dequeue_options => r_dequeue_options,
message_properties => r_message_properties,
payload => o_payload,
msgid => v_message_handle);
insert into aq_msg_received
(aq_msg_seq, queue_name, reeived_time, content, consumer_name)
values
(aq_msg_seq.nextval,
descr.queue_name,
sysdate,
o_payload,
descr.consumer_name);
commit exception when others then rollback;
end aq_msg_queue1;
end call_back_pck;
/
需要注意:我们应该授予存储过程的所有者适当的权限,通常是对对象的EXECUTE 。在这种情况下,我们知道我们应该再次将这些包上的EXECUTE授予PUBLIC。
SQL> conn / as sysdba
Connected.
SQL> GRANT EXECUTE ON SYS.DBMS_SQL TO PUBLIC;
Grant succeeded.
6、增加回调注册
begin
dbms_aq.register(sys.aq$_reg_info_list(sys.aq$_reg_info('AQ_MSG_QUEUE1',
dbms_aq.namespace_aq,
'plsql://call_back_pck.aq_msg_queue1',
hextoraw('FF'))),
1);
end;
7、入队测试
declare
v_queue_name varchar2(50);
v_queue_options dbms_aq.enqueue_options_t;
v_message_properties dbms_aq.message_properties_t;
v_message_handle raw(16);
v_aq_msg aq_msg_type;
begin
v_queue_name := 'aq_msg_queue1';
v_aq_msg := aq_msg_type(2, 'BOSS', 'CC', 'b', '挂吊水', sysdate);
dbms_aq.enqueue(queue_name => v_queue_name,
enqueue_options => v_queue_options,
message_properties => v_message_properties,
payload => v_aq_msg,
msgid => v_message_handle);
commit;
end;