From 3a83e1c2dd183fbbdedab1f1d9d1247fd5966c66 Mon Sep 17 00:00:00 2001
From: 许家禾 <1449854570@qq.com>
Date: Mon, 16 Jun 2025 14:13:45 +0800
Subject: [PATCH] Produce order data to Kafka
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 pom.xml                                        |  92 ++
 .../com/example/kafka/KafkaProducerApp.java    |  37 +
 .../com/example/kafka/OrderDataGenerator.java  |  69 ++
 .../apache/hadoop/io/nativeio/NativeIO.java    | 934 ++++++++++++++++++
 src/main/resources/log4j.properties            |  10 +
 5 files changed, 1142 insertions(+)
 create mode 100644 pom.xml
 create mode 100644 src/main/java/com/example/kafka/KafkaProducerApp.java
 create mode 100644 src/main/java/com/example/kafka/OrderDataGenerator.java
 create mode 100644 src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java
 create mode 100644 src/main/resources/log4j.properties

diff --git a/pom.xml b/pom.xml
new file mode 100644
index 0000000..66598bc
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,92 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <groupId>com.example</groupId>
+    <artifactId>OrderInformationData</artifactId>
+    <version>1.0-SNAPSHOT</version>
+    <packaging>jar</packaging>
+
+    <name>OrderInformationData</name>
+    <url>http://maven.apache.org</url>
+
+    <properties>
+        <maven.compiler.source>8</maven.compiler.source>
+        <maven.compiler.target>8</maven.compiler.target>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        <scala.version>2.13.0</scala.version>
+        <spark.version>3.4.3</spark.version>
+        <hadoop.version>3.3.0</hadoop.version>
+        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
+    </properties>
+
+    <dependencies>
+        <!-- Kafka client for the producer -->
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka-clients</artifactId>
+            <version>3.0.2</version>
+        </dependency>
+
+        <!-- Spark core -->
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-core_2.13</artifactId>
+            <version>${spark.version}</version>
+        </dependency>
+
+        <!-- Spark Streaming -->
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-streaming_2.13</artifactId>
+            <version>${spark.version}</version>
+        </dependency>
+
+        <!-- Structured Streaming Kafka source/sink -->
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-sql-kafka-0-10_2.13</artifactId>
+            <version>${spark.version}</version>
+        </dependency>
+
+        <!-- Spark SQL -->
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-sql_2.13</artifactId>
+            <version>${spark.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-mllib_2.13</artifactId>
+            <version>${spark.version}</version>
+            <scope>compile</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>mysql</groupId>
+            <artifactId>mysql-connector-java</artifactId>
+            <version>8.0.22</version>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.springframework.boot</groupId>
+                <artifactId>spring-boot-maven-plugin</artifactId>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <configuration>
+                    <source>8</source>
+                    <target>8</target>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+</project>
\ No newline at end of file
diff --git a/src/main/java/com/example/kafka/KafkaProducerApp.java b/src/main/java/com/example/kafka/KafkaProducerApp.java
new file mode 100644
index 0000000..5f429c2
--- /dev/null
+++ b/src/main/java/com/example/kafka/KafkaProducerApp.java
@@ -0,0 +1,37 @@
+package com.example.kafka;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.StringSerializer;
+
+import java.util.Properties;
+import java.util.Timer;
+import java.util.TimerTask;
+
+public class KafkaProducerApp {
+
+    public static void main(String[] args) {
+        // Kafka broker address; replace with your remote broker.
+        // Note: bootstrap.servers must point at a broker (default port 9092),
+        // not at ZooKeeper (port 2181).
+        String bootstrapServers = "172.16.5.3:9092";
+        String topic = "orders";
+
+        Properties props = new Properties();
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+
+        Producer<String, String> producer = new KafkaProducer<>(props);
+
+        Timer timer = new Timer();
+        timer.scheduleAtFixedRate(new TimerTask() {
+            @Override
+            public void run() {
+                String message = OrderDataGenerator.generateOrder();
+                ProducerRecord<String, String> record = new ProducerRecord<>(topic, message);
+                producer.send(record);
+                System.out.println("Sent to Kafka: " + message);
+            }
+        }, 0, 5000); // send one record every 5 seconds
+    }
+}
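One gap worth noting in KafkaProducerApp: the producer is never flushed or closed, so records still sitting in the client buffer when the JVM exits can be lost. A minimal sketch of one way to handle this, a shutdown hook appended at the end of main(); the timer.cancel() call and the 10-second close timeout are illustrative choices, not part of the patch:

    // Hypothetical addition at the end of KafkaProducerApp.main():
    // flush buffered records and release sockets on SIGTERM/SIGINT.
    Runtime.getRuntime().addShutdownHook(new Thread(() -> {
        timer.cancel();                                    // stop scheduling new sends
        producer.flush();                                  // push out anything still buffered
        producer.close(java.time.Duration.ofSeconds(10));  // bounded wait for in-flight sends
    }));

Both timer and producer are effectively final locals, so the lambda can capture them directly.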
diff --git a/src/main/java/com/example/kafka/OrderDataGenerator.java b/src/main/java/com/example/kafka/OrderDataGenerator.java
new file mode 100644
index 0000000..04330b0
--- /dev/null
+++ b/src/main/java/com/example/kafka/OrderDataGenerator.java
@@ -0,0 +1,69 @@
+package com.example.kafka;
+
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.Random;
+import java.util.Set;
+
+public class OrderDataGenerator {
+
+    // Order categories
+    private static final String[] CATEGORIES = {"电器", "服饰", "食品", "玩具", "手机"};
+    // Product names for each category
+    private static final String[][] PRODUCT_NAMES = {
+            {"电视", "冰箱", "洗衣机", "空调", "吸尘器", "电饭煲", "微波炉", "电磁炉", "热水器", "空气净化器"}, // appliances
+            {"T恤", "牛仔裤", "羽绒服", "衬衫", "运动鞋", "夹克", "卫衣", "连衣裙", "短裤", "风衣"},           // clothing
+            {"巧克力", "饼干", "方便面", "牛奶", "饮料", "面包", "糖果", "果冻", "薯片", "蛋糕"},             // food
+            {"积木", "拼图", "玩偶", "遥控车", "毛绒玩具", "魔方", "乐高", "变形金刚", "洋娃娃", "电子琴"},     // toys
+            {"华为", "苹果", "小米", "OPPO", "vivo", "荣耀", "三星", "魅族", "联想", "努比亚"}                // phones
+    };
+
+    private static final Random random = new Random();
+    private static final SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
+
+    // Tracks the category indexes already used in the current round,
+    // so that each category appears exactly once per round
+    private static final Set<Integer> generatedCategoryIndexes = new HashSet<>();
+
+    /**
+     * Generates one order record, using each categoryIndex only once per round.
+     */
+    public static String generateOrder() {
+        // Once every category has been used, start a new round; without this,
+        // the uniqueness loop below would spin forever after CATEGORIES.length calls.
+        if (generatedCategoryIndexes.size() == CATEGORIES.length) {
+            resetCategoryIndexTracker();
+        }
+
+        int categoryIndex;
+        // Keep drawing until we hit a categoryIndex not yet used this round
+        do {
+            categoryIndex = random.nextInt(CATEGORIES.length);
+        } while (generatedCategoryIndexes.contains(categoryIndex));
+
+        // Record it as used
+        generatedCategoryIndexes.add(categoryIndex);
+
+        String category = CATEGORIES[categoryIndex];
+        String name = PRODUCT_NAMES[categoryIndex][random.nextInt(PRODUCT_NAMES[categoryIndex].length)];
+        int quantity = random.nextInt(10) + 1;
+        String date = sdf.format(new Date());
+        int rating = random.nextInt(3) * 50; // 0, 50, or 100 (reuse the shared Random)
+        String isValid = random.nextBoolean() ? "Y" : "N";
+
+        return String.join("\t",
+                category,
+                name,
+                String.valueOf(quantity),
+                date,
+                String.valueOf(rating),
+                isValid);
+    }
+
+    /**
+     * Clears the used-categoryIndex tracker so the next round can begin.
+     */
+    public static void resetCategoryIndexTracker() {
+        generatedCategoryIndexes.clear();
+    }
+}
diff --git a/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java b/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java
new file mode 100644
index 0000000..2ce910d
--- /dev/null
+++ b/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java
@@ -0,0 +1,934 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ +package org.apache.hadoop.io.nativeio; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeys; +import org.apache.hadoop.fs.HardLink; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.io.SecureIOUtils.AlreadyExistsException; +import org.apache.hadoop.util.NativeCodeLoader; +import org.apache.hadoop.util.PerformanceAdvisory; +import org.apache.hadoop.util.Shell; +import sun.misc.Unsafe; + +import java.io.*; +import java.lang.reflect.Field; +import java.nio.ByteBuffer; +import java.nio.MappedByteBuffer; +import java.nio.channels.FileChannel; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * JNI wrappers for various native IO-related calls not available in Java. + * These functions should generally be used alongside a fallback to another + * more portable mechanism. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +public class NativeIO { + public static class POSIX { + // Flags for open() call from bits/fcntl.h + public static final int O_RDONLY = 00; + public static final int O_WRONLY = 01; + public static final int O_RDWR = 02; + public static final int O_CREAT = 0100; + public static final int O_EXCL = 0200; + public static final int O_NOCTTY = 0400; + public static final int O_TRUNC = 01000; + public static final int O_APPEND = 02000; + public static final int O_NONBLOCK = 04000; + public static final int O_SYNC = 010000; + public static final int O_ASYNC = 020000; + public static final int O_FSYNC = O_SYNC; + public static final int O_NDELAY = O_NONBLOCK; + + // Flags for posix_fadvise() from bits/fcntl.h + /* No further special treatment. */ + public static final int POSIX_FADV_NORMAL = 0; + /* Expect random page references. */ + public static final int POSIX_FADV_RANDOM = 1; + /* Expect sequential page references. */ + public static final int POSIX_FADV_SEQUENTIAL = 2; + /* Will need these pages. */ + public static final int POSIX_FADV_WILLNEED = 3; + /* Don't need these pages. */ + public static final int POSIX_FADV_DONTNEED = 4; + /* Data will be accessed once. */ + public static final int POSIX_FADV_NOREUSE = 5; + /* Wait upon writeout of all pages + in the range before performing the + write. */ + public static final int SYNC_FILE_RANGE_WAIT_BEFORE = 1; + /* Initiate writeout of all those + dirty pages in the range which are + not presently under writeback. */ + public static final int SYNC_FILE_RANGE_WRITE = 2; + /* Wait upon writeout of all pages in + the range after performing the + write. 
*/ + public static final int SYNC_FILE_RANGE_WAIT_AFTER = 4; + private static final Log LOG = LogFactory.getLog(NativeIO.class); + private static boolean nativeLoaded = false; + private static boolean fadvisePossible = true; + private static boolean syncFileRangePossible = true; + static final String WORKAROUND_NON_THREADSAFE_CALLS_KEY = + "hadoop.workaround.non.threadsafe.getpwuid"; + static final boolean WORKAROUND_NON_THREADSAFE_CALLS_DEFAULT = true; + private static long cacheTimeout = -1; + private static CacheManipulator cacheManipulator = new CacheManipulator(); + public static CacheManipulator getCacheManipulator() { + return cacheManipulator; + } + public static void setCacheManipulator(CacheManipulator cacheManipulator) { + POSIX.cacheManipulator = cacheManipulator; + } + /** + * Used to manipulate the operating system cache. + */ + @VisibleForTesting + public static class CacheManipulator { + public void mlock(String identifier, ByteBuffer buffer, + long len) throws IOException { + POSIX.mlock(buffer, len); + } + public long getMemlockLimit() { + return NativeIO.getMemlockLimit(); + } + public long getOperatingSystemPageSize() { + return NativeIO.getOperatingSystemPageSize(); + } + public void posixFadviseIfPossible(String identifier, + FileDescriptor fd, long offset, long len, int flags) + throws NativeIOException { + POSIX.posixFadviseIfPossible(identifier, fd, offset, + len, flags); + } + public boolean verifyCanMlock() { + return NativeIO.isAvailable(); + } + } + /** + * A CacheManipulator used for testing which does not actually call mlock. + * This allows many tests to be run even when the operating system does not + * allow mlock, or only allows limited mlocking. + */ + @VisibleForTesting + public static class NoMlockCacheManipulator extends CacheManipulator { + public void mlock(String identifier, ByteBuffer buffer, + long len) throws IOException { + LOG.info("mlocking " + identifier); + } + public long getMemlockLimit() { + return 1125899906842624L; + } + public long getOperatingSystemPageSize() { + return 4096; + } + public boolean verifyCanMlock() { + return true; + } + } + static { + if (NativeCodeLoader.isNativeCodeLoaded()) { + try { + Configuration conf = new Configuration(); + workaroundNonThreadSafePasswdCalls = conf.getBoolean( + WORKAROUND_NON_THREADSAFE_CALLS_KEY, + WORKAROUND_NON_THREADSAFE_CALLS_DEFAULT); + initNative(); + nativeLoaded = true; + cacheTimeout = conf.getLong( + CommonConfigurationKeys.HADOOP_SECURITY_UID_NAME_CACHE_TIMEOUT_KEY, + CommonConfigurationKeys.HADOOP_SECURITY_UID_NAME_CACHE_TIMEOUT_DEFAULT) * + 1000; + LOG.debug("Initialized cache for IDs to User/Group mapping with a " + + " cache timeout of " + cacheTimeout/1000 + " seconds."); + } catch (Throwable t) { + // This can happen if the user has an older version of libhadoop.so + // installed - in this case we can continue without native IO + // after warning + PerformanceAdvisory.LOG.debug("Unable to initialize nativeio libraries", t); + } + } + } + /** + * Return true if the JNI-based native IO extensions are available. 
+ */ + public static boolean isAvailable() { + return NativeCodeLoader.isNativeCodeLoaded() && nativeLoaded; + } + private static void assertCodeLoaded() throws IOException { + if (!isAvailable()) { + throw new IOException("nativeio was not loaded"); + } + } + /** Wrapper around open(2) */ + public static native FileDescriptor open(String path, int flags, int mode) throws IOException; + /** Wrapper around fstat(2) */ + private static native Stat fstat(FileDescriptor fd) throws IOException; + /** Native chmod implementation. On UNIX, it is a wrapper around chmod(2) */ + private static native void chmodImpl(String path, int mode) throws IOException; + public static void chmod(String path, int mode) throws IOException { + if (!Shell.WINDOWS) { + chmodImpl(path, mode); + } else { + try { + chmodImpl(path, mode); + } catch (NativeIOException nioe) { + if (nioe.getErrorCode() == 3) { + throw new NativeIOException("No such file or directory", + Errno.ENOENT); + } else { + LOG.warn(String.format("nativeio.chmod error (%d): %s", + nioe.getErrorCode(), nioe.getMessage())); + throw new NativeIOException("Unknown error", Errno.UNKNOWN); + } + } + } + } + /** Wrapper around posix_fadvise(2) */ + static native void posix_fadvise( + FileDescriptor fd, long offset, long len, int flags) throws NativeIOException; + /** Wrapper around sync_file_range(2) */ + static native void sync_file_range( + FileDescriptor fd, long offset, long nbytes, int flags) throws NativeIOException; + /** + * Call posix_fadvise on the given file descriptor. See the manpage + * for this syscall for more information. On systems where this + * call is not available, does nothing. + * + * @throws NativeIOException if there is an error with the syscall + */ + static void posixFadviseIfPossible(String identifier, + FileDescriptor fd, long offset, long len, int flags) + throws NativeIOException { + if (nativeLoaded && fadvisePossible) { + try { + posix_fadvise(fd, offset, len, flags); + } catch (UnsupportedOperationException uoe) { + fadvisePossible = false; + } catch (UnsatisfiedLinkError ule) { + fadvisePossible = false; + } + } + } + /** + * Call sync_file_range on the given file descriptor. See the manpage + * for this syscall for more information. On systems where this + * call is not available, does nothing. + * + * @throws NativeIOException if there is an error with the syscall + */ + public static void syncFileRangeIfPossible( + FileDescriptor fd, long offset, long nbytes, int flags) + throws NativeIOException { + if (nativeLoaded && syncFileRangePossible) { + try { + sync_file_range(fd, offset, nbytes, flags); + } catch (UnsupportedOperationException uoe) { + syncFileRangePossible = false; + } catch (UnsatisfiedLinkError ule) { + syncFileRangePossible = false; + } + } + } + static native void mlock_native( + ByteBuffer buffer, long len) throws NativeIOException; + /** + * Locks the provided direct ByteBuffer into memory, preventing it from + * swapping out. After a buffer is locked, future accesses will not incur + * a page fault. + * + * See the mlock(2) man page for more information. + * + * @throws NativeIOException + */ + static void mlock(ByteBuffer buffer, long len) + throws IOException { + assertCodeLoaded(); + if (!buffer.isDirect()) { + throw new IOException("Cannot mlock a non-direct ByteBuffer"); + } + mlock_native(buffer, len); + } + + /** + * Unmaps the block from memory. See munmap(2). + * + * There isn't any portable way to unmap a memory region in Java. + * So we use the sun.nio method here. 
+ * Note that unmapping a memory region could cause crashes if code + * continues to reference the unmapped code. However, if we don't + * manually unmap the memory, we are dependent on the finalizer to + * do it, and we have no idea when the finalizer will run. + * + * @param buffer The buffer to unmap. + */ + public static void munmap(MappedByteBuffer buffer) { + if (buffer instanceof sun.nio.ch.DirectBuffer) { + sun.misc.Cleaner cleaner = + ((sun.nio.ch.DirectBuffer)buffer).cleaner(); + cleaner.clean(); + } + } + + /** Linux only methods used for getOwner() implementation */ + private static native long getUIDforFDOwnerforOwner(FileDescriptor fd) throws IOException; + private static native String getUserName(long uid) throws IOException; + + /** + * Result type of the fstat call + */ + public static class Stat { + private int ownerId, groupId; + private String owner, group; + private int mode; + + // Mode constants + public static final int S_IFMT = 0170000; /* type of file */ + public static final int S_IFIFO = 0010000; /* named pipe (fifo) */ + public static final int S_IFCHR = 0020000; /* character special */ + public static final int S_IFDIR = 0040000; /* directory */ + public static final int S_IFBLK = 0060000; /* block special */ + public static final int S_IFREG = 0100000; /* regular */ + public static final int S_IFLNK = 0120000; /* symbolic link */ + public static final int S_IFSOCK = 0140000; /* socket */ + public static final int S_IFWHT = 0160000; /* whiteout */ + public static final int S_ISUID = 0004000; /* set user id on execution */ + public static final int S_ISGID = 0002000; /* set group id on execution */ + public static final int S_ISVTX = 0001000; /* save swapped text even after use */ + public static final int S_IRUSR = 0000400; /* read permission, owner */ + public static final int S_IWUSR = 0000200; /* write permission, owner */ + public static final int S_IXUSR = 0000100; /* execute/search permission, owner */ + + Stat(int ownerId, int groupId, int mode) { + this.ownerId = ownerId; + this.groupId = groupId; + this.mode = mode; + } + + Stat(String owner, String group, int mode) { + if (!Shell.WINDOWS) { + this.owner = owner; + } else { + this.owner = stripDomain(owner); + } + if (!Shell.WINDOWS) { + this.group = group; + } else { + this.group = stripDomain(group); + } + this.mode = mode; + } + + @Override + public String toString() { + return "Stat(owner='" + owner + "', group='" + group + "'" + + ", mode=" + mode + ")"; + } + + public String getOwner() { + return owner; + } + public String getGroup() { + return group; + } + public int getMode() { + return mode; + } + } + + /** + * Returns the file stat for a file descriptor. + * + * @param fd file descriptor. + * @return the file descriptor file stat. + * @throws IOException thrown if there was an IO error while obtaining the file stat. 
+   */
+  public static Stat getFstat(FileDescriptor fd) throws IOException {
+    Stat stat = null;
+    if (!Shell.WINDOWS) {
+      stat = fstat(fd);
+      stat.owner = getName(IdCache.USER, stat.ownerId);
+      stat.group = getName(IdCache.GROUP, stat.groupId);
+    } else {
+      try {
+        stat = fstat(fd);
+      } catch (NativeIOException nioe) {
+        if (nioe.getErrorCode() == 6) {
+          throw new NativeIOException("The handle is invalid.",
+              Errno.EBADF);
+        } else {
+          LOG.warn(String.format("nativeio.getFstat error (%d): %s",
+              nioe.getErrorCode(), nioe.getMessage()));
+          throw new NativeIOException("Unknown error", Errno.UNKNOWN);
+        }
+      }
+    }
+    return stat;
+  }
+
+  private static String getName(IdCache domain, int id) throws IOException {
+    Map<Integer, CachedName> idNameCache = (domain == IdCache.USER)
+        ? USER_ID_NAME_CACHE : GROUP_ID_NAME_CACHE;
+    String name;
+    CachedName cachedName = idNameCache.get(id);
+    long now = System.currentTimeMillis();
+    if (cachedName != null && (cachedName.timestamp + cacheTimeout) > now) {
+      name = cachedName.name;
+    } else {
+      name = (domain == IdCache.USER) ? getUserName(id) : getGroupName(id);
+      if (LOG.isDebugEnabled()) {
+        String type = (domain == IdCache.USER) ? "UserName" : "GroupName";
+        LOG.debug("Got " + type + " " + name + " for ID " + id
+            + " from the native implementation");
+      }
+      cachedName = new CachedName(name, now);
+      idNameCache.put(id, cachedName);
+    }
+    return name;
+  }
+
+  static native String getUserName(int uid) throws IOException;
+  static native String getGroupName(int uid) throws IOException;
+
+  private static class CachedName {
+    final long timestamp;
+    final String name;
+
+    public CachedName(String name, long timestamp) {
+      this.name = name;
+      this.timestamp = timestamp;
+    }
+  }
+
+  private static final Map<Integer, CachedName> USER_ID_NAME_CACHE =
+      new ConcurrentHashMap<Integer, CachedName>();
+
+  private static final Map<Integer, CachedName> GROUP_ID_NAME_CACHE =
+      new ConcurrentHashMap<Integer, CachedName>();
+
+  private enum IdCache { USER, GROUP }
+
+  public final static int MMAP_PROT_READ = 0x1;
+  public final static int MMAP_PROT_WRITE = 0x2;
+  public final static int MMAP_PROT_EXEC = 0x4;
+
+  public static native long mmap(FileDescriptor fd, int prot,
+      boolean shared, long length) throws IOException;
+
+  public static native void munmap(long addr, long length)
+      throws IOException;
+  }
+
+  private static boolean workaroundNonThreadSafePasswdCalls = false;
+
+
+  public static class Windows {
+    // Flags for CreateFile() call on Windows
+    public static final long GENERIC_READ = 0x80000000L;
+    public static final long GENERIC_WRITE = 0x40000000L;
+
+    public static final long FILE_SHARE_READ = 0x00000001L;
+    public static final long FILE_SHARE_WRITE = 0x00000002L;
+    public static final long FILE_SHARE_DELETE = 0x00000004L;
+
+    public static final long CREATE_NEW = 1;
+    public static final long CREATE_ALWAYS = 2;
+    public static final long OPEN_EXISTING = 3;
+    public static final long OPEN_ALWAYS = 4;
+    public static final long TRUNCATE_EXISTING = 5;
+
+    public static final long FILE_BEGIN = 0;
+    public static final long FILE_CURRENT = 1;
+    public static final long FILE_END = 2;
+
+    public static final long FILE_ATTRIBUTE_NORMAL = 0x00000080L;
+
+    /**
+     * Create a directory with permissions set to the specified mode. By setting
+     * permissions at creation time, we avoid issues related to the user lacking
+     * WRITE_DAC rights on subsequent chmod calls. One example where this can
+     * occur is writing to an SMB share where the user does not have Full Control
+     * rights, and therefore WRITE_DAC is denied.
+ * + * @param path directory to create + * @param mode permissions of new directory + * @throws IOException if there is an I/O error + */ + public static void createDirectoryWithMode(File path, int mode) + throws IOException { + createDirectoryWithMode0(path.getAbsolutePath(), mode); + } + + /** Wrapper around CreateDirectory() on Windows */ + private static native void createDirectoryWithMode0(String path, int mode) + throws NativeIOException; + + /** Wrapper around CreateFile() on Windows */ + public static native FileDescriptor createFile(String path, + long desiredAccess, long shareMode, long creationDisposition) + throws IOException; + + /** + * Create a file for write with permissions set to the specified mode. By + * setting permissions at creation time, we avoid issues related to the user + * lacking WRITE_DAC rights on subsequent chmod calls. One example where + * this can occur is writing to an SMB share where the user does not have + * Full Control rights, and therefore WRITE_DAC is denied. + * + * This method mimics the semantics implemented by the JDK in + * {@link FileOutputStream}. The file is opened for truncate or + * append, the sharing mode allows other readers and writers, and paths + * longer than MAX_PATH are supported. (See io_util_md.c in the JDK.) + * + * @param path file to create + * @param append if true, then open file for append + * @param mode permissions of new directory + * @return FileOutputStream of opened file + * @throws IOException if there is an I/O error + */ + public static FileOutputStream createFileOutputStreamWithMode(File path, + boolean append, int mode) throws IOException { + long desiredAccess = GENERIC_WRITE; + long shareMode = FILE_SHARE_READ | FILE_SHARE_WRITE; + long creationDisposition = append ? OPEN_ALWAYS : CREATE_ALWAYS; + return new FileOutputStream(createFileWithMode0(path.getAbsolutePath(), + desiredAccess, shareMode, creationDisposition, mode)); + } + + /** Wrapper around CreateFile() with security descriptor on Windows */ + private static native FileDescriptor createFileWithMode0(String path, + long desiredAccess, long shareMode, long creationDisposition, int mode) + throws NativeIOException; + + /** Wrapper around SetFilePointer() on Windows */ + public static native long setFilePointer(FileDescriptor fd, + long distanceToMove, long moveMethod) throws IOException; + + /** Windows only methods used for getOwner() implementation */ + private static native String getOwner(FileDescriptor fd) throws IOException; + + /** Supported list of Windows access right flags */ + public static enum AccessRight { + ACCESS_READ (0x0001), // FILE_READ_DATA + ACCESS_WRITE (0x0002), // FILE_WRITE_DATA + ACCESS_EXECUTE (0x0020); // FILE_EXECUTE + + private final int accessRight; + AccessRight(int access) { + accessRight = access; + } + + public int accessRight() { + return accessRight; + } + }; + + /** Windows only method used to check if the current process has requested + * access rights on the given path. */ + private static native boolean access0(String path, int requestedAccess); + + /** + * Checks whether the current process has desired access rights on + * the given path. + * + * Longer term this native function can be substituted with JDK7 + * function Files#isReadable, isWritable, isExecutable. 
+     *
+     * @param path input path
+     * @param desiredAccess ACCESS_READ, ACCESS_WRITE or ACCESS_EXECUTE
+     * @return true if access is allowed
+     * @throws IOException I/O exception on error
+     */
+    public static boolean access(String path, AccessRight desiredAccess)
+        throws IOException {
+      // Local-development workaround: skip the native Windows ACL check,
+      // which would require hadoop.dll/winutils, and treat every path as
+      // accessible. The original implementation is kept below.
+      return true;
+//      return access0(path, desiredAccess.accessRight());
+    }
+
+    /**
+     * Extends both the minimum and maximum working set size of the current
+     * process. This method gets the current minimum and maximum working set
+     * size, adds the requested amount to each and then sets the minimum and
+     * maximum working set size to the new values. Controlling the working set
+     * size of the process also controls the amount of memory it can lock.
+     *
+     * @param delta amount to increment minimum and maximum working set size
+     * @throws IOException for any error
+     * @see POSIX#mlock(ByteBuffer, long)
+     */
+    public static native void extendWorkingSetSize(long delta) throws IOException;
+
+    static {
+      if (NativeCodeLoader.isNativeCodeLoaded()) {
+        try {
+          initNative();
+          nativeLoaded = true;
+        } catch (Throwable t) {
+          // This can happen if the user has an older version of libhadoop.so
+          // installed - in this case we can continue without native IO
+          // after warning
+          PerformanceAdvisory.LOG.debug("Unable to initialize nativeio libraries", t);
+        }
+      }
+    }
+  }
+
+  private static final Log LOG = LogFactory.getLog(NativeIO.class);
+
+  private static boolean nativeLoaded = false;
+
+  static {
+    if (NativeCodeLoader.isNativeCodeLoaded()) {
+      try {
+        initNative();
+        nativeLoaded = true;
+      } catch (Throwable t) {
+        // This can happen if the user has an older version of libhadoop.so
+        // installed - in this case we can continue without native IO
+        // after warning
+        PerformanceAdvisory.LOG.debug("Unable to initialize nativeio libraries", t);
+      }
+    }
+  }
+
+  /**
+   * Return true if the JNI-based native IO extensions are available.
+   */
+  public static boolean isAvailable() {
+    return NativeCodeLoader.isNativeCodeLoaded() && nativeLoaded;
+  }
+
+  /** Initialize the JNI method ID and class ID cache */
+  private static native void initNative();
+
+  /**
+   * Get the maximum number of bytes that can be locked into memory at any
+   * given point.
+   *
+   * @return 0 if no bytes can be locked into memory;
+   *         Long.MAX_VALUE if there is no limit;
+   *         The number of bytes that can be locked into memory otherwise.
+   */
+  static long getMemlockLimit() {
+    return isAvailable() ? getMemlockLimit0() : 0;
+  }
+
+  private static native long getMemlockLimit0();
+
+  /**
+   * @return the operating system's page size.
+   */
+  static long getOperatingSystemPageSize() {
+    try {
+      Field f = Unsafe.class.getDeclaredField("theUnsafe");
+      f.setAccessible(true);
+      Unsafe unsafe = (Unsafe)f.get(null);
+      return unsafe.pageSize();
+    } catch (Throwable e) {
+      LOG.warn("Unable to get operating system page size. Guessing 4096.", e);
+      return 4096;
+    }
+  }
+
+  private static class CachedUid {
+    final long timestamp;
+    final String username;
+    public CachedUid(String username, long timestamp) {
+      this.timestamp = timestamp;
+      this.username = username;
+    }
+  }
+  private static final Map<Long, CachedUid> uidCache =
+      new ConcurrentHashMap<Long, CachedUid>();
+  private static long cacheTimeout;
+  private static boolean initialized = false;
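Context for the Windows.access() override above, and a way to check it: this vendored NativeIO exists so Spark/Hadoop code can run on a Windows development machine without winutils.exe/hadoop.dll, because classes compiled from src/main/java typically precede the hadoop-common jar on the application classpath and therefore shadow its copy of the class. A hypothetical sanity check (NativeIOShadowCheck is not part of the patch, and C:\tmp is a placeholder path):

    // Hypothetical helper, not in the patch: confirm which NativeIO is loaded
    // and that the permission check is bypassed.
    import org.apache.hadoop.io.nativeio.NativeIO;

    public class NativeIOShadowCheck {
        public static void main(String[] args) throws Exception {
            // Should print a location under target/classes, not the hadoop-common jar.
            System.out.println(NativeIO.class.getProtectionDomain()
                    .getCodeSource().getLocation());
            // With the return-true override, this succeeds even without hadoop.dll.
            System.out.println(NativeIO.Windows.access("C:\\tmp",
                    NativeIO.Windows.AccessRight.ACCESS_READ));
        }
    }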
+  /**
+   * The Windows logon name has two parts: the NetBIOS domain name and
+   * the user account name, in the format DOMAIN\UserName. This method
+   * removes the domain part of the full logon name.
+   *
+   * @param name the full principal name containing the domain
+   * @return name with domain removed
+   */
+  private static String stripDomain(String name) {
+    int i = name.indexOf('\\');
+    if (i != -1)
+      name = name.substring(i + 1);
+    return name;
+  }
+
+  public static String getOwner(FileDescriptor fd) throws IOException {
+    ensureInitialized();
+    if (Shell.WINDOWS) {
+      String owner = Windows.getOwner(fd);
+      owner = stripDomain(owner);
+      return owner;
+    } else {
+      long uid = POSIX.getUIDforFDOwnerforOwner(fd);
+      CachedUid cUid = uidCache.get(uid);
+      long now = System.currentTimeMillis();
+      if (cUid != null && (cUid.timestamp + cacheTimeout) > now) {
+        return cUid.username;
+      }
+      String user = POSIX.getUserName(uid);
+      LOG.info("Got UserName " + user + " for UID " + uid
+          + " from the native implementation");
+      cUid = new CachedUid(user, now);
+      uidCache.put(uid, cUid);
+      return user;
+    }
+  }
+
+  /**
+   * Create a FileInputStream that shares delete permission on the
+   * file opened, i.e. other processes can delete the file the
+   * FileInputStream is reading. Only the Windows implementation uses
+   * the native interface.
+   */
+  public static FileInputStream getShareDeleteFileInputStream(File f)
+      throws IOException {
+    if (!Shell.WINDOWS) {
+      // On Linux the default FileInputStream shares delete permission
+      // on the file opened.
+      //
+      return new FileInputStream(f);
+    } else {
+      // Use Windows native interface to create a FileInputStream that
+      // shares delete permission on the file opened.
+      //
+      FileDescriptor fd = Windows.createFile(
+          f.getAbsolutePath(),
+          Windows.GENERIC_READ,
+          Windows.FILE_SHARE_READ |
+              Windows.FILE_SHARE_WRITE |
+              Windows.FILE_SHARE_DELETE,
+          Windows.OPEN_EXISTING);
+      return new FileInputStream(fd);
+    }
+  }
+
+  /**
+   * Create a FileInputStream that shares delete permission on the
+   * file opened at a given offset, i.e. other processes can delete
+   * the file the FileInputStream is reading. Only the Windows
+   * implementation uses the native interface.
+   */
+  public static FileInputStream getShareDeleteFileInputStream(File f, long seekOffset)
+      throws IOException {
+    if (!Shell.WINDOWS) {
+      RandomAccessFile rf = new RandomAccessFile(f, "r");
+      if (seekOffset > 0) {
+        rf.seek(seekOffset);
+      }
+      return new FileInputStream(rf.getFD());
+    } else {
+      // Use Windows native interface to create a FileInputStream that
+      // shares delete permission on the file opened, and set it to the
+      // given offset.
+      //
+      FileDescriptor fd = Windows.createFile(
+          f.getAbsolutePath(),
+          Windows.GENERIC_READ,
+          Windows.FILE_SHARE_READ |
+              Windows.FILE_SHARE_WRITE |
+              Windows.FILE_SHARE_DELETE,
+          Windows.OPEN_EXISTING);
+      if (seekOffset > 0)
+        Windows.setFilePointer(fd, seekOffset, Windows.FILE_BEGIN);
+      return new FileInputStream(fd);
+    }
+  }
+
+  /**
+   * Create the specified File for write access, ensuring that it does not exist.
+   * @param f the file that we want to create
+   * @param permissions we want to have on the file (if security is enabled)
+   *
+   * @throws AlreadyExistsException if the file already exists
+   * @throws IOException if any other error occurred
+   */
+  public static FileOutputStream getCreateForWriteFileOutputStream(File f, int permissions)
+      throws IOException {
+    if (!Shell.WINDOWS) {
+      // Use the native wrapper around open(2)
+      try {
+        FileDescriptor fd = POSIX.open(f.getAbsolutePath(),
+            POSIX.O_WRONLY | POSIX.O_CREAT
+                | POSIX.O_EXCL, permissions);
+        return new FileOutputStream(fd);
+      } catch (NativeIOException nioe) {
+        if (nioe.getErrno() == Errno.EEXIST) {
+          throw new AlreadyExistsException(nioe);
+        }
+        throw nioe;
+      }
+    } else {
+      // Use the Windows native APIs to create equivalent FileOutputStream
+      try {
+        FileDescriptor fd = Windows.createFile(f.getCanonicalPath(),
+            Windows.GENERIC_WRITE,
+            Windows.FILE_SHARE_DELETE
+                | Windows.FILE_SHARE_READ
+                | Windows.FILE_SHARE_WRITE,
+            Windows.CREATE_NEW);
+        POSIX.chmod(f.getCanonicalPath(), permissions);
+        return new FileOutputStream(fd);
+      } catch (NativeIOException nioe) {
+        if (nioe.getErrorCode() == 80) {
+          // ERROR_FILE_EXISTS = 80 (0x50): the file exists
+          throw new AlreadyExistsException(nioe);
+        }
+        throw nioe;
+      }
+    }
+  }
+
+  private synchronized static void ensureInitialized() {
+    if (!initialized) {
+      cacheTimeout =
+          new Configuration().getLong("hadoop.security.uid.cache.secs",
+              4*60*60) * 1000;
+      LOG.info("Initialized cache for UID to User mapping with a cache" +
+          " timeout of " + cacheTimeout/1000 + " seconds.");
+      initialized = true;
+    }
+  }
+
+  /**
+   * A version of renameTo that throws a descriptive exception when it fails.
+   *
+   * @param src The source path
+   * @param dst The destination path
+   *
+   * @throws NativeIOException On failure.
+   */
+  public static void renameTo(File src, File dst)
+      throws IOException {
+    if (!nativeLoaded) {
+      if (!src.renameTo(dst)) {
+        throw new IOException("renameTo(src=" + src + ", dst=" +
+            dst + ") failed.");
+      }
+    } else {
+      renameTo0(src.getAbsolutePath(), dst.getAbsolutePath());
+    }
+  }
+
+  public static void link(File src, File dst) throws IOException {
+    if (!nativeLoaded) {
+      HardLink.createHardLink(src, dst);
+    } else {
+      link0(src.getAbsolutePath(), dst.getAbsolutePath());
+    }
+  }
+
+  /**
+   * A version of renameTo that throws a descriptive exception when it fails.
+   *
+   * @param src The source path
+   * @param dst The destination path
+   *
+   * @throws NativeIOException On failure.
+   */
+  private static native void renameTo0(String src, String dst)
+      throws NativeIOException;
+
+  private static native void link0(String src, String dst)
+      throws NativeIOException;
+
+  /**
+   * Unbuffered file copy from src to dst without tainting OS buffer cache
+   *
+   * On POSIX platforms:
+   * It uses FileChannel#transferTo() which internally attempts
+   * unbuffered IO on OS with native sendfile64() support and falls back to
+   * buffered IO otherwise.
+   *
+   * It minimizes the number of FileChannel#transferTo calls by passing the
+   * src file size directly instead of a smaller size as the 3rd parameter.
+   * This reduces the number of sendfile64() system calls when native
+   * sendfile64() is supported. In the two fallback cases where sendfile is
+   * not supported, FileChannel#transferTo already has its own batching of
+   * size 8 MB and 8 KB, respectively.
+   *
+   * On Windows:
+   * It uses its own native wrapper of CopyFileEx with the COPY_FILE_NO_BUFFERING
+   * flag, which is supported on Windows Server 2008 and above.
+   *
+   * Ideally, we should use FileChannel#transferTo() across both POSIX and Windows
+   * platforms. Unfortunately, the wrapper (Java_sun_nio_ch_FileChannelImpl_transferTo0)
+   * used by FileChannel#transferTo for unbuffered IO is not implemented on Windows.
+   * Based on OpenJDK 6/7/8 source code, Java_sun_nio_ch_FileChannelImpl_transferTo0
+   * on Windows simply returns IOS_UNSUPPORTED.
+   *
+   * Note: This simple native wrapper does minimal parameter checking before copy and
+   * consistency check (e.g., size) after copy.
+   * It is recommended to use a wrapper function like
+   * the Storage#nativeCopyFileUnbuffered() function in hadoop-hdfs with pre/post copy
+   * checks.
+   *
+   * @param src The source path
+   * @param dst The destination path
+   * @throws IOException
+   */
+  public static void copyFileUnbuffered(File src, File dst) throws IOException {
+    if (nativeLoaded && Shell.WINDOWS) {
+      copyFileUnbuffered0(src.getAbsolutePath(), dst.getAbsolutePath());
+    } else {
+      FileInputStream fis = null;
+      FileOutputStream fos = null;
+      FileChannel input = null;
+      FileChannel output = null;
+      try {
+        fis = new FileInputStream(src);
+        fos = new FileOutputStream(dst);
+        input = fis.getChannel();
+        output = fos.getChannel();
+        long remaining = input.size();
+        long position = 0;
+        long transferred = 0;
+        while (remaining > 0) {
+          transferred = input.transferTo(position, remaining, output);
+          remaining -= transferred;
+          position += transferred;
+        }
+      } finally {
+        IOUtils.cleanup(LOG, output);
+        IOUtils.cleanup(LOG, fos);
+        IOUtils.cleanup(LOG, input);
+        IOUtils.cleanup(LOG, fis);
+      }
+    }
+  }
+
+  private static native void copyFileUnbuffered0(String src, String dst)
+      throws NativeIOException;
+}
\ No newline at end of file
diff --git a/src/main/resources/log4j.properties b/src/main/resources/log4j.properties
new file mode 100644
index 0000000..a5fbf3d
--- /dev/null
+++ b/src/main/resources/log4j.properties
@@ -0,0 +1,10 @@
+# Set everything to be logged to the console
+log4j.rootCategory=INFO, console
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.err
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
+
+# Spark log levels
+log4j.logger.org.apache.spark=WARN
+log4j.logger.org.apache.spark.streaming=INFO
\ No newline at end of file
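Also not part of the patch, but useful for verifying it end to end: each record produced above is one tab-separated line of the form category, product name, quantity, date (yyyy-MM-dd), rating (0/50/100), valid flag (Y/N), for example "电器\t电视\t3\t2025-06-16\t50\tY". A minimal consumer sketch that prints arriving records; the group id "orders-check" is a placeholder, and the broker address mirrors the producer:

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;

    public class OrderTopicCheck {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "172.16.5.3:9092"); // same broker as the producer
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "orders-check");             // placeholder group id
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList("orders"));
                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                    for (ConsumerRecord<String, String> record : records) {
                        System.out.println(record.value()); // one tab-separated order per line
                    }
                }
            }
        }
    }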