Skip to content

Feature. Copy method #345

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 54 additions & 32 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"errors"
"io"
"io/ioutil"
"net"
"os"
"os/user"
Expand Down Expand Up @@ -42,17 +41,17 @@ type Client struct {
// The NamenodeDialFunc and DatanodeDialFunc options can be used to set
// connection timeouts:
//
// dialFunc := (&net.Dialer{
// Timeout: 30 * time.Second,
// KeepAlive: 30 * time.Second,
// DualStack: true,
// }).DialContext
// dialFunc := (&net.Dialer{
// Timeout: 30 * time.Second,
// KeepAlive: 30 * time.Second,
// DualStack: true,
// }).DialContext
//
// options := ClientOptions{
// Addresses: []string{"nn1:9000"},
// NamenodeDialFunc: dialFunc,
// DatanodeDialFunc: dialFunc,
// }
// options := ClientOptions{
// Addresses: []string{"nn1:9000"},
// NamenodeDialFunc: dialFunc,
// DatanodeDialFunc: dialFunc,
// }
type ClientOptions struct {
// Addresses specifies the namenode(s) to connect to.
Addresses []string
Expand Down Expand Up @@ -104,35 +103,35 @@ type ClientOptions struct {
// suitable for creating a Client. Currently this sets the following fields
// on the resulting ClientOptions:
//
// // Determined by fs.defaultFS (or the deprecated fs.default.name), or
// // fields beginning with dfs.namenode.rpc-address.
// Addresses []string
// // Determined by fs.defaultFS (or the deprecated fs.default.name), or
// // fields beginning with dfs.namenode.rpc-address.
// Addresses []string
//
// // Determined by dfs.client.use.datanode.hostname.
// UseDatanodeHostname bool
// // Determined by dfs.client.use.datanode.hostname.
// UseDatanodeHostname bool
//
// // Set to a non-nil but empty client (without credentials) if the value of
// // hadoop.security.authentication is 'kerberos'. It must then be replaced
// // with a credentialed Kerberos client.
// KerberosClient *krb.Client
// // Set to a non-nil but empty client (without credentials) if the value of
// // hadoop.security.authentication is 'kerberos'. It must then be replaced
// // with a credentialed Kerberos client.
// KerberosClient *krb.Client
//
// // Determined by dfs.namenode.kerberos.principal, with the realm
// // (everything after the first '@') chopped off.
// KerberosServicePrincipleName string
// // Determined by dfs.namenode.kerberos.principal, with the realm
// // (everything after the first '@') chopped off.
// KerberosServicePrincipleName string
//
// // Determined by dfs.data.transfer.protection or dfs.encrypt.data.transfer
// // (in the latter case, it is set to 'privacy').
// DataTransferProtection string
// // Determined by dfs.data.transfer.protection or dfs.encrypt.data.transfer
// // (in the latter case, it is set to 'privacy').
// DataTransferProtection string
//
// Because of the way Kerberos can be forced by the Hadoop configuration but not
// actually configured, you should check for whether KerberosClient is set in
// the resulting ClientOptions before proceeding:
//
// options := ClientOptionsFromConf(conf)
// if options.KerberosClient != nil {
// // Replace with a valid credentialed client.
// options.KerberosClient = getKerberosClient()
// }
// options := ClientOptionsFromConf(conf)
// if options.KerberosClient != nil {
// // Replace with a valid credentialed client.
// options.KerberosClient = getKerberosClient()
// }
func ClientOptionsFromConf(conf hadoopconf.HadoopConf) ClientOptions {
options := ClientOptions{Addresses: conf.Namenodes()}

Expand Down Expand Up @@ -258,7 +257,30 @@ func (c *Client) ReadFile(filename string) ([]byte, error) {
}

defer f.Close()
return ioutil.ReadAll(f)
return io.ReadAll(f)
}

// Copy copies the HDFS file specified by src to the HDFS file at dst.
//
// The source is opened before the destination is created, so a missing or
// unreadable src does not leave an empty dst behind. If the copy itself fails
// partway, whatever was already written to dst is left in place.
//
// NOTE(review): how an existing dst is treated (overwritten or rejected with
// an error) is decided by Create; confirm against Create before promising
// overwrite semantics to callers.
//
// Errors are reported in order of significance: a copy error first, then an
// error closing dst, then an error closing src.
func (c *Client) Copy(src string, dst string) error {
	srcFile, err := c.Open(src)
	if err != nil {
		return err
	}

	dstFile, err := c.Create(dst)
	if err != nil {
		srcFile.Close()
		return err
	}

	_, copyErr := io.Copy(dstFile, srcFile)

	// Always close both ends, and never drop the destination's Close error:
	// a writer may only report a failed flush when it is closed.
	dstErr := dstFile.Close()
	srcErr := srcFile.Close()

	switch {
	case copyErr != nil:
		return copyErr
	case dstErr != nil:
		return dstErr
	}

	return srcErr
}

// CopyToLocal copies the HDFS file specified by src to the local file at dst.
Expand Down
19 changes: 16 additions & 3 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package hdfs

import (
"fmt"
"io/ioutil"
"io"
"os"
"os/user"
"path/filepath"
Expand Down Expand Up @@ -169,18 +169,31 @@ func TestReadFile(t *testing.T) {
assert.EqualValues(t, "bar\n", string(bytes))
}

// TestCopy copies /_test/foo.txt to /_test/foo2.txt within HDFS and checks
// that the destination holds the expected contents.
func TestCopy(t *testing.T) {
	client := getClient(t)

	err := client.Copy("/_test/foo.txt", "/_test/foo2.txt")
	require.NoError(t, err)

	f, err := client.Open("/_test/foo2.txt")
	require.NoError(t, err)
	defer f.Close()

	// Fail on a read error rather than comparing against a partial read.
	contents, err := io.ReadAll(f)
	require.NoError(t, err)
	assert.EqualValues(t, "bar\n", string(contents))
}

// TestCopyToLocal copies /_test/foo.txt from HDFS into a fresh local temp
// directory and checks that the local file holds the expected contents.
func TestCopyToLocal(t *testing.T) {
	client := getClient(t)

	// os.MkdirTemp is the replacement for the deprecated ioutil.TempDir;
	// package io has no TempDir function.
	dir, err := os.MkdirTemp("", "hdfs-test")
	require.NoError(t, err)
	defer os.RemoveAll(dir)

	tmpfile := filepath.Join(dir, "foo.txt")
	err = client.CopyToLocal("/_test/foo.txt", tmpfile)
	require.NoError(t, err)

	f, err := os.Open(tmpfile)
	require.NoError(t, err)
	defer f.Close()

	contents, err := io.ReadAll(f)
	require.NoError(t, err)
	assert.EqualValues(t, "bar\n", string(contents))
}

Expand Down