001 /** 002 * Copyright (c) 2003 Daffodil Software Ltd all rights reserved, 003 * Modifications Copyright (c) 2008 Regiscope Digital Imaging Co, LLC, All rights reserved. 004 * This program is free software; you can redistribute it and/or modify 005 * it under the terms of version 2 of the GNU General Public License as 006 * published by the Free Software Foundation. 007 * There are special exceptions to the terms and conditions of the GPL 008 * as it is applied to this software. See the GNU General Public License for more details. 009 * 010 * This program is distributed in the hope that it will be useful, 011 * but WITHOUT ANY WARRANTY; without even the implied warranty of 012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 013 * GNU General Public License for more details. 014 * 015 * You should have received a copy of the GNU General Public License 016 * along with this program; if not, write to the Free Software 017 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA 018 */ 019 020 021 package org.dbreplicator.replication.synchronize; 022 023 import java.io.BufferedWriter; 024 import org.dbreplicator.replication.RepConstants; 025 import org.dbreplicator.replication.RepException; 026 import java.io.*; 027 import java.sql.Timestamp; 028 import org.apache.log4j.Logger; 029 import org.dbreplicator.replication.DBHandler.AbstractDataBaseHandler; 030 import org.dbreplicator.replication.column.AbstractColumnObject; 031 import org.dbreplicator.replication.DBHandler.PostgreSQLHandler; 032 import org.dbreplicator.replication.column.BlobObject; 033 import org.dbreplicator.replication.column.ClobObject; 034 import java.sql.Connection; 035 import java.sql.SQLException; 036 import org.dbreplicator.replication.Utility; 037 import java.util.TreeMap; 038 import org.dbreplicator.replication.MetaDataInfo; 039 import org.dbreplicator.replication.RepTable; 040 import org.dbreplicator.replication.xml.XMLElement; 041 import java.util.ArrayList; 042 import java.sql.ResultSet; 043 import java.sql.PreparedStatement; 044 import java.sql.Statement; 045 import java.sql.ResultSetMetaData; 046 047 public class AbstractSynchronize { 048 protected static Logger log = Logger.getLogger(AbstractSynchronize.class.getName()); 049 050 protected boolean setAutoCommitFlag = true; 051 protected boolean isFirstPass = false, isCurrentTableCyclic = false; 052 protected TreeMap allColumnsMap; 053 protected MetaDataInfo mdi; 054 protected BufferedWriter bw; 055 protected AbstractDataBaseHandler dbHandler; 056 protected RepTable repTable; 057 protected Connection connection; 058 protected TreeMap columnObjectTreeMap; 059 protected String remoteServerName, shadowTable, tableName,replicationType, transactionLogType; 060 public int insertCount = 0,updateCount = 0, deleteCount=0; 061 protected String[] primaryColumnNames, changedColumnNames, changedColumnValues, tableColumnNames; 062 protected Object conisderedId, lastShadowUid, lastSyncId; /* bjt - add last sync id */ 063 protected XMLElement xmlElement_NULL; 064 protected AbstractColumnObject[] primaryKeyColumnsObject; 065 protected PreparedStatement PSForLastRecordSameRecordUpdatedExceptPK,PSToGetSyncidForSameOldPKEqualsNewPks; 066 protected String localName; 067 protected String remoteName; 068 069 public AbstractSynchronize() { 070 } 071 072 /** 073 * This method has been implemented to 074 * write the insert operation in transaction 075 * file. 076 */ 077 protected void writeInsertOperationInTransactionLogFile(BufferedWriter bw, 078 String tableName, Object[] insertedRecrods, String replicationType, 079 String transactionLogType) throws RepException { 080 081 if (Utility.createTransactionLogFile && transactionLogType.equalsIgnoreCase("true")) { 082 try { 083 // Timestamp dt = new Timestamp(System.currentTimeMillis()); 084 // bw.write("\n\n"); 085 // bw.write( (dt + "\n")); 086 087 bw.write("\n"); 088 bw.write("[" + replicationType + "]"); 089 bw.write("[" + tableName + "]"); 090 bw.write("[" + RepConstants.insert_operation + "]"); 091 bw.write("["); 092 for (int i = 0; i < insertedRecrods.length; i++) { 093 if (i != 0) { 094 bw.write("," + insertedRecrods[i]); 095 } 096 bw.write("" + insertedRecrods[i]); 097 } 098 bw.write("]"); 099 bw.flush(); 100 } 101 catch (IOException ex1) { 102 RepConstants.writeERROR_FILE(ex1); 103 throw new RepException("REP351", new Object[] {ex1.getMessage()}); 104 } 105 catch (Exception ex) { 106 RepConstants.writeERROR_FILE(ex); 107 } 108 } 109 } 110 111 /** 112 * This method has been implemented to 113 * write the Delete operation in transaction 114 * file. 115 */ 116 117 protected void writeDeleteOperationInTransactionLogFile(BufferedWriter bw, 118 String tableName, String[] pkCols, Object[] deletedRecordPky, 119 String replicationType, String transactionLogType) throws RepException { 120 if (Utility.createTransactionLogFile && transactionLogType.equalsIgnoreCase("true")) { 121 try { 122 // Timestamp dt = new Timestamp(System.currentTimeMillis()); 123 // bw.write("\n\n"); 124 // bw.write( (dt + "\n")); 125 126 bw.write("\n"); 127 bw.write("[" + replicationType + "]"); 128 bw.write("[" + tableName + "]"); 129 bw.write("[" + RepConstants.delete_operation + "]"); 130 bw.write("[PRIMARY KEY VALUES "); 131 for (int i = 0; i < deletedRecordPky.length; i++) { 132 if (i != 0) { 133 bw.write("," + pkCols[i] + " = " + deletedRecordPky[i]); 134 } 135 bw.write(pkCols[i] + " = " + deletedRecordPky[i]); 136 } 137 bw.write("]"); 138 bw.flush(); 139 } 140 catch (IOException ex1) { 141 RepConstants.writeERROR_FILE(ex1); 142 throw new RepException("REP351", new Object[] {ex1.getMessage()}); 143 } 144 catch (Exception ex) { 145 RepConstants.writeERROR_FILE(ex); 146 } 147 } 148 } 149 150 /** 151 * This method has been implemented to 152 * write the Update operation in transaction 153 * file. 154 */ 155 protected void writeUpdateOperationInTransactionLogFile(BufferedWriter bw, 156 String tableName, String[] pkCols, Object[] oldPky, 157 String updatedColsName[], String[] updatedValues, String replicationType, 158 String transactionLogType) throws RepException { 159 if (Utility.createTransactionLogFile && transactionLogType.equalsIgnoreCase("true")) { 160 try { 161 // Timestamp dt = new Timestamp(System.currentTimeMillis()); 162 // bw.write("\n\n"); 163 // bw.write( (dt + "\n")); 164 165 bw.write("\n"); 166 bw.write("[" + replicationType + "]"); 167 bw.write("[" + tableName + "]"); 168 bw.write("[" + RepConstants.update_operation + "]"); 169 bw.write("[PRIMARY KEY VALUES "); 170 for (int i = 0; i < oldPky.length; i++) { 171 if (i != 0) { 172 bw.write("," + pkCols[i] + " = " + oldPky[i]); 173 } 174 bw.write(pkCols[i] + " = " + oldPky[i]); 175 } 176 bw.write("]"); 177 178 bw.write("[ CHANGED COLUMNS "); 179 for (int i = 0; i < updatedColsName.length; i++) { 180 if (i != 0) { 181 bw.write("," + updatedColsName[i] + " = " + updatedValues[i]); 182 } 183 bw.write("" + updatedColsName[i] + " = " + updatedValues[i]); 184 } 185 bw.write("]"); 186 bw.flush(); 187 } 188 catch (IOException ex1) { 189 RepConstants.writeERROR_FILE(ex1); 190 throw new RepException("REP351", new Object[] {ex1.getMessage()}); 191 } 192 193 catch (Exception ex) { 194 RepConstants.writeERROR_FILE(ex); 195 } 196 197 } 198 } 199 200 public static void writeOperationInTransactionLogFile(BufferedWriter bw, 201 int insertOperation, 202 int updateOperation, 203 int deleteOperation, 204 String replicationType) throws Exception { 205 bw.write("\n\n"); 206 bw.write(" Synchronization Type [" + replicationType + "]"); 207 bw.write("\n\n"); 208 bw.write(" Inserts [" + insertOperation + "]"); 209 bw.write("\n"); 210 bw.write(" Updates [" + updateOperation + "]"); 211 bw.write("\n"); 212 bw.write(" Deletes [" + deleteOperation + "]"); 213 bw.write("\n\n"); 214 bw.write(" [" + replicationType + " COMPLETED SUCCESSFULLY]"); 215 bw.write("\n"); 216 217 } 218 219 public static void writeDateInTransactionLogFile(BufferedWriter bw) throws 220 Exception { 221 if(Utility.createTransactionLogFile){ 222 Timestamp dt = new Timestamp(System.currentTimeMillis()); 223 bw.write("\n\n"); 224 bw.write(" Operation Performed on Date: "); 225 bw.write( (dt + "\n")); 226 } 227 } 228 229 public static void writeUnsuccessfullOperationInTransaction(BufferedWriter bw) { 230 try { 231 if(Utility.createTransactionLogFile) { 232 bw.write("\n\n"); 233 bw.write(" [ SYNCHRONIZE OPERATION NOT COMPLETED SUCCESSFULLY]"); 234 bw.write("\n"); 235 } 236 } 237 catch (Exception ex1) { 238 } 239 } 240 241 protected void loggingInsertOperation(String tableName, 242 Object[] insertedRecrods, 243 String replicationType) { 244 245 log.debug("[" + replicationType + "]" + "[" + tableName + "]" + "[" + 246 RepConstants.insert_operation + "]"); 247 for (int i = 0; i < insertedRecrods.length; i++) { 248 log.debug("" + insertedRecrods[i]); 249 } 250 } 251 252 protected void loggingDeleteOperation(String tableName, String[] pkCols, 253 Object[] deletedRecordPky, 254 String replicationType) { 255 log.debug("[" + replicationType + "]" + "[" + tableName + "]" + "[" + 256 RepConstants.delete_operation + "]" + "[PRIMARY KEY VALUES "); 257 for (int i = 0; i < deletedRecordPky.length; i++) { 258 log.debug(pkCols[i] + " = " + deletedRecordPky[i]); 259 260 } 261 } 262 263 protected void loggingUpdateOperation(String tableName, String[] pkCols, 264 Object[] oldPky, String updatedColsName[], 265 String[] updatedValues, 266 String replicationType) { 267 log.debug("[" + replicationType + "]" + "[" + tableName + "]" + "[" +RepConstants.update_operation + "]" + "PRIMARY KEY VALUES "); 268 for (int i = 0; i < oldPky.length; i++) { 269 log.debug(pkCols[i] + " = " + oldPky[i]); 270 } 271 log.debug(" CHANGED COLUMNS "); 272 for (int i = 0; i < updatedColsName.length; i++) { 273 log.debug("" + updatedColsName[i] + " = " + updatedValues[i]); 274 } 275 } 276 277 /** 278 * This has been implemented to handle the problem 279 * related to CLOB and BLOB data type. Postgre 280 * do not to insert LOB object in autocommit mode. 281 * @param dbHandler AbstractDataBaseHandler 282 * @param aco AbstractColumnObject 283 * @return boolean 284 */ 285 public boolean checkAutocommit(AbstractDataBaseHandler dbHandler, 286 AbstractColumnObject aco) { 287 if (dbHandler instanceof PostgreSQLHandler && 288 (aco instanceof ClobObject || aco instanceof BlobObject)) { 289 return false; 290 } 291 return true; 292 } 293 294 public void setAutocomitTrueAndCommitRecord(Connection conn) throws 295 SQLException { 296 conn.commit(); 297 conn.setAutoCommit(true); 298 } 299 300 ////////////////////////////////// 301 302 303 /** 304 * It return false if record is not updated else return true 305 */ 306 protected boolean isPrimaryKeyUpdated(Object[] priamryColValues,ArrayList updatedPrimaryKey) throws SQLException, RepException { 307 boolean isPrimaryKeyUpdated = false; 308 if(updatedPrimaryKey.size()==0) { 309 return isPrimaryKeyUpdated; 310 } 311 312 for (int i = 0; i < updatedPrimaryKey.size(); i++) { 313 Object[] updatedPk = (Object[]) updatedPrimaryKey.get(i); 314 for (int j = 0; j < updatedPk.length; j++) { 315 if (updatedPk[j].equals(priamryColValues[j])) { 316 isPrimaryKeyUpdated=true; 317 } 318 } 319 if(isPrimaryKeyUpdated) { 320 updatedPrimaryKey.remove(i); 321 break; 322 } 323 } 324 return isPrimaryKeyUpdated; 325 } 326 327 /* 328 SELECT * FROM Rep_Shadow_PUSHTABLE WHERE (Rep_sync_id = (SELECT MAX(rep_sync_id) 329 FROM Rep_Shadow_PUSHTABLE WHERE rollno = 11 AND (rep_old_rollNo = 11) AND rep_Status='A')) 330 331 This method is used if same record is updated many times(except primary column i.e primary 332 column is not updated) This leads to improving performance(As suggested by parveen sir) And 333 to avoid traversing of the syncIds which couldnt add in the viewId we use 334 addSyncidToViewIdForSameOldPKEqualsNewPks(String primaryCols[],Object[] primaryColValues, 335 Object MaxUID) method. 336 */ 337 protected Object getUIDRecordUpdatedExceptPK(String 338 primaryCols[], Object[] primaryColValues, Object[] oldPrimaryColValues) throws SQLException, RepException { 339 ResultSet rs = null; 340 try { 341 //check if old primary keys and new primary keys are same or not. 342 // If Yes,then return else coontinue 343 for (int i = 0; i < primaryColValues.length; i++) { 344 if (!primaryColValues[i].toString().equals(oldPrimaryColValues[i].toString())) 345 return null; 346 } 347 PSForLastRecordSameRecordUpdatedExceptPK = makeQueryToGetRecordIfPrimaryKeyIsNotUpdated(primaryColumnNames); 348 int indexPrimaryColumn = 0; 349 for (indexPrimaryColumn = 0; indexPrimaryColumn < primaryCols.length;indexPrimaryColumn++) { 350 PSForLastRecordSameRecordUpdatedExceptPK.setObject(indexPrimaryColumn +1, primaryColValues[indexPrimaryColumn]); 351 } 352 for (int j = 0; j < primaryCols.length; j++) { 353 PSForLastRecordSameRecordUpdatedExceptPK.setObject(indexPrimaryColumn +j + 1, oldPrimaryColValues[j]); 354 } 355 // long startTime =System.currentTimeMillis(); 356 rs = PSForLastRecordSameRecordUpdatedExceptPK.executeQuery(); 357 //System.out.println(" SSSSSSSSSS TIME TAKEN IN UID QUERY EXECUTION :: "+(System.currentTimeMillis()-startTime)); 358 rs.next(); 359 Object Uid = rs.getObject(1); 360 //Add the SyncIds to viewIds Which we have considered indirectly through above Query 361 // addSyncidToViewIdForSameOldPKEqualsNewPks(primaryColValues, Uid); 362 return Uid; 363 } 364 finally { 365 if (rs != null) { 366 try { 367 rs.close(); 368 } 369 catch (SQLException ex2) { //ignore SQLException 370 } 371 } 372 if (PSForLastRecordSameRecordUpdatedExceptPK != null) { 373 try { 374 PSForLastRecordSameRecordUpdatedExceptPK.close(); 375 } 376 catch (SQLException ex2) { //ignore SQLException 377 } 378 379 } 380 } 381 } 382 383 /* 384 Query to get syncIds : 385 " SELECT REP_SYNC_ID FROM SHADOW_TABLE WHERE 386 primarycolumns=rep_old_primarycolumns and pk=?; " 387 SyncID which we get through this query if not 388 already present in viewIDs and less than the MaxUId 389 then add into viewID. MaxUId is the SyncId upto which 390 we have considered shadow table record for that particular 391 primary key 392 */ 393 394 protected void addSyncidToViewIdForSameOldPKEqualsNewPks(Object[] 395 primaryColValues, Object MaxUID,ArrayList viewedIds) throws SQLException, RepException { 396 ResultSet rs = null; 397 try { 398 PSToGetSyncidForSameOldPKEqualsNewPks = makeQueryToGetViewId(); 399 for (int i = 0; i < primaryColValues.length; i++) 400 PSToGetSyncidForSameOldPKEqualsNewPks.setObject(i + 1,primaryColValues[i]); 401 rs = PSToGetSyncidForSameOldPKEqualsNewPks.executeQuery(); 402 while (rs.next()) { 403 Object Uid = rs.getObject(1); 404 if (!viewedIds.contains(Uid) && 405 ( (Number) Uid).longValue() < ( (Number) MaxUID).longValue()) { 406 viewedIds.add(Uid); 407 } 408 } 409 } 410 finally { 411 if (rs != null) { 412 try { 413 rs.close(); 414 } 415 catch (SQLException ex2) { //ignore SQLException 416 } 417 } 418 } 419 } 420 421 protected PreparedStatement makeQueryToGetRecordIfPrimaryKeyIsNotUpdated(String[] primaryColsName) throws SQLException, RepException { 422 StringBuffer query = new StringBuffer(); 423 query.append("SELECT ").append(RepConstants.shadow_sync_id1).append(" FROM ") 424 .append(shadowTable).append(" WHERE (").append(RepConstants.shadow_sync_id1) 425 .append(" = (SELECT MAX(").append(RepConstants.shadow_sync_id1).append(") FROM ") 426 .append(shadowTable).append(" WHERE "); 427 for (int i = 0; i < primaryColsName.length; i++) { 428 if (i != 0) { 429 query.append(" AND "); 430 } 431 query.append(primaryColsName[i]).append("= ? "); 432 } 433 query.append(" AND "); 434 for (int i = 0; i < primaryColsName.length; i++) { 435 if (i != 0) { 436 query.append(" AND "); 437 } 438 query.append(" REP_OLD_").append(primaryColsName[i]).append("= ? "); 439 } 440 query.append(" AND ").append(RepConstants.shadow_status4) 441 .append(" = '").append(RepConstants.afterUpdate).append(" ')) "); 442 return connection.prepareStatement(query.toString()); 443 } 444 445 protected PreparedStatement makeQueryToGetViewId() throws SQLException, RepException { 446 StringBuffer query = new StringBuffer(); 447 query.append("SELECT ").append(RepConstants.shadow_sync_id1) 448 .append(" FROM ").append(shadowTable).append(" WHERE "); 449 for (int i = 0; i < primaryColumnNames.length; i++) { 450 if (i != 0) 451 query.append(" AND "); 452 query.append(primaryColumnNames[i]).append("= ").append(" REP_OLD_").append(primaryColumnNames[i]); 453 } 454 query.append(" AND "); 455 for (int i = 0; i < primaryColumnNames.length; i++) { 456 if (i != 0) 457 query.append(" AND "); 458 query.append(primaryColumnNames[i]).append("= ?"); 459 } 460 return connection.prepareStatement(query.toString()); 461 } 462 463 464 }

