SQLize Online / PHPize Online  /  SQLtest Online

A A A
Share      Blog   Popular
Copy Format Clear
procedure RUN_CHECK_CH is v_log_id number; v_sql varchar2(4000); v_error varchar2(4000); v_subject varchar2(4000); v_message varchar2(4000); v_check_id number; v_check_prev_id number; v_next_date date; v_column_list varchar2(4000); v_comment_list varchar2(4000); v_email_list varchar2(4000); v_str clob; v_blob blob; v_len number; v_str2 varchar2(32000); v_err_count number; v_prev_count number; v_diff_count number; v_cnt_err number; v_flag_stat number :=0; begin v_log_id := well_audit.pckg_logger.log_create( p_procedure_name => 'RUN_CHECK', p_procedure_purpose => 'RUN_CHECK', p_procedure_parameter => null, p_load_type => 5); delete from clickhouse_clob; delete from DQS_CHECK_GROUPED_RESULTS_TMP; delete from error_well_list; delete from dqs_results_tmp; if well.pckg_check_constant.c_server_alias ='TMN' then --Временно. Выполняем все правила во всех сегментах. insert into dqs_rule_segment(rule_id, segment_id) select t.rule_id, t.segment_id from ( select r.id rule_id, s.id segment_id from synchadmin.wi_segment s join dqs_rule r on r.rule_type = 7 where s.id in (1, 3, 5, 7, 9)) t left join dqs_rule_segment rs on rs.rule_id = t.rule_id and rs.segment_id = t.segment_id where rs.id is null; end if; commit; CHECK_TOTAL_ERROR_VIEWS; FILL_RULES_COLUMNS; dbms_output.enable(10000000000000000000000); for dc in (select e.column_name, r.description, r.id, r.name, r.interval, r.view_owner, r.view_total, r.view_error, r.GROUP_HISTORY_RESULTS, replace(r.name, ' ', '_') filename from DQS_RULE r join well_audit.dqs_entity e on e.id=r.entity_id and column_name is not null join well_audit.dqs_rule_segment rs on rs.rule_id = r.id join synchadmin.wi_segment s on s.id = rs.segment_id where r.rule_type = 7 and s.db_name = well.pckg_check_constant.c_server_alias and sysdate >= r.next_date and active = 1 --and r.id = 558 ) loop begin v_prev_count:=0; well_audit.pckg_logger.log_write(p_hdr_id => v_log_id,p_step_inc => 1,p_step_name => dc.name||' - BEGIN',p_beg_date => sysdate,p_end_date => sysdate); --dbms_output.put_line('0'||dc.name); insert into dqs_check(rule_id, date_begin) values(dc.id, sysdate) returning id into v_check_id; select listagg(rc.column_name, ',') within group (order by rc.order_num) into v_column_list from DQS_RULE_COLUMNS rc where rc.rule_id = dc.id; select listagg(nvl(rc.comments, rc.column_name), ';') within group (order by rc.order_num) into v_comment_list from DQS_RULE_COLUMNS rc where rc.rule_id = dc.id; if dc.view_total is not null then delete from DQS_CHECK_GROUPED_RESULTS_TMP; v_sql:= 'INSERT INTO DQS_CHECK_GROUPED_RESULTS_TMP(CHECK_ID, '||dc.column_name||', TOTAL_COUNT) select '||to_char(v_check_id)||','||dc.column_name||',count(*) from '||dc.view_owner||'.'||dc.view_total||' group by '||dc.column_name; execute immediate v_sql; end if; -- записать результаты в clickhouse execute immediate 'select count(*) from '||dc.view_owner||'.'||dc.view_error into v_cnt_err; if v_cnt_err >0 then load_into_clickhouse(p_owner => dc.view_owner, p_view_name => case when instr(dc.view_error,'.')>0 then well.fld(dc.view_error,2,'.') else dc.view_error end, p_check_id => v_check_id); end if; execute immediate 'insert into error_well_list (check_id,'||dc.column_name||',cnt) select '||to_char(v_check_id)||','||dc.column_name||',count(*) from '||dc.view_owner||'.'||dc.view_error||' group by '||dc.column_name; if dc.view_total is not null then well_audit.pckg_logger.log_write(p_hdr_id => v_log_id,p_step_inc => 1,p_step_name => dc.name||' - VIEW_TOTAL IS NOT NULL',p_beg_date => sysdate,p_end_date => sysdate); --TODO if group column = wellid execute immediate 'update DQS_CHECK_GROUPED_RESULTS_TMP gr set gr.error_count = (select count(*) from error_well_list r where r.check_id = gr.check_id and r.'||dc.column_name||' = gr.'||dc.column_name||') where gr.check_id = '||v_check_id; else well_audit.pckg_logger.log_write(p_hdr_id => v_log_id,p_step_inc => 1,p_step_name => dc.name||' - VIEW_TOTAL IS NULL',p_beg_date => sysdate,p_end_date => sysdate); execute immediate 'INSERT INTO DQS_CHECK_GROUPED_RESULTS_TMP(CHECK_ID, '||dc.column_name||', ERROR_COUNT) select CHECK_ID, '||dc.column_name||', CNT from error_well_list'; end if; commit; --- проверить нужна ли запись DQS_CHECK_GROUPED_RESULTS_TMP в clickhouse begin select max(dqc.id) into v_check_prev_id from dqs_check dqc where dqc.rule_id = dc.id and dqc.clickhouse_flag = 1 and dqc.date_end is not null; exception when others then v_check_prev_id := null; end; --dbms_output.put_line('v_check_prev_id='||v_check_prev_id); if v_check_prev_id is not null then v_str:= get_from_clickhouse_sql(p_sql => 'select count(*) from DQS_CHECK_GROUPED_RESULTS t where CHECK_ID = '||to_char(v_check_prev_id)); v_prev_count:=to_number(rtrim(trim(to_char(v_str)),chr(10))); end if; --dbms_output.put_line('dc.id='||dc.id||' v_prev_count='||v_prev_count); if v_prev_count >0 then delete from clickhouse_clob; --dbms_output.put_line('select CHECK_ID, TOTAL_COUNT, ERROR_COUNT, '||dc.column_name||' from DQS_CHECK_GROUPED_RESULTS t where CHECK_ID = '||to_char(v_check_prev_id)); v_str:=get_from_clickhouse_sql(p_sql => 'select CHECK_ID, TOTAL_COUNT, ERROR_COUNT, '||dc.column_name||' from DQS_CHECK_GROUPED_RESULTS t where CHECK_ID = '||to_char(v_check_prev_id)); insert into clickhouse_clob cl values (replace(v_str,'\N','')); v_sql := 'create or replace view VIEW_TEMP (check_id, total_count, error_count, '||dc.column_name||') as select to_number(column1) , to_number(column2), to_number(column3), to_number(column4) from clickhouse_clob d cross join table( lob2table.separatedcolumns( d.c, /* the data LOB */ chr(10), /* row separator */ chr(9) /* column separator */ ) ) t'; --dbms_output.put_line(v_sql); execute immediate v_sql; execute immediate 'select count(*) from ((select total_count, nvl(error_count,0), '||dc.column_name||' from DQS_CHECK_GROUPED_RESULTS_TMP minus select total_count, nvl(error_count,0), '||dc.column_name||' from VIEW_TEMP) union all (select total_count, nvl(error_count,0), '||dc.column_name||' from VIEW_TEMP minus select total_count, nvl(error_count,0), '||dc.column_name||' from DQS_CHECK_GROUPED_RESULTS_TMP) )' into v_diff_count; end if; dbms_output.put_line('v_diff_count='||v_diff_count||' v_check_prev_id= '||v_check_prev_id||' v_prev_count= '||v_prev_count); select count(*) into v_cnt_err from DQS_CHECK_GROUPED_RESULTS_TMP; dbms_output.put_line('DQS_CHECK_GROUPED_RESULTS_TMP '||v_cnt_err); if v_diff_count>0 or v_check_prev_id is null or v_prev_count=0 then dbms_output.put_line('insert into clickhouse_table'); load_into_clickhouse_table(p_owner_source => 'WELL_AUDIT', p_table_name_source => 'DQS_CHECK_GROUPED_RESULTS_TMP', p_table_clickhouse => 'DQS_CHECK_GROUPED_RESULTS'); update dqs_check dq set dq.clickhouse_flag=1 where dq.id=v_check_id; v_flag_stat := 1; end if; ---------SEND--------------------- v_str:= get_from_clickhouse_sql(p_sql => 'select max(RECORD_ID) from DQS_CHECK_RESULTS t where CHECK_ID = '||to_char(v_check_id)); v_err_count:=to_number(rtrim(trim(to_char(v_str)),chr(10))); v_email_list:= GET_EMAIL_LIST(dc.id); dbms_output.put_line(dc.name); dbms_output.put_line(v_email_list); dbms_output.put_line(v_err_count); if v_err_count > 0 and length(v_email_list) > 0 then -- dbms_output.put_line('v_check_id='||v_check_id||' p_rule_id = '||dc.id); v_str:=fn_get_from_clickhouse(p_owner => dc.view_owner, p_view_name => dc.view_error, p_check_id => v_check_id, p_rule_id => dc.id); delete from clickhouse_clob; insert into clickhouse_clob cl values (v_str); v_str:=to_clob(v_comment_list||chr(10)||replace(v_str,chr(9),';')); dbms_lob.createTemporary( v_blob, true ); dbms_lob.open( v_blob, dbms_lob.lob_readwrite ); v_len := length(v_str); if v_len<32000 then v_str2 := substr(v_str,1,length(v_str)); dbms_lob.writeAppend( v_blob, length(v_str2), utl_raw.cast_to_raw(v_str2) ); else for i in 0 .. trunc(v_len/32000) loop v_str2 := dbms_lob.substr(v_str, case when i=trunc(v_len/32000) then mod(v_len,32000) else 32000 end , i*32000+1); dbms_lob.writeAppend( v_blob, length(v_str2), utl_raw.cast_to_raw(v_str2) ); end loop; end if; dbms_lob.close( v_blob ); v_message:= nvl(dc.description, 'select * from '||dc.view_error); well.PCKG_MAIL.ADD_ATTACHMENT(v_blob,'Result.csv','text/html'); dbms_output.put_line(dc.name); dbms_output.put_line(v_email_list); well.PCKG_MAIL.SEND( mailto => v_email_list , subject => dc.name , message => v_message , mailfrom => 'data-quality@lukoil.com' , mimetype => 'text/html' , priority => 1); dbms_lob.freeTemporary( v_blob ); end if; ---------END SEND----------------- update dqs_check t set t.DATE_END=sysdate where t.id = v_check_id; well_audit.pckg_logger.log_write(p_hdr_id => v_log_id,p_step_inc => 1,p_step_name => dc.name||' - END',p_beg_date => sysdate,p_end_date => sysdate); v_sql:='select '||dc.interval||' from dual'; execute immediate v_sql into v_next_date; update dqs_rule r set r.next_date = v_next_date where r.id = dc.id; commit; exception when others then v_error := substr(SQLCODE||' - '||SQLERRM,1,4000); v_subject:= 'ОШИБКА RUN_CHECK_CH'; v_message:= 'ID: '||to_char(dc.id)||' NAME: '||dc.name || ' SQL: '||v_sql||' ERROR: '||v_error; dbms_output.put_line(v_message); well.pckg_mail.SEND(mailto => 'Marina.Abramova@lukoil.com, Anna.Pushkareva@lukoil.com', subject => v_subject, message => v_message, mailfrom => 'wellinfo@lukoil.com'); commit; end; end loop; if v_flag_stat = 1 then --- обновить статистику --------RULE_CHECK_STAT-------- delete from clickhouse_clob; v_str:=get_from_clickhouse_sql(p_sql => 'select CHECK_ID, sum(TOTAL_COUNT) TOTAL_COUNT, sum(ERROR_COUNT) ERROR_COUNT from DQS_CHECK_GROUPED_RESULTS group by CHECK_ID'); insert into clickhouse_clob cl values (replace(v_str,'\N','')); commit; delete from RULE_CHECK_STAT; insert into RULE_CHECK_STAT(rule_id, day_date, CHECK_ID, TOTAL_CNT, ERROR_CNT) with cc as ( select to_number(replace(column1, '.',',')) CHECK_ID, to_number(replace(column2, '.',',')) TOTAL_CNT, to_number(replace(column3, '.',',')) ERROR_CNT from clickhouse_clob d cross join table(well_audit.lob2table.separatedcolumns(d.c, chr(10), chr(9))) t ), max_days as ( --Берем последнюю дату проверки select ch.rule_id, d.day_date, max(ch.id) mid from dqs_check ch join dqs_rule r on r.id = ch.rule_id join wi_web.days d on d.day_date >= ch.date_begin and d.day_date < sysdate --where r.id = 580 group by ch.rule_id, d.day_date ), md as ( --Берем последнюю дату проверки, по которой есть данные select ch.rule_id, md.day_date, max(ch.id) mid from cc cc join dqs_check ch on ch.id = cc.CHECK_ID join max_days md on md.rule_id = ch.rule_id and md.day_date >= ch.date_begin group by ch.rule_id, md.day_date ) select md.rule_id, md.day_date, cc.CHECK_ID, TOTAL_CNT, ERROR_CNT from md md join cc cc on cc.CHECK_ID = md.mid; commit; ---------------- end if; update well_audit.LOG_HDR h set h.date_end = sysdate where h.id = v_log_id; commit; end;

Stuck with a problem? Got Error? Ask ChatGPT!

Copy Clear