001 /** 002 * Copyright (c) 2003 Daffodil Software Ltd all rights reserved, 003 * Modifications Copyright (c) 2008 Regiscope Digital Imaging Co, LLC, All rights reserved. 004 * This program is free software; you can redistribute it and/or modify 005 * it under the terms of version 2 of the GNU General Public License as 006 * published by the Free Software Foundation. 007 * There are special exceptions to the terms and conditions of the GPL 008 * as it is applied to this software. See the GNU General Public License for more details. 009 * 010 * This program is distributed in the hope that it will be useful, 011 * but WITHOUT ANY WARRANTY; without even the implied warranty of 012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 013 * GNU General Public License for more details. 014 * 015 * You should have received a copy of the GNU General Public License 016 * along with this program; if not, write to the Free Software 017 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA 018 */ 019 020 package org.dbreplicator.replication; 021 022 import java.io.*; 023 import java.net.*; 024 import java.net.UnknownHostException; 025 import java.rmi.*; 026 import java.rmi.server.*; 027 import java.sql.*; 028 import java.util.*; 029 import javax.xml.parsers.*; 030 031 import org.xml.sax.*; 032 import org.xml.sax.ContentHandler; 033 import org.dbreplicator.replication.DBHandler.*; 034 import org.dbreplicator.replication.xml.*; 035 import org.dbreplicator.replication.zip.ZipHandler; 036 import org.dbreplicator.replication.xml.SnapshotHandler; 037 import org.dbreplicator.replication.xml.DDLHandler; 038 import org.apache.log4j.Logger; 039 import org.dbreplicator.replication.synchronize.AbstractSynchronize; 040 import java.sql.ResultSet; 041 import java.math.BigDecimal; 042 043 /** 044 * The Subscription class holds all the methods which are required for the physical 045 * creation of the subscription(i.e system tables) and which are used at the time 046 * of synchronization at the subscriber's end. It implements three interfaces, 047 * <p> 048 * _subscription : It makes this class to implement all the methods which are 049 * called by the user, at the time of subscribig, and for different tasks 050 * related to synchronization. 051 * <p> 052 * _SubImpl : It makes this class to implement the method getServerName which 053 * is called remotely by the publisher to get the local server name at the time 054 * of synchronization. 055 * <p> 056 * _Replicator : It makes this class to implement all the methods which are called 057 * at the time of synchronization to get the subscriber's information. 058 * 059 */ 060 061 public class Subscription 062 extends UnicastRemoteObject 063 implements _SubImpl, _Subscription, _Replicator { 064 public String subName; 065 private int remotePort; 066 private String remoteUrl; 067 private ConnectionPool connectionPool; 068 private ArrayList subRepTables; 069 private String conflictResolver; 070 private String pubName; 071 private _ReplicationServerImpl remoteServer; 072 protected static Logger log = Logger.getLogger(Subscription.class.getName()); 073 private ArrayList alterTableAddFKQueries; 074 075 // To write the file on client socket. 076 private _FileUpload fileUpload; 077 078 public AbstractDataBaseHandler dbHandler; 079 080 public void resetRemoteServer() { 081 remoteServer = null; 082 } 083 SyncXMLCreator syncXMLCreator; 084 private ReplicationServer localServer; 085 HashMap syncIdMap; 086 087 public Subscription() throws RemoteException { 088 } 089 090 public Subscription(ConnectionPool connectionPool0, String subName0, 091 String serverName0, ReplicationServer localServer0) throws 092 RemoteException, 093 RepException { 094 subName = subName0; 095 connectionPool = connectionPool0; 096 // make sure whether you need this connection statement or not 097 Connection subConnection = connectionPool.getConnection(subName); 098 dbHandler = Utility.getDatabaseHandler(connectionPool, subName); 099 connectionPool.returnConnection(subConnection); 100 dbHandler.setLocalServerName(serverName0); 101 dbHandler.setLocalServerName(serverName0); 102 syncXMLCreator = new SyncXMLCreator(subName, connectionPool, dbHandler); 103 localServer = localServer0; 104 fileUpload = new FileUpload(); 105 106 107 } 108 109 /** 110 * This method is called after parsing to set the repTables with the structures 111 * same as written on XML file. 112 * 113 * @param tableNames0 114 */ 115 116 public void setSubscriptionTables(ArrayList tableNames0) { 117 subRepTables = tableNames0; 118 } 119 120 /* todo Added by hisar team. */ 121 122 public void setAlterTableAddFKStatements(ArrayList alterTableAddFKQueries0) { 123 alterTableAddFKQueries = alterTableAddFKQueries0; 124 } 125 126 /** 127 * This method is called at the time of creating the subscriber. This method 128 * sets the url(i p address.) of the publsher's m/c for the subscriber, i.e 129 * on which m/c the publisher exists. And on which m/c the subscriber will 130 * look for this publisher. 131 * 132 * @param remoteUrl0 133 * @throws RepException 134 */ 135 136 public void setRemoteServerUrl(String remoteUrl0) throws RepException { 137 if (remoteUrl0.equalsIgnoreCase("localhost")) { 138 throw new RepException("REP004", null); 139 } 140 remoteUrl = remoteUrl0; 141 } 142 143 /** 144 * This method is called at the time of creating the subscriber. This method 145 * sets the port no. of the publsher for the subscriber, i.e on which port 146 * the publisher is running. And on which port the subscriber will look 147 * for the replication server of the publisher. 148 * 149 * @param remotePort0 150 */ 151 152 public void setRemoteServerPortNo(int remotePort0) { 153 remotePort = remotePort0; 154 } 155 156 public String getRemoteServerUrl() { 157 return remoteUrl; 158 } 159 160 public int getRemoteServerPortNo() { 161 return remotePort; 162 } 163 164 public ConnectionPool getConnectionPool() { 165 return connectionPool; 166 } 167 168 public void setPublicatonName(String pubName0) { 169 pubName = pubName0; 170 } 171 172 /** 173 * This method is responsible for the physical creation of the subscriber. 174 * This method creates replication system tables for the subscription at the 175 * subscriber's end. For getting the publication's replication tables' 176 * structures, it looks up the remote replication server, gets the object 177 * of the publication's class and then calls remote methods which 178 * create the XML file on the publisher's end. This file holds the queries 179 * needed to create the same structures on the client/subscriber system. Then this file 180 * is transferred over the client socket, parsed and replication 181 * tables are generated according to its specifications. Then the proper triggers are created, 182 * and subscription data is saved. 183 * 184 * @throws RepException 185 */ 186 187 public void subscribe() throws RepException { 188 // Create an Instance of DBHandler With Respect to Client Connection 189 Connection connection = null; 190 Statement stmt = null; 191 try { 192 //Creates Subscription Table 193 //Creates BookMark Table 194 //Creates Super Log Table 195 //Creates Rep Table 196 connection = connectionPool.getConnection(subName); 197 stmt = connection.createStatement(); 198 dbHandler.createClientSystemTables(subName); 199 HashMap primCols = new HashMap(); 200 _PubImpl pub = getPublication(); 201 int pubVendorType = pub.getPubVendorName(); 202 //Returns [0] array of schemas [1]publicationtablequeries. 203 ArrayList[] schemaTables = getPublicationTableQueries(primCols, pub); 204 dbHandler.createSchemas(pubName, schemaTables[0]); 205 String[] alterTableQueries = alterTableAddFKQueries == null ? null : (String[]) alterTableAddFKQueries.toArray(new String[0]); 206 dbHandler.createSubscribedTablesTriggersAndShadowTables(subName,(String[]) (schemaTables[1]).toArray(new String[0]), alterTableQueries, primCols, pubVendorType, subRepTables); 207 //Do Entry in the Subscription Table , BookMark Table, Rep Table. 208 saveSubscriptionData(dbHandler, connection, stmt); 209 pub.saveSubscriptionData(subName); 210 } 211 catch (RepException ex) { 212 RepConstants.writeERROR_FILE(ex); 213 throw ex; 214 } 215 catch (Exception ex) { 216 RepConstants.writeERROR_FILE(ex); 217 throw new RepException("REP025", new Object[] {ex.getMessage()}); 218 } 219 finally { 220 connectionPool.removeSubPubFromMap(subName); 221 if (stmt != null) 222 try { 223 stmt.close(); 224 } 225 catch (SQLException ex1) { 226 } 227 connectionPool.returnConnection(connection); 228 } 229 } 230 231 /** 232 * Make an entry in the subscriptions table, and insert all the records 233 * corresponding to each replicated table into the reptable and bookmark table. 234 * 235 * @param dbHandler 236 * @throws SQLException 237 * @throws RepException 238 */ 239 240 private void saveSubscriptionData(AbstractDataBaseHandler dbHandler, 241 Connection connection, Statement stmt) throws 242 SQLException, RepException { 243 244 try { 245 246 //Inserts into Subscription table 247 stmt.execute("insert into " + dbHandler.getSubscriptionTableName() +" values ( '" + subName + "', '" 248 + pubName + "' , '" + conflictResolver + "' , '" +dbHandler.getLocalServerName() + "' )"); 249 log.info("Query exceuted insert into " + 250 dbHandler.getSubscriptionTableName() + 251 " values ( '" + subName + "', '" + pubName + "' , '" + 252 conflictResolver + "' , '" + 253 dbHandler.getLocalServerName() + "' )"); 254 //For each table makes an entry into the BookMark Table and Rep Table 255 for (int i = 0, size = subRepTables.size(); i < size; i++) { 256 RepTable repTable = (RepTable) subRepTables.get(i); 257 repTable.setConflictResolver(conflictResolver); 258 StringBuffer sb2 = new StringBuffer(); 259 sb2.append(" Insert into ").append(dbHandler.getBookMarkTableName()) 260 .append(" values ( '").append(subName).append("','") 261 .append(pubName).append("','").append(repTable.getSchemaQualifiedName()) 262 .append("',").append("0,0,'N')"); 263 dbHandler.saveRepTableData(connection, subName, repTable); 264 stmt.execute(sb2.toString()); 265 log.debug(sb2.toString()); 266 } 267 } 268 catch (SQLException ex) { 269 RepConstants.writeERROR_FILE(ex); 270 throw new RepException("REP022", new Object[] {subName, pubName}); 271 } 272 finally { 273 if (stmt != null) 274 stmt.close(); 275 } 276 } 277 278 /** 279 * remove all the records from subscribed tables at subscriber 280 * and update the subscriber side with the latest records from the publisher. 281 * 282 * @throws RepException 283 */ 284 public synchronized void getSnapShot() throws RepException { 285 Connection subConnection = null; 286 Statement subStatment = null; 287 boolean islockedTaken = false,isCurrentTableCyclic = false; 288 _PubImpl publication = null; 289 try { 290 String localAddress = null,remoteServerName=null,remoteAddress=null; 291 try { 292 _ReplicationServerImpl remoteServer = getRemoteReplicationServer(); 293 remoteServerName = remoteServer.getServerName(); 294 publication = remoteServer.getRemotePublication(pubName); 295 publication.checkForLock(subName); 296 islockedTaken = true; 297 localAddress = InetAddress.getLocalHost().getHostAddress(); 298 remoteAddress =remoteServer.getServerName(); 299 publication.createSnapShot(subName,isSchemaSupported(),fileUpload,localAddress ); 300 } 301 catch (RepException ex) { 302 log.error(ex.getMessage(), ex); 303 throw ex; 304 } 305 catch (Exception ex) { 306 RepConstants.writeERROR_FILE(ex); 307 RepException rex = new RepException("REP053", new Object[] {subName,ex.getMessage()}); 308 rex.setStackTrace(ex.getStackTrace()); 309 throw rex; 310 } 311 try { 312 //unzipping the zip file 313 if(!localAddress.equalsIgnoreCase(remoteAddress)) { 314 ZipHandler.unZip(PathHandler.getDefaultZIPFilePathForClient("snapshot_" + pubName + "_" + subName), 315 PathHandler.getDefaultFilePathForClient("snapshot_" + pubName +"_" + subName)); 316 } 317 } 318 catch (IOException ex) { 319 RepConstants.writeERROR_FILE(ex); 320 RepException rex = new RepException("REP052", new Object[] {subName,ex.getMessage()}); 321 rex.setStackTrace(ex.getStackTrace()); 322 throw rex; 323 } 324 325 try { 326 subConnection = connectionPool.getConnection(subName); 327 subStatment = subConnection.createStatement(); 328 /* bjt - start transaction */ 329 for (int i = 0; i < subRepTables.size(); i++) { 330 RepTable repTable = ( (RepTable) subRepTables.get(i)); 331 isCurrentTableCyclic = repTable.getCyclicDependency().equalsIgnoreCase(RepConstants.YES); 332 if(isCurrentTableCyclic) 333 break; 334 } 335 SAXParser saxParser = SAXParserFactory.newInstance().newSAXParser(); 336 XMLReader reader = saxParser.getXMLReader(); 337 // Delete records from all tables 338 deleteAllRecordsFromMainTables(subStatment); 339 //Instance for content handler 340 //bjt 341 /* 342 DatabaseMetaData dbmd = subConnection.getMetaData(); 343 int batchMax = (dbmd.supportsBatchUpdates() ? 1000 : 0); 344 log.debug("Batch Max = " + batchMax); 345 if (batchMax > 0) { 346 log.debug("Using statement pooling in batches of " + batchMax); 347 } else { 348 log.debug("NOT Using statement pooling, database does not seem to support it"); 349 } 350 SnapshotHandler ch = new SnapshotHandler(true, subConnection, this,dbHandler, remoteServerName, batchMax); // instance for content hanedler 351 */ 352 SnapshotHandler ch = new SnapshotHandler(true, subConnection, this,dbHandler, remoteServerName); // instance for content hanedler 353 // instance for content hanedler 354 ch.setPubName(pubName); 355 ch.setSubName(subName); 356 reader.setContentHandler(ch); 357 reader.parse(PathHandler.getDefaultFilePathForClient("snapshot_" + pubName + "_" +subName)); 358 ch.closeAllStatementAndResultset(); 359 /** 360 * To handle the cyclic table case referenced 361 * columns value is updated in second pass. 362 */ 363 if(isCurrentTableCyclic){ 364 SnapshotHandler ch1 = new SnapshotHandler(false, subConnection, this,dbHandler, remoteServerName); // instance for content hanedler 365 ch1.setPubName(pubName); 366 ch1.setSubName(subName); 367 reader.setContentHandler(ch1); 368 reader.parse(PathHandler.getDefaultFilePathForClient("snapshot_" + pubName + "_" +subName)); 369 ch1.closeAllStatementAndResultset(); 370 } 371 if (_Subscription.xmlAndShadow_entries) { 372 // deleting xml file 373 deleteFile(PathHandler.getDefaultFilePathForClient("snapshot_" + pubName + "_" +subName)); 374 // deleting zip file 375 deleteFile(PathHandler.getDefaultZIPFilePathForClient("snapshot_" + pubName + "_" +subName)); 376 } 377 dbHandler.deleteRecordsFromSuperLogTable(subStatment); 378 /* bjt - finish transaction */ 379 } 380 catch (Exception ex) { 381 RepConstants.writeERROR_FILE(ex); 382 RepException rex = new RepException("REP053", new Object[] {subName,ex.getMessage()}); 383 rex.setStackTrace(ex.getStackTrace()); 384 throw rex; 385 } 386 } 387 catch (RepException rex) { 388 RepConstants.writeERROR_FILE(rex); 389 throw rex; 390 } 391 finally { 392 connectionPool.removeSubPubFromMap(subName); 393 try { 394 if (islockedTaken) 395 publication.releaseLOCK(); 396 } 397 catch (RemoteException ex2) { 398 RepConstants.writeERROR_FILE(ex2); 399 } 400 401 try { 402 if (subStatment != null) { 403 subStatment.close(); 404 405 } 406 } 407 catch (SQLException ex1) { 408 } 409 connectionPool.returnConnection(subConnection); 410 } 411 } 412 413 /** 414 * Delete records from all main tables 415 * 416 * @throws SQLException 417 */ 418 private void deleteAllRecordsFromMainTables(Statement subStatment) throws SQLException, RepException { 419 for (int i = subRepTables.size() - 1; i >= 0; i--) { 420 RepTable repTable = (RepTable) subRepTables.get(i); 421 subStatment.execute("delete from " + repTable.getSchemaQualifiedName()); 422 } 423 } 424 425 /** 426 * This method first gets the object of the publications object, 427 * and makes an entry in the publisher server's bookmark table. 428 * Next, it creates struct_pubname.xml,struct_pubname.zip file , blob.lob, clob.lob 429 * on the publisher and writes these on socket to the subscriber side, then parses 430 * them. 431 * 432 * @param primColMap 433 * @return ArrayList[] 434 * @throws SQLException 435 * @throws RemoteException 436 * @throws RepException 437 */ 438 439 private ArrayList[] getPublicationTableQueries(HashMap primColMap, _PubImpl pub) throws 440 SQLException, RemoteException, RepException { 441 442 if (pub == null) { 443 throw new RepException("REP036", new Object[] {pubName}); 444 } 445 conflictResolver = pub.getConflictResolver(); 446 String address = null,remoteMachineAddresss =null; 447 _ReplicationServerImpl replicationServer = getRemoteReplicationServer(); 448 remoteMachineAddresss = replicationServer.getServerName(); 449 450 try { 451 //address = InetAddress.getLocalHost().getHostAddress(); 452 address = InetAddress.getLocalHost().getHostName(); 453 } 454 catch (UnknownHostException ex) { 455 RepConstants.writeERROR_FILE(ex); 456 throw new RepException("REP025", new Object[] {subName, ex.getMessage()}); 457 } 458 //Do entry in the server's bookmark table. 459 //Creates struct_pubname.xml,struct_pubname.zip file , blob.lob, clob.lob on the publisher. 460 //Writes tablequery file i.e XMLfile on the subscribers socket 461 pub.createStructure(Utility.getVendorType(connectionPool, subName), subName,address, isSchemaSupported(),fileUpload,address); 462 ArrayList schemas = null; 463 ArrayList queries = null; 464 Connection connection = null; 465 try { 466 connection = connectionPool.getConnection(subName); 467 MetaDataInfo mdi = Utility.getDatabaseMataData(connection); 468 String filePath = PathHandler.getDefaultFilePathForCreateStructure("struct_" + pubName + "_" + subName); 469 //Creates xml,blob.lob,clob.lob files over the subscriber 470 if(!address.equalsIgnoreCase(remoteMachineAddresss)) { 471 String zipFilePath = PathHandler.getDefaultZIPFilePathForCreateStructure("struct_" + pubName + "_" + subName); 472 ZipHandler.unStructZip(zipFilePath, filePath); 473 } 474 SAXParser saxParser = SAXParserFactory.newInstance().newSAXParser(); 475 //Returns XMLReaderImpl class' object in XMLReader interface's reference. 476 XMLReader reader = saxParser.getXMLReader(); 477 schemas = new ArrayList(); 478 queries = new ArrayList(); 479 ContentHandler ddh = new DDLHandler(this, schemas, queries,conflictResolver, primColMap, mdi); 480 reader.setContentHandler(ddh); 481 //Actually called method is XMLReaderImpl->parse. 482 //internally calls startDicument,endDocument methods of DDLHandler. 483 reader.parse(filePath); 484 } 485 catch (SAXException ex) { 486 RepConstants.writeERROR_FILE(ex); 487 throw new RepException("REP081", null); 488 } 489 catch (ParserConfigurationException ex) { 490 RepConstants.writeERROR_FILE(ex); 491 throw new RepException("REP081", null); 492 } 493 catch (FileNotFoundException ex) { 494 RepConstants.writeERROR_FILE(ex); 495 throw new RepException("REP082", null); 496 } 497 catch (IOException ex) { 498 RepConstants.writeERROR_FILE(ex); 499 throw new RepException("REP083", null); 500 } 501 finally { 502 connectionPool.returnConnection(connection); 503 } 504 return new ArrayList[] {schemas, queries}; 505 } 506 507 /** 508 * Gets the object of the replication server class by looking up the 509 * rmi registry listening at the specified port and url. 510 * 511 * @return 512 * @throws NotBoundException 513 * @throws MalformedURLException 514 * @throws RemoteException 515 * @throws RepException 516 */ 517 518 private _ReplicationServerImpl getRemoteReplicationServer() throws 519 RepException { 520 //remoteUrl & remotePort are set by the user it comes from the functions setRemoteServerPortNo. 521 //& setRemoteServerURL... 522 try { 523 if (remoteServer != null) { 524 return remoteServer; 525 } 526 String ipadd = null; 527 try { 528 //ipadd = InetAddress.getByName(remoteUrl).getHostAddress(); 529 // bjt - remote 530 ipadd = InetAddress.getByName(remoteUrl).getHostName(); 531 } 532 catch (UnknownHostException ex) { 533 RepConstants.writeERROR_FILE(ex); 534 throw new RepException("REP001", new Object[] {ex.getMessage()}); 535 } 536 String name = "rmi://" + ipadd + ":" + Integer.toString(remotePort) + "/" + remoteUrl + "_" + remotePort; 537 log.debug(" bjt - rmi line "); 538 log.debug("Naming.lookup(" + name + ")"); 539 remoteServer = (_ReplicationServerImpl) Naming.lookup(name); 540 log.debug(" bjt - returning remoteServer " + remoteServer); 541 return remoteServer; 542 } 543 catch (RemoteException ex) { 544 RepConstants.writeERROR_FILE(ex); 545 throw new RepException("REP025", new Object[] {subName, ex.getMessage()}); 546 } 547 catch (MalformedURLException ex) { 548 RepConstants.writeERROR_FILE(ex); 549 throw new RepException("REP025", new Object[] {subName, ex.getMessage()}); 550 } 551 552 catch (NotBoundException ex1) { 553 RepConstants.writeERROR_FILE(ex1); 554 throw new RepException("REP025", new Object[] {subName, ex1.getMessage()}); 555 } 556 catch (RepException ex1) { 557 throw ex1; 558 } 559 } 560 561 /** 562 * Takes changed records from publisher, transfers them to subscriber and applies them, 563 * then takes changed records from subscriber and applies them back to the publisher. This 564 * is the main function called for merge replication, and does everything according to the 565 * rules set forth in the conflict resolution algorithm chosen for this publication. 566 * 567 * @throws RepException 568 */ 569 public synchronized void synchronize() throws RepException { 570 _PubImpl publication = null; 571 Object[] serverInfo = null; 572 Object[] pubLastSyncId = null; 573 BufferedWriter bw = null; 574 Connection subConnection = null; 575 Statement stmt = null; 576 ResultSet rs = null; 577 boolean isCurrentTableCyclic = false, islockedTaken = false; 578 String localMachineAddress=null,remoteMachineAddress=null; 579 try { 580 try { 581 _ReplicationServerImpl remoteRepServer = getRemoteReplicationServer(); 582 publication = remoteRepServer.getRemotePublication(pubName); 583 remoteMachineAddress = remoteRepServer.getRemoteAddress(); 584 publication.checkForLock(subName); 585 islockedTaken = true; 586 //Creates synchronization related files over server and transfers it over client socket. 587 //Besides it creates server socket over server and returns serversocket related information. 588 localMachineAddress = InetAddress.getLocalHost().getHostName(); 589 serverInfo = publication.createXMLForClient(subName, getServerName(),isSchemaSupported(),fileUpload,localMachineAddress); 590 pubLastSyncId = ( (Object[]) serverInfo[2]); 591 } 592 catch (RemoteException ex1) { 593 RepConstants.writeERROR_FILE(ex1); 594 RepException rex = new RepException("REP001",new Object[] {ex1.getMessage()}); 595 rex.setStackTrace(ex1.getStackTrace()); 596 throw rex; 597 } 598 catch (Exception ex1) { 599 RepConstants.writeERROR_FILE(ex1); 600 RepException rex = new RepException("REP057", new Object[] {ex1.getMessage()}); 601 rex.setStackTrace(ex1.getStackTrace()); 602 throw rex; 603 } 604 605 try { 606 // unzipping zip file 607 if(!localMachineAddress.equalsIgnoreCase(remoteMachineAddress)) { 608 log.debug(" bjt - localMachineAddress = " + localMachineAddress + ", remoteMachineAddress = " + remoteMachineAddress); 609 ZipHandler.unZip(PathHandler.getDefaultZIPFilePathForClient("server_" +pubName + "_" + subName),PathHandler.getDefaultFilePathForClient("server_" +pubName + "_" + subName)); 610 } 611 } 612 catch (IOException ex) { 613 RepConstants.writeERROR_FILE(ex); 614 throw new RepException("REP084", null); 615 } 616 617 try { 618 subConnection = connectionPool.getConnection(subName); 619 /* bjt - start transaction here or below */ 620 stmt = subConnection.createStatement(); 621 SAXParser saxParser = SAXParserFactory.newInstance().newSAXParser(); 622 XMLReader reader = saxParser.getXMLReader(); 623 624 MergeHandler mg = new MergeHandler(true, 625 subConnection, this, 626 publication.getServerName(), 627 dbHandler, bw, "MERGE REPLICATION", 628 PathHandler. 629 fullOrPartialTransactionLogFile(), 630 Utility.getDatabaseMataData(subConnection)); 631 mg.setLocalName(subName); 632 mg.setRemoteName(pubName); 633 ContentHandler ch = mg; 634 reader.setContentHandler(ch); 635 636 //initializing hashmap for maxSyncId for updating consideredId of bookMarkTable further 637 syncIdMap = new HashMap(); 638 for (int i = 0; i < subRepTables.size(); i++) { 639 /* bjt - start transaction here or above - spot 2 */ 640 RepTable repTable =( (RepTable) subRepTables.get(i)); 641 String tableName = repTable.getSchemaQualifiedName().toString(); 642 StringBuffer query = new StringBuffer(); 643 isCurrentTableCyclic = repTable.getCyclicDependency().equalsIgnoreCase(RepConstants.YES); 644 query.append(" select ").append(RepConstants.shadow_sync_id1). 645 append(" from ").append(RepConstants.shadow_Table(tableName)). 646 append(" order by ").append(RepConstants.shadow_sync_id1). 647 append(" desc limit 1"); 648 rs = stmt.executeQuery(query.toString()); 649 long syncIdValue = new Long(0); 650 if(rs.next()) { 651 syncIdValue = rs.getLong(1); 652 } 653 syncIdMap.put(tableName, syncIdValue); 654 log.debug("tableName:" + tableName + " syncid: " + syncIdValue); 655 } 656 /* bjt - do the work of absorbing the sync file into the database here */ 657 reader.parse(PathHandler.getDefaultFilePathForClient("server_" + pubName + "_" + subName)); 658 mg.closeAllStatementAndResultset(); 659 makeSubscriberTransactionLgFile(subName,mg,bw,"MERGE"); 660 661 // Second pass 662 /* bjt - not being used for cyclic depenency anyway...*/ 663 /* if (isCurrentTableCyclic) { */ 664 MergeHandler mg1 = new MergeHandler(false, 665 subConnection, this, 666 publication.getServerName(), 667 dbHandler, bw, 668 "MERGE REPLICATION", 669 PathHandler. 670 fullOrPartialTransactionLogFile(), 671 Utility.getDatabaseMataData(subConnection)); 672 mg1.setLocalName(subName); 673 mg1.setRemoteName(pubName); 674 ContentHandler ch1 = mg1; 675 reader.setContentHandler(ch1); 676 reader.parse(PathHandler.getDefaultFilePathForClient("server_" +pubName+ "_" + subName)); 677 makeSubscriberTransactionLgFile(subName,mg,bw,"MERGE"); 678 mg1.closeAllStatementAndResultset(); 679 /* } */ 680 681 //updating consideredId on subscriber side 682 for (int i = 0; i < subRepTables.size(); i++) { 683 String tableName = ( (RepTable) subRepTables.get(i)).getSchemaQualifiedName().toString(); 684 if ( ( (RepTable) subRepTables.get(i)).getCreateShadowTable().equalsIgnoreCase(RepConstants.NO)) 685 continue; 686 UpdateConisderedForBookMarksTable(pubName, subName, tableName,(Long) syncIdMap.get(tableName),stmt); 687 } 688 syncIdMap.clear(); 689 } 690 catch (Exception ex) { 691 if(Utility.createTransactionLogFile) { 692 AbstractSynchronize.writeUnsuccessfullOperationInTransaction(bw); 693 } 694 RepConstants.writeERROR_FILE(ex); 695 RepException rep = null; 696 if (ex instanceof SAXException) { 697 Exception e = ( (SAXException) ex).getException(); 698 if (e instanceof RepException) { 699 throw (RepException) e; 700 } 701 else { 702 rep = new RepException("REP057", new Object[] {e.getMessage()}); 703 rep.setStackTrace(e.getStackTrace()); 704 throw rep; 705 } 706 } 707 else { 708 rep = new RepException("REP057", new Object[] {ex.getMessage()}); 709 rep.setStackTrace(ex.getStackTrace()); 710 } 711 throw rep; 712 } 713 714 if (_Subscription.xmlAndShadow_entries) { 715 // deleting xml file 716 deleteFile(PathHandler.getDefaultFilePathForClient("server_" +pubName + "_" + subName)); 717 // deleting zip file 718 deleteFile(PathHandler.getDefaultZIPFilePathForClient("server_" +pubName + "_" + subName)); 719 } 720 //going to update lastsyncId in bookmark table on publisher side 721 722 try { 723 publication.updateBookMarkLastSyncId(subName, pubLastSyncId); 724 } 725 catch (Exception ex) { 726 RepConstants.writeERROR_FILE(ex); 727 throw new RepException("REP057", new Object[] {subName, ex.getMessage()}); 728 } 729 730 // CREATE n Write XML File from Client Side to Socket for server Side. 731 try { 732 if (subRepTables.size() > 0) { 733 // creating and writing XML file ON SOCKET for server 734 Object[] clientTablesAndLastId = 735 syncXMLCreator.createXMLFile(PathHandler.getDefaultFilePathForClient("client_" + subName + "_" + pubName), 736 PathHandler.getDefaultZIPFilePathForClient("client_" + subName +"_" + pubName), 737 "client_" + subName + "_" + pubName /*+ ".xml"*/, pubName,subRepTables,publication.getServerName(),subRepTables.size(), 738 _Subscription.xmlAndShadow_entries,subName,isSchemaSupported(),publication.getFileUploader(),localMachineAddress,remoteMachineAddress); 739 ArrayList usedActualTables = (ArrayList) clientTablesAndLastId[0]; 740 Object[] subLastSyncId = (Object[]) clientTablesAndLastId[1]; 741 dbHandler.deleteRecordsFromSuperLogTable(stmt); 742 //This method let publisher to accept the data written on the publisher's socket by the subscriber. And parse it. 743 //And deletes records from pub's log table.And perform other table operations. 744 publication.synchronize(subName, getServerName(),Utility.createTransactionLogFile,localMachineAddress); 745 for (int i = 0; i < subRepTables.size(); i++) { 746 RepTable repTable = (RepTable) subRepTables.get(i); 747 if (repTable.getCreateShadowTable().equalsIgnoreCase(RepConstants.NO)) 748 continue; 749 String tableName = ( (RepTable) subRepTables.get(i)).getSchemaQualifiedName().toString(); 750 updateBookMarkLastSyncId(tableName, pubName, subLastSyncId[i],stmt); 751 } 752 if (_Subscription.xmlAndShadow_entries) { 753 deletingRecordsFromShadowTable(usedActualTables, stmt); 754 } 755 } 756 } 757 catch (Exception ex) { 758 RepConstants.writeERROR_FILE(ex); 759 if (ex instanceof RepException) { 760 throw (RepException) ex; 761 } 762 RepException rex = new RepException("REP054", new Object[] {subName,ex.getMessage()}); 763 rex.setStackTrace(ex.getStackTrace()); 764 throw rex; 765 } 766 } 767 catch (RepException rex) { 768 RepConstants.writeERROR_FILE(rex); 769 throw rex; 770 } 771 finally { 772 connectionPool.removeSubPubFromMap(subName); 773 try { 774 if (islockedTaken) 775 publication.releaseLOCK(); 776 } 777 catch (RemoteException ex2) { 778 RepConstants.writeERROR_FILE(ex2); 779 } 780 try { 781 if (bw != null) 782 bw.close(); 783 if (rs != null) { 784 rs.close(); 785 } 786 if (stmt != null) 787 stmt.close(); 788 } 789 catch (IOException ex4) { 790 } 791 catch (SQLException ex4) { 792 } 793 connectionPool.returnConnection(subConnection); 794 } 795 } 796 797 /** 798 * In Push Replication records are uploaded to the publisher database. 799 * Subscriber create a XML file of records which are considered for 800 * synchronization and writes it on client synchronization socket. 801 * The Publisher reads the zip file of client records, parses it and 802 * performs the included operations on the published tables in accordance 803 * with the conflict resolution algorithm chosen for this publication. 804 * 805 * @throws RepException 806 */ 807 808 public synchronized void push() throws RepException { 809 _PubImpl publication = null; 810 Statement stmt = null; 811 ResultSet rs = null; 812 Connection subConnection = null; 813 boolean islockedTaken = false; 814 String localMachineAddress=null,remoteMachineAddress=null; 815 try { 816 subConnection = connectionPool.getConnection(subName); 817 818 try { 819 _ReplicationServerImpl remoteRepServer =getRemoteReplicationServer(); 820 publication = remoteRepServer.getRemotePublication(pubName); 821 remoteMachineAddress =remoteRepServer.getServerName(); 822 localMachineAddress =InetAddress.getLocalHost().getHostAddress(); 823 } 824 catch (RemoteException ex1) { 825 RepConstants.writeERROR_FILE(ex1); 826 RepException rex = new RepException("REP001",new Object[] {ex1.getMessage()}); 827 rex.setStackTrace(ex1.getStackTrace()); 828 throw rex; 829 } 830 catch (Exception ex1) { 831 RepConstants.writeERROR_FILE(ex1); 832 RepException rex = new RepException("REP0106",new Object[] {ex1.getMessage()}); 833 rex.setStackTrace(ex1.getStackTrace()); 834 throw rex; 835 } 836 // CREATE n Write XML File from Client Side to Socket for server Side. 837 try { 838 stmt = subConnection.createStatement(); 839 publication.checkForLock(subName); 840 islockedTaken = true; 841 if (subRepTables.size() > 0) { 842 // creating and writing XML file ON SOCKET for server 843 Object[] clientTablesAndLastId = syncXMLCreator.createXMLFile( 844 PathHandler.getDefaultFilePathForClient("client_" + subName + "_" +pubName), 845 PathHandler.getDefaultZIPFilePathForClient("client_" + subName +"_" + pubName), "client_" + subName + "_" + pubName /*+ ".xml"*/, 846 pubName,subRepTables,publication.getServerName(),subRepTables.size(), 847 _Subscription.xmlAndShadow_entries, subName,isSchemaSupported(),publication.getFileUploader(),localMachineAddress,remoteMachineAddress); 848 ArrayList usedActualTables = (ArrayList) clientTablesAndLastId[0]; 849 Object[] subLastSyncId = (Object[]) clientTablesAndLastId[1]; 850 dbHandler.deleteRecordsFromSuperLogTable(stmt); 851 /* This method let publisher to accept the data written on the 852 publisher's socket by the subscriber. And parse it. 853 And deletes records from pub's log table. 854 And perform other table operations. 855 */ 856 //initializing hashmap for maxSyncId for updating consideredId of bookMarkTable further 857 858 syncIdMap = new HashMap(); 859 for (int i = 0; i < subRepTables.size(); i++) { 860 String tableName = ( (RepTable) subRepTables.get(i)).getSchemaQualifiedName().toString(); 861 StringBuffer query = new StringBuffer(); 862 query.append(" select ").append(RepConstants.shadow_sync_id1). 863 append(" from ").append(RepConstants.shadow_Table(tableName)). 864 append(" order by ").append(RepConstants.shadow_sync_id1). 865 append(" desc limit 1"); 866 rs = stmt.executeQuery(query.toString()); 867 if (rs.next()) 868 syncIdMap.put(tableName, new Long(rs.getLong(1))); 869 } 870 publication.push(subName, getServerName(),Utility.createTransactionLogFile,localMachineAddress); 871 for (int i = 0; i < subRepTables.size(); i++) { 872 String tableName = ( (RepTable) subRepTables.get(i)).getSchemaQualifiedName().toString(); 873 updateBookMarkLastSyncId(tableName, pubName, subLastSyncId[i],stmt); 874 UpdateConisderedForBookMarksTable(pubName, subName, tableName, (Long) syncIdMap.get(tableName),stmt); 875 } 876 syncIdMap.clear(); 877 if (_Subscription.xmlAndShadow_entries) { 878 deletingRecordsFromShadowTable(usedActualTables, stmt); 879 } 880 } 881 } 882 catch (Exception ex) { 883 RepConstants.writeERROR_FILE(ex); 884 if (ex instanceof RepException) { 885 throw (RepException) ex; 886 } 887 RepException rex = new RepException("REP0105", new Object[] {subName, ex.getMessage()}); 888 rex.setStackTrace(ex.getStackTrace()); 889 throw rex; 890 } 891 } 892 catch (RepException rex) { 893 RepConstants.writeERROR_FILE(rex); 894 throw rex; 895 } 896 finally { 897 connectionPool.removeSubPubFromMap(subName); 898 899 try { 900 if (islockedTaken) 901 publication.releaseLOCK(); 902 } 903 catch (RemoteException ex2) { 904 RepConstants.writeERROR_FILE(ex2); 905 } 906 connectionPool.returnConnection(subConnection); 907 } 908 } 909 910 /** 911 * In Pull Replication records are downloaded from the publisher database. 912 * Subscriber create a XML file of records which are considered for 913 * synchronization and writes it on client synchronization socket. 914 * The Subscriber then reads the zip file of records, parses it and 915 * performs the included operations on the subscribed tables in accordance 916 * with the conflict resolution algorithm chosen for this publication. 917 * 918 * @throws RepException 919 */ 920 921 922 public synchronized void pull() throws RepException { 923 _PubImpl publication = null; 924 String localAddress = null,remoteMachineAddress=null; 925 Object[] pubLastSyncId; 926 BufferedWriter bw = null; 927 Statement stmt = null; 928 ResultSet resultSet = null; 929 Connection subConnection = null; 930 boolean islockedTaken = false, isCurrentTableCyclic = false; 931 try { 932 try { 933 subConnection = connectionPool.getConnection(subName); 934 stmt = subConnection.createStatement(); 935 _ReplicationServerImpl remoteRepServer = getRemoteReplicationServer(); 936 publication = remoteRepServer.getRemotePublication(pubName); 937 remoteMachineAddress = remoteRepServer.getServerName(); 938 publication.checkForLock(subName); 939 islockedTaken = true; 940 localAddress = InetAddress.getLocalHost().getHostAddress(); 941 942 /* 943 Creates synchronization related files over server and transfers 944 it over client socket. Besides it creates server socket over server 945 and returns serversocket related information. 946 */ 947 long startTime = System.currentTimeMillis(); 948 Object[] serverInfo = publication.createXMLForClient(subName, getServerName(),isSchemaSupported(),fileUpload,localAddress); 949 pubLastSyncId = (Object[]) serverInfo[2]; 950 } 951 catch (RemoteException ex1) { 952 RepConstants.writeERROR_FILE(ex1); 953 RepException rex = new RepException("REP001",new Object[] {ex1.getMessage()}); 954 rex.setStackTrace(ex1.getStackTrace()); 955 throw rex; 956 } 957 catch (Exception ex1) { 958 RepConstants.writeERROR_FILE(ex1); 959 RepException rex = new RepException("REP0152", new Object[] {ex1.getMessage()}); 960 rex.setStackTrace(ex1.getStackTrace()); 961 throw rex; 962 } 963 964 try { 965 // unzipping zip file 966 if(!localAddress.equalsIgnoreCase(remoteMachineAddress)) { 967 ZipHandler.unZip(PathHandler.getDefaultZIPFilePathForClient("server_" +pubName + "_" + subName),PathHandler.getDefaultFilePathForClient("server_" +pubName + "_" + subName)); 968 } 969 } 970 catch (IOException ex) { 971 RepConstants.writeERROR_FILE(ex); 972 throw new RepException("REP084", null); 973 } 974 try { 975 976 SAXParser saxParser = SAXParserFactory.newInstance().newSAXParser(); 977 XMLReader reader = saxParser.getXMLReader(); 978 MergeHandler mg = new MergeHandler(true, 979 subConnection, this, 980 publication.getServerName(), 981 dbHandler, bw, "PULL REPLICATION", 982 PathHandler.fullOrPartialTransactionLogFile(), 983 Utility.getDatabaseMataData(subConnection)); 984 mg.setLocalName(subName); 985 mg.setRemoteName(pubName); 986 ContentHandler ch = mg; 987 reader.setContentHandler(ch); 988 //initializing hashmap for maxSyncId for updating consideredId of bookMarkTable further 989 990 syncIdMap = new HashMap(); 991 992 for (int i = 0; i < subRepTables.size(); i++) { 993 RepTable repTable = ( (RepTable) subRepTables.get(i)); 994 String tableName = repTable.getSchemaQualifiedName().toString(); 995 StringBuffer query = new StringBuffer(); 996 isCurrentTableCyclic = repTable.getCyclicDependency(). 997 equalsIgnoreCase(RepConstants.YES); 998 query.append(" select ").append(RepConstants.shadow_sync_id1). 999 append(" from ").append(RepConstants.shadow_Table(tableName)). 1000 append(" order by ").append(RepConstants.shadow_sync_id1). 1001 append(" desc limit 1"); 1002 resultSet = stmt.executeQuery(query.toString()); 1003 if(resultSet.next()) 1004 syncIdMap.put(tableName, new Long(resultSet.getLong(1))); 1005 } 1006 reader.parse(PathHandler.getDefaultFilePathForClient("server_" +pubName + "_" + subName)); 1007 mg.closeAllStatementAndResultset(); 1008 1009 makeSubscriberTransactionLgFile(subName,mg,bw,"PULL"); 1010 1011 1012 /** 1013 * Following code has been written to handler the case 1014 * of cyclic table and complete the cyclic work in two 1015 * pass.In second pass value of all the colums is updated 1016 * by actual values that are set to null in first pass. 1017 * 1018 */ 1019 if (isCurrentTableCyclic) { 1020 MergeHandler mg1 = new MergeHandler(false, 1021 subConnection, this, 1022 publication.getServerName(), 1023 dbHandler, bw, 1024 "PULL REPLICATION", 1025 PathHandler.fullOrPartialTransactionLogFile(), 1026 Utility.getDatabaseMataData(subConnection)); 1027 mg1.setLocalName(subName); 1028 mg1.setRemoteName(pubName); 1029 ContentHandler ch1 = mg1; 1030 reader.setContentHandler(ch1); 1031 AbstractSynchronize.writeDateInTransactionLogFile(bw); 1032 reader.parse(PathHandler.getDefaultFilePathForClient("server_" +pubName+ "_" + subName)); 1033 mg1.closeAllStatementAndResultset(); 1034 } 1035 try { 1036 for (int i = 0; i < subRepTables.size(); i++) { 1037 String tableName = ( (RepTable) subRepTables.get(i)).getSchemaQualifiedName().toString(); 1038 UpdateConisderedForBookMarksTable(pubName, subName, tableName,(Long) syncIdMap.get(tableName),stmt); 1039 } 1040 syncIdMap.clear(); 1041 } 1042 catch (SQLException ex) { 1043 RepConstants.writeERROR_FILE(ex); 1044 throw new RepException("REP0152", new Object[] {subName,ex.getMessage()}); 1045 } 1046 makeSubscriberTransactionLgFile(subName,mg,bw,"PULL"); 1047 } 1048 catch (Exception ex) { 1049 if(Utility.createTransactionLogFile) { 1050 AbstractSynchronize.writeUnsuccessfullOperationInTransaction(bw); 1051 } 1052 RepConstants.writeERROR_FILE(ex); 1053 RepException rep = null; 1054 if (ex instanceof SAXException) { 1055 Exception e = ( (SAXException) ex).getException(); 1056 if (e instanceof RepException) { 1057 throw (RepException) e; 1058 } 1059 else { 1060 rep = new RepException("REP0152", new Object[] {e.getMessage()}); 1061 rep.setStackTrace(e.getStackTrace()); 1062 throw rep; 1063 } 1064 } 1065 else { 1066 rep = new RepException("REP0152", new Object[] {ex.getMessage()}); 1067 rep.setStackTrace(ex.getStackTrace()); 1068 } 1069 throw rep; 1070 } 1071 1072 if (_Subscription.xmlAndShadow_entries) { 1073 // deleting xml file 1074 deleteFile(PathHandler.getDefaultFilePathForClient("server_" +pubName + "_" + subName)); 1075 // deleting zip file 1076 deleteFile(PathHandler.getDefaultZIPFilePathForClient("server_" +pubName + "_" + subName)); 1077 } 1078 try { 1079 publication.updatePublisherShadowAndBookmarkTableAfterPullOnSubscriber(subName, pubLastSyncId); 1080 } 1081 catch (Exception ex) { 1082 RepConstants.writeERROR_FILE(ex); 1083 throw new RepException("REP0152", new Object[] {subName,ex.getMessage()}); 1084 } 1085 } 1086 catch (RepException rex) { 1087 RepConstants.writeERROR_FILE(rex); 1088 throw rex; 1089 } 1090 finally { 1091 try { 1092 if (bw != null) 1093 bw.close(); 1094 connectionPool.removeSubPubFromMap(subName); 1095 } 1096 catch (Exception ex) { 1097 RepConstants.writeERROR_FILE(ex); 1098 } 1099 try { 1100 if (islockedTaken) 1101 publication.releaseLOCK(); 1102 } 1103 catch (RemoteException ex2) { 1104 RepConstants.writeERROR_FILE(ex2); 1105 } 1106 connectionPool.returnConnection(subConnection); 1107 } 1108 } 1109 1110 /** 1111 * Return the instance of RepTable corresponding to table name passed 1112 * 1113 * @param tableName String 1114 * @throws RepException 1115 * @return RepTable 1116 * 1117 */ 1118 public RepTable getRepTable(String tableName) throws RepException { 1119 for (int i = 0; i < subRepTables.size(); i++) { 1120 if ( ( (RepTable) subRepTables.get(i)).getSchemaQualifiedName(). 1121 toString().equalsIgnoreCase(tableName)) { 1122 return (RepTable) subRepTables.get(i); 1123 } 1124 } 1125 RepException rep = new RepException("REP017", new Object[] {tableName}); 1126 RepConstants.writeERROR_FILE(rep); 1127 throw rep; 1128 } 1129 1130 public void setFilterClause(String tableName, String filterClause) { 1131 for (int i = 0; i < subRepTables.size(); i++) { 1132 if ( ( (RepTable) subRepTables.get(i)).getSchemaQualifiedName(). 1133 toString().equalsIgnoreCase(tableName)) { 1134 ( (RepTable) subRepTables.get(i)).setFilterClause(filterClause); 1135 } 1136 } 1137 } 1138 1139 public String getServerName() throws RemoteException { 1140 return dbHandler.getLocalServerName(); 1141 } 1142 1143 public AbstractDataBaseHandler getDBDataTypeHandler() { 1144 return dbHandler; 1145 } 1146 1147 public String getPub_SubName() { 1148 return subName; 1149 } 1150 1151 private void deleteFile(String fileName) { 1152 File f = new File(fileName); 1153 boolean deleted = f.delete(); 1154 } 1155 1156 private void deleteALLRecordsFromSuperLogTable(Statement stmt) throws SQLException { 1157 stmt.executeUpdate("delete from " + dbHandler.getLogTableName()); 1158 } 1159 1160 private void deleteRecordsFromSuperLogTable1(Statement subStatment) throws SQLException { 1161 // insert one record in superLogTable 1162 StringBuffer query = new StringBuffer(); 1163 query.append("insert into ").append(dbHandler.getLogTableName()).append(" ("). 1164 append(RepConstants.logTable_tableName2).append(") values ('$$$$$$')"); 1165 1166 subStatment.execute(query.toString()); 1167 1168 query = new StringBuffer(); 1169 // deleting all but one last record from super log table where commonid is maximum 1170 query.append("Select max (").append(RepConstants.logTable_commonId1). 1171 append(") from ").append(dbHandler.getLogTableName()); 1172 ResultSet rs = subStatment.executeQuery(query.toString()); 1173 rs.next(); 1174 long maxCID = rs.getLong(1); 1175 1176 query = new StringBuffer(); 1177 1178 query.append("delete from ").append(dbHandler.getLogTableName()).append( 1179 " where ") 1180 .append(RepConstants.logTable_commonId1).append(" !=").append(maxCID); 1181 int deletedNo = subStatment.executeUpdate(query.toString()); 1182 log.debug(query.toString()); 1183 } 1184 1185 private void deletingRecordsFromShadowTable(ArrayList usedActualTables,Statement stmt) throws SQLException, RepException { 1186 1187 int noOFTables = usedActualTables.size(); 1188 StringBuffer query; 1189 if (noOFTables > 0) { 1190 for (int i = 0; i < noOFTables; i++) { 1191 Object minValue= dbHandler.getMinValOfSyncIdTodeleteRecordsFromShadowTable((String) 1192 usedActualTables.get(i),stmt); 1193 if (minValue instanceof BigDecimal) { 1194 minValue = new Long( ( (BigDecimal) minValue).longValue()); 1195 } 1196 else if (minValue instanceof Double) { 1197 minValue = new Long( ( (Double) minValue).longValue()); 1198 }else if (minValue instanceof Long) { 1199 minValue = new Long( ( (Long) minValue).longValue()); 1200 }else if (minValue instanceof Integer) { 1201 minValue = new Long( ( (Integer) minValue).longValue()); 1202 } 1203 1204 // deleting records from shadow table for that table 1205 query = new StringBuffer(); 1206 query.append("delete from ").append(RepConstants.shadow_Table( ( 1207 String) usedActualTables.get(i))).append(" where ").append( 1208 RepConstants.shadow_sync_id1).append(" < ").append(minValue); 1209 log.debug(query.toString()); 1210 int count = stmt.executeUpdate(query.toString()); 1211 } 1212 } 1213 } 1214 1215 private void deletingRecordsFromShadowTable_old(ArrayList usedActualTables) { 1216 Connection conn = null; 1217 Statement stmt = null; 1218 try { 1219 int noOFTables = usedActualTables.size(); 1220 StringBuffer query; 1221 if (noOFTables > 0) { 1222 conn = connectionPool.getConnection(subName); 1223 stmt = conn.createStatement(); 1224 for (int i = 0; i < noOFTables; i++) { 1225 String shadowTable = RepConstants.shadow_Table( (String) 1226 usedActualTables.get(i)); 1227 query = new StringBuffer(); 1228 query.append("SELECT MAX(").append(RepConstants.shadow_sync_id1). 1229 append(") from ").append(shadowTable); 1230 ResultSet rs = stmt.executeQuery(query.toString()); 1231 rs.next(); 1232 long maxSyncId = rs.getLong(1); 1233 1234 query = new StringBuffer(); 1235 query.append("delete from ").append(shadowTable).append(" where ") 1236 .append(RepConstants.shadow_sync_id1).append(" != ").append(maxSyncId); 1237 int count = stmt.executeUpdate(query.toString()); 1238 log.debug(query.toString()); 1239 } 1240 } 1241 } 1242 catch (Exception ex) { 1243 log.debug(ex); 1244 RepConstants.writeERROR_FILE(ex); 1245 } 1246 finally { 1247 connectionPool.removeSubPubFromMap(subName); 1248 try { 1249 if (stmt != null) 1250 stmt.close(); 1251 try { 1252 connectionPool.returnConnection(conn); 1253 } 1254 catch (RepException ex) { 1255 // Ignore the exception 1256 } 1257 } 1258 catch (SQLException ex1) { 1259 } 1260 } 1261 } 1262 1263 private void UpdateConisderedForBookMarksTable(String pubName, 1264 String subName, 1265 String tableName, 1266 Long syncId, Statement stmt) throws 1267 SQLException, RepException { 1268 1269 StringBuffer query = new StringBuffer(); 1270 1271 Long maxSyncId = syncId; 1272 log.debug("maxSyncId" + maxSyncId); 1273 query.append(" UPDATE ").append(dbHandler.getBookMarkTableName()).append( 1274 " set ").append(RepConstants.bookmark_ConisderedId5) 1275 .append(" = ").append(maxSyncId).append(" where ").append( 1276 RepConstants. 1277 bookmark_LocalName1).append(" = '").append(subName).append("' and "). 1278 append(RepConstants.bookmark_RemoteName2) 1279 .append(" = '").append(pubName).append("' and ") 1280 .append(RepConstants.bookmark_TableName3).append(" = '").append( 1281 tableName).append("'"); 1282 stmt.executeUpdate(query.toString()); 1283 log.debug(query.toString()); 1284 1285 } 1286 1287 /** 1288 * This method first gets the object of the replication server by 1289 * looking up the remote url and port. Then gets the publication object 1290 * from that remote object. 1291 * 1292 * @return 1293 * @throws RepException 1294 * @throws RemoteException 1295 */ 1296 1297 private _PubImpl getPublication() throws RepException, RemoteException { 1298 _ReplicationServerImpl serverRS = null; 1299 serverRS = getRemoteReplicationServer(); 1300 _PubImpl pub = serverRS.getRemotePublication(pubName); 1301 if (pub == null) { 1302 throw new RepException("REP048", new Object[] {subName, pubName}); 1303 } 1304 return pub; 1305 } 1306 1307 /* It unsubscribe the subscriber as well as drop all related schedules */ 1308 /** 1309 * This method removes the subscribed tables from the publication server's bookmark table, 1310 * then drops all replication related tables in the subscribed database 1311 * 1312 * @throws RepException 1313 * @throws RemoteException 1314 */ 1315 1316 public void unsubscribe() throws RepException { 1317 Connection con = null; 1318 Statement stt = null; 1319 try { 1320 try { 1321 getPublication().dropSubscription(subName); 1322 } 1323 catch (RepException ex) { 1324 RepConstants.writeERROR_FILE(ex); 1325 throw ex; 1326 } 1327 catch (Exception ex) { 1328 RepConstants.writeERROR_FILE(ex); 1329 throw new RepException("REP046", new Object[] {subName, ex.getMessage()}); 1330 } 1331 try { 1332 con = connectionPool.getFreshConnection(subName); 1333 dbHandler.setDefaultSchema(con); 1334 stt = con.createStatement(); 1335 try { 1336 stt.execute(" delete from " + dbHandler.getSubscriptionTableName() + 1337 " where " + RepConstants.subscription_subName1 + " = '" + 1338 subName + "'"); 1339 1340 log.debug(" delete from " + dbHandler.getSubscriptionTableName() + 1341 " where " + RepConstants.subscription_subName1 + " = '" + 1342 subName + "'"); 1343 } 1344 catch (SQLException ex) { 1345 RepConstants.writeERROR_FILE(ex); 1346 throw new RepException("REP037", new Object[] {subName}); 1347 } 1348 try { 1349 localServer.getScheduleHandler().dropSchedule(subName); 1350 } 1351 catch (RepException ex) { 1352 if (! (ex.getRepCode() == "REP203")) { 1353 RepConstants.writeERROR_FILE(ex); 1354 throw ex; 1355 } 1356 } 1357 deleteNonSharedPublishedSubscribedTables(con, subName); 1358 localServer.refershSubscription(subName); 1359 } 1360 catch (SQLException ex) { 1361 RepConstants.writeERROR_FILE(ex); 1362 } 1363 } 1364 finally { 1365 connectionPool.removeSubPubFromMap(subName); 1366 try { 1367 if (stt != null) 1368 stt.close(); 1369 } 1370 catch (SQLException ex1) { 1371 } 1372 connectionPool.returnConnection(con); 1373 } 1374 } 1375 1376 private void deleteNonSharedPublishedSubscribedTables(Connection con, 1377 String pubsubName) throws SQLException, RepException { 1378 1379 //" Select Table_Name from RepTable Where PubSub_Name = '"+pubName+"' and Table_Name Not In " 1380 //"( Select Distinct Table_Name from RepTable Where PubSub_Name <> '"+pubName+"') "; 1381 1382 Statement stt = null; 1383 ResultSet rs = null; 1384 ResultSet rsPub = null; 1385 1386 try { 1387 StringBuffer tablesToDelete = new StringBuffer(); 1388 tablesToDelete.append(" Select ").append(RepConstants.repTable_tableName2) 1389 .append(" from ").append(dbHandler.getRepTableName()).append( 1390 " where ") 1391 .append(RepConstants.repTable_pubsubName1).append(" = '").append( 1392 pubsubName) 1393 .append("' and ").append(RepConstants.repTable_tableName2) 1394 .append(" not in ( Select Distinct ").append(RepConstants. 1395 repTable_tableName2) 1396 .append(" from ").append(dbHandler.getRepTableName()) 1397 .append(" Where ").append(RepConstants.repTable_pubsubName1) 1398 .append(" <> '").append(pubsubName).append("') "); 1399 stt = con.createStatement(); 1400 rs = stt.executeQuery(tablesToDelete.toString()); 1401 Connection pubsubConnection = null; 1402 try 1403 { 1404 pubsubConnection = connectionPool.getConnection(pubsubName); 1405 while (rs.next()) { 1406 MetaDataInfo mdi = Utility.getDatabaseMataData(pubsubConnection); 1407 SchemaQualifiedName sname = new SchemaQualifiedName(mdi, 1408 rs.getString(RepConstants.repTable_tableName2)); 1409 deleteAll(pubsubName, sname.toString(), con); 1410 } 1411 } 1412 finally { 1413 connectionPool.returnConnection(pubsubConnection); 1414 } 1415 stt.execute(" delete from " + dbHandler.getRepTableName() + " where " + 1416 RepConstants.repTable_pubsubName1 + " = '" + pubsubName +"'"); 1417 log.debug(" delete from " + dbHandler.getRepTableName() + " where " + 1418 RepConstants.repTable_pubsubName1 + " = '" + pubsubName + "'"); 1419 stt.execute(" delete from " + dbHandler.getBookMarkTableName() + 1420 " where " + 1421 RepConstants.bookmark_LocalName1 + " = '" + pubsubName +"'"); 1422 log.debug(" delete from " + dbHandler.getBookMarkTableName() + 1423 " where " + 1424 RepConstants.bookmark_LocalName1 + " = '" + pubsubName + "'"); 1425 rs = stt.executeQuery("select * from " + 1426 dbHandler.getSubscriptionTableName()); 1427 1428 boolean issubTableExist = rs != null ? rs.next() ? true : false : false; 1429 1430 try { 1431 rsPub = stt.executeQuery("select * from " + 1432 dbHandler.getPublicationTableName()); 1433 } 1434 catch (SQLException ex) { 1435 /** 1436 * Ignore the exception because Publication table 1437 * does not exist in Subscriber Database. It is the 1438 * case when table is published and subscribed in 1439 * same database. 1440 */ 1441 } 1442 boolean ispubTableExist = rsPub != null ? rsPub.next() ? true : false : false; 1443 if (!ispubTableExist && !issubTableExist) { 1444 dbHandler.dropSubscriberSystemTables(con); 1445 } 1446 } 1447 finally { 1448 if (rs != null) 1449 rs.close(); 1450 if (rsPub != null) 1451 rsPub.close(); 1452 if (stt != null) 1453 stt.close(); 1454 } 1455 } 1456 1457 private void deleteAll(String pubsubName, String table, Connection con) throws 1458 SQLException, 1459 1460 RepException { 1461 1462 dbHandler.dropTriggersAndShadowTable(con, table, pubsubName); 1463 1464 } 1465 1466 public ArrayList getRepTables() { 1467 return subRepTables; 1468 } 1469 1470 /* It saves the schedule information in the schedule table*/ 1471 1472 private void saveScheduleData(String schName, String subName, 1473 String scheduleType, 1474 String publicationServerName, 1475 String publicationPortNo, 1476 String recurrenceType, 1477 String repType, long schTime, int counter) throws 1478 RepException { 1479 1480 Connection connection = connectionPool.getConnection(subName); 1481 Statement stt = null; 1482 ResultSet rs = null; 1483 try { 1484 stt = connection.createStatement(); 1485 StringBuffer insertQuery = new StringBuffer(); 1486 insertQuery = insertQuery.append("insert into ").append(dbHandler. 1487 getScheduleTableName()) 1488 .append(" values ( '").append(schName).append("', '") 1489 .append(subName).append("' , '").append(scheduleType).append( 1490 "' , '") 1491 .append(publicationServerName).append("', '") 1492 .append(publicationPortNo).append("', '") 1493 .append(recurrenceType).append("' , '") 1494 .append(repType).append("' , ").append(schTime).append(" , "). 1495 append( 1496 counter).append(" )"); 1497 stt.executeUpdate(insertQuery.toString()); 1498 log.info("add schedule query " + insertQuery.toString()); 1499 1500 localServer.getScheduleHandler().startSchedule(subName, schName, 1501 scheduleType, 1502 publicationServerName, publicationPortNo, recurrenceType, repType, 1503 schTime, counter); 1504 } 1505 catch (SQLException ex) { 1506 log.error(ex.getMessage(), ex); 1507 try { 1508 StringBuffer query = new StringBuffer(); 1509 query = query.append("select ").append(RepConstants.subscription_subName1).append( 1510 " from ").append(dbHandler.getScheduleTableName()).append( 1511 " where "). 1512 append(RepConstants.subscription_subName1).append(" = '").append( 1513 subName).append("'"); 1514 rs = stt.executeQuery(query.toString()); 1515 if (rs.next() == true) { 1516 throw new RepException("REP201", new Object[] {subName}); 1517 } 1518 } 1519 catch (SQLException ex1) { 1520 RepConstants.writeERROR_FILE(ex1); 1521 } 1522 } 1523 catch (RepException ex) { 1524 RepConstants.writeERROR_FILE(ex); 1525 throw ex; 1526 } 1527 finally { 1528 connectionPool.removeSubPubFromMap(subName); 1529 try { 1530 if (stt != null) 1531 stt.close(); 1532 if (rs != null) 1533 rs.close(); 1534 } 1535 catch (SQLException ex2) { 1536 } 1537 connectionPool.returnConnection(connection); 1538 } 1539 } 1540 1541 /** 1542 * Add a schedule entry to the replication tables, which will be looked at 1543 * when the replication instance is started, and obeyed according to its 1544 * specification 1545 * 1546 * @param scheduleName String 1547 * @param subscriptionName String 1548 * @param scheduleType0 String 1549 * @param publicationServerName String 1550 * @param publicationPortNo String 1551 * @param recurrenceType String 1552 * @param replicationType String 1553 * @param startDateTime Timestamp 1554 * @param scheduleCounter int 1555 * @throws RepException 1556 */ 1557 public void addSchedule(String scheduleName, String subscriptionName, 1558 String scheduleType0, String publicationServerName, 1559 String publicationPortNo, 1560 String recurrenceType, String replicationType, 1561 Timestamp startDateTime, int scheduleCounter) throws 1562 RepException { 1563 String scheduleType = null; 1564 scheduleType = scheduleType0; 1565 try { 1566 if (scheduleName == null) { 1567 throw new RepException("REP215", null); 1568 } 1569 _Subscription sub = localServer.getSubscription(subscriptionName); 1570 if (sub == null) { 1571 throw new RepException("REP037", new Object[] {subscriptionName}); 1572 } 1573 if (publicationServerName == null) { 1574 throw new RepException("REP094", null); 1575 } 1576 if (publicationPortNo == null) { 1577 throw new RepException("REP095", null); 1578 } 1579 try { 1580 sub.setRemoteServerPortNo(Integer.parseInt(publicationPortNo)); 1581 } 1582 catch (NumberFormatException ex) { 1583 throw new RepException("REP220", new Object[] {null}); 1584 } 1585 if (replicationType == null) { 1586 throw new RepException("REP212", null); 1587 } 1588 if (scheduleType == null) { 1589 throw new RepException("REP222", new Object[] {null}); 1590 } 1591 if (! ( (scheduleType.equalsIgnoreCase(RepConstants.scheduleType_nonRealTime)) || 1592 (scheduleType.equalsIgnoreCase(RepConstants.scheduleType_realTime)))) { 1593 throw new RepException("REP223", new Object[] {null}); 1594 } 1595 if (! (replicationType.equalsIgnoreCase(RepConstants.replication_snapshotType) || 1596 replicationType.equalsIgnoreCase(RepConstants.replication_synchronizeType) || 1597 replicationType.equalsIgnoreCase(RepConstants.replication_pullType) || 1598 replicationType.equalsIgnoreCase(RepConstants.replication_pushType))) { 1599 throw new RepException("REP216", null); 1600 } 1601 long startTime = 0; 1602 if (scheduleType.equalsIgnoreCase(RepConstants.scheduleType_nonRealTime)) { 1603 startTime = startDateTime.getTime(); 1604 if (startTime <= System.currentTimeMillis()) { 1605 throw new RepException("REP202", null); 1606 } 1607 if (recurrenceType == null) { 1608 throw new RepException("REP213", null); 1609 } 1610 if (! (recurrenceType.equalsIgnoreCase(RepConstants.recurrence_yearType) || 1611 recurrenceType.equalsIgnoreCase(RepConstants.recurrence_monthType) || 1612 recurrenceType.equalsIgnoreCase(RepConstants.recurrence_dayType) || 1613 recurrenceType.equalsIgnoreCase(RepConstants.recurrence_hourType)|| 1614 recurrenceType.equalsIgnoreCase(RepConstants.recurrence_minuteType))) { 1615 throw new RepException("REP218", null); 1616 } 1617 1618 if (startDateTime == null) { 1619 throw new RepException("REP214", null); 1620 } 1621 if (! (scheduleCounter >= 0)) { 1622 throw new RepException("REP217", null); 1623 } 1624 } 1625 1626 saveScheduleData(scheduleName, subscriptionName, scheduleType, 1627 publicationServerName, 1628 publicationPortNo, recurrenceType, replicationType, 1629 startTime, scheduleCounter); 1630 } 1631 catch (RepException ex) { 1632 RepConstants.writeERROR_FILE(ex); 1633 throw ex; 1634 } 1635 finally { 1636 connectionPool.removeSubPubFromMap(subName); 1637 } 1638 } 1639 1640 /** 1641 * Remove a schedule entry from the schedule table 1642 * 1643 * @param scheduleName0 String 1644 * @param subName0 String 1645 */ 1646 public void removeSchedule(String scheduleName0, String subName0) throws 1647 RepException { 1648 try { 1649 if (scheduleName0 == null) { 1650 throw new RepException("REP215", null); 1651 } 1652 _Subscription sub = localServer.getSubscription(subName0); 1653 if (sub == null) { 1654 throw new RepException("REP037", new Object[] {subName0}); 1655 } 1656 1657 localServer.getScheduleHandler().dropSchedule(subName0); 1658 } 1659 catch (RepException ex) { 1660 RepConstants.writeERROR_FILE(ex); 1661 throw ex; 1662 } 1663 finally { 1664 connectionPool.removeSubPubFromMap(subName); 1665 } 1666 } 1667 1668 /** 1669 * Edit an existing schedule entry 1670 * 1671 * @param scheduleName0 String 1672 * @param subName0 String 1673 * @param newPubServerName String 1674 * @param newPubPortNo String 1675 */ 1676 public void editSchedule(String scheduleName0, String subName0, 1677 String newPubServerName, String newPubPortNo) throws 1678 RepException { 1679 try { 1680 if (scheduleName0 == null) { 1681 throw new RepException("REP215", null); 1682 } 1683 _Subscription sub = localServer.getSubscription(subName0); 1684 if (sub == null) { 1685 throw new RepException("REP037", new Object[] {subName0}); 1686 } 1687 if (newPubServerName == null) { 1688 throw new RepException("REP094", null); 1689 } 1690 if (newPubPortNo == null) { 1691 throw new RepException("REP095", null); 1692 } 1693 try { 1694 sub.setRemoteServerPortNo(Integer.parseInt(newPubPortNo)); 1695 } 1696 catch (NumberFormatException ex) { 1697 throw new RepException("REP220", new Object[] {null}); 1698 } 1699 localServer.getScheduleHandler().editSchedule(scheduleName0, subName0, 1700 newPubServerName, newPubPortNo); 1701 } 1702 catch (RepException ex) { 1703 RepConstants.writeERROR_FILE(ex); 1704 throw ex; 1705 } 1706 finally { 1707 connectionPool.removeSubPubFromMap(subName); 1708 } 1709 } 1710 1711 private void updateBookMarkLastSyncId(String tableName, 1712 String remote_Pub_Sub_Name, 1713 Object lastId, Statement stmt) throws 1714 Exception { 1715 String updateQuery = "update " + dbHandler.getBookMarkTableName() + 1716 " set " + 1717 RepConstants.bookmark_lastSyncId4 + "=" + lastId + " where " + 1718 RepConstants.bookmark_LocalName1 + " = '" + subName + "' and " + 1719 RepConstants.bookmark_RemoteName2 + " = '" + remote_Pub_Sub_Name + 1720 "' and " + RepConstants.bookmark_TableName3 + " = '" + tableName + 1721 "' "; 1722 stmt.executeUpdate(updateQuery); 1723 } 1724 1725 public synchronized void updateSubscription() throws RepException { 1726 /* NEEDS WORK...bjt */ 1727 Connection subConnection = null; 1728 Statement stt = null; 1729 ResultSet rs = null; 1730 try { 1731 subConnection = connectionPool.getConnection(subName); 1732 MetaDataInfo mdi = Utility.getDatabaseMataData(subConnection); 1733 HashMap primCols = new HashMap(); 1734 _PubImpl pub = getPublication(); 1735 int pubVendorType = pub.getPubVendorName(); 1736 //getting list of tables which after subscribing were dropped(even if dropped and added again by pub) 1737 ArrayList dropTableListFromSub = pub.dropTableListForSub(subName); 1738 //dropping table and unsubscribing them from replicator tables 1739 if (!dropTableListFromSub.isEmpty()){ 1740 UnsubscribeAndDropTablesDroppedFromPub(subConnection,dropTableListFromSub); 1741 } 1742 //Returns [0] array of schemas [1]publicationtablequeries. 1743 ArrayList[] schemaTables = getPublicationTableQueries(primCols, pub); 1744 ArrayList dropTableList = new ArrayList(); 1745 ArrayList oldSubRepTableList = new ArrayList(); 1746 ArrayList newSubRepTableList = new ArrayList(); 1747 stt = subConnection.createStatement(); 1748 StringBuffer sb = new StringBuffer(); 1749 //getting all the old subscribed tables list 1750 sb.append("select ").append(RepConstants.repTable_tableName2).append( 1751 " from ").append(dbHandler.getRepTableName()).append(" where ") 1752 .append(RepConstants.repTable_pubsubName1).append(" = '") 1753 .append(subName).append("'"); 1754 rs = stt.executeQuery(sb.toString()); 1755 while (rs.next()) { 1756 SchemaQualifiedName sname = new SchemaQualifiedName(mdi, rs.getString(1)); 1757 String schema = sname.getSchemaName(); 1758 String table = sname.getTableName(); 1759 RepTable repTable = new RepTable(sname, RepConstants.publisher); 1760 //Sets Sorted primary key columns in repTable. 1761 mdi.setPrimaryColumns(repTable, schema, table); 1762 oldSubRepTableList.add(repTable.getSchemaQualifiedName()); 1763 } 1764 1765 String[] alterTableQueries = alterTableAddFKQueries == null ? null : 1766 (String[]) alterTableAddFKQueries.toArray(new String[0]); 1767 1768 dbHandler.createSubscribedTablesTriggersAndShadowTables(subName, 1769 (String[]) (schemaTables[1]).toArray(new String[0]), 1770 alterTableQueries, primCols, pubVendorType, subRepTables); 1771 1772 //Do Entry in the Subscription Table , BookMark Table, Rep Table. 1773 saveSubscriptionNewData(dbHandler); 1774 //get new subReptable list 1775 for (int i = 0; i < subRepTables.size(); i++) { 1776 RepTable repTable = (RepTable) subRepTables.get(i); 1777 newSubRepTableList.add(repTable.getSchemaQualifiedName()); 1778 } 1779 //find if any table if was dropped from publisher.If any found,add to dropTableList 1780 for (int j = 0; j < oldSubRepTableList.size(); j++) { 1781 if (!newSubRepTableList.contains(oldSubRepTableList.get(j))) { 1782 dropTableList.add(oldSubRepTableList.get(j)); 1783 } 1784 } 1785 StringBuffer deleteQuery = new StringBuffer(); 1786 for (int j = 0; j < dropTableList.size(); j++) { 1787 //drop triggers,shadow tables,delete from logtable 1788 dbHandler.dropTriggersAndShadowTable(subConnection, 1789 dropTableList.get(j).toString(),subName); 1790 //delete entry from bookmarkTable 1791 deleteQuery.append(" delete from ").append(dbHandler. 1792 getBookMarkTableName()).append(" where ").append(RepConstants. 1793 bookmark_LocalName1).append(" = '").append(subName) 1794 .append("'").append(" and ").append(RepConstants.repTable_tableName2).append( 1795 " ='").append(dropTableList.get(j).toString()).append(" '"). 1796 append(" and ").append(RepConstants.bookmark_RemoteName2).append(" ='"). 1797 append(pubName).append(" '"); 1798 stt.execute(deleteQuery.toString()); 1799 } 1800 pub.saveSubscriptionNewData(subName); 1801 } 1802 1803 catch (RepException ex) { 1804 RepConstants.writeERROR_FILE(ex); 1805 throw ex; 1806 } 1807 catch (Exception ex) { 1808 RepConstants.writeERROR_FILE(ex); 1809 throw new RepException("REP318", new Object[] {ex.getMessage()}); 1810 } 1811 finally { 1812 connectionPool.removeSubPubFromMap(subName); 1813 try { 1814 if (rs != null) 1815 rs.close(); 1816 } 1817 catch (SQLException ex1) { 1818 } 1819 try { 1820 if (stt != null) { 1821 stt.close(); 1822 } 1823 } 1824 catch (SQLException ex2) { 1825 } 1826 connectionPool.returnConnection(subConnection); 1827 } 1828 } 1829 1830 /** 1831 * Make an entry in the subscriptions table. Inserts all the records 1832 * corresponding to each replication table into the reptabe and Bookmark table. 1833 * 1834 * @param dbHandler 1835 * @throws SQLException 1836 * @throws RepException 1837 */ 1838 1839 private void saveSubscriptionNewData(AbstractDataBaseHandler dbHandler) throws 1840 SQLException, RepException { 1841 Connection connection = connectionPool.getConnection(subName); 1842 Statement stt = null; 1843 try { 1844 stt = connection.createStatement(); 1845 //Delete all entries from Rep_table 1846 StringBuffer sb = new StringBuffer(); 1847 sb.append("delete from ").append(dbHandler.getRepTableName()).append(" where ") 1848 .append(RepConstants.repTable_pubsubName1).append(" = '") 1849 .append(subName).append("'"); 1850 log.debug("Delete all entries from Rep_table::" + sb.toString()); 1851 stt.execute(sb.toString()); 1852 1853 //For each table makes an entry into the BookMark Table and Rep Table 1854 for (int i = 0, size = subRepTables.size(); i < size; i++) { 1855 RepTable repTable = (RepTable) subRepTables.get(i); 1856 repTable.setConflictResolver(conflictResolver); 1857 StringBuffer sb2 = new StringBuffer(); 1858 sb2.append(" Insert into ").append(dbHandler.getBookMarkTableName()) 1859 .append(" values ( '").append(subName).append("','") 1860 .append(pubName).append("','").append(repTable.getSchemaQualifiedName()) 1861 .append("',").append("0,0,'N')"); 1862 try { 1863 log.debug("Table Entry in BookMarkTable::" + sb2.toString()); 1864 stt.execute(sb2.toString()); 1865 } 1866 catch (SQLException ex1) { 1867 } 1868 dbHandler.saveRepTableData(connection, subName, repTable); 1869 } 1870 } 1871 catch (SQLException ex) { 1872 throw ex; 1873 } 1874 finally { 1875 connectionPool.removeSubPubFromMap(subName); 1876 try { 1877 if (stt != null) { 1878 stt.close(); 1879 } 1880 } 1881 catch (SQLException ex2) { 1882 } 1883 connectionPool.returnConnection(connection); 1884 } 1885 } 1886 1887 /** 1888 * Get a snapshot of newly added tables or tables 1889 * on which no replication operation was done 1890 * 1891 * @throws RepException 1892 */ 1893 public synchronized void getSnapShotAfterUpdatingSubscriber() throws 1894 /* NEEDS WORK...bjt */ 1895 RepException { 1896 Connection subConnection = null; 1897 Statement stmt = null; 1898 _PubImpl publication = null; 1899 boolean islockedTaken = false,isCurrentTableCyclic = false; 1900 try { 1901 ServerSocket serverSocket = null; 1902 Socket socket = null; 1903 serverSocket = connectionPool.startServerSocket(); 1904 String remoteServerName; 1905 1906 String localAddress = null,remoteMachineAddress=null; 1907 try { 1908 ArrayList tablesForSnapShot = getTableListForSnapShotAfterUpdatingSub(); 1909 _ReplicationServerImpl remoteServer = getRemoteReplicationServer(); 1910 remoteServerName = remoteServer.getServerName(); 1911 remoteMachineAddress = remoteServer.getServerName(); 1912 publication = remoteServer.getRemotePublication(pubName); 1913 publication.checkForLock(subName); 1914 islockedTaken = true; 1915 localAddress = InetAddress.getLocalHost().getHostName(); 1916 publication.createSnapShotAfterUpdateSub(localAddress,serverSocket.getLocalPort(),subName, tablesForSnapShot,isSchemaSupported(),fileUpload,localAddress); 1917 } 1918 catch (RepException ex) { 1919 log.error(ex.getMessage(), ex); 1920 throw ex; 1921 } 1922 catch (Exception ex) { 1923 RepConstants.writeERROR_FILE(ex); 1924 RepException rex = new RepException("REP053", new Object[] {subName,ex.getMessage()}); 1925 rex.setStackTrace(ex.getStackTrace()); 1926 throw rex; 1927 } 1928 try { 1929 // bjt - still need to fix this routine...leave commented code until fixed 1930 // socket = serverSocket.accept(); 1931 // InputStream is = socket.getInputStream(); 1932 // FileOutputStream fos = new FileOutputStream(PathHandler. 1933 // getDefaultZIPFilePathForClient("snapshot_" + pubName+ "_" +subName)); 1934 // byte[] buf = new byte[1024]; 1935 // int len = 0; 1936 // while ( (len = is.read(buf)) > 0) { 1937 // fos.write(buf, 0, len); 1938 // } 1939 // fos.close(); 1940 // is.close(); 1941 // serverSocket.close(); 1942 // socket.close(); 1943 //unzipping the zip file 1944 if(!localAddress.equalsIgnoreCase(remoteMachineAddress)) { 1945 ZipHandler.unZip(PathHandler.getDefaultZIPFilePathForClient("snapshot_" + pubName + "_" + subName),PathHandler.getDefaultFilePathForClient("snapshot_" +pubName + "_" + subName)); 1946 } 1947 1948 } 1949 catch (IOException ex) { 1950 RepConstants.writeERROR_FILE(ex); 1951 RepException rex = new RepException("REP052", new Object[] {subName,ex.getMessage()}); 1952 rex.setStackTrace(ex.getStackTrace()); 1953 throw rex; 1954 } 1955 1956 try { 1957 1958 for (int i = 0; i < subRepTables.size(); i++) { 1959 RepTable repTable = ( (RepTable) subRepTables.get(i)); 1960 isCurrentTableCyclic = repTable.getCyclicDependency().equalsIgnoreCase(RepConstants.YES); 1961 if(isCurrentTableCyclic) 1962 break; 1963 } 1964 SAXParser saxParser = SAXParserFactory.newInstance().newSAXParser(); 1965 XMLReader reader = saxParser.getXMLReader(); 1966 subConnection = connectionPool.getConnection(subName); 1967 stmt = subConnection.createStatement(); 1968 //Instance for content handler 1969 //bjt 1970 /* 1971 DatabaseMetaData dbmd = subConnection.getMetaData(); 1972 int batchMax = (dbmd.supportsBatchUpdates() ? 1000 : 0); 1973 log.debug("Batch Max = " + batchMax); 1974 if (batchMax > 0) { 1975 log.debug("Using statement pooling in batches of " + batchMax); 1976 } else { 1977 log.debug("NOT Using statement pooling, database does not seem to support it"); 1978 } 1979 */ 1980 //SnapshotHandler ch = new SnapshotHandler(true, subConnection, this,dbHandler, remoteServerName, batchMax); 1981 SnapshotHandler ch = new SnapshotHandler(true, subConnection, this,dbHandler, remoteServerName); 1982 // instance for content hanedler 1983 ch.setPubName(pubName); 1984 ch.setSubName(subName); 1985 reader.setContentHandler(ch); 1986 reader.parse(PathHandler.getDefaultFilePathForClient("snapshot_" +pubName+ "_" +subName)); 1987 ch.closeAllStatementAndResultset(); 1988 1989 if(isCurrentTableCyclic){ 1990 SnapshotHandler ch1 = new SnapshotHandler(false, subConnection, this,dbHandler, remoteServerName); // instance for content hanedler 1991 ch1.setPubName(pubName); 1992 ch1.setSubName(subName); 1993 reader.setContentHandler(ch1); 1994 reader.parse(PathHandler.getDefaultFilePathForClient("snapshot_" + pubName + "_" +subName+ "_" +subName)); 1995 ch1.closeAllStatementAndResultset(); 1996 } 1997 1998 1999 if (_Subscription.xmlAndShadow_entries) { 2000 // deleting xml file 2001 deleteFile(PathHandler.getDefaultFilePathForClient("snapshot_" +pubName+ "_" +subName)); 2002 // deleting zip file 2003 deleteFile(PathHandler.getDefaultZIPFilePathForClient("snapshot_" +pubName+ "_" +subName)); 2004 } 2005 dbHandler.deleteRecordsFromSuperLogTable(stmt); 2006 } 2007 catch (Exception ex) { 2008 RepConstants.writeERROR_FILE(ex); 2009 RepException rex = new RepException("REP053", new Object[] {subName,ex.getMessage()}); 2010 rex.setStackTrace(ex.getStackTrace()); 2011 throw rex; 2012 } 2013 } 2014 catch (RepException rex) { 2015 RepConstants.writeERROR_FILE(rex); 2016 throw rex; 2017 } 2018 finally { 2019 connectionPool.removeSubPubFromMap(subName); 2020 try { 2021 if (islockedTaken) 2022 publication.releaseLOCK(); 2023 } 2024 catch (RemoteException ex2) { 2025 RepConstants.writeERROR_FILE(ex2); 2026 } 2027 connectionPool.returnConnection(subConnection); 2028 2029 } 2030 } 2031 2032 private ArrayList getTableListForSnapShotAfterUpdatingSub() throws RepException { 2033 ResultSet rs = null; 2034 Connection connection = null; 2035 Statement stt = null; 2036 ArrayList tablesForSnapShot = new ArrayList(); 2037 StringBuffer selectTableNames = new StringBuffer(); 2038 selectTableNames.append("select ").append(RepConstants.bookmark_TableName3) 2039 .append(" from ").append(RepConstants.bookmark_TableName).append(" where ") 2040 .append(RepConstants.bookmark_lastSyncId4).append(" = 0 ") 2041 .append(" and ").append(RepConstants.bookmark_ConisderedId5).append(" = 0 "); 2042 try { 2043 connection = connectionPool.getConnection(subName); 2044 stt = connection.createStatement(); 2045 rs = stt.executeQuery(selectTableNames.toString()); 2046 while (rs.next()) { 2047 tablesForSnapShot.add(rs.getString(RepConstants.bookmark_TableName3).toLowerCase()); 2048 } 2049 } 2050 catch (SQLException ex) { 2051 2052 } 2053 catch (RepException ex1) { 2054 throw ex1; 2055 } 2056 finally { 2057 try { 2058 if (rs != null) { 2059 rs.close(); 2060 } 2061 if (stt != null) { 2062 stt.close(); 2063 } 2064 } 2065 catch (SQLException ex2) { 2066 } 2067 connectionPool.returnConnection(connection); 2068 connectionPool.removeSubPubFromMap(subName); 2069 } 2070 return tablesForSnapShot; 2071 } 2072 2073 private void UnsubscribeAndDropTablesDroppedFromPub(Connection 2074 subConnection, ArrayList dropTableList) throws RepException, 2075 RepException, SQLException { 2076 Statement stt = null; 2077 try { 2078 stt = subConnection.createStatement(); 2079 for (int i = 0; i < dropTableList.size(); i++) { 2080 String tableName = dropTableList.get(i).toString(); 2081 //drop triggers,shadow tables,delete from logtable 2082 dbHandler.dropTriggersAndShadowTable(subConnection, tableName,subName); 2083 StringBuffer deleteQuery = new StringBuffer(); 2084 //delete entry from bookmarkTable 2085 deleteQuery.append(" delete from ").append(dbHandler. 2086 getBookMarkTableName()).append( 2087 " where ").append(RepConstants.bookmark_LocalName1).append(" = '").append( 2088 subName).append("'").append(" and ").append(RepConstants.repTable_tableName2). 2089 append(" ='").append(tableName).append("'").append( 2090 " and ").append(RepConstants.bookmark_RemoteName2).append(" ='"). 2091 append(pubName).append("'"); 2092 log.debug(deleteQuery.toString()); 2093 try { 2094 stt.execute(deleteQuery.toString()); 2095 } 2096 catch (SQLException ex1) { 2097 } 2098 2099 StringBuffer deleteRepTableQuery = new StringBuffer(); 2100 //delete entry from bookmarkTable 2101 deleteRepTableQuery.append(" delete from ").append(dbHandler.getRepTableName()) 2102 .append( " where ").append(RepConstants.repTable_pubsubName1).append( " = '").append(subName) 2103 .append( "'").append(" and ").append(RepConstants.repTable_tableName2).append(" ='") 2104 .append(tableName).append("'"); 2105 log.debug(deleteRepTableQuery.toString()); 2106 try { 2107 stt.execute(deleteRepTableQuery.toString()); 2108 } 2109 catch (SQLException ex1) { 2110 throw ex1; 2111 } 2112 2113 //dropping table from database 2114 String dropTable = "Drop table " + tableName; 2115 log.debug("dropTable::" + dropTable); 2116 try { 2117 stt.execute(dropTable); 2118 } 2119 catch (SQLException ex) { 2120 2121 } 2122 } 2123 } 2124 finally { 2125 try { 2126 if (stt != null) 2127 stt.close(); 2128 } 2129 catch (Exception ex) { 2130 } 2131 } 2132 } 2133 2134 private void makeSubscriberTransactionLgFile(String subName,MergeHandler mg,BufferedWriter bw,String replicationType) throws Exception { 2135 if(Utility.createTransactionLogFile) { 2136 String transactionLogURL = PathHandler.getDefaultTransactionLogFilePathForSubscriber(subName); 2137 FileOutputStream fos = new FileOutputStream(transactionLogURL, true); 2138 OutputStreamWriter os = new OutputStreamWriter(fos); 2139 bw = new BufferedWriter(os); 2140 AbstractSynchronize.writeDateInTransactionLogFile(bw); 2141 AbstractSynchronize.writeOperationInTransactionLogFile(bw, mg.insert,mg.update, mg.delete, replicationType); 2142 bw.flush(); 2143 } 2144 } 2145 2146 private boolean isSchemaSupported() { 2147 return dbHandler.isSchemaSupported(); 2148 } 2149 2150 2151 2152 2153 }

