001 /** 002 * Copyright (c) 2003 Daffodil Software Ltd all rights reserved, 003 * Modifications Copyright (c) 2008 Regiscope Digital Imaging Co, LLC, All rights reserved. 004 * This program is free software; you can redistribute it and/or modify 005 * it under the terms of version 2 of the GNU General Public License as 006 * published by the Free Software Foundation. 007 * There are special exceptions to the terms and conditions of the GPL 008 * as it is applied to this software. See the GNU General Public License for more details. 009 * 010 * This program is distributed in the hope that it will be useful, 011 * but WITHOUT ANY WARRANTY; without even the implied warranty of 012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 013 * GNU General Public License for more details. 014 * 015 * You should have received a copy of the GNU General Public License 016 * along with this program; if not, write to the Free Software 017 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA 018 */ 019 020 package org.dbreplicator.replication; 021 022 import java.net.*; 023 import java.net.UnknownHostException; 024 import java.rmi.*; 025 import java.rmi.registry.*; 026 import java.rmi.server.*; 027 import java.sql.*; 028 import java.util.*; 029 import org.dbreplicator.replication.zip.ImportedTablesInfo; 030 import org.dbreplicator.graph.DirectedGraph; 031 import org.dbreplicator.replication.DBHandler.AbstractDataBaseHandler; 032 import org.dbreplicator.replication.schedule.ScheduleHandler; 033 import org.apache.log4j.Logger; 034 import java.io.File; 035 import org.apache.log4j.PropertyConfigurator; 036 import java.io.Writer; 037 import java.io.BufferedWriter; 038 import java.io.FileWriter; 039 import java.io.InputStreamReader; 040 import java.io.BufferedReader; 041 import java.io.IOException; 042 import org.dbreplicator.graph.Vertex; 043 import javax.sql.*; 044 045 /** 046 * This is the main class of DBRepliactor, any side whether publisher or 047 * subscriber has to get an instance of this class by the method getInstance and 048 * then connect to the required database by using the method setDataSource. 049 * <p> 050 * The Publisher can create a publication by executing the method: 051 * <p> 052 * createPublication (get an instance of Publication class), 053 * <p> 054 * and the Subscriber can create a subscription by executing the method: 055 * <p> 056 * createSubscription (get an instance of Subscription class). 057 * <p> 058 * After that the control is transferred to the publisher and subscriber. 059 * and they start communicating to each other. It also implements _ReplicationServerImpl interface. So it gives publication's 060 * object when needed remotely(At the time of subscribing, snapshot, synchronize). 061 * 062 */ 063 064 public class ReplicationServer 065 extends UnicastRemoteObject implements _ReplicationServer, _ReplicationServerImpl { 066 067 /** 068 * Replication server name and url 069 */ 070 private String name, url; 071 072 /** 073 * Port on which Replication server is running 074 */ 075 private int port; 076 077 /** 078 * Mapping of connections 079 */ 080 private ConnectionPool connectionPool; 081 082 /** 083 * Mapping of pulications 084 */ 085 private TreeMap pubMap = new TreeMap(String.CASE_INSENSITIVE_ORDER); 086 087 /** 088 * Mapping of subscriptions 089 */ 090 091 public ScheduleHandler scheduleHandler; 092 093 private TreeMap subMap = new TreeMap(String.CASE_INSENSITIVE_ORDER); 094 private Connection defaultConnection; 095 public String driver = "",URL = "",user, 096 password, 097 portNo, 098 databaseName, 099 databaseServerName, 100 dataBaseName, 101 dBPortNo, 102 vendorName; 103 104 105 ArrayList tablesInCycle=null; 106 107 protected static Logger log = Logger.getLogger(ReplicationServer.class.getName()); 108 109 private ReplicationServer(int port0, String url0) throws RemoteException { 110 port = port0; 111 url = url0; 112 } 113 114 /** 115 * This method is responsible for getting the Replication Server instance. 116 * It starts a replication server at the specified port and m/c by starting 117 * the rmi registry over the specified port and m/c and binding this object. 118 * 119 * @param port0 Port no where we want to start the replication server. 120 * @param url0 System's name or ipaddress where to start replication server. 121 * @return rep instance of replication server 122 * @throws RepException if port is busy or m/c name is passed as localhost 123 */ 124 public static ReplicationServer getInstance(int port0, String url0) throws RepException { 125 //initialize and load log4j.properties file 126 initializeLog4j(); 127 if (url0.equalsIgnoreCase("localhost")) { 128 log.error("Url cannot be localhost"); 129 throw new RepException("REP004", null); 130 } 131 ReplicationServer rep; 132 try { 133 String ipadd = null; 134 try { 135 ipadd = InetAddress.getByName(url0).getHostAddress(); 136 /* bjt - listen on all addresses of this machine */ 137 //ipadd = "0.0.0.0"; 138 } 139 catch (UnknownHostException ex) 140 { 141 //ex.printStackTrace(); 142 log.error(ex.getMessage(), ex); 143 throw new RepException("REP004", new Object[]{ex.getMessage()}); 144 } 145 rep = new ReplicationServer(port0, url0); 146 Registry r = LocateRegistry.createRegistry(port0); 147 148 //String url = "rmi://"+ url0 +":"+ Integer.toString(port0)+"/"+url0+"_"+port0; 149 //String url = "rmi://" + ipadd + ":" + Integer.toString(port0) + "/" +ipadd + "_" + port0; 150 String url = "rmi://0.0.0.0:" + Integer.toString(port0) + "/" + url0 + "_" + port0; 151 Naming.rebind(url, rep); 152 log.info("URL " + url + " PORT " + port0); 153 } 154 catch (RemoteException ex) { 155 RepConstants.writeERROR_FILE(ex); 156 throw new RepException("REP001", new Object[] {ex.getMessage()}); 157 } 158 catch (MalformedURLException ex) { 159 RepConstants.writeERROR_FILE(ex); 160 throw new RepException("REP004", new Object[] {ex.getMessage()}); 161 } 162 log.debug("ReplicationServer started"); 163 return (ReplicationServer) rep; 164 } 165 166 /** 167 * This method is called to get the publication's object even after 168 * creating it. It is well implemented in the method getPriPublication. 169 * 170 * @param pubName Name of the publication whose object is required. 171 * @return pub Publication class' object. 172 * @throws RemoteException 173 * @throws RepException 174 */ 175 176 public _Publication getPublication(String pubName) throws RemoteException, RepException { 177 return getPriPublication(pubName); 178 } 179 180 /** 181 * Returns the publication object when called by the subscriber remotely. 182 * 183 * @param pubName 184 * @return Publication Object 185 * @throws RemoteException 186 * @throws RepException 187 */ 188 189 public _PubImpl getRemotePublication(String pubName) throws RemoteException, RepException { 190 return getPriPublication(pubName); 191 } 192 193 /** 194 * This method returns the publication object. It is implementd for the 195 * cases in which for some reason the publication's object can not 196 * be gotten directly, as when the replication server is restarted. This 197 * method searches publications and reptable tables and gathers the required 198 * information to rebuild the publication's object. 199 * 200 * @param pubName Name of the publication 201 * @return pub Publication's object. 202 * @throws RemoteException 203 * @throws RepException 204 */ 205 206 private Publication getPriPublication(String pubName) throws RemoteException, RepException { 207 Publication pub = (Publication) pubMap.get(pubName); 208 //pub will be null if the remote replication server has been restarted 209 if (pub != null) { 210 return pub; 211 } 212 //connectionPool will be null if could not get connection from datasource 213 if (connectionPool == null) { 214 throw new RepException("REP002", null); 215 } 216 //Here it comes if replication server has been restarted 217 Connection con = connectionPool.getConnection(pubName); 218 MetaDataInfo mdis = Utility.getDatabaseMataData(con); 219 ResultSet rs = null; 220 PreparedStatement prStt1 = null; 221 try { 222 //searches in publication table 223 prStt1 = con.prepareStatement(RepConstants.loadPublicationQuery); 224 prStt1.setString(1, pubName); 225 rs = prStt1.executeQuery(); 226 boolean f1 = rs.next(); 227 // if (!rs.next()) { 228 if (!f1) { 229 return null; 230 } 231 String conflictReolver = rs.getString(RepConstants.publication_conflictResolver2); 232 String serverName = rs.getString(RepConstants.publication_serverName3); 233 MetaDataInfo mdi = Utility.getDatabaseMataData(con); 234 AbstractDataBaseHandler dataBaseHandler = Utility.getDatabaseHandler(connectionPool, pubName); 235 //searches in reptable 236 ArrayList tableNames = new ArrayList(); 237 prStt1 = con.prepareStatement(RepConstants.loadRepTableQuery); 238 prStt1.setString(1, pubName); 239 rs = prStt1.executeQuery(); 240 while (rs.next()) { 241 String tableName = rs.getString(RepConstants.repTable_tableName2); 242 String filterClause = rs.getString(RepConstants.repTable_filter_clause3); 243 SchemaQualifiedName sname = new SchemaQualifiedName(mdis, tableName); 244 RepTable repTable = new RepTable(sname, filterClause,RepConstants.publisher); 245 repTable.setServerType(RepConstants.publisher); 246 String createShadowTable = rs.getString(RepConstants.repTable_createshadowtable6); 247 String cyclicDependency = rs.getString(RepConstants.repTable_cyclicdependency7); 248 String rep_CR = rs.getString(RepConstants.repTable_conflict_resolver4); 249 repTable.setConflictResolver(rep_CR == null ? conflictReolver : rep_CR); 250 repTable.setCreateShadowTable(createShadowTable); 251 repTable.setCyclicDependency(cyclicDependency); 252 mdi.setPrimaryColumns(repTable, sname.getSchemaName(),sname.getTableName()); 253 mdi.setForeignKeyColumns(repTable, sname.getSchemaName(),sname.getTableName()); 254 mdi.setAllColumns(repTable, sname.getSchemaName(),sname.getTableName()); 255 dataBaseHandler.setIgnoredColumns(con, pubName, repTable); 256 tableNames.add(repTable); 257 } 258 /** @todo rs has been closed */ 259 rs.close(); 260 try { 261 name = InetAddress.getLocalHost().getHostName() + "_" + port; 262 } 263 catch (UnknownHostException ex) { 264 ex.printStackTrace(); 265 RepConstants.writeERROR_FILE(ex); 266 throw new RepException(ex.getMessage(), null); 267 } 268 pub = new Publication(connectionPool, pubName, name, this); 269 pub.setConflictResolver(conflictReolver); 270 pub.setPublicationTables(tableNames); 271 pubMap.put(pubName, pub); 272 log.info("Publisher name " + pubName + " conflictResolver " +conflictReolver + " TableNames " + tableNames.toString()); 273 return pub; 274 } 275 catch (SQLException ex) { 276 log.debug("Returning Null Publicaiton"); 277 return null; 278 } 279 finally { 280 try { 281 if (prStt1 != null) 282 prStt1.close(); 283 } 284 catch (SQLException ex1) { 285 } 286 connectionPool.returnConnection(con); 287 } 288 } 289 290 /** 291 * It is responsible for getting the default connection with the database. 292 * Default connection implies the connection of the replication server. 293 * 294 * @param driver0 The driver name of the concerned databse. 295 * @param url0 The URL of the cocerned database. 296 * @param user0 The username for the concerned databse. 297 * @param password0 The password for the concerned databse. 298 * @throws RepException In case it can not get connection with 299 * the database. 300 */ 301 302 public void setDataSource(String driver0, String url0, String user0, 303 String password0) throws RepException { 304 try { 305 driver = driver0; 306 URL = url0; 307 connectionPool = new ConnectionPool(url0, driver0, user0, password0); 308 connectionPool.setLocalAddress(url); 309 connectionPool.setLocalPortNo(port); 310 //Gets connection. 311 defaultConnection = connectionPool.getDefaultConnection(); 312 AbstractDataBaseHandler adh = Utility.getDatabaseHandler(connectionPool,defaultConnection); 313 connectionPool.returnConnection(defaultConnection); 314 scheduleHandler = new ScheduleHandler(this, adh); 315 } 316 catch (RepException ex) { 317 RepConstants.writeERROR_FILE(ex); 318 throw ex; 319 } 320 catch (Exception ex) { 321 RepConstants.writeERROR_FILE(ex); 322 throw new RepException("REP0", new Object[] {ex.getMessage()}); 323 } 324 } 325 326 public void setDataSource(DataSource dataSource) throws RepException { 327 try { 328 DatabaseMetaData metaData = dataSource.getConnection().getMetaData(); 329 driver = metaData.getDriverName(); 330 URL = metaData.getURL(); 331 connectionPool = new ConnectionPool(dataSource); 332 connectionPool.setLocalAddress(url); 333 connectionPool.setLocalPortNo(port); 334 //Gets connection. 335 defaultConnection = connectionPool.getDefaultConnection(); 336 AbstractDataBaseHandler adh = Utility.getDatabaseHandler(connectionPool,defaultConnection); 337 connectionPool.returnConnection(defaultConnection); 338 scheduleHandler = new ScheduleHandler(this, adh); 339 } 340 catch (RepException ex) { 341 RepConstants.writeERROR_FILE(ex); 342 throw ex; 343 } 344 catch (Exception ex) { 345 RepConstants.writeERROR_FILE(ex); 346 throw new RepException("REP0", new Object[] {ex.getMessage()}); 347 } 348 } 349 350 public void setDataSource(DataSource dataSource, String username, String password) throws RepException { 351 try { 352 DatabaseMetaData metaData = dataSource.getConnection().getMetaData(); 353 driver = metaData.getDriverName(); 354 URL = metaData.getURL(); 355 connectionPool = new ConnectionPool(dataSource, username, password); 356 connectionPool.setLocalAddress(url); 357 connectionPool.setLocalPortNo(port); 358 //Gets connection. 359 defaultConnection = connectionPool.getDefaultConnection(); 360 AbstractDataBaseHandler adh = Utility.getDatabaseHandler(connectionPool,defaultConnection); 361 connectionPool.returnConnection(defaultConnection); 362 scheduleHandler = new ScheduleHandler(this, adh); 363 } 364 catch (RepException ex) { 365 RepConstants.writeERROR_FILE(ex); 366 throw ex; 367 } 368 catch (Exception ex) { 369 RepConstants.writeERROR_FILE(ex); 370 throw new RepException("REP0", new Object[] {ex.getMessage()}); 371 } 372 } 373 374 375 public void setDataSource(String dataBaseName0,String user0,String password0, String dBPortNo0, String databaseServerName0, String vendorName0) throws RepException { 376 dataBaseName =dataBaseName0; 377 user =user0; 378 password =password0; 379 dBPortNo =dBPortNo0; 380 databaseServerName =databaseServerName0; 381 vendorName =vendorName0; 382 try { 383 DBDataSource dbs = new DBDataSource(dataBaseName0, user0, password0,databaseServerName0, dBPortNo0,vendorName0); 384 DataSource datasource = dbs.getDataSource(); 385 defaultConnection = datasource.getConnection(user0,password0); 386 connectionPool = new ConnectionPool(datasource, user0, password0); 387 // defaultConnection = connectionPool.getDefaultConnection(); 388 AbstractDataBaseHandler adh = Utility.getDatabaseHandler(connectionPool,defaultConnection); 389 scheduleHandler = new ScheduleHandler(this, adh); 390 } 391 catch (SQLException ex) { 392 RepConstants.writeERROR_FILE(ex); 393 throw new RepException("REP0", new Object[] {ex.getMessage()}); 394 } 395 catch (RepException ex) { 396 throw ex; 397 } 398 } 399 400 /** 401 * Checks the following : 402 * <p> 403 * Publication alredy exists. 404 * <p> 405 * Specified Table(s) exists in database. 406 * <p> 407 * Each table must have primary key. 408 * <p> 409 * Sequence of tables specified ( Primary Table than Foreign Table ) 410 * <p> 411 * Then, Create the instance of Publication, set the instance of RepTable [] to pub 412 * <p> 413 * Put it in Map 414 * 415 * @param pubName Name of the Publication 416 * @param tableNames Names of the tables to be published. 417 * @return pub1 Publication class' object. 418 * @throws RepException In case of violation of above checked conditions. 419 */ 420 421 public _Publication createPublication(String pubName, String[] tableNames) throws RepException { 422 log.info("Create Publication " + pubName); 423 return createPublication(pubName,tableNames,null); 424 } 425 426 /** 427 * Check the Dublicacy of tables in given list 428 * If dublicate table name occure then it through 429 * the RepException because it further create the 430 * problemes in Replication process. 431 * @param pubTables ArrayList 432 * @param sname SchemaQualifiedName 433 * @param pubName String 434 * @throws RepException 435 */ 436 private void checkTablesDuplicyInPublication(ArrayList pubTables, SchemaQualifiedName sname, String pubName) throws RepException { 437 if (pubTables.size() > 0) { 438 for (int j = 0; j < pubTables.size(); j++) { 439 RepTable repTable = (RepTable) pubTables.get(j); 440 String sqname = repTable.getSchemaQualifiedName().toString(); 441 log.debug("Checking Table " + sqname +" if already existing in database"); 442 if (sqname.equals(sname.toString())) { 443 throw new RepException("REP013", new Object[] {sname.toString(),pubName}); 444 } 445 } 446 } 447 } 448 449 public _Publication createPublicationBK(String pubName, String[] tableNames) throws RepException { 450 //If database is not binded 451 if (connectionPool == null) { 452 throw new RepException("REP002", null); 453 } 454 //Checks in publication table in database 455 456 if (checkPublicationExistance(pubName)) { 457 //if(pubMap.containsKey(pubName)) 458 throw new RepException("REP014", new Object[] {pubName}); 459 } 460 //Checks the properness of given table names 461 if (tableNames == null || tableNames.length == 0) { 462 throw new RepException("REP012", new Object[] {pubName}); 463 } 464 465 TreeMap repTableMap = new TreeMap(String.CASE_INSENSITIVE_ORDER); 466 ArrayList pubTableList = new ArrayList(); 467 Connection connection = null; 468 469 try { 470 471 connection = connectionPool.getConnection(pubName); 472 MetaDataInfo mdi = Utility.getDatabaseMataData(connection); 473 for (int i = 0; i < tableNames.length; i++) { 474 //Converts Table names to schema qualified name. 475 SchemaQualifiedName sname = new SchemaQualifiedName(mdi, tableNames[i]); 476 if (repTableMap.containsKey(sname.toString())) { 477 throw new RepException("REP013", new Object[] {sname.toString(),pubName}); 478 } 479 String schema = sname.getSchemaName(); 480 String table = sname.getTableName(); 481 //checks whether the table exists in the database or not. 482 mdi.checkTableExistance(sname); 483 schema = sname.getSchemaName(); 484 RepTable repTable = new RepTable(sname, RepConstants.publisher); 485 //repTable.setConflictResolver(RepConstants.publisher_wins); 486 //Sets Sorted primary key columns in repTable. 487 mdi.setPrimaryColumns(repTable, schema, table); 488 mdi.setForeignKeyColumns(repTable, schema, table); 489 //Checks for First Primary key then foreign key tables else exception. 490 mdi.checkTableSequenceAccordingForeignKey(schema, table, pubTableList); 491 pubTableList.add(sname.toString().toUpperCase()); 492 repTableMap.put(sname.toString(), repTable); 493 } 494 //name = InetAddress.getLocalHost().getHostName()+"_"+pubName+"_"+port; 495 name = InetAddress.getLocalHost().getHostName() + "_" + port; 496 //Creates publication by setting it's DataBaseHandler & XMLCreator 497 Publication publ = new Publication(connectionPool, pubName, name, this); 498 //Sets publication's repTable elements 499 publ.setPublicationTables(new ArrayList(repTableMap.values())); 500 pubMap.put(pubName, publ); 501 return publ; 502 } 503 catch (RepException ex) { 504 throw ex; 505 } 506 catch (Exception ex) { 507 log.error(ex.getMessage(), ex); 508 throw new RepException("REP031", new Object[] {pubName, ex.getMessage()}); 509 } 510 finally { 511 connectionPool.returnConnection(connection); 512 } 513 } 514 515 /** 516 * This method returns the subscription's object, It is implemented for the 517 * cases in which for some reason the subscription's object can not 518 * be gotten directly, As when the replication server is restarted. This 519 * method searches subscriptions and reptable tables and gathers the required 520 * information to rebuild the subscription's object. 521 * 522 * @param subName Subscription's name 523 * @return sub Subscription class' object 524 * @throws RepException If could not set up connection with the database. 525 */ 526 527 public _Subscription getSubscription(String subName) throws RepException { 528 Subscription sub = (Subscription) subMap.get(subName); 529 if (sub != null) { 530 return sub; 531 } 532 if (connectionPool == null) { 533 throw new RepException("REP002", null); 534 } 535 AbstractDataBaseHandler dataBaseHandler = Utility.getDatabaseHandler(connectionPool, subName); 536 //Here it comes if the replication server has been restarted. 537 Connection con = null; 538 PreparedStatement prStt1 = null; 539 ResultSet rs = null; 540 try { 541 con = connectionPool.getFreshConnection(subName); 542 MetaDataInfo mds = Utility.getDatabaseMataData(con); 543 prStt1 = con.prepareStatement(RepConstants.loadSubscriptionQuery); 544 prStt1.setString(1, subName); 545 rs = prStt1.executeQuery(); 546 if (!rs.next()) { 547 return null; 548 } 549 550 //Searches in Subscription table. 551 String pubName = rs.getString(RepConstants.subscription_pubName2); 552 String conflictReolver = rs.getString(RepConstants.subscription_conflictResolver3); 553 String serverName = rs.getString(RepConstants.subscription_serverName4); 554 ArrayList tableNames = new ArrayList(); 555 556 //Searches in the Rep Table 557 prStt1 = con.prepareStatement(RepConstants.loadRepTableQuery); 558 prStt1.setString(1, subName); 559 rs = prStt1.executeQuery(); 560 if (!rs.next()) { 561 return null; 562 } 563 564 sub = new Subscription(connectionPool, subName, serverName, this); 565 ArrayList repTables = new ArrayList(); 566 do { 567 String tableName = rs.getString(RepConstants.repTable_tableName2); 568 String filterClause = rs.getString(RepConstants.repTable_filter_clause3); 569 String createShadowTable = rs.getString(RepConstants.repTable_createshadowtable6); 570 String cyclicDependency = rs.getString(RepConstants.repTable_cyclicdependency7); 571 SchemaQualifiedName sname = new SchemaQualifiedName(mds, tableName); 572 RepTable repTable = new RepTable(sname, filterClause, RepConstants.subscriber); 573 repTable.setConflictResolver(conflictReolver); 574 repTable.setServerType(RepConstants.subscriber); 575 repTable.setCreateShadowTable(createShadowTable); 576 repTable.setCyclicDependency(cyclicDependency); 577 mds.setPrimaryColumns(repTable, sname.getSchemaName(), sname.getTableName()); 578 mds.setForeignKeyColumns(repTable, sname.getSchemaName(), sname.getTableName()); 579 mds.setAllColumns(repTable, sname.getSchemaName(),sname.getTableName()); 580 dataBaseHandler.setIgnoredColumns(con, subName,repTable); 581 repTables.add(repTable); 582 } 583 while (rs.next()); 584 sub.setPublicatonName(pubName); 585 sub.setSubscriptionTables(repTables); 586 subMap.put(subName, sub); 587 log.info("Subscriber name " + subName + " Publication Name " + pubName); 588 return sub; 589 } 590 catch (SQLException ex) { 591 RepConstants.writeERROR_FILE(ex); 592 return null; 593 } 594 catch (RemoteException ex) { 595 log.error(ex.getMessage(), ex); 596 throw new RepException(ex.getMessage(), null); 597 } 598 finally { 599 try { 600 if (rs != null) { 601 rs.close(); 602 prStt1.close(); 603 } 604 } 605 catch (SQLException ex1) { 606 } 607 connectionPool.returnConnection(con); 608 } 609 } 610 611 /** 612 * Checks whether a subscription with the same name already exists 613 * in Subscription Table. Creates the subscription's object, 614 * then adds an entry in the subscription Map. 615 * 616 * @param subName Name of the subscription. 617 * @param pubName Name of the corresponding publication. 618 * @return sub subscription class' object. 619 * @throws RepException If the above check violates. 620 */ 621 622 public _Subscription createSubscription(String subName, String pubName) throws RepException { 623 log.debug("Create subscription " + subName + " with publication " + pubName); 624 625 //Checks if the subscription with the same name already exists in Subscription Table 626 if (checkSubscriptionExistance(subName)) { 627 throw new RepException("REP023", new Object[] {subName}); 628 } 629 try { 630 name = InetAddress.getLocalHost().getHostName() + "_" + port; 631 Subscription sub = new Subscription(connectionPool, subName, name, this); 632 sub.setPublicatonName(pubName); 633 subMap.put(subName, sub); 634 log.info("created subscription " + subName + " with publication " +pubName); 635 return sub; 636 } 637 catch (RepException ex) { 638 log.error(ex.getMessage(), ex); 639 RepConstants.writeERROR_FILE(ex); 640 throw ex; 641 } 642 catch (Exception ex) { 643 log.error(ex.getMessage(), ex); 644 throw new RepException("REP042", new Object[] {subName, ex.getMessage()}); 645 } 646 } 647 648 public Connection getConnection(String pub_sub_Name) throws RepException { 649 return connectionPool.getConnection(pub_sub_Name); 650 } 651 652 public Connection getDefaultConnection() { 653 return defaultConnection; 654 } 655 656 /** 657 * Searches into the publications table for the publication name. 658 * 659 * @param pubName 660 * @return true if found, else false. 661 * @throws RepException 662 */ 663 664 private boolean checkPublicationExistance(String pubName) throws RepException { 665 AbstractDataBaseHandler dbh = Utility.getDatabaseHandler(connectionPool,pubName); 666 String pubs = " Select " + RepConstants.publication_pubName1 + 667 " from " + dbh.getPublicationTableName() + " where " + 668 RepConstants.publication_pubName1 + " = '" + pubName + "'"; 669 Statement stt = null; 670 try { 671 stt = defaultConnection.createStatement(); 672 ResultSet rsPubs = stt.executeQuery(pubs); 673 log.debug("Query checkPublicationExistance " + pubs); 674 return rsPubs.next(); 675 } 676 catch (SQLException ex) { 677 // Ignore the Exception 678 } 679 finally { 680 try { 681 if (stt != null) 682 stt.close(); 683 } 684 catch (SQLException ex1) { 685 // Ignore the Exception 686 } 687 } 688 return false; 689 } 690 691 /** 692 * Searches the subscriptions table for the subscription name. 693 * 694 * @param subName 695 * @return true if found, else false. 696 * @throws RepException 697 */ 698 699 private boolean checkSubscriptionExistance(String subName) throws RepException { 700 AbstractDataBaseHandler dbh = Utility.getDatabaseHandler(connectionPool,subName); 701 String pubs = " Select " + RepConstants.subscription_subName1 702 + " from " + dbh.getSubscriptionTableName() + " where " + 703 RepConstants.subscription_subName1 + " = '" + subName + "'"; 704 Statement stt = null; 705 try { 706 stt = defaultConnection.createStatement(); 707 ResultSet rsPubs = stt.executeQuery(pubs); 708 log.debug("Query checkSubscriptionExistence " + pubs); 709 return rsPubs.next(); 710 } 711 catch (SQLException ex) { 712 // Ignore the Exception 713 } 714 finally { 715 try { 716 if (stt != null) 717 stt.close(); 718 } 719 catch (SQLException ex1) { 720 // Ignore the Exception 721 } 722 } 723 return false; 724 } 725 726 public void refershPublication(String pubName) { 727 pubMap.remove(pubName); 728 } 729 730 public void refershSubscription(String subName) { 731 subMap.remove(subName); 732 } 733 734 public String getServerName() throws RemoteException { 735 return name; 736 } 737 738 /** 739 * This method returns the scheduleHandler instance which is used during 740 * add,edit and remove Schedule methods in subscription class. 741 * 742 * @return ScheduleHandler 743 */ 744 745 public ScheduleHandler getScheduleHandler() { 746 return scheduleHandler; 747 } 748 749 /** 750 * To add more tables to a Publication 751 * <p> 752 * Checks the following : 753 * <p> 754 * Publication exists or not. 755 * <p> 756 * Specified Table(s) exists in database. 757 * <p> 758 * Each table must have primary key. 759 * <p> 760 * Sequence of tables specified ( Primary Table than Foreign Table ) 761 * 762 * @param pubName Name of the Publication 763 * @param tableNames0 Names of the tables to be published. 764 * @param filterClauses Filters on the published tables. 765 * @param pubRepTables ArrayList of tables already published. 766 * @param pub publication object. 767 * @throws RepException In case of violation of above checked conditions. 768 */ 769 770 public void addTableToPublication(String pubName, String[] tableNames0, 771 String[] filterClauses, 772 ArrayList pubRepTables, 773 Publication pub) throws RepException { 774 Publication pub1 = pub; 775 //If database is not binded 776 if (connectionPool == null) { 777 throw new RepException("REP002", null); 778 } 779 //Checks in publication table in database 780 781 if (!checkPublicationExistance(pubName)) { 782 //if(!pubMap.containsKey(pubName)) 783 throw new RepException("REP036", new Object[] {pubName}); 784 } 785 //Checks the properness of given table names 786 if (tableNames0 == null || tableNames0.length == 0) { 787 throw new RepException("REP012", new Object[] {pubName}); 788 } 789 ArrayList pubTableList = new ArrayList(); 790 ArrayList repTableList = new ArrayList(); 791 ArrayList existingPubTableList = new ArrayList(); 792 ArrayList newRepTableList = new ArrayList(); 793 String conflictResolver = null; 794 Connection connection = null; 795 try { 796 conflictResolver = pub1.getConflictResolver(); 797 798 //get already existing tables in publisher 799 for (int i = 0; i < pubRepTables.size(); i++) { 800 RepTable repTable = (RepTable) pubRepTables.get(i); 801 existingPubTableList.add(repTable.getSchemaQualifiedName().toString()); 802 // System.out.println("existingPubTableList " +repTable.getSchemaQualifiedName().toString()); 803 } 804 connection = connectionPool.getConnection(pubName); 805 MetaDataInfo mdi = Utility.getDatabaseMataData(connection); 806 String[] tableNames = mdi.getTablesHierarchy(tableNames0); 807 808 // initialising newRepTableList with new tables in publisher 809 for (int j = 0; j < tableNames.length; j++) { 810 if (tableNames[j].equalsIgnoreCase("")) { 811 throw new RepException("REP317", null); 812 } 813 SchemaQualifiedName sname = new SchemaQualifiedName(mdi, tableNames[j]); 814 mdi.checkTableExistance(sname); 815 if (existingPubTableList.contains(sname.toString())) { 816 throw new RepException("REP310", new Object[] {tableNames[j]}); 817 } 818 else 819 existingPubTableList.add(sname.toString()); 820 // System.out.println("new tables adding to existing " + sname.toString()); 821 } 822 823 String[] allTables = new String[existingPubTableList.size()]; 824 existingPubTableList.toArray( (Object[]) allTables); 825 allTables = mdi.getTablesHierarchy(allTables); 826 827 for (int i = 0; i < allTables.length; i++) { 828 //Converts Table names to schema qualified name. 829 SchemaQualifiedName sname = new SchemaQualifiedName(mdi, allTables[i]); 830 checkTablesDuplicyInPublication(repTableList, sname, pubName); 831 String schema = sname.getSchemaName(); 832 String table = sname.getTableName(); 833 //checks whether the table exists in the database or not. 834 mdi.checkTableExistance(sname); 835 //Checks for First Primary key then foreign key tables else exception. 836 mdi.checkTableSequenceAccordingForeignKey(schema, table, pubTableList); 837 838 RepTable repTable = new RepTable(sname, RepConstants.publisher); 839 //Sets Sorted primary key columns in repTable. 840 mdi.setPrimaryColumns(repTable, schema, table); 841 842 //we will call mdi.setAllColumns in publication.addTableToPublication 843 844 repTable.setConflictResolver(conflictResolver); 845 try { 846 //System.out.println("filter clause for " + sname + " = " +pub1.getFilterClause(sname)); 847 repTable.setFilterClause(pub1.getFilterClause(sname)); 848 } 849 catch (RemoteException ex1) { 850 ex1.getMessage(); 851 //ignore the exception in case of new tables 852 } 853 pubTableList.add(sname.toString().toUpperCase()); 854 repTableList.add(repTable); 855 //System.out.println("repTableList " + repTableList.get(i)); 856 } 857 pub.setPublicationTables(repTableList); 858 //tableNames0-without hierarchy set 859 for (int i = 0; i < tableNames0.length; i++) { 860 if (filterClauses[i] != null) 861 pub.setFilter(tableNames0[i], filterClauses[i]); 862 } 863 } 864 catch (RepException ex) { 865 // ex.printStackTrace(); 866 throw ex; 867 } 868 catch (Exception ex) { 869 // ex.printStackTrace(); 870 throw new RepException("REP031", new Object[] {pubName, ex.getMessage()}); 871 } 872 finally { 873 connectionPool.returnConnection(connection); 874 } 875 } 876 877 /** 878 * To drop tables from a Publication 879 * <p> 880 * Checks the following : 881 * <p> 882 * Publication exists or not. 883 * <p> 884 * Specified Table(s) exists in database. 885 * <p> 886 * Each table must have primary key. 887 * <p> 888 * Sequence of tables specified ( Primary Table than Foreign Table ) 889 * 890 * @param pubName Name of the Publication 891 * @param tableNames Names of the tables to be published. 892 * @return pub1 Publication class' object. 893 * @throws RepException In case of violation of above checked conditions. 894 */ 895 896 public ArrayList dropTableFromPublication(String pubName, String[] tableNames, 897 Publication pub, ArrayList pubRepTables) throws RepException { 898 //If database is not binded 899 if (connectionPool == null) { 900 throw new RepException("REP002", null); 901 } 902 //Checks in publication table in database 903 904 if (!checkPublicationExistance(pubName)) { 905 //if(!pubMap.containsKey(pubName)) 906 throw new RepException("REP036", new Object[] {pubName}); 907 } 908 //Checks the properness of given table names 909 if (tableNames == null || tableNames.length == 0) { 910 throw new RepException("REP012", new Object[] {pubName}); 911 } 912 ArrayList newPubRepTableList = new ArrayList(); 913 Connection connection = null; 914 915 try { 916 connection = connectionPool.getConnection(pubName); 917 MetaDataInfo mdi = Utility.getDatabaseMataData(connection); 918 mdi.checkChildTableIncludedInDropTableList(pubRepTables,tableNames); 919 tableNames = mdi.getTablesHierarchy(tableNames); 920 int noOfAllowedDropTables = 0; 921 // initialising newRepTableList with tables to be dropped from publisher 922 for (int j = 0; j < tableNames.length; j++) { 923 SchemaQualifiedName sname = new SchemaQualifiedName(mdi, tableNames[j]); 924 String schema = sname.getSchemaName(); 925 String table = sname.getTableName(); 926 mdi.checkTableExistance(sname); 927 RepTable repTable = new RepTable(sname, RepConstants.publisher); 928 noOfAllowedDropTables = checkTableExistanceInPublication(noOfAllowedDropTables, pubRepTables,repTable.getSchemaQualifiedName(), pubName); 929 repTable.setFilterClause(pub.getFilterClause(sname)); 930 repTable.setConflictResolver(pub.getConflictResolver()); 931 mdi.setPrimaryColumns(repTable, schema, table); 932 mdi.setAllColumns(repTable, sname.getSchemaName(),sname.getTableName()); 933 newPubRepTableList.add(repTable); 934 } 935 return newPubRepTableList; 936 } 937 catch (RepException ex) { 938 throw ex; 939 } 940 catch (Exception ex) { 941 throw new RepException("REP031", new Object[] {pubName, ex.getMessage()}); 942 } 943 finally { 944 connectionPool.returnConnection(connection); 945 } 946 } 947 948 949 //check table to be dropped if published or not 950 // check if table to be dropped is the only table published ,if yes then throw Exception 951 //check if user is trying to drop all the tables,if yes throw Exception 952 //if not published throw exception 953 954 private int checkTableExistanceInPublication(int noOfAllowedDropTables, 955 ArrayList pubTables,SchemaQualifiedName sname,String pubName) throws RepException { 956 if (pubTables.size() > 0) { 957 boolean flag = false; 958 for (int j = 0; j < pubTables.size(); j++) { 959 RepTable repTable = (RepTable) pubTables.get(j); 960 String sqname = repTable.getSchemaQualifiedName().toString(); 961 if (sqname.equals(sname.toString())) { 962 flag = true; 963 noOfAllowedDropTables++; 964 } 965 // check if table to be dropped is the only table published ,if yes then throw Exception 966 if (pubTables.size() == 1 && flag == true) { 967 throw new RepException("REP314", new Object[] {sname.toString(),pubName}); 968 } 969 //check if user is trying to drop all the tables,if yes throw Exception 970 if (pubTables.size() == noOfAllowedDropTables) { 971 throw new RepException("REP315", new Object[] {pubName}); 972 } 973 } 974 //check table to be dropped if published or not 975 if (!flag) { 976 throw new RepException("REP313", new Object[] {sname.toString(),pubName}); 977 } 978 } 979 return noOfAllowedDropTables; 980 } 981 982 private static void initializeLog4j() { 983 try { 984 File f1 = new File("." + File.separator + "log4j.properties"); 985 if (!f1.exists()) { 986 f1.createNewFile(); 987 Writer output = null; 988 if (f1.canWrite()) { 989 output = new BufferedWriter(new FileWriter(f1)); 990 output.write("log4j.rootCategory=Off"); 991 output.write("\n"); 992 output.write("# log4j.rootCategory=DEBUG,A1"); 993 output.write("\n"); 994 output.write("log4j.appender.A1=org.apache.log4j.ConsoleAppender"); 995 output.write("\n"); 996 output.write("log4j.appender.A1.layout=org.apache.log4j.PatternLayout"); 997 output.write("\n"); 998 output.write("log4j.appender.A1.layout.ConversionPattern=%d{ABSOLUTE} %p [%c{1}] [%M %L] %m%n"); 999 } 1000 if (output != null) 1001 output.close(); 1002 } 1003 PropertyConfigurator.configureAndWatch("." + File.separator +"log4j.properties"); 1004 } 1005 catch (Exception ex) { 1006 RepConstants.writeERROR_FILE(ex); 1007 } 1008 } 1009 1010 1011 /** 1012 * createPublication 1013 * 1014 * @param pubName String 1015 * @param tableNames String[] 1016 * @param removeCycleTableNames String[] 1017 * @return _Publication 1018 */ 1019 public _Publication createPublication(String pubName, String[] tableNames, 1020 String[] removeCycleTableNames) throws RepException { 1021 //If database is not binded 1022 if (connectionPool == null) { 1023 throw new RepException("REP002", null); 1024 } 1025 1026 //Checks in publication table in database 1027 if (checkPublicationExistance(pubName)) { 1028 //if(pubMap.containsKey(pubName)) 1029 throw new RepException("REP014", new Object[] {pubName}); 1030 } 1031 1032 //Checks the properness of given table names 1033 if (tableNames == null || tableNames.length == 0) { 1034 throw new RepException("REP012", new Object[] {pubName}); 1035 } 1036 1037 //repTableList is a list of all reptable that are created 1038 //corresponding to which is included in publication. 1039 ArrayList repTableList = new ArrayList(); 1040 Connection connection = null; 1041 try { 1042 // pubTableList contain all tables that are included in publication 1043 ArrayList pubTableList = new ArrayList(); 1044 connection = connectionPool.getConnection(pubName); 1045 MetaDataInfo mdi = Utility.getDatabaseMataData(connection); 1046 SchemaQualifiedName[] schemaQualifiedNames = new SchemaQualifiedName[tableNames.length]; 1047 DirectedGraph directedGraph = new DirectedGraph(tableNames.length); 1048 1049 for (int i = 0; i < tableNames.length; i++) { 1050 //Converts Table names to schema qualified name. 1051 //log.debug("tablenames as passed tablename #" + i + " = " + tableNames[i]); 1052 SchemaQualifiedName sname = new SchemaQualifiedName(mdi, tableNames[i]); 1053 //log.debug("sname for #" + i + " = " + sname.getTableName()); 1054 checkTablesDuplicyInPublication(repTableList, sname, pubName); 1055 String schema = sname.getSchemaName(); 1056 String table = sname.getTableName(); 1057 1058 //checks whether the table exists in the database or not. 1059 mdi.checkTableExistance(sname); 1060 schema = sname.getSchemaName(); 1061 RepTable repTable = new RepTable(sname, RepConstants.publisher); 1062 1063 //repTable.setConflictResolver(RepConstants.publisher_wins); 1064 //Sets Sorted primary key columns in repTable. 1065 mdi.setPrimaryColumns(repTable, schema, table); 1066 mdi.setForeignKeyColumns(repTable, schema, table); 1067 1068 //we are not calling mdi.setAllColumn(),we are doing this while publishing 1069 pubTableList.add(sname.toString()); 1070 directedGraph.addVertex(sname); 1071 repTableList.add(repTable); 1072 schemaQualifiedNames[i] = sname; 1073 } 1074 Map importedTableInfoMap = mdi.getImportedTablesInfo(schemaQualifiedNames,directedGraph,removeCycleTableNames); 1075 if (directedGraph.hasCycle()) { 1076 tablesInCycle = directedGraph.TablesInCycle(); 1077 throw new RepException("REP0205",new Object[]{tablesInCycle}); 1078 } 1079 1080 List repTablesNamesOrderdAccToHierarcy=Arrays.asList(directedGraph.topologicalSort()); 1081 //System.out.println("Ordered Tables :: " +repTablesNamesOrderdAccToHierarcy); 1082 int noOfRepTables = repTablesNamesOrderdAccToHierarcy.size(); 1083 RepTable[] tempRep = new RepTable[noOfRepTables]; 1084 for (int i = 0; i < noOfRepTables; i++) { 1085 RepTable repTable = (RepTable) repTableList.get(i); 1086 SchemaQualifiedName unOrderedschemaQualifiedName = repTable.getSchemaQualifiedName(); 1087 int orderedIndex = repTablesNamesOrderdAccToHierarcy.indexOf(unOrderedschemaQualifiedName); 1088 tempRep[orderedIndex] = repTable; 1089 } 1090 // System.out.println("repTablesOrderdAccToHierarcy::"); 1091 ArrayList repTables = new ArrayList(noOfRepTables); 1092 for (int i = noOfRepTables - 1; i >= 0; i--) { 1093 repTables.add(tempRep[i]); 1094 // System.out.println("tempRep[i] ::" + tempRep[i]); 1095 } 1096 //name = InetAddress.getLocalHost().getHostName()+"_"+pubName+"_"+port; 1097 name = InetAddress.getLocalHost().getHostName() + "_" + port; 1098 //Creates publication by setting it's DataBaseHandler & XMLCreator 1099 Publication publ = new Publication(connectionPool, pubName, name, this); 1100 //Sets publication's repTable elements 1101 //System.out.println(" ACTUAL NO of repTables :: " + repTables.size()); 1102 publ.setPublicationTables(repTables); 1103 publ.setCyclic(removeCycleTableNames != null ? removeCycleTableNames.length == 0 ? false : true : false); 1104 pubMap.put(pubName, publ); 1105 return publ; 1106 } 1107 catch (RepException ex) { 1108 throw ex; 1109 } 1110 catch (Exception ex) { 1111 log.error(ex.getMessage(), ex); 1112 RepConstants.writeERROR_FILE(ex); 1113 throw new RepException("REP039", new Object[] {ex.getMessage()}); 1114 } 1115 finally { 1116 connectionPool.returnConnection(connection); 1117 } 1118 } 1119 //Return ArrayList having table having cycles 1120 // this Arraylist needed if working through GUI 1121 public ArrayList getTablesInCycle(){ 1122 ArrayList tablesRelatedInCycle=new ArrayList(); 1123 int listSize=tablesInCycle.size(); 1124 for (int i = 0; i < listSize; i++) { 1125 if(i!=listSize-1){ 1126 tablesRelatedInCycle.add(tablesInCycle.get(i) + "-" + 1127 tablesInCycle.get(i + 1)); 1128 } 1129 } 1130 return tablesRelatedInCycle; 1131 } 1132 1133 1134 1135 public void shutdown() throws RepException 1136 { 1137 try 1138 { 1139 if (log.isDebugEnabled()) 1140 { 1141 log.debug("Daffodil Replicator Server shutdown called"); 1142 } 1143 1144 String ipAddr = null; 1145 1146 try 1147 { 1148 ipAddr = InetAddress.getByName(url).getHostAddress(); 1149 } 1150 catch (UnknownHostException ex) 1151 { 1152 log.error(ex.getMessage(), ex); 1153 throw new RepException("REP004", new Object[]{ex.getMessage()}); 1154 } 1155 1156 //String url = "rmi://" + ipAddr + ":" + Integer.toString(port) + "/" +ipAddr + "_" + port; 1157 String url = "rmi://0.0.0.0:" + Integer.toString(port) + "/" + ipAddr + "_" + port; 1158 1159 Naming.unbind(url); 1160 1161 if (log.isDebugEnabled()) 1162 { 1163 log.debug("Daffodil Replicator Server shutdown finished"); 1164 } 1165 } 1166 catch (Exception ex) 1167 { 1168 // seems the server is not running which is ok because we wanted to shut it down anyway 1169 } 1170 } 1171 1172 /** 1173 * getRemoteAddress 1174 * 1175 * @return String 1176 */ 1177 public String getRemoteAddress() throws RepException { 1178 try { 1179 //return InetAddress.getLocalHost().getHostAddress(); 1180 return InetAddress.getLocalHost().getHostName(); 1181 } 1182 catch (UnknownHostException ex) { 1183 return null; 1184 } 1185 } 1186 }

