001 /** 002 * Copyright (c) 2003 Daffodil Software Ltd all rights reserved, 003 * Modifications Copyright (c) 2008 Regiscope Digital Imaging Co, LLC, All rights reserved. 004 * This program is free software; you can redistribute it and/or modify 005 * it under the terms of version 2 of the GNU General Public License as 006 * published by the Free Software Foundation. 007 * There are special exceptions to the terms and conditions of the GPL 008 * as it is applied to this software. See the GNU General Public License for more details. 009 * 010 * This program is distributed in the hope that it will be useful, 011 * but WITHOUT ANY WARRANTY; without even the implied warranty of 012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 013 * GNU General Public License for more details. 014 * 015 * You should have received a copy of the GNU General Public License 016 * along with this program; if not, write to the Free Software 017 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA 018 */ 019 020 package org.dbreplicator.replication.xml; 021 022 import java.sql.*; 023 import java.util.*; 024 025 import org.xml.sax.*; 026 import org.xml.sax.helpers.*; 027 import org.dbreplicator.replication.*; 028 import org.dbreplicator.replication.DBHandler.*; 029 import org.dbreplicator.replication.column.*; 030 import org.apache.log4j.Logger; 031 032 /** 033 * This class is the implementation class for ContentHandler for parsing the 034 * snapshot XML file. This class implements differnt event methods which automatically 035 * are called by the parser. It implements these methods to get and use the different 036 * values stored on the XML file. 037 * These methods helps at the time of taking snapshot, as when some information is 038 * found by the parser these methods stores the relative value and perform different 039 * operations like inserting, updating and deleting records from respective tables. 040 */ 041 042 public class SnapshotHandler 043 extends DefaultHandler { 044 XMLElement currentElement; 045 Connection subConnection; 046 Statement statement; 047 Subscription subscription; 048 TreeMap treeMap; 049 PreparedStatement preparedStatement, psForUpdateBookMarksTable; 050 String remoteServerName,subName, pubName; 051 AbstractDataBaseHandler dbHandler; 052 boolean setAutoCommitFlag = true; 053 protected static Logger log = Logger.getLogger(SnapshotHandler.class.getName()); 054 private boolean isFirstPass, isCurrentTableCyclic; 055 private RepTable currentRepTable; 056 private XMLElement tableElement; 057 private TreeMap allColumnsMap; 058 int batchCntr = 0; int batchMax = 0; 059 060 /** 061 * Default Handler for parsing and reading the contents from XML file 062 * for getting Snapshot 063 * @param subConnection0 064 * @param subscription0 065 * @throws SQLException 066 */ 067 public SnapshotHandler(boolean isFirstPass0, Connection subConnection0, 068 Subscription subscription0, 069 AbstractDataBaseHandler dbHandler0, 070 String remoteServerName0) throws SQLException { 071 currentElement = new XMLElement("root"); 072 subConnection = subConnection0; 073 statement = subConnection.createStatement(); 074 subscription = subscription0; 075 dbHandler = dbHandler0; 076 remoteServerName = remoteServerName0; 077 isFirstPass = isFirstPass0; 078 //bjt 079 DatabaseMetaData dbmd = subConnection.getMetaData(); 080 //batchMax = (dbmd.supportsBatchUpdates() ? 10000 : 0); 081 batchMax = 0; 082 log.debug("Batch Max = " + batchMax); 083 if (batchMax > 0) { 084 log.debug("Using statement pooling in batches of " + batchMax); 085 } else { 086 log.debug("NOT Using batch updates, database does not seem to support it"); 087 } 088 } 089 090 /** 091 * Initilazing an XML element and adding its childs. 092 * @param namespace 093 * @param localname 094 * @param qname 095 * @param atts 096 * @throws SAXException 097 */ 098 public void startElement(String namespace, String localname, String qname, Attributes atts) throws SAXException { 099 try { 100 XMLElement childElement = null; 101 102 /* 103 Below given if condition and structure of XML file has been changed 104 after merging of branch "Vinesreplication". 105 A new element Operation is added to make the 106 common element of row and primary key element. 107 Note : - to get the copy of old source check out 108 it from branch replicator1_9 109 */ 110 111 if ( (currentElement == null || 112 !currentElement.elementName.equals("operation")) && 113 qname.equals("tableName")) { 114 115 /** 116 * In place of "XMLElement" class "XMLTableElement" 117 * class is used to control the outofmemory error. 118 * If a table have records in lakh then outofmemoryerror 119 * occurs because in "XMLElement" class one by one all the 120 * element are added in elementlist. 121 */ 122 childElement = new XMLTableElement(qname); 123 // childElement = new XMLElement(qname); 124 tableElement = childElement; 125 } 126 else { 127 if (tableElement != null) { 128 createStatmement(); 129 } 130 childElement = new XMLElement(qname); 131 tableElement = null; 132 } 133 currentElement.addChild(childElement); 134 childElement.setParentElement(currentElement); 135 childElement.addAtt(atts.getValue("name")); 136 137 //add Encode 138 childElement.addEncodeAtt(atts.getValue("Encode")); 139 currentElement = childElement; 140 } 141 catch (NullPointerException ex) { 142 RepConstants.writeERROR_FILE(ex); 143 throw new SAXException(ex); 144 } 145 catch (Exception ex) { 146 RepConstants.writeERROR_FILE(ex); 147 throw new SAXException(ex); 148 } 149 } 150 151 /** 152 * Called after end of an element is reached for calling delete and then insert on the respective tables on client side. 153 * @param namespace 154 * @param localname 155 * @param qname 156 * @throws SAXException 157 */ 158 public void endElement(String namespace, String localname, String qname) throws SAXException { 159 try { 160 currentElement.checkEncoding(); 161 XMLElement parentElement = currentElement.getParentElement(); 162 //log.debug(" parentElement "+parentElement); 163 //log.debug("qname::::"+qname); 164 /* 165 Below given if condition and structure of XML file has been changed 166 after merging of branch "Vinesreplication". 167 A new element Operation is added to make the 168 common element of row and primary key element. 169 Note : - to get the copy of old source check out 170 it from branch replicator1_9 171 */ 172 if ( (parentElement == null || 173 parentElement.elementName.equals("tableName")) && 174 qname.equals("operation")) { 175 176 // currentElement = parentElement; 177 178 createQuery(); 179 currentElement.elementList.clear(); 180 } 181 if ( (parentElement == null || 182 !parentElement.elementName.equals("row") || 183 !qname.equals("primary")) && 184 qname.equals("tableName")) { 185 try { 186 String tableNaam = currentElement.elementValue; 187 // String shadowTableName = RepConstants.shadow_Table(tableNaam); 188 String shadowTableName = dbHandler.getShadowTableName(tableNaam); 189 // delete all entries from Shadow Table as snapshot of table is taken by now 190 StringBuffer query = new StringBuffer(); 191 query.append("SELECT MAX(").append(RepConstants.shadow_sync_id1).append(") from ").append(shadowTableName); 192 ResultSet rs = statement.executeQuery(query.toString()); 193 rs.next(); 194 long lastSyncId = rs.getLong(1); 195 rs.close(); 196 197 query = new StringBuffer(); 198 query.append("delete from ").append(shadowTableName).append(" where ") 199 .append(RepConstants.shadow_sync_id1).append(" != ").append(lastSyncId); 200 statement.execute(query.toString()); 201 202 String updateServerNameQuery = "update " + shadowTableName + " set " + 203 RepConstants.shadow_serverName_n + " = '" + remoteServerName +"'"; 204 statement.execute(updateServerNameQuery); 205 206 // updating records Lastsyncid and Concidered Id on BookMarks Table 207 if (psForUpdateBookMarksTable == null) { 208 psForUpdateBookMarksTable = makeUpdateBookMarksTable(); 209 } 210 psForUpdateBookMarksTable.setLong(1, lastSyncId); 211 psForUpdateBookMarksTable.setLong(2, lastSyncId); 212 psForUpdateBookMarksTable.setString(3, subName); 213 psForUpdateBookMarksTable.setString(4, pubName); 214 psForUpdateBookMarksTable.setString(5, tableNaam); 215 int count = psForUpdateBookMarksTable.executeUpdate(); 216 currentElement.elementList.clear(); 217 218 // inserting and deleting records from log Table.. 219 insert_dummyRecordInLogTable(); 220 deleteRecordsFromSuperLogTable(tableNaam); 221 } 222 catch (Exception ex) { 223 RepConstants.writeERROR_FILE(ex); 224 throw new SAXException(ex.getMessage(), ex); 225 } 226 if (preparedStatement != null) { 227 try { 228 if (batchMax > 0) { 229 log.debug("Sending last " + batchCntr + " records to database with batch processing"); 230 preparedStatement.executeBatch(); 231 setAutocomitTrueAndCommitRecord(); 232 preparedStatement.clearBatch(); 233 log.debug("Batch submitted"); 234 batchCntr = 0; 235 } 236 preparedStatement.close(); 237 } 238 catch (SQLException ex2) { 239 } 240 } 241 } 242 if (parentElement != null) { 243 currentElement = parentElement; 244 } 245 } 246 catch (ClassCastException ex) { 247 RepConstants.writeERROR_FILE(ex); 248 throw new SAXException(ex.getMessage(), ex); 249 } 250 catch (NullPointerException ex1) { 251 ex1.printStackTrace(); 252 RepConstants.writeERROR_FILE(ex1); 253 throw new SAXException(ex1); 254 } 255 catch (Exception ex1) { 256 ex1.printStackTrace(); 257 RepConstants.writeERROR_FILE(ex1); 258 throw new SAXException(ex1); 259 } 260 } 261 262 /** 263 * getting the value for XML element. 264 * @param ch 265 * @param start 266 * @param len 267 * @throws SAXException 268 */ 269 public void characters(char[] ch, int start, int len) throws SAXException { 270 try { 271 String elementValue = new String(ch, start, len); 272 if (elementValue.equalsIgnoreCase("") || 273 elementValue.equalsIgnoreCase("\n")) { 274 return; 275 } 276 currentElement.setElementValue(elementValue); 277 } 278 catch (NullPointerException ex1) { 279 throw new SAXException(ex1.getMessage(), ex1); 280 } 281 } 282 283 private void createStatmement() throws SAXException { 284 try { 285 String elementValue = currentElement.elementValue; 286 XMLElement parentElement = currentElement.getParentElement(); 287 if ( (parentElement == null || 288 !parentElement.elementName.equals("row")) && 289 currentElement.elementName.equals("tableName")) { 290 RepTable repTable = subscription.getRepTable(elementValue); 291 isCurrentTableCyclic = repTable.getCyclicDependency().equalsIgnoreCase(RepConstants.YES); 292 if (isFirstPass) { 293 // statement.execute("DELETE FROM " + elementValue); 294 // ResultSet rs = statement.getResultSet(); 295 treeMap = repTable.getColumnTreeMap(subConnection,subscription.getDBDataTypeHandler()); 296 String preparedQuery = repTable.createInsertQueryForSnapShot(); 297 preparedStatement = subConnection.prepareStatement(preparedQuery); 298 //log.debug(preparedQuery); 299 } 300 else { 301 if (isCurrentTableCyclic) { 302 treeMap = repTable.getColumnTreeMap(subConnection,subscription.getDBDataTypeHandler()); 303 String preparedQuery = repTable.createUpdateQueryForSnapShot(); 304 preparedStatement = subConnection.prepareStatement(preparedQuery); 305 } 306 } 307 currentRepTable = repTable; 308 allColumnsMap=currentRepTable.getAllColumns(); 309 } 310 } 311 catch (ClassCastException ex) { 312 RepConstants.writeERROR_FILE(ex); 313 throw new SAXException(ex.getMessage(), ex); 314 } 315 catch (NullPointerException ex1) { 316 ex1.printStackTrace(); 317 log.error(ex1.getMessage(), ex1); 318 throw new SAXException(ex1.getMessage(), ex1); 319 } 320 catch (Exception ex) { 321 RepConstants.writeERROR_FILE(ex); 322 throw new SAXException(ex.getMessage(), ex); 323 } 324 325 } 326 327 /** 328 * creating and firing Insert Query for client subscribed tables. 329 * @throws SAXException 330 */ 331 public void createQuery() throws SAXException, SQLException { 332 333 try { 334 ArrayList elements = currentElement.getChildElements(); 335 // XMLElement parentElement =currentElement.getParentElement(); 336 // ArrayList elements = parentElement.getChildElements(); 337 ArrayList InsertElements = ( (XMLElement) elements.get(0)).getChildElements(); 338 // ArrayList InsertElements = elements; 339 //log.debug(" Is First pass : " + isFirstPass); 340 341 if (isFirstPass) { 342 int j = 0; 343 for (int i = 0; i < InsertElements.size(); i++) { 344 XMLElement element = (XMLElement) InsertElements.get(i); 345 String columnName = element.elementName; 346 columnName = (String) allColumnsMap.get(columnName); 347 //log.debug(" allColumnsMap : "+allColumnsMap); 348 //String value = element.elementValue; 349 //log.debug(" XML Element : " + element); 350 //log.debug(" columnName : " + columnName); 351 //log.debug(" value : " + value); 352 if (currentRepTable.isIgnoredColumn(columnName)) { 353 //log.debug("isIgnoredColumn: " + columnName); 354 continue; 355 } 356 //log.debug(" isCurrentTableCyclic : " + isCurrentTableCyclic); 357 //log.debug(columnName + " isForeignKeyColumn : " +currentRepTable.isForeignKeyColumn(columnName)); 358 if (isCurrentTableCyclic && currentRepTable.isForeignKeyColumn(columnName)) { 359 //log.debug(" Is isCurrentTableCyclic : YES"); 360 //log.debug(" currentRepTable.isForeignKeyColumn(columnName) : "+currentRepTable.isForeignKeyColumn(columnName)); 361 AbstractColumnObject columnObject = (AbstractColumnObject) treeMap.get(columnName); 362 //once setAutoCommitFlag is set to false,we shouldn't change it to true 363 // by checking for other columns for that 'if' check is used 364 checkAutocommit(columnObject); 365 columnObject.setColumnObject(preparedStatement, "NULL", j + 1); 366 //log.debug("setting null to " + columnName); 367 } 368 else { 369 AbstractColumnObject columnObject = (AbstractColumnObject) treeMap.get(columnName); 370 //once setAutoCommitFlag is set to false,we shouldn't change it to true 371 // by checking for other columns for that 'if' check is used 372 checkAutocommit(columnObject); 373 columnObject.setColumnObject(preparedStatement, element, j + 1); 374 //log.debug("setting " + columnName + " to " + element.elementValue); 375 } 376 j++; 377 } 378 379 try { 380 if (batchMax > 0) { 381 if (batchCntr < batchMax) { 382 preparedStatement.addBatch(); 383 batchCntr++; 384 } else { 385 log.debug("Sending next " + batchCntr + " records to database with batch processing"); 386 preparedStatement.executeBatch(); 387 setAutocomitTrueAndCommitRecord(); 388 preparedStatement.clearBatch(); 389 log.debug("Batch submitted"); 390 batchCntr = 0; 391 } 392 } else { 393 preparedStatement.execute(); 394 } 395 } 396 catch (SQLException ex1) { 397 ex1.printStackTrace(); 398 } 399 } 400 else { 401 //log.debug("IS CURRENT TABLE CYCLIC "+isCurrentTableCyclic); 402 if (isCurrentTableCyclic) { 403 int columnIndex = 0; 404 String[] foreignKeyCols = currentRepTable.getForeignKeyCols(); 405 for (int j = 0, size = foreignKeyCols.length; j < size; j++) { 406 for (int i = 0; i < InsertElements.size(); i++) { 407 XMLElement element = (XMLElement) InsertElements.get(i); 408 String ColumnName = element.elementName; 409 ColumnName = (String) allColumnsMap.get(ColumnName); 410 //log.debug("Second Pass " + ColumnName + " isForeignKeyColumn : " +currentRepTable.isForeignKeyColumn(ColumnName)); 411 if (ColumnName.equalsIgnoreCase(foreignKeyCols[j])) { 412 //String value = element.elementValue; 413 ( (AbstractColumnObject) treeMap.get(ColumnName)).setColumnObject(preparedStatement, element, columnIndex + 1); 414 //log.debug("Second Pass ColumnName::" + ColumnName + ":" + value); 415 //log.debug("Second Pass columnName : " + ColumnName); 416 //log.debug("Second Pass value : " + value); 417 break; 418 } 419 //log.debug("Second Pass XML Element : " + element); 420 } 421 columnIndex++; 422 } 423 // set object for where cluase 424 425 ArrayList primaryKeyElements = ( (XMLElement) elements.get(1)).getChildElements(); 426 //log.debug("Create Query Primary Key Element : "+primaryKeyElements); 427 String[] primaryKeyValues = new String[primaryKeyElements.size()]; 428 for (int i = 0; i < primaryKeyElements.size(); i++) { 429 String ColumnName = ( ( (XMLElement) primaryKeyElements.get(i)).getAttribute()); 430 //log.debug("Create Query Coulmns : "+ColumnName); 431 XMLElement prKeyValuesElement = (XMLElement) primaryKeyElements.get(i); 432 //log.debug("prKeyValuesElement.elementValue SnapshotHandler.createQuery() "+prKeyValuesElement.elementValue); 433 primaryKeyValues[i] = prKeyValuesElement.elementValue; 434 //log.debug("treeMap::::"+treeMap); 435 ( (AbstractColumnObject) treeMap.get(ColumnName)).setColumnObject(preparedStatement, prKeyValuesElement, columnIndex + 1); 436 columnIndex++; 437 } 438 preparedStatement.execute(); 439 } 440 } 441 } 442 catch (SQLException ex) { 443 //exception is dumped in case when parent table is not included in the publisher 444 /* RepConstants.writeERROR_FILE(ex); 445 try { 446 if (!dbHandler.getPrimaryKeyErrorCode(ex)) { 447 throw new SAXException(ex.getMessage(), ex); 448 } 449 } 450 catch (SQLException ex1) { 451 RepConstants.writeERROR_FILE(ex1); 452 } */ 453 } 454 finally { 455 if (batchMax != 0) { 456 setAutocomitTrueAndCommitRecord(); 457 } 458 } 459 } 460 461 private PreparedStatement makeUpdateBookMarksTable() throws Exception { 462 StringBuffer query = new StringBuffer(); 463 query.append(" UPDATE ").append(dbHandler.getBookMarkTableName()).append(" set ") 464 .append(RepConstants.bookmark_ConisderedId5) 465 .append(" = ? , ").append(RepConstants.bookmark_lastSyncId4).append(" = ? where ( ") 466 .append(RepConstants.bookmark_LocalName1).append(" = ? and ") 467 .append(RepConstants.bookmark_RemoteName2).append(" = ? and ") 468 .append(RepConstants.bookmark_TableName3).append(" = ? ) "); 469 return subConnection.prepareStatement(query.toString()); 470 } 471 472 public void setPubName(String pubName0) { 473 pubName = pubName0; 474 } 475 476 public void setSubName(String subName0) { 477 subName = subName0; 478 } 479 480 /** 481 * deletes records from super log table 482 * @param tableName 483 * @throws java.lang.Exception 484 */ 485 private void deleteRecordsFromSuperLogTable(String tableName) throws Exception { 486 Statement stmt = subConnection.createStatement(); 487 try { 488 // deleting all records from super log table where tableName is passed one 489 // or for whihc xml file has been written. 490 StringBuffer query = new StringBuffer(); 491 query.append("delete from ").append(dbHandler.getLogTableName()).append(" where ") 492 .append(RepConstants.logTable_tableName2).append(" = '") 493 .append(tableName).append("'"); 494 stmt.executeUpdate(query.toString()); 495 } 496 finally { 497 if (stmt != null) 498 stmt.close(); 499 } 500 } 501 502 /** 503 * insert a dummy record in super log table for getting a unique id after snapshot for inserting records in shadow Table. 504 * @throws SQLException 505 */ 506 private void insert_dummyRecordInLogTable() throws SQLException { 507 Statement stmt = subConnection.createStatement(); 508 StringBuffer query = new StringBuffer(); 509 query.append("insert into ").append(dbHandler.getLogTableName()).append(" (") 510 .append(RepConstants.logTable_tableName2).append(") values ('$$$$$$')"); 511 stmt.execute(query.toString()); 512 query = new StringBuffer(); 513 query.append("Select max(").append(RepConstants.logTable_commonId1).append(") from ") 514 .append(dbHandler.getLogTableName()); 515 ResultSet rs = stmt.executeQuery(query.toString()); 516 rs.next(); 517 long maxCID = rs.getLong(1); 518 rs.close(); 519 query = new StringBuffer(); 520 query.append("delete from ").append(dbHandler.getLogTableName()) 521 .append(" where ").append(RepConstants.logTable_tableName2).append(" = '") 522 .append("$$$$$$").append("' and ").append(RepConstants.logTable_commonId1) 523 .append(" != ").append(maxCID); 524 stmt.executeUpdate(query.toString()); 525 } 526 527 public void closeAllStatementAndResultset() { 528 try { 529 if (statement != null) 530 statement.close(); 531 } 532 catch (SQLException ex) { 533 } 534 } 535 536 /** 537 * This has been implemented to handle the problem 538 * related to CLOB and BLOB data type. Postgre 539 * do not to insert LOB object in autocommit mode. 540 * @param <any> Abs 541 * @return boolean 542 */ 543 private void checkAutocommit(AbstractColumnObject aco) throws SQLException { 544 if (setAutoCommitFlag) { 545 if (dbHandler instanceof PostgreSQLHandler && 546 (aco instanceof ClobObject || aco instanceof BlobObject)) { 547 setAutoCommitFlag = false; 548 subConnection.setAutoCommit(false); 549 } 550 } 551 552 } 553 554 private void setAutocomitTrueAndCommitRecord() throws SQLException { 555 if (setAutoCommitFlag == false) { 556 subConnection.commit(); 557 subConnection.setAutoCommit(true); 558 setAutoCommitFlag = true; 559 } 560 } 561 562 }

