001 /** 002 * Copyright (c) 2003 Daffodil Software Ltd all rights reserved, 003 * Modifications Copyright (c) 2008 Regiscope Digital Imaging Co, LLC, All rights reserved. 004 * This program is free software; you can redistribute it and/or modify 005 * it under the terms of version 2 of the GNU General Public License as 006 * published by the Free Software Foundation. 007 * There are special exceptions to the terms and conditions of the GPL 008 * as it is applied to this software. See the GNU General Public License for more details. 009 * 010 * This program is distributed in the hope that it will be useful, 011 * but WITHOUT ANY WARRANTY; without even the implied warranty of 012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 013 * GNU General Public License for more details. 014 * 015 * You should have received a copy of the GNU General Public License 016 * along with this program; if not, write to the Free Software 017 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA 018 */ 019 020 package org.dbreplicator.replication.xml; 021 022 import java.io.*; 023 import java.net.*; 024 import java.sql.*; 025 import java.util.*; 026 027 import org.dbreplicator.replication.*; 028 import org.dbreplicator.replication.DBHandler.*; 029 import org.dbreplicator.replication.zip.*; 030 import org.apache.log4j.Logger; 031 import java.math.BigDecimal; 032 import java.util.ArrayList; 033 import java.util.HashMap; 034 035 /** 036 * This class was implemented for handling all the issues that will occure at 037 * the time of creating XML file for synchronization purpose. 038 */ 039 040 public class SyncXMLCreator { 041 042 String local_pub_sub_name, shadowTable, remoteServerName; 043 ConnectionPool connectionPool; 044 AbstractDataBaseHandler dbDataTypeHandler; 045 // private ArrayList viewedIds; 046 private HashMap viewedIds; 047 private String[] primaryColNames; 048 private int[] primaryColumnTypes; 049 private PreparedStatement primaryPreparedStatement,primaryPreparedStatementBackwardTraversing ,commonPreparedStatement, 050 commonPreparedStatementForBackwardTraversing, PSForLastRecordSameRecordUpdatedExceptPK, 051 PSToGetSyncidForSameOldPKEqualsNewPks; 052 private Statement commonStatement=null; 053 public ServerSocket serverSocket = null; 054 RepTable repTable; 055 String filterClause; 056 String tableName; 057 SchemaQualifiedName sname; 058 private String[] parameters; 059 private boolean USE_getLastRecord = true; 060 protected static Logger log = Logger.getLogger(SyncXMLCreator.class.getName()); 061 int countWriteDelementelement=0,countWriteupdateElement=0,countWriteInsertElement=0,commPreCount=0,primaryCount=0; 062 /** 063 * Creates an XML file for synchronizing data between server and client i.e. Subscriber and publisher. 064 * @param pub_sub_name0 065 * @param connectionPool0 066 * @param dbDataTypeHandler0 067 */ 068 public SyncXMLCreator(String pub_sub_name0, ConnectionPool connectionPool0, AbstractDataBaseHandler dbDataTypeHandler0) { 069 local_pub_sub_name = pub_sub_name0; 070 connectionPool = connectionPool0; 071 dbDataTypeHandler = dbDataTypeHandler0; 072 } 073 074 /** 075 if operation is I , insert the record 076 if operation is U check for its last record and also make extra tag for primary key of the initial record 077 and also make attribute for columns which are updated. 078 if operation id D then mark it for deletion. 079 080 case1. 081 ===== 082 If a user isnert a new record then to delete it. It does not considered 083 and we do not write it in XML file. 084 085 If a new record is inserted and after that it is updated in that case it 086 considered as a new inserted record not a updated record 087 */ 088 public Object[] createXMLFile(String xmlFileURL, String zipFileURL, 089 String xmlFileName, String remote_Pub_Sub_Name, 090 ArrayList pubRepTables, 091 String clientServerName, 092 int noOfTables, boolean DeleteXML, 093 String local_pub_subName,boolean isSchemaSupported, _FileUpload fileUpload,String localMachineAddress,String remoteMachineAddress) throws RepException { 094 Connection pub_sub_Connection = null; 095 ResultSet rows = null; 096 try { 097 ArrayList EnCodedcols; 098 FileOutputStream fos = new FileOutputStream(xmlFileURL); 099 OutputStreamWriter os = new OutputStreamWriter(fos); 100 BufferedWriter bw = new BufferedWriter(os); 101 pub_sub_Connection = connectionPool.getConnection(local_pub_sub_name); 102 XMLWriter xmlWriter = new XMLWriter(bw, dbDataTypeHandler,pub_sub_Connection); 103 bw.write("<?xml version=\"1.0\" encoding=\"ISO-8859-1\"?>"); 104 bw.write("<root>"); 105 ArrayList usedActualTables = new ArrayList(); 106 String[] primarycols; 107 remoteServerName = clientServerName; 108 Object[] lastIdArray = new Object[noOfTables]; 109 /* bjt */ 110 /* grab database at this point and ignore further changes until next 111 * sync session by starting transaction and setting isolation level to 112 * 'serializable'. This is SQL92 compliant, so should be fine... */ 113 /* begin work */ 114 /* set transaction isolation level serializable */ 115 Statement transtmt = pub_sub_Connection.createStatement(); 116 transtmt.executeUpdate("begin work"); /* bjt */ 117 /* bjt - only do for postgres...until we test on other dbs */ 118 if (dbDataTypeHandler.getvendorName() == Utility.DataBase_PostgreSQL) { 119 transtmt.executeUpdate("set transaction isolation level serializable"); /* bjt */ 120 } 121 /* bjt - set autocommit false for select */ 122 for (int i = 0; i < noOfTables; i++) { 123 try { 124 repTable = ( (RepTable) pubRepTables.get(i)); 125 if (repTable.getCreateShadowTable().equalsIgnoreCase(RepConstants.NO)) 126 continue; 127 sname = repTable.getSchemaQualifiedName(); 128 tableName = isSchemaSupported ? sname.toString() : sname.getTableName(); 129 EnCodedcols = PathHandler.getEncodedColumns(tableName); 130 shadowTable = RepConstants.shadow_Table(repTable.getSchemaQualifiedName().toString()); 131 lastIdArray[i] = getLastUIDFromShadowTable(shadowTable); 132 // viewedIds = new ArrayList(); 133 viewedIds = new HashMap(); 134 long lastId = getLastSyncId(pub_sub_Connection, remote_Pub_Sub_Name, tableName); 135 primaryColNames = repTable.getPrimaryColumns(); 136 xmlWriter.setNoOFPrimaryColumnNumber(primaryColNames.length); 137 primaryPreparedStatement = dbDataTypeHandler. 138 makePrimaryPreperedStatement(pub_sub_Connection, primaryColNames, shadowTable, local_pub_sub_name); 139 commonPreparedStatement = makeCommonPreparedStatement(pub_sub_Connection, shadowTable); 140 //statement whose resulset is set in tracer in getlastrecord in update case 141 commonStatement = pub_sub_Connection.createStatement(); 142 // deleting records from shadow table with primaryColumns of Main Table as NULL 143 deleteRecordsFromShadowTableWithNullPk(); 144 145 /* bjt */ 146 pub_sub_Connection.setAutoCommit(false); 147 String query = "Select * from " + shadowTable + " where " + 148 RepConstants.shadow_sync_id1 + 149 " > " + lastId + " and " + RepConstants.shadow_serverName_n + 150 " != '" + remoteServerName + "' order by " +RepConstants.shadow_sync_id1; 151 filterClause = repTable.getFilterClause(); 152 rows = getResultSet(pub_sub_Connection, query); 153 primaryColumnTypes = new int[primaryColNames.length]; 154 ResultSetMetaData rsmt = rows.getMetaData(); 155 int noOfColumns = rsmt.getColumnCount(); 156 String operation; 157 ArrayList updatedPrimaryKey = getUpdatedPrimaryKey(); /* bjt */ 158 if (rows.next()) { 159 bw.write("<tableName>"); 160 bw.write(tableName); 161 //if (usedActualTables.contains(tableName)) 162 /* bjt - should be negative...if it does not contain the tableName, add it */ 163 if (!usedActualTables.contains(tableName)) 164 usedActualTables.add(tableName); 165 do { 166 // int icount =0; 167 // long time = System.currentTimeMillis(); 168 operation = rows.getString(RepConstants.shadow_operation3); 169 if (operation.equalsIgnoreCase(RepConstants.insert_operation)) { 170 // Write the insert element in XML file. 171 makeInsertElement(bw, operation, noOfColumns, xmlWriter, rows,rsmt, remoteServerName, EnCodedcols, updatedPrimaryKey); /* bjt */ 172 } 173 else if (operation.equals(RepConstants.update_operation)) { 174 // Write the update element in XML file. 175 makeUpdateElement(bw, noOfColumns, rsmt, xmlWriter, rows,shadowTable, remoteServerName,EnCodedcols, updatedPrimaryKey); /* bjt */ 176 } 177 else if (operation.equals(RepConstants.delete_operation)) { 178 // Write the delete element in XML file. 179 makeDeleteElement(bw, noOfColumns, xmlWriter, rows, EnCodedcols); 180 } 181 182 // System.out.println((i++)+" time taken in insert "+ (System.currentTimeMillis()-time)); 183 } 184 while (rows.next()); 185 bw.write("</tableName>\r\n"); 186 } 187 /* bjt - set autocommit back to true */ 188 pub_sub_Connection.setAutoCommit(true); 189 190 // change last_sync id in bookmarks table 191 // updateBookMarkLastSyncId(shadowTable, tableName, remote_Pub_Sub_Name); 192 } 193 finally { 194 try { 195 if (rows != null) { 196 Statement st = rows.getStatement(); 197 rows.close(); 198 st.close(); 199 } 200 if (primaryPreparedStatement != null) { 201 primaryPreparedStatement.close(); 202 } 203 if (commonPreparedStatement != null) { 204 commonPreparedStatement.close(); 205 } 206 if (commonStatement != null) { 207 commonStatement.close(); 208 } 209 if(primaryPreparedStatementBackwardTraversing!=null){ 210 primaryPreparedStatementBackwardTraversing.close(); 211 }if(commonPreparedStatementForBackwardTraversing!=null){ 212 commonPreparedStatementForBackwardTraversing.close(); 213 } 214 } 215 catch (SQLException ex1) { 216 //Ignore Exception 217 } 218 } 219 220 } 221 //commented By Nancy on 29-03-2005 222 //to avoid record skipping during realTime Scheduling 223 /* for (int i = 0; i < noOfTables; i++) { 224 repTable = ( (RepTable) pubRepTables.get(i)); 225 tableName = repTable.getSchemaQualifiedName().toString(); 226 shadowTable = RepConstants.shadow_Table(repTable.getSchemaQualifiedName().toString()); 227 updateBookMarkLastSyncId(shadowTable, tableName, remote_Pub_Sub_Name,lastIdArray[i]); 228 }*/ 229 230 /* bjt */ 231 /* commit */ 232 transtmt.executeUpdate("commit"); /* bjt */ 233 transtmt.close(); 234 bw.write("</root>"); 235 bw.close(); 236 os.close(); 237 fos.close(); 238 if(!localMachineAddress.equalsIgnoreCase(remoteMachineAddress)) { 239 // making zip file from xml file 240 ZipHandler.makeZip(zipFileURL, xmlFileURL, xmlFileName); 241 // writing zip file on socket 242 // writeZIPFileOnClientSocket(socket, zipFileURL); 243 WriteOnSocket writeOnSocket = new WriteOnSocket(zipFileURL, xmlFileURL,DeleteXML, xmlFileName, fileUpload, true); 244 writeOnSocket.start(); 245 writeOnSocket.join(); 246 } 247 return new Object[] { 248 usedActualTables, lastIdArray}; 249 } 250 catch (Exception ex) { 251 log.error(ex.getMessage(), ex); 252 RepException rep = new RepException("REP057", new Object[] {ex.getMessage()}); 253 rep.setStackTrace(ex.getStackTrace()); 254 throw rep; 255 } 256 finally { 257 connectionPool.returnConnection(pub_sub_Connection); 258 } 259 } 260 261 /** 262 * prepared statement for getting common record for the copmmon id 263 * @param shadowTable 264 * @return 265 * @throws SQLException 266 */ 267 private PreparedStatement makeCommonPreparedStatement(Connection pub_sub_Connection, String shadowTable) throws SQLException, RepException { 268 StringBuffer query = new StringBuffer(); 269 query.append(" select * from "); 270 query.append(shadowTable); 271 query.append(" where "); 272 query.append(RepConstants.shadow_sync_id1); 273 query.append(" > "); 274 query.append("? "); 275 query.append(" and "); 276 query.append(RepConstants.shadow_common_id2); 277 query.append(" = "); 278 query.append("? "); 279 query.append(" and "); 280 query.append(RepConstants.shadow_status4); 281 query.append(" = '"); 282 query.append(RepConstants.afterUpdate); 283 query.append("' "); 284 //check (dbDataTypeHandler.getvendorName() == Utility.DataBase_DB2) added by nancy to handle blob clob case 285 //System.out.println(" commonPreapredStatement:: "+query.toString().toUpperCase()); 286 // if (dbDataTypeHandler.getvendorName() == Utility.DataBase_PostgreSQL || 287 // dbDataTypeHandler.getvendorName() == Utility.DataBase_DB2) { 288 return pub_sub_Connection.prepareStatement(query.toString()); 289 // } 290 // return pub_sub_Connection.prepareStatement(query.toString(), 291 // ResultSet.TYPE_SCROLL_INSENSITIVE, 292 // ResultSet.CONCUR_UPDATABLE); 293 294 } 295 296 /** 297 * returns last synchronization id from bookmark TableName 298 * @param remote_Pub_Sub_subName 299 * @param tableName 300 * @return 301 */ 302 private long getLastSyncId(Connection pub_sub_Connection, String remote_Pub_Sub_subName, String tableName) { 303 ResultSet rs = null; 304 try { 305 StringBuffer query = new StringBuffer(); 306 query.append(" Select ").append(RepConstants.bookmark_lastSyncId4).append(" from ") 307 .append(dbDataTypeHandler.getBookMarkTableName()).append(" where ") 308 .append(RepConstants.bookmark_LocalName1) 309 .append(" = '").append(local_pub_sub_name).append("' and ") 310 .append(RepConstants.bookmark_RemoteName2) 311 .append(" = '").append(remote_Pub_Sub_subName).append("'") 312 .append(" and ").append(RepConstants.bookmark_TableName3) 313 .append("= '") 314 .append(tableName).append("'"); 315 log.debug(query.toString()); 316 rs = getResultSet(pub_sub_Connection, query.toString()); 317 Object lastIdObj = null; 318 if (rs.next()) { 319 // It may rise class cast exception and require casting after check object type 320 // use insetanceof java feature for multiple handling 321 lastIdObj = rs.getObject(RepConstants.bookmark_lastSyncId4); 322 } 323 long lastId = 0; 324 if (lastIdObj instanceof Long) { 325 lastId = lastIdObj == null ? 0 : ( (Long) lastIdObj).longValue(); 326 } 327 else if (lastIdObj instanceof BigDecimal) { 328 lastId = lastIdObj == null ? 0 : ( (BigDecimal) lastIdObj).longValue(); 329 } 330 else if (lastIdObj instanceof Integer) { 331 lastId = lastIdObj == null ? 0 : ( (Integer) lastIdObj).longValue(); 332 } 333 else if (lastIdObj instanceof Double) { 334 lastId = lastIdObj == null ? 0 : ( (Double) lastIdObj).longValue(); 335 } else if(lastIdObj instanceof String) { 336 lastId = lastIdObj == null ? 0 : ( Long.parseLong((String)lastIdObj) ); 337 } 338 return lastId; 339 } 340 catch (Exception ex) { 341 return 0; 342 } 343 finally { 344 try { 345 if (rs != null) { 346 Statement st = rs.getStatement(); 347 rs.close(); 348 if (st != null) 349 st.close(); 350 } 351 } 352 catch (SQLException ex1) { 353 } 354 } 355 } 356 357 private ResultSet getResultSet(Connection pub_sub_Connection, String nonPreparedQuery) throws SQLException, RepException { 358 //Connection pub_sub_Connection = connectionPool.getConnection(local_pub_sub_name); 359 360 Statement st; 361 if (dbDataTypeHandler.getvendorName() == Utility.DataBase_PostgreSQL || 362 dbDataTypeHandler.getvendorName() == Utility.DataBase_Cloudscape || 363 dbDataTypeHandler.getvendorName() == Utility.DataBase_DB2 || 364 dbDataTypeHandler.getvendorName() == Utility.DataBase_SqlServer) { 365 st = pub_sub_Connection.createStatement(); 366 } 367 else { 368 st = pub_sub_Connection.createStatement(ResultSet.TYPE_SCROLL_INSENSITIVE,ResultSet.CONCUR_READ_ONLY); 369 } 370 log.debug(nonPreparedQuery); 371 return st.executeQuery(nonPreparedQuery); 372 } 373 374 /** 375 * creates and XML Insert Element 376 * @param operation 377 * @param noOfColumns 378 * @param xmlWriter 379 * @param rows_I 380 * @param rsmt 381 * @param remoteServerName 382 * @throws SQLException 383 * @throws IOException 384 * @throws RepException 385 */ 386 private void makeInsertElement(Writer bw, String operation, int noOfColumns, 387 XMLWriter xmlWriter, ResultSet rows_I, 388 ResultSetMetaData rsmt, 389 String remoteServerName, 390 ArrayList encodedCols, ArrayList updatedPrimaryKey) throws SQLException, 391 IOException, RepException { 392 393 Object Uid = rows_I.getObject(RepConstants.shadow_sync_id1); 394 395 if (viewedIds.containsKey(Uid)) { 396 viewedIds.remove(Uid); 397 return; 398 } 399 400 Object[] primaryColValues = new Object[primaryColNames.length]; 401 402 for (int i = 0; i < primaryColNames.length; i++) { 403 primaryColValues[i] = rows_I.getObject(primaryColNames[i]); 404 } 405 406 if (USE_getLastRecord) { // can comment this code if no error comes 407 makeInsertElement_GetLastRecord(bw, operation, noOfColumns, xmlWriter, 408 rows_I, rsmt, remoteServerName, 409 primaryColValues, 410 encodedCols, updatedPrimaryKey); 411 } 412 else { 413 if (filterResultSet(rows_I)) { 414 writeInsertElement(bw, operation, noOfColumns, xmlWriter, rows_I, rsmt, 415 primaryColValues, encodedCols); 416 } 417 } 418 } 419 420 /** 421 * creates and XML Insert Element 422 * iF USED with getLast Records for shadow Table 423 * @param operation 424 * @param noOfColumns 425 * @param xmlWriter 426 * @param rows_I 427 * @param rsmt 428 * @param remoteServerName 429 * @param primaryColValues 430 * @throws IOException 431 * @throws RepException 432 * @throws SQLException 433 */ 434 private void makeInsertElement_GetLastRecord(Writer bw, String operation, 435 int noOfColumns, 436 XMLWriter xmlWriter, 437 ResultSet rows_I, 438 ResultSetMetaData rsmt, 439 String remoteServerName, 440 Object[] primaryColValues, 441 ArrayList encodedCols, ArrayList updatedPrimaryKey) throws 442 IOException, RepException, SQLException { 443 Object UId = rows_I.getObject(RepConstants.shadow_sync_id1); 444 Tracer tracer = new Tracer(); 445 getLastRecord(primaryColValues, UId, tracer, remoteServerName, updatedPrimaryKey); 446 if (!tracer.recordFound) { 447 if (filterResultSet(rows_I)) { 448 writeInsertElement(bw, operation, noOfColumns, xmlWriter, rows_I, rsmt, 449 primaryColValues, encodedCols); 450 } 451 } 452 else if (tracer.type.equals(RepConstants.update_operation)) { 453 ResultSet rsTracer = tracer.rs; 454 if (filterResultSet(rsTracer)) { 455 writeInsertElement(bw, operation, noOfColumns, xmlWriter, rsTracer, 456 rsTracer.getMetaData(), tracer.primaryKeyValues, 457 encodedCols); 458 } 459 } 460 // else if (tracer.type.equalsIgnoreCase(RepConstants.delete_operation)) { 461 // } 462 } 463 464 /** 465 * creates a XML element for Insert 466 * @param operation 467 * @param noOfColumns 468 * @param xmlWriter 469 * @param rows_I 470 * @param rsmt 471 * @param primaryColValues 472 * @throws SQLException 473 * @throws IOException 474 * @throws RepException 475 */ 476 private void writeInsertElement(Writer bw, String operation, int noOfColumns, 477 XMLWriter xmlWriter, ResultSet rows_I, 478 ResultSetMetaData rsmt, 479 Object[] primaryColValues, 480 ArrayList encodedCols) throws 481 SQLException, IOException, RepException { 482 /* 483 // just an example not mandat implementation. 484 <operation>Insert 485 <row> 486 <columnName name="c1"> 487 <![CDATA[ value value value ]]> 488 </columnName> 489 490 <columnName name="c2"> // suppose column is of blob/clob type 491 <start> start position index</start> 492 <length> number of characters</length> 493 </columnName> 494 </row> 495 <primary> 496 <primaryColumnNames name="p1"> 497 <![CDATA[ value value value ]]> 498 </primaryColumnNames> 499 </primary> 500 </operation> 501 */ 502 503 long startTime = System.currentTimeMillis(); 504 bw.write("<operation>"); 505 bw.write(operation); 506 xmlWriter.writeRowElement(noOfColumns, rows_I, rsmt, primaryColNames,primaryColValues, tableName, encodedCols); 507 xmlWriter.writePrimaryKeyElement(primaryColNames, primaryColValues,encodedCols); 508 bw.write("</operation>\r\n"); 509 if(countWriteInsertElement<=5) { 510 //System.out.println(" Time taken in write the INSERT element : " +(System.currentTimeMillis() - startTime)); 511 countWriteInsertElement++; 512 } 513 } 514 515 /** 516 * Returns a resutSet for the other common record corresponding to the common Id passed. 517 * @param rs 518 * @param tracer 519 * @return 520 * @throws SQLException 521 */ 522 private ResultSet getOtherCommonRecord(ResultSet rs, Tracer tracer) throws SQLException, RepException { 523 // so that if the method is called recursively then result set created is not closed. 524 // commonPreparedStatement = makeCommonPreparedStatement(shadowTable); 525 Object commonId = rs.getObject(RepConstants.shadow_common_id2); 526 String serverName = rs.getString(RepConstants.shadow_serverName_n); 527 Object UId = rs.getObject(RepConstants.shadow_sync_id1); 528 if (!serverName.equalsIgnoreCase(remoteServerName)) { 529 viewedIds.put(UId,null); 530 } 531 commonPreparedStatement.setObject(1, UId); 532 /* bjt - try for quotes */ 533 //commonPreparedStatement.setObject(1, UId.toString()); 534 commonPreparedStatement.setObject(2, commonId); 535 /* bjt - try for quotes */ 536 //commonPreparedStatement.setObject(2, commonId.toString()); 537 long starttime = System.currentTimeMillis(); 538 ResultSet resultSet = commonPreparedStatement.executeQuery(); 539 // if(commPreCount<5) { 540 //System.out.println(" TIME TAKNE IN EXECUTION OF COMMON PREPARED STATEMENT : " +(System.currentTimeMillis() - starttime)); 541 // commPreCount++; 542 // } 543 544 return resultSet; 545 } 546 547 /** 548 * creates and XML Element for Update 549 * @param noOfColumns 550 * @param rsmt 551 * @param xmlWriter 552 * @param rows_U 553 * @param shadowTableName 554 * @param remoteServerName 555 * @throws java.lang.Exception 556 */ 557 private void makeUpdateElement(Writer bw, int noOfColumns, 558 ResultSetMetaData rsmt 559 , XMLWriter xmlWriter, ResultSet rows_U, 560 String shadowTableName, 561 String remoteServerName, 562 ArrayList encodedCols, ArrayList updatedPrimaryKey) throws Exception { 563 Tracer tracer = new Tracer(); 564 ResultSet rs = null; 565 try { 566 Object Uid = rows_U.getObject(RepConstants.shadow_sync_id1); 567 if (viewedIds.containsKey(Uid)) { 568 viewedIds.remove(Uid); 569 return; 570 } 571 // if (rows_U.getString(RepConstants.shadow_status4).equalsIgnoreCase( 572 // RepConstants.afterUpdate)) { 573 // throw new Exception(" NOT POSSIBLE "); 574 // } 575 576 Object[] oldPrimaryColValues = new Object[primaryColNames.length]; 577 for (int i = 0; i < primaryColNames.length; i++) { 578 oldPrimaryColValues[i] = rows_U.getObject(primaryColNames[i]); 579 } 580 rs = getOtherCommonRecord(rows_U, tracer); 581 boolean recordPresent = rs.next(); 582 Object[] primaryColValues = new Object[primaryColNames.length]; 583 for (int i = 0; i < primaryColNames.length; i++) { 584 primaryColValues[i] = rs.getObject(primaryColNames[i]); 585 } 586 String serverName = rs.getString(RepConstants.shadow_serverName_n); 587 Object currentUId = rs.getObject(RepConstants.shadow_sync_id1); 588 if (!serverName.equalsIgnoreCase(remoteServerName)) { 589 viewedIds.put(currentUId,null); 590 } 591 592 if (USE_getLastRecord) { 593 makeUpdateElement_getLastRecord(bw, noOfColumns, xmlWriter, rows_U, 594 remoteServerName, oldPrimaryColValues, 595 tracer, rs, primaryColValues, 596 currentUId, encodedCols, updatedPrimaryKey); 597 } 598 else { 599 makeUpdateEment_ForFilter(bw, noOfColumns, xmlWriter, rows_U, 600 oldPrimaryColValues, rs, primaryColValues, 601 tracer, encodedCols); 602 } 603 } 604 finally { 605 if (tracer.rs != null) { 606 ResultSet rstemp = tracer.rs; 607 rstemp.close(); 608 } 609 if (rs != null) { 610 rs.close(); 611 } 612 } 613 614 } 615 616 /** 617 * creates an XML element for update With filter clause 618 * @param noOfColumns 619 * @param xmlWriter 620 * @param rows_U 621 * @param oldPrimaryColValues 622 * @param newRs 623 * @param primaryColValues 624 * @throws java.lang.Exception 625 */ 626 private void makeUpdateEment_ForFilter(Writer bw, int noOfColumns, 627 XMLWriter xmlWriter, 628 ResultSet rows_U, 629 Object[] oldPrimaryColValues, 630 ResultSet newRs, 631 Object[] primaryColValues, 632 Tracer tracer, 633 ArrayList encodedCols) throws 634 Exception { 635 boolean oldFilterResult = filterResultSet(rows_U); 636 boolean newFilterResult = filterResultSet(newRs); 637 if (oldFilterResult && newFilterResult) { 638 writeUpdateElement(bw, noOfColumns, xmlWriter, newRs, newRs.getMetaData(), 639 oldPrimaryColValues, primaryColValues, rows_U, tracer, 640 encodedCols); 641 } 642 else if (oldFilterResult && (!newFilterResult)) { 643 writeDeleteElement(bw, xmlWriter, rows_U, oldPrimaryColValues, 644 encodedCols); 645 } 646 else if ( (!oldFilterResult) && newFilterResult) { 647 writeInsertElement(bw, RepConstants.insert_operation, 648 newRs.getMetaData().getColumnCount(), xmlWriter, newRs, 649 newRs.getMetaData(), primaryColValues, 650 encodedCols); 651 } 652 } 653 654 /** 655 * creates an XML element for Update is Get Last record from shadow Table is Used. 656 * @param noOfColumns 657 * @param xmlWriter 658 * @param rows_U 659 * @param remoteServerName 660 * @param oldPrimaryColValues 661 * @param tracer 662 * @param rs 663 * @param primaryColValues 664 * @param currentUId 665 * @throws java.lang.Exception 666 */ 667 private void makeUpdateElement_getLastRecord(Writer bw, int noOfColumns, 668 XMLWriter xmlWriter, 669 ResultSet rows_U, 670 String remoteServerName, 671 Object[] oldPrimaryColValues, 672 Tracer tracer, ResultSet rs, 673 Object[] primaryColValues, 674 Object currentUId, 675 ArrayList encodedCols, ArrayList updatedPrimaryKey) throws 676 Exception { 677 getLastRecord(primaryColValues, currentUId, tracer, remoteServerName, updatedPrimaryKey); 678 if (!tracer.recordFound) { 679 makeUpdateEment_ForFilter(bw, noOfColumns, xmlWriter, rows_U, 680 oldPrimaryColValues, rs, primaryColValues, 681 tracer, encodedCols); 682 } 683 else if (tracer.type.equals(RepConstants.delete_operation)) { 684 if (filterResultSet(rows_U)) { 685 writeDeleteElement(bw, xmlWriter, rows_U, oldPrimaryColValues, 686 encodedCols); 687 } 688 } 689 else if (tracer.type.equals(RepConstants.update_operation)) { 690 ResultSet rsTracer = tracer.rs; 691 makeUpdateEment_ForFilter(bw, noOfColumns, xmlWriter, rows_U, 692 oldPrimaryColValues, rsTracer, 693 primaryColValues, tracer, 694 encodedCols); 695 696 } 697 else { 698 ResultSet rsTracer = tracer.rs; 699 if (rsTracer == null) { 700 rsTracer = rs; 701 tracer.primaryKeyValues = primaryColValues; 702 } 703 if (filterResultSet(rsTracer)) { 704 writeInsertElement(bw, RepConstants.insert_operation, noOfColumns, 705 xmlWriter, rsTracer, 706 rsTracer.getMetaData(), tracer.primaryKeyValues, 707 encodedCols); 708 709 } 710 } 711 712 } 713 714 // Update Operation Only 715 private void writeUpdateElement(Writer bw, int noOfColumns, 716 XMLWriter xmlWriter, 717 ResultSet rs, 718 ResultSetMetaData rsmt, 719 Object[] primaryKeyValues, 720 Object[] newprimaryKeyValues, 721 ResultSet originalResultSet, 722 Tracer tracer 723 , ArrayList encodedCols 724 ) throws Exception { 725 long startTime =System.currentTimeMillis(); 726 bw.write("<operation>"); 727 bw.write(RepConstants.update_operation); 728 if (tracer.primaryKeyValues != null) { 729 xmlWriter.writeRowElementForUpdate(noOfColumns, rs, rsmt, 730 originalResultSet, primaryColNames, 731 tracer.primaryKeyValues, tableName, 732 encodedCols); 733 } 734 else { 735 xmlWriter.writeRowElementForUpdate(noOfColumns, rs, rsmt, 736 originalResultSet, primaryColNames, 737 newprimaryKeyValues, tableName, 738 encodedCols); 739 } 740 741 xmlWriter.writePrimaryKeyElement(primaryColNames, primaryKeyValues,encodedCols); 742 bw.write("</operation>\r\n"); 743 // if(countWriteupdateElement<=5) { 744 // System.out.println(" TIME TAKEN TO WRITE THE UPDATE ELEMENT : " +(System.currentTimeMillis() - startTime)); 745 // countWriteupdateElement++; 746 // } 747 } 748 749 // Updated record is deleted 750 private void writeDeleteElement(Writer bw, XMLWriter xmlWriter, ResultSet rs, 751 Object[] primaryKeyValues, 752 ArrayList encodedCols) throws 753 Exception { 754 long startTime = System.currentTimeMillis(); 755 bw.write("<operation>"); 756 bw.write(RepConstants.delete_operation); 757 xmlWriter.writePrimaryKeyElement(primaryColNames, primaryKeyValues, encodedCols); 758 bw.write("</operation>\r\n"); 759 // if(countWriteDelementelement<=5) { 760 // System.out.println(" TIME TAKEN TO WRITE THE DELETE ELEMENT " +(System.currentTimeMillis() - startTime)); 761 // countWriteDelementelement++; 762 // } 763 } 764 765 // writes a n delete Element in XML file 766 private void makeDeleteElement(Writer bw, int noOfColumns, 767 XMLWriter xmlWriter, ResultSet rows_D, 768 ArrayList encodedCols) throws 769 Exception { 770 Object Uid = rows_D.getObject(RepConstants.shadow_sync_id1); 771 if (viewedIds.containsKey(Uid)) { 772 viewedIds.remove(Uid); 773 return; 774 } 775 776 Object[] primaryColValues = new Object[primaryColNames.length]; 777 for (int i = 0; i < primaryColNames.length; i++) { 778 primaryColValues[i] = rows_D.getObject(primaryColNames[i]); 779 } 780 // primaryColValues = getLastRecordBackwardtraversing(primaryColValues,Uid); 781 // if (primaryColValues!=null&&filterResultSet(rows_D)) { 782 if (filterResultSet(rows_D)) { 783 writeDeleteElement(bw, xmlWriter, rows_D, primaryColValues, encodedCols); 784 } 785 } 786 787 /** 788 * get last unique id from the corresponding the shadow Table 789 * @param shadowTable 790 * @return 791 * @throws java.lang.Exception 792 */ 793 /* private Object getLastUIDFromShadowTable(String shadowTable) throws Exception { 794 StringBuffer query = new StringBuffer(); 795 ResultSet rs = null; 796 try { 797 query.append(" Select max(").append(RepConstants.shadow_sync_id1).append( 798 ")").append(" from ").append(shadowTable); 799 Connection pub_sub_Connection = connectionPool.getConnection(local_pub_sub_name); 800 rs = pub_sub_Connection.createStatement().executeQuery(query.toString()); 801 boolean flag = rs.next(); 802 Object lastId = rs.getObject(1); 803 return flag ? (lastId == null ? new Long(0) : lastId) : new Long(0); 804 } 805 finally { 806 Statement st = rs.getStatement(); 807 rs.close(); 808 st.close(); 809 } 810 } 811 */ 812 813 private Object getLastUIDFromShadowTable(String shadowTable) throws Exception { 814 Connection pub_sub_Connection = null; 815 StringBuffer query = new StringBuffer(); 816 ResultSet rs = null; 817 boolean flag = false; 818 Object lastId = null; 819 try { 820 //RepConstants.writeMessage_FILE(" pub_sub_Connection ="+pub_sub_Connection); 821 //query.append(" Select max(").append(RepConstants.shadow_sync_id1).append(")").append(" from ").append(shadowTable); 822 /* bjt - for postgres performance */ 823 query.append(" Select ").append(RepConstants.shadow_sync_id1).append(" from ").append(shadowTable).append(" order by ").append(RepConstants.shadow_sync_id1).append(" desc limit 1"); 824 pub_sub_Connection = connectionPool.getConnection(local_pub_sub_name); 825 rs = pub_sub_Connection.createStatement().executeQuery(query.toString()); 826 //RepConstants.writeMessage_FILE("getLastUIDFromShadowTable rs :: "+rs); 827 flag = rs.next(); 828 if (flag) 829 lastId = rs.getObject(1); 830 } 831 catch (Exception ex) { 832 RepConstants.writeERROR_FILE(ex); 833 } 834 finally { 835 if (rs != null) { 836 Statement st = rs.getStatement(); 837 rs.close(); 838 if (st != null) 839 st.close(); 840 } 841 connectionPool.returnConnection(pub_sub_Connection); 842 } 843 return flag ? (lastId == null ? new Long(0) : lastId) : new Long(0); 844 } 845 846 /** 847 * update the bookmarks table after XML file for synchronisation is ready with the last UniqueID. 848 * @param shadowTable 849 * @param tableName 850 * @param remote_Pub_Sub_Name 851 * @throws java.lang.Exception 852 */ 853 private void updateBookMarkLastSyncId(String shadowTable, String tableName, 854 String remote_Pub_Sub_Name, 855 Object lastId) throws Exception { 856 857 Statement stmt = null; 858 Connection pub_sub_Connection = null; 859 try { 860 // Object lastId = getLastUIDFromShadowTable(shadowTable); 861 String updateQuery = "update " + dbDataTypeHandler.getBookMarkTableName() +" set " + 862 RepConstants.bookmark_lastSyncId4 + "=" + lastId + " where " + 863 RepConstants.bookmark_LocalName1 + " = '" + local_pub_sub_name +"' and " + 864 RepConstants.bookmark_RemoteName2 + " = '" + remote_Pub_Sub_Name +"' and " + 865 RepConstants.bookmark_TableName3 + " = '" + tableName + "' "; 866 pub_sub_Connection = connectionPool.getConnection(local_pub_sub_name); 867 stmt = pub_sub_Connection.createStatement(); 868 int updateNumber = stmt.executeUpdate(updateQuery); 869 } 870 finally { 871 if (stmt != null) 872 stmt.close(); 873 connectionPool.returnConnection(pub_sub_Connection); 874 } 875 } 876 877 /** 878 * writes the zip file on the other nodes socket. 879 * @param s 880 * @param xmlFilePath 881 * @throws IOException 882 */ 883 private void writeZIPFileOnClientSocket(Socket s, String xmlFilePath) throws IOException { 884 FileInputStream fis = new FileInputStream(xmlFilePath); 885 OutputStream sos = s.getOutputStream(); 886 s.setSendBufferSize(Integer.MAX_VALUE); 887 BufferedOutputStream bos = new BufferedOutputStream(sos); 888 889 byte[] buf = new byte[1024]; 890 int len = 0; 891 int kb = 0; 892 while ( (len = fis.read(buf)) > 0) { 893 bos.write(buf, 0, len); 894 } 895 bos.flush(); 896 bos.close(); 897 sos.close(); 898 fis.close(); 899 } 900 901 // public Object[] createXMLFile(String xmlFileURL, String zipFielURL, 902 // String xmlFileName, String remote_Pub_Sub_Name, 903 // ArrayList subRepTables,String clientServerName, 904 // int noOfTables, boolean DeleteXML, 905 // String local_pub_sub_name,boolean isSchemaSupported,_FileUpload fileUpload) throws RepException { 906 // return createXMLFile(xmlFileURL, zipFielURL, xmlFileName, 907 // remote_Pub_Sub_Name, subRepTables, clientServerName,noOfTables, DeleteXML, local_pub_sub_name,isSchemaSupported,fileUpload); 908 // } 909 910 /** 911 * deletes all records from the shadow tablw having null primary key set corresponding to the actual table. 912 * @throws SQLException 913 */ 914 private void deleteRecordsFromShadowTableWithNullPk() throws SQLException, 915 RepException { 916 StringBuffer query = new StringBuffer(); 917 query.append(" delete from ").append(shadowTable).append(" where "); 918 for (int i = 0; i < primaryColNames.length; i++) { 919 if (i > 0) { 920 query.append(" and "); 921 } 922 query.append(primaryColNames[i]).append(" is null "); 923 } 924 Connection pub_sub_Connection = connectionPool.getConnection(local_pub_sub_name); 925 Statement st = null; 926 try { 927 st = pub_sub_Connection.createStatement(); 928 st.executeUpdate(query.toString()); 929 } 930 finally { 931 if (st != null) 932 st.close(); 933 connectionPool.returnConnection(pub_sub_Connection); 934 } 935 } 936 937 /** 938 * filters the result set with the filter clause given. 939 * @param rs 940 * @return 941 * @throws SQLException 942 * @throws SQLException 943 */ 944 private boolean filterResultSet(ResultSet rs) throws SQLException, SQLException, RepException { 945 Connection pub_sub_Connection = null; 946 StringBuffer query = new StringBuffer(); 947 if ( (filterClause != null) && (!filterClause.trim().equals(""))) { 948 Object Uid = rs.getObject(RepConstants.shadow_sync_id1); 949 query.append("Select * from ").append(shadowTable).append(" where ") 950 .append(RepConstants.shadow_sync_id1) 951 .append(" = ").append(Uid) 952 .append(" and ").append(RepConstants.shadow_serverName_n) 953 .append(" != '") 954 .append(remoteServerName).append("' and ").append(filterClause); 955 PreparedStatement pstmt = null; 956 ResultSet filteredSet = null; 957 try { 958 pub_sub_Connection = connectionPool.getConnection(local_pub_sub_name); 959 pstmt = pub_sub_Connection.prepareStatement(query.toString()); 960 if (parameters != null) { 961 for (int i = 0, size = parameters.length; i < size; i++) { 962 pstmt.setString(i + 1, parameters[0]); 963 } 964 } 965 filteredSet = pstmt.executeQuery(); 966 boolean f1 = filteredSet.next(); 967 return f1; 968 } 969 finally { 970 if (filteredSet != null) 971 filteredSet.close(); 972 if (pstmt != null) 973 pstmt.close(); 974 connectionPool.returnConnection(pub_sub_Connection); 975 } 976 } 977 else { 978 return true; 979 } 980 } 981 982 983 // Changed by neeraj sir as stackoverflow was coming 984 // when a single row was updated more than 2500 times(ANZ case) 985 // now getlastrecord not called recursively 986 // but executed in infinite loop 987 988 private boolean getLastRecord(Object[] primaryColValues0, Object UId0, 989 Tracer tracer, String remoteServerName, ArrayList updatedPrimaryKey) throws 990 RepException, SQLException { 991 boolean recordFound = false; 992 Object[] primaryColValues = primaryColValues0; 993 Object[] oldPrimaryColValues = primaryColValues0; 994 Object UId = UId0; 995 String returnOperation = ""; 996 // ArrayList updatedPrimaryKey = getUpdatedPrimaryKey(); /* bjt */ 997 boolean isPrimaryKeyUpdated =true; 998 Object[] UIdTemp=null; 999 while (true) { 1000 primaryPreparedStatement.setObject(1, UId); 1001 for (int i = 0; i < primaryColValues.length; i++) { 1002 primaryPreparedStatement.setObject(i + 2, primaryColValues[i]); 1003 } 1004 1005 ResultSet rs = primaryPreparedStatement.executeQuery(); 1006 // execute prepared statement and check the record in result set 1007 if (!rs.next()) { 1008 rs.close(); 1009 break; 1010 } 1011 else { 1012 String operation = rs.getString(RepConstants.shadow_operation3); 1013 String serverName = rs.getString(RepConstants.shadow_serverName_n); 1014 if (operation.equalsIgnoreCase(RepConstants.delete_operation)) { 1015 // Delete Operation 1016 if (!serverName.equalsIgnoreCase(remoteServerName)) { 1017 Object currentUId = rs.getObject(RepConstants.shadow_sync_id1); 1018 viewedIds.put(currentUId,null); 1019 recordFound = true; 1020 returnOperation = RepConstants.delete_operation; 1021 } 1022 break; 1023 } 1024 else if (operation.equals(RepConstants.update_operation)) { 1025 ResultSet resultSet = getOtherCommonRecord(rs, tracer); 1026 resultSet.next(); 1027 oldPrimaryColValues = primaryColValues; 1028 primaryColValues = new Object[primaryColNames.length]; 1029 for (int i = 0; i < primaryColNames.length; i++) { 1030 primaryColValues[i] = resultSet.getObject(primaryColNames[i]); 1031 } 1032 serverName = resultSet.getString(RepConstants.shadow_serverName_n); 1033 UId = resultSet.getObject(RepConstants.shadow_sync_id1); 1034 if (!serverName.equalsIgnoreCase(remoteServerName)) { 1035 viewedIds.put(UId,null); 1036 } 1037 isPrimaryKeyUpdated = isPrimaryKeyUpdated(primaryColValues,updatedPrimaryKey); 1038 //System.out.println(" isPrimaryKeyUpdated :: "+isPrimaryKeyUpdated); 1039 if(!isPrimaryKeyUpdated) { 1040 UIdTemp = getUIDRecordUpdatedExceptPK(primaryColNames, primaryColValues, oldPrimaryColValues); 1041 if (UIdTemp != null) { 1042 UId = UIdTemp[0]; 1043 viewedIds.put(UId,null); 1044 } 1045 } 1046 recordFound = true; 1047 returnOperation = RepConstants.update_operation; 1048 resultSet.close(); 1049 } 1050 else { 1051 // for insert type operation case produced by yana's table having triggers . 1052 primaryColValues = new Object[primaryColNames.length]; 1053 for (int i = 0; i < primaryColNames.length; i++) { 1054 primaryColValues[i] = rs.getObject(primaryColNames[i]); 1055 } 1056 serverName = rs.getString(RepConstants.shadow_serverName_n); 1057 UId = rs.getObject(RepConstants.shadow_sync_id1); 1058 if (!serverName.equalsIgnoreCase(remoteServerName)) { 1059 viewedIds.put(UId,null); 1060 } 1061 recordFound = true; 1062 returnOperation = RepConstants.insert_operation; 1063 } 1064 } // end if (!rs.next()) 1065 // bjt - close result set 1066 rs.close(); 1067 } // end while (true) 1068 // bjt - move above return to set tracer before returning 1069 if (!recordFound) 1070 return recordFound; 1071 tracer.recordFound = recordFound; 1072 tracer.type = returnOperation; 1073 if (returnOperation.equals(RepConstants.update_operation)) { 1074 tracer.primaryKeyValues = primaryColValues; 1075 if(!isPrimaryKeyUpdated) { 1076 tracer.rs = (ResultSet)UIdTemp[1]; 1077 } else { 1078 String finalResultSetQuery = "select * from " + shadowTable + " where " +RepConstants.shadow_sync_id1 + " = " + UId; 1079 /* bjt - try to make postgres use index */ 1080 //String finalResultSetQuery = "select * from " + shadowTable + " where " +RepConstants.shadow_sync_id1 + " = " + UId.toString(); 1081 ResultSet resultSet = commonStatement.executeQuery(finalResultSetQuery); 1082 resultSet.next(); 1083 tracer.rs = resultSet; 1084 // bjt - close resultSet 1085 //resultSet.close(); 1086 } 1087 } 1088 return recordFound; 1089 } 1090 1091 /** 1092 * This method is implemented to traverse the record backward for writing 1093 * the delete element in XML file.It traverse the record backward. 1094 * If a record found the status 'B' then record is trvarsed continue up to 1095 * original record that have status 'I'. 1096 */ 1097 private Object[] getLastRecordBackwardtraversing(Object[] primaryColValues0, Object UId) throws SQLException, RepException { 1098 String serverName =null; 1099 Object[] primaryColValues =new Object[primaryColValues0.length] ; 1100 for (int i = 0; i < primaryColValues0.length; i++) { 1101 primaryColValues[i] =primaryColValues0[i]; 1102 } 1103 while(true) { 1104 primaryPreparedStatementBackwardTraversing.setObject(1, UId); 1105 for (int i = 0; i < primaryColValues.length; i++) { 1106 primaryPreparedStatementBackwardTraversing.setObject(i + 2, primaryColValues[i]); 1107 } 1108 ResultSet rs = primaryPreparedStatementBackwardTraversing.executeQuery(); 1109 boolean flag = rs.next(); 1110 if (!flag) { 1111 rs.close(); 1112 return primaryColValues; 1113 } 1114 String operation = rs.getString(RepConstants.shadow_operation3); 1115 if (operation.equals(RepConstants.insert_operation)) { 1116 if(rs!=null) 1117 rs.close(); 1118 return null; 1119 } 1120 else { 1121 // String operation = rs.getString(RepConstants.shadow_operation3); 1122 if (operation.equalsIgnoreCase(RepConstants.delete_operation)) { // Delete Operation 1123 if(rs!=null) 1124 rs.close(); 1125 return primaryColValues; 1126 } 1127 else { 1128 ResultSet resultSet = getOtherCommonRecordBackwardTraversing(rs); 1129 boolean updatedREcord = resultSet.next(); 1130 if (!updatedREcord) { // other common record not found 1131 throw new RepException("REP051",new Object[] {rs.getObject(RepConstants.shadow_common_id2), shadowTable}); 1132 } 1133 primaryColValues = new Object[primaryColNames.length]; 1134 for (int i = 0; i < primaryColNames.length; i++) { 1135 primaryColValues[i] = resultSet.getObject(primaryColNames[i]); 1136 } 1137 UId= resultSet.getObject(RepConstants.shadow_sync_id1); 1138 serverName = resultSet.getString(RepConstants.shadow_serverName_n); 1139 if (!serverName.equalsIgnoreCase(remoteServerName)) { 1140 viewedIds.put(UId,null); 1141 } 1142 if(resultSet!=null) 1143 resultSet.close(); 1144 } 1145 } 1146 } 1147 } 1148 1149 private ResultSet getOtherCommonRecordBackwardTraversing(ResultSet rs) throws SQLException,RepException { 1150 Object commonId = rs.getObject(RepConstants.shadow_common_id2); 1151 Object UId = rs.getObject(RepConstants.shadow_sync_id1); 1152 commonPreparedStatementForBackwardTraversing.setObject(1, UId); 1153 commonPreparedStatementForBackwardTraversing.setObject(2, commonId); 1154 for (int i = 0; i < primaryColNames.length; i++) { 1155 commonPreparedStatementForBackwardTraversing.setObject(i + 3,rs.getObject("REP_OLD_" +primaryColNames[i])); 1156 } 1157 return commonPreparedStatementForBackwardTraversing.executeQuery(); 1158 } 1159 1160 private PreparedStatement makeCommonPreparedStatementBackwardTraversing() throws 1161 SQLException, RepException { 1162 StringBuffer query = new StringBuffer(); 1163 query.append(" select * from "); 1164 query.append(shadowTable); 1165 query.append(" where "); 1166 query.append(RepConstants.shadow_sync_id1); 1167 query.append(" != "); 1168 query.append(" ? "); 1169 query.append(" and "); 1170 query.append(RepConstants.shadow_common_id2); 1171 query.append(" = "); 1172 query.append(" ? "); 1173 // for matching oldPrimary keys 1174 for (int i = 0; i < primaryColNames.length; i++) { 1175 query.append(" and "); 1176 query.append("REP_OLD_" + primaryColNames[i]); 1177 query.append(" = "); 1178 query.append(" ? "); 1179 } 1180 query.append(" and "); 1181 query.append(RepConstants.shadow_status4); 1182 query.append(" = '"); 1183 query.append(RepConstants.beforeUpdate + "'"); 1184 query.append(" order by ").append(RepConstants.shadow_sync_id1); 1185 Connection pub_sub_Connection = connectionPool.getConnection(local_pub_sub_name); 1186 return pub_sub_Connection.prepareStatement(query.toString()); 1187 } 1188 1189 /** 1190 * Get the updated priamry key columns values from trackpriamrykey table 1191 */ 1192 private ArrayList getUpdatedPrimaryKey() throws SQLException, RepException { 1193 ResultSet rs=null; 1194 Statement stmt=null; 1195 ArrayList list = new ArrayList(); 1196 try 1197 { 1198 stmt = ( (Connection) connectionPool.getConnection(local_pub_sub_name)).createStatement(); 1199 rs = stmt.executeQuery(makeQueryForUpdatedPriamryKey()); 1200 ResultSetMetaData metaData = rs.getMetaData(); 1201 int columnCount = metaData.getColumnCount(); 1202 while (rs.next()) { 1203 Object[] columnValues = new Object[columnCount]; 1204 for (int i = 1; i <= columnCount; i++) { 1205 columnValues[i - 1] = rs.getObject(i); 1206 } 1207 // System.out.println("UPDATED PRIMARY KEY ::"+Arrays.asList(columnValues)); 1208 list.add(columnValues); 1209 } 1210 } finally { 1211 try { 1212 if (rs != null) { 1213 rs.close(); 1214 } 1215 if(stmt!=null) { 1216 stmt.close(); 1217 } 1218 } 1219 catch (SQLException ex) { 1220 // Ignore Exception 1221 } 1222 } 1223 return list; 1224 } 1225 1226 /** 1227 * It return false if record is not updated else return true 1228 */ 1229 private boolean isPrimaryKeyUpdated(Object[] priamryColValues,ArrayList updatedPrimaryKey) throws SQLException, RepException { 1230 boolean isPrimaryKeyUpdated = false; 1231 if(updatedPrimaryKey.size()==0) { 1232 return isPrimaryKeyUpdated; 1233 } 1234 updatedPkList: for (int i = 0; i < updatedPrimaryKey.size(); i++) { 1235 Object[] updatedPk = (Object[]) updatedPrimaryKey.get(i); 1236 /* bjt fix loop */ 1237 updatedPkFields: for (int j = 0; j < updatedPk.length; j++) { 1238 //if (updatedPk[j].equals(priamryColValues[j])) { 1239 //if (!updatedPk[j].equals(priamryColValues[j])) { 1240 if (!updatedPk[j].toString().equals(priamryColValues[j].toString())) { 1241 //isPrimaryKeyUpdated=true; 1242 continue updatedPkList; /* field does not equal, so look at next record */ 1243 } 1244 } 1245 /* bjt - if we make it here, all the fields are equal, so the records match */ 1246 //if(isPrimaryKeyUpdated) { 1247 /* bjt */ 1248 isPrimaryKeyUpdated = true; /* now set to true */ 1249 //updatedPrimaryKey.remove(i); /* faster if we could remove, but later records might reference this 1250 /* same primary key so we can't remove it from the array. Previous incarnation 1251 * refreshed this array for every record considered, so removing was no issue. 1252 * refreshing the array for every record seems like a gigantic waste of time, so 1253 * we don't clear this array once we've seen a record, b/c we might see it later - bjt */ 1254 break; 1255 //} 1256 } 1257 return isPrimaryKeyUpdated; 1258 } 1259 1260 /* 1261 SELECT * FROM Rep_Shadow_PUSHTABLE WHERE (Rep_sync_id = (SELECT MAX(rep_sync_id) 1262 FROM Rep_Shadow_PUSHTABLE WHERE rollno = 11 AND (rep_old_rollNo = 11) AND rep_Status='A')) 1263 1264 This method is used if same record is updated many times(except primary column i.e primary 1265 column is not updated) This leads to improving performance(As suggested by parveen sir) And 1266 to avoid traversing of the syncIds which couldnt add in the viewId we use 1267 addSyncidToViewIdForSameOldPKEqualsNewPks(String primaryCols[],Object[] primaryColValues, 1268 Object MaxUID) method. 1269 */ 1270 private Object[] getUIDRecordUpdatedExceptPK(String 1271 primaryCols[], Object[] primaryColValues, Object[] oldPrimaryColValues) throws SQLException, RepException { 1272 ResultSet rs = null; 1273 1274 //check if old primary keys and new primary keys are same or not. 1275 // If Yes,then return else coontinue 1276 for (int i = 0; i < primaryColValues.length; i++) { 1277 if (!primaryColValues[i].toString().equals(oldPrimaryColValues[i].toString())) 1278 return null; 1279 } 1280 PSForLastRecordSameRecordUpdatedExceptPK = makeQueryToGetRecordIfPrimaryKeyIsNotUpdated(primaryColNames); 1281 int indexPrimaryColumn = 0; 1282 for (indexPrimaryColumn = 0; indexPrimaryColumn < primaryCols.length;indexPrimaryColumn++) { 1283 PSForLastRecordSameRecordUpdatedExceptPK.setObject(indexPrimaryColumn +1, primaryColValues[indexPrimaryColumn]); 1284 } 1285 for (int j = 0; j < primaryCols.length; j++) { 1286 PSForLastRecordSameRecordUpdatedExceptPK.setObject(indexPrimaryColumn +j + 1, oldPrimaryColValues[j]); 1287 } 1288 rs = PSForLastRecordSameRecordUpdatedExceptPK.executeQuery(); 1289 rs.next(); 1290 Object Uid = rs.getObject(1); 1291 //Add the SyncIds to viewIds Which we have considered indirectly through above Query 1292 addSyncidToViewIdForSameOldPKEqualsNewPks(primaryColValues, Uid); 1293 return new Object[]{ Uid,rs}; 1294 } 1295 1296 /* 1297 Query to get syncIds : 1298 " SELECT REP_SYNC_ID FROM SHADOW_TABLE WHERE 1299 primarycolumns=rep_old_primarycolumns and pk=?; " 1300 SyncID which we get through this query if not 1301 already present in viewIDs and less than the MaxUId 1302 then add into viewID. MaxUId is the SyncId upto which 1303 we have considered shadow table record for that particular 1304 primary key.We require to add id into view becasue all of 1305 1306 */ 1307 1308 private void addSyncidToViewIdForSameOldPKEqualsNewPks(Object[] 1309 primaryColValues, Object MaxUID) throws SQLException, RepException { 1310 ResultSet rs = null; 1311 try { 1312 PSToGetSyncidForSameOldPKEqualsNewPks = makeQueryToGetViewId(MaxUID); 1313 for (int i = 0; i < primaryColValues.length; i++) 1314 PSToGetSyncidForSameOldPKEqualsNewPks.setObject(i + 1,primaryColValues[i]); 1315 rs = PSToGetSyncidForSameOldPKEqualsNewPks.executeQuery(); 1316 while (rs.next()) { 1317 Object Uid = rs.getObject(1); 1318 if (!viewedIds.containsKey(Uid)) { 1319 // && ( (Number) Uid).longValue() < ( (Number) MaxUID).longValue()) { 1320 viewedIds.put(Uid, null); 1321 } 1322 } 1323 } 1324 finally { 1325 try { 1326 if (rs != null) { 1327 rs.close(); 1328 } 1329 if (PSToGetSyncidForSameOldPKEqualsNewPks != null) 1330 PSToGetSyncidForSameOldPKEqualsNewPks.close(); 1331 } 1332 catch (SQLException ex2) { 1333 //ignore SQLException 1334 } 1335 } 1336 } 1337 1338 private PreparedStatement makeQueryToGetRecordIfPrimaryKeyIsNotUpdated(String[] 1339 primaryColsName) throws SQLException, RepException { 1340 StringBuffer query = new StringBuffer(); 1341 query.append("SELECT * ") /*.append(RepConstants.shadow_sync_id1).*/. 1342 append(" FROM ").append(shadowTable).append(" WHERE ( ") 1343 .append(RepConstants.shadow_sync_id1) 1344 .append(" = (SELECT MAX(").append(RepConstants.shadow_sync_id1) 1345 .append(") FROM ") 1346 .append(shadowTable).append(" WHERE "); 1347 for (int i = 0; i < primaryColsName.length; i++) { 1348 if (i != 0) { 1349 query.append(" AND "); 1350 } 1351 query.append(primaryColsName[i]).append("= ? "); 1352 } 1353 query.append(" AND "); 1354 for (int i = 0; i < primaryColsName.length; i++) { 1355 if (i != 0) { 1356 query.append(" AND "); 1357 } 1358 query.append(" REP_OLD_").append(primaryColsName[i]).append("= ? "); 1359 } 1360 query.append(" AND ").append(RepConstants.shadow_status4) 1361 .append(" = '").append(RepConstants.afterUpdate).append(" ')) "); 1362 Connection pub_sub_Connection = connectionPool.getConnection(local_pub_sub_name); 1363 //System.out.println("Get UID query.toString() :: "+query.toString()); 1364 return pub_sub_Connection.prepareStatement(query.toString()); 1365 } 1366 1367 private PreparedStatement makeQueryToGetViewId(Object MaxUID) throws SQLException, RepException { 1368 StringBuffer query = new StringBuffer(); 1369 query.append("SELECT ").append(RepConstants.shadow_sync_id1) 1370 .append(" FROM ").append(shadowTable).append(" WHERE "); 1371 for (int i = 0; i < primaryColNames.length; i++) { 1372 if (i != 0) 1373 query.append(" AND "); 1374 query.append(primaryColNames[i]).append("= ").append(" REP_OLD_") 1375 .append(primaryColNames[i]); 1376 } 1377 query.append(" AND "); 1378 for (int i = 0; i < primaryColNames.length; i++) { 1379 if (i != 0) 1380 query.append(" AND "); 1381 query.append(primaryColNames[i]).append("= ?"); 1382 } 1383 query.append(" AND ") 1384 .append(RepConstants.shadow_sync_id1) 1385 .append(" < ") 1386 .append( ( (Number) MaxUID).longValue()); 1387 Connection pub_sub_Connection = connectionPool.getConnection(local_pub_sub_name); 1388 //System.out.println(" makeQuery To get View ID :: "+query.toString().toUpperCase()); 1389 return pub_sub_Connection.prepareStatement(query.toString()); 1390 } 1391 1392 private String makeQueryForUpdatedPriamryKey() { 1393 StringBuffer query = new StringBuffer(); 1394 query.append("SELECT "); 1395 for (int i = 0; i < primaryColNames.length; i++) { 1396 if (i != 0) 1397 query.append(","); 1398 query.append(primaryColNames[i]); 1399 } 1400 query.append(" FROM ") 1401 .append(dbDataTypeHandler.getShadowTableName(tableName)) 1402 .append(" where ").append(RepConstants.shadow_PK_Changed) 1403 .append(" = 'Y'"); 1404 //System.out.println(" MAKE QUERY FOR UPDATED PRIMARY KEY :: "+query.toString()); 1405 return query.toString(); 1406 } 1407 1408 private String makeQueryToSelectRecordsMarkedDeleted(long lastId) { 1409 StringBuffer query =new StringBuffer(); 1410 query.append( "Select * from ") 1411 .append(shadowTable) 1412 .append(" where ") 1413 .append(RepConstants.shadow_sync_id1) 1414 .append(" > ") 1415 .append(lastId) 1416 .append(" and ") 1417 .append(RepConstants.shadow_operation3) 1418 .append(" ='D' order by ") 1419 .append(RepConstants.shadow_sync_id1); 1420 return query.toString(); 1421 } 1422 1423 1424 /** 1425 * used for debugging. shows values stored in a resultSet. 1426 * @param rs 1427 * @throws SQLException 1428 */ 1429 // public static void showResultSet(ResultSet rs) throws SQLException { 1430 // ResultSetMetaData metaData = rs.getMetaData(); 1431 // int columnCount = metaData.getColumnCount(); 1432 // Object[] displayColumn = new Object[columnCount]; 1433 // for (int i = 1; i <= columnCount; i++) 1434 // displayColumn[i - 1] = metaData.getColumnName(i); 1435 // System.out.println(Arrays.asList(displayColumn)); 1436 // while (rs.next()) { 1437 // Object[] columnValues = new Object[columnCount]; 1438 // for (int i = 1; i <= columnCount; i++) 1439 // columnValues[i - 1] = rs.getObject(i); 1440 // System.out.println(Arrays.asList(columnValues)); 1441 // } 1442 // } 1443 1444 1445 } 1446

