001 /** 002 * Copyright (c) 2003 Daffodil Software Ltd all rights reserved, 003 * Modifications Copyright (c) 2008 Regiscope Digital Imaging Co, LLC, All rights reserved. 004 * This program is free software; you can redistribute it and/or modify 005 * it under the terms of version 2 of the GNU General Public License as 006 * published by the Free Software Foundation. 007 * There are special exceptions to the terms and conditions of the GPL 008 * as it is applied to this software. See the GNU General Public License for more details. 009 * 010 * This program is distributed in the hope that it will be useful, 011 * but WITHOUT ANY WARRANTY; without even the implied warranty of 012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 013 * GNU General Public License for more details. 014 * 015 * You should have received a copy of the GNU General Public License 016 * along with this program; if not, write to the Free Software 017 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA 018 */ 019 020 package org.dbreplicator.replication.DBHandler; 021 022 import java.sql.*; 023 import java.util.*; 024 025 import org.dbreplicator.replication.*; 026 import org.dbreplicator.replication.column.*; 027 import org.apache.log4j.Logger; 028 029 /** 030 * Method overrides specific to the PostgreSQL database engine, from versions 7.3 to 8.3. 031 */ 032 public class PostgreSQLHandler 033 extends AbstractDataBaseHandler { 034 035 private String pg_publication_TableName; 036 private String pg_subscription_TableName; 037 private String pg_bookmark_TableName; 038 private String pg_rep_TableName; 039 private String pg_log_Table; 040 private String pg_schedule_Table; 041 private String pg_ignoredColumns_Table; 042 private String pg_trackReplicationTablesUpdation_Table; 043 private String pg_trackPrimaryKeyUpdation_Table; 044 protected static Logger log = Logger.getLogger(PostgreSQLHandler.class. 045 getName()); 046 public PostgreSQLHandler() {} 047 048 public PostgreSQLHandler(ConnectionPool connectionPool0) { 049 connectionPool = connectionPool0; 050 vendorType = Utility.DataBase_PostgreSQL; 051 pg_publication_TableName = "public." + publication_TableName; 052 pg_subscription_TableName = "public." + subscription_TableName; 053 pg_bookmark_TableName = "public." + bookmark_TableName; 054 pg_rep_TableName = "public." + rep_TableName; 055 pg_log_Table = "public." + log_Table; 056 pg_schedule_Table = "public." + Schedule_TableName; 057 pg_ignoredColumns_Table="public."+ignoredColumns_Table; 058 pg_trackReplicationTablesUpdation_Table="public."+trackReplicationTablesUpdation_Table; 059 pg_trackPrimaryKeyUpdation_Table ="public."+trackPrimaryKeyUpdation_Table; 060 } 061 062 protected void createSuperLogTable(String pubName) throws SQLException, 063 RepException { 064 StringBuffer logTableQuery = new StringBuffer(); 065 logTableQuery.append(" Create Table ") 066 .append(pg_log_Table) 067 .append(" ( " + RepConstants.logTable_commonId1 + " bigserial , " + 068 RepConstants.logTable_tableName2 + " varchar(255) ) "); 069 runDDL(pubName, logTableQuery.toString()); 070 StringBuffer indexQuery = new StringBuffer(); 071 indexQuery.append("CREATE INDEX ") 072 .append(RepConstants.log_Index) 073 .append(" ON " + getLogTableName()) 074 .append("(") 075 .append(RepConstants.logTable_commonId1) 076 .append(")"); 077 //System.out.println(" Create Index on LogTable : "+indexQuery.toString()); 078 runDDL(pubName, indexQuery.toString()); 079 080 } 081 082 /** 083 * Because changes has been made in structure of RepTable 084 * by Hisar team.So old method has been commented. After 085 * proper testing with all data base it should be deleted. 086 */ 087 088 /* protected void createRepTable(String pubName) throws SQLException, 089 RepException 090 { 091 StringBuffer repTableQuery = new StringBuffer(); 092 repTableQuery.append(" Create Table ").append(getRepTableName()) 093 .append("( "+RepConstants.repTable_pubsubName1+" varchar(255) , "+RepConstants.repTable_tableId2+" bigserial, ") 094 .append(" "+RepConstants.repTable_tableName2+" varchar(255) , "+RepConstants.repTable_filter_clause3+" varchar(255), ") 095 .append(" "+RepConstants.repTable_conflict_resolver4+" varchar(255) , Primary Key ( "+RepConstants.repTable_pubsubName1+" , "+RepConstants.repTable_tableName2+" ) ) "); 096 097 runDDL(pubName, repTableQuery.toString()); 098 } */ 099 100 protected void createRepTable(String pubName) throws SQLException, 101 RepException { 102 StringBuffer repTableQuery = new StringBuffer(); 103 repTableQuery.append(" Create Table ").append(getRepTableName()).append( 104 " ( ") 105 .append(RepConstants.repTable_pubsubName1).append(" varchar(255) , ") 106 .append(RepConstants.repTable_tableId2).append(" bigserial, ") 107 .append(RepConstants.repTable_tableName2).append(" varchar(255) , ") 108 .append(RepConstants.repTable_filter_clause3).append( 109 " varchar(255) , ") 110 .append(RepConstants.repTable_createshadowtable6).append( 111 " char(1) Default 'Y', ") 112 .append(RepConstants.repTable_cyclicdependency7).append( 113 " char(1) Default 'N', ") 114 .append(RepConstants.repTable_conflict_resolver4).append( 115 " varchar(255), ") 116 .append(" Primary Key (").append(RepConstants.repTable_pubsubName1). 117 append(" , ") 118 .append(RepConstants.repTable_tableName2).append(" ) ) "); 119 runDDL(pubName, repTableQuery.toString()); 120 } 121 122 protected void createBookMarkTable(String pubName) throws SQLException, 123 RepException { 124 StringBuffer bookmarkTableQuery = new StringBuffer(); 125 bookmarkTableQuery.append(" Create Table ") 126 .append(getBookMarkTableName()) 127 .append(" ( " + RepConstants.bookmark_LocalName1 + 128 " varchar(255) , " + RepConstants.bookmark_RemoteName2 + 129 " varchar(255) , ") 130 .append(" " + RepConstants.bookmark_TableName3 + " varchar(255) , " + 131 RepConstants.bookmark_lastSyncId4 + " bigint , ") 132 .append( 133 " " + RepConstants.bookmark_ConisderedId5 + " bigint ," + 134 RepConstants.bookmark_IsDeletedTable + 135 " char(1) default 'N' , Primary Key ( " + 136 RepConstants.bookmark_LocalName1 + " , " + 137 RepConstants.bookmark_RemoteName2 + " , " + 138 RepConstants.bookmark_TableName3 + " ) ) "); 139 runDDL(pubName, bookmarkTableQuery.toString()); 140 } 141 142 public void createShadowTable(String pubsubName, String tableName, 143 String allColseq,String[] primaryColumns) throws RepException { 144 StringBuffer shadowTableQuery = new StringBuffer(); 145 shadowTableQuery.append(" Create Table ") 146 .append(RepConstants.shadow_Table(tableName)) 147 .append(" ( " + RepConstants.shadow_sync_id1 + " bigserial , ") 148 .append(" " + RepConstants.shadow_common_id2 + " bigint , ") 149 .append(" " + RepConstants.shadow_operation3 + " character(1) , ") 150 .append(" " + RepConstants.shadow_status4 + " character(1) ") 151 .append(allColseq) 152 .append(" , " + RepConstants.shadow_serverName_n + " varchar(255) ") 153 .append(" , " + RepConstants.shadow_PK_Changed + " char(1) ) "); 154 try { 155 //System.out.println(" shadowTableQuery.toString() ="+shadowTableQuery.toString()); 156 runDDL(pubsubName, shadowTableQuery.toString()); 157 158 } 159 catch (RepException ex) { 160 throw ex; 161 } 162 catch (SQLException ex) { 163 // ex.printStackTrace(); 164 // Ignore the Exception 165 } 166 createIndex(pubsubName,RepConstants.shadow_Table(tableName)); 167 /* bjt - create pk index to mirror main table index, only not unique */ 168 createPkIndex(pubsubName,RepConstants.shadow_Table(tableName),primaryColumns); 169 } 170 171 protected void createScheduleTable(String subName) throws SQLException, 172 RepException { 173 StringBuffer ScheduleTableQuery = new StringBuffer(); 174 175 ScheduleTableQuery.append(" Create Table ") 176 .append(pg_schedule_Table) 177 .append(" ( " + RepConstants.schedule_Name + " varchar(255) , " + 178 RepConstants.subscription_subName1 + " varchar(255) unique , ") 179 .append(" " + RepConstants.schedule_type + " varchar(255) , ") 180 .append(" " + RepConstants.publication_serverName3 + " varchar (255) ," + 181 RepConstants.publication_portNo + " varchar(255) ,") 182 .append(" " + RepConstants.recurrence_type + " varchar(255) , " + 183 RepConstants.replication_type + " varchar(255) ,") 184 .append(" " + RepConstants.schedule_time + " bigint , ") 185 .append(" " + RepConstants.schedule_counter + " bigint , Primary Key (" + 186 RepConstants.schedule_Name + " , " + 187 RepConstants.subscription_subName1 + ") ) "); 188 runDDL(subName, ScheduleTableQuery.toString()); 189 } 190 191 public void createShadowTableTriggers(String pubsubName, String tableName, 192 ArrayList colInfoList, 193 String[] primCols) throws RepException { 194 195 String serverName = getLocalServerName(); 196 // RepPrinter.print(" Columns are :::::: " + java.util.Arrays.asList(columnTypeInfoMap.keySet().toArray(new String[0]))); 197 // String[] colNames = (String[]) columnTypeInfoMap.keySet().toArray(new String[0]); 198 int size = colInfoList.size(); 199 String[] colNames = new String[size]; 200 for (int i = 0; i < size; i++) { 201 colNames[i] = ( (ColumnsInfo) colInfoList.get(i)).getColumnName(); 202 } 203 //RepPrinter.print(" Columns are :::::: " + java.util.Arrays.asList(colNames)); 204 String colNameSeq = getColumnNameSequence(colNames, "").toString(); 205 String colNameSeqPrefixOldRow = getColumnNameSequence(colNames, "old."). 206 toString(); 207 String colNameSeqPrefixNewRow = getColumnNameSequence(colNames, "new."). 208 toString(); 209 String shadowTableName = RepConstants.shadow_Table(tableName); 210 String primColumnNamesSeq = getColumnNameSequence(primCols, "rep_old_"); 211 String primColNameSeqPrefixOldRow = getColumnNameSequence(primCols, "old.").toString(); 212 String primColNameSeqPrefixNewRow = getColumnNameSequence(primCols, "new.").toString(); 213 214 String[] primColsOld = getColumnNameWithOldOrNewPrefix(primCols, "old."); 215 String[] primColsNew = getColumnNameWithOldOrNewPrefix(primCols, "new."); 216 217 //create trigger abc after delete on t2 for each row execute procedure delete_insert(); 218 String table = tableName.substring(tableName.indexOf('.') + 1); 219 StringBuffer insTriggerQuery = new StringBuffer(); 220 insTriggerQuery.append(" Create trigger ") 221 .append(getInsertTriggerName(table)) 222 .append(" after insert on ").append(tableName) 223 .append(" For each Row execute procedure insert_" + table + "()"); 224 225 StringBuffer delTriggerQuery = new StringBuffer(); 226 delTriggerQuery.append(" Create trigger ") 227 .append(getDeleteTriggerName(table)) 228 .append(" after delete on ").append(tableName) 229 .append(" For each Row execute procedure delete_" + table + "()"); 230 231 StringBuffer updTriggerQuery = new StringBuffer(); 232 updTriggerQuery.append(" Create trigger ") 233 .append(getUpdateTriggerName(table)) 234 .append(" after update on ").append(tableName) 235 .append(" For each Row execute procedure update_" + table + "()"); 236 237 try { 238 runDDL(pubsubName, createFunctionHandler()); 239 } 240 catch (RepException ex1) { 241 } 242 catch (SQLException ex1) { 243 } 244 245 try { 246 runDDL(pubsubName, createFunctionLanguage()); 247 } 248 catch (RepException ex2) { 249 } 250 catch (SQLException ex2) { 251 252 } 253 254 try { 255 runDDL(pubsubName, 256 functionForInsertTrigger(tableName, shadowTableName, 257 primColumnNamesSeq, 258 colNameSeq, colNameSeqPrefixNewRow, 259 primColNameSeqPrefixNewRow, serverName)); 260 } 261 catch (RepException ex) { 262 throw ex; 263 } 264 catch (SQLException ex) { 265 // ex.printStackTrace(); 266 // Ignore Exception 267 } try { 268 runDDL(pubsubName, 269 functionForDeleteTrigger(tableName, colNameSeqPrefixOldRow, 270 primColNameSeqPrefixOldRow, 271 shadowTableName, colNameSeq, 272 primColumnNamesSeq, serverName)); 273 274 } 275 catch (RepException ex) { 276 throw ex; 277 } 278 catch (SQLException ex) { 279 // ex.printStackTrace(); 280 // Ignore Exception 281 } 282 try { 283 runDDL(pubsubName, 284 functionForUpdateTrigger(tableName, colNameSeqPrefixOldRow, 285 primColNameSeqPrefixOldRow, 286 shadowTableName, colNameSeq, 287 primColumnNamesSeq, serverName, 288 colNameSeqPrefixNewRow,primColsOld,primColsNew)); 289 290 } 291 catch (RepException ex) { 292 throw ex; 293 } 294 catch (SQLException ex) { 295 // ex.printStackTrace(); 296 // Ignore Exception 297 } 298 try { 299 runDDL(pubsubName, insTriggerQuery.toString()); 300 } 301 catch (RepException ex) { 302 throw ex; 303 } 304 catch (SQLException ex) { 305 // ex.printStackTrace(); 306 // Ignore Exception 307 } 308 try { 309 runDDL(pubsubName, delTriggerQuery.toString()); 310 } 311 catch (RepException ex) { 312 throw ex; 313 } 314 catch (SQLException ex) { 315 // ex.printStackTrace(); 316 // Ignore Exception 317 } 318 try { 319 runDDL(pubsubName, updTriggerQuery.toString()); 320 } 321 catch (RepException ex) { 322 throw ex; 323 } 324 catch (SQLException ex) { 325 // ex.printStackTrace(); 326 // Ignore Exception 327 } 328 } 329 330 public String createFunctionHandler() { 331 StringBuffer sb = new StringBuffer(); 332 sb.append("CREATE FUNCTION plpgsql_call_handler() RETURNS language_handler") 333 .append(" AS '$libdir/plpgsql' ") 334 .append(" LANGUAGE C "); 335 return sb.toString(); 336 } 337 338 public String createFunctionLanguage() { 339 StringBuffer sb = new StringBuffer(); 340 sb.append("CREATE LANGUAGE plpgsql ") 341 .append(" HANDLER plpgsql_call_handler"); 342 return sb.toString(); 343 } 344 345 public String functionForInsertTrigger(String tableName, 346 String shadowTableName, 347 String primColumnNamesSeq, 348 String colNameSeq, 349 String colNameSeqPrefixNewRow, 350 String primColNameSeqPrefixNewRow, 351 String serverName) { 352 353 String table = tableName.substring(tableName.indexOf('.') + 1); 354 StringBuffer sb = new StringBuffer(); 355 sb.append("CREATE FUNCTION " + "\"" + "insert_" + table + "\"" + 356 "() RETURNS Trigger AS '") 357 .append(" BEGIN ") 358 .append(insertRecordIntoLogTable(tableName)).append(" Insert Into ") 359 .append(shadowTableName).append(" ( ") 360 .append(RepConstants.shadow_common_id2).append(", ") 361 .append(RepConstants.shadow_operation3).append(", ") 362 .append(RepConstants.shadow_status4).append(", ") 363 .append(colNameSeq).append(primColumnNamesSeq) 364 .append(RepConstants.shadow_serverName_n) 365 .append(" ) Values ( null , ''I'' , null , ") 366 .append(colNameSeqPrefixNewRow).append(primColNameSeqPrefixNewRow) 367 .append(" ''").append(serverName).append("'') ; ") 368 .append(" RETURN NULL;") 369 .append(" END;") 370 .append(" ' LANGUAGE 'plpgsql' VOLATILE "); 371 return sb.toString(); 372 } 373 374 public String functionForUpdateTrigger(String tableName, 375 String colNameSeqPrefixOldRow, 376 String primColNameSeqPrefixOldRow, 377 String shadowTableName, 378 String colNameSeq, 379 String primColumnNamesSeq, 380 String serverName, 381 String colNameSeqPrefixNewRow,String[] primColsOld,String[] primColsNew) { 382 String table = tableName.substring(tableName.indexOf('.') + 1); 383 StringBuffer sb = new StringBuffer(); 384 sb.append("CREATE FUNCTION " + "\"" + "update_" + table + "\"" + 385 "() RETURNS Trigger AS '") 386 .append(" Declare ") 387 .append(" maxlogid bigint; pkchanged char(1); ") 388 .append(" BEGIN ") 389 .append(insertRecordIntoLogTable(tableName)) 390 .append(" Select max( " + RepConstants.logTable_commonId1 + 391 " ) into maxlogid from ") 392 .append(pg_log_Table).append(" ; ") 393 .append(" if( "); 394 for (int i = 0; i < primColsOld.length; i++) { 395 if (i != 0) 396 sb.append(" or "); // bjt - changed from 'and' to 'or'. if ANY of the values change, the pk changes 397 sb.append(primColsOld[i] ) 398 .append("!=" ) 399 .append(primColsNew[i]); 400 } 401 sb.append(" ) then ") 402 .append(" pkchanged = ''Y''; end if;") // bjt - need two single quotes instead of one 403 .append(" Insert Into ") 404 .append(shadowTableName).append(" ( ") 405 .append(RepConstants.shadow_common_id2).append(", ") 406 .append(RepConstants.shadow_operation3).append(", ") 407 .append(RepConstants.shadow_status4).append(", ") 408 .append(colNameSeq).append(primColumnNamesSeq) 409 .append(RepConstants.shadow_serverName_n) 410 .append(" ) Values ( maxlogid , ''U'' , ''B'' , ") 411 .append(colNameSeqPrefixOldRow).append(primColNameSeqPrefixOldRow) 412 .append(" ''").append(serverName).append("'') ; Insert Into ") 413 .append(shadowTableName).append(" ( ") 414 .append(RepConstants.shadow_common_id2).append(", ") 415 .append(RepConstants.shadow_operation3).append(", ") 416 .append(RepConstants.shadow_status4).append(", ") 417 .append(colNameSeq).append(primColumnNamesSeq) 418 .append(RepConstants.shadow_serverName_n).append(" , ") 419 .append(RepConstants.shadow_PK_Changed) 420 .append(" ) Values ( maxlogid , ''U'' , ''A'' , ") 421 .append(colNameSeqPrefixNewRow).append(primColNameSeqPrefixOldRow) 422 .append(" ''").append(serverName).append("'',pkchanged) ;") 423 .append(" RETURN NULL;") 424 .append(" END;") 425 .append(" ' LANGUAGE 'plpgsql' VOLATILE "); 426 return sb.toString(); 427 } 428 429 public String functionForDeleteTrigger(String tableName, 430 String colNameSeqPrefixOldRow, 431 String primColNameSeqPrefixOldRow, 432 String shadowTableName, 433 String colNameSeq, 434 String primColumnNamesSeq, 435 String serverName) { 436 437 String table = tableName.substring(tableName.indexOf('.') + 1); 438 StringBuffer sb = new StringBuffer(); 439 sb.append("CREATE FUNCTION " + "\"" + "delete_" + table + "\"" + 440 "() RETURNS Trigger AS '") 441 .append(" BEGIN ") 442 .append(insertRecordIntoLogTable(tableName)).append(" Insert Into ") 443 .append(shadowTableName).append(" ( ") 444 .append(RepConstants.shadow_common_id2).append(", ") 445 .append(RepConstants.shadow_operation3).append(", ") 446 .append(RepConstants.shadow_status4).append(", ") 447 .append(colNameSeq).append(primColumnNamesSeq) 448 .append(RepConstants.shadow_serverName_n) 449 .append(" ) Values ( null , ''D'' , null , ") 450 .append(colNameSeqPrefixOldRow).append(primColNameSeqPrefixOldRow) 451 .append("''").append(serverName).append("'') ; ") 452 .append(" RETURN NULL ;") 453 .append(" END;") 454 .append("' LANGUAGE 'plpgsql' VOLATILE "); 455 return sb.toString(); 456 } 457 458 public String insertRecordIntoLogTable(String tableName) { 459 StringBuffer insertLogTable = new StringBuffer(); 460 insertLogTable.append(" Insert into ") 461 .append(pg_log_Table) 462 .append(" ( ").append(RepConstants.logTable_tableName2) 463 .append(" ) values ( ''") 464 .append(tableName).append("''); "); 465 return insertLogTable.toString(); 466 467 } 468 469 private String getInsertTriggerName(String schematable) { 470 String tableInsert; 471 int index = schematable.indexOf('.'); 472 if (index == -1) { 473 tableInsert = schematable; 474 } 475 tableInsert = schematable.substring(schematable.indexOf(".") + 1).trim(); 476 return getObjectName(tableInsert, "tri_"); 477 } 478 479 private String getDeleteTriggerName(String schematable) { 480 String tableDelete; 481 int index = schematable.indexOf('.'); 482 if (index == -1) { 483 tableDelete = schematable; 484 } 485 tableDelete = schematable.substring(schematable.indexOf(".") + 1).trim(); 486 return getObjectName(tableDelete, "trd_"); 487 } 488 489 private String getUpdateTriggerName(String schematable) { 490 String tableUpdate; 491 int index = schematable.indexOf('.'); 492 if (index == -1) { 493 tableUpdate = schematable; 494 } 495 tableUpdate = schematable.substring(schematable.indexOf(".") + 1).trim(); 496 return getObjectName(tableUpdate, "tru_"); 497 } 498 499 public void dropTriggersAndShadowTable(Connection connection, String table, 500 String pubsubName) throws SQLException, 501 RepException { 502 String tableName = table.substring(table.indexOf('.') + 1); 503 String insertTriggerName = getInsertTriggerName(tableName); 504 String updateTriggerName = getDeleteTriggerName(tableName); 505 String DeleteTriggerName = getUpdateTriggerName(tableName); 506 507 fireDropQuery(connection, 508 " drop trigger " + insertTriggerName + " on " + table); 509 510 fireDropQuery(connection, 511 " drop trigger " + updateTriggerName + " on " + table); 512 513 fireDropQuery(connection, 514 " drop trigger " + DeleteTriggerName + " on " + table); 515 516 fireDropQuery(connection, " drop table " + RepConstants.shadow_Table(table)); 517 518 fireDropQuery(connection, " drop function " + "insert_" + tableName + "()"); 519 520 fireDropQuery(connection, " drop function " + "delete_" + tableName + "()"); 521 522 fireDropQuery(connection, " drop function " + "update_" + tableName + "()"); 523 524 fireDropQuery(connection, " delete from " + getLogTableName() + " where " + 525 RepConstants.logTable_tableName2 + " = '" + table + "'"); 526 } 527 528 private static String getObjectName(String schematable, String prefix) { 529 int index = schematable.indexOf('.'); 530 if (index == -1) { 531 return prefix + schematable; 532 } 533 String schema = schematable.substring(0, index); 534 String table = schematable.substring(index + 1); 535 return schema + "." + prefix + table; 536 } 537 538 public void createSubscribedTablesTriggersAndShadowTables(String subName, 539 String[] pubTableQueries, HashMap primCols, int pubVendorType,ArrayList repTables) throws 540 RepException,SQLException { 541 Connection connection = connectionPool.getConnection(subName); 542 MetaDataInfo mdi = Utility.getDatabaseMataData(connection); 543 Statement stt = connection.createStatement(); 544 try { 545 for (int i = 0; i < pubTableQueries.length; i++) { 546 try { 547 stt.execute(pubTableQueries[i].toLowerCase()); 548 } 549 catch (SQLException ex) { 550 SchemaQualifiedName sname = new SchemaQualifiedName(mdi, 551 getTableName(pubTableQueries[i])); 552 String existing = null; 553 try { 554 existing = mdi.getExistingTableQuery(this, sname, pubVendorType). 555 toLowerCase().replaceAll(" ", ""); 556 } 557 catch (RepException ex1) { 558 if (ex1.getRepCode().equalsIgnoreCase("REP033")) { 559 throw ex; 560 } 561 } 562 if (pubTableQueries[i].toLowerCase().replaceAll(" ", 563 "").indexOf(existing.toLowerCase()) != 0) { 564 throw new RepException("REP024", new Object[] {subName, 565 sname.toString()}); 566 } 567 // Ignore the exception if the same table exists with same colums and primary key 568 } 569 } 570 } 571 finally { 572 if (stt != null) 573 stt.close(); 574 connectionPool.returnConnection(connection); 575 } 576 // Just Get the Column Sequenes from Create Table Queries 577 // and create Shadow Table and Triggers on Shadow Table 578 for (int i = 0; i < pubTableQueries.length; i++) { 579 String tableName = getTableName(pubTableQueries[i]); 580 SchemaQualifiedName sname = new SchemaQualifiedName(mdi, 581 getTableName(pubTableQueries[i])); 582 String colSeqenceWithDataType = getColumnSequenceWithDataTypes( 583 pubTableQueries[i]); 584 ArrayList colInfoList = getColumnNamesAndDataTypes(colSeqenceWithDataType. 585 trim()); 586 String[] pcols = (String[]) ( (ArrayList) primCols.get(tableName. 587 toLowerCase())).toArray(new String[0]); 588 String allColSequence = getShadowTableColumnDataTypeSequence(colInfoList, 589 pcols,(RepTable)repTables.get(i)); 590 createShadowTable(subName, sname.toString(), allColSequence,pcols); 591 makeProvisionForLOBDataTypes(colInfoList); 592 createShadowTableTriggers(subName, sname.toString(), colInfoList, pcols); 593 //createShadowTablesAndSubscriptionTableTriggers(subName,tableName,colSeqenceWithDataType,(ArrayList) primCols.get(tableName.toLowerCase())); 594 } 595 } 596 597 public boolean isDataTypeOptionalSizeSupported(TypeInfo typeInfo) { 598 int sqlType = typeInfo.getSqlType(); 599 String typeName = typeInfo.getTypeName(); 600 switch (sqlType) { 601 case 4: 602 case 5: 603 case 7: 604 case -4: 605 case -5: 606 case -7: 607 case 91: 608 case 92: 609 case 93: 610 case -6: 611 case 8: 612 case -2: 613 case -3: 614 case 2000: 615 case 16: 616 case 2004: 617 case 2005: 618 case 1111: 619 case 2006: 620 case -1: 621 622 // case 1 : 623 return false; 624 625 case 12: 626 if (typeInfo.getTypeName().equalsIgnoreCase("text")) { 627 return false; 628 } 629 default: 630 return true; 631 } 632 } 633 634 public void setTypeInfo(TypeInfo typeInfo, ResultSet rs) throws RepException, 635 SQLException { 636 //System.out.println("PostgreSQLHandler = "+typeInfo); 637 switch (typeInfo.getSqlType()) { 638 case Types.BOOLEAN: 639 typeInfo.setTypeName("boolean"); 640 break; // //16; 641 case Types.BIT: 642 if (typeInfo.getTypeName().equalsIgnoreCase("bool")) { 643 typeInfo.setTypeName("bool"); 644 return; 645 } 646 typeInfo.setTypeName("bit"); 647 break; // //-7; 648 case Types.TINYINT: 649 typeInfo.setTypeName("int2"); 650 break; //-6; 651 case Types.SMALLINT: 652 typeInfo.setTypeName("int2"); 653 break; // // 5; 654 case Types.INTEGER: 655 typeInfo.setTypeName("int4"); 656 break; // // 4; 657 case Types.BIGINT: 658 typeInfo.setTypeName("int8"); 659 break; // //-5; 660 case Types.FLOAT: 661 typeInfo.setTypeName("numeric"); 662 break; // // 6; 663 case Types.REAL: 664 typeInfo.setTypeName("float4"); 665 break; // // 7; 666 case Types.DOUBLE: 667 typeInfo.setTypeName("float8"); 668 break; // // 8; 669 case Types.NUMERIC: 670 typeInfo.setTypeName("numeric"); 671 break; // // 2; 672 case Types.DECIMAL: 673 typeInfo.setTypeName("numeric"); 674 break; // // 3; 675 case Types.CHAR: 676 typeInfo.setTypeName(" character"); 677 break; // // 1; 678 case Types.VARCHAR: 679 if (typeInfo.getTypeName().equalsIgnoreCase("text")) { 680 typeInfo.setTypeName("text"); 681 return; 682 } 683 typeInfo.setTypeName("varchar"); 684 break; // //12; 685 case Types.LONGVARCHAR: 686 typeInfo.setTypeName("text"); 687 break; // //-1; 688 case Types.DATE: 689 typeInfo.setTypeName("date"); 690 break; // //91; 691 case Types.TIME: 692 typeInfo.setTypeName("time"); 693 break; // //92; 694 case Types.TIMESTAMP: 695 if (typeInfo.getTypeName().equalsIgnoreCase("DATE")) { 696 typeInfo.setTypeName("DATE"); 697 typeInfo.setSqlType(Types.DATE); 698 return; // 92 699 } 700 typeInfo.setTypeName("timestamp"); 701 break; // //93; 702 case Types.BINARY: 703 typeInfo.setTypeName("bytea"); 704 break; // //-2; 705 case Types.VARBINARY: 706 707 // typeInfo.setTypeName("bytea"); 708 // commented by sunil to test the 709 // case of RAW data type in oracle 710 if (typeInfo.getTypeName().equalsIgnoreCase("RAW")) { 711 typeInfo.setTypeName("text"); 712 typeInfo.setSqlType(Types.VARCHAR); 713 return; // 12 714 } 715 typeInfo.setTypeName("bytea"); 716 break; // //-3; 717 case Types.LONGVARBINARY: 718 typeInfo.setTypeName("bytea"); 719 break; // //-4; 720 case Types.BLOB: 721 typeInfo.setTypeName("bytea"); 722 break; // //2004; 723 case Types.CLOB: 724 typeInfo.setTypeName("text"); 725 break; //2005; 726 case Types.OTHER: 727 if (typeInfo.getTypeName().equalsIgnoreCase("FLOAT")) { //1111 728 typeInfo.setTypeName("numeric"); 729 typeInfo.setSqlType(Types.FLOAT); 730 return; // 6 731 } 732 else if (typeInfo.getTypeName().equalsIgnoreCase("clob")) { 733 typeInfo.setTypeName("text"); 734 typeInfo.setSqlType(Types.CLOB); //2005 735 return; 736 } 737 else if (typeInfo.getTypeName().equalsIgnoreCase("blob")) { 738 throw new RepException("REP031", new Object[] {typeInfo.getTypeName()}); 739 } 740 typeInfo.setTypeName("text"); 741 break; //1111; 742 case Types.NULL: //0 743 case Types.DISTINCT: //2001; 744 case Types.STRUCT: //2002; 745 case Types.ARRAY: //2003; 746 case Types.DATALINK: //70; 747 case Types.REF: 748 case Types.JAVA_OBJECT: 749 // case Types.BLOB : 750 default: 751 throw new RepException("REP031", new Object[] {typeInfo.getTypeName()}); 752 } 753 } 754 755 public AbstractColumnObject getColumnObject(TypeInfo typeInfo) throws 756 RepException { 757 //System.out.println("PostgreSQL typeInfo :: "+typeInfo); 758 int sqlType = typeInfo.getSqlType(); 759 switch (sqlType) { 760 case 1: // char 761 case -1: // long varchar 762 return new StringObject(sqlType,this); 763 case 12: // char varying 764 if (typeInfo.getTypeName().equalsIgnoreCase("text")) { 765 return new ClobStreamObject(sqlType,this); 766 } 767 else { 768 return new StringObject(sqlType,this); 769 } 770 case 2005: // clob 771 case 1111: 772 case 2000: // long varbinary 773 case 2006: // 774 return new ClobStreamObject(sqlType,this); 775 case -3: // bitvarying 776 case 2004: // blob 777 case -2: // binary 778 case -4: // long varbiary 779 return new BlobObject(sqlType,this); 780 case 4: // int 781 case 5: // small int 782 case -6: // tinyint 783 return new IntegerObject(sqlType,this); 784 case -5: // long 785 return new LongObject(sqlType,this); 786 case 3: // decimal 787 case 8: // double precision 788 case 2: // numeric 789 case 6: // float 790 return new DoubleObject(sqlType,this); 791 case 7: // real 792 return new FloatObject(sqlType,this); 793 case -7: // bit 794 case 16: // boolean 795 return new BooleanObject(sqlType,this); 796 case 91: // date 797 return new DateObject(sqlType,this); 798 case 92: // time 799 return new TimeObject(sqlType,this); 800 case 93: // time stamp 801 return new TimeStampObject(sqlType,this); 802 default: 803 throw new RepException("REP031", new Object[] {new Integer(sqlType)}); 804 } 805 } 806 807 public void createSchemas(String pubName, ArrayList schemas) throws 808 SQLException, RepException { 809 int num = schemas.size(); 810 if (num == 0) { 811 return; 812 } 813 Connection con = connectionPool.getConnection(pubName); 814 String authorization = connectionPool.getUserName(); 815 for (int i = 0; i < num; i++) { 816 String schemaName = (String) schemas.get(i); 817 if (schemaName.equalsIgnoreCase("users")) { 818 continue; 819 } 820 StringBuffer sb = new StringBuffer(); 821 sb.append(" Create Schema ").append(schemaName) 822 .append(" authorization ").append(authorization); 823 try { 824 runDDL(pubName, sb.toString()); 825 } 826 catch (SQLException ex) { 827 RepConstants.writeERROR_FILE(ex); 828 // Ignore the Exception 829 } 830 catch (RepException ex) { 831 connectionPool.returnConnection(con); 832 throw ex; 833 } 834 } 835 connectionPool.returnConnection(con); 836 } 837 838 public String getPublicationTableName() { 839 return pg_publication_TableName; 840 } 841 842 public String getSubscriptionTableName() { 843 return pg_subscription_TableName; 844 } 845 846 public String getRepTableName() { 847 return pg_rep_TableName; 848 } 849 850 public String getLogTableName() { 851 return pg_log_Table; 852 } 853 854 public String getBookMarkTableName() { 855 return pg_bookmark_TableName; 856 } 857 858 public boolean isColumnSizeExceedMaximumSize(TypeInfo typeInfo) throws 859 SQLException, RepException { 860 boolean flag = false; 861 int sqlType = typeInfo.getSqlType(); 862 int columnsize = typeInfo.getcolumnSize(); 863 switch (sqlType) { 864 case 1: // char 865 case 12: //varchar 866 if (columnsize > 4192) { 867 flag = true; 868 break; 869 } 870 case -6: // tinyint 871 if (columnsize > 127) { 872 flag = true; 873 break; 874 } 875 } 876 return flag; 877 } 878 879 public boolean isColumnSizeExceedMaximumSize1(ResultSet rs, TypeInfo typeInfo) throws 880 SQLException, RepException { 881 boolean flag = false; 882 int sqlType = typeInfo.getSqlType(); 883 int columnsize = rs.getInt("COLUMN_SIZE"); 884 switch (sqlType) { 885 case 1: // char 886 case 12: //varchar 887 if (columnsize > 4192) { 888 flag = true; 889 break; 890 } 891 case -6: // tinyint 892 if (columnsize > 127) { 893 flag = true; 894 break; 895 } 896 } 897 return flag; 898 } 899 900 public boolean getPrimaryKeyErrorCode(SQLException ex) throws SQLException { 901 if (ex.getMessage().indexOf("_pkey") != -1) { 902 return true; 903 } 904 return false; 905 } 906 907 public int getAppropriatePrecision(int columnSize, String datatypeName) { 908 if (datatypeName.equalsIgnoreCase("numeric") && columnSize > 38) { 909 columnSize = 38; 910 } 911 // new jdbc driver for postgres returns 2147483647 as size of character varying fields with no predetermined size 912 if (datatypeName.equalsIgnoreCase("varchar") && columnSize > 2147483646) 913 columnSize = 0; 914 return columnSize; 915 916 } 917 918 protected void createIndex(String pubsubName, String tableName) throws 919 RepException { 920 StringBuffer createIndexQuery = new StringBuffer(); 921 // create index ind on cmsadm2.R_S_Bank(Rep_sync_id); 922 createIndexQuery.append("create index ") 923 .append(RepConstants.Index_Name(tableName)) 924 .append(" on ") 925 .append(tableName) 926 .append("(") 927 .append(RepConstants.shadow_sync_id1) 928 .append(",") 929 .append(RepConstants.shadow_common_id2) 930 .append(",") 931 .append(RepConstants.shadow_status4) 932 .append(")"); 933 // System.out.println(" createIndexQuery : "+createIndexQuery.toString()); 934 try { 935 runDDL(pubsubName, createIndexQuery.toString()); 936 } 937 catch (RepException ex) { 938 // Ignore the Exception 939 } 940 catch (SQLException ex) { 941 // Ignore the Exception 942 } 943 } 944 945 /* bjt - create pk index to mirror main table index, only not unique */ 946 protected void createPkIndex(String pubsubName, String tableName, String[] primaryColumns) throws 947 RepException { 948 StringBuffer createPkIndexQuery = new StringBuffer(); 949 createPkIndexQuery.append("create index ") 950 .append(RepConstants.Pk_Index_Name(tableName)) 951 .append(" on ") 952 .append(tableName) 953 .append("("); 954 int i; 955 for (i = 0; i < primaryColumns.length - 1; i++) { 956 createPkIndexQuery.append(primaryColumns[i]); 957 createPkIndexQuery.append(","); 958 } 959 createPkIndexQuery.append(primaryColumns[i]); 960 createPkIndexQuery.append(")"); 961 System.out.println(" createPkIndexQuery : "+createPkIndexQuery.toString()); 962 try { 963 runDDL(pubsubName, createPkIndexQuery.toString()); 964 } 965 catch (RepException ex) { 966 // Ignore the Exception 967 } 968 catch (SQLException ex) { 969 // Ignore the Exception 970 } 971 } 972 973 //scale 0 to 23 only 974 public int getAppropriateScale(int columnScale) throws RepException { 975 if (columnScale < 0) { 976 throw new RepException("REP026", new Object[] {"1", "23"}); 977 } 978 else if (columnScale >= 23) { 979 columnScale = 23; 980 } 981 else if (columnScale >= 0 && columnScale < 23) 982 columnScale = columnScale; 983 log.debug("returning columnScale::" + columnScale); 984 return columnScale; 985 } 986 987 public PreparedStatement makePrimaryPreperedStatement(Connection pub_sub_Connection, String[] primaryColumns, String shadowTable, String local_pub_sub_name) throws SQLException, RepException { 988 /* bjt */ 989 StringBuffer query = new StringBuffer(); 990 query.append(" select * from "); 991 query.append(shadowTable); 992 query.append(" where "); 993 query.append(RepConstants.shadow_sync_id1); 994 query.append(" > "); 995 query.append(" ? "); 996 for (int i = 0; i < primaryColumns.length; i++) { 997 query.append(" and "); 998 query.append(primaryColumns[i]); 999 query.append("= ? "); 1000 } 1001 query.append(" order by " + RepConstants.shadow_sync_id1); 1002 query.append(" limit 1 "); 1003 return pub_sub_Connection.prepareStatement(query.toString()); 1004 } 1005 1006 1007 1008 public boolean isForeignKeyException(SQLException ex) throws SQLException { 1009 //System.out.println(" ex Message : "+ex.getMessage()+" ex Error code : "+ex.getErrorCode()); 1010 if (ex.getErrorCode() == 0) 1011 return true; 1012 else 1013 return false; 1014 } 1015 1016 /** 1017 * isPrimaryKeyException 1018 * 1019 * @param ex SQLException 1020 * @return boolean 1021 */ 1022 public boolean isPrimaryKeyException(SQLException ex) throws SQLException 1023 { 1024 if (ex.getMessage().indexOf("_pkey") != -1) 1025 { 1026 return true; 1027 } 1028 return false; 1029 } 1030 1031 public String getIgnoredColumns_Table() { 1032 return pg_ignoredColumns_Table; 1033 } 1034 1035 public String getTrackReplicationTablesUpdation_Table() { 1036 return pg_trackReplicationTablesUpdation_Table; 1037 } 1038 1039 public String getTrackPrimayKeyUpdation_Table() { 1040 return pg_trackPrimaryKeyUpdation_Table; 1041 } 1042 1043 protected void createIgnoredColumnsTable(String pubName) throws SQLException, 1044 RepException { 1045 StringBuffer ignoredColumnsQuery = new StringBuffer(); 1046 ignoredColumnsQuery.append(" Create Table ").append(getIgnoredColumns_Table()). 1047 append(" ( ") 1048 .append(RepConstants.ignoredColumnsTable_tableId1).append(" int , ") 1049 .append(RepConstants.ignoredColumnsTable_ignoredcolumnName2).append( 1050 " varchar(255) , ") 1051 .append(" Primary Key (").append(RepConstants. 1052 ignoredColumnsTable_tableId1).append( 1053 " , ") 1054 .append(RepConstants.ignoredColumnsTable_ignoredcolumnName2).append( 1055 " ) ) "); 1056 runDDL(pubName, ignoredColumnsQuery.toString()); 1057 } 1058 1059 protected void createTrackReplicationTablesUpdationTable(String pubSubName) throws 1060 RepException, SQLException { 1061 StringBuffer trackRepTablesUpdationQuery = new StringBuffer(); 1062 trackRepTablesUpdationQuery.append(" CREATE TABLE ").append(getTrackReplicationTablesUpdation_Table()).append(" ( " + 1063 RepConstants.trackUpdation + " smallint PRIMARY KEY) "); 1064 runDDL(pubSubName, trackRepTablesUpdationQuery.toString()); 1065 runDDL(pubSubName,"Insert into "+getTrackReplicationTablesUpdation_Table()+" values(1)" ); 1066 } 1067 //implement this method for providing provision to stop updations done on shadow table 1068 protected void createTriggerForTrackReplicationTablesUpdationTable(String 1069 pubSubName) throws RepException, SQLException { 1070 /* StringBuffer trackRepTablesUpdationTriggerQuery = new StringBuffer(); 1071 trackRepTablesUpdationTriggerQuery.append(" CREATE TRIGGER TRI_") 1072 .append(getTrackReplicationTablesUpdation_Table()).append( 1073 " ON " + getTrackReplicationTablesUpdation_Table()) 1074 .append(" AFTER INSERT AS DELETE FROM " + 1075 getTrackReplicationTablesUpdation_Table() + " WHERE ") 1076 .append(RepConstants.trackUpdation + " NOT IN(SELECT * FROM inserted)"); 1077 runDDL(pubSubName, trackRepTablesUpdationTriggerQuery.toString());*/ 1078 } 1079 1080 public PreparedStatement makePrimaryPreperedStatementBackwardTraversing(String[] primaryColumns, long lastId, String local_pub_sub_name, String shadowTable) throws SQLException, RepException { 1081 StringBuffer query = new StringBuffer(); 1082 query.append(" select top 1 * from ") 1083 .append(shadowTable) 1084 .append(" where ") 1085 .append(RepConstants.shadow_sync_id1) 1086 .append(" < ? ") 1087 .append(" and ") 1088 .append(RepConstants.shadow_sync_id1) 1089 .append(" > ") 1090 .append(lastId); 1091 for (int i = 0; i < primaryColumns.length; i++) { 1092 query.append(" and ") 1093 .append(primaryColumns[i]) 1094 .append(" = ? "); 1095 } 1096 query.append(" order by limit 1 ") 1097 .append(RepConstants.shadow_sync_id1) 1098 .append(" desc "); 1099 log.debug(query.toString()); 1100 //System.out.println("PostgreSQLHandler makePrimaryPreperedStatementDelete :: " +query.toString()); 1101 Connection pub_sub_Connection = connectionPool.getConnection(local_pub_sub_name); 1102 return pub_sub_Connection.prepareStatement(query.toString()); 1103 } 1104 1105 /** 1106 * isSchemaSupported 1107 * 1108 * @return boolean 1109 */ 1110 public boolean isSchemaSupported() { 1111 return true; 1112 } 1113 1114 /* 1115 CREATE FUNCTION merge_db (key INT, data TEXT) RETURNS VOID AS 1116 $$ 1117 BEGIN 1118 LOOP 1119 UPDATE db SET b = data WHERE a = key; 1120 IF found THEN 1121 RETURN; 1122 END IF; 1123 1124 BEGIN 1125 INSERT INTO db(a,b) VALUES (key, data); 1126 RETURN; 1127 EXCEPTION WHEN unique_violation THEN 1128 -- do nothing 1129 END; 1130 END LOOP; 1131 END; 1132 $$ 1133 */ 1134 1135 }

